我们的程序里,时常要使用多线程。因此多线程的管理变的尤为重要。ThreadPoolExecutor很好的解决了这一点。本篇文章主要从源码入手,分析ThreadPoolExecutor的原理。

1.标记和构造方法####

和很多状态对象一样,ThreadPoolExecutor也通过一个int的头3位来记录线程池的状态,后面20多位来标记工作线程数量。并且提供通用的位运算接口来获得你所需要的数据。

private static final int RUNNING = -1 << COUNT_BITS;

private static final int SHUTDOWN = 0 << COUNT_BITS;

private static final int STOP = 1 << COUNT_BITS;

private static final int TIDYING = 2 << COUNT_BITS;

private static final int TERMINATED = 3 << COUNT_BITS;

我们先来看下ThreadPoolExecutor的构造方法,这里似乎我们又要老生常谈了,网上已经有很多关于线程池各个参数的介绍了,这里,非墨还是会再说一遍,这样加深一下大家的印象。

public ThreadPoolExecutor(int corePoolSize, //核心线程数量

int maximumPoolSize,//最大线程数量

long keepAliveTime,//存活时间

TimeUnit unit,//存活时间单位

BlockingQueue workQueue,//工作队列

ThreadFactory threadFactory,//线程构造工厂

RejectedExecutionHandler handler//拒绝请求回调

) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

this.corePoolSize = corePoolSize;

this.maximumPoolSize = maximumPoolSize;

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

this.threadFactory = threadFactory;

this.handler = handler;

}

2.执行流程####

按照我们熟知的线程池机制,

1.当请求被post到我们的线程池中,我们的线程池会先生成一个核心线程来执行它

2.当核心线程满了的时候,将会把这个请求放入到我们的工作请求队列workQueue中。

3.如果你提供的队列是一个有界队列的时候,线程池将会判断你的最大线程数是否超过你的核心线程。如果超过核心线程的话,线程池会生成新的线程去执行它。

4.如果这个时候,已经达到了最大线程数,那么线程池将走到拒绝回调

5.如果线程池的最大线程数不大于核心线程数,并且工作队列已满,那么将直接走拒绝回调

实际上这个流程已经在ThreadPoolExecutor.execute方法注释中有详细的说明。即便没有说明,我们也可以从它的代码流程简单看出一些端倪:

//code ThreadPoolExecutor.execute(Runnable)

int c = ctl.get();//获取当前运行线程数

if (workerCountOf(c) < corePoolSize) {//如果当前线程数小于核心线程数

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {//将请求命令加入到工作队列

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))//加入到非核心线程中

reject(command);//如果非核心线程没有执行,那么将走拒绝请求回调

3.深入源码###

我们以execute为入口,深入分析一下这个线程池的源码。int c = ctl.get()方法我们暂时不说,后面我们将会补充,我们暂且把它理解为获得一个数量,而这个数量c将会传入到workerCountOf方法中。这个方法名称我们就能知道其用意,就是为了获得当前工作线程数量。

private static int workerCountOf(int c) { return c & CAPACITY; }

上文我们说到,线程池会通过一个int的后几位来记录线程数量,而workerCountOf就是通过位运算来获得当前工作线程数。在获得当前线程数了以后,如果当前线程数小于

corePoolSize的话,将会通过addWorker方法把command加入到工作线程中。addWorker需要提供两个参数,一个是你的command,另外一个boolean量是为了标识是否是往core线程中加。

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (;;) {

int c = ctl.get();//获得一个含有状态和数量的值

int rs = runStateOf(c);//获得当前线程池状态

...

for (;;) {//第二个for

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

}

这里,通过上面的代码我们可以清晰的了解ctl变量的存在的目的:

1.首先,当从类型上看clt是一个原子类型,说明它是要支持多线程调用的

2.ctl里面的值需要存储两个信息,一个是线程数量,一个是当前线程池的工作状态。

这时候是否有读者还在纳闷,为什么我的线程数小于我的核心线程数,我往我的线程池里加,还是可能出现加不进去的情况。事实上,“第二个for”循环已经很好的说明了这一点。因为线程池不能保证是同一个线程调用addWorker方法。线程池需要同步过后,才能保证是否是否往核心线程里面加。这就是为什么在ThreadPoolExecutor.execute方法里,在判断完核心线程数量之后,如果失败了,还要再取一次当前线程数的原因。

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();//再取一次

}

好的,我们继续回到"第二个for"。我们可以看出,线程池在同步方面不仅细化了粒度,而且用的是CAS算法。这种算法可以劲量的避免由于sync引起的线程阻塞。

