1. 前言

线程池是JAVA开发中最常使用的池化技术之一,可以减少线程资源的重复创建与销毁造成的开销。

2. 灵魂拷问:怎么做到线程重复利用?

很多同学会联想到连接池,理所当然的说:需要的时候从池中取出线程,执行完任务再放回去。

如何用代码实现呢?

此时就会发现,调用线程的start方法后,生命周期就不由父线程直接控制了。线程的run方法执行完成就销毁了,所谓的“取出”和“放回”只不过是想当然的操作。

这里先说答案:生产者消费者模型

3. ThreadPoolExecutor的实现

image

3.1 结构

首先看下ThreadPoolExecutor的继承结构

顶级接口是Executor,定义execute方法

ExecutorService添加了submit方法,支持返回future获取执行结果,以及线程池运行状态的相关方法

本文着重讲线程池的执行流程,因此将暂时忽略线程池的状态相关的代码,也建议新手看源码时从核心流程看起。

3.2 核心方法:execute()

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();

// 判断是否小于核心线程数

if (workerCountOf(c) < corePoolSize) {

//添加worker,添加成功则退出

if (addWorker(command, true))

return;

c = ctl.get();

}

// 核心线程数已用完则放入队列

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

// 双重检查,避免入队完成后,所有线程已销毁,导致没有消费者消费当前任务

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 队列已满则开启非核心线程,达到最大线程数则使用拒绝策略

else if (!addWorker(command, false))

reject(command);

}

execute方法就是一个生产的过程,主要分为开启线程和入队

开启线程会传入command(即当前任务),开启的线程会立即消费该任务

入队的任务则会由Worker消费

主要关注corePoolSize,maximumPoolSize,queueSize(任务队列长度),workerCount(当前worker数量)这几个参数,可以总结为以下:

已满

未满

操作

corePoolSize

开启核心线程

corePoolSize

queueSize

入队

queueSize

maximumPoolSize

开启非核心线程

maximumPoolSize

拒绝

3.3 消费者:Worker

image

Worker类实现Runnable接口,继承AQS,主要先关注thread和firstTask两个属性和run方法

