Java 1.7 ThreadPoolExecutor源码解析
Java中使用线程池技术一般都是使用Executors
这个工厂类,它提供了非常简单方法来创建各种类型的线程池:
public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
核心的接口其实是Executor
,它只有一个execute
方法抽象为对任务(Runnable接口)的执行, ExecutorService
接口在Executor
的基础上提供了对任务执行的生命周期的管理,主要是submit
和shutdown
方法, AbstractExecutorService
对ExecutorService
一些方法做了默认的实现,主要是submit和invoke方法,而真正的任务执行 的Executor接口execute
方法是由子类实现,就是ThreadPoolExecutor
,它实现了基于线程池的任务执行框架,所以要了解 JDK的线程池,那么就得先看这个类。
再看execute
方法之前需要先介几个变量或类。
ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
这个变量是整个类的核心,AtomicInteger
保证了对这个变量的操作是原子的,通过巧妙的操作,ThreadPoolExecutor用这一个变量保存了两个内容:
- 所有有效线程的数量
- 各个线程的状态(runState)
低29位存线程数,高3位存runState
,这样runState有5个值:
- RUNNING:-536870912
- SHUTDOWN:0
- STOP:536870912
- TIDYING:1073741824
- TERMINATED:1610612736
线程池中各个状态间的转换比较复杂,主要记住下面内容就可以了:
- RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
- SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;
- STOP状态:不再接受新任务,不处理队列中的任务
围绕ctl变量有一些操作,了解这些方法是看懂后面一些晦涩代码的基础:
corePoolSize
核心线程池大小,活动线程小于corePoolSize则直接创建,大于等于则先加到workQueue中,队列满了才创建新的线程。
keepAliveTime
线程从队列中获取任务的超时时间,也就是说如果线程空闲超过这个时间就会终止。
Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable ...
内部类Worker
是对任务的封装,所有submit的Runnable都被封装成了Worker,它本身也是一个Runnable, 然后利用AQS框架(关于AQS可以看我这篇文章)实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,可以看后面shutdown
和shutdownNow
方法的分析。
// state只有0和1,互斥 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true;// 成功获得锁 } // 线程进入等待队列 return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; }
之所以不用ReentrantLock是为了避免任务执行的代码中修改线程池的变量,如setCorePoolSize
,因为ReentrantLock是可重入的。
execute
execute
方法主要三个步骤:
- 活动线程小于corePoolSize的时候创建新的线程;
- 活动线程大于corePoolSize时都是先加入到任务队列当中;
- 任务队列满了再去启动新的线程,如果线程数达到最大值就拒绝任务。
public void execute(Runnable command) {if (command == null) throw new NullPointerException(); int c = ctl.get(); // 活动线程数 < corePoolSize if (workerCountOf(c) < corePoolSize) { // 直接启动新的线程。第二个参数true:addWorker中会重新检查workerCount是否小于corePoolSize if (addWorker(command, true)) // 添加成功返回 return; c = ctl.get(); } // 活动线程数 >= corePoolSize // runState为RUNNING && 队列未满 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // double check // 非RUNNING状态 则从workQueue中移除任务并拒绝 if (!isRunning(recheck) && remove(command)) reject(command);// 采用线程池指定的策略拒绝任务 // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败 else if (workerCountOf(recheck) == 0) // 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); // 两种情况: // 1.非RUNNING状态拒绝新的任务 // 2.队列满了启动新的线程失败(workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }
注释比较清楚了就不再解释了,其中比较难理解的应该是addWorker(null, false);
这一行,这要结合addWorker一起来看。 主要目的是防止HUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
addWorker
这个方法理解起来比较费劲。
runWorker
任务添加成功后实际执行的是runWorker
这个方法,这个方法非常重要,简单来说它做的就是:
- 第一次启动会执行初始化传进来的任务firstTask;
- 然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。
getTask
processWorkerExit
线程退出会执行这个方法做一些清理工作。
tryTerminate
processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务。
shutdown和shutdownNow
shutdown这个方法会将runState置为SHUTDOWN
,会终止所有空闲的线程。
public void shutdown() {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回 advanceRunState(SHUTDOWN); // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit → // tryTerminate方法中会保证队列中剩余的任务得到执行。 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止所有的线程。
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // STOP状态:不再接受新任务且不再执行队列中的任务。 advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 返回队列中还没有被执行的任务。 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
主要区别在于shutdown调用的是interruptIdleWorkers
这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted
方法:
private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock, // 因此保证了中断的肯定是空闲的线程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
void interruptIfStarted() {Thread t;// 初始化时state == -1if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
这就是前面提到的Woker类实现AQS的主要作用。
注意:shutdown方法可能会在finalize被隐式的调用。
这篇博客基本都是代码跟注释,所以如果不是分析ThreadPoolExecutor
源码的话看起来会非常无聊。
总结:
int corePoolSize:核心线程数
int maximumPoolSize:最大线程数
BlockingQueue workQueue:任务队列
long keepAliveTime:和TimeUnit unit一起构成线程的最大空闲时间,一旦超过该时间还没有任务处理,该线程就走向结束了。它是针对当前线程数已经超过corePoolSize核心线程数了或者核心线程数也开启超时策略,即属性allowCoreThreadTimeOut=true
ThreadFactory threadFactory:线程工厂
RejectedExecutionHandler handler:拒绝策略,当任务太多来不及处理,可拒绝该任务
先简单描述下ThreadPoolExecutor的execute(futureTask)过程的大概情况:
1 如果当前线程数小于corePoolSize,则直接创建出一个线程,用于执行新加进来的任务
2 如果当前线程数已经超过corePoolSize,则将该任务放到BlockingQueue workQueue任务队列中,该任务队列可以是有限容量也可以是无限容量的。每个线程处理完一个任务后,都会不断的从BlockingQueue workQueue任务队列中取出任务并执行
3 如果BlockingQueue workQueue是有限容量的,已满无法放进新的任务了,如果此时的线程数小于maximumPoolSize,则直接创建一个线程执行该任务
4 如果线程数已达到maximumPoolSize不能再创建线程了,则直接使用RejectedExecutionHandler handler拒绝该任务
原文转载:http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-ThreadPoolExecutor.html
转载于:https://www.cnblogs.com/AndyAo/p/8135063.html
Java 1.7 ThreadPoolExecutor源码解析相关推荐
- 面试官系统精讲Java源码及大厂真题 - 37 ThreadPoolExecutor 源码解析
37 ThreadPoolExecutor 源码解析 当你做成功一件事,千万不要等待着享受荣誉,应该再做那些需要的事. -- 巴斯德 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可 ...
- 多线程与高并发(八):ThreadPoolExecutor源码解析, SingleThreadPool,CachedPool,FixedThreadPool,ForkJoinPoll 等
线程池 今天我们来看看JDK给我们提供的默认的线程池的实现. ThreadPoolExecutor:我们通常所说的线程池.多个线程共享同一个任务队列. SingleThreadPool CachedP ...
- Java集合之TreeMap源码解析上篇
上期回顾 上期我从树型结构谈到了红黑树的概念以及自平衡的各种变化(指路上期←戳),本期我将会对TreeMap结合红黑树理论进行解读. 首先,我们先来回忆一下红黑树的5条基本规则. 1.结点是红色或者黑 ...
- ThreadPoolExecutor源码解析
ThreadPoolExecutor源码解析及工作机制 首先介绍前提条件及本文用词说明 线程中断 interrupt()只是设置线程中断位,线程并没有被真正被中断,还是RUNNABLE状态 线程池中线 ...
- JAVA线程池(ThreadPoolExecutor)源码分析
JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!): http://xtu-xiaoxin.ite ...
- java arraylist 赋值_ArrayList源码解析,老哥,来一起复习一哈?
点击上方"码农沉思录",选择"设为星标" 优质文章,及时送达 前言 JDK源码解析系列文章,都是基于JDK8分析的,虽然JDK14已经出来,但是JDK8我还不会 ...
- Java并发之Semaphore源码解析
Semaphore 前情提要 在学习本章前,需要先了解ReentrantLock源码解析,ReentrantLock源码解析里介绍的方法有很多是本章的铺垫.下面,我们进入本章正题Semaphore. ...
- ThreadPoolExecutor源码解析(一)
1.ThreadPoolExcuter原理说明 首先我们要知道为什么要使用ThreadPoolExcuter,具体可以看看文档中的说明: 线程池可以解决两个不同问题:由于减少了每个任务的调用开销,在执 ...
- Java FileReader InputStreamReader类源码解析
FileReader 前面介绍FileInputStream的时候提到过,它是从文件读取字节,如果要从文件读取字符的话可以使用FileReader.FileReader是可以便利读取字符文件的类,构造 ...
最新文章
- 数据结构中几种经典排序简介
- vuewebsocket做消息提醒_企业微信群怎么定时群发消息?如何突破群发次数限制?...
- 人工智能---机器学习
- spring 配置只读事务_只读副本和Spring Data第1部分:配置数据库
- 将Java类作为子进程运行
- WCF Testing Tool(转)
- Struts2源码阅读(二)_ActionContext及CleanUP Filter
- Linux常用命令汇总及使用方法(二)之文本编辑器VI
- 【报告分享】新世代、新圈层:2020垂直圈层营销报告(附下载链接)
- python编程入门与案例详解-自学Python 编程基础、科学计算及数据分析
- mysql的底层运行原理,【数据库】震惊!!MySQL的底层原理竟然是这样
- L230 RF可靠性测试-RF指标
- 相控阵天线均匀面阵方向图(六)-----方向图函数的不同表达形式
- 毕业论文写作与学术规范
- docker安装mysql8 并且忽略大小写问题解决
- 怎样用C++在控制台中编写俄罗斯方块
- eos的石墨烯技术是什么
- Orkut 试用报告
- UVA-10074 最大子矩阵 DP
- 快速把照片做成MV,用什么软件好?抖音火爆效果制作
热门文章
- [hihoCoder] 第五十周: 欧拉路·二
- 学以致用十-----centos7.2+python3.6+vim8.1+YouCompleteMe
- Cisco 2811 语音网关+callmanager拨打外线详解配置
- 《中国人工智能学会通讯》——10.25 跨姿态和光照变化的低分辨率人脸识别
- 赵雅智_Swift(2)_swift常量和变量
- 每天一个linux 命令 find命令
- 如何在linux当中,大量的添加用户
- tomcat处理图片或者文件不在项目里
- Oracle11gR2 RAC+DataGuard安装实施维护2+1_数据库集群容灾视频教程
- mysql数据库,当数据类型是float时,查询居然查询不出数据来