点击关注公众号,实用技术文章及时了解

来源:blog.csdn.net/u013332124/article/details/79587436

原理概述

其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。

workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。

线程池的几个主要参数的作用

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
  • corePoolSize: 规定线程池有几个线程(worker)在运行。

  • maximumPoolSize: 当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池最多只能有多少个线程(worker)在执行。

  • keepAliveTime: 超出corePoolSize大小的那些线程的生存时间,这些线程如果长时间没有执行任务并且超过了keepAliveTime设定的时间,就会消亡。

  • unit: 生存时间对于的单位

  • workQueue: 存放任务的队列

  • threadFactory: 创建线程的工厂

  • handler: 当workQueue已经满了,并且线程池线程数已经达到maximumPoolSize,将执行拒绝策略。

任务提交后的流程分析

用户通过submit提交一个任务。线程池会执行如下流程:

  • 判断当前运行的worker数量是否超过corePoolSize,如果不超过corePoolSize。就创建一个worker直接执行该任务。—— 线程池最开始是没有worker在运行的

  • 如果正在运行的worker数量超过或者等于corePoolSize,那么就将该任务加入到workQueue队列中去。

  • 如果workQueue队列满了,也就是offer方法返回false的话,就检查当前运行的worker数量是否小于maximumPoolSize,如果小于就创建一个worker直接执行该任务。

  • 如果当前运行的worker数量是否大于等于maximumPoolSize,那么就执行RejectedExecutionHandler来拒绝这个任务的提交。

源码解析

我们先来看一下ThreadPoolExecutor中的几个关键属性。

//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;

1. 提交任务相关源码

下面是execute方法的源码

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();//workerCountOf(c)会获取当前正在运行的worker数量if (workerCountOf(c) < corePoolSize) {//如果workerCount小于corePoolSize,就创建一个worker然后直接执行该任务if (addWorker(command, true))return;c = ctl.get();}//isRunning(c)是判断线程池是否在运行中,如果线程池被关闭了就不会再接受任务//后面将任务加入到队列中if (isRunning(c) && workQueue.offer(command)) {//如果添加到队列成功了,会再检查一次线程池的状态int recheck = ctl.get();//如果线程池关闭了,就将刚才添加的任务从队列中移除if (! isRunning(recheck) && remove(command))//执行拒绝策略reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果加入队列失败,就尝试直接创建worker来执行任务else if (!addWorker(command, false))//如果创建worker失败,就执行拒绝策略reject(command);
}

添加worker的方法addWorker源码

private boolean addWorker(Runnable firstTask, boolean core) {retry://使用自旋+cas失败重试来保证线程竞争问题for (;;) {//先获取线程池的状态int c = ctl.get();int rs = runStateOf(c);// 如果线程池是关闭的,或者workQueue队列非空,就直接返回false,不做任何处理if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);//根据入参core 来判断可以创建的worker数量是否达到上限,如果达到上限了就拒绝创建workerif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//没有的话就尝试修改ctl添加workerCount的值。这里用了cas操作,如果失败了下一个循环会继续重试,直到设置成功if (compareAndIncrementWorkerCount(c))//如果设置成功了就跳出外层的那个for循环break retry;//重读一次ctl,判断如果线程池的状态改变了,会再重新循环一次c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {final ReentrantLock mainLock = this.mainLock;//创建一个worker,将提交上来的任务直接交给workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {//加锁,防止竞争mainLock.lock();try {int c = ctl.get();int rs = runStateOf(c);//还是判断线程池的状态if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果worker的线程已经启动了,会抛出异常if (t.isAlive()) throw new IllegalThreadStateException();//添加新建的worker到线程池中workers.add(w);int s = workers.size();//更新历史worker数量的最大值if (s > largestPoolSize)largestPoolSize = s;//设置新增标志位workerAdded = true;}} finally {mainLock.unlock();}//如果worker是新增的,就启动该线程if (workerAdded) {t.start();//成功启动了线程,设置对应的标志位workerStarted = true;}}} finally {//如果启动失败了,会触发执行相应的方法if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

2. Worker的结构

Worker是ThreadPoolExecutor内部定义的一个内部类。我们先看一下Worker的继承关系

private final class Worker extends AbstractQueuedSynchronizer implements Runnable

它实现了Runnable接口,所以可以拿来当线程用。同时它还继承了AbstractQueuedSynchronizer同步器类,主要用来实现一个不可重入的锁。

一些属性还有构造方法:

//运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
final Thread thread;
//当一个worker刚创建的时候,就先尝试执行这个任务
Runnable firstTask;
//记录完成任务的数量
volatile long completedTasks;
Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法this.thread = getThreadFactory().newThread(this);
}