for (;;) {//第二个for

int wc = workerCountOf(c);

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

//当线程池数量超过核心线程的时候退出,返回false

if (compareAndIncrementWorkerCount(c))

break retry;//当增加线程成功的时候,跳出循环

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs)

continue retry;//状态不一致继续循环

// else CAS failed due to workerCount change; retry inner loop

}

由于我们现在只有一个线程在工作,不存在多线程竞争的情况,因此我们选择跳出循环的逻辑。跳出循环以后,程序将真正意义上的生成一个Worker线程来执行指令。

//code private boolean addWorker(Runnable firstTask, boolean core)

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);//生成一个worker对象

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();//锁定全局锁

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());//检查线程池状态

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);//将Worker线程纳入workers集合对象管理

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;//重新赋值largestPoolSize变量

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

上面的代码非常简单,线程池将生成一个Worker的线程包装类。不论是是否是核心线程,所有的线程都被纳入到workers集合对象进行管理。如果一切流程都正常workerAdded将为true,Worker里的线程将被启动。启动后Worker将执行线程的run方法,而在run方法中,又调用到Worker的runWorker(Worker)方法:

public void run() {

runWorker(this);

}

runworker是真正的线程执行流程的代码段:

// code runworker

try {

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();//判断当前线程池状态,

try {

beforeExecute(wt, task);//执行前切面

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);//执行后切面

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

runworker方法中引申出来两个方法beforeExecute和afterExecute。可以通过继承的方式来监控command的执行。相当于在command.run之前和之后切了两个面,是一种面向方面的编程模式。当Task执行完成之后,由于while循环,将再次执行while的判断条件task = getTask()) != null; getTask方法是可能阻塞的,阻塞的时间是根据你在构造线程池的时候设置的超时时间来决定的。

private Runnable getTask() {

boolean timedOut = false; //是否判断超时

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount();

return null;

}

int wc = workerCountOf(c);

// Are workers subject to culling?

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

//allowCoreThreadTimeOut变量用于控制是否让核心线程也进行超时判断

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())) {

if (compareAndDecrementWorkerCount(c))

return null;

continue;

}

try {

Runnable r = timed ?//通过timed变量来选择使用poll方法还是take方法

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;//如果poll获取的r为空,标记为超时

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

>getTask还是一个循环操作,第一次执行的时候,会通过timed变量来判断是否有超时检查,如果有超时检查的话将调用poll方法。如果poll在规定的时间内并没有获得任何的执行对象,返回的r为null,timedOut将被标记为true。这时候,又再次进入循环。这时候,如果你是非核心线程,是扩展线程的话,那么,if ((wc > maximumPoolSize || (timed && timedOut))这个判断为true,程序将返回一个null。

在runWorker方法中,如果getTask返回的对象为null,runWorker将跳出while循环,执行finally语句:

finally {

processWorkerExit(w, completedAbruptly);

}

>processWorkerExit方法需要传递两个变量,第一个变量是Worker对象,第二个变量是completedAbruptly变量,这个变量是干什么用的呢?因为你的程序跳出可能存在两种情况,一种是正常跳出,一种是异常跳出,如果是异常跳出的话,这个时候你的workercount未必正常的执行decrement操作,因此通过这个变量来标记程序的执行状态。

//code processWorkerExit

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

completedTaskCount += w.completedTasks;

workers.remove(w);

} finally {

mainLock.unlock();

}

mainLock是一个全局锁,主要是为了同步全局的workers变量。上面的代码中,线程池将记录一下task执行数据,并且将worker从workers队列中删除。

这个时候,基本上整个线程池的流程都已经概述完了,当然,我们还确实一个变量,那就是RejectedExecutionHandler类型变量。这个得回到我们的execute方法:

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);//拒绝请求

>当线程池拒绝请求的时候,将调用reject方法,而reject方法将会回调RejectedExecutionHandler的rejectedExecution方法:

final void reject(Runnable command) {

handler.rejectedExecution(command, this);

}

线程池提供一个默认的拒绝请求回调:

//code ThreadPoolExecutor

private static final RejectedExecutionHandler defaultHandler =

new AbortPolicy();

//code AbortPolicy

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

throw new RejectedExecutionException("Task " + r.toString() +

" rejected from " +

e.toString());

}

也就是采用异常的方式来拒绝请求。

这样,ThreadPoolExecutor的主要源码和结构已经分析完了,当然还有其他的特性和功能需要看官们自己去探索。

-----非墨

