我总结出了很多互联网公司的面试题及答案,并整理成了文档,以及各种学习的进阶学习资料,免费分享给大家。
扫描二维码或搜索下图红色VX号,加VX好友,拉你进【程序员面试学习交流群】免费领取。也欢迎各位一起在群里探讨技术。
推荐文章:Java 面试知识点解析;Mysql优化技巧(数据库设计、命名规范、索引优化

前言

这篇主要讲述ThreadPoolExecutor的源码分析,贯穿类的创建、任务的添加到线程池的关闭整个流程,让你知其然所以然。希望你可以通过本篇博文知道ThreadPoolExecutor是怎么添加任务、执行任务的,以及延伸的知识点。那么先来看看ThreadPoolExecutor的继承关系吧。

继承关系

Executor接口


public interface Executor {void execute(Runnable command);}

Executor接口只有一个方法execute,传入线程任务参数

ExecutorService接口


public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;}

ExecutorService接口继承Executor接口,并增加了submit、shutdown、invokeAll等等一系列方法。

AbstractExecutorService抽象类


public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {...}public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {... }public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {...}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {...}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {...}}

AbstractExecutorService抽象类实现ExecutorService接口,并且提供了一些方法的默认实现,例如submit方法、invokeAny方法、invokeAll方法。

像execute方法、线程池的关闭方法(shutdown、shutdownNow等等)就没有提供默认的实现。

ThreadPoolExecutor

先介绍下ThreadPoolExecutor线程池的状态吧

线程池状态

int 是4个字节,也就是32位(注:一个字节等于8位


//记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量)private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//线程数量统计位数29  Integer.SIZE=32 private static final int COUNT_BITS = Integer.SIZE - 3;//容量 000 11111111111111111111111111111private static final int CAPACITY   = (1 << COUNT_BITS) - 1;//运行中 111 00000000000000000000000000000private static final int RUNNING    = -1 << COUNT_BITS;//关闭 000 00000000000000000000000000000private static final int SHUTDOWN   =  0 << COUNT_BITS;//停止 001 00000000000000000000000000000private static final int STOP       =  1 << COUNT_BITS;//整理 010 00000000000000000000000000000private static final int TIDYING    =  2 << COUNT_BITS;//终止 011 00000000000000000000000000000private static final int TERMINATED =  3 << COUNT_BITS;//获取运行状态(获取前3位)private static int runStateOf(int c)     { return c & ~CAPACITY; }//获取线程个数(获取后29位)private static int workerCountOf(int c)  { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }
  • RUNNING:接受新任务并且处理阻塞队列里的任务
  • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务
  • STOP:拒绝新任务并且抛弃阻塞队列里的任务同时会中断正在处理的任务
  • TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法
  • TERMINATED:终止状态。terminated方法调用完成以后的状态

线程池状态转换

RUNNING -> SHUTDOWN显式调用shutdown()方法, 或者隐式调用了finalize()方法(RUNNING or SHUTDOWN) -> STOP显式调用shutdownNow()方法SHUTDOWN -> TIDYING当线程池和任务队列都为空的时候STOP -> TIDYING当线程池为空的时候TIDYING -> TERMINATED当 terminated() hook 方法执行完成时候

构造函数

有四个构造函数,其他三个都是调用下面代码中的这个构造函数


public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {}

参数介绍

参数 类型 含义
corePoolSize int 核心线程数
maximumPoolSize int 最大线程数
keepAliveTime long 存活时间
unit TimeUnit 时间单位
workQueue BlockingQueue 存放线程的队列
threadFactory ThreadFactory 创建线程的工厂
handler RejectedExecutionHandler 多余的的线程处理器(拒绝策略)

提交任务

submit


public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}

流程步骤如下

  1. 调用submit方法,传入Runnable或者Callable对象
  2. 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
  3. 将传入的对象转换为RunnableFuture对象
  4. 执行execute方法,传入RunnableFuture对象
  5. 返回RunnableFuture对象

流程图如下

execute


