您正在阅读的是旧版本但仍受支持的 ROS 2 文档。 Jazzy.

从节点记录袋(Python)

目标 将数据从自己的 Python 节点记录到数据包中。

辅导水平: 高级

时间 20 分钟

背景介绍

rosbag2 不只是提供 玫瑰2 背包 命令行工具。它还提供了 Python API,用于从自己的源代码中读取和写入数据包。这样,您就可以订阅主题,并在对数据进行其他处理的同时,将接收到的数据保存到数据包中。例如,您可以这样做来保存来自主题的数据和处理该数据的结果,而无需将处理过的数据发送到主题上进行记录。由于任何数据都可以记录在数据袋中,因此也可以保存由主题以外的其他来源生成的数据,例如用于训练集的合成数据。例如,这对于快速生成一个包含大量样本、播放时间较长的数据包非常有用。

先决条件

您应该有 rosbag2 作为 ROS 2 常规设置的一部分安装的软件包。

如果你在 Linux 上安装了 Debian 软件包,它可能已经默认安装。如果没有,可以使用此命令进行安装。

sudo apt install ros-humble-rosbag2

本教程讨论如何使用 ROS 2 工具包,包括从终端使用。您应该已经完成了 ROS 2 工具包基础教程.

任务

1 创建软件包

打开一个新终端,然后 为您的 ROS 2 安装提供源代码 以便 玫瑰2 命令即可运行。

跟进 本说明 创建一个名为 ros2_ws.

导航进入 ros2_ws/src 目录,并创建一个新软件包:

ros2 pkg create --build-type ament_python --license Apache-2.0 bag_recorder_nodes_py --ependencies rclpy rosbag2_py example_interfaces std_msgs

您的终端将返回一条信息,验证是否创建了软件包 记录袋节点 及其所有必要的文件和文件夹。文件 --依赖 参数会自动将必要的依赖行添加到 package.xml.在这种情况下,软件包将使用 rosbag2_py 软件包以及 rclpy 软件包。对 接口示例 包也是信息定义所必需的。

1.1 更新 package.xmlsetup.py

因为您使用了 --依赖 选项,就不必在创建软件包时手动将依赖关系添加到 package.xml.不过,请务必一如既往地将说明、维护者电子邮件和姓名以及许可证信息添加到 package.xml.

<描述>;Python 背包 写作 教程</description>;
维护者 电子邮件="[email protected]";>;您的 名称维护人员</maintainer>;
许可证阿帕奇 许可证 2.0</license>;

还请务必将此信息添加到 setup.py 文件。

维护者='您的姓名';,
维护者电子邮件='[email protected]';,
描述='Python书包写作教程';,
许可='阿帕奇许可证 2.0';,

2 编写 Python 节点

内部 ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py 目录下,新建一个名为 simple_bag_recorder.py 并粘贴以下代码。

舶来品 rclpy
 rclpy.node 舶来品 节点
 rclpy.serialization 舶来品 序列化消息
 std_msgs.msg 舶来品 字符串

舶来品 rosbag2_py

 简单包记录器(节点):
    捍卫 启动(自我):
        棒极了().启动('simple_bag_recorder';)
        自我.作家 = rosbag2_py.顺序作家()

        存储选项 = rosbag2_py.存储.存储选项(
            uri='my_bag';,
            storage_id='sqlite3';)
        转换器选项 = rosbag2_py.存储.转换器选项('', '')
        自我.作家.(存储选项, 转换器选项)

        主题信息 = rosbag2_py.存储.主题元数据(
            名字='唠叨';,
            类型='std_msgs/msg/String';,
            序列化格式='cdr';)
        自我.作家.创建主题(主题信息)

        自我.订阅费 = 自我.创建订阅(
            字符串,
            '唠叨';,
            自我.topic_callback,
            10)
        自我.订阅费

    捍卫 topic_callback(自我, 信息):
        自我.作家.写道(
            '唠叨';,
            序列化消息(信息),
            自我.获取时钟().现在().纳秒)


