警告
您正在阅读的 ROS 2 文档版本已达到 EOL(生命周期结束),不再受官方支持。如果您想了解最新信息,请访问 Jazzy.
编写动作服务器和客户端(Python)
目标 用 Python 实现动作服务器和客户端。
辅导水平: 中级
时间 15 分钟
背景介绍
在 ROS 2 中,操作是一种异步通信形式。 行动客户 将目标申请发送至 行动服务器. 行动服务器 将目标反馈和结果发送至 行动客户.
先决条件
您需要 动作教程界面
软件包和 Fibonacci.action
接口、 创建行动.
任务
1 编写行动服务器
让我们集中精力编写一个动作服务器,使用我们在 创建行动 教程。
到目前为止,您已经创建了软件包,并使用 玫瑰2 运行
来运行节点。不过,为了保持本教程的简洁性,我们将把动作服务器的范围限定在一个文件中。如果你想了解完整的动作教程包,请查看
行动教程.
在你的主目录下打开一个新文件,我们称之为 fibonacci_action_server.py
并添加以下代码:
1舶来品 rclpy
2从 rclpy.action 舶来品 行动服务器
3从 rclpy.node 舶来品 节点
4
5从 action_tutorials_interfaces.action 舶来品 斐波那契
6
7
8类 FibonacciActionServer(节点):
9
10 捍卫 启动(自我):
11 棒极了().启动('fibonacci_action_server';)
12 自我.行动服务器 = 行动服务器(
13 自我,
14 斐波那契,
15 '斐波那契';,
16 自我.执行回调)
17
18 捍卫 执行回调(自我, 目标句柄):
19 自我.get_logger().信息(执行目标......';)
20 结果 = 斐波那契.结果()
21 返回 结果
22
23
24捍卫 主要(参数=无):
25 rclpy.启动(参数=参数)
26
27 fibonacci_action_server = FibonacciActionServer()
28
29 rclpy.后旋(fibonacci_action_server)
30
31
32如果 姓名____ == '__main__';:
33 主要()
第 8 行定义了一个类 FibonacciActionServer
的子类 节点
.通过调用 节点
构造函数,命名我们的节点 fibonacci_action_server
:
棒极了().启动('fibonacci_action_server';)
在构造函数中,我们还实例化了一个新的动作服务器:
自我.行动服务器 = 行动服务器(
自我,
斐波那契,
'斐波那契';,
自我.执行回调)
行动服务器需要四个参数:
要添加动作客户端的 ROS 2 节点:
自我
.行动类型:
斐波那契
(在第 5 行导入)。行动名称:
斐波那契
.执行已接受目标的回调函数:
self.execute_callback
.该回调 必须 返回该操作类型的结果信息。
我们还定义了一个 执行回调
方法:
捍卫 执行回调(自我, 目标句柄):
自我.get_logger().信息(执行目标......';)
结果 = 斐波那契.结果()
返回 结果
一旦目标被接受,就会调用该方法来执行目标。
让我们试着运行我们的行动服务器:
python3 fibonacci_action_server.py
python3 fibonacci_action_server.py
蟒蛇 fibonacci_action_server.py
在另一个终端,我们可以使用命令行界面发送目标:
玫瑰2 行动 发送目标 斐波那契 action_tutorials_interfaces/action/Fibonacci "{order:5}";
在运行动作服务器的终端中,你应该会看到一条记录信息 "正在执行目标...",随后会出现目标状态未设置的警告。默认情况下,如果目标句柄状态未在执行回调中设置,则会假定目标状态为 流产 州。
我们可以使用以下方法 成功() 在目标句柄上,表示目标已成功实现:
1 捍卫 执行回调(自我, 目标句柄):
2 自我.get_logger().信息(执行目标......';)
3 目标句柄.继承()
4 结果 = 斐波那契.结果()
5 返回 结果
现在,如果重启行动服务器并发送另一个目标,就会看到目标已完成,状态为 成功
.
现在,让我们实际计算并返回所请求的斐波那契数列:
1 捍卫 执行回调(自我, 目标句柄):
2 自我.get_logger().信息(执行目标......';)
3
4 顺序 = [0, 1]
5
6 对于 i 于 范围(1, 目标句柄.要求.订单):
7 顺序.追加(顺序[i] + 顺序[i-1])
8
9 目标句柄.继承()
10
11 结果 = 斐波那契.结果()
12 结果.顺序 = 顺序
13 返回 结果
计算序列后,我们将其赋值给结果信息字段,然后再返回。
再次重启操作服务器并发送另一个目标。你应该会看到目标以正确的结果序列完成。
1.2 发布反馈意见
在目标执行过程中向动作客户端提供反馈是动作的一大亮点。我们可以通过调用目标句柄的 publish_feedback() 方法。
我们将更换 顺序
变量,而是使用反馈信息来存储序列。在 for 循环中每次更新反馈信息后,我们都会发布反馈信息并休眠,以达到戏剧效果:
1舶来品 时间
2
3舶来品 rclpy
4从 rclpy.action 舶来品 行动服务器
5从 rclpy.node 舶来品 节点
6
7从 action_tutorials_interfaces.action 舶来品 斐波那契
8
9
10类 FibonacciActionServer(节点):
11
12 捍卫 启动(自我):
13 棒极了().启动('fibonacci_action_server';)
14 自我.行动服务器 = 行动服务器(
15 自我,
16 斐波那契,
17 '斐波那契';,
18 自我.执行回调)
19
20 捍卫 执行回调(自我, 目标句柄):
21 自我.get_logger().信息(执行目标......';)
22
23 反馈信息 = 斐波那契.反馈意见()
24 反馈信息.部分序列 = [0, 1]
25
26 对于 i 于 范围(1, 目标句柄.要求.订单):
27 反馈信息.部分序列.追加(
28 反馈信息.部分序列[i] + 反馈信息.部分序列[i-1])
29 自我.get_logger().信息('反馈: {0}'.格式(反馈信息.部分序列))
30 目标句柄.发布反馈(反馈信息)
31 时间.睡眠(1)
32
33 目标句柄.继承()
34
35 结果 = 斐波那契.结果()
36 结果.顺序 = 反馈信息.部分序列
37 返回 结果
重启行动服务器后,我们可以通过使用命令行工具中的 --反馈
选择:
玫瑰2 行动 发送目标 --反馈 斐波那契 action_tutorials_interfaces/action/Fibonacci "{order:5}";
2 撰写行动客户
我们还将把操作客户端的范围设定为单个文件。打开一个新文件,我们称之为 fibonacci_action_client.py
并添加以下模板代码:
1舶来品 rclpy
2从 rclpy.action 舶来品 动作客户端
3从 rclpy.node 舶来品 节点
4
5从 action_tutorials_interfaces.action 舶来品 斐波那契
6
7
8类 FibonacciActionClient(节点):
9
10 捍卫 启动(自我):
11 棒极了().启动('fibonacci_action_client';)
12 自我.行动客户端 = 动作客户端(自我, 斐波那契, '斐波那契';)
13
14 捍卫 发送目标(自我, 订单):
15 goal_msg = 斐波那契.目标()
16 goal_msg.订单 = 订单
17
18 自我.行动客户端.等待服务器()
19
20 返回 自我.行动客户端.发送目标同步(goal_msg)
21
22
23捍卫 主要(参数=无):
24 rclpy.启动(参数=参数)
25
26 动作客户端 = FibonacciActionClient()
27
28 未来 = 动作客户端.发送目标(10)
29
30 rclpy.自旋直到未来完成(动作客户端, 未来)
31
32
33如果 姓名____ == '__main__';:
34 主要()
我们定义了一个 FibonacciActionClient
的子类 节点
.通过调用 节点
构造函数,命名我们的节点 斐波那契行动客户端
:
棒极了().启动('fibonacci_action_client';)
在类构造函数中,我们还将使用上一教程中的自定义动作定义创建一个动作客户端。 创建行动:
自我.行动客户端 = 动作客户端(自我, 斐波那契, '斐波那契';)
我们创建了一个 动作客户端
传递三个参数:
要添加动作客户端的 ROS 2 节点:
自我
行动类型:
斐波那契
行动名称:
斐波那契
我们的操作客户端将能与具有相同操作名称和类型的操作服务器进行通信。
我们还定义了一种方法 发送目标
在 FibonacciActionClient
类:
捍卫 发送目标(自我, 订单):
goal_msg = 斐波那契.目标()
goal_msg.订单 = 订单
自我.行动客户端.等待服务器()
返回 自我.行动客户端.发送目标同步(goal_msg)
该方法等待行动服务器可用,然后向服务器发送目标。它将返回一个我们可以稍后等待的 future。
在类定义之后,我们定义了一个函数 main()
初始化 ROS 2 并创建我们的 FibonacciActionClient
节点。然后,它会发送一个目标,并等待该目标完成。
最后,我们将 main()
在 Python 程序的入口点中。
让我们先运行之前构建的动作服务器来测试我们的动作客户端:
python3 fibonacci_action_server.py
python3 fibonacci_action_server.py
蟒蛇 fibonacci_action_server.py
在另一个终端,运行操作客户端:
python3 fibonacci_action_client.py
python3 fibonacci_action_client.py
蟒蛇 fibonacci_action_client.py
当行动服务器成功执行目标时,您应该会看到它打印的信息:
[信息] [fibonacci_action_server]: 执行 目标...
[信息] [fibonacci_action_server]: 反馈: 矩阵('i', [0, 1, 1])
[信息] [fibonacci_action_server]: 反馈: 矩阵('i', [0, 1, 1, 2])
[信息] [fibonacci_action_server]: 反馈: 矩阵('i', [0, 1, 1, 2, 3])
[信息] [fibonacci_action_server]: 反馈: 矩阵('i', [0, 1, 1, 2, 3, 5])
# 等等。
操作客户端应该会启动,然后很快结束。此时,我们就有了一个正常运行的操作客户端,但看不到任何结果,也得不到任何反馈。
2.1 获取结果
因此,我们可以发送目标,但如何知道目标何时完成?我们可以通过几个步骤获取结果信息。首先,我们需要为发送的目标获取一个目标句柄。然后,我们可以使用目标句柄来请求结果。
下面是这个示例的完整代码:
1舶来品 rclpy
2从 rclpy.action 舶来品 动作客户端
3从 rclpy.node 舶来品 节点
4
5从 action_tutorials_interfaces.action 舶来品 斐波那契
6
7
8类 FibonacciActionClient(节点):
9
10 捍卫 启动(自我):
11 棒极了().启动('fibonacci_action_client';)
12 自我.行动客户端 = 动作客户端(自我, 斐波那契, '斐波那契';)
13
14 捍卫 发送目标(自我, 订单):
15 goal_msg = 斐波那契.目标()
16 goal_msg.订单 = 订单
17
18 自我.行动客户端.等待服务器()
19
20 自我.发送未来目标 = 自我.行动客户端.发送目标同步(goal_msg)
21
22 自我.发送未来目标.添加捐献回调(自我.目标响应回调)
23
24 捍卫 目标响应回调(自我, 未来):
25 目标句柄 = 未来.结果()
26 如果 不 目标句柄.无争议:
27 自我.get_logger().信息('目标被拒 :(';)
28 返回
29
30 自我.get_logger().信息('接受目标:)';)
31
32 自我._get_result_future_未来 = 目标句柄.get_result_async()
33 自我._get_result_future_未来.添加捐献回调(自我.get_result_callback)
34
35 捍卫 get_result_callback(自我, 未来):
36 结果 = 未来.结果().结果
37 自我.get_logger().信息('结果: {0}'.格式(结果.顺序))
38 rclpy.关闭()
39
40
41捍卫 主要(参数=无):
42 rclpy.启动(参数=参数)
43
44 动作客户端 = FibonacciActionClient()
45
46 动作客户端.发送目标(10)
47
48 rclpy.后旋(动作客户端)
49
50
51如果 姓名____ == '__main__';:
52 主要()
"(《世界人权宣言》) ActionClient.send_goal_async() 方法会将一个 future 返回给目标句柄。首先,我们要注册一个未来完成时的回调:
自我.发送未来目标.添加捐献回调(自我.目标响应回调)
请注意,当行动服务器接受或拒绝目标请求时,未来就完成了。让我们看看 目标响应回调
更详细的信息。我们可以检查目标是否被拒绝,并提前返回,因为我们知道不会有结果:
捍卫 目标响应回调(自我, 未来):
目标句柄 = 未来.结果()
如果 不 目标句柄.无争议:
自我.get_logger().信息('目标被拒 :(';)
返回
自我.get_logger().信息('接受目标:)';)
现在我们已经有了目标句柄,可以用它来请求结果,方法是 get_result_async().与发送目标类似,当结果就绪时,我们将得到一个完成的未来。让我们像处理目标响应一样注册一个回调:
自我._get_result_future_未来 = 目标句柄.get_result_async()
自我._get_result_future_未来.添加捐献回调(自我.get_result_callback)
在回调中,我们会记录结果序列,并关闭 ROS 2,以便干净利落地退出:
捍卫 get_result_callback(自我, 未来):
结果 = 未来.结果().结果
自我.get_logger().信息('结果: {0}'.格式(结果.顺序))
rclpy.关闭()
在单独的终端中运行动作服务器,然后尝试运行我们的 Fibonacci 动作客户端!
python3 fibonacci_action_client.py
python3 fibonacci_action_client.py
蟒蛇 fibonacci_action_client.py
您应该能看到目标被接受和最终结果的记录信息。
2.2 获取反馈
我们的行动客户端可以发送目标。不错!但如果我们能从行动服务器上获得一些关于我们发送的目标的反馈,那就更好了。
下面是这个示例的完整代码:
1舶来品 rclpy
2从 rclpy.action 舶来品 动作客户端
3从 rclpy.node 舶来品 节点
4
5从 action_tutorials_interfaces.action 舶来品 斐波那契
6
7
8类 FibonacciActionClient(节点):
9
10 捍卫 启动(自我):
11 棒极了().启动('fibonacci_action_client';)
12 自我.行动客户端 = 动作客户端(自我, 斐波那契, '斐波那契';)
13
14 捍卫 发送目标(自我, 订单):
15 goal_msg = 斐波那契.目标()
16 goal_msg.订单 = 订单
17
18 自我.行动客户端.等待服务器()
19
20 自我.发送未来目标 = 自我.行动客户端.发送目标同步(goal_msg, 反馈回调=自我.反馈回调)
21
22 自我.发送未来目标.添加捐献回调(自我.目标响应回调)
23
24 捍卫 目标响应回调(自我, 未来):
25 目标句柄 = 未来.结果()
26 如果 不 目标句柄.无争议:
27 自我.get_logger().信息('目标被拒 :(';)
28 返回
29
30 自我.get_logger().信息('接受目标:)';)
31
32 自我._get_result_future_未来 = 目标句柄.get_result_async()
33 自我._get_result_future_未来.添加捐献回调(自我.get_result_callback)
34
35 捍卫 get_result_callback(自我, 未来):
36 结果 = 未来.结果().结果
37 自我.get_logger().信息('结果: {0}'.格式(结果.顺序))
38 rclpy.关闭()
39
40 捍卫 反馈回调(自我, 反馈信息):
41 反馈 = 反馈信息.反馈
42 自我.get_logger().信息('收到反馈: {0}'.格式(反馈.部分序列))
43
44
45捍卫 主要(参数=无):
46 rclpy.启动(参数=参数)
47
48 动作客户端 = FibonacciActionClient()
49
50 动作客户端.发送目标(10)
51
52 rclpy.后旋(动作客户端)
53
54
55如果 姓名____ == '__main__';:
56 主要()
下面是反馈信息的回调函数:
捍卫 反馈回调(自我, 反馈信息):
反馈 = 反馈信息.反馈
自我.get_logger().信息('收到反馈: {0}'.格式(反馈.部分序列))
在回调中,我们会获取信息的反馈部分,并打印出 部分序列
字段显示在屏幕上。
我们需要向动作客户端注册回调。为此,我们需要在发送目标时将回调传递给动作客户端:
自我.发送未来目标 = 自我.行动客户端.发送目标同步(goal_msg, 反馈回调=自我.反馈回调)
一切就绪。如果我们运行我们的操作客户端,你应该会看到反馈信息被打印到屏幕上。
摘要
在本教程中,您将逐行组装 Python 动作服务器和动作客户端,并配置它们以交换目标、反馈和结果。