public void execute(Runnable command) {//传进来的线程为null,则抛出空指针异常if (command == null)throw new NullPointerException();//获取当前线程池的状态+线程个数变量int c = ctl.get();/*** 3个步骤*///1.判断当前线程池线程个数是否小于corePoolSize,小于则调用addWorker方法创建新线程运行,且传进来的Runnable当做第一个任务执行。//如果调用addWorker方法返回false,则直接返回if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}//2.如果线程池处于RUNNING状态,则添加任务到阻塞队列if (isRunning(c) && workQueue.offer(command)) {//二次检查int recheck = ctl.get();//如果当前线程池状态不是RUNNING则从队列删除任务,并执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);//否者如果当前线程池线程空,则添加一个线程else if (workerCountOf(recheck) == 0)addWorker(null, false);}//3.新增线程,新增失败则执行拒绝策略else if (!addWorker(command, false))reject(command);}

其实从上面代码注释中可以看出就三个判断,

  1. 核心线程数是否已满
  2. 队列是否已满
  3. 线程池是否已满

然后根据这三个条件进行不同的操作,下图是Java并发编程的艺术书中的线程池的主要处理流程,或许会比较容易理解些

下面是整个流程的详细步骤

  1. 调用execute方法,传入Runable对象
  2. 判断传入的对象是否为null,为null则抛出异常,不为null继续流程
  3. 获取当前线程池的状态和线程个数变量
  4. 判断当前线程数是否小于核心线程数,是走流程5,否则走流程6
  5. 添加线程数,添加成功则结束,失败则重新获取当前线程池的状态和线程个数变量,
  6. 判断线程池是否处于RUNNING状态,是则添加任务到阻塞队列,否则走流程10,添加任务成功则继续流程7
  7. 重新获取当前线程池的状态和线程个数变量
  8. 重新检查线程池状态,不是运行状态则移除之前添加的任务,有一个false走流程9,都为true则走流程11
  9. 检查线程池线程数量是否为0,否则结束流程,是调用addWorker(null, false),然后结束
  10. 调用!addWorker(command, false),为true走流程11,false则结束
  11. 调用拒绝策略reject(command),结束

可能看上面会有点绕,不清楚的可以看下面的流程图

addWorker


private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 检查当前线程池状态是否是SHUTDOWN、STOP、TIDYING或者TERMINATED// 且!(当前状态为SHUTDOWN、且传入的任务为null,且队列不为null)// 条件都成立则返回falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//循环for (;;) {int wc = workerCountOf(c);//如果当前的线程数量超过最大容量或者大于(根据传入的core决定是核心线程数还是最大线程数)核心线程数 || 最大线程数,则返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//CAS增加c,成功则跳出retryif (compareAndIncrementWorkerCount(c))break retry;//CAS失败执行下面方法,查看当前线程数是否变化,变化则继续retry循环,没变化则继续内部循环c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;}}//CAS成功boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//新建一个线程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//加锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//重新检查线程池状态//避免ThreadFactory退出故障或者在锁获取前线程池被关闭int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 先检查线程是否是可启动的throw new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//判断worker是否添加成功,成功则启动线程,然后将workerStarted设置为trueif (workerAdded) {t.start();workerStarted = true;}}} finally {//判断线程有没有启动成功,没有则调用addWorkerFailed方法if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

这里可以将addWorker分为两部分,第一部分增加线程池个数,第二部分是将任务添加到workder里面并执行。

第一部分主要是两个循环,外层循环主要是判断线程池状态,下面描述来自Java中线程池ThreadPoolExecutor原理探究

rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())

展开!运算后等价于

s >= SHUTDOWN &&(rs != SHUTDOWN ||firstTask != null ||workQueue.isEmpty())

也就是说下面几种情况下会返回false:

  • 当前线程池状态为STOP,TIDYING,TERMINATED
  • 当前线程池状态为SHUTDOWN并且已经有了第一个任务
  • 当前线程池状态为SHUTDOWN并且任务队列为空

内层循环作用是使用cas增加线程个数,如果线程个数超限则返回false,否者进行cas,cas成功则退出双循环,否者cas失败了,要看当前线程池的状态是否变化了,如果变了,则重新进入外层循环重新获取线程池状态,否者进入内层循环继续进行cas尝试。

到了第二部分说明CAS成功了,也就是说线程个数加一了,但是现在任务还没开始执行,这里使用全局的独占锁来控制workers里面添加任务,其实也可以使用并发安全的set,但是性能没有独占锁好(这个从注释中知道的)。这里需要注意的是要在获取锁后重新检查线程池的状态,这是因为其他线程可可能在本方法获取锁前改变了线程池的状态,比如调用了shutdown方法。添加成功则启动任务执行。