捍卫 主要(参数=):
    rclpy.启动(参数=参数)
     = 简单包记录器()
    rclpy.后旋()
    rclpy.关闭()


如果 姓名____ == '__main__';:
    主要()

2.1 检查代码

"(《世界人权宣言》) 舶来品 顶部的语句是软件包依赖关系。注意导入 rosbag2_py 包中提供了处理袋文件所需的函数和结构。

在类的构造函数中,我们首先创建一个写入器对象,用来将数据写入数据包。我们正在创建一个 顺序作家,它按照接收到的顺序将信息写入信息袋中。其他具有不同行为的写入器可以在 rosbag2.

自我.作家 = rosbag2_py.顺序作家()

现在我们有了写入器对象,可以用它打开文件袋。我们要指定要创建的数据包的 URI 和格式 (sqlite3),其他选项保留默认值。使用默认转换选项时,将不执行转换,并以接收到的序列化格式存储报文。

存储选项 = rosbag2_py.存储.存储选项(
    uri='my_bag';,
    storage_id='sqlite3';)
转换器选项 = rosbag2_py.存储.转换器选项('', '')
自我.作家.(存储选项, 转换器选项)

接下来,我们需要告诉写入器我们希望存储的主题。为此,我们需要创建一个 主题元数据 对象,并将其注册到写入器中。该对象指定了主题名称、主题数据类型和使用的序列化格式。

主题信息 = rosbag2_py.存储.主题元数据(
    名字='唠叨';,
    类型='std_msgs/msg/String';,
    序列化格式='cdr';)
自我.作家.创建主题(主题信息)

现在写入器已经设置好记录我们传递给它的数据,我们创建一个订阅并为其指定一个回调。我们将在回调中向袋子写入数据。

自我.订阅费 = 自我.创建订阅(
    字符串,
    '唠叨';,
    自我.topic_callback,
    10)
自我.订阅费

回调以未序列化的形式接收信息(这是 rclpy API),并将消息传递给写入器,同时指定数据的主题和要与消息一起记录的时间戳。但是,写入器需要将序列化后的信息存储在信息袋中。这意味着我们需要在将数据传递给写入器之前将其序列化。为此,我们调用 序列化消息() 并将结果传递给写入器,而不是直接传递消息。

捍卫 topic_callback(自我, 信息):
    自我.作家.写道(
        '唠叨';,
        序列化消息(信息),
        自我.获取时钟().现在().纳秒)

文件以 主要 函数用于创建节点实例并启动 ROS 处理。

捍卫 主要(参数=):
    rclpy.启动(参数=参数)
     = 简单包记录器()
    rclpy.后旋()
    rclpy.关闭()

2.2 添加入口点

打开 setup.py 文件中的 记录袋节点 包,并为节点添加一个入口点。

入口={
    'console_scripts';: [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main';,
    ],
},

3 构建和运行

返回工作区的根目录、 ros2_ws然后创建新软件包。

colcon build --packages-select bag_recorder_nodes_py

打开一个新终端,导航至 ros2_ws并获取设置文件。

source install/setup.bash

现在运行节点:

ros2 run bag_recorder_nodes_py simple_bag_recorder

打开第二个终端,运行 话匣子 示例节点。

ros2 run demo_nodes_cpp talker

这将开始在 唠叨 主题。写袋节点收到这些数据后,会将其写入 我的包 包。如果 我的包 目录已经存在,您必须在运行 简单记录袋 节点。这是因为 rosbag2 默认情况下不会覆盖现有包,因此目标目录不可能存在。

终止两个节点。然后,在一个终端启动 听众 示例节点。

ros2 运行 demo_nodes_cpp 监听器

在另一个终端,使用 玫瑰2 背包 播放节点录制的音袋。

ros2 背包扮演 my_bag

您将看到来自信息包的信息正在被 听众 节点。

如果您想再次运行写袋节点,首先需要删除 我的包 目录。

4 记录节点的合成数据

