如果你对线程池的概念还不甚了解,那么Java 线程池 Executor浅入浅出这篇文章可以让你一步步从0到1了解线程池的搭建过程。关于线程池的更详细的说明,《Java并发编程实战》这本书上讲的很透彻,强烈推荐阅读。

Executors中为我们提供了多种静态工厂方法来创建各种特性的线程池,其中大多数是返回ThreadPoolExecutor对象。因此本篇博文直接对ThreadPoolExecutor的原理进行剖析,加深对线程池设计的理解。ThreadPoolExecutor有如下两个重要参数:

corePoolSize

线程池核心线程的数量。如果线程池里得线程个数小于corePoolSize,那么就会立即创建一个新线程对象来执行任务。如果任务数大于核心线程数怎么办呢?那么就将任务放在队列中,等待执行。
比如我们去银行办理业务,有若干个人工服务窗口,当来办理业务的客户比较少的时候,可以不用排队,直接去空闲的窗口去办理业务。如果客户比较多了,那么就需要就需要领号排队了。但是服务窗口有限,来办理业务的人太多了,怎么办?maximumPoolSize就派上用场了。
我们需要注意的是,初始化ThreadPoolExecutor的时候,线程Thread不会立即创建和启动,而是等到提交任务的时候才会创建启动,当然调用了prestartAllCoreThreads的话,是可以创建并启动所有Thread.

    /***启动所有核心线程,让他们处于idly状态等待任务的到来。* @return 返回启动的线程的数量*/public int prestartAllCoreThreads() {int n = 0;while (addWorker(null, true))++n;return n;}

另外ThreadPoolExecutor提供了getPoolSize()方法,该方法并不是返回初始化的时候设置的corePoolSize的大小,而是获取线程中实际存在的线程的数量,如果线程池处于TIDYING状态,则返回0:

   //存储正在工作的线程private final HashSet<Worker> workers = new HashSet<>();/*** Returns the current number of threads in the pool.** @return the number of threads*/public int getPoolSize() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//如果线程处于TIDYING状态,说明线程池里的线程为0return runStateAtLeast(ctl.get(), TIDYING) ? 0: workers.size();} finally {mainLock.unlock();}}

maximumPoolSize

既然有核心线程,那么也有非核心线程,但是这个线程池这个池子不能无限大,maximumPoolSize就限定了线程池中允许的最大线程数。
在线程池阻塞队列(非无界队列)装满了任务之后,如果还有任务加入,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。否则就会拒绝执行。
比如去银行排队办理业务的客户越来越多,银行服务大厅已经装不下了。此时如果除了服务窗口之外,银行还有额外的资源可以办理客户的业务,那么就使用额外的资源来处理任务。如果额外的资源也已经满负荷服务客户,对于后面新来的客户,只能“欢迎下次光临”了。

workQueue