所以这里也将流程图分为两部分来描述

第一部分流程图

第二部分流程图

Worker对象

Worker是定义在ThreadPoolExecutor中的finnal类,其中继承了AbstractQueuedSynchronizer类和实现Runnable接口,其中的run方法如下

public void run() {runWorker(this);}

线程启动时调用了runWorker方法,关于类的其他方面这里就不在叙述。

runWorker


final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {//循环获取任务while (task != null || (task = getTask()) != null) {w.lock();// 当线程池是处于STOP状态或者TIDYING、TERMINATED状态时,设置当前线程处于中断状态// 如果不是,当前线程就处于RUNNING或者SHUTDOWN状态,确保当前线程不处于中断状态// 重新检查当前线程池的状态是否大于等于STOP状态if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//提供给继承类使用做一些统计之类的事情,在线程运行前调用beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//提供给继承类使用做一些统计之类的事情,在线程运行之后调用afterExecute(task, thrown);}} finally {task = null;//统计当前worker完成了多少个任务w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//整个线程结束时调用,线程退出操作。统计整个线程池完成的任务个数之类的工作processWorkerExit(w, completedAbruptly);}}

getTask

getTask方法的主要作用其实从方法名就可以看出来了,就是获取任务


private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//循环for (;;) {int c = ctl.get();int rs = runStateOf(c);//线程线程池状态和队列是否为空if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//线程数量int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//(当前线程数是否大于最大线程数或者)//且(线程数大于1或者任务队列为空)//这里有个问题(timed && timedOut)timedOut = false,好像(timed && timedOut)一直都是false吧if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

关闭线程池

shutdown

当调用shutdown方法时,线程池将不会再接收新的任务,然后将先前放在队列中的任务执行完成。

下面是shutdown方法的源码


public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}

shutdownNow

立即停止所有的执行任务,并将队列中的任务返回


public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;}

shutdown和shutdownNow区别

shutdown和shutdownNow这两个方法的作用都是关闭线程池,流程大致相同,只有几个步骤不同,如下

  1. 加锁
  2. 检查关闭权限
  3. CAS改变线程池状态
  4. 设置中断标志(线程池不在接收任务,队列任务会完成)/中断当前执行的线程
  5. 调用onShutdown方法(给子类提供的方法)/获取队列中的任务
  6. 解锁
  7. 尝试将线程池状态变成终止状态TERMINATED
  8. 结束/返回队列中的任务

总结

线程池可以给我们多线程编码上提供极大便利,就好像数据库连接池一样,减少了线程的开销,提供了线程的复用。而且ThreadPoolExecutor也提供了一些未实现的方法,供我们来使用,像beforeExecute、afterExecute等方法,我们可以通过这些方法来对线程进行进一步的管理和统计。

在使用线程池上好需要注意,提交的线程任务可以分为CPU 密集型任务IO 密集型任务,然后根据任务的不同进行分配不同的线程数量。

  • CPU密集型任务:

    • 应当分配较少的线程,比如 CPU个数相当的大小
  • IO 密集型任务:
    • 由于线程并不是一直在运行,所以可以尽可能的多配置线程,比如 CPU 个数 * 2
  • 混合型任务:
    • 可以将其拆分为 CPU 密集型任务以及 IO 密集型任务,这样来分别配置。

好了,这篇博文到这里就结束了,文中可能会有些纰漏,欢迎留言指正。

如果本文对你有所帮助,给个star呗,谢谢。本文GitHub地址:点这里点这里

参考资料

  1. 并发编程网-Java中线程池ThreadPoolExecutor原理探究
  2. Java并发编程的艺术

转载:https://www.cnblogs.com/fixzd/p/9253203.html

推荐内容:
java面试题之----HashMap常见面试题总结
Java 微服务框架选型(Dubbo 和 Spring Cloud?)
Java面试官最常问的volatile关键字
推荐几个IDEA插件,Java开发者撸码利器。
java实现HTTP请求的三种方式
【面试题】2018年最全Java面试通关秘籍汇总集!
最新Java技术
2019 Java面试题
Java数据结构和算法(十一)——红黑树
面试 12:玩转 Java 快速排序

