前言:在最新的阿里规范中强制使用ThreadPoolExecutor方式创建线程池,不允许使用Executors,因此有必要对ThreadPoolExecutor进行进一步了解。

1.ThreadPoolExecutor介绍

线程池类,直接看其入参最多的构造函数:

参数意义:

corePoolSize

核心线程数的大小。默认情况下,在创建了线程池之后,线程池中的线程数为0,当有任务到来后,如果线程池中存活的线程数小于corePoolSize,则创建一个线程。

maximumPoolSize

线程池中允许的最大线程数,这个参数表示了线程池中最多能创建的线程数量。当任务数量比corePoolSize大时,任务添加到workQueue,当workQueue满了,将继续创建线程以处理任务。maximumPoolSize表示当wordQueue满了,线程池中最多可以创建的线程数量。

keepAliveTime、unit

当线程池处于空闲状态时,超过keepAliveTime时间之后,空闲的线程会被终止。只有当线程池中的线程数大于corePoolSize时,这个参数才会起作用,但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;当线程数大于corePoolSize时,如果一个线程的空闲时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。

workQueue

工作队列,存储提交的等待任务。

threadFactory

线程工厂,指定创建线程的工厂

handler

当任务超出线程池范围和队列容量时,采取何种拒绝策略。

对于上述参数,源码注释中有很详细的解释。这里笔者挑出认为重要的几段:

这里表明了corePoolSize、maximumPoolSize和workQueue的关系(上述注释说的非常的清楚,这里稍微翻译下):

#1.默认情况下,线程池初始化的时候,线程数为0。当接收到新任务时,如果线程池中存活的线程数小于corePoolSize,则新建一个线程。

#2.当运行的线程数超出核心线程数时,执行器更多的选择是将任务放入队列中,而不是新建一个线程。

#3.当队列满后,任务不能提交到队列,在不超过maximumPoolSize(最大线程数)的情况下,会创建一个新线程去执行任务,当超过maximumPoolSize时,任务将被拒绝(这里就关联到接下来说要介绍的内容,在任务操作maximumPoolSize时,线程池所使用拒绝策略)。

当执行器关闭、线程池满了、队列满了,则新任务会被拒绝。使用的拒绝策略有以下几种:

注释解释的非常清楚,线程池采用的拒绝策略共有4种:

#1.AbortPolicy : 默认策略,当任务被拒绝时间抛出异常RejectedExecutionException。

#2.CallerRunsPolicy : 让线程再次调用execute(),这种策略并不会丢弃任务,但是会降低执行器处理任务的速率。

#3.DiscardPolicy : 直接丢弃新任务。

#4.DiscardOldestPolicy : 如果执行器未关闭,删除队列中第一个任务,再次执行任务。如果失败会重试(repeated)。

接下来看线程池的排队策略。

线程池提供了3种排队的策略:

#1.直接提交(SynchronousQueue):直接提交任务,不保存任务。直接提交策略无容量限制,但是当任务数量过速增长有可能撑爆“JVM”。在生产中一般不采用此策略。

#2.无界队列(LinkedBlockingQueue):当所有核心线程都在忙时,用一个无界队列存放提交的任务。最大线程数设置了也无效。使用无界队列会保存核心线程处理不了的任务,队列无上限,因此最大线程数设置了也无效,无界队列需谨慎使用。

#3.有界队列(ArrayBlockingQueue):用 一个有界队列帮助防止资源被耗尽,不过调整和控制比较难。因为队列容量小了,任务不能立即执行,当然需要配合拒绝策略;队列容量太大,又比较耗费资源。当然在生产环境中一般使用有界队列的排队策略,因为使用有界队列可以保存超过核心线程的任务,并且队列有上限,超过上限,新建线程抛错,可以更好的保护资源,防止崩溃。

通过以上分析,可以发现corePoolSize、maximumPoolSize和排队策略是相互影响的,maximumPoolSize的值并不一定有效。

接下来看看线程池的存活机制

当创建的线程超过核心线程数时,线程池会让该线程保持存活keepAliveTime时间,超过该时间后会销毁该线程。默认情况下该值对非核心线程有效,如果想让核心线程也适用于该机制,可以调用allowCoreThreadTimeOut()方法,但是这样的话就不存在核心线程的概念了。