java executor 源码_Java线程池ThreadPoolExecutor深度探索及源码解析相关推荐

  1. Java 线程池(ThreadPoolExecutor)原理分析与使用 – 码农网

    线程池的详解 Java 线程池(ThreadPoolExecutor)原理分析与使用 – 码农网 http://www.codeceo.com/article/java-threadpool-exec ...

  2. java线程池执行器_Java线程池ThreadPoolExecutor的使用

    Java线程池ThreadPoolExecutor的使用 ThreadPoolExecutor就是我们用来实现线程的一个执行器,它实现了Excutor和ExecutorService接口.Excuto ...

  3. 从源码学习线程池的使用原理及核心思想解析

    文章内容引用自 咕泡科技 咕泡出品,必属精品 文章目录 1为什么要使用线程池 2几种常用线程池介绍 3从初始化开始 4执行任务execute 5添加线程addWorker 6运行新的线程runWork ...

  4. JAVA8线程池THREADPOOLEXECUTOR底层原理及其源码解析

    小侃一下 1. 使用线程池的好处. 为什么要使用线程池? 2. 线程池核心参数介绍 3. 提交任务到线程池中的流程 3.1 ThreadPoolExecutor#execute方法整体流程 3.2 排 ...

  5. java 线程池 源码_java线程池源码分析

    我们在关闭线程池的时候会使用shutdown()和shutdownNow(),那么问题来了: 这两个方法又什么区别呢? 他们背后的原理是什么呢? 线程池中线程超过了coresize后会怎么操作呢? 为 ...

  6. java低层源码_Java线程池及其底层源码实现分析

    */ Callable接口 && Runnable接口 callable调用call方法 runnable调用run方法 都可以被线程调用,但callable的call方法具有返回值( ...

  7. Java 线程池ThreadPoolExecutor的应用与源码解析

    ThreadPoolExecutor 工作原理 假设corePool=5,队列大小为100,maxnumPoolSize为10 向线程池新提交一个任务,会根据ThreadFactory创建一个新的线程 ...

  8. java线程池拒绝策略_Java线程池ThreadPoolExecutor的4种拒绝策略

    最近在做大批量数据采集转换工作,基础数据在本地但是需要调用网络资源完成数据转换.多方面原因在保证良好运行情况下,最多开5个线程进行网络资源调用.方案是基础数据在数据库分页,循环遍历每一条数据,创建调用 ...

  9. 深入理解线程池(ThreadPoolExecutor)——细致入微的讲源码。

    在上一篇博文<图解线程池原理>中,大体上介绍了线程池的工作原理. 这一篇从源码层面,细致剖析,文章会很长. 如果上篇文章内容没吸收,先看上篇,先易后难嘛. 本文源码是 java 1.8 版 ...

最新文章

  1. php mysql update语句_mysql SELECT FOR UPDATE 语句
  2. 非常好用的一些软件和网站
  3. 拥抱.NET Core,如何开发一个跨平台类库
  4. TimeUnit.SECONDS.sleep()和sleep区别
  5. linux红黑树节点没有数据,真正理解红黑树,真正的(Linux内核里大量用到的数据 -电脑资料...
  6. Oracle触发器6-管理触发器
  7. 【错误】函数调用导致堆栈不对称。原因可能是托管的 PInvoke 签名与非托管的目标签名不匹配。
  8. win7共享20人限制 清除_中控智慧考勤门禁_机器直连软件及清除管理员
  9. C语言打印九九口诀表
  10. 计算机两万字符英语文献翻译,自动化专业相关英文文献加翻译(20000字符).doc
  11. java 网络 序列化_Java网络通信基础系列-Netty序列化
  12. Java内存模型(JMM)详解-可见性volatile
  13. 【人工智能】八皇后问题-启发式求解
  14. spinnaker-简介
  15. 智能除味器——外壳结构部分设计(3D打印)
  16. Excel 批量重命名照片
  17. opencv控制鼠标事件
  18. 双位置继电器HJWS-9440
  19. linux top 指定pid,51CTO博客-专业IT技术博客创作平台-技术成就梦想
  20. win7通过注册表关闭自动睡眠和锁屏

热门文章

  1. qnx slm7.0(程序员开发手册-翻译)
  2. Your ApplicationContext is unlikely tostart due to a @ComponentScan of the defau
  3. vue完整项目,实现即可上岗web前端。
  4. Ruby+Watir安装
  5. Kubernetes — Dashboard
  6. 每日一题(day5)
  7. Git 派生属于自己的分支
  8. 沉没之城中文破解版 v1.0免安装绿色版
  9. 浅谈JAVA程序破解(原创)
  10. 三步装机教程,电脑如何一键安装系统