文章介绍ROS通讯中基于发布/订阅模型的话题模式,默认情况下的消息发送和接收处理,以及如何引入多线程来提高程序性能。

通信机制

ROS常用的通信机制包含话题发布/订阅模型的话题模式与服务器/客户端的服务模式,两种模式的差异如下表

条目 话题模式 服务模式
通信模型 发布/订阅 客户端/服务器
反馈机制
底层协议 ROSTCP/ROSUDP ROSTCP/ROSUDP
缓冲区
实时性
节点关系 多对多 一对多(一个Server)
使用场景 弱逻辑处理,多数据传输 强逻辑处理,少数据传输
此处会针对话题模式做一些原理介绍与其多线程的应用。下面是ROS中一些基本的中英文技术词汇的对照:
中文 消息 话题 节点 发布者 订阅者 回调函数
英文 message topic node publisher subscriber callback

话题模式

话题模式是有多个发布者与多个订阅者参与的异步通信模式,他们通过节点管理器(ros master)注册信息,建立连接和通信。
同一节点的所有发布者发布的消息被存储在全局的消息队列中,根据时间戳进行取舍。所有的订阅者被所在节点调用时(e.g. ros::spin())去消息队列中取得订阅话题的消息并执行回调函数。
此处可以理解为生产者消费者模型,发布者负责生产消息存入消息队列中,订阅者负责从消息队列取出消息并消费(回调函数),这里通过消息队列作为数据的缓存区来解耦发布者与订阅者。

跟经典的生产消费模型不同的是,同一节点的发布者与订阅者都有自己专门的消息队列去存储消息,主要是因为发布者是基于话题发送,而不是直接向订阅者发送,所以必须要有一个消息队列来存放发布的消息,以供订阅者来获取。而且这个消息队列的好处是在网络差、带宽小、延时高的时候,保证数据不容易丢失。因为发布者与订阅者不一定在同一台主机上,因此消息需要通过网络来交换。但是网络的性能时好时坏,如果订阅者没有消息队列,那么每次运行回调函数前都要先通过网络取回消息,然后才能处理。当网络很差时,就会让系统堵塞。而有消息队列的话,订阅者就可以一边处理队列中的消息,一边通过网络缓存新的消息,而不用每次处理消息前都要临时去读一个回来。这样就增加了系统的可靠性。
这里会引入的一个问题,同一话题的消息在发布消息队列发送到订阅消息队列的时候会产生拷贝,造成内存浪费。针对发布节点与订阅节点在同一台机器的情况,ROS引入了nodelet来避免这个开销。

发布者的消息队列

节点中的全局消息队列会将所有发布者的消息存储起来,此处的消息队列可以理解为一个线程安全,根据时间戳排序的消息存储器,如:

消息队列的长度,即一个消息队列中能存储多少条消息是由创建发布者的时候定义,如

// chatter_pub为发布者对象实例,std_msgs::String为消息类型,“chatter”为话题名称,1000为话题的消息队列长度
ros::Publisher chatter_pub = n.advertise<std_msgs::String>("chatter", 1000);

此处的1000即为消息队列中会缓存多少条信息。 当同一话题所有发布者生产的频率大于对应订阅者消费的频率时,发布的新消息存入消息队列时,会将时间戳最老的消息丢弃掉。如上图中,如果定义的队列长度为5,当前消息队列中的消息数也为5,当新消息被发布到消息队列中时,消息1(timestamp = t1)会被丢弃以便储存新的消息。
同一节点的不同发布者有可能会定义不同的消息队列长度,所以节点的全局消息队列应该是取所有发布者队列长度之和。

// Node "test"
// 全局消息队列长度为3
ros::Publisher pub1 = n.advertise<std_msgs::Int32>("pub1", 1);
ros::Publisher pub2 = n.advertise<std_msgs::Int32>("pub2", 2);
int count = 0;
pub1.publish(count++); // 队列中消息为[pub1-1]
pub2.publish(count++); // 队列中消息为[pub1-1, pub2-2]
// pub1的队列长度为1,为了储存新的信息(pub1-3),之前的信息(pub1-1)被丢弃
pub1.publish(count++); // 队列中消息为[pub2-2, pub1-3]
pub2.publish(count++); // 队列中消息为[pub2-2, pub1-3, pub2-4]
// pub1的队列长度为2,为了储存新的信息(pub2-5),之前的信息(pub2-2)被丢弃
pub2.publish(count++); // 队列中消息为[pub1-3,pub2-4,pub2-5]

