ROS源码学习 二、线程池
2021SC@SDUSC
目录
1.写在前面
2.ROS线程池概述
3.ROS线程池模型
4.ROS线程池源码详解
5.总结
1.写在前面
ROS作为一个操作系统,其职责是协调具有不同功能的node之间的通讯与合作.要想实现这个目标,最重要的方面之一就是在合理利用资源的条件下实现较高的并发性.在ROS的内核ros-core中,负责通讯的服务器正是通过将监听到的socket连接经过封装后调用线程池的相关方法来实现并发通讯的.
2.ROS线程池概述
在操作系统这门课上我们都学习了线程的概念,线程在本文中不妨理解为实现了部分数据共享,可以分组统一管理的轻量级进程.线程在操作系统上大体上有一对一、多对一和多对多三种模型,其区别在于每个用户线程与实际处理机(内核线程)的对应关系.
(三种线程模型的示意图)
一对一模型将用户线程与内核线程一一对应,其优点是可以增加程序运行速度,提高程序的并行能力,但是对于处理机资源不能很好的利用.多对一模型显然并不能加速计算,但是仍然提供了并发能力使得程序能够同时处理多个用户的请求,使得分时系统的设计目标得以实现.多对多模型则是结合了两者的优点,将处理机放入缓冲池中择机分配给用户线程,这种设计既能减少对处理机实际数量的需求,又能尽量满足每个用户线程的计算要求,使程序具有较高的并发性,是一种较为理想的选择.本文中即将讨论的线程池即是基于多对多模型的.
经过查阅资料(见JVM线程)发现,Java中的线程在Linux/Windows系统中是一对一模型,是操作系统可感知的,且一个Java线程对应一个内核线程;在Solaris中默认使用Light Weighted Process,LWP的方式通过调度器激活等策略将Java线程映射到系统线程上.因此,我们可以认为Java线程并不是多对一模型,而是一对一模型或者是仿真一对一模型.
之前已经提到,一对一模型对资源的利用程度并不是很好,因此ROS在软件层面通过编写线程池的方式将其实际使用变为多对多模式,接下来将进行详细解读.
3.ROS线程池模型
ROS线程池的类与依赖关系如图所示,接下来对各个类的结构的方法进行大体的描述.
(1)Task:Task为ThreadPool的内部接口,代表需要线程池分配线程执行的作业.也就是说,需要使用线程池中线程执行的方法都需要封装继承了Task接口的类并重写为run方法.
(2)InterruptableTask:InterruptableTask为继承了Task的ThreadPool内部接口,顾名思义,InterruptableTask为接受被打断的作业,在被打断时可以调用其中断处理方法.
(3)Poolable:Poolable为ThreadPool的内部类,是线程池容纳的对象,即基本的工作单元.Poolable维护成员shuttingDown和thread,shuttingDown负责标记是否应当停止工作,thread是用于实际执行task的对象.
(4)ThreadPool是线程池类,负责对外提供Poolable对象并对其进行统一管理,回收完成工作的Poolable对象.ThreadPool的数据成员有用于标记线程组的ThreadGroup、标记线程池最大容量的maxSize、标记当前线程池容量的num,另外还有记录待分配Poolable对象的空闲池waitingThreads列表,管理运行中Poolable对象的runningThrads列表,以及已提交但暂未分配Poolable对象的task列表.
总而言之,ROS将一对一模型映射为多对多模型的线程池运行逻辑为:
- 新建线程池对象
- 将任务封装为线程池支持的task(继承Task接口)
- 将task提交给线程池,线程池根据不同提交方式分配Poolable对象(下文称为worker)运行任务
- 在worker运行任务完成后由线程池负责管理与回收
(作业处理逻辑以及worker的生命周期示意图)
4.ROS线程池源码详解
(1)Task/InterruptableTask接口
public interface Task{void run() throws Throwable;}public interface InterruptableTask extends Task{void shutdown() throws Throwable;}
前文已经提到,想要提交给线程池的任务必须封装为Task,因此功能类需要实现Task接口,也就是重写run方法,以run方法作为业务逻辑的入口.另外还有InterruptableTask接口,如果想要在出现异常时中断作业,可以继承这个接口,并重写shutdown方法以进行异常处理或一些收尾操作.
(2)Poolable类
private class Poolable{private volatile boolean shuttingDown;private Task task;private Thread thread;Poolable(ThreadGroup pGroup, int pNum){thread = new Thread(pGroup, pGroup.getName() + "-" + pNum) {public void run(){while (!shuttingDown){final Task t = getTask(); if (t == null) {try{synchronized (this){if (!shuttingDown && getTask() == null) {wait(); }}} catch (InterruptedException e){// Do nothing}} else{try{t.run(); resetTask(); repool(Poolable.this);} catch (Throwable e){remove(Poolable.this);Poolable.this.shutdown(); resetTask(); }}}}};thread.start();}synchronized void shutdown() {shuttingDown = true;final Task t = getTask(); if (t != null && t instanceof InterruptableTask) {try{((InterruptableTask) t).shutdown();} catch (Throwable th){}}task = null;synchronized (thread){thread.notify(); }}private Task getTask(){return task;}private void resetTask(){task = null;}void start(Task pTask){task = pTask;synchronized (thread){thread.notify();}}}
Poolable类的数据成员在上文已经提及过了,下面将重点对其方法进行分析.
1、首先考察其构造方法,构造方法接收一个线程组对象和一个整型对象.首先构造方法使用参数创建一个线程对象,指定其线程组和线程号.Java的Thread对象通常需要继承自Thread或使用实现了Runnable接口的对象作为参数实例化以完善其run方法(注意此处的run方法与Task的run方法在本质上并不相同!),而在此处选择了在构造方法后附加其实现方法这种语法糖来实例化Thread对象.
由于构造方法实际上只完成了初始化Thread这一项工作,接下来我们将分析Thread类是如何完善run方法以使其能够行使worker的职责的.在run方法内部,首先是一个while循环,当shuttingDown为false时循环执行.在一轮循环中,使用一个局部常变量t来引用Task对象,也就是说在一轮while循环中,task是不允许改变的.由此我们也可以判断,worker实际上是通过run方法内部的一轮while循环来完成一个task的.
接下来进行判断,在任务为空时,检查是否应被中止,若此worker尚未被设置为终止且目前暂无作业等待执行(getTask == null),则调用wait()方法.因为此处位于synchronized块中,因此调用此方法等价于将worker的thread对象的控制流阻塞于此,其目的是在空闲时避免自旋等待消耗处理机资源.
若成功获取到了task,则调用task的run方法(业务逻辑),完成且返回后调用resetTask方法,将此worker的task清空.在清空task后调用线程池的repool方法,将自身其交还给线程池,由线程池判断最终去留.如果在此发生异常,则调用线程池的remove方法,将此worker移出线程池的引用列表(将不会再为此异常worker分配task),然后调用shuttingDown方法将状态设置为停用,并将worker的task变量清空.
在初始化完成Thread对象后,立刻执行该线程,开始此worker被分配的作业.
2、注意到shuttingdown方法被synchronized修饰,即此方法只能互斥调用,具体作用在稍后分析.首先此方法将worker的shuttingDown变量设置为true,这么做的后果是在worker的thread线程执行到while循环时或检测到task为空时直接退出,在以后此worker变不再有任何执行task的能力.也就是说,shuttingDown并不是结束task,而是结束线程池中一个worker的生命.
在设置了停用标志后,获取这个worker正在执行的task,如果这个task实现了InterruptableTask接口,那么则会调用其shutdown方法来进行后续处理.在完成对task的处理后,将这个worker的task设置为null.
考虑到不仅会对运行的worker调用shuttingDown方法,还有可能对空闲的worker调用此方法,而空闲的worker中thread阻塞在了方法中,对其简单的移除索引列表并设置停用位不仅其无法正确响应,还会丢失引用使得资源无法快速释放.所以方法在此处进入synchronized代码块获取阻塞的thread所释放的锁,在同步区调用thread的notify方法以唤醒此阻塞的线程,让其进入下一轮while循环判断为假后退出.
讲到这里,我们就可以明白为何这段代码中既使用synchronized修饰方法也要用synchronized修饰代码块了,其根本原因在于防止数据不一致问题.我们知道,锁的用处有同步也有互斥功能,而在这个方法中我们要明白,代码块中的synchronized是业务逻辑要求的,其作用并非制造互斥区,而仅仅在于获取此worker的线程对象放弃的对象锁,从而唤醒这个线程使其退出循环,起到同步的效果;而修饰方法的synchronized关键字则是为了实现互斥访问,防止数据产生不一致现象:假设方法无synchronized修饰,那么从多个地点同时调用这个方法,可能会有如下现象发生:
- A线程占有处理机,执行此方法并调用了task的shutdown方法(比如效果为向磁盘写入日志),然后处理器被调度至其他线程
- B线程获得了处理机并调用了task的shutdown方法,也向磁盘写了日志,然后处理机被调度至其他线程
- 重复这个模式
- 磁盘写了入了多份相同的日志,甚至多个shutdown方法并发执行导致日志信息无法阅读
因此,使用synchronized修饰方法能够保证方法的中断处理阶段只执行一次,不是业务要求,而是并发编程的内在要求.
3、getTask方法简单返回worker所承担的任务引用、resetTask则将worker的task设为空.
4、start方法用于在分配task后启动worker,首先使用synchronized关键字获取到线程的锁,然后调用notify唤醒,之后task便得以执行,起到了开始任务的功能.
(3)ThreadPool类
public ThreadPool(int pMaxSize, String pName){maxSize = pMaxSize;threadGroup = new ThreadGroup(pName);}
1、首先来分析ThreadPool的构造方法,该方法接收整型变量pMaxSize和字符串pName,其中pMaxSize规定了线程池的最大容量,pName则用来初始化ThreadGroup并为其命名,在同一个线程池worker的Thread对象都属于一个ThreadGroup.
private synchronized void remove(Poolable pPoolable) {runningThreads.remove(pPoolable);waitingThreads.remove(pPoolable);}
2、remove方法调用ArrayList数据成员runningThreads和waitingThreads的remove方法,将一个worker从中删除.
void repool(Poolable pPoolable){boolean discarding = false;Task task = null;Poolable poolable = null;synchronized (this) {if (runningThreads.remove(pPoolable)) {if (maxSize != 0 && runningThreads.size() + waitingThreads.size() >= maxSize) {discarding = true; } else{waitingThreads.add(pPoolable); if (waitingTasks.size() > 0) {task = (Task) waitingTasks.remove(waitingTasks.size() - 1); poolable = getPoolable(task, false);}}} else {discarding = true;}if (discarding){remove(pPoolable); }}if (poolable != null) {poolable.start(task); }if (discarding) {pPoolable.shutdown();}}
3、repool方法为上文中worker完成任务后调用的方法.首先声明布尔变量discarding并设为false用于标记是否丢弃此worker.然后声明Task和Poolable的引用,接着进入互斥区.
因为此时worker刚刚完成task,所以调用runningThreads的remove方法将worker从中移除,如果成功移除代表worker成功完成作业,如果worker在完成作业中出现异常,则会通过shuttingDown方法将worker移除,此时调用remove则会返回false.
当返回值为true时,检查线程池的worker数量是否溢出(包括空闲的和运行的).若数量溢出,则将此worker标记为弃用,否则将其加入waitingThreads列表,并检查waitingTasks中是否有排队的作业.如果有则以LIFO的方式从排队作业列表中取出一个task,然后调用getPoolable方法为尝试其分配一个worker.当返回值为false时,则将此worker标记为弃用.
接下来检测worker是否被标记为弃用,如果是则从两个列表中移除.此时已退出互斥区.接下来检查poolable引用是否为null,仅当worker成功返回且调用getPoolable尝试为排队作业分配worker成功时此引用不为空,然后在此处开始这个从排队列表中取出的作业.
在最后,如果弃用位为true,则调用这个worker的shutdown方法.这些操作看上去与worker的thread成员的run方法中的异常处理分支重复,然而要注意到的是并非只有产生了异常的worker才需要丢弃,溢出的worker以及线程池析构时也需要销毁worker.此处处理的正是worker数量过多时正常返回的worker.
注意到这个方法存在互斥区,设置为互斥的目的在于引用忙碌和闲置worker的列表类别为ArrayList,并不是线程安全的,因此如果多个地点同时调用时会导致if分支被多次测试通过,从而导致worker被多次加入线程池、作业排队列表被多次弹栈(也并非线程安全的).
private synchronized Poolable getPoolable(Task pTask, boolean pQueue) {if (maxSize != 0 && runningThreads.size() >= maxSize) {if (pQueue) {waitingTasks.add(pTask); }return null; }Poolable poolable;if (waitingThreads.size() > 0) {poolable = (Poolable) waitingThreads.remove(waitingThreads.size() - 1); } else{poolable = new Poolable(threadGroup, num++); }runningThreads.add(poolable); return poolable; }
4、getPoolable方法可以看作ThreadPool的核心方法,其作用是向线程池提交作业以运行.这是一个互斥方法,因为作业等待队列不是线程安全的.实际上,ThreadPool的三个列表都不是线程安全的,因此涉及到对这三个列表CRUD的操作代码都需要考虑是否是临界区.
此方法接收Task对象作为提交的作业,接收一个布尔变量标记是否接受排队.读取参数后,首先检查目前线程池的正在运行的作业数量是否超过上限,如果超过上限则根据排队策略选择进入队列或者离开,无论是那种排队策略,此刻都没有产生就绪状态的worker,因此返回值为null.
如果线程池仍有空间,则首先检查是否有空闲的worker,如果有则返回一个,如果没有则新建一个.因为这个worker马上将要执行作业,因此将其加入runningThreads列表,最后将其返回.
为了保证安全,这个方法也是互斥的.
public boolean addTask(Task pTask) {final Poolable poolable = getPoolable(pTask, true);if (poolable != null){poolable.start(pTask);return true;}return false;}public boolean startTask(Task pTask) {final Poolable poolable = getPoolable(pTask, false);if (poolable == null){return false;}poolable.start(pTask);return true;}
5、getPoolable并public的方法,想要提交任务要通过addTask或者startTask.
addTask调用getPoolable并允许排队,在能够立即运行时返回true,在稍后会执行为false;startTask调用相同的方法但不允许排队,在能够立即运行时返回true,不能运行则返回false.
综合来看,startTask更加注重实时性,而addTask则要求作业提交后早晚会被处理.
public synchronized void shutdown(){while (!waitingThreads.isEmpty()){Poolable poolable = (Poolable) waitingThreads.remove(waitingThreads.size() - 1);poolable.shutdown();}while (!runningThreads.isEmpty()){Poolable poolable = (Poolable) runningThreads.remove(runningThreads.size() - 1);poolable.shutdown();}}
6、需要注意此方法并非worker中的shutdown,此方法可以看做线程池的析构函数.调用此方法时会销毁每一个worker.同样是互斥方法,
public int getMaxThreads(){return maxSize;}public synchronized int getNumThreads(){return num;}
7、最后这两个方法简单返回线程池的最大容量和当前容量.
5.总结
ROS的线程池是ROS并发处理RPC得以实现的基础,而后者又是ROS-Core连结各类node,进行topic和service的调用时的底层实现,因此了解其线程池运行逻辑对于学习ROS是十分重要的.
另外,在学习线程池源代码的同时,我们看到了如何将一对一线程模型通过编程改变为多对多模型,既基本保持了原有的效率又节省了资源,也体会到了这样一种设计思想:当操作系统的实现不满足需求时,就编程对操作系统进行仿真(如Task代表程序,Poolable代表处理机,Pooable+运行的Thread代表进程),以较低的代价达到自己的目的.最后,在并发编程对时候一定要对线程安全十分敏感,不然一定会因小失大,看似提高了效率,实则埋下巨大的隐患.
ROS源码学习 二、线程池相关推荐
- 深入源码分析Java线程池的实现原理
转载自 深入源码分析Java线程池的实现原理 程序的运行,其本质上,是对系统资源(CPU.内存.磁盘.网络等等)的使用.如何高效的使用这些资源是我们编程优化演进的一个方向.今天说的线程池就是一种对 ...
- hibernate 并发获取session失败 空指针_高并发之|通过ThreadPoolExecutor类的源码深度解析线程池执行任务的核心流程...
核心逻辑概述 ThreadPoolExecutor是Java线程池中最核心的类之一,它能够保证线程池按照正常的业务逻辑执行任务,并通过原子方式更新线程池每个阶段的状态. ThreadPoolExecu ...
- Libuv源码分析 —— 8. 线程池
网络I/O 在 上一节 的学习中,我们已经搞明白了网络I/O的基本过程,并通过了解进程/线程间通信来熟悉这个流程.下面,让咱们学习线程池中的线程如何工作.并和主进程进行通信的吧! 线程池 Libuv ...
- PHP 源码学习之线程安全
PHP 源码学习之线程安全 了解线程安全之前,我们先回顾几点基础知识点,是我们后面分析学习的基础. 变量的作用域 从作用域上来说,C语言可以定义4种不同的变量:全局变量,静态全局变量,局部变量,静态局 ...
- Hbase Compaction 源码分析 - CompactSplitThread 线程池选择
目录 CompactSplitThread requestCompactionInternal方法 selectCompaction方法 requestCompaction方法 其他相关文章 Hbas ...
- Java 中 Integer 源码学习之缓存池了解
Java 中 Integer 源码学习之缓存池了解 面试题 new Integer(123) 与 Integer.valueOf(123) 的区别? new Integer(123) 每次都会新建一个 ...
- 从原理到实现丨手把手教你写一个线程池丨源码分析丨线程池内部组成及优化
人人都能学会的线程池 手写完整版 1. 线程池的使用场景 2. 线程池的内部组成 3. 线程池优化 [项目实战]从原理到实现丨手把手教你写一个线程池丨源码分析丨线程池内部组成及优化 内容包括:C/C+ ...
- 小豹子带你看源码:Java 线程池(二)实例化
承上启下:上一篇文章小豹子讲了我为什么想要研究线程池的代码,以及我计划要怎样阅读代码.这篇文章我主要阅读了线程池实例化相关的代码,并提出了自己的疑问. 3 千里之行,始于实例化 3.1 先创建一个线程 ...
- 从源码角度解析线程池中顶层接口和抽象类
摘要:我们就来看看线程池中那些非常重要的接口和抽象类,深度分析下线程池中是如何将抽象这一思想运用的淋漓尽致的. 本文分享自华为云社区<[高并发]深度解析线程池中那些重要的顶层接口和抽象类> ...
最新文章
- 550 Ip frequency limited
- 不用比较运算符及循环控制语句,判断int型的a、b两数的大小
- 【Samba】安装与配置
- gym 101858
- 国家有线网挂牌时间再度推迟 预计为2012年底
- 写一个http服务器
- yum update Transaction Check Error
- Java虚拟机(JVM)面试题大集合
- 终于有人讲明白了,原来这才是全球低时延一张网技术
- Linux jar包 后台运行
- HBase常用Shell命令
- android studio应用开发案例,Android应用开发案例教程(Android Studio版).pptx
- 步进电机正反转实验_三相异步电机正反转控制原理图
- PE安装Win10纯净版教程【附Win10企业版/专业版/64/32位系统下载地址以及系统激活工具和解压软件安装包】
- 大数据分析」最详细的大数据分析师技能图谱详解与零基础自学内容大全
- 阿里云ACP认证哪个值得考?考试时间怎么安排?
- win10重置网络后所有网卡都消失了
- AtCoder Grand Contest 021完整题解
- [emWin]利用内存设备加速GIF图片显示——2021.03
- Socket异步服务器,可以监控客户端的状态,功能有,文字测试,服务端向客户端传输屏幕录像(UDP传输)、监控客户端屏幕(UDP传输),抖动用户窗体、发送文件给用户、扫描客户的C盘目录。
热门文章
- 问题 E: 来淄博旅游
- 2016 icpc沈阳部分题解
- 工欲善其事,必先利其器 – 网络抓包
- Java课程设计项目 客户信息管理软件 客户信息管理系统的实现
- GPT分区下DEEPINV20.6添加win7(MBR)启动
- 面试(一)2019年春招面试(初学者面试,大佬别见笑)
- 遇到问题–python–pytest: error: unrecognized arguments: --cov-report=html
- 计算机excel阶乘,Excel利用VBA计算阶乘
- WinRAR4.20注册文件key文件注册码
- 关于wap上网及彩信的一点想法