并发编程之 Executor 线程池原理与源码解读

线程是调度 CPU 资源的最小单位,线程模型分为 KLT 模型与 ULT 模型,JVM使用的是 KLT 模型。java线程与 OS 线程保持 1:1 的映射关系,也就是说有一个 java 线程也会在操作系统里有一个对应的线程。java 线程有多种生命状态:

  • NEW 新建
  • RUNNABLE 运行
  • BLOCKED 阻塞
  • WAITING 等待
  • TIMED_WAITING 超时等待
  • TERMINAIED 终结

1. ThreadPoolExecutor

ThreadPoolExecutor 构造方法

public ThreadPoolExecutor(int corePoolSize,      // 核心线程数int maximumPoolSize,    // 最大线程数 = 核心线程数 + 非核心线程数long keepAliveTime,      // 最大允许线程不干活的时间TimeUnit unit,           // 时间单位BlockingQueue<Runnable> workQueue, // 存放未来得及执行的任务ThreadFactory threadFactory,      // 创建线程的工厂RejectedExecutionHandler handler  // 拒绝策略
) {...}

线程本身没有标记核心和非核心,只是用线程数量进行区分

1.1 线程池运行的流程图

1.2 线程池的状态

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

RUNNING

(1)状态说明:线程池处在 RUNNING 状态时, 能够接收新任务,以及对已添加的任务进行处理

(2)状态切换:线程池的初始化状态是 RUNNING。换句话说,线程一旦被创建,就处于 RUNNING 状态,并且线程池中的任务数为 0 !

SHUDOWN

(1)状态说明:线程池处在 SHUTDOWN 状态时,不接收新任务,但能够处理已添加的任务

(2)状态切换:调用线程池的 shutdown() 接口时,线程池由 RUNNING -> SHUTDOWN

STOP

(1)状态说明:线程池处在 STOP 状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务

(2)状态切换:调用线程池的shutdownNow() 接口时,线程池由(RUNNING or SHUTDOWN)->STOP

TIDYING

(1)状态说明:当所有的任务已终止,ctl 记录的 “任务数量” 为 0,线程池会变为 TIDYING 状态。当线程池变为 TIDYING 状态时,会执行钩子函数 terminated() 。terminated() 再 ThreadPoolExecutor 类中是空的,若用户想在线程池变为 TIDYING 时,进行相应的处理;可以通过重载 terminated() 函数来实现。

(2)状态切换:当线程池在 SHUTDOWN 状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在 STOP 状态下,线程池中执行任务为空时,就会由 STOP -> TIDYING

TERMINATED

(1)状态说明:线程池彻底终止,就变成 TERMINATED 状态

(2)状态切换:线程池处在 TIDYING 状态时,执行完 terminated() 之后,就会由 TIDYING -> TERMINATED

进入 TERMINATED 的条件如下:

  • 线程池不是 RUNNING 状态
  • 线程池状态不是 TIDYING 状态或 TERMINATED 状态
  • 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空
  • workerCount 为 0
  • 设置 TIDYING 状态成功

1.3 重要方法源码解读

1.3.1 execute() 方法

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1. 如果工作的线程数 < 核心线程数,创建核心线程if (workerCountOf(c) < corePoolSize) {// true 表示要创建的线程为核心线程,如果创建成功则返回if (addWorker(command, true))return;// 核心线程创建失败,重新获取 c 的状态c = ctl.get();}// 2. 如果线程池没有关闭,即线程池正在运行,那么将当前任务添加到阻塞队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 再次检查,如果线程池已关闭,那么就移除任务if (! isRunning(recheck) && remove(command))// 拒绝任务reject(command);// 如果线程池的状态是 shutdown 状态else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3. 创建非核心线程,如果非核心线程数达到上限,触发拒绝策略else if (!addWorker(command, false))reject(command);
}

1.3.2 Worker 类

