文章目录

  • Pre
  • execute源码分析
    • addWorker()解读
    • Worker解读


Pre

[并发编程] - Executor框架#ThreadPoolExecutor源码解读02
说了一堆结论性的东西,作为开发人员着实是不过瘾,那这里我们就来剖根问底来看下线程池是如何工作的。


execute源码分析

        ThreadPoolExecutor te = new ThreadPoolExecutor(5,10,500,TimeUnit.SECONDS,new ArrayBlockingQueue(5));for (int i = 0; i < 6; i++) {te.submit(()->{System.out.println("i m task :"+Thread.currentThread().getName());});}

使用ThreadPoolExecutor 自定义了一个线程池

参数对应如下

int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue

调用了 AbstractExecutorService#submit

   public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}

最核心的方法 execute ,由子类ThredPoolExecutor实现

 public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** clt记录着runState和workerCount*/int c = ctl.get();/** workerCountOf方法取出低29位的值,表示当前活动的线程数;* 如果当前活动线程数小于corePoolSize,则新建一个线程放从入线程池中;* 并把任务添加到该线程中。*/if (workerCountOf(c) < corePoolSize) {/** addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;* 如果为true,根据corePoolSize来判断;* 如果为false,则根据maximumPoolSize来判断*/if (addWorker(command, true))return;/** 如果添加失败,则重新获取ctl值*/c = ctl.get();}/** 如果当前线程池是运行状态并且任务添加到队列成功*/if (isRunning(c) && workQueue.offer(command)) {// 重新获取ctl值int recheck = ctl.get();// 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,// 这时需要移除该command// 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回if (! isRunning(recheck) && remove(command))reject(command);/** 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法* 这里传入的参数表示:* 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;* 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;* 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。*/else if (workerCountOf(recheck) == 0)addWorker(null, false);}/** 如果执行到这里,有两种情况:* 1. 线程池已经不是RUNNING状态;* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;* 如果失败则拒绝该任务*/else if (!addWorker(command, false))reject(command);
}

主要的流程,注释中也写的很清楚了

     /** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/

简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

Note : 这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。


addWorker()解读

 private boolean addWorker(Runnable firstTask, boolean core) {}

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,

  • firstTask参数 用于指定新增的线程执行的第一个任务,
  • core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();// 获取运行状态int rs = runStateOf(c);/** 这个if判断* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;* 接着判断以下3个条件,只要有1个不满足,则返回false:* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务* 2. firsTask为空* 3. 阻塞队列不为空* * 首先考虑rs == SHUTDOWN的情况* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;* 然后,如果firstTask为空,并且workQueue也为空,则返回false,* 因为队列中已经没有任务了,不需要再添加线程了*/// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 获取线程数int wc = workerCountOf(c);// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;// 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,// 如果为false则根据maximumPoolSize来比较。// if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 尝试增加workerCount,如果成功,则跳出第一个for循环if (compareAndIncrementWorkerCount(c))break retry;// 如果增加workerCount失败,则重新获取ctl的值c = ctl.get();  // Re-read ctl// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 根据firstTask来创建Worker对象w = new Worker(firstTask);// 每一个Worker对象都会创建一个线程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());// rs < SHUTDOWN表示是RUNNING状态;// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// workers是一个HashSetworkers.add(w);int s = workers.size();// largestPoolSize记录着线程池中出现过的最大线程数量if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {// 启动线程t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

Worker解读

addWorker中多次提到了这个Work这个类, 其实就是 线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象

ThreadPoolExector中内部类 Worker

private final class Worker extends AbstractQueuedSynchronizer  implements Runnable{private static final long serialVersionUID = 6138294804551838833L;final Thread thread;Runnable firstTask;volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

继承了AQS,并实现了Runnable接口 ,
两个比较重要的属性

  1. firstTask用它来保存传入的任务;
  2. thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。

在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的

[并发编程] - Executor框架#ThreadPoolExecutor源码解读03相关推荐

  1. [并发编程] - Executor框架#ThreadPoolExecutor源码解读02

    文章目录 Pre 线程池的具体实现 线程池的创建 参数解读 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactor ...

