并发编程之 Executor 线程池原理与源码解读
并发编程之 Executor 线程池原理与源码解读
线程是调度 CPU 资源的最小单位,线程模型分为 KLT 模型与 ULT 模型,JVM使用的是 KLT 模型。java线程与 OS 线程保持 1:1 的映射关系,也就是说有一个 java 线程也会在操作系统里有一个对应的线程。java 线程有多种生命状态:
- NEW 新建
- RUNNABLE 运行
- BLOCKED 阻塞
- WAITING 等待
- TIMED_WAITING 超时等待
- TERMINAIED 终结
1. ThreadPoolExecutor
ThreadPoolExecutor 构造方法
public ThreadPoolExecutor(int corePoolSize, // 核心线程数int maximumPoolSize, // 最大线程数 = 核心线程数 + 非核心线程数long keepAliveTime, // 最大允许线程不干活的时间TimeUnit unit, // 时间单位BlockingQueue<Runnable> workQueue, // 存放未来得及执行的任务ThreadFactory threadFactory, // 创建线程的工厂RejectedExecutionHandler handler // 拒绝策略
) {...}
线程本身没有标记核心和非核心,只是用线程数量进行区分
1.1 线程池运行的流程图
1.2 线程池的状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING
(1)状态说明:线程池处在 RUNNING 状态时, 能够接收新任务,以及对已添加的任务进行处理
(2)状态切换:线程池的初始化状态是 RUNNING。换句话说,线程一旦被创建,就处于 RUNNING 状态,并且线程池中的任务数为 0 !
SHUDOWN
(1)状态说明:线程池处在 SHUTDOWN 状态时,不接收新任务,但能够处理已添加的任务
(2)状态切换:调用线程池的 shutdown() 接口时,线程池由 RUNNING -> SHUTDOWN
STOP
(1)状态说明:线程池处在 STOP 状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务
(2)状态切换:调用线程池的shutdownNow() 接口时,线程池由(RUNNING or SHUTDOWN)->STOP
TIDYING
(1)状态说明:当所有的任务已终止,ctl 记录的 “任务数量” 为 0,线程池会变为 TIDYING 状态。当线程池变为 TIDYING 状态时,会执行钩子函数 terminated() 。terminated() 再 ThreadPoolExecutor 类中是空的,若用户想在线程池变为 TIDYING 时,进行相应的处理;可以通过重载 terminated() 函数来实现。
(2)状态切换:当线程池在 SHUTDOWN 状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在 STOP 状态下,线程池中执行任务为空时,就会由 STOP -> TIDYING
TERMINATED
(1)状态说明:线程池彻底终止,就变成 TERMINATED 状态
(2)状态切换:线程池处在 TIDYING 状态时,执行完 terminated() 之后,就会由 TIDYING -> TERMINATED
进入 TERMINATED 的条件如下:
- 线程池不是 RUNNING 状态
- 线程池状态不是 TIDYING 状态或 TERMINATED 状态
- 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空
- workerCount 为 0
- 设置 TIDYING 状态成功
1.3 重要方法源码解读
1.3.1 execute() 方法
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1. 如果工作的线程数 < 核心线程数,创建核心线程if (workerCountOf(c) < corePoolSize) {// true 表示要创建的线程为核心线程,如果创建成功则返回if (addWorker(command, true))return;// 核心线程创建失败,重新获取 c 的状态c = ctl.get();}// 2. 如果线程池没有关闭,即线程池正在运行,那么将当前任务添加到阻塞队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次检查,如果线程池已关闭,那么就移除任务if (! isRunning(recheck) && remove(command))// 拒绝任务reject(command);// 如果线程池的状态是 shutdown 状态else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3. 创建非核心线程,如果非核心线程数达到上限,触发拒绝策略else if (!addWorker(command, false))reject(command);
}
1.3.2 Worker 类
worker 是继承 AbstractQueuedSynchronizer 类,且实现了 Runnable 接口
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {...}
成员变量
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
构造方法
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 使用线程工厂,创建一个新的线程,并将这个 worker 自己作为任务丢给新创建的线程this.thread = getThreadFactory().newThread(this);
}
run() 方法
内部类 Worker 的 run() 方法调用外部类方法 runWorker()
public void run() {runWorker(this);
}
1.3.3 runWorker() 方法(为什么线程可以重用)
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 线程刚刚创建,正在创建不允许中断 w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 为什么线程池可以重用? 循环 + 条件// 线程超时处理?while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}// 出现 Exception 时,该行不会执行completedAbruptly = false;} finally {// completedAbruptly = trueprocessWorkerExit(w, completedAbruptly);}
}
如果任务执行出现异常,异常会往外抛出,同时执行如上两个 finally 代码块。第一个 finally 释放锁;第二个 finally清理掉异常任务,并调用 addWorker(null,false)方法,继续从队列中取出下一个任务执行。
private void processWorkerExit(Worker w, boolean completedAbruptly) {// completedAbruptly = true 表明有线程因抛出异常而挂掉,总的线程数量需要减去 1if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;// 从 workers 集合中移除执行完毕的任务(包含挂掉的线程任务)workers.remove(w);} finally {mainLock.unlock();}// 如果符合终止条件,尝试终止线程(清理掉空闲线程)tryTerminate();int c = ctl.get();// 判断当前线程池的生命状态为 RUNNING 或 SHUTDOWN,继续从队列获取任务执行if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}// 继续获取队列中的一下个任务去执行addWorker(null, false);}
}
1.3.4 getTask() 方法
allowCoreThreadTimeOut
- true 标识核心线程也会过期,过期的核心线程将会被清理(需手动设置)
- false (默认)标识核心线程不会过期
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {// 状态判断int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?// allowCoreThreadTimeOut 设置为 true,那么核心线程也会过期(需手动设置,默认不过期)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}// try {Runnable r = timed ?// timed = true 阻塞队列 当队列为空,条件不满足,被阻塞指定时间!// timed = false 阻塞队列 take() 没有时间限制,那么就会一直被阻塞,等在这里workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
1.3.5 addWorker() 方法
private boolean addWorker(Runnable firstTask, boolean core) {// 判断线程池状态retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个新的 worker,并将第一个任务丢给它w = new Worker(firstTask);final Thread t = w.thread;// 如果 worker 的成员变量 thread 不为空if (t != null) {// 开始获取锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// 状态判断if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 将 worker 放到 HashSet<Worker> workers 集合中去,如果线程被终结需维持引用workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 启动线程,调用的是 Worker 中的 run() 方法t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
1.4 拒绝策略
如果在任务不允许丢的情况下,可以重写拒绝策略,然后将任务存放在中间件中如 redis。
当阻塞队列已满,就将任务写入redis中,开启一个线程,专门监测阻塞队列使用情况。当阻塞队列占用降低到 50% 时,再将 redis 中存放的任务取出放入阻塞队列中。
处理拒绝策略的 handler,声明为 volatile 是为了在多线程运行环境下,可能会更改拒绝策略,以便通知到其它线程
private volatile RejectedExecutionHandler handler;
默认的拒绝策略,直接往外抛异常
private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
1.4. 1 AbortPolicy
默认的拒绝策略,直接往外抛异常
public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());
}
1.4.2 DiscardOldestPolicy
如果线程池没有关闭,将阻塞队列对头的线程丢弃,将本次的 task 推到队尾排队
public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 如果线程池没有关闭,将老的对头线程丢弃掉,将 task 放到队尾if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
}
1.4.3 CallerRunsPolicy
如果线程池拒绝执行本次任务,且线程池没有关闭,直接由当前提交的线程 task 自己去执行,不给线程池执行
public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 如果线程池没有关闭,直接由当前提交的线程 task 自己去执行,不给线程池执行if (!e.isShutdown()) {r.run();}}
}
1.4.4 DiscardPolicy
public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}
1.5 ThreadPoolExecutor 参数设置
1.5.1 线程数量设置
CPU 密集型 CPU 核数 + 1(计算任务较多,CPU 没有等待时间,一直在计算任务)
IO 密集型 2 * CPU 核数 + 1(IO读写等待时间长,充分利用 CPU 时间片,等待时间可执行更多的 IO 操作)
rocketMQ、Eureka、Nacos 2 * CPU
1.5.2 最佳线程数
最佳线程数 = CPU 核数 * [1 + ( I/O 耗时 / CPU 耗时 ) ]
2. ScheduledThreadPoolExecutor
2.1 定时线程池的类结构图
并发编程之 Executor 线程池原理与源码解读相关推荐
- 并发编程之Executor线程池原理与源码解读
1. 线程池 "线程池",顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不 仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配. 调 ...
- Java并发编程之Java线程池
Java线程池: 线程池的核心配置参数: //线程等待任务的超时时间,当线程池的线程个数超过corePoolSize时生效,当线程等待任务的时间超过keepAliveTime时,线程池会停止超过cor ...
- 【Java 并发编程】线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )
文章目录 一.线程池执行任务细节分析 二.线程池执行 execute 源码分析 一.线程池执行任务细节分析 线程池执行细节分析 : 核心线程数 101010 , 最大小成熟 202020 , 非核心线 ...
- Java线程池状态判断源码_深入浅出Java线程池:源码篇
前言 在上一篇文章深入浅出Java线程池:理论篇中,已经介绍了什么是线程池以及基本的使用.(本来写作的思路是使用篇,但经网友建议后,感觉改为理论篇会更加合适).本文则深入线程池的源码,主要是介绍Thr ...
- 【开源项目】动态线程池框架Hippo4j源码解析
动态线程池框架Hippo4j源码解析 项目简介 Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力. 快速开始 https://hippo4 ...
- 【Android 异步操作】线程池 ( 线程池 execute 方法源码解析 )
文章目录 一.线程池 execute 方法源码解析 二.线程池 execute 方法完整源码及注释 一.线程池 execute 方法源码解析 进入 ThreadPoolExecutor 中 , 查看线 ...
- Java并发系列一《线程池原理》
目录 前言 一.线程池原理 1.线程池创建方式 2. 线程池7大参数 3.执行原理 二.源码分析 1.走进线程池 execute 方法 2.其他.. 三.应用场景 四.总结 前言 本文内容线程池总结, ...
- JUC并发编程之Java线程(二)
二.Java线程 2.1 创建和运行线程 方法一:Thread创建线程方式: 继承Thread类 匿名内部类方式 public class CreateThread01 {public static ...
- java并发编程——线程池的工作原理与源码解读
2019独角兽企业重金招聘Python工程师标准>>> 线程池的简单介绍 基于多核CPU的发展,使得多线程开发日趋流行.然而线程的创建和销毁,都涉及到系统调用,比较消耗系统资源,所以 ...
最新文章
- 目标检测--Beyond Skip Connections: Top-Down Modulation for Object Detection
- java变量设置_配置环境变量
- java compareto方法怎么排序的_深入理解Java中Comparable和Comparator排序
- 计算机英语多层,多层式结构,multi-layer structure,在线英语词典,英文翻译,专业英语...
- 程序员必备的代码审查(Code Review)清单
- python3精要(19)-全局变量global和工厂函数,lambda,变量作用范围,nonlocal
- spring session实现集群中session共享
- CADENCE ORCAD原理图导出FPGA UCF的方法
- html2canvas 截图div_H5快照截图[html2canvas]+图片下载
- P1552-[APIO2012]派遣【左偏树】
- k8s核心技术-Helm(概述)---K8S_Google工作笔记0044
- 支撑位和压力位怎么看是什么意思?
- PHP 照相 滤镜,PHP GIF / PNG True Colorize滤镜,可保留亮度和Alpha
- 突然发现,工作已满四年了
- matlab如何释放内存,怎么能释放已经使用的内存
- Mob研究院 · BAT数据洞察报告
- _id随机的 es_ES再现偷ID事件?仅与阿水ID相差1个字,玩家却释怀,原是系统作梗...
- 微信,世界上最成功的私链
- ssd linux 硬盘备份,SSD最佳备份良伴 群晖3步搞定系统备份
- python3 pygame 黑白棋 翻转棋_Python3 + pygame 实现黑白棋(翻转棋)