java线程池中的Worker解析

上一篇说到java线程池中添加真实的线程都是在Worker对象中完成的。今天看下Worker中是如何进行线程管理的。

上一篇说道coresize和maxsize两个池子的大小后,线程池会更具情况添加线程。添加线程主要依赖方法

addWorker(Runable command)方法,本篇将对addWorker方法进行详细分析。java.util.concurrent.ThreadPoolExecutor#addWorker方法

这里主要看重要的几行

w = new Worker(firstTask);
final Thread t = w.thread;
……if (workerAdded) {
t.start();
workerStarted = true;
}

这里线程实际就是Worker.thread对象。下面看下Worker

 Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}

这里的thread来自在创建ThreadPoolExecutor中传入的ThreadFactory,该工厂方法用来按照模板创建线程。即方法

this.thread = getThreadFactory().newThread(this);

看下默认的工厂newThread(this)方法

public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}

这里实际将worker对象作为runnable 对象传入进来,最终是new了一个Thread(最外层的worker的this实例),因此

最终t.start方法回调的就是这个传入的worker对象的run方法。所以直接看Worker类的run方法

 private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in.  Null if factory fails. */final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}

run方法又调用了runWorker(this)方法

{Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

1 将传入worker对象中的thread成员放到一个临时的Runnable task中,然后将引用置位null(即代码中的 w.firstTask)

2 w.unlock方法,即可以中断

3 最重要的部分while循环

while (task != null || (task = getTask()) != null)

task即worker对象构造中传入的runable 即上一篇看到的comman,即真实的任务对象

如果task不为空,则进入后面的task.run方法,直接调用Runable对象的run方法。这里解释一下原因:

整个动作是由工厂的Thread触发的:即工厂newThread出来的线程start方法

3.1  start方法会回调Thread(Runnable r)构造器中r的run方法

3.2  r 实际是一个worker的this引用,因此调用的是worker对象的run方法

3.3  worker的run方法调用了runworker方法,最终到达while循环中的task.run方法

如果task为空,还需要判断getTask()是否为空

getTask方法是从阻塞队列BlockedQueue中去任务,即上一篇中第二个if判断中的queue.offer

getTask方法:

 private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed;      // Are workers subject to culling?for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;if (wc <= maximumPoolSize && ! (timedOut && timed))break;if (compareAndDecrementWorkerCount(c))return null;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

注意最后的代码:

 Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();

这里是一个三目运算符,但是结果都是

workQueue.poll或者workQueue.take

都是出队操作。

接着上面的while循环,下面这块代码主要是判断线程池状态如果再不正常情况下,线程中断

if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try 

然后是真正线程执行的部分

try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} 

这里有三个回调方法

beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);

其中beforeExecute和afterExecute可以在自己的任务中重写这两个方法。

最后在finally中

finally {task = null;w.completedTasks++;w.unlock();}

更新线程完成的数量

while结束后,执行

finally {processWorkerExit(w, completedAbruptly);}

这个processWorkerExit方法,这个方法主要是用来更新线程池中alive的数量

 private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}}

这里重点就这两句

 completedTaskCount += w.completedTasks;workers.remove(w);

完成数量+1

workers.remove掉w对象

为什么说是更新线程池中alive的数量呢,以为线程池getalivecount方法是这样的

public int getActiveCount() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int n = 0;for (Worker w : workers)if (w.isLocked())++n;return n;} finally {mainLock.unlock();}}

便利workers中的所有worker,因此是remove后alivecount数量减少。