worker 是继承 AbstractQueuedSynchronizer 类,且实现了 Runnable 接口

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable {...}
成员变量
final Thread thread;
/** Initial task to run.  Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
构造方法
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 使用线程工厂,创建一个新的线程,并将这个 worker 自己作为任务丢给新创建的线程this.thread = getThreadFactory().newThread(this);
}
run() 方法

内部类 Worker 的 run() 方法调用外部类方法 runWorker()

public void run() {runWorker(this);
}

1.3.3 runWorker() 方法(为什么线程可以重用)

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 线程刚刚创建,正在创建不允许中断 w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 为什么线程池可以重用?  循环 + 条件// 线程超时处理?while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}// 出现 Exception 时,该行不会执行completedAbruptly = false;} finally {// completedAbruptly = trueprocessWorkerExit(w, completedAbruptly);}
}

如果任务执行出现异常,异常会往外抛出,同时执行如上两个 finally 代码块。第一个 finally 释放锁;第二个 finally清理掉异常任务,并调用 addWorker(null,false)方法,继续从队列中取出下一个任务执行。

private void processWorkerExit(Worker w, boolean completedAbruptly) {// completedAbruptly = true 表明有线程因抛出异常而挂掉,总的线程数量需要减去 1if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;// 从 workers 集合中移除执行完毕的任务(包含挂掉的线程任务)workers.remove(w);} finally {mainLock.unlock();}// 如果符合终止条件,尝试终止线程(清理掉空闲线程)tryTerminate();int c = ctl.get();// 判断当前线程池的生命状态为 RUNNING 或 SHUTDOWN,继续从队列获取任务执行if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}// 继续获取队列中的一下个任务去执行addWorker(null, false);}
}

1.3.4 getTask() 方法

allowCoreThreadTimeOut
  • true 标识核心线程也会过期,过期的核心线程将会被清理(需手动设置)
  • false (默认)标识核心线程不会过期
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {// 状态判断int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?// allowCoreThreadTimeOut 设置为 true,那么核心线程也会过期(需手动设置,默认不过期)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}// try {Runnable r = timed ?// timed = true  阻塞队列 当队列为空,条件不满足,被阻塞指定时间!// timed = false 阻塞队列 take() 没有时间限制,那么就会一直被阻塞,等在这里workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

1.3.5 addWorker() 方法

private boolean addWorker(Runnable firstTask, boolean core) {// 判断线程池状态retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 创建一个新的 worker,并将第一个任务丢给它w = new Worker(firstTask);final Thread t = w.thread;// 如果 worker 的成员变量 thread 不为空if (t != null) {// 开始获取锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());// 状态判断if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 将 worker 放到 HashSet<Worker> workers 集合中去,如果线程被终结需维持引用workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 启动线程,调用的是 Worker 中的 run() 方法t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

1.4 拒绝策略

如果在任务不允许丢的情况下,可以重写拒绝策略,然后将任务存放在中间件中如 redis。

当阻塞队列已满,就将任务写入redis中,开启一个线程,专门监测阻塞队列使用情况。当阻塞队列占用降低到 50% 时,再将 redis 中存放的任务取出放入阻塞队列中。

处理拒绝策略的 handler,声明为 volatile 是为了在多线程运行环境下,可能会更改拒绝策略,以便通知到其它线程

private volatile RejectedExecutionHandler handler;

默认的拒绝策略,直接往外抛异常

private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();

1.4. 1 AbortPolicy

​ 默认的拒绝策略,直接往外抛异常

public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());
}

1.4.2 DiscardOldestPolicy

如果线程池没有关闭,将阻塞队列对头的线程丢弃,将本次的 task 推到队尾排队

public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 如果线程池没有关闭,将老的对头线程丢弃掉,将 task 放到队尾if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}
}

1.4.3 CallerRunsPolicy

如果线程池拒绝执行本次任务,且线程池没有关闭,直接由当前提交的线程 task 自己去执行,不给线程池执行

public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {// 如果线程池没有关闭,直接由当前提交的线程 task 自己去执行,不给线程池执行if (!e.isShutdown()) {r.run();}}
}

1.4.4 DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}

1.5 ThreadPoolExecutor 参数设置

1.5.1 线程数量设置

  • CPU 密集型 CPU 核数 + 1(计算任务较多,CPU 没有等待时间,一直在计算任务)

  • IO 密集型 2 * CPU 核数 + 1(IO读写等待时间长,充分利用 CPU 时间片,等待时间可执行更多的 IO 操作)

rocketMQ、Eureka、Nacos 2 * CPU

1.5.2 最佳线程数

最佳线程数 = CPU 核数 * [1 + ( I/O 耗时 / CPU 耗时 ) ]

2. ScheduledThreadPoolExecutor

2.1 定时线程池的类结构图

并发编程之 Executor 线程池原理与源码解读相关推荐

  1. 并发编程之Executor线程池原理与源码解读

    1. 线程池 "线程池",顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不 仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配. 调 ...

  2. Java并发编程之Java线程池

    Java线程池: 线程池的核心配置参数: //线程等待任务的超时时间,当线程池的线程个数超过corePoolSize时生效,当线程等待任务的时间超过keepAliveTime时,线程池会停止超过cor ...

  3. 【Java 并发编程】线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )

    文章目录 一.线程池执行任务细节分析 二.线程池执行 execute 源码分析 一.线程池执行任务细节分析 线程池执行细节分析 : 核心线程数 101010 , 最大小成熟 202020 , 非核心线 ...

  4. Java线程池状态判断源码_深入浅出Java线程池:源码篇

    前言 在上一篇文章深入浅出Java线程池:理论篇中,已经介绍了什么是线程池以及基本的使用.(本来写作的思路是使用篇,但经网友建议后,感觉改为理论篇会更加合适).本文则深入线程池的源码,主要是介绍Thr ...

  5. 【开源项目】动态线程池框架Hippo4j源码解析

    动态线程池框架Hippo4j源码解析 项目简介 Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池等功能,为业务系统提高线上运行保障能力. 快速开始 https://hippo4 ...

  6. 【Android 异步操作】线程池 ( 线程池 execute 方法源码解析 )

    文章目录 一.线程池 execute 方法源码解析 二.线程池 execute 方法完整源码及注释 一.线程池 execute 方法源码解析 进入 ThreadPoolExecutor 中 , 查看线 ...

  7. Java并发系列一《线程池原理》

    目录 前言 一.线程池原理 1.线程池创建方式 2. 线程池7大参数 3.执行原理 二.源码分析 1.走进线程池 execute 方法 2.其他.. 三.应用场景 四.总结 前言 本文内容线程池总结, ...

  8. JUC并发编程之Java线程(二)

    二.Java线程 2.1 创建和运行线程 方法一:Thread创建线程方式: 继承Thread类 匿名内部类方式 public class CreateThread01 {public static ...

  9. java并发编程——线程池的工作原理与源码解读

    2019独角兽企业重金招聘Python工程师标准>>> 线程池的简单介绍 基于多核CPU的发展,使得多线程开发日趋流行.然而线程的创建和销毁,都涉及到系统调用,比较消耗系统资源,所以 ...

最新文章

  1. 目标检测--Beyond Skip Connections: Top-Down Modulation for Object Detection
  2. java变量设置_配置环境变量
  3. java compareto方法怎么排序的_深入理解Java中Comparable和Comparator排序
  4. 计算机英语多层,多层式结构,multi-layer structure,在线英语词典,英文翻译,专业英语...
  5. 程序员必备的代码审查(Code Review)清单
  6. python3精要(19)-全局变量global和工厂函数,lambda,变量作用范围,nonlocal
  7. spring session实现集群中session共享
  8. CADENCE ORCAD原理图导出FPGA UCF的方法
  9. html2canvas 截图div_H5快照截图[html2canvas]+图片下载
  10. P1552-[APIO2012]派遣【左偏树】
  11. k8s核心技术-Helm(概述)---K8S_Google工作笔记0044
  12. 支撑位和压力位怎么看是什么意思?
  13. PHP 照相 滤镜,PHP GIF / PNG True Colorize滤镜,可保留亮度和Alpha
  14. 突然发现,工作已满四年了
  15. matlab如何释放内存,怎么能释放已经使用的内存
  16. Mob研究院 · BAT数据洞察报告
  17. _id随机的 es_ES再现偷ID事件?仅与阿水ID相差1个字,玩家却释怀,原是系统作梗...
  18. 微信,世界上最成功的私链
  19. ssd linux 硬盘备份,SSD最佳备份良伴 群晖3步搞定系统备份
  20. python3 pygame 黑白棋 翻转棋_Python3 + pygame 实现黑白棋(翻转棋)

热门文章

  1. 中级口译口试心得(转)
  2. Java猜数小游戏、c语言猜数小游戏
  3. Oracle 数据库损坏恢复
  4. Android系统设置单双卡
  5. 单点登录服务Authelia(下篇)
  6. Android仿QQ登录下拉历史列表
  7. 一次手工注入waf [转载]
  8. Spring boot 线程池之单线程问题
  9. 绿色软件:飞鸽传书使用指南
  10. 计网实验c/c++、网络嗅探器的设计与实现