综合以上,线程池在多次执行任务后,会一直维持部分线程存活,即使它是闲置的。目的是为了减少线程销毁创建的开销,下次有任务需要执行,直接从池子里拿线程就能用了。但核心线程不能维护太多,因为也需要一定开销。最大的线程数保护了整个系统的稳定性,避免并发量大的时候,把线程挤满。工作队列则是保证了任务顺序和暂存,系统的可靠性。线程存活规则的目的和维护核心线程的目的类似,但降低了它的存活的时间。

2.线程状态控制

ctl变量是整个线程池的核心控制状态,它是一个AtomicInteger类型的原子对象,它记录了线程池中生效线程数和线程池的运行状态。

  • workerCount,生效的线程数,基本上可以理解为存活的线程数。
  • runState,线程池运行状态。

ctl总共32位,其中低29位代表workerCount,所以最大线程数为(2^29)-1。高3位代表runState。

runState有5个值:

各值对应的值如下:

RUNNING    -- 对应的高3位值是111。
SHUTDOWN   -- 对应的高3位值是000。
STOP       -- 对应的高3位值是001。
TIDYING    -- 对应的高3位值是010。
TERMINATED -- 对应的高3位值是011。

  • RUNNING,接收新任务处理队列任务。
  • SHUTDOWN,不接收新任务,但处理队列任务。
  • STOP,不接收新任务,也不处理队列任务,并且中断所有处理中的任务。
  • TIDYING,所有任务都被终结,有效线程为0,并触发terminated()方法。
  • TERMINATED,当terminated()方法执行结束。

线程池各个状态之间的切换:

当调用了shutdown(),状态会从RUNNING变成SHUTDOWN,不再接收新任务,此时会处理完队列里面的任务。
如果调用的是shutdownNow(),状态会直接变成STOP。
当线程或者队列都是空的时候,状态就会变成TIDYING。
当terminated()执行完的时候,就会变成TERMINATED。

3.关键函数解析

execute(Runnable) 

 1 public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * Proceed in 3 steps:
 6          *
 7          * 1. If fewer than corePoolSize threads are running, try to
 8          * start a new thread with the given command as its first
 9          * task.  The call to addWorker atomically checks runState and
10          * workerCount, and so prevents false alarms that would add
11          * threads when it shouldn't, by returning false.
12          *
13          * 2. If a task can be successfully queued, then we still need
14          * to double-check whether we should have added a thread
15          * (because existing ones died since last checking) or that
16          * the pool shut down since entry into this method. So we
17          * recheck state and if necessary roll back the enqueuing if
18          * stopped, or start a new thread if there are none.
19          *
20          * 3. If we cannot queue task, then we try to add a new
21          * thread.  If it fails, we know we are shut down or saturated
22          * and so reject the task.
23          */
24         int c = ctl.get();
25         if (workerCountOf(c) < corePoolSize) {
26             if (addWorker(command, true))
27                 return;
28             c = ctl.get();
29         }
30         if (isRunning(c) && workQueue.offer(command)) {
31             int recheck = ctl.get();
32             if (! isRunning(recheck) && remove(command))
33                 reject(command);
34             else if (workerCountOf(recheck) == 0)
35                 addWorker(null, false);
36         }
37         else if (!addWorker(command, false))
38             reject(command);
39     }

execute函数的主要流程源码中的注释已经讲得非常清楚了。

  • 如果少于核心线程在运行,则尝试创建一个新的线程。
  • 如果任务成功入队,需再次检查线程池状态看是否需要入队,因为在入队过程中,有可能状态发生变化;如果确认入队但没有存活线程,则新建一个空线程。
  • 如果不能入队,则尝试新创建一个线程,如果失败,则拒绝任务。
  • 注意在第二步最后会新建一个线程,这里会有一个轮询机制让下个task出队,然后直接利用这个空闲线程。

在execute中我们主要关注addWorker()函数。

首先看下该函数的整体注释了解其大致流程。

  • 该函数会检查当前线程池是否可以创建worker(线程)。
  • 当线程池stop或者shut down,又或者线程工厂创建线程失败时都会返回false。
  • 在线程创建失败时,会进行回滚。
  • 注意core参数:true表示以corePoolSize作为参照,false表示以maximumPoolSize为参照。