任何数据都可以记录到数据包中,而不仅仅是通过主题接收到的数据。从自己的节点写入数据包的一个常见用例是生成和存储合成数据。在本节中,您将学习如何编写一个节点,生成一些数据并将其存储到数据包中。我们将演示两种方法。第一种方法使用带有定时器的节点;如果数据生成是在节点外部进行的,例如直接从硬件(如摄像头)读取数据,则可以使用这种方法。第二种方法不使用节点;这是在不需要使用 ROS 基础架构的任何功能时可以使用的方法。

4.1 编写 Python 节点

内部 ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py 目录下,新建一个名为 data_generator_node.py 并粘贴以下代码。

舶来品 rclpy
 rclpy.node 舶来品 节点
 rclpy.serialization 舶来品 序列化消息
 example_interfaces.msg 舶来品 Int32

舶来品 rosbag2_py

 数据生成器节点(节点):
    捍卫 启动(自我):
        棒极了().启动('data_generator_node';)
        自我.数据 = Int32()
        自我.数据.数据 = 0
        自我.作家 = rosbag2_py.顺序作家()

        存储选项 = rosbag2_py.存储.存储选项(
            uri='timed_synthetic_bag';,
            storage_id='sqlite3';)
        转换器选项 = rosbag2_py.存储.转换器选项('', '')
        自我.作家.(存储选项, 转换器选项)

        主题信息 = rosbag2_py.存储.主题元数据(
            名字='合成';,
            类型='example_interfaces/msg/Int32';,
            序列化格式='cdr';)
        自我.作家.创建主题(主题信息)

        自我.定时器 = 自我.创建计时器(1, 自我.定时器回调)

    捍卫 定时器回调(自我):
        自我.作家.写道(
            '合成';,
            序列化消息(自我.数据),
            自我.获取时钟().现在().纳秒)
        自我.数据.数据 += 1


捍卫 主要(参数=):
    rclpy.启动(参数=参数)
    dgn = 数据生成器节点()
    rclpy.后旋(dgn)
    rclpy.关闭()


如果 姓名____ == '__main__';:
    主要()

4.2 检查代码

该代码的大部分内容与第一个示例相同。重要的不同之处在此说明。

首先,改变袋子的名称。

存储选项 = rosbag2_py.存储.存储选项(
    uri='timed_synthetic_bag';,
    storage_id='sqlite3';)

主题名称和存储的数据类型也会改变。

主题信息 = rosbag2_py.存储.主题元数据(
    名字='合成';,
    类型='example_interfaces/msg/Int32';,
    序列化格式='cdr';)
自我.作家.创建主题(主题信息)

该节点有一个计时器,而不是对主题的订阅。定时器以一秒为周期触发,并在触发时调用给定的成员函数。

自我.定时器 = 自我.创建计时器(1, 自我.定时器回调)

在定时器回调中,我们生成(或以其他方式获取,例如从连接到某些硬件的串行端口读取)我们希望存储在袋中的数据。与上一个示例一样,数据尚未序列化,因此我们必须在将其传递给写入器之前将其序列化。

自我.作家.写道(
    '合成';,
    序列化消息(自我.数据),
    自我.获取时钟().现在().纳秒)

4.3 添加可执行文件

打开 setup.py 文件中的 记录袋节点 包,并为节点添加一个入口点。

入口={
    'console_scripts';: [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main';,
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main';,
    ],
},

4.4 构建和运行

返回工作区的根目录、 ros2_ws,并创建您的软件包。

colcon build --packages-select bag_recorder_nodes_py

打开一个新终端,导航至 ros2_ws并获取设置文件。

source install/setup.bash

如果 定时合成袋 目录已经存在,运行节点前必须先删除该目录。

现在运行节点:

ros2 run bag_recorder_nodes_py data_generator_node

等待 30 秒左右,然后使用 按住-c.接着,播放制作好的袋子。

ROS2合成袋定时播放

打开第二个终端,回显 /合成 主题。

ros2 topic echo /synthetic

您将看到以每秒一条信息的速度打印到控制台的已生成并存储在数据包中的数据。

5 记录可执行文件的合成数据

现在,您可以创建一个存储来自主题以外数据源的数据包,您将学习如何从非节点可执行文件生成和记录合成数据。这种方法的优点是代码简单,可以快速创建大量数据。