Worker(Runnable firstTask) {

setState(-1);

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

从Worker的构造方法可以看出,thread就是线程池中真正消费任务的线程,创建时会传入this(即Worker对象),而Worker实现了Runnable,因此线程运行时就是执行了Worker的run方法。

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock();

boolean completedAbruptly = true;

try {

// getTask会阻塞,因此不会造成cpu飙高

while (task != null || (task = getTask()) != null) {

// ···

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

// 执行传入的Runnable

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

// 修改为null,否则下次循环不会调用getTask

task = null;

// ···

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

再来看run方法,直接调用了ThreadPoolExecutor的runWorker方法,runWorker中有一个while循环,循环体执行了task.run方法

task首先会获取Worker中的firstTask属性,并将其置为null,因此firstTask只会执行一次,后续task将通过getTask方法获取。

因此Worker的工作流程可以概括为:消费完Worker的firstTask后,循环执行getTask获取任务并消费,获取不到task时,就退出循环,线程销毁。

此处便可以看出生产者消费者模型了。

private Runnable getTask() {

boolean timedOut = false;

for (;;) {

int c = ctl.get();

// ···

int wc = workerCountOf(c);

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())) {

// 尝试减少计数,失败则会continue循环重试

if (compareAndDecrementWorkerCount(c))

// 此处返回null,runWorker将退出循环

return null;

continue;

}

try {

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

runWorker方法退出循环的条件是getTask返回null。

观察getTask,只有同时满足以下情况时才会返回null

条件

解读

1

wc > maximumPoolSize || (timed && timedOut)

workQueue.poll方法超时

2

wc > 1 || workQueue.isEmpty()

队列任务全部执行完

3

compareAndDecrementWorkerCount(c)

cas减少workerCount成功

返回的task是通过workQueue.poll和workQueue.take得到的

两者执行时线程均会挂起,直至workQueue中有新的任务

不同之处在于poll方法阻塞keepAliveTime时间后会自动唤醒并返回null,此时timeOut置为true,即满足条件1,随后继续循环,重复检查是否大于核心线程数且队列为空,是则尝试减少workerCount并退出循环

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();

// ···

for (;;) {

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get();

// ···

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

// ···

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

// ···

}

return workerStarted;

}

了解了Worker之后,再来看execute中调用的addWorker方法

方法有两个参数,firstTask即为Worker的firstTask,core则标记需要新增的是否是核心线程

retry循环与线程池状态相关,内层for循环则是重复尝试cas增加线程,若大于corePoolSize或者maximumPoolSize则新增失败,cas成功后,new一个Worker并start

3.4 总结

image

回到最初的问题,线程是如何做到重复利用的?

并不存在取出线程使用完再归还的操作,线程启动后进入循环,主动获取任务执行,退出循环则线程销毁。

execute方法控制新增Worker和任务入队

附:手写简易线程池

public class MyThreadPool implements Executor {

private int corePoolSize;

private int maximumPoolSize;

private BlockingQueue queue;

//记录当前工作线程数

private AtomicInteger count;

private long keepAliveTime;

private RejectHandler rejectHandler;

public MyThreadPool(int corePoolSize, int maximumPoolSize, BlockingQueue queue, long keepAliveTime, RejectHandler rejectHandler) {

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.queue = queue;

this.keepAliveTime = keepAliveTime;

this.rejectHandler = rejectHandler;

count = new AtomicInteger(0);

}

@Override

public void execute(Runnable task) {

int ct = count.get();

//核心线程数未满,尝试增加核心线程

if (ct < corePoolSize && count.compareAndSet(ct, ct + 1)) {

new Worker(task).start();

return;

}

//入队

if (queue.offer(task)) {

return;

}

//重新获取一遍count,否则如果在core分支cas失败,此处必然也失败

ct = count.get();

//队列已满,尝试增加非核心线程

if (ct < maximumPoolSize && count.compareAndSet(ct, ct + 1)) {

new Worker(task).start();

return;

}

//已达最大线程数,拒绝

rejectHandler.reject(task);

}

class Worker extends Thread {

Runnable firstTask;

public Worker(Runnable firstTask) {

this.firstTask = firstTask;

}

@Override

public void run() {

Runnable task = firstTask;

firstTask = null;

while (true) {

try {

//getTask会阻塞

if (task != null || (task = getTask()) != null) {

task.run();

} else {

//getTask超时才会进入,直接退出,线程销毁

break;

}

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

//置空,否则不能getTask

task = null;

}

}

}

}

Runnable getTask() throws InterruptedException {

//标记是否超时过

boolean timedOut = false;

while (true) {

int ct = count.get();

//超出核心线程数才进入超时逻辑,即使timeOut由于线程poll超时过一次变成true,执行到这里如果不超出corePoolSize,可以再次进入take分支

if (ct > corePoolSize) {

//超出核心线程数

if (timedOut) {

//已超时过,尝试减少工作线程数,失败会continue,然后重新比较corePoolSize,重试减少线程数

if (count.compareAndSet(ct, ct - 1)) {

return null;

} else {

continue;

}

}

Runnable task = queue.poll(keepAliveTime, TimeUnit.MILLISECONDS);

if (task == null) {

//poll超时才进入

timedOut = true;

continue;

}

return task;

} else {

//必然能获取到task

return queue.take();

}

}

}

public static interface RejectHandler {

void reject(Runnable r);

}

public static void main(String[] args) {

MyThreadPool pool = new MyThreadPool(2, 5, new LinkedBlockingQueue<>(100), 2000, r -> {

System.out.println(r + ": reject");

});

for (int i = 0; i < 3; i++) {

final int x = i;

new Thread(() -> {

for (int j = 0; j < 5; j++) {

final int y = j;

pool.execute(() -> {

try {

Thread.sleep(3000L);

} catch (InterruptedException e) {

e.printStackTrace();

}

LocalDateTime now = LocalDateTime.now();

System.out.println(String.format("线程i=%s, j=%s,执行结束: %s", x, y, now.format(DateTimeFormatter.ISO_DATE_TIME)));

});

}

}).start();

}

}

}

java 池化_溯本求源: JAVA线程池工作原理相关推荐

  1. 双线性池化_卷积神经网络中的各种池化操作

    池化操作(Pooling)是CNN中非常常见的一种操作,Pooling层是模仿人的视觉系统对数据进行降维,池化操作通常也叫做子采样(Subsampling)或降采样(Downsampling),在构建 ...

  2. winform判断线程有没有完成_并发编程系列1:线程池的架构实现、大小配置、及四种线程池使用...

    △ 公众号回复关键词"架构" 即可领取<1500+BAT架构及面试专题合集> 本篇为线程池系列文章之一,不经常使用线程池的童鞋,还有对几种线程的使用不甚了解的童鞋,可以 ...

  3. tomcat 查看当前请求数_原生线程池这么强大,Tomcat 为何还需扩展线程池?

    前言 Tomcat/Jetty 是目前比较流行的 Web 容器,两者接受请求之后都会转交给线程池处理,这样可以有效提高处理的能力与并发度.JDK 提高完整线程池实现,但是 Tomcat/Jetty 都 ...

  4. 项目使用线程池_并发编程系列1:线程池的架构实现、大小配置、及四种线程池使用...

    △ 公众号回复关键词"架构" 即可领取<1500+BAT架构及面试专题合集> 本篇为线程池系列文章之一,不经常使用线程池的童鞋,还有对几种线程的使用不甚了解的童鞋,可以 ...

  5. java 利特尔法则_Java Web应用中调优线程池的重要性

    不论你是否关注,Java Web应用都或多或少的使用了线程池来处理请求.线程池的实现细节可能会被忽视,但是有关于线程池的使用和调优迟早是需要了解的.本文主要介绍Java线程池的使用和如何正确的配置线程 ...

  6. Java基础巩固(二)异常,多线程,线程池,IO流,Properties集合,IO工具类,字符流,对象流,Stream,Lambda表达式

    一.异常,多线程 学习目标 : 异常的概述 异常的分类 异常的处理方式 自定义异常 多线程入门 1 异常的概述 1.1 什么是异常? 异常就是程序出现了不正常情况 , 程序在执行过程中 , 数据导致程 ...

  7. java线程池拒绝策略_Java核心知识 多线程并发 线程池原理(二十三)

    线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后 启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕, 再从队列中取出任务来执行.他 ...

  8. java线程池应用的好处_java高级应用:线程池全面解析

    什么是线程池? 很简单,简单看名字就知道是装有线程的池子,我们可以把要执行的多线程交给线程池来处理,和连接池的概念一样,通过维护一定数量的线程池来达到多个线程的复用. 线程池的好处 我们知道不用线程池 ...

  9. 一起学JAVA之【基础篇】4种默认线程池介绍

    一起学JAVA之[基础篇]4种默认线程池介绍 默认线程池创建方式 java.util.concurrent 提供了一个创建线程池的工具类Executors,里面有四种常用的线程池创建方法 public ...

最新文章

  1. 实现HTTP协议Get、Post和文件上传功能——使用WinHttp接口实现
  2. @EnableConfigurationProperties 注解和@ConfigurationProperties注解实现配置绑定
  3. ejb+jpa_使用Arquillian(包括JPA,EJB,Bean验证和CDI)测试Java EE 6
  4. linux远程虚拟桌面,2020-07-23 Linux 远程连接虚拟桌面
  5. Project Life Cycle
  6. 跨浏览器的placehold
  7. 堆与优先队列课内模板
  8. 为什么领导们总是劝大家不要只盯着工资?
  9. 系统开机 linux 时间不对,linux下查看系统运行时间和最近一次的开机启动时间
  10. 【开发】 eclipse汉化包
  11. Android 10 深色模式适配
  12. CPU 性能优化的几个思路
  13. 16本版式设计书籍推荐(附PDF链接)设计从业人员必备
  14. C语言字母区分大写,C语言中不区分大小写英文字母。()
  15. Vue3实现中英文切换
  16. 操作系统课程项目 OS project —— Pintos from Project 1 to Project 3
  17. 服务器2003共享文件夹,2003服务器共享文件夹
  18. 笔记《基于无人驾驶方程式赛车的传感器融合目标检测算法研究及实现》
  19. 图像特征(二)——形状特征(主轮廓特征、区域特征、图像的矩及Hu矩)
  20. 适合普通大众、非科班的路线

热门文章

  1. Loj #6060. 「2017 山东一轮集训 Day1 / SDWC2018 Day1」Set
  2. 【Flask】Flask常用信号
  3. Winrunner与QTP
  4. Android 文件的上传
  5. FLL - C++与VFP 双向混合编程
  6. x == (x = y) 不等于 (x = y) == x ?
  7. 装NOILinux的奇妙经历
  8. 项目发布Debug和Release版的区别
  9. 必须在构造函数基/成员初始值设定项列表中初始化
  10. 安装Was liberty之步骤