worker的run方法

public void run() {//这里调用了ThreadPoolExecutor的runWorker方法runWorker(this);
}

ThreadPoolExecutor的runWorker方法

final void runWorker(Worker w) {//获取当前线程Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//执行unlock方法,允许其他线程来中断自己w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//如果前面的firstTask有值,就直接执行这个任务//如果没有具体的任务,就执行getTask()方法从队列中获取任务//这里会不断执行循环体,除非线程中断或者getTask()返回null才会跳出这个循环while (task != null || (task = getTask()) != null) {//执行任务前先锁住,这里主要的作用就是给shutdown方法判断worker是否在执行中的//shutdown方法里面会尝试给这个线程加锁,如果这个线程在执行,就不会中断它w.lock();//判断线程池状态,如果线程池被强制关闭了,就马上退出if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行任务前调用。预留的方法,可扩展beforeExecute(wt, task);Throwable thrown = null;try {//真正的执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//执行任务后调用。预留的方法,可扩展afterExecute(task, thrown);}} finally {task = null;//记录完成的任务数量w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用,从这个方法我们可以看出先吃池是怎么让超过corePoolSize的那部分worker销毁的。

private Runnable getTask() {boolean timedOut = false; for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果线程池已经关闭了,就直接返回null,//如果这里返回null,调用的那个worker就会跳出while循环,然后执行完销毁线程//SHUTDOWN状态表示执行了shutdown()方法//STOP表示执行了shutdownNow()方法if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//获取当前正在运行中的worker数量int wc = workerCountOf(c);// 如果设置了核心worker也会超时或者当前正在运行的worker数量超过了corePoolSize,就要根据时间判断是否要销毁线程了//其实就是从队列获取任务的时候要不要设置超时间时间,如果超过这个时间队列还没有任务进来,就会返回nullboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果上一次循环从队列获取到的未null,这时候timedOut就会为true了if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {//通过cas来设置WorkerCount,如果多个线程竞争,只有一个可以设置成功//最后如果没设置成功,就进入下一次循环,说不定下一次worker的数量就没有超过corePoolSize了,也就不用销毁worker了if (compareAndDecrementWorkerCount(c))return null;continue;}try {//如果要设置超时时间,就设置一下咯//过了这个keepAliveTime时间还没有任务进队列就会返回null,那worker就会销毁Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;//如果r为null,就设置timedOut为truetimedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

3. 添加Callable任务的实现源码

public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}

要添加一个有返回值的任务的实现也很简单。其实就是对任务做了一层封装,将其封装成Future,然后提交给线程池执行,最后返回这个future。

这里的 newTaskFor(task) 方法会将其封装成一个FutureTask类。

外部的线程拿到这个future,执行get()方法的时候,如果任务本身没有执行完,执行线程就会被阻塞,直到任务执行完。

下面是FutureTask的get方法

public V get() throws InterruptedException, ExecutionException {int s = state;//判断状态,如果任务还没执行完,就进入休眠,等待唤醒if (s <= COMPLETING)s = awaitDone(false, 0L);//返回值return report(s);
}

FutureTask中通过一个state状态来判断任务是否完成。当run方法执行完后,会将state状态置为完成,同时唤醒所有正在等待的线程。我们可以看一下FutureTask的run方法

public void run() {//判断线程的状态if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//执行call方法result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)//这个方法里面会设置返回内容,并且唤醒所以等待中的线程set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}

4. shutdown和shutdownNow方法的实现

shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//检查是否可以关闭线程checkShutdownAccess();//设置线程池状态advanceRunState(SHUTDOWN);//尝试中断workerinterruptIdleWorkers();//预留方法,留给子类实现onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();
}private void interruptIdleWorkers() {interruptIdleWorkers(false);
}private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//遍历所有的workerfor (Worker w : workers) {Thread t = w.thread;//先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它//注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能//它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回falseif (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();}
}

shutdownNow做的比较绝,它先将线程池状态设置为STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的worker,然后清空任务队列。

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//检测权限advanceRunState(STOP);//中断所有的workerinterruptWorkers();//清空任务队列tasks = drainQueue();} finally {mainLock.unlock();}tryTerminate();return tasks;
}private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//遍历所有worker,然后调用中断方法for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}
}

总结

java线程池的实现原理还是挺简单的。但是有一些细节还是需要去看源码才能得出答案。另外,附送学习资源:Java进阶视频资源

本文也没办法把所有的源码都讲解一遍,只列了比较重要的一些源码。有兴趣的同学可以自己打开源码好好看一下,肯定会对实现原理了解的更加深刻。

