您正在阅读的是旧版本但仍受支持的 ROS 2 文档。 Jazzy.

建立高效的进程内通信

背景介绍

ROS 应用程序通常由单个 "节点 "组成,这些 "节点 "只能执行有限的任务,并与系统的其他部分解耦。这有利于故障隔离、更快的开发、模块化和代码重用,但往往要以牺牲性能为代价。在 ROS 1 初步开发完成后,高效节点组合的需求变得非常明显,于是 Nodelets 应运而生。在 ROS 2 中,我们旨在通过解决一些需要重组节点的基本问题来改进 Nodelets 的设计。

在本演示中,我们将重点介绍如何通过单独定义节点,并在不更改节点代码或限制其功能的情况下,将其组合到不同的流程布局中,从而手动组成节点。

安装演示程序

参见 安装说明 了解安装 ROS 2 的详情。

如果您从软件包中安装了 ROS 2,请确保有 ros-humble-内进程演示 已安装。如果您下载了压缩包或从源代码构建了 ROS 2,那么它已经是安装的一部分了。

运行和理解演示

有几个不同的演示:有些是玩具问题,旨在突出进程内通信功能的特点;有些是端到端的示例,使用 OpenCV 并演示将节点重新组合为不同配置的能力。

双节点管道演示

该演示旨在展示进程内的发布/订阅连接在使用 std::unique_ptrs.

首先让我们来看看消息来源:

https://github.com/ros2/demos/blob/humble/intra_process_demo/src/two_node_pipeline/two_node_pipeline.cpp

#include 时间顺序<chrono>;
#include <cinttypes>;
#include <cstdio>;
#include <内存>;
#include <字符串>;
#include <utility>;

#include "rclcpp/rclcpp.hpp";
#include "std_msgs/msg/int32.hpp";

使用 命名空间 标准::计时器;

// 生成信息的节点。
结构 制片人 :  rclcpp::节点
{
  制片人( 标准::字符串 及样品; 名字,  标准::字符串 及样品; 产量)
  : 节点(名字, rclcpp::节点选项().使用内部流程通信())
  {
    // 在输出主题上创建一个发布者。
    公共_ = ->;创建出版商<;std_msgs::信息::Int32>;(产量, 10);
    标准::弱_ptr<;标准::删除指针<;decltype(公共_.获取())>::类型>; 捕获的出版物 = 公共_;
    // 创建一个定时器,以 ~1Hz 的频率在输出主题上发布。
    汽车 回调 = [捕获的出版物]() ->; 空白 {
        汽车 pub_ptr = 捕获的出版物.水闸();
        如果 (!pub_ptr) {
          返回;
        }
        天电 int32_t 计数 = 0;
        std_msgs::信息::Int32::UniquePtr 信息( std_msgs::信息::Int32());
        信息->;数据 = 计数++;
        printf(
          "Published message with value: %d, and address:0x%"; PRIXPTR ";\n";, 信息->;数据,
          重读<;标准::uintptr_t>;(信息.获取()));
        pub_ptr->;发布(标准::行动(信息));
      };
    定时器 = ->;创建隔离墙计时器(1s, 回调);
  }

  rclcpp::出版商<;std_msgs::信息::Int32>::SharedPtr 公共_;
  rclcpp::定时器基数::SharedPtr 定时器;
};

// 消耗信息的节点。
结构 消费者 :  rclcpp::节点
{
  消费者( 标准::字符串 及样品; 名字,  标准::字符串 及样品; 输入)
  : 节点(名字, rclcpp::节点选项().使用内部流程通信())
  {
    // 在输入主题上创建订阅,在收到新信息时打印。
    子_ = ->;创建订阅<;std_msgs::信息::Int32>;(
      输入,
      10,
      [](std_msgs::信息::Int32::UniquePtr 信息) {
        printf(
          "收到的信息值为:%d,地址为:0x%":0x%"; PRIXPTR ";\n";, 信息->;数据,
          重读<;标准::uintptr_t>;(信息.获取()));
      });
  }

  rclcpp::订阅<;std_msgs::信息::Int32>::SharedPtr 子_;
};