5.1 编写 Python 可执行文件

内部 ros2_ws/src/bag_recorder_nodes_py/bag_recorder_nodes_py 目录下,新建一个名为 data_generator_executable.py 并粘贴以下代码。

 rclpy.clock 舶来品 时钟
 rclpy.duration 舶来品 持续时间
 rclpy.serialization 舶来品 序列化消息
 example_interfaces.msg 舶来品 Int32

舶来品 rosbag2_py


捍卫 主要(参数=):
    作家 = rosbag2_py.顺序作家()

    存储选项 = rosbag2_py.存储.存储选项(
        uri='big_synthetic_bag',
        storage_id='sqlite3';)
    转换器选项 = rosbag2_py.存储.转换器选项('', '')
    作家.(存储选项, 转换器选项)

    主题信息 = rosbag2_py.存储.主题元数据(
        名字='合成';,
        类型='example_interfaces/msg/Int32';,
        序列化格式='cdr';)
    作家.创建主题(主题信息)

    时间戳 = 时钟().现在()
    对于 ii  范围(0, 100):
        数据 = Int32()
        数据.数据 = ii
        作家.写道(
            '合成';,
            序列化消息(数据),
            时间戳.纳秒)
        时间戳 += 持续时间(秒钟=1)

如果 姓名____ == '__main__';:
    主要()

5.2 检查代码

比较一下这个示例和上一个示例就会发现,它们并没有什么不同。唯一明显的不同是使用 for 循环而不是定时器来驱动数据生成。

请注意,我们现在也在为数据生成时间戳,而不是依赖每个样本的当前系统时间。时间戳可以是你需要的任何值。数据将以这些时间戳给出的速度回放,因此这是控制样本默认回放速度的有效方法。还要注意的是,虽然每个采样之间的时间间隔是一整秒,但该可执行文件不需要在每个采样之间等待一秒钟。这样,我们就能在比回放时间更短的时间内生成大量数据,覆盖更宽的时间跨度。

时间戳 = 时钟().现在()
对于 ii  范围(0, 100):
    数据 = Int32()
    数据.数据 = ii
    作家.写道(
        '合成';,
        序列化消息(数据),
        时间戳.纳秒)
    时间戳 += 持续时间(秒钟=1)

5.3 添加可执行文件

打开 setup.py 文件中的 记录袋节点 包,并为节点添加一个入口点。

入口={
    'console_scripts';: [
        'simple_bag_recorder = bag_recorder_nodes_py.simple_bag_recorder:main';,
        'data_generator_node = bag_recorder_nodes_py.data_generator_node:main';,
        'data_generator_executable = bag_recorder_nodes_py.data_generator_executable:main';,
    ],
},

5.4 构建和运行

返回工作区的根目录、 ros2_ws,并创建您的软件包。

colcon build --packages-select bag_recorder_nodes_py

打开终端,导航至 ros2_ws并获取设置文件。

source install/setup.bash

如果 大合成包 目录已经存在,运行可执行文件前必须先删除该目录。

现在运行可执行文件:

ros2 run bag_recorder_nodes_py data_generator_executable

请注意,可执行文件的运行和结束速度都非常快。

现在回放制作好的袋子。

ROS2 袋装大合成树脂袋

打开第二个终端,回显 /合成 主题。

ros2 topic echo /synthetic

您将看到以每秒一条信息的速度将生成并存储在数据袋中的数据打印到控制台。尽管数据包的生成速度很快,但仍会按照时间戳指示的速度进行回放。

摘要

您创建了一个节点,它可以将收到的主题数据记录到数据包中。您测试了使用该节点记录一个数据包,并通过回放该数据包来验证数据是否被记录。这种方法可以用来录制一个数据包,其中包含的数据比它从某个主题接收到的数据更多,例如处理接收到的数据所得到的结果。然后,您继续创建节点和可执行文件,以生成合成数据并将其存储到数据包中。后一种方法对于生成合成数据非常有用,例如,可以用作训练集。