[并发编程] - Executor框架#ThreadPoolExecutor源码解读03
文章目录
- Pre
- execute源码分析
- addWorker()解读
- Worker解读
Pre
[并发编程] - Executor框架#ThreadPoolExecutor源码解读02
说了一堆结论性的东西,作为开发人员着实是不过瘾,那这里我们就来剖根问底来看下线程池是如何工作的。
execute源码分析
ThreadPoolExecutor te = new ThreadPoolExecutor(5,10,500,TimeUnit.SECONDS,new ArrayBlockingQueue(5));for (int i = 0; i < 6; i++) {te.submit(()->{System.out.println("i m task :"+Thread.currentThread().getName());});}
使用ThreadPoolExecutor 自定义了一个线程池
参数对应如下
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue
调用了 AbstractExecutorService#submit
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
最核心的方法 execute ,由子类ThredPoolExecutor实现
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** clt记录着runState和workerCount*/int c = ctl.get();/** workerCountOf方法取出低29位的值,表示当前活动的线程数;* 如果当前活动线程数小于corePoolSize,则新建一个线程放从入线程池中;* 并把任务添加到该线程中。*/if (workerCountOf(c) < corePoolSize) {/** addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;* 如果为true,根据corePoolSize来判断;* 如果为false,则根据maximumPoolSize来判断*/if (addWorker(command, true))return;/** 如果添加失败,则重新获取ctl值*/c = ctl.get();}/** 如果当前线程池是运行状态并且任务添加到队列成功*/if (isRunning(c) && workQueue.offer(command)) {// 重新获取ctl值int recheck = ctl.get();// 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,// 这时需要移除该command// 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回if (! isRunning(recheck) && remove(command))reject(command);/** 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法* 这里传入的参数表示:* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;* 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。*/else if (workerCountOf(recheck) == 0)addWorker(null, false);}/** 如果执行到这里,有两种情况:* 1. 线程池已经不是RUNNING状态;* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;* 如果失败则拒绝该任务*/else if (!addWorker(command, false))reject(command);
}
主要的流程,注释中也写的很清楚了
/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/
简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:
- 如果
workerCount < corePoolSize
,则创建并启动一个线程来执行新提交的任务; - 如果
workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中; - 如果
workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务; - 如果
workerCount >= maximumPoolSize
,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
Note : 这里要注意一下addWorker(null, false);
,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0
时执行addWorker(null, false);
也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
addWorker()解读
private boolean addWorker(Runnable firstTask, boolean core) {}
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,
- firstTask参数 用于指定新增的线程执行的第一个任务,
- core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();// 获取运行状态int rs = runStateOf(c);/** 这个if判断* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;* 接着判断以下3个条件,只要有1个不满足,则返回false:* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务* 2. firsTask为空* 3. 阻塞队列不为空* * 首先考虑rs == SHUTDOWN的情况* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;* 然后,如果firstTask为空,并且workQueue也为空,则返回false,* 因为队列中已经没有任务了,不需要再添加线程了*/// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 获取线程数int wc = workerCountOf(c);// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;// 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,// 如果为false则根据maximumPoolSize来比较。// if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 尝试增加workerCount,如果成功,则跳出第一个for循环if (compareAndIncrementWorkerCount(c))break retry;// 如果增加workerCount失败,则重新获取ctl的值c = ctl.get(); // Re-read ctl// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 根据firstTask来创建Worker对象w = new Worker(firstTask);// 每一个Worker对象都会创建一个线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// rs < SHUTDOWN表示是RUNNING状态;// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// workers是一个HashSetworkers.add(w);int s = workers.size();// largestPoolSize记录着线程池中出现过的最大线程数量if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 启动线程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
Worker解读
addWorker中多次提到了这个Work这个类, 其实就是 线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象
ThreadPoolExector中内部类 Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{private static final long serialVersionUID = 6138294804551838833L;final Thread thread;Runnable firstTask;volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
继承了AQS,并实现了Runnable接口 ,
两个比较重要的属性
- firstTask用它来保存传入的任务;
- thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);
来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的
[并发编程] - Executor框架#ThreadPoolExecutor源码解读03相关推荐
- [并发编程] - Executor框架#ThreadPoolExecutor源码解读02
文章目录 Pre 线程池的具体实现 线程池的创建 参数解读 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactor ...
- [并发编程] - Executor框架#ThreadPoolExecutor源码解读01
文章目录 Pre Thread Java线程与OS线程 生命状态 状态切换 线程池 why use case Advantage Executor框架 ThreadPoolExecutor 源码分析 ...
- Java 并发编程——Executor框架和线程池原理
Java 并发编程系列文章 Java 并发基础--线程安全性 Java 并发编程--Callable+Future+FutureTask java 并发编程--Thread 源码重新学习 java并发 ...
- iOS AOP 框架 - Aspects 源码解读
Aspects 是什么? Aspects 是 iOS 上的一个轻量级 AOP 库.它利用 method swizzling 技术为已有的类或者实例方法添加额外的代码,它是著名框架 PSPDFKit ( ...
- 多线程高并发编程(8) -- Fork/Join源码分析
一.概念 Fork/Join就是将一个大任务分解(fork)成许多个独立的小任务,然后多线程并行去处理这些小任务,每个小任务处理完得到结果再进行合并(join)得到最终的结果. 流程:任务继承Recu ...
- 转: java并发编程-Executor框架
Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,Completion ...
- 02.并发编程(2)Thread类源码分析
概述 在说线程之前先说下进程,进程和线程都是一个时间段的描述,是CPU工作时间段的描述. 进程,是并发执行的程序在执行过程中分配和管理资源的基本单位,是一个动态概念,竟争计算机系统资源的基本单位.每一 ...
- java编程executor框架_Java并发编程 - Executor框架(一)Executor,
1.并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后将这些任务提交给一个Executor执行, Executor.execute(Runnalbe) .Executor在执 ...
- Java并发编程-Executor框架之Callable和Future接口
在上一篇文章中我们已经了解了Executor框架进行线程管理,这篇文章将学习Executor框架的另一个特性,我们知道执行Runnable任务是没有返回值得,但Executor可以运行并发任务并获得返 ...
最新文章
- python3哪个教程好-Python3 教程
- Dynamic CRM 2013学习笔记(一)插件输入实体参数解析
- 争做RTC领域标杆——与华为云副总裁薛浩聊聊我们的视频时代
- .net一个函数要用另一个函数的值_VLOOKUP函数
- 物联网通信技术最全科普!你一定要了解的NB-IoT
- 正弦光栅的生成matlab,matlab做正弦光栅衍射的计算机模拟
- 手机软件项目管理(1)—软件供应商评判项
- 网页特效offset、client、scroll系列属性的作用
- JQuery模拟二------添加extend函数和简单选择器
- JDBC06 其他操作及批处理Batch
- 一个泛型句柄类--C++模板和泛型编程--c++ primer
- solidworks迈迪插件_迈迪工具集V55特别PJ版_打包下载
- 百度主页被“/?tn=88093251_85_hao_pg“劫持的一种解决办法
- mysql复制数据到同一张表
- 负反馈与马歇尔的均衡论
- Pytorch(gpu),cuda,cudnn安装
- 眼界 思维 意识 习惯
- 如何免费下载的全球的矢量边界(WGS84)
- JAVA练习题:求税后工资问题
- Linux 运行vcs仿真命令,VCS使用以及命令行调试
热门文章
- php编程习惯,经验分享:PHP编程的5个良好习惯(二)
- 计算机主板风扇安装,5个装机注意事项 让你装电脑少走弯路
- ubuntu 安装 postgres
- 数据挖掘流程(四):建模调参
- python 笔记:装饰器
- R语言实战应用精讲50篇(十九)-R语言gganimate函数应用案例:静态图变成动态,让你的图表更酷炫
- Tableau可视化分析实战系列Tableau基础概念全解析 (一)-数据结构及字段
- Selenium爬携程酒店评论+jieba数据分析实战
- 增大iphone音量技巧_就算我们把手机音量开到最大!外放声音还是小,那是这个设置没开...
- Ubuntu基础知识