Java中使用线程池技术一般都是使用Executors这个工厂类,它提供了非常简单方法来创建各种类型的线程池:

public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

核心的接口其实是Executor,它只有一个execute方法抽象为对任务(Runnable接口)的执行, ExecutorService接口在Executor的基础上提供了对任务执行的生命周期的管理,主要是submitshutdown方法, AbstractExecutorServiceExecutorService一些方法做了默认的实现,主要是submit和invoke方法,而真正的任务执行 的Executor接口execute方法是由子类实现,就是ThreadPoolExecutor,它实现了基于线程池的任务执行框架,所以要了解 JDK的线程池,那么就得先看这个类。

再看execute方法之前需要先介几个变量或类。

ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

这个变量是整个类的核心,AtomicInteger保证了对这个变量的操作是原子的,通过巧妙的操作,ThreadPoolExecutor用这一个变量保存了两个内容:

  • 所有有效线程的数量
  • 各个线程的状态(runState)

低29位存线程数,高3位存runState,这样runState有5个值:

  • RUNNING:-536870912
  • SHUTDOWN:0
  • STOP:536870912
  • TIDYING:1073741824
  • TERMINATED:1610612736

线程池中各个状态间的转换比较复杂,主要记住下面内容就可以了:

  • RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
  • SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;
  • STOP状态:不再接受新任务,不处理队列中的任务

围绕ctl变量有一些操作,了解这些方法是看懂后面一些晦涩代码的基础:

View Code

corePoolSize

核心线程池大小,活动线程小于corePoolSize则直接创建,大于等于则先加到workQueue中,队列满了才创建新的线程。

keepAliveTime

线程从队列中获取任务的超时时间,也就是说如果线程空闲超过这个时间就会终止。

Worker

private final class Worker extends AbstractQueuedSynchronizer implements Runnable ...

内部类Worker是对任务的封装,所有submit的Runnable都被封装成了Worker,它本身也是一个Runnable, 然后利用AQS框架(关于AQS可以看我这篇文章)实现了一个简单的非重入的互斥锁, 实现互斥锁主要目的是为了中断的时候判断线程是在空闲还是运行,可以看后面shutdownshutdownNow方法的分析。

// state只有0和1,互斥
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true;// 成功获得锁  } // 线程进入等待队列 return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; }

之所以不用ReentrantLock是为了避免任务执行的代码中修改线程池的变量,如setCorePoolSize,因为ReentrantLock是可重入的。

execute

execute方法主要三个步骤:

  • 活动线程小于corePoolSize的时候创建新的线程;
  • 活动线程大于corePoolSize时都是先加入到任务队列当中;
  • 任务队列满了再去启动新的线程,如果线程数达到最大值就拒绝任务。