接下来分析addWorker源码:

 1 private boolean addWorker(Runnable firstTask, boolean core) {
 2         retry: // 标记,表示跳出循环时,从哪里开始执行,类似于goto
 3         for (;;) {
 4             int c = ctl.get(); // 获取ctl对应的值,“生效线程数”和“线程池状态”
 5             int rs = runStateOf(c); // 获取线程池状态
 6
 7             // Check if queue empty only if necessary.
 8             // 如果该if判断想要返回false,队列为空为必要条件,因为addWorker()不只是在接收新任务会调用到,处理队列的任务也会调用到。在线程池状态为SHUTDOWN时还会处理队列中的任务,所以队列不为空会继续向下执行
 9             if (rs >= SHUTDOWN &&
10                 ! (rs == SHUTDOWN &&
11                    firstTask == null &&
12                    ! workQueue.isEmpty()))
13                 return false;
14             /* 内循环意义:判断worker是否符合corePoolSize和maximumPoolSize定义,不满足则返回false;                  然后利用CAS自增workerCount,如果CAS成功则退出循环;                  如果CAS失败会继续自旋,在自旋过程中会检查线程池状态,如果发生变化,则回退到外层循环,重新执行。
15                因此内循环的主要作用就是让workerCount在符合条件下自增。
16             */
17             for (;;) {
18                 int wc = workerCountOf(c);
19                 if (wc >= CAPACITY ||
20                     wc >= (core ? corePoolSize : maximumPoolSize))
21                     return false;
22                 if (compareAndIncrementWorkerCount(c))
23                     break retry;
24                 c = ctl.get();  // Re-read ctl
25                 if (runStateOf(c) != rs)
26                     continue retry;
27                 // else CAS failed due to workerCount change; retry inner loop
28             }
29         }
30
31         boolean workerStarted = false;
32         boolean workerAdded = false;
33         Worker w = null;
34         // 这段代码的主要功能:添加任务到线程池,并启动任务所在的线程
35         try {
36             // 创建一个Worker对象,包含一个由线程工厂创建的线程和一个需执行的任务
37             w = new Worker(firstTask);
38             final Thread t = w.thread;
39             if (t != null) {
40                 // 线程创建成功 获取一个可重入锁,把Worker对象放入worker成员变量中
41                 final ReentrantLock mainLock = this.mainLock;
42                 mainLock.lock();
43                 try {
44                     // Recheck while holding lock.
45                     // Back out on ThreadFactory failure or if
46                     // shut down before lock acquired.
47                     int rs = runStateOf(ctl.get());
48                     // 检查线程池状态和线程状态
49                     if (rs < SHUTDOWN ||
50                         (rs == SHUTDOWN && firstTask == null)) {
51                         if (t.isAlive()) // precheck that t is startable
52                             throw new IllegalThreadStateException();
53                         workers.add(w); // 将Worker变量加入workers中(集合)
54                         // 更新largestPoolSize
55                         int s = workers.size();
56                         if (s > largestPoolSize)
57                             largestPoolSize = s;
58                         workerAdded = true;
59                     }
60                 } finally {
61                     mainLock.unlock();
62                 }
63                 // 如果任务添加成功,则启动任务所在的线程
64                 if (workerAdded) {
65                     t.start();
66                     workerStarted = true;
67                 }
68             }
69         } finally {
70             // 如果任务添加失败则执行addWorkerFailed进行回滚
71             if (! workerStarted)
72                 addWorkerFailed(w);
73         }
74         return workerStarted;
75     }

addWorkerFailed(Worker),任务添加失败回滚函数:

 1   private void addWorkerFailed(Worker w) {
 2         final ReentrantLock mainLock = this.mainLock;
 3         // 加锁回滚
 4         mainLock.lock();
 5         try {
 6             if (w != null)
 7                 workers.remove(w); // 回滚workers
 8             decrementWorkerCount();// 回滚workerCount
 9             tryTerminate();// 判断线程池状态,是否需要终结线程池
10         } finally {
11             mainLock.unlock();
12         }
13     }

4.总结

ThreadPoolExecutor我们主要关注其addWorker方法,对于其他方法,可翻看源码,比较好理解。

核心要点:

  • 当核心线程忙碌时,线程池更倾向于把任务放进队列,而不是新建线程。
  • 三种不同的排队策略,根据选择队列的不同,maximumPoolSize不一定有用的。
  • ctl是线程池的核心控制状态,包含的runState线程池运行状态和workCount有效线程数。
  • retry:是一种标记循环的语法,retry可以是任何变量命名合法字符。

by Shawn Chen,2019.02.16,下午。

转载于:https://www.cnblogs.com/developer_chan/p/10341266.html

