前言
在机器人开发中,多线程的使用司空见惯。ROS2借助executor类帮助开发者简化多线程的使用,但是还是得先把基本概念搞清楚,才能正确的使用。本文解释了ROS1和ROS2中的并发/多线程概念,并且给出了ROS2版本一些实际例子帮助理解。
之前我也写了一篇也是关于ROS2多线程的文章:ROS2 service类型消息的同步调用问题浅析,更偏向于代码开发,可以阅读完本文之后再看,理解更深。
并发
让我们回到基础知识。并发意味着系统或软件可以同时运行许多任务。例如,在单核CPU机器上,可以通过使用线程来实现并发。
线程是可以由操作系统创建和管理的轻量级执行单元。每个线程都有自己的堆栈,并且可以与其他线程并发执行代码。
例如,假设我们有一台单核 CPU 机器和两个并发执行的线程。操作系统将以快速的速度在两个线程之间切换,允许每个线程在切换到另一个线程之前执行一小段时间。此过程将重复,直到两个线程都完成其任务。我们将其称为并发机制。
- 为什么我们在这里讨论并发?
下面是一个使用多个节点并需要并发来执行其任务的人形机器人的示例:
传感器节点: 该节点从机器人的传感器获取数据,例如摄像头、激光雷达、IMU 和力扭矩传感器。它将这些数据发布到其他节点可以订阅的主题。
感知节点: 该节点处理传感器数据以感知机器人的周围环境,例如检测障碍物或识别物体。它将这些信息发布到其他节点可用于导航或规划的主题。
导航节点: 该节点执行定位和映射,使机器人能够在其环境中导航,并生成低级控制命令来移动机器人的关节。它订阅传感器数据和感知主题以获取机器人周围环境的信息,并向驱动节点发布控制命令。
驱动节点: 该节点接收来自导航节点的控制命令,并将其转换为低级电机命令以移动机器人的关节。
规划节点: 该节点生成供机器人执行的高级计划,例如拾取物体或导航到特定位置。
在此示例中,所有节点同时运行并通过 ROS 消息传递系统相互通信。并发性的使用使机器人能够同时执行多个任务并实时响应环境的变化。
因此,了解 ROS 1 中的并发对于理解 ROS 2 执行器概念非常重要,因为 ROS 2 建立在 ROS 1 中引入的并发机制和概念的基础上。
在本例中,发布节点向 “hello_world_topic ”主题发送了一条包含字符串 “Hello, world!”的消息。订阅者节点监听同一主题上的消息,并使用 ROS_INFO 宏将收到的消息打印到控制台。
在 ROS 1(使用 roscpp 库)中,系统使用的线程模型是抽象出来的,与用户无关。这意味着,虽然系统可以在内部使用线程来管理诸如网络通信和调度等任务,但却不会将这些线程 API 暴露给用户应用程序。上述应用程序会产生多少个线程?
在这里,定时器线程用于以指定速率执行发布者回调函数。回调队列是一个 FIFO 数据结构,用于存储等待执行的待处理回调。队列中的回调由与节点相关联的 spinner 线程执行。当 spinner 线程运行时,它会检查回调队列中的待执行回调,并按照添加到队列中的顺序执行这些回调。
网络线程负责管理 ROS 节点间的网络通信。网络线程与用于执行 Pub/sub 和服务回调的 spinner 线程是分开的。不过,网络线程和 spinner 线程协同工作,为 ROS 节点之间提供无缝、高效的通信。
订阅者节点收到消息后,会将其添加到消息队列中。与节点相关联的 Spinner 线程负责从队列中读取消息,并为订阅者执行回调函数。
一般来说,如果回调函数的执行时间足够短,即使 CPU 只有一个内核,系统也能及时处理传入的信息并执行回调函数。但是,如果回调函数的执行时间较长,处理传入信息和执行回调可能需要更长的时间。这可能导致接收到的信息在用户端开始排队。
就障碍物检测系统而言,如果障碍物检测器回调函数的执行时间较长,则可能需要更长的时间来处理有关障碍物存在的传入信息。这会导致系统响应延迟,从而有可能使机器人撞上障碍物。使用具有额外 CPU 内核的硬件是解决这一问题的方法之一。
ROS1 - 多线程旋spinner:
ros::MultiThreadedSpinner spinner(2); // 2 threads
spinner.spin();// orros::AsyncSpinner spinner(2);
spinner.start();
在 ROS 1 中,你可以使用 MultiThreadedSpinner 或 AsyncSpinner 类在 ROS 节点中启用多线程,并使用多个 CPU 内核并行处理传入的消息,如果回调函数的执行时间较长,这可以提高 ROS 节点的性能。
默认情况下,给定订阅的回调函数会按顺序执行,每个回调函数的执行都受一个 mutex(一种同步对象)的保护。这意味着在任何给定时间内,只能执行一个给定订阅的回调函数,同一订阅的任何其他回调函数都将排队等待,直到静态代理被释放。要解决这个问题:
上面提供的代码展示了如何使用 ros::SubscribeOptions 类和 ros::MultiThreadedSpinner 类在 ROS 1 节点中启用并发回调执行。
在ROS2,rclcpp::执行者::执行程序类似于ros::旋()和ros::MultiThreadedSpinner在ROS1,但是,它提供了更多的适应性和定制。
Executors
概念解释
执行器(Executor)是一个负责调度和执行 ROS2 系统任务的组件。与 ROS 1 中的机制相比,显式 Executor 类(rclcpp 中的 executor.hpp、rclpy 中的 executors.py 或 rclc 中的 executor.h)提供了更多的执行管理控制,尽管其基本应用程序接口非常相似。
Executor 有许多功能和定制选项,包括
- 支持同时spin多个节点: 您可以使用 add_node() 方法为 Executor 添加多个节点。
- 我们可以使用 rclcpp::callback_group::CallbackGroup 类指定回调函数的执行顺序。您可以创建多个 rclcpp::callback_group::CallbackGroup 实例,并将回调函数添加到相应的组中,然后使用 add_callback_group() 方法指定各组的执行顺序。
- 我们可以使用 rclcpp::callback_group::CallbackGroup 类指定用于消息处理的线程数,并自定义回调排队和取消排队的方式。
rclcpp::executor::SingleThreadedExecutor 类也是 rclcpp::executor::Executor 的子类。如果你想确保回调函数以特定顺序执行,或者想避免管理多个线程的开销,那么它就非常有用。
在其他方面,rclcpp::executor::MultiThreadedExecutor 可在多个线程中并发执行回调函数。它使用线程池处理传入消息,并同时执行回调函数,让你可以利用多个 CPU 内核,提高系统性能。
此外,rclcpp::executor::StaticSingleThreadedExecutor 允许我们在运行时指定回调函数的执行顺序。我相信,这个接口类似于单线程执行器。
基本用法
在最简单的情况下,主线程通过调用 rclcpp::spin(…) 来处理节点传入的消息和事件,如下所示:
int main(int argc, char* argv[])
{// Some initialization.rclcpp::init(argc, argv);...// Instantiate a node.rclcpp::Node::SharedPtr node = ...// Run the executor.rclcpp::spin(node);// Shutdown and exit....return 0;
}
对 spin(node) 的调用基本上是对单线程执行器(Single-Threaded Executor)的实例化和调用,它是最简单的执行器:
rclcpp::executors::SingleThreadedExecutor executor;
executor.add_node(node);
executor.spin();
通过调用 Executor 实例的 spin(),当前线程开始查询 rcl 层和中间件层的传入消息和其他事件,并调用相应的回调函数,直到节点关闭。为了不影响中间件的 QoS 设置,收到的消息不会存储在客户端库层的队列中,而是保存在中间件中,直到回调函数对其进行处理。(wait set 用于向执行器通报中间件层的可用信息,每个队列有一个二进制标志。wait set还用于检测计时器何时过期。)
单线程执行器也用于组件(components)的容器进程,即在创建和执行节点时不使用显式主函数的所有情况。
executors种类
多线程执行器(Multi-Threaded Executor)可创建可配置数量的线程,以便并行处理多个消息或事件。静态单线程执行器(Static singlr-Thtraeded Executor)优化了在订阅、计时器、服务服务器、动作服务器等方面扫描节点结构的运行成本。它只在节点添加时执行一次扫描,而其他两个执行器则定期扫描此类变化。因此,静态单线程执行器只能用于在初始化过程中创建所有订阅、定时器等的节点。
通过为每个节点调用 add_node(…),所有三个执行器都可用于多个节点。
rclcpp::Node::SharedPtr node1 = ...
rclcpp::Node::SharedPtr node2 = ...
rclcpp::Node::SharedPtr node3 = ...rclcpp::executors::StaticSingleThreadedExecutor executor;
executor.add_node(node1);
executor.add_node(node2);
executor.add_node(node3);
executor.spin();
在上述示例中,静态单线程执行器的一个线程用于同时为三个节点提供服务。对于多线程执行器,实际并行性取决于回调组.
回调组
ROS 2 允许将节点的回调组织成组。在 rclcpp 中,这种回调组可以通过节点类的 create_callback_group 函数创建。在 rclpy 中,同样的方法是调用特定回调组类型的构造函数。回调组必须在节点的整个执行过程中存储(例如作为类成员),否则执行器将无法触发回调。然后,可以在创建订阅、定时器等时指定回调组。- 例如通过订阅选项:
my_callback_group = create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);rclcpp::SubscriptionOptions options;
options.callback_group = my_callback_group;my_subscription = create_subscription<Int32>("/topic", rclcpp::SensorDataQoS(),callback, options);
创建时未指明回调组的所有订阅、定时器等都将分配给默认回调组。默认回调组可通过 rclcpp 中的 NodeBaseInterface::get_default_callback_group() 和 rclpy 中的 Node.default_callback_group 查询。
回调组有两种类型,必须在实例化时指定类型:
- Mutualy exclusive 互斥: 该组的回调不得并行执行。
- Reentrant 可重入: 该组的回调可以并行执行。
不同回调组的回调总是可以并行执行。多线程执行器使用其线程作为线程池,根据这些条件并行处理尽可能多的回调。有关如何高效使用回调组的提示,请参阅使用回调组。
自 Galactic 以来,rclcpp 中 Executor 基类的接口通过一个新函数 add_callback_group(…) 得到了完善。这允许将回调组分配给不同的 Executor。通过使用操作系统调度器配置底层线程,特定的回调可以优先于其他回调。例如,控制循环的订阅和定时器可以优先于节点的所有其他订阅和标准服务。examples_rclcpp_cbg_executor 软件包提供了这一机制的演示。
调度语义
如果回调的处理时间短于消息和事件发生的周期,执行器基本上会以先进先出的顺序处理它们。但是,如果某些回调的处理时间较长,消息和事件就会在堆栈的下层排队。wait set机制向执行器报告的关于这些队列的信息非常少。具体来说,它只报告某个主题是否有任何消息。执行器利用这些信息以循环方式处理报文(包括服务和操作),但不是按先进先出的顺序。下面的流程图直观地展示了这种调度语义。
Casini 等人在 ECRTS 2019 上发表的一篇论文首次描述了这一语义。(注:该论文还解释说,定时器事件的优先级高于所有其他消息。Eloquent 取消了这一优先级)。
展望
虽然 rclcpp 的三个执行器能很好地满足大多数应用程序的要求,但也存在一些问题,使它们不适合实时应用程序,因为实时应用程序需要明确定义的执行时间、确定性和对执行顺序的自定义控制。下面是其中一些问题的总结:
- 复杂和混合的调度语义。理想情况下,你需要定义明确的调度语义来执行正式的时序分析。
- 回调可能会出现优先级倒置。优先级较高的回调可能会被优先级较低的回调阻塞。
- 无法明确控制回调执行顺序。
- 对特定主题的触发没有内置控制。
此外,执行器在 CPU 和内存使用方面的开销也相当大。静态单线程执行器大大减少了这种开销,但对于某些应用程序来说,这可能还不够。
这些问题已通过以下开发得到部分解决:
- rclcpp WaitSet: rclcpp 的 WaitSet 类允许直接等待订阅、定时器、服务服务器、动作服务器等,而不是使用 Executor。它可用于实现确定性的、用户定义的处理序列,可能同时处理来自不同订阅的多条消息。examples_rclcpp_wait_set 软件包提供了几个使用这种用户级等待集机制的示例。
- rclc 执行器: 这个为 micro-ROS 开发的 C 客户端库 rclc 中的执行器为用户提供了对执行过程的细粒度控制。
Callback groups:
考虑避障系统,我们有两个节点。第一个节点(节点 1)有两个回调:callback1 和callback2。回调 1 从摄像头获取图像,而回调 2 则检测图像中的物体。第二个节点执行激光雷达处理。
在高负载条件下,数据输入频率很高,回调 2 可能需要等待回调 1 执行完毕后才能访问共享数据。如果回调 1 需要很长时间才能完成,它可能会锁定共享数据,从而阻止回调 2 访问这些数据。由于 callback2 无法访问共享数据,这可能会导致系统延迟和效率低下。在 callback2 等待期间,其他任务(如定时器事件或来自其他主题的传入数据)也可能被延迟。为了解决这个问题,我们可能需要实施调度或优先级策略,以确保重要任务能及时访问共享资源。
解决这一问题的方法之一是使用回调组。通过将节点 1 callback1 和节点 1 callback2 置于同一个互斥回调组中,我们可以告诉系统它们不能同时运行。这意味着执行器不会尝试同时执行它们,从而释放了线程池中的其他线程,以便在等待条件满足的同时执行其他任务。这有助于防止因一个回调长时间锁定共享资源而导致的系统延迟和低效。
ROS2 中的执行器在调度方面可能会遇到瓶颈,这有几个原因。
首先,执行器与底层操作系统的调度机制没有直接接口。这意味着执行器无法充分利用操作系统的调度功能,如优先处理某些任务或为某些进程分配更多的 CPU 时间。
此外,执行器用来管理异步通信的 rmw(ROS 中间件)和 rcl(ROS 客户端库)WAIT SET INTERFACE(等待集接口)也可能效率低下。该接口依赖于 “轮询 ”机制,即执行器持续检查新消息或事件。这会导致 CPU 占用率高和性能降低,尤其是在高吞吐量系统中。
通过观察上述流程图,我们可以发现没有办法对传入的回调进行优先级排序或分类。此外,它也没有利用底层操作系统调度程序的实时功能,对执行顺序进行更精细的控制。这种行为的总体影响是,对时间要求较高的回调可能会错过截止日期,并且由于其服务时间晚于非关键回调而表现不佳。
多个小组正试图解决这一问题。PiCAS 基于 “任务链 ”的概念开发了优先级驱动的链式感知调度,链中的每个任务都依赖于前一个任务的完成。这样,调度器就能根据任务链的重要性和延迟对整个系统的潜在影响来确定任务的优先级。基于环形缓冲区的无锁 ROS 2 执行器。Micro-ROS 提出的基于预算的实时执行器。
在这篇文章中,我们探讨了 ROS2 中与并发相关的一些关键概念和接口,包括执行器和回调组。我们看到了这些接口如何帮助提高 ROS2 系统的可扩展性和实时性能。
值得注意的是,并发和实时性能是一个复杂的话题,有许多因素会影响 ROS2 系统的行为。不过,通过了解本篇文章中讨论的原理和界面,我们可以更好地设计和优化系统,以满足我们的需求。
参考文档
https://docs.ros.org/en/foxy/Concepts/About-Executors.html
https://medium.com/@nullbyte.in/ros2-from-the-ground-up-part-5-concurrency-executors-and-callback-groups-c45900973fd2
https://nicolovaligi.com/articles/concurrency-and-parallelism-in-ros1-and-ros2-application-apis/