java多线程系列:ThreadPoolExecutor源码分析,java基础面试笔试题
我总结出了很多互联网公司的面试题及答案,并整理成了文档,以及各种学习的进阶学习资料,免费分享给大家。
扫描二维码或搜索下图红色VX号,加VX好友,拉你进【程序员面试学习交流群】免费领取。也欢迎各位一起在群里探讨技术。
推荐文章:Java 面试知识点解析;Mysql优化技巧(数据库设计、命名规范、索引优化
前言
这篇主要讲述ThreadPoolExecutor的源码分析,贯穿类的创建、任务的添加到线程池的关闭整个流程,让你知其然所以然。希望你可以通过本篇博文知道ThreadPoolExecutor是怎么添加任务、执行任务的,以及延伸的知识点。那么先来看看ThreadPoolExecutor的继承关系吧。
继承关系
Executor接口
public interface Executor {void execute(Runnable command);}
Executor接口只有一个方法execute,传入线程任务参数
ExecutorService接口
public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}
ExecutorService接口继承Executor接口,并增加了submit、shutdown、invokeAll等等一系列方法。
AbstractExecutorService抽象类
public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {...}public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {... }public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {...}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {...}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {...}}
AbstractExecutorService抽象类实现ExecutorService接口,并且提供了一些方法的默认实现,例如submit方法、invokeAny方法、invokeAll方法。
像execute方法、线程池的关闭方法(shutdown、shutdownNow等等)就没有提供默认的实现。
ThreadPoolExecutor
先介绍下ThreadPoolExecutor线程池的状态吧
线程池状态
int 是4个字节,也就是32位(注:一个字节等于8位
)
//记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//线程数量统计位数29 Integer.SIZE=32 private static final int COUNT_BITS = Integer.SIZE - 3;//容量 000 11111111111111111111111111111private static final int CAPACITY = (1 << COUNT_BITS) - 1;//运行中 111 00000000000000000000000000000private static final int RUNNING = -1 << COUNT_BITS;//关闭 000 00000000000000000000000000000private static final int SHUTDOWN = 0 << COUNT_BITS;//停止 001 00000000000000000000000000000private static final int STOP = 1 << COUNT_BITS;//整理 010 00000000000000000000000000000private static final int TIDYING = 2 << COUNT_BITS;//终止 011 00000000000000000000000000000private static final int TERMINATED = 3 << COUNT_BITS;//获取运行状态(获取前3位)private static int runStateOf(int c) { return c & ~CAPACITY; }//获取线程个数(获取后29位)private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
- RUNNING:接受新任务并且处理阻塞队列里的任务
- SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
- STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
- TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
- TERMINATED:终止状态。terminated方法调用完成以后的状态
线程池状态转换
RUNNING -> SHUTDOWN显式调用shutdown()方法, 或者隐式调用了finalize()方法(RUNNING or SHUTDOWN) -> STOP显式调用shutdownNow()方法SHUTDOWN -> TIDYING当线程池和任务队列都为空的时候STOP -> TIDYING当线程池为空的时候TIDYING -> TERMINATED当 terminated() hook 方法执行完成时候
构造函数
有四个构造函数,其他三个都是调用下面代码中的这个构造函数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {}
参数介绍
参数 | 类型 | 含义 |
---|---|---|
corePoolSize | int | 核心线程数 |
maximumPoolSize | int | 最大线程数 |
keepAliveTime | long | 存活时间 |
unit | TimeUnit | 时间单位 |
workQueue | BlockingQueue | 存放线程的队列 |
threadFactory | ThreadFactory | 创建线程的工厂 |
handler | RejectedExecutionHandler | 多余的的线程处理器(拒绝策略) |
提交任务
submit
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
流程步骤如下
- 调用submit方法,传入Runnable或者Callable对象
- 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
- 将传入的对象转换为RunnableFuture对象
- 执行execute方法,传入RunnableFuture对象
- 返回RunnableFuture对象
流程图如下
execute
public void execute(Runnable command) {//传进来的线程为null,则抛出空指针异常if (command == null)throw new NullPointerException();//获取当前线程池的状态+线程个数变量int c = ctl.get();/*** 3个步骤*///1.判断当前线程池线程个数是否小于corePoolSize,小于则调用addWorker方法创建新线程运行,且传进来的Runnable当做第一个任务执行。//如果调用addWorker方法返回false,则直接返回if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//2.如果线程池处于RUNNING状态,则添加任务到阻塞队列if (isRunning(c) && workQueue.offer(command)) {//二次检查int recheck = ctl.get();//如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);//否者如果当前线程池线程空,则添加一个线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}//3.新增线程,新增失败则执行拒绝策略else if (!addWorker(command, false))reject(command);}
其实从上面代码注释中可以看出就三个判断,
- 核心线程数是否已满
- 队列是否已满
- 线程池是否已满
然后根据这三个条件进行不同的操作,下图是Java并发编程的艺术书中的线程池的主要处理流程,或许会比较容易理解些
下面是整个流程的详细步骤
- 调用execute方法,传入Runable对象
- 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
- 获取当前线程池的状态和线程个数变量
- 判断当前线程数是否小于核心线程数,是走流程5,否则走流程6
- 添加线程数,添加成功则结束,失败则重新获取当前线程池的状态和线程个数变量,
- 判断线程池是否处于RUNNING状态,是则添加任务到阻塞队列,否则走流程10,添加任务成功则继续流程7
- 重新获取当前线程池的状态和线程个数变量
- 重新检查线程池状态,不是运行状态则移除之前添加的任务,有一个false走流程9,都为true则走流程11
- 检查线程池线程数量是否为0,否则结束流程,是调用addWorker(null, false),然后结束
- 调用!addWorker(command, false),为true走流程11,false则结束
- 调用拒绝策略reject(command),结束
可能看上面会有点绕,不清楚的可以看下面的流程图
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查当前线程池状态是否是SHUTDOWN、STOP、TIDYING或者TERMINATED// 且!(当前状态为SHUTDOWN、且传入的任务为null,且队列不为null)// 条件都成立则返回falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//循环for (;;) {int wc = workerCountOf(c);//如果当前的线程数量超过最大容量或者大于(根据传入的core决定是核心线程数还是最大线程数)核心线程数 || 最大线程数,则返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//CAS增加c,成功则跳出retryif (compareAndIncrementWorkerCount(c))break retry;//CAS失败执行下面方法,查看当前线程数是否变化,变化则继续retry循环,没变化则继续内部循环c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;}}//CAS成功boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建一个线程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//重新检查线程池状态//避免ThreadFactory退出故障或者在锁获取前线程池被关闭int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 先检查线程是否是可启动的throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//判断worker是否添加成功,成功则启动线程,然后将workerStarted设置为trueif (workerAdded) {t.start();workerStarted = true;}}} finally {//判断线程有没有启动成功,没有则调用addWorkerFailed方法if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
这里可以将addWorker分为两部分,第一部分增加线程池个数,第二部分是将任务添加到workder里面并执行。
第一部分主要是两个循环,外层循环主要是判断线程池状态,下面描述来自Java中线程池ThreadPoolExecutor原理探究
rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())
展开!运算后等价于
s >= SHUTDOWN &&(rs != SHUTDOWN ||firstTask != null ||workQueue.isEmpty())
也就是说下面几种情况下会返回false:
- 当前线程池状态为STOP,TIDYING,TERMINATED
- 当前线程池状态为SHUTDOWN并且已经有了第一个任务
- 当前线程池状态为SHUTDOWN并且任务队列为空
内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,如果变了,则重新进入外层循环重新获取线程池状态,否者进入内层循环继续进行cas尝试。
到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。
所以这里也将流程图分为两部分来描述
第一部分流程图
第二部分流程图
Worker对象
Worker是定义在ThreadPoolExecutor中的finnal类,其中继承了AbstractQueuedSynchronizer类和实现Runnable接口,其中的run方法如下
public void run() {runWorker(this);}
线程启动时调用了runWorker方法,关于类的其他方面这里就不在叙述。
runWorker
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//循环获取任务while (task != null || (task = getTask()) != null) {w.lock();// 当线程池是处于STOP状态或者TIDYING、TERMINATED状态时,设置当前线程处于中断状态// 如果不是,当前线程就处于RUNNING或者SHUTDOWN状态,确保当前线程不处于中断状态// 重新检查当前线程池的状态是否大于等于STOP状态if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//提供给继承类使用做一些统计之类的事情,在线程运行前调用beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//提供给继承类使用做一些统计之类的事情,在线程运行之后调用afterExecute(task, thrown);}} finally {task = null;//统计当前worker完成了多少个任务w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作processWorkerExit(w, completedAbruptly);}}
getTask
getTask方法的主要作用其实从方法名就可以看出来了,就是获取任务
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//循环for (;;) {int c = ctl.get();int rs = runStateOf(c);//线程线程池状态和队列是否为空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//线程数量int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//(当前线程数是否大于最大线程数或者)//且(线程数大于1或者任务队列为空)//这里有个问题(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
关闭线程池
shutdown
当调用shutdown方法时,线程池将不会再接收新的任务,然后将先前放在队列中的任务执行完成。
下面是shutdown方法的源码
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}
shutdownNow
立即停止所有的执行任务,并将队列中的任务返回
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}
shutdown和shutdownNow区别
shutdown和shutdownNow这两个方法的作用都是关闭线程池,流程大致相同,只有几个步骤不同,如下
- 加锁
- 检查关闭权限
- CAS改变线程池状态
- 设置中断标志(线程池不在接收任务,队列任务会完成)/中断当前执行的线程
- 调用onShutdown方法(给子类提供的方法)/获取队列中的任务
- 解锁
- 尝试将线程池状态变成终止状态TERMINATED
- 结束/返回队列中的任务
总结
线程池可以给我们多线程编码上提供极大便利,就好像数据库连接池一样,减少了线程的开销,提供了线程的复用。而且ThreadPoolExecutor也提供了一些未实现的方法,供我们来使用,像beforeExecute、afterExecute等方法,我们可以通过这些方法来对线程进行进一步的管理和统计。
在使用线程池上好需要注意,提交的线程任务可以分为CPU 密集型任务
和IO 密集型任务
,然后根据任务的不同进行分配不同的线程数量。
- CPU密集型任务:
- 应当分配较少的线程,比如
CPU
个数相当的大小
- 应当分配较少的线程,比如
- IO 密集型任务:
- 由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如 CPU 个数 * 2
- 混合型任务:
- 可以将其拆分为
CPU
密集型任务以及IO
密集型任务,这样来分别配置。
- 可以将其拆分为
好了,这篇博文到这里就结束了,文中可能会有些纰漏,欢迎留言指正。
如果本文对你有所帮助,给个star呗,谢谢。本文GitHub地址:点这里点这里
参考资料
- 并发编程网-Java中线程池ThreadPoolExecutor原理探究
- Java并发编程的艺术
转载:https://www.cnblogs.com/fixzd/p/9253203.html
推荐内容:
java面试题之----HashMap常见面试题总结
Java 微服务框架选型(Dubbo 和 Spring Cloud?)
Java面试官最常问的volatile关键字
推荐几个IDEA插件,Java开发者撸码利器。
java实现HTTP请求的三种方式
【面试题】2018年最全Java面试通关秘籍汇总集!
最新Java技术
2019 Java面试题
Java数据结构和算法(十一)——红黑树
面试 12:玩转 Java 快速排序
java多线程系列:ThreadPoolExecutor源码分析,java基础面试笔试题相关推荐
- JAVA线程池(ThreadPoolExecutor)源码分析
JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!): http://xtu-xiaoxin.ite ...
- java jdbc(mysql)驱动源码分析,JAVA JDBC(MySQL)驱动源码分析(四)
connect方法是java.sql.Driver接口中定义的方法,如果连接的数据库不同,那么为不同的数据库编写JDBC驱动将变得很灵活,实现Driver接口即可.连接数据库时首先得装载JDBC驱动, ...
- Java多线程 -- JUC包源码分析2 -- Copy On Write/CopyOnWriteArrayList/CopyOnWriteArraySet
本人新书出版,对技术感兴趣的朋友请关注: https://mp.weixin.qq.com/s/uq2cw2Lgf-s4nPHJ4WH4aw 上1篇讲述了Java并发编程的第1个基本思想–CAS/乐观 ...
- Java线程池ThreadPoolExecutor源码分析
继承关系 Executor接口 public interface Executor {void execute(Runnable command); } ExecutorService接口 publi ...
- Java集合Collection源码系列-ArrayList源码分析
Java集合系列-ArrayList源码分析 文章目录 Java集合系列-ArrayList源码分析 前言 一.为什么想去分析ArrayList源码? 二.源码分析 1.宏观上分析List 2.方法汇 ...
- idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(一)
课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...
- idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(二)
课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...
- 死磕 java集合之ArrayDeque源码分析
问题 (1)什么是双端队列? (2)ArrayDeque是怎么实现双端队列的? (3)ArrayDeque是线程安全的吗? (4)ArrayDeque是有界的吗? 简介 双端队列是一种特殊的队列,它的 ...
- 深入源码分析Java线程池的实现原理
转载自 深入源码分析Java线程池的实现原理 程序的运行,其本质上,是对系统资源(CPU.内存.磁盘.网络等等)的使用.如何高效的使用这些资源是我们编程优化演进的一个方向.今天说的线程池就是一种对 ...
- 【死磕 Java 集合】— LinkedTransferQueue源码分析
[死磕 Java 集合]- LinkedTransferQueue源码分析 问题 (1)LinkedTransferQueue是什么东东? (2)LinkedTransferQueue是怎么实现阻塞队 ...
最新文章
- oracle数据库详细性能参数,ORACLE数据库性能参数的优化
- DBMS_STATS.GATHER_TABLE_STATS详解
- postgresql中表的继承及分区表(四)
- 【Python面试】 列举Python中的标准异常类?
- 神经网络- receptive field
- 百度地图批量转换 GPS坐标转百度地图坐标 问题
- (转)The Standard C Library 经典的基础(上)
- RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
- x264_param_default分析
- 数据库服务器信息填写,数据库服务器是怎么填写
- (转)在endnote中制作GB/T7714《文后参考文献著录规则》的输出格式
- linux禁调usb,Linux主机禁用USB接口
- Apple音视频播放器 QuickTime Player7.7.9 专业版
- 时间序列 R 08 指数平滑 Exponential smoothing
- Excel中《分列》高效到爆的用法,8个案例,太实用了!
- readmemh函数引用的txt格式_[转载](zz)用于读取和写入文本文件Verilog代码
- 面向对象程序设计php,php面向对象的程序设计
- 转:加密的惨剧!慎用文件夹加密软件!
- 疫情下的口罩生活,演绎出了哪些心理剧场?
- YouTube 十岁了,这十年里视频网站改变了什么?
热门文章
- 泛海精灵软件预发布统计报告 反馈
- VC学习笔记:简单绘图
- NameNode之文件系统目录树
- Linux脚本5秒后启动程序,嵌入式Linux启动时间优化的秘密之四-启动脚本
- 集成springboot案例_SpringBoot开发者都在用的五款优质扩展,每个都很能打!
- python 直线过滤掉不在边缘上的点_不存在所谓的机器学习平台
- oracle 行数大于一时,oracle – PL / SQL ORA-01422:精确的提取返回超过请求的行数
- 静态库-动态库混合编译
- CIF、QCIF、HD1、D1格式介绍
- php基础知识填空题,比较基础的php面试题及答案填空题