  2. [并发编程] - Executor框架#ThreadPoolExecutor源码解读01

    文章目录 Pre Thread Java线程与OS线程 生命状态 状态切换 线程池 why use case Advantage Executor框架 ThreadPoolExecutor 源码分析 ...

  3. Java 并发编程——Executor框架和线程池原理

    Java 并发编程系列文章 Java 并发基础--线程安全性 Java 并发编程--Callable+Future+FutureTask java 并发编程--Thread 源码重新学习 java并发 ...

  4. iOS AOP 框架 - Aspects 源码解读

    Aspects 是什么? Aspects 是 iOS 上的一个轻量级 AOP 库.它利用 method swizzling 技术为已有的类或者实例方法添加额外的代码,它是著名框架 PSPDFKit ( ...

  5. 多线程高并发编程(8) -- Fork/Join源码分析

    一.概念 Fork/Join就是将一个大任务分解(fork)成许多个独立的小任务,然后多线程并行去处理这些小任务,每个小任务处理完得到结果再进行合并(join)得到最终的结果. 流程:任务继承Recu ...

  6. 转: java并发编程-Executor框架

    Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,Completion ...

  7. 02.并发编程(2)Thread类源码分析

    概述 在说线程之前先说下进程,进程和线程都是一个时间段的描述,是CPU工作时间段的描述. 进程,是并发执行的程序在执行过程中分配和管理资源的基本单位,是一个动态概念,竟争计算机系统资源的基本单位.每一 ...

  8. java编程executor框架_Java并发编程 - Executor框架(一)Executor,

    1.并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后将这些任务提交给一个Executor执行, Executor.execute(Runnalbe) .Executor在执 ...

  9. Java并发编程-Executor框架之Callable和Future接口

    在上一篇文章中我们已经了解了Executor框架进行线程管理,这篇文章将学习Executor框架的另一个特性,我们知道执行Runnable任务是没有返回值得,但Executor可以运行并发任务并获得返 ...

最新文章

  1. python3哪个教程好-Python3 教程
  2. Dynamic CRM 2013学习笔记(一)插件输入实体参数解析
  3. 争做RTC领域标杆——与华为云副总裁薛浩聊聊我们的视频时代
  4. .net一个函数要用另一个函数的值_VLOOKUP函数
  5. 物联网通信技术最全科普!你一定要了解的NB-IoT
  6. 正弦光栅的生成matlab,matlab做正弦光栅衍射的计算机模拟
  7. 手机软件项目管理(1)—软件供应商评判项
  8. 网页特效offset、client、scroll系列属性的作用
  9. JQuery模拟二------添加extend函数和简单选择器
  10. JDBC06 其他操作及批处理Batch
  11. 一个泛型句柄类--C++模板和泛型编程--c++ primer
  12. solidworks迈迪插件_迈迪工具集V55特别PJ版_打包下载
  13. 百度主页被“/?tn=88093251_85_hao_pg“劫持的一种解决办法
  14. mysql复制数据到同一张表
  15. 负反馈与马歇尔的均衡论
  16. Pytorch(gpu),cuda,cudnn安装
  17. 眼界 思维 意识 习惯
  18. 如何免费下载的全球的矢量边界(WGS84)
  19. JAVA练习题:求税后工资问题
  20. Linux 运行vcs仿真命令,VCS使用以及命令行调试

热门文章

  1. php编程习惯,经验分享:PHP编程的5个良好习惯(二)
  2. 计算机主板风扇安装,5个装机注意事项 让你装电脑少走弯路
  3. ubuntu 安装 postgres
  4. 数据挖掘流程(四):建模调参
  5. python 笔记:装饰器
  6. R语言实战应用精讲50篇(十九)-R语言gganimate函数应用案例:静态图变成动态,让你的图表更酷炫
  7. Tableau可视化分析实战系列Tableau基础概念全解析 (一)-数据结构及字段
  8. Selenium爬携程酒店评论+jieba数据分析实战
  9. 增大iphone音量技巧_就算我们把手机音量开到最大!外放声音还是小,那是这个设置没开...
  10. Ubuntu基础知识