订阅者的消费队列

订阅者的消费队列与发布者中的一样,只是为了储存消息数据。这些消息被创建订阅者注册的回调函数所消费,如

// 收到消息时调用的函数,std_msgs::String为消息类型
void chatterCallback(const std_msgs::String::ConstPtr& msg)
{ROS_INFO("I heard: [%s]", msg->data.c_str());
}
// chatter_sub为订阅者实例,“chatter”为话题名称,1000为话题的消息队列长度,chatterCallback为回调函数
ros::Subscriber chatter_sub = n.subscribe("chatter", 1000, chatterCallback);

ROS在处理回调函数时,并不是消息传来就立刻进行处理的,而是在程序调用spinOnce()和spin()时统一调用。

ros::spin()和ros::spinOnce()为消息回调处理函数。它俩通常会出现在ROS的主循环中,程序需要不断调用ros::spin() 或 ros::spinOnce(),两者区别在于前者调用后不会再返回,也就是你的主程序到这儿就不往下执行了,而后者在调用后还可以继续执行之后的程序。
默认情况下,节点中的主程序(e.g. while循环中写在ros::spinOnce()之前的执行程序)和所有回调函数都是串行的,当节点无法及时处理回调函数时,这些未被处理的回调函数就会按照先后的顺序放入一个队列,该回调队列的长度就是定义订阅器时的消息队列长度了。节点会先处理时间辍最小的回调函数,然后依次处理队列中所有的回调函数。但是,当发布器的频率过快时,会出现未被处理的回调函数数量超过队列长度,它会自动丢弃时间辍最长(最老的)的回调函数。下面的案例介绍了一个节点两个订阅者的例子:

// "chatter"话题的发布频率为10Hz
void callback1(const std_msgs::String::ConstPtr& msg)
{// 延迟一秒ros::Duration(1.0).sleep();ROS_INFO("callback1 : I heard: [%s]", msg->data.c_str());
}void callback2(const std_msgs::String::ConstPtr& msg)
{ROS_INFO("callback2 : I heard: [%s]", msg->data.c_str());
}int main(int argc, char **argv)
{// 订阅者sub1的消息队列长度为3 (一个调用周期内只会处理三个消息)ros::Subscriber sub1 = n.subscribe("chatter", 3, callback1);// 订阅者sub2的消息队列长度为1 (一个调用周期内只会处理一个消息)ros::Subscriber sub2 = n.subscribe("chatter", 1, callback2);while (ros::ok()){ROS_INFO("======================Start of while loop=======================");ros::spinOnce();}return 0;
}

上述的代码输出如下

在一个调用周期内,callback1和callback2方程为串行(阻塞)运行,callback1会被调用三次,callback2被调用一次 (由定义的消息队列长度决定)。两次调用callback2之间阻塞调用了四次callback1(次数取决与所有回调函数被放入队列中的顺序),导致其接收的数据差为40。
如果节点的不同回调方程(包括主函数中调用的方程)同时花费大量的计算时间,在单线程下很有可能造成某个回调方程被调用时所用到的数据失效。
这类问题在硬件计算平台允许的情况下,可以引入多线程异步(非阻塞)处理回调方程来解决。

最新数据处理

在了解了发布者与订阅者的消息队列机制后,可以同时更改双方消息队列的长度为1来达到回调函数处理最新的消息数据。这样保证发布节点的消息只有一个,订阅节点的消息也只有一个,所以每当回调函数被调用时只能获取到最新的数据。
这适用于一些对数据实时性要求比较高(不考虑通讯和函数调用延迟)的功能模块。

小结

  • 节点会拥有全局的发布者消息和订阅者消息队列。
  • 发布者消息队列用来缓存节点内所有发布的消息,并通过之前在节点管理器中注册的信息发布到对应话题。
  • 订阅者消息队列用来缓存节点内所有订阅的消息,并在消息处理函数(e.g. ros::spinOnce())被调用时串行调用对应的回调函数。
  • 话题模式通过消息队列缓存消息的形式解耦了发布者与订阅者,彼此之间不需要知道对方的设置或者执行,发布者只需要发布消息到发布队列,而订阅者只需要通过回调函数消费订阅队列中的消息。