用来保存等待执行任务的队列,等待的任务必须实现Runnable接口,线程池与工作队列WorkQueue密切相关,因为并不是一个线程负责处理一个任务,而是一个线程负责处理好多任务。WorkQueue负责保存所有等待执行的任务,而线程池里的线程从工作队列中获取一个任务,然后执行,执行完毕后等待下一个任务。我们先来看看工作线程(下文简称WorkThread)怎么执行任务的,具体的在ThreadPoolExecutorl类的runWorker方法里面,该方法的主要作用就是不断的从WorkQueue中获取任务并执行之:

  final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 释放锁,允许中断boolean completedAbruptly = true;try {//开启一个循环,从WorkQueue中获取任务并执行之while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//空方法:任务执行之前的回调beforeExecute(wt, task);Throwable thrown = null;//执行任务task.run();//空方法:任务执行完毕之后的回调afterExecute(task, thrown);}} finally {task = null;//记录当前Worker已经完成任务的数量w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//使得工作线程退出processWorkerExit(w, completedAbruptly);}}

注意Worker 是一个Runnable,runWorker方法就是在Worker的run方法中执行的。

Worker对象简单说明

Worker是线程池里一个重要的对象,该类还是一个Runnable,该类持有一个Thread,代表着Worker对象在哪个线程中执行,在初始化Worker对象的时候就创建了线程。.

{//由ThreadFactory创建,代表着Worker对象在哪个线程中执行final Thread thread;/** Initial task to run.  Possibly null.Worker对象第一个执行的任务,可能是null,后面在执行任务的时候就要从WorkerQueue里面获取 */Runnable firstTask;/** 记录每个线程完成工作的数量*/volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//创建线程this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */public void run() {runWorker(this);}// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

另外Worker提供了CAS算法进行自旋,提供了lockunlock等方法,其中1代表着处于locked状态,而0除以非locked状态.关于自旋CAS的基础知识,感兴趣的可以阅读博主的《java线程知识点拾遗(CAS)》。

execute方法

ThreadPoolExecutor提供了execute方法,主要是用新的线程或者线程池里已经有的线程,在未来某个时期执行一个任务,当线程池shutDown或者达到最大容量的时候,任务就不会被执行,此时线程池会执行拒绝策略RejectedExecutionHandler。下面再来大致分析下该方法:

 public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//如果线程池里的线程小于核心数量if (workerCountOf(c) < corePoolSize) {//创建一个新线程,并将command作为线程执行的第一个任务if (addWorker(command, true))return;c = ctl.get();}//如果线程池处于Running状态,且将任务command成功加入WorkerQueue中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//再次校验状态,如果线程池不是处于Running状态,则从队列中移除并执行拒绝策略。if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)//注意这是一个很细节的点//线程池没有活动线程了,但是队列里还有任务没执行这种特殊情况,则重新添加一个线程执行。addWorker(null, false);}//队列已经满了,则重新创建线程执行,如果失败则执行拒绝策略else if (!addWorker(command, false))reject(command);}

在这里有一个值得注意的细节,调用workQueue.offer(command)将任务添加到队列之后,如果线程中有正在执行任务的线程,则这个任务会在未来某个时间内执行。但是如果任务入队之后,恰好线程池中没有活跃的线程了,怎么办呢,就再次addWorker,启动新线程执行。(举个不恰当的例子,就好像去即将打烊的饭店去吃饭,你已经扫码付好钱(入队列),然而厨师恰巧都已经准备下班了,不得不重新给你做饭一样),注意此时addWorker的第一个参数是null,因为我们已经将command通过offer添加到队列中了,具体代码段如下:

  else if (workerCountOf(recheck) == 0)//注意这是一个很细节的点//线程池没有活动线程了,但是队列里还有任务没执行这种特殊情况,则重新添加一个线程执行。addWorker(null, false)

可以用如下流程图来大致表示下线程池的工作流程。

addWorker方法简析

addWorker的主要作用就是,在线程池当前的状态和既有的线程边界(corePoolSize,maximumPoolSize)下,新的任务是否能添加成功。

     ** @param firstTask  当Worker被创建的时候(此时线程池里的线程数量小于核心线程),firstTask将作为Worker对象的第一个任务来执行。* 此时firstTask并不会添加到队列中。 或者当队列已经满的情况下,也是直接创建Worker,但是fistTask也不会添加到WorkQueue中去。 */private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {// 外层自旋int c = ctl.get();//获取线程状态int rs = runStateOf(c);// Check if queue empty only if necessary.//如果线程池处于关闭状态,并且WorkerQueue中还有任务在等待执行,则返回falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {//内层自旋int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))//如果线程池内的线程达到最大值,比如达到corePoolSize或者maximunPoolSize,则返回falsereturn false;if (compareAndIncrementWorkerCount(c))// 以CAS的方式尝试把线程数加1// 注意这里只是把线程池中的线程数加1,并没有在线程池中真正的创建线程break retry;c = ctl.get();  // Re-read ctl//如果此时线程池的状态和上次获取的不一样,则继续执行外层自旋if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}//end inner for}//end outter forboolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//创建Worker,同时也就创建了Thread对象w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//将Worker添加到HashSet种   workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {//启动线程执行任务t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

addWorker工作的大致流程如下:

1、判断线程池状态,如果需要创建Worker对象并同时创建工作线程
2、将此worker添加进workers集合中。
3、启动worker对应的线程,运行worker的run,最终执行了runWorker方法。
4、如果失败了回滚worker的创建动作(addWorkerFailed 方法),即将worker从workers集合中删除,并原子性的减少workerCount。

keepAliveTime的使用原理

该变量表示线程空闲的时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间,存活的时间就是keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效.该变量在从WorkQueue里获取任务的时候会被使用,在前文runWorker里就调用了从workQueue里面获取任务的方法:getTask():


private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {//开启一个循环int c = ctl.get();int rs = runStateOf(c);//当线程处于SHUTDOWN 状态时,并且workQueue请求队列为空,线程数量的表示-1,并返回一个null。if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//获取当前工作线程数int wc = workerCountOf(c);//如果allowCoreThreadTimeOut设置为true,或者线程数量大于核心线程数量,keepAliveTime才起作用boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//减少工作线程if (compareAndDecrementWorkerCount(c))return null;continue;}try {//从阻塞队列中取任务,根据情况选择一直阻塞等待还是超时等待Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//成功从队列中获取到了任务,并返回之    if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

可以看出,keepAliveTime是在 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法中起作用的。在runWorker方法中,开始while循环获取任务,如果超时了就可以跳出while循环,从而Thread完成了其任务而被销毁。:

 final void runWorker(Worker w) {//省略部分代码while (task != null || (task = getTask()) != null) {//省略部分代码}//省略部分代码}

Java 线程池 ThreadPoolExecutor源码简析相关推荐

  1. JAVA线程池(ThreadPoolExecutor)源码分析

    JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!):     http://xtu-xiaoxin.ite ...

  2. Java线程池ThreadPoolExecutor源码分析

    继承关系 Executor接口 public interface Executor {void execute(Runnable command); } ExecutorService接口 publi ...

  3. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(一)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  4. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(二)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  5. Java并发编程实战(chapter_3)(线程池ThreadPoolExecutor源码分析)

    为什么80%的码农都做不了架构师?>>>    这个系列一直没再写,很多原因,中间经历了换工作,熟悉项目,熟悉新团队等等一系列的事情.并发课题对于Java来说是一个又重要又难的一大块 ...

  6. Java线程池状态判断源码_深入浅出Java线程池:源码篇

    前言 在上一篇文章深入浅出Java线程池:理论篇中,已经介绍了什么是线程池以及基本的使用.(本来写作的思路是使用篇,但经网友建议后,感觉改为理论篇会更加合适).本文则深入线程池的源码,主要是介绍Thr ...

  7. java ArrayList 概述 与源码简析

    ArrayList 概述 与源码简析 1 ArrayList 创建 ArrayList<String> list = new ArrayList<>(); //构造一个初始容量 ...

  8. Java并发之线程池ThreadPoolExecutor源码分析学习

    线程池学习 以下所有内容以及源码分析都是基于JDK1.8的,请知悉. ​ 我写博客就真的比较没有顺序了,这可能跟我的学习方式有关,我自己也觉得这样挺不好的,但是没办法说服自己去改变,所以也只能这样想到 ...

  9. c++ 线程池_JAVA并发编程:线程池ThreadPoolExecutor源码分析

    前面的文章已经详细分析了线程池的工作原理及其基本应用,接下来本文将从底层源码分析一下线程池的执行过程.在看源码的时候,首先带着以下两个问题去仔细阅读.一是线程池如何保证核心线程数不会被销毁,空闲线程数 ...

  10. threadpoolexecutor创建线程池_线程池ThreadPoolExecutor源码分析

    什么是线程池 创建线程要花费昂贵的资源和时间,如果任务来了才创建那么响应时间会变长,而且一个进程能创建的线程数量有限.为了避免这些问题,在程序启动的时候就创建若干线程来响应出来,它们被称为线程池,里面 ...

最新文章

  1. mysql教程左右链接_mysql的左右内连接用法实例
  2. R语言libPaths函数获取或者设置包安装的路径实战
  3. JPA 2.1: What is new
  4. [Usaco2010 Nov]Visiting Cows
  5. sudo 安装 常见错误
  6. Unity 游戏用XLua的HotFix实现热更原理揭秘
  7. bcb异常处理显示错误行号_python基础篇:错误和异常
  8. Tableau可视化学习笔记:day09-10
  9. “互联网+”时代,网络安全市场将达千亿级别
  10. sybase 数据库空间使用情况
  11. ios人脸识别_适用于Android和iOS的10种最佳人脸识别应用程序
  12. ls、du命令的用法
  13. 让div中的p标签文字垂直居中的方法
  14. stlinkv2红灯闪烁_ST-LINK V2 DIY笔记(一)
  15. VMware 虚拟机中Siemens S7PLCSIM advancedv3.0upd2无法正常仿真的处理过程
  16. 给Python漫画分集标题下载工具开发Qt界面
  17. 以高能低碳技术融入PC全生命周期,英特尔联合业界推出绿色商用电脑
  18. 知识图谱在人工智能中的应用与思考
  19. matlab空域滤波,MATLAB数字图像处理基本操作及空域滤波
  20. mysql中进行数据分析_SQL数据分析(一)

热门文章

  1. 疯狂python讲义这本书怎么样_疯狂Python讲义(读书笔记)
  2. linux拨号日志,Linux系统日志管理:(1)连接时间日志
  3. php mess,Mess.php
  4. Cesium:加载本地高程/地形数据
  5. CSS:设置图片不可拖动
  6. HTML:canvas简述
  7. 实战爬虫:利用python中itchat模块给心爱的人每天发天气预报
  8. 我想我是适合独处的人
  9. mac 下tomcat 9.0+eclipse+mysql8.0.12配置相关问题
  10. 给android app添加注册机功能