public void execute(Runnable command) {if (command == null) throw new NullPointerException(); int c = ctl.get(); // 活动线程数 < corePoolSize if (workerCountOf(c) < corePoolSize) { // 直接启动新的线程。第二个参数true:addWorker中会重新检查workerCount是否小于corePoolSize if (addWorker(command, true)) // 添加成功返回 return; c = ctl.get(); } // 活动线程数 >= corePoolSize // runState为RUNNING && 队列未满 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // double check // 非RUNNING状态 则从workQueue中移除任务并拒绝 if (!isRunning(recheck) && remove(command)) reject(command);// 采用线程池指定的策略拒绝任务 // 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败 else if (workerCountOf(recheck) == 0) // 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); // 两种情况: // 1.非RUNNING状态拒绝新的任务 // 2.队列满了启动新的线程失败(workCount > maximumPoolSize) } else if (!addWorker(command, false)) reject(command); }

注释比较清楚了就不再解释了,其中比较难理解的应该是addWorker(null, false);这一行,这要结合addWorker一起来看。 主要目的是防止HUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。

addWorker

这个方法理解起来比较费劲。

View Code

runWorker

任务添加成功后实际执行的是runWorker这个方法,这个方法非常重要,简单来说它做的就是:

  • 第一次启动会执行初始化传进来的任务firstTask;
  • 然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。
View Code

getTask

View Code

processWorkerExit

线程退出会执行这个方法做一些清理工作。

View Code

tryTerminate

processWorkerExit方法中会尝试调用tryTerminate来终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务。

View Code

shutdown和shutdownNow

shutdown这个方法会将runState置为SHUTDOWN,会终止所有空闲的线程。

public void shutdown() {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回  advanceRunState(SHUTDOWN); // 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit → // tryTerminate方法中会保证队列中剩余的任务得到执行。  interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }

shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止所有的线程。

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // STOP状态:不再接受新任务且不再执行队列中的任务。  advanceRunState(STOP); // 中断所有线程  interruptWorkers(); // 返回队列中还没有被执行的任务。 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法:

private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock, // 因此保证了中断的肯定是空闲的线程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }

void interruptIfStarted() {Thread t;// 初始化时state == -1if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }

这就是前面提到的Woker类实现AQS的主要作用。

注意:shutdown方法可能会在finalize被隐式的调用。

这篇博客基本都是代码跟注释,所以如果不是分析ThreadPoolExecutor源码的话看起来会非常无聊。

总结:

  • int corePoolSize:核心线程数

  • int maximumPoolSize:最大线程数

  • BlockingQueue workQueue:任务队列

  • long keepAliveTime:和TimeUnit unit一起构成线程的最大空闲时间,一旦超过该时间还没有任务处理,该线程就走向结束了。它是针对当前线程数已经超过corePoolSize核心线程数了或者核心线程数也开启超时策略,即属性allowCoreThreadTimeOut=true

  • ThreadFactory threadFactory:线程工厂

  • RejectedExecutionHandler handler:拒绝策略,当任务太多来不及处理,可拒绝该任务

先简单描述下ThreadPoolExecutor的execute(futureTask)过程的大概情况:

1 如果当前线程数小于corePoolSize,则直接创建出一个线程,用于执行新加进来的任务

2 如果当前线程数已经超过corePoolSize,则将该任务放到BlockingQueue workQueue任务队列中,该任务队列可以是有限容量也可以是无限容量的。每个线程处理完一个任务后,都会不断的从BlockingQueue workQueue任务队列中取出任务并执行

3 如果BlockingQueue workQueue是有限容量的,已满无法放进新的任务了,如果此时的线程数小于maximumPoolSize,则直接创建一个线程执行该任务

4 如果线程数已达到maximumPoolSize不能再创建线程了,则直接使用RejectedExecutionHandler handler拒绝该任务

原文转载:http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-ThreadPoolExecutor.html

转载于:https://www.cnblogs.com/AndyAo/p/8135063.html

Java 1.7 ThreadPoolExecutor源码解析相关推荐

  1. 面试官系统精讲Java源码及大厂真题 - 37 ThreadPoolExecutor 源码解析

    37 ThreadPoolExecutor 源码解析 当你做成功一件事,千万不要等待着享受荣誉,应该再做那些需要的事. -- 巴斯德 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可 ...

  2. 多线程与高并发(八):ThreadPoolExecutor源码解析, SingleThreadPool,CachedPool,FixedThreadPool,ForkJoinPoll 等

    线程池 今天我们来看看JDK给我们提供的默认的线程池的实现. ThreadPoolExecutor:我们通常所说的线程池.多个线程共享同一个任务队列. SingleThreadPool CachedP ...

  3. Java集合之TreeMap源码解析上篇

    上期回顾 上期我从树型结构谈到了红黑树的概念以及自平衡的各种变化(指路上期←戳),本期我将会对TreeMap结合红黑树理论进行解读. 首先,我们先来回忆一下红黑树的5条基本规则. 1.结点是红色或者黑 ...

  4. ThreadPoolExecutor源码解析

    ThreadPoolExecutor源码解析及工作机制 首先介绍前提条件及本文用词说明 线程中断 interrupt()只是设置线程中断位,线程并没有被真正被中断,还是RUNNABLE状态 线程池中线 ...

  5. JAVA线程池(ThreadPoolExecutor)源码分析

    JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!):     http://xtu-xiaoxin.ite ...

  6. java arraylist 赋值_ArrayList源码解析,老哥,来一起复习一哈?

    点击上方"码农沉思录",选择"设为星标" 优质文章,及时送达 前言 JDK源码解析系列文章,都是基于JDK8分析的,虽然JDK14已经出来,但是JDK8我还不会 ...

  7. Java并发之Semaphore源码解析

    Semaphore 前情提要 在学习本章前,需要先了解ReentrantLock源码解析,ReentrantLock源码解析里介绍的方法有很多是本章的铺垫.下面,我们进入本章正题Semaphore. ...

  8. ThreadPoolExecutor源码解析(一)

    1.ThreadPoolExcuter原理说明 首先我们要知道为什么要使用ThreadPoolExcuter,具体可以看看文档中的说明: 线程池可以解决两个不同问题:由于减少了每个任务的调用开销,在执 ...

  9. Java FileReader InputStreamReader类源码解析

    FileReader 前面介绍FileInputStream的时候提到过,它是从文件读取字节,如果要从文件读取字符的话可以使用FileReader.FileReader是可以便利读取字符文件的类,构造 ...

最新文章

  1. 数据结构中几种经典排序简介
  2. vuewebsocket做消息提醒_企业微信群怎么定时群发消息?如何突破群发次数限制?...
  3. 人工智能---机器学习
  4. spring 配置只读事务_只读副本和Spring Data第1部分:配置数据库
  5. 将Java类作为子进程运行
  6. WCF Testing Tool(转)
  7. Struts2源码阅读(二)_ActionContext及CleanUP Filter
  8. Linux常用命令汇总及使用方法(二)之文本编辑器VI
  9. 【报告分享】新世代、新圈层:2020垂直圈层营销报告(附下载链接)
  10. python编程入门与案例详解-自学Python 编程基础、科学计算及数据分析
  11. mysql的底层运行原理,【数据库】震惊!!MySQL的底层原理竟然是这样
  12. L230 RF可靠性测试-RF指标
  13. 相控阵天线均匀面阵方向图(六)-----方向图函数的不同表达形式
  14. 毕业论文写作与学术规范
  15. docker安装mysql8 并且忽略大小写问题解决
  16. 怎样用C++在控制台中编写俄罗斯方块
  17. eos的石墨烯技术是什么
  18. Orkut 试用报告
  19. UVA-10074 最大子矩阵 DP
  20. 快速把照片做成MV,用什么软件好?抖音火爆效果制作

热门文章

  1. [hihoCoder] 第五十周: 欧拉路·二
  2. 学以致用十-----centos7.2+python3.6+vim8.1+YouCompleteMe
  3. Cisco 2811 语音网关+callmanager拨打外线详解配置
  4. 《中国人工智能学会通讯》——10.25 跨姿态和光照变化的低分辨率人脸识别
  5. 赵雅智_Swift(2)_swift常量和变量
  6. 每天一个linux 命令 find命令
  7. 如何在linux当中,大量的添加用户
  8. tomcat处理图片或者文件不在项目里
  9. Oracle11gR2 RAC+DataGuard安装实施维护2+1_数据库集群容灾视频教程
  10. mysql数据库,当数据类型是float时,查询居然查询不出数据来