多线程应用

上述回调函数串行调用可以通过引入多线程的方式,并发处理节点内不同的函数。
在了解具体的做法之前,下面章节先简单介绍一下进程和线程的概念。

进程和线程

进程是对操作系统上正在运行程序的一个抽象,线程是进程中的指令执行流的最小单位,是CPU调度的基本单位。简而言之,一个程序至少有一个进程,一个进程至少有一个线程。
进程有独立的地址空间,线程有自己的堆栈和局部变量,但线程之间没有单独的地址空间,一个线程死掉就等于整个进程死掉,所以多进程的程序要比多线程的程序健壮,但在进程切换时,耗费资源较大,效率要差一些。相对进程而言,线程是一个更加接近于执行体的概念。它可以与同进程中的其他线程共享数据,但拥有自己的栈空间,拥有独立的执行序列,对于一些要求同时进行并且又要共享某些变量的并发操作,只能用线程,不能用进程。
在ROS中可以理解为一个可运行的节点(定义了main()方程)即为一个进程,而默认的执行流程为单线程,即只有一个执行体串行执行所有定义的任务(函数)。

ROS中的多线程

ROS提供的用于处理callback的线程机制。接口包括自旋,CallbackQueue队列处理,time callback等。因接口提供的简单易用,下面只做简单介绍:
ros::MultiThreadedSpinner是阻塞式的spinner(程序执行到这里不会在往下执行,无法使用在while循环中), 类似于ros::spin(), 你可以在它的构造函数中指定线程数量, 但如果不指定或者设为0, 它会根据你的CPU内核数创建线程。

ros::MultiThreadedSpinner spinner(4); // Use 4 threads
spinner.spin(); // spin() will not return until the node has been shutdown

ros::AsyncSpinner不是阻塞式的,类似ros::spinOnce(),拥有start()和stop()两个函数, start()等待在那个点上的所有回调,stop()停止回调,并且在销毁时自动停止.

ros::AsyncSpinner spinner(4); // Use 4 threads
spinner.start();
while (ros::ok())
{// Do something
}
ros::waitForShutdown();

ROS默认只有一个全局的回调队列,每当调用ros::spinOnce()时统一调用在列的回调函数。当一个节点的订阅者比较多,要处理的回调函数不在一个频率上调用时,可以自定义消息队列来独自调用。ros::CallbackQueue可以设置自定义队列,这个可使用所有订阅、服务、定时器等。自定义的队列不是roscpp的默认队列,意味着ros::spin()和ros::spinOnce()不会处理这些回调,你需要单独处理这些回调。

