线程池解析(三)——Worker源码解析
相关文章
线程池解析(一)——概念总结
线程池解析(二)——execute、addWorker源码解析
线程池解析(三)——Worker源码解析
线程池解析(四)——submit源码解析(Runnable、Callable、Future、FutureTask)
概述
在ThreadPoolExecutor中以Worker为单位对工作线程进行管理。
那么Worker具体是做了什么呢?本文将围绕这个话题展开。
Worker源码
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
AbstractQueuedSynchronizer
AQS作用
如果对AQS不是很了解的读者可以看下笔者前面的文章:
ReentrantLock和AQS 源码解析
Worker继承了AbstractQueuedSynchronizer,主要目的有两个:
- 将锁的粒度细化到每个工Worker。
如果多个Worker使用同一个锁,那么一个Worker Running持有锁的时候,其他Worker就无法执行,这显然是不合理的。
- 直接使用CAS获取,避免阻塞。
如果这个锁使用阻塞获取,那么在多Worker的情况下执行shutDown。如果这个Worker此时正在Running无法获取到锁,那么执行shutDown()线程就会阻塞住了,显然是不合理的。
源码解析
// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }
从源码得知,Worker中是通过CAS去获取锁的,期间不会有阻塞的逻辑。
- 如果获取成功。会记录下当前拿到锁的线程,然后返回 true。
- 如果获取失败。会直接返回false.
AQS使用(interruptIdleWorkers源码)
直接讲源码可能还是比较难以理解,我们直接从interruptIdleWorkers的源码来理解下Worker中的AQS是如何使用的。
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}}
interruptIdleWorkers是用来关闭空闲的Worker的,其逻辑主要如下:
- 获取到线程池的ReentrantLock,上锁,执行完操作后再解锁。
- 遍历Worker,尝试获取worker的锁,如果可以获取到就说明是空闲的,interrupt这个Worker的线程。(如果一个Worker不是空闲的,那么它的锁会被占用,CAS会获取失败)
Worker.run() 与 runWorker()
另外Worker还实现了Runnable,run()方法中最终是走到了线程池的runWorker()方法。源码如下:
public void run() {runWorker(this);}
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}
逻辑如下:
- 首先会去Worker的firstTask中取任务,如果没有就使用getTask()方法去取。(getTask()是会阻塞的,解析在后文)取到任务之后就会占用Worker的锁。
- 如果线程池目前状态大于或等于STOP(STOP,TIDYING,TERMINATED),那么需要把当前Worker中的线程也中止。(等待队列中的任务也不会再执行了)
- 如果线程池目前没有关闭。此时会执行task.run(),在当前线程执行任务。
- 此任务的逻辑执行完毕后,会释放Worker的锁。
getTask()源码解析
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
逻辑如下:
- 如果“线程池已经处于shutdown并且线程池为空”或者“线程池已经大于等于stop”,那么此时会递减Worker的数量并且直接返回。
- SHUTDOWN
不会接受新的任务,但是会执行队列中的任务。- STOP
不会接受新的任务,也不会执行队列中的任务。
- 如果此时Worker的数量多于maximumPoolSize。或者Worker数量大于corePoolSize并且存在等待超时的情况,那么会去尝试减少Worker的数量。(根据超时逻辑来回收Worker)
- 阻塞等待去获取workQueue中的任务,获取到就返回,同时会存在超时逻辑。
超时逻辑:
核心线程数需要设置allowCoreThreadTimeOut才会超时,否则一直不会回收。
非核心线程一直有超时逻辑,超时不用就会被回收。
总结
getTask总结
- getTask中比较中比较重要的功能是超时回收逻辑。
核心线程数需要设置allowCoreThreadTimeOut才会超时,否则一直不会回收。
非核心线程一直有超时逻辑,超时不用就会被回收。 - getTask也支持了线程池的shutDown与stop状态。
如果处于shutDown状态并且任务队列为空,会回收Worker。
如果处于Stop状态,不管任务队列什么样子,会回收Worker。
Worker AQS总结
主要两个作用
- 将锁的粒度细化到每个Worker。
- 直接使用CAS获取,避免阻塞。
runWorker总结
- 如果线程池目前状态大于或等于STOP,会终止Worker中的线程。
- 如果线程池此时是Running或者shutDown。会先去执行Worker的firstTask,如果firstTask执行结束或者为空,则会循环去执行工作队列中的任务,如果工作队列为空,会阻塞住。
线程池解析(三)——Worker源码解析相关推荐
- 多线程与高并发(七):详解线程池 - 自定义线程池,JDK自带线程池,ForkJoin,源码解析等
Executor 接口关系 Callable:类似于Runnable,但是可以有返回值 Future:存储将来执行的结果.Callable被执行完之后的结果,被封装到Future里面. Future ...
- Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】
基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码! 上一篇文章中,我们介绍了:Java Executor源码解析(2)-ThreadPoolExecutor ...
- Multidex记录三:源码解析
个人博客地址 http://dandanlove.com/ Multidex记录一:介绍和使用 Multidex记录二:缺陷&解决 Multidex记录三:源码解析 记录Multidex源码解 ...
- 源码解析:Spring源码解析笔记(五)接口设计总览
本文由colodoo(纸伞)整理 QQ 425343603 Java学习交流群(717726984) Spring解析笔记 启动过程部分已经完成,对启动过程源码有兴趣的朋友可以作为参考文章. 源码解析 ...
- mysql 线程池源码模块_易语言Mysql线程池2.0模块源码
易语言Mysql线程池2.0模块源码 易语言Mysql线程池2.0模块源码 系统结构:GetThis,初始化,关闭类线程,线程_测试,其他_附加文本,连接池初始化,取mysql句柄,释放mysql句柄 ...
- Java线程池架构(一)原理和源码解析
在前面介绍JUC的文章中,提到了关于线程池Execotors的创建介绍,在文章:<java之JUC系列-外部Tools>中第一部分有详细的说明,请参阅: 文章中其实说明了外部的使用方式,但 ...
- Python数据爬取之0基础小白实战(三)源码解析
前两篇(一)软件安装.(二)初窥门槛我解决了软件版本不匹配的问题并学习关键技术.找到重要源码,完成了程序思路总体设计,本篇废话不多说,我们直接上源码. 任务描述 获取2015-2020年通过申请的国家 ...
- Android四大组件之ContentProvider 全面解析,ContentResolver源码解析如何调用其它APP的ContentProvider
今天来总结下Android中的ContentProvider(以下简称CP),具体代码请见https://github.com/Mangosir/ContentProviderReview/tree/ ...
- spring 源码深度解析_spring源码解析之SpringIOC源码解析(下)
前言:本篇文章接SpringIOC源码解析(上),上一篇文章介绍了使用XML的方式启动Spring,介绍了refresh 方法中的一些方法基本作用,但是并没有展开具体分析.今天就和大家一起撸一下ref ...
最新文章
- SpringMVC一例 是否需要重定向
- yum 安装 一个小问题导致找不到安装包
- 第十章第二节 阿基米德原理
- arcgis下载地址
- word中没文字地方添加下划线方法
- 互联网与物联网有什么区别?
- 一起学习网站开发之基于Spring boot的微信登录开发流程和知识点
- 入门学习必收藏!精选Photoshop、D…
- MapReduce学习笔记(二)——Mapper、Reducer和Driver
- Redis和消息队列
- chromecast 协议_如何删除Chromecast的网络范围内的Android通知
- How to covert HEIF to JPG with Java
- 神经网络及其matlab仿真
- HTML5的基础知识整理
- 内地人去香港旅游注意事项
- 如何让快速在CAD图纸中标注文字
- Linux入门合集(入门一篇就够了!)
- 台式机win10关闭fn热键_Win10系统怎么禁用f1-f12快捷键
- 船舶航行matlab程序,基于船舶运动控制的Matlab仿真
- JavaScript __ 对象小记