int 主要(int 参数, 烧焦 * 参数[])
{
  setvbuf(数据输出, NULL, _IONBF, BUFSIZ);
  rclcpp::启动(参数, 参数);
  rclcpp::执行人::单线程执行器 执行人;

  汽车 监制 = 标准::共享<;制片人>;("制作人";, "号码";);
  汽车 消费者 = 标准::共享<;消费者>;("消费者";, "号码";);

  执行人.添加节点(监制);
  执行人.添加节点(消费者);
  执行人.后旋();

  rclcpp::关闭();

  返回 0;
}

通过查看 主要 函数,我们有一个生产者节点和一个消费者节点,我们将它们添加到一个单线程执行器中,然后调用 spin。

如果你查看 "生产者 "节点在 制片人 在结构图中,您可以看到我们创建了一个在 "数字 "主题上发布信息的发布器和一个定时器,定时器会定期创建一条新信息,打印出其在内存中的地址和内容值,然后将其发布。

消费者 "节点比较简单,你可以在 消费者 结构,因为它只订阅 "数字 "主题,并打印收到的信息的地址和值。

我们期望生产者打印出一个地址和值,消费者打印出一个匹配的地址和值。这表明进程内通信确实有效,至少对于简单的图形来说,避免了不必要的复制。

让我们运行演示,执行 玫瑰2 运行 进程内演示 双节点管道 可执行文件(别忘了先获取设置文件的源代码):

$ 玫瑰2 运行 进程内演示 双节点管道 已发布 信息  价值: 0,  地址 0x7fb02303faf0 已发布 信息  价值: 1,  地址 0x7fb020cf0520
 已收到 信息  价值: 1,  地址 0x7fb020cf0520 已发布 信息  价值: 2,  地址 0x7fb020e12900
 已收到 信息  价值: 2,  地址 0x7fb020e12900 已发布 信息  价值: 3,  地址 0x7fb020cf0520
 已收到 信息  价值: 3,  地址 0x7fb020cf0520 已发布 信息  价值: 4,  地址 0x7fb020e12900
 已收到 信息  价值: 4,  地址 0x7fb020e12900 已发布 信息  价值: 5,  地址 0x7fb02303cea0
 已收到 信息  价值: 5,  地址 0x7fb02303cea0
[...]

你会注意到,这些信息大约每秒一条。这是因为我们告诉定时器大约每秒触发一次。

您可能还注意到,第一条信息(值为 0)没有相应的 "收到消息...... "行。这是因为发布/订阅是 "尽力而为 "的,我们没有启用任何类似 "锁定 "的行为。这意味着,如果发布者在订阅建立之前发布了一条消息,订阅将不会收到该消息。这种竞赛条件会导致前几条信息丢失。在这种情况下,由于信息每秒只发送一次,通常只会丢失第一条信息。

最后,您可以看到 "已发布邮件...... "和 "已接收邮件...... "两行的值相同,地址也相同。这表明接收到的信息的地址与已发布的信息的地址相同,而不是副本。这是因为我们是用 std::unique_ptr可以安全地在系统中移动消息的所有权。您还可以使用 及样品;std::shared_ptr但在这种情况下不会出现零拷贝。

循环流水线演示

这个演示与上一个演示类似,但生产者不是为每次迭代创建新消息,而是只使用一个消息实例。这是通过在图中创建一个循环,并在旋转执行器之前从外部让其中一个节点发布消息,从而 "启动 "通信:

https://github.com/ros2/demos/blob/humble/intra_process_demo/src/cyclic_pipeline/cyclic_pipeline.cpp

#include 时间顺序<chrono>;
#include <cinttypes>;
#include <cstdio>;
#include <内存>;
#include <字符串>;
#include <utility>;

#include "rclcpp/rclcpp.hpp";
#include "std_msgs/msg/int32.hpp";

使用 命名空间 标准::计时器;