The CallbackQueue class has two ways of invoking the callbacks inside it: callAvailable() and callOne(). callAvailable() will take everything currently in the queue and invoke all of them. callOne() will simply invoke the oldest callback on the queue.`

ros::CallbackQueue callback_queue;
ros::NodeHandle nh;
ros::SubscribeOptions ops=ros::SubscribeOptions::create<std_msgs::String>("chatter",1, state_callback, ros::VoidPtr(),&callback_queue); //指定一个自定义队列
ros::Subscriber listen_state= nh.subscribe(ops);   // 必须添加,ros::SubscribeOptions自定义的队列才能回调
ros::AsyncSpinner state_spinner(1,&callback_queue);

或者

ros::CallbackQueue callback_queue;
void callbackThread()
{  ros::NodeHandle n;  while (n.ok())  {  callback_queue.callAvailable();}
}
ros::NodeHandle n;
ros::NodeHandle nh;
nh.setCallbackQueue(&callback_queue);
ros::Subscriber sub = n.subscribe("chatter", 3, callback1);
ros::Subscriber sub2 = nh.subscribe("chatter", 1, callback2);
// 开一个线程单独调用callback_queue中的回调函数
std::thread t1(callbackThread);
while(ros::ok())
{ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();ros::spinOnce();
}
t1.join();

将多线程AsyncSpinner引入上述案例中,

// "chatter"话题的发布频率为10Hz
void callback1(const std_msgs::String::ConstPtr& msg)
{// 延迟一秒ros::Duration(1.0).sleep();ROS_INFO("callback1 : I heard: [%s]", msg->data.c_str());
}void callback2(const std_msgs::String::ConstPtr& msg)
{ROS_INFO("callback2 : I heard: [%s]", msg->data.c_str());
}int main(int argc, char **argv)
{// 订阅者sub1的消息队列长度为3 (一个调用周期内只会处理三个消息)ros::Subscriber sub1 = n.subscribe("chatter", 3, callback1);// 订阅者sub2的消息队列长度为1 (一个调用周期内只会处理一个消息)ros::Subscriber sub2 = n.subscribe("chatter", 1, callback2);// 异步模式ros::AsyncSpinner spinner(2);while(ros::ok()){ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();spinner.start();}return 0;
}

输出如下:

可以看到每个循环中(间隔一秒),callback1被调用了一次,callback2被调用了十次。这里while循环中的ros::Duration(1.0).sleep(),callback1和callback2三个执行任务会并发执行。

线程安全

  • 线程安全是多线程编程时的计算机程序代码中的一个概念。在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况。
  • 多个线程有对同一个全局变量进行写的操作时,会出现线程安全问题。
  • 每个进程中访问临界资源(比如全局变量等公用资源)的那段程序(代码)称为临界区(临界资源是一次仅允许一个进程使用的共享资源,如全局变量等),也称为临界段。
  • ROS本身的发布者与订阅者是线程安全的,所以在引入多线程后,不需要做额外的保护措施。如在上例中的两个回调函数中,可以用同一个发布者发布消息。

但是如果是操作用户自定义的一些临界资源,需要加锁保护。如

// 临界资源
int global_counter = 0;
// 互斥锁保护临界资源
std::mutex mtx;
// "chatter"话题的发布频率为10Hz
void callback1(const std_msgs::String::ConstPtr& msg)
{// 延迟一秒ros::Duration(1.0).sleep();ROS_INFO("callback1 : I heard: [%s]", msg->data.c_str());// 需要加锁保护global_countermtx.lock();global_counter++;mtx.unlock();
}void callback2(const std_msgs::String::ConstPtr& msg)
{global_counter++ROS_INFO("callback2 : I heard: [%s]", msg->data.c_str());// 需要加锁保护global_countermtx.lock();global_counter++;mtx.unlock();
}int main(int argc, char **argv)
{// 订阅者sub1的消息队列长度为3 (一个调用周期内只会处理三个消息)ros::Subscriber sub1 = n.subscribe("chatter", 3, callback1);// 订阅者sub2的消息队列长度为1 (一个调用周期内只会处理一个消息)ros::Subscriber sub2 = n.subscribe("chatter", 1, callback2);// 异步模式ros::AsyncSpinner spinner(2);while(ros::ok()){ROS_INFO("================Start of while loop===========================");ros::Duration(1.0).sleep();spinner.start();}return 0;
}

上述的global_counter由于会被并发的两个线程访问,需要引入mtx(std::mutex)来保证线程安全。

线程的数量

虽然一个进程内使用多个线程理论上会提升程序性能,但是由于受到物理计算平台(CPU)的限制,过多的线程会让系统更加频繁的切换所执行的线程,带来大量的切换开销。
常规的合理数量为:
CPU密集型(计算需求大)= CPU的核数*个数
IO密集型(文件读写操作) = 可以设置的大一些

相关资料

  • Callbacks and spinning - http://wiki.ros.org/roscpp/Overview/Callbacks%20and%20Spinning
  • ROS nodelet - http://wiki.ros.org/nodelet
  • 通信机制 -https://blog.csdn.net/zhi_neng_zhi_fu/article/details/100139851
  • 回调函数处理与回调队列 - https://blog.csdn.net/Azahaxia/article/details/113934774
  • 多线程应用 - https://blog.csdn.net/Qm13416479599/article/details/90265676
  • ROS中的多线程 - https://blog.csdn.net/tobebest_lah/article/details/103050076
  • 多线程数量 - https://blog.csdn.net/eternal_yangyun/article/details/103236125

ROS话题模式及多线程相关推荐

  1. ROS学习笔记6(理解ROS话题)

    文章目录 1 启动 1.1 roscore 1.2 turtlesim 1.3 turtel_teleop_key 2 ROS 话题 2.1 使用rqt_graph 2.2 使用rostopic 2. ...

  2. 理解ROS话题---ROS学习第5篇

    文章目录 1. 通过键盘控制turtle 2. ROS话题 2.1 使用rqt_graph 2.2 介绍rostopic 2.3 使用功能rostopic echo 2.4 使用rostopic li ...

  3. python使用rabbitMQ介绍五(话题模式)

    一.模式介绍 话题模式(Topic)基本思想和路由模式是一样的,只不过路由键支持模糊匹配,符号"#"匹配一个或多个词,符号"*"匹配不多不少一个词 话题模式相当 ...

  4. Python写ROS话题

    Python写ROS话题 导入ROS模块 发送话题 接收话题 第一种方式:rospy.Subscriber 第二种方式:rospy.wait_for_message 完整程序 多线程处理同时接受多个话 ...

  5. 并发编程含义比较广泛,包含多线程编程、多进程编程及分布式程序等 目录 1. “共享内存系统”,消息传递系统”。 1 1.1. 共享模式 多进程 多线程 1 1.2. Actor消息模式 事件驱动 2

    并发编程含义比较广泛,包含多线程编程.多进程编程及分布式程序等 目录 1. "共享内存系统",消息传递系统". 1 1.1. 共享模式 多进程 多线程 1 1.2. Ac ...

  6. 十八、ROS话题名称设置

    文章目录 1. ROS话题名称重映射 1.1 简介 1.2 实现方法 1.2.1 rosrun设置话题重映射 1.2.1.1 方案一 1.2.1.2 方案二 1.2.2 launch文件设置话题重映射 ...

  7. ROS话题可视化工具PlotJuggler

    ROS话题可视化工具PlotJuggler 安装 使用 PlotJuggler,是一个基于Qt的应用程序,允许用户加载,搜索和绘图数据.可以实现数据的实时绘制,在线读取.保存等功能. 安装 ros用户 ...

  8. 【ROS话题通信】发布者和订阅者

    前言 本文记录ROS话题通信的学习过程,便于后续复习.首先明确,ROS中的话题通信,在ROS通信中非常重要,实现了分布式发布接收消息,也是实现了不同编程语言间的解耦,下面记录下自己学习过程中的相关代码 ...

  9. 【ROS入门-4】嘴对嘴讲解ROS的核心概念——ROS话题通信机制

    文章目录 前言 ROS系列文章 ROS的通信机制 话题(topic) 发布者 订阅者 消息(Message) 用C++来写话题通信的代码 发布者 订阅者 使用rqt_graph 源码附录 引用说明 参 ...

最新文章

  1. JsonUtils fasterxml jackson
  2. [Linux]Shell的运算符和特殊变量
  3. git工具tig用法
  4. lambda函数+map函数的结合使用 list(map(lambda x: list(x)[0], X))
  5. 【Java】JScrollPane的内容显示与刷新问题
  6. 飞鸽传书技术更新换代非常快
  7. 能在ARC下跑的ASIHTTPRequest框架(以及升了5.0,用了ARC后N多开源框架不能用的解决方案) .
  8. autoLayout+sizeClass屏幕适配
  9. mysql timeout expired处理
  10. java中的包装类和基本类型_java中基本类型和包装类型实践经验
  11. 携程Apollo 启动失败遇到的坑
  12. Moscow Pre-Finals Workshop 2020 - Legilimens+Coffee Chicken Contest (XX Open Cup, Grand Prix of Nanj
  13. 龙威PS305D维修案例收集
  14. 论文阅读_TASE: Reducing Latency of Symbolic Execution with Transactional Memory
  15. java hl7v3_hl7 java 解析
  16. win7下搭建外网svn服务器
  17. Python创建分栏排版的Word文档
  18. mac添加应用程序到启动台_如何在Mac上启动应用程序
  19. 2023年全国最新二级建造师精选真题及答案56
  20. gnuplot 等高线脚本

热门文章

  1. 基于Springboot+Netty实现Web聊天室
  2. js向ul中写html语言,javascript操作ul中li的方法
  3. 对Lua中Userdata的理解
  4. 视频切片后自动上传至国内免费CDN,无成本加速视频播放
  5. 大学“电路分析基础”试题合集第七章
  6. 简单的反汇编beam文件
  7. Vue.js的开发流程
  8. linux 进程最小化,如何实现Linux系统最小化(4)
  9. Spring Security简单理解
  10. SpringSecurity实现过滤器