ThreadPoolExecutor解析相关推荐

  1. Java:ThreadPoolExecutor解析

    目录 功能介绍 线程池相关类图 源码解析 基本概念 字段域 常量 线程构造重要字段 线程控制重要字段 方法 执行任务 ThreadPoolExecutor的关闭 功能介绍 线程池,顾名思义一个线程的池 ...

  2. Java:ThreadPoolExecutor解析续--Executors

    简介 Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执 ...

  3. Java并发编程之线程池ThreadPoolExecutor解析

    线程池存在的意义 平常使用线程即new Thread()然后调用start()方法去启动这个线程,但是在频繁的业务情况下如果在生产环境大量的创建Thread对象是则会浪费资源,不仅增加GC回收压力,并 ...

  4. Java创建线程池的方式

    Java创建线程池的方式 文章目录 Java创建线程池的方式 一.通过Executors工厂方法创建 1.Executors.newSingleThreadExecutor() 2.Executors ...

  5. 码出高效:Java开发手册笔记(线程池及其源码)

    码出高效:Java开发手册笔记(线程池及其源码) 码出高效:Java开发手册笔记(线程池及其源码) 码出高效:Java开发手册笔记(线程池及其源码) 前言 一.线程池的作用 线程的生命周期 二.线程池 ...

  6. 一文读懂线程池的实现原理

    欢迎大家关注我的微信公众号[老周聊架构],Java后端主流技术栈的原理.源码分析.架构以及各种互联网高并发.高性能.高可用的解决方案. 一.前言 上个月底群里的一个好朋友向老周提出啥时候分享 Thre ...

  7. 【读书笔记】码出高效:Java开发手册

    第一章 计算机基础 走进0与1的世界 计算机就是晶体管.电路板组装起来的电子设备,无论是图形图像的渲染.网络远程共享,还是大数据计算,归根到底都是 0 与 1 的信号处理.信息存储和逻辑计算的元数据只 ...

  8. 多线程与高并发(八):ThreadPoolExecutor源码解析, SingleThreadPool,CachedPool,FixedThreadPool,ForkJoinPoll 等

    线程池 今天我们来看看JDK给我们提供的默认的线程池的实现. ThreadPoolExecutor:我们通常所说的线程池.多个线程共享同一个任务队列. SingleThreadPool CachedP ...

  9. 面试官系统精讲Java源码及大厂真题 - 37 ThreadPoolExecutor 源码解析

    37 ThreadPoolExecutor 源码解析 当你做成功一件事,千万不要等待着享受荣誉,应该再做那些需要的事. -- 巴斯德 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可 ...

最新文章

  1. [ActionScript 3.0] AS3.0 简单封装Socket的通信
  2. 黑马lavarel教程---3、数据库和视图注意点
  3. jquery 在div追加文本_前端技术--JQuery
  4. 面试题:如何编写一个杯子测试用例
  5. 路由重发分之RIP-OSPF
  6. 以太坊geth节点各种报错(求助)
  7. 度度熊的01世界 DFS
  8. security center启动类型更改不了_Word 启动缓慢解决方法
  9. js中比較好的继承方式
  10. 机器学习- 吴恩达Andrew Ng - week3-4 solve overfitting
  11. 存储基础知识 - 网络存储主要技术
  12. QQ登录界面测试用例设计:
  13. grpc-go源码剖析二十之grpc客户端帧接收器是如何处理不同的帧的?
  14. Power BI中计算同比、环比
  15. Silverlight 4 - MVVM with Commanding and WCF RIA Services
  16. 环境工程部门怎么实施自动化软件学习时间更多
  17. WIN10系统下ODBC的配置
  18. 【转载】8B/10B Encode/Decode详解
  19. 单片机的两个外围电路:复位电路和时钟电路
  20. Web Scraping with Python 学习笔记8

热门文章

  1. matlab自带图片下载,数字图像处理中Matlab的应用.pdf
  2. c 表达式必须是可修改的左值_C++中的左值,右值,左值引用,右值引用
  3. 比其他行业晚了十年的工业软件,转型的核心和动力是什么?
  4. 短视频源码,仿抖音源码,助您在短视频行业开辟出一条新路
  5. 日志服务发布Windows Logtail,完整支持两大平台
  6. java----单例模式
  7. .NET微信公众号开发系列 - 项目介绍
  8. easyui汉化啊!
  9. 将not exists更改为外连接
  10. Vmware Linux 固定IP配置(CenOS7)