最后,如果本文有哪里说的不对或者遗漏的地方,也烦请指出,感激不尽。

推荐:

主流Java进阶技术(学习资料分享)

PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!

Java线程池的实现原理,你清楚么?相关推荐

  1. 深入源码分析Java线程池的实现原理

    转载自   深入源码分析Java线程池的实现原理 程序的运行,其本质上,是对系统资源(CPU.内存.磁盘.网络等等)的使用.如何高效的使用这些资源是我们编程优化演进的一个方向.今天说的线程池就是一种对 ...

  2. Java 线程池的工作原理

    文章目录 概念 线程中的基本方法 线程复用 线程池的核心组件和核心类 线程池的工作原理 线程池中的workQueue任务队列 直接提交队列(SynchronousQueue) 有界任务队列(Array ...

  3. Java线程池使用与原理

    线程池是什么? 我们可以利用java很容易创建一个新线程,同时操作系统创建一个线程也是一笔不小的开销.所以基于线程的复用,就提出了线程池的概念,我们使用线程池创建出若干个线程,执行完一个任务后,该线程 ...

  4. 好文推荐:深入分析Java线程池的实现原理

    线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,合理的使用线程池对线程进行统一分配.调优和监控,有以下好处: 1.降低资源消耗: 2.提高响应速度: 3.提高线程的可管理 ...

  5. 深入源码,深度解析Java 线程池的实现原理

    java 系统的运行归根到底是程序的运行,程序的运行归根到底是代码的执行,代码的执行归根到底是虚拟机的执行,虚拟机的执行其实就是操作系统的线程在执行,并且会占用一定的系统资源,如CPU.内存.磁盘.网 ...

  6. 全面解读Java线程池的工作原理

    目录 一.为什么引入线程池技术? 二.Executor框架 2.1 Runnable.Callable与Future接口 2.2 Executor接口 2.2.1 Executor 2.2.2 Exe ...

  7. 彻底搞懂Java线程池的工作原理

    一.线程池的基础知识 创建线程需要占用一定的操作系统资源,在高并发情况下,频繁的创建和销毁线程会大量消耗CPU和内存资源,对程序性能造成很大的影响.为了避免这一问题,Java提供了线程池(通过线程复用 ...

  8. 给女朋友讲 : Java线程池的内部原理

    文章持续更新,微信搜索「 万猫学社 」第一时间阅读. 关注后回复「 电子书 」,免费获取12本Java必读技术书籍. 餐厅的约会 餐盘在灯光的照耀下格外晶莹洁白,女朋友拿起红酒杯轻轻地抿了一小口,对我 ...

  9. java线程池的工作原理_JAVA线程池原理详解一

    线程池的优点 1.线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用. 2.可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃. 线 ...

最新文章

  1. 为什么free()时不需要传指针大小
  2. mysql proxy 读写分离 1
  3. PHP获取当前页面的网址
  4. C++类的使用(四)—— 继承
  5. 安装DNN时,数据库连接字符串的设置
  6. CF903G-Yet Another Maxflow Problem【线段树,最大流】
  7. Matlab中的logspace函数,matlab之logspace函数
  8. Java并发优化思路
  9. 安卓4.4.4安装哪个微信版本_??微信又更新!暗黑模式可独立设置,新增群接龙固定入口...
  10. 常见Java开发过程中遇到的问题及其解决办法
  11. Redis集群的原理和搭建
  12. power bi可视化表_在Power BI报表视图中创建可视化
  13. 你不知道的Event
  14. VMware View 要求操作句柄的状态错误
  15. ADODB.Connection、ADODB.RecordSet
  16. 51单片机开发入门(1)-单片机简介
  17. 文本相似度计算(中英文)详解实战
  18. 高德地图坐标查询工具——JavaScript
  19. matlab析取范式求主析取范式用电脑,(p∧q)∨r 求其主析取范式 再用主析取范式求主合取范式...
  20. 使用Excel和Matlab批量修改图片名称

热门文章

  1. vivo S5官方广告正式揭晓:11月14日发布!
  2. 头回见!95后女大学生买iPhone11出租:租借者想尝鲜或显摆
  3. 奥斯卡公布最佳动画长片初选名单 《哪吒》等32部动画入选
  4. 腾讯音乐2019Q2财报:在线音乐付费用户达到创纪录的3100万
  5. 这款App因涉嫌传销被罚7456万:会员层级达51级 收取佣金4.5亿
  6. 走捷径拿到大厂25K高级测试Offer,别不服!
  7. 把光标放在EditText中文本最后
  8. javascript获取窗口和div位置
  9. stl::vector排序二例
  10. 【clickhouse】clickhouse 副本与分片 分片详解