// 该节点接收一个 Int32,等待 1 秒,然后递增并发送。
结构 递增管道 :  rclcpp::节点
{
  递增管道( 标准::字符串 及样品; 名字,  标准::字符串 及样品; ,  标准::字符串 及样品; 向外)
  : 节点(名字, rclcpp::节点选项().使用内部流程通信())
  {
    // 在输出主题上创建一个发布者。
    酒吧 = ->;创建出版商<;std_msgs::信息::Int32>;(向外, 10);
    标准::弱_ptr<;标准::删除指针<;decltype(酒吧.获取())>::类型>; 捕获的出版物 = 酒吧;
    // 在输入主题上创建订阅。
    字幕 = ->;创建订阅<;std_msgs::信息::Int32>;(
      ,
      10,
      [捕获的出版物](std_msgs::信息::Int32::UniquePtr 信息) {
        汽车 pub_ptr = 捕获的出版物.水闸();
        如果 (!pub_ptr) {
          返回;
        }
        printf(
          "收到信息,信息值为:%d,地址为:0x%":0x%"; PRIXPTR ";\n";, 信息->;数据,
          重读<;标准::uintptr_t>;(信息.获取()));
        printf(睡眠 1 秒...\n";);
        如果 (!rclcpp::sleep_for(1s)) {
          返回;    // 如果睡眠失败(例如在 :kbd:`ctrl-c` 上)则返回。
        }
        printf(完成。\n";);
        信息->;数据++;    // 增加信息数据。
        printf(
          "递增并以值:%d 和地址发送:0x%"; PRIXPTR ";\n";, 信息->;数据,
          重读<;标准::uintptr_t>;(信息.获取()));
        pub_ptr->;发布(标准::行动(信息));    // 将信息发送到输出主题。
      });
  }

  rclcpp::出版商<;std_msgs::信息::Int32>::SharedPtr 酒吧;
  rclcpp::订阅<;std_msgs::信息::Int32>::SharedPtr 字幕;
};

int 主要(int 参数, 烧焦 * 参数[])
{
  setvbuf(数据输出, NULL, _IONBF, BUFSIZ);
  rclcpp::启动(参数, 参数);
  rclcpp::执行人::单线程执行器 执行人;

  // 通过连接两个 IncrementerPipe's 的输入和输出主题,创建一个简单的循环。
  // 期望它们之间传递的信息地址永远不会改变。
  汽车 管道1 = 标准::共享<;递增管道>;("pipe1";, "topic1";, "topic2";);
  汽车 管道2 = 标准::共享<;递增管道>;("pipe2";, "topic2";, "topic1";);
  rclcpp::sleep_for(1s);  // 等待订阅建立,以避免出现竞赛条件。
  // 发布第一条信息(启动循环)。
  标准::唯一参数<;std_msgs::信息::Int32>; 信息( std_msgs::信息::Int32());
  信息->;数据 = 42;
  printf(
    "Published first message with value: %d, and address:0x%"; PRIXPTR ";\n";, 信息->;数据,
    重读<;标准::uintptr_t>;(信息.获取()));
  管道1->;酒吧->;发布(标准::行动(信息));

  执行人.添加节点(管道1);
  执行人.添加节点(管道2);
  执行人.后旋();

  rclcpp::关闭();

  返回 0;
}

与上一个演示不同,本演示只使用了一个节点,并以不同的名称和配置实例化了两次。图最终是 管道1 ->; 管道2 ->; 管道1 ......循环往复。

线路 pipe1->pub->publish(msg); 启动进程,但从那时起,信息会通过每个节点在自己的订阅回调中调用发布在节点之间来回传递。

这里的预期是节点每秒来回传递一次信息,每次都会递增信息的值。因为消息是以 唯一参数 在开始时创建的同一信息会被持续使用。

为了验证这些期望,让我们运行它:

$ 玫瑰2 运行 进程内演示 cyclic_pipeline 已发布 第一次 信息  价值:  42,  地址 0x7fd2ce0a2bc0 收到 信息  价值:         42,  地址 0x7fd2ce0a2bc0
  睡觉 对于 1 第二...
  完成的.递增  发送  价值: 43,  地址 0x7fd2ce0a2bc0 收到 信息  价值:         43,  地址 0x7fd2ce0a2bc0
  睡觉 对于 1 第二...
  完成的.递增  发送  价值: 44,  地址 0x7fd2ce0a2bc0 收到 信息  价值:         44,  地址 0x7fd2ce0a2bc0
  睡觉 对于 1 第二...
  完成的.递增  发送  价值: 45,  地址 0x7fd2ce0a2bc0 收到 信息  价值:         45,  地址 0x7fd2ce0a2bc0
  睡觉 对于 1 第二...
  完成的.递增  发送  价值: 46,  地址 0x7fd2ce0a2bc0 收到 信息  价值:         46,  地址 0x7fd2ce0a2bc0
  睡觉 对于 1 第二...
  完成的.递增  发送  价值: 47,  地址 0x7fd2ce0a2bc0 收到 信息  价值:         47,  地址 0x7fd2ce0a2bc0
  睡觉 对于 1 第二...
[...]

每次迭代时,你都会看到数字不断增加,从 42 开始......因为是 42,而且一直重复使用相同的信息,指针地址不会改变,这就避免了不必要的复制。

图像管道演示

在本演示中,我们将使用 OpenCV 捕捉、注释和查看图像。

备注

如果您使用的是 macOS,而这些示例不起作用或收到类似以下的错误信息 ddsi_conn_write 失败 -1则需要增加系统范围内的 UDP 数据包大小:

$ 苏都 sysctl -w net.inet.udp.recvspace=209715
$ 苏都 sysctl -w net.inet.udp.maxdgram=65500

重新启动后,这些更改将不会持续。

简单管道

首先,我们将有一个由三个节点组成的管道,排列如下: 摄像机节点 ->; 水印节点 ->; 图像视图节点

"(《世界人权宣言》) 摄像机节点 从摄像设备读取数据 0 在计算机上写入图像信息并发布。图像 水印节点 的输出。 摄像机节点 并在发布前添加更多文字。最后 图像视图节点 的输出。 水印节点在图像中写入更多文字,然后用 cv::imshow.

在每个节点中,正在发送或已经接收或两者兼有的信息地址被写入图像。水印节点和图像视图节点的设计目的是在不复制图像的情况下修改图像,因此,只要节点处于同一流程中,图像上印记的地址都应该是相同的,而且图的组织结构也应保持如上所示的流水线。

备注

在某些系统上(我们在 Linux 系统上看到过这种情况),打印到屏幕上的地址可能不会改变。这是因为重复使用了同一个唯一指针。在这种情况下,管道仍在运行。

让我们执行以下可执行文件来运行演示:

玫瑰2 运行 进程内演示 统一图像管道

你应该看到这样的内容:

../././_images/intra-process-demo-pipeline-single-window.png

按空格键可以暂停图像渲染,再次按空格键可以继续渲染。您还可以按 qESC 退出。

如果暂停图像查看器,就可以比较图像上写的地址,发现它们是一样的。

带有两个图像查看器的管道

现在我们来看一个与上面类似的示例,只不过它有两个图像视图节点。所有节点仍在同一进程中,但现在应显示两个图像视图窗口。(macOS用户请注意:图像视图窗口可能会相互重叠)。让我们使用命令运行它:

玫瑰2 运行 进程内演示 带有两个图像视图的图像管道
./././_images/intra-process-demo-pipeline-two-windows-copy.png

与上一个示例一样,您可以按空格键暂停渲染,然后再按一次空格键继续渲染。您可以停止更新,检查写入屏幕的指针。

正如您在上面的示例图片中看到的那样,我们有一张所有指针都相同的图片,而另一张图片的前两个条目的指针与第一张图片相同,但第二张图片的最后一个指针却不同。要理解为什么会出现这种情况,请考虑一下图的拓扑结构:

摄像机节点 ->; 水印节点 ->; 图像视图节点
                              ->; 图像视图节点 2

之间的联系 摄像机节点水印节点 可以使用相同的指针而无需复制,因为只有一个进程内订阅可以接收到报文。但对于 水印节点 和两个图像视图节点的关系是一对多,因此如果图像视图节点使用 唯一参数 那么就不可能将同一指针的所有权同时传递给两个回调。不过,它可以传递给其中一个。至于哪一个会获得原始指针,并没有定义,而只是最后一个被传递的指针。

请注意,图像视图节点没有订阅 唯一参数 回调。而不是用 共享_ptrs.这意味着系统提供的 共享_ptr 这两个回调。处理第一个进程内订阅时,内部存储的 唯一参数 晋升为 共享_ptr.每个回调都将接收同一消息的共享所有权。

带有进程间查看器的管道

还有一件重要的事情要做好,那就是避免在进程间订阅时中断进程内的零拷贝行为。为了测试这一点,我们可以运行第一个图像管道演示、 统一图像管道然后运行一个独立的 图像视图节点 (别忘了在前缀中加上 玫瑰2 运行 进程内演示 在终端中)。看起来会是这样

../././_images/intra-process-demo-pipeline-inter-process.png

很难同时暂停两幅图像,因此图像可能不一致,但需要注意的是 统一图像管道 图像视图显示每个步骤的相同地址。这意味着,即使也订阅了外部视图,进程内的零拷贝也会保留。您还可以看到,进程间图像视图的前两行文本和第三行文本中独立图像查看器的进程 ID 不同。