java线程池中的Worker解析相关推荐

  1. Java 线程池中的线程复用是如何实现的?

    前几天,技术群里有个群友问了一个关于线程池的问题,内容如图所示: 关于线程池相关知识可以先看下这篇:为什么阿里巴巴Java开发手册中强制要求线程池不允许使用Executors创建? 那么就来和大家探讨 ...

  2. Java线程池中线程的状态简介

    2019独角兽企业重金招聘Python工程师标准>>> 首先明确一下线程在JVM中的各个状态(JavaCore文件中) 1.死锁,Deadlock(重点关注) 2.执行中,Runna ...

  3. java 等待线程池结束_如何等待java线程池中所有任务完成

    一.等待线程池所有线程完成: 有时候我们需要等待java thread pool中所有任务完成后再做某些操作,如想要等待所有任务完成,仅需调用threadPool.awaitTermination() ...

  4. Java线程池中submit()和execute()方法有什么区别

    两个方法都可以向线程池提交任务,execute()方法的返回类型是void,它定义在Executor接口中,而submit()方法返回有计算结构的Future对象,它定义在ExecutorServic ...

  5. 【Android 异步操作】线程池 ( Worker 简介 | 线程池中的工作流程 runWorker | 从线程池任务队列中获取任务 getTask )

    文章目录 一.线程池中的 Worker ( 工作者 ) 二.线程池中的工作流程 runWorker 三.线程池任务队列中获取任务 getTask 在博客 [Android 异步操作]线程池 ( 线程池 ...

  6. java线程池newfi_Java 线程池中的线程复用是如何实现的?

    前几天,技术群里有个群友问了一个关于线程池的问题,内容如图所示: 那么就来和大家探讨下这个问题,在线程池中,线程会从 workQueue 中读取任务来执行,最小的执行单位就是 Worker,Worke ...

  7. Java 线程池ThreadPoolExecutor的应用与源码解析

    ThreadPoolExecutor 工作原理 假设corePool=5,队列大小为100,maxnumPoolSize为10 向线程池新提交一个任务,会根据ThreadFactory创建一个新的线程 ...

  8. JAVA线程池(ThreadPoolExecutor)源码分析

    JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!):     http://xtu-xiaoxin.ite ...

  9. Java 线程池框架核心代码分析--转

    原文地址:http://www.codeceo.com/article/java-thread-pool-kernal.html 前言 多线程编程中,为每个任务分配一个线程是不现实的,线程创建的开销和 ...

  10. Java线程池框架核心代码分析

    前言 多线程编程中,为每个任务分配一个线程是不现实的,线程创建的开销和资源消耗都是很高的.线程池应运而生,成为我们管理线程的利器.Java 通过Executor接口,提供了一种标准的方法将任务的提交过 ...

最新文章

  1. css清除浮动的处理方法
  2. [LeetCode] Wildcard Matching 题解
  3. leetcode 1164 python
  4. thinkphp5 验证码出不来的常见问题
  5. 学会爱,也学会批处理
  6. 介绍Linux系统如何初始化和启动系统服务的
  7. java生成缩略图,接收图片,按指定宽高或按比例生成缩略图
  8. fastdfs返回的url_FastDFS上传文件Demospringboot实现
  9. 系统分析与设计-我爱烤鱼创新过程与UP过程对比分析之我见
  10. 【算法动画图解】:安利一款昨天发现的app
  11. JMeter下载安装以及使用教程
  12. 程序员人生:技术人员的职业发展规划
  13. 移动前端开发需要注意的20个要点
  14. 1g等于多少mb计算机网络,1KB等于多少MB?1G等于多少MB?等于多少kb呢?
  15. 1620:质因数分解
  16. C语言 · 求arccos值
  17. 百度搜索下拉框及百度相关搜索中刷关键字方法
  18. Mybatis-Plus实现乐观锁配置
  19. 2023 《电脑PC游戏》 红警3:起义时刻
  20. 网络爬虫:商品比价定向爬虫

热门文章

  1. 最新!2016中国城市GDP排名出炉
  2. codeforces A. Parity
  3. Acme CAD Converter 命令行模式
  4. 链家网页爬虫_链家房源爬虫(含源码)
  5. 台式计算机上的硬磁盘,如何在台式计算机上安装机械硬盘驱动器?在台式计算机上安装机械硬盘驱动器的详细步骤...
  6. UE4面试基础知识(一)
  7. 5分钟学会使用Excel插入数据统计图
  8. 京东登录注册页面的简单实现——(仿)
  9. 6.18上午CVPR直播 | 清华三维视觉研究团队:三维人体重建与渲染、高精度人脸生成
  10. Java牛客网输入测试用例