java多线程系列:ThreadPoolExecutor源码分析,java基础面试笔试题相关推荐

  1. JAVA线程池(ThreadPoolExecutor)源码分析

    JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!):     http://xtu-xiaoxin.ite ...

  2. java jdbc(mysql)驱动源码分析,JAVA JDBC(MySQL)驱动源码分析(四)

    connect方法是java.sql.Driver接口中定义的方法,如果连接的数据库不同,那么为不同的数据库编写JDBC驱动将变得很灵活,实现Driver接口即可.连接数据库时首先得装载JDBC驱动, ...

  3. Java多线程 -- JUC包源码分析2 -- Copy On Write/CopyOnWriteArrayList/CopyOnWriteArraySet

    本人新书出版,对技术感兴趣的朋友请关注: https://mp.weixin.qq.com/s/uq2cw2Lgf-s4nPHJ4WH4aw 上1篇讲述了Java并发编程的第1个基本思想–CAS/乐观 ...

  4. Java线程池ThreadPoolExecutor源码分析

    继承关系 Executor接口 public interface Executor {void execute(Runnable command); } ExecutorService接口 publi ...

  5. Java集合Collection源码系列-ArrayList源码分析

    Java集合系列-ArrayList源码分析 文章目录 Java集合系列-ArrayList源码分析 前言 一.为什么想去分析ArrayList源码? 二.源码分析 1.宏观上分析List 2.方法汇 ...

  6. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(一)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  7. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(二)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  8. 死磕 java集合之ArrayDeque源码分析

    问题 (1)什么是双端队列? (2)ArrayDeque是怎么实现双端队列的? (3)ArrayDeque是线程安全的吗? (4)ArrayDeque是有界的吗? 简介 双端队列是一种特殊的队列,它的 ...

  9. 深入源码分析Java线程池的实现原理

    转载自   深入源码分析Java线程池的实现原理 程序的运行,其本质上,是对系统资源(CPU.内存.磁盘.网络等等)的使用.如何高效的使用这些资源是我们编程优化演进的一个方向.今天说的线程池就是一种对 ...

  10. 【死磕 Java 集合】— LinkedTransferQueue源码分析

    [死磕 Java 集合]- LinkedTransferQueue源码分析 问题 (1)LinkedTransferQueue是什么东东? (2)LinkedTransferQueue是怎么实现阻塞队 ...

最新文章

  1. oracle数据库详细性能参数,ORACLE数据库性能参数的优化
  2. DBMS_STATS.GATHER_TABLE_STATS详解
  3. postgresql中表的继承及分区表(四)
  4. 【Python面试】 列举Python中的标准异常类?
  5. 神经网络- receptive field
  6. 百度地图批量转换 GPS坐标转百度地图坐标 问题
  7. (转)The Standard C Library 经典的基础(上)
  8. RocketMQ入门到入土(七 )为什么同一个消费组设置不同tag会出现奇怪现象
  9. x264_param_default分析
  10. 数据库服务器信息填写,数据库服务器是怎么填写
  11. (转)在endnote中制作GB/T7714《文后参考文献著录规则》的输出格式
  12. linux禁调usb,Linux主机禁用USB接口
  13. Apple音视频播放器 QuickTime Player7.7.9 专业版
  14. 时间序列 R 08 指数平滑 Exponential smoothing
  15. Excel中《分列》高效到爆的用法,8个案例,太实用了!
  16. readmemh函数引用的txt格式_[转载](zz)用于读取和写入文本文件Verilog代码
  17. 面向对象程序设计php,php面向对象的程序设计
  18. 转:加密的惨剧!慎用文件夹加密软件!
  19. 疫情下的口罩生活,演绎出了哪些心理剧场?
  20. YouTube 十岁了,这十年里视频网站改变了什么?

热门文章

  1. 泛海精灵软件预发布统计报告 反馈
  2. VC学习笔记:简单绘图
  3. NameNode之文件系统目录树
  4. Linux脚本5秒后启动程序,嵌入式Linux启动时间优化的秘密之四-启动脚本
  5. 集成springboot案例_SpringBoot开发者都在用的五款优质扩展,每个都很能打!
  6. python 直线过滤掉不在边缘上的点_不存在所谓的机器学习平台
  7. oracle 行数大于一时,oracle – PL / SQL ORA-01422:精确的提取返回超过请求的行数
  8. 静态库-动态库混合编译
  9. CIF、QCIF、HD1、D1格式介绍
  10. php基础知识填空题,比较基础的php面试题及答案填空题