工作中经常用到线程池,最常见的写法是Executors.newFixedThreadPool,线程池具体是如何处理我们提交的任务的呢?这里再走读源码记录一遍。

线程池这块的整体结构:

从整体上看类图,具体线程池具体实现其实就2个类ThreadPoolExecutor和ScheduledThreadPoolExecutor。

经常使用到的Executors类中可以看到它提供静态方法创建各种ThreadPoolExecutor以及ScheduledThreadPoolExecutor。
例如,创建固定线程池:

public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

创建单个线程的线程池:

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));
}

创建具有延时执行功能的线程池:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) {return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

其实无非就是包装了创建线程池对象的方法,返回对应的实例,其实是工厂模式。
具体threadFactory、LinkedBlockingQueue、ThreadPoolExecutor、ScheduledThreadPoolExecutor类干了什么后面再继续看。

1. ThreadPoolExecutor

先看ThreadPoolExecutor类,它继承了AbstractExecutorService,我们知道一般抽象类其实是统一实现了一部分通用的接口功能,避免各子类重复实现,从这一点分析,也可以看出来上面的整体类图是有遗漏的,不太可能只有一个ThreadPoolExecutor继承它,应该还有其它子类,继续分析它的子类,会发现在Executors在创建线程池时,有静态内部类继承了AbstractExecutorService:

static class DelegatedExecutorService extends AbstractExecutorService {private final ExecutorService e;DelegatedExecutorService(ExecutorService executor) { e = executor; }public void execute(Runnable command) { e.execute(command); }public void shutdown() { e.shutdown(); }public List<Runnable> shutdownNow() { return e.shutdownNow(); }public boolean isShutdown() { return e.isShutdown(); }public boolean isTerminated() { return e.isTerminated(); }public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {return e.awaitTermination(timeout, unit);}public Future<?> submit(Runnable task) {return e.submit(task);}public <T> Future<T> submit(Callable<T> task) {return e.submit(task);}...

这个方法包装了AbstractExecutorService,没有设置corePoolSize、keepAliveTime等方法,这个包装类用来创建单线程池

(newSingleThreadExecutor/newSingleThreadScheduledExecutor)等不可设置这些属性的实例。接下来继续看ThreadPoolExecutor源码属性

,它有几个状态值:

static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

线程池状态之间的切换:

其它属性:

private final BlockingQueue<Runnable> workQueue;
private final HashSet<Worker> workers = new HashSet<Worker>();
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
private volatile long  keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int   corePoolSize;
private volatile int   maximumPoolSize;
private volatile int   poolSize;
...

workQueue用于存储可执行的任务,是一个阻塞队列类型BlockingQueue。
workers保存了所有线程池中工作的线程。
mainLock锁用来控制poolSize、corePoolSize、maximumPoolSize、runState、workers集合。由此可见多线程环境下,对共享变量的所有访问都需要同步化。
keepAliveTime如果池中当前有多于 corePoolSize 的线程,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。
allowCoreThreadTimeOut 超时策略应用于核心线程。
corePoolSize核心线程数
maximumPoolSize允许的最大线程数。
poolSize池中的当前线程数。

其内部方法有很多,那就先从我们最常用的方法开始看起,串成一条执行链路。

newFixedThreadPool.execute(new ThreadForpools(index));

根据代码的执行逻辑,大概提交一个任务的执行流程如下:

看源码,你会发现都使用到了充入锁ReentrantLock:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}return t != null;
}

mainLock 是全局属性,一个线程池实例共用一个锁。

当任务添加到worders后,又是怎么执行的呢?

worker实际上是线程池自己定义的一个内部类 java.util.concurrent.ThreadPoolExecutor.Worker:

当提交一个任务到workers中后,调用了线程的start方法,实际上执行了,Worker类的run方法:

 public void run() {try {hasRun = true;Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this);}}

启动一个任务后,该线程会不停的调用getTask方法获取任务调用runTask执行:

Runnable getTask() {for (;;) {try {int state = runState;if (state > SHUTDOWN)return null;Runnable r;if (state == SHUTDOWN)  // Help drain queuer = workQueue.poll();else if (poolSize > corePoolSize || allowCoreThreadTimeOut)r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);elser = workQueue.take();if (r != null)return r;if (workerCanExit()) {if (runState >= SHUTDOWN) // Wake up othersinterruptIdleWorkers();return null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}}
}

getTask会判断线程池状态:

  • 1.如果是STOP或者TERMINATED,则返回task=null,调用workers.remove(w) 从workers中移除。

  • 2.如果是SHUTDOWN,则获取任务队列workQueue头部的任务(workQueue.poll())

  • 3.如果是poolSize > corePoolSize || allowCoreThreadTimeOut,则返回workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),这里就体现了keepAliveTime的用法,如果allowCoreThreadTimeOut设置为true,则空闲的多余corePoolSize的线程将会在存活keepAliveTime时长后终止。

  • 4.如果是其他情况,则会调用workQueue.take() 一直阻塞,直到有新的任务添加到workQueue。

当获取到task!=null时,继续调用runTask执行,如果线程池的状态是STOP|TERMINATED,或者当前任务状态是hasRun但是被中断了,则中断当前任务线程,否则调用run方法,这里需要注意了,start和run方法的区别,直接调用run方法是不会创建新线程的,只是同步的run方法调用,当该任务线程执行完run方法后,又会堵塞到workQueue.take方法上,所以能够保证线程池的线程一直存活。

再看下,线程池的状态是如何切换的

runState因为是int,所以初始是0(RUNNING),有如下的方法会改变线程池的状态:

public void shutdown()
public List shutdownNow()

1.调用shutDown方法,无返回。

  • 置线程池状态为SHUTDOWN,中断所有workers空闲线程。这是判断是否线程空闲,用了个很巧妙的办法:

  • 在执行runTask时,都会获取runLock锁,然后在判断是否空闲时,参数下获取该锁tryLock:

如果没有获取到,说明该线程是任务执行状态,如果获取到则interrupt。

  • 调用tryTerminate如果poolSize=0,workQueue也为空,则把线程池状态置为TERMINATED。

2.调用shutDownNow方法,有返回

  • 置线程池状态为STOP,立即调用workers中所有执行线程的thread.interrupt方法
  • copy workQueue中任务
  • 调用tryTerminate 如果poolSize=0,workQueue也为空,则把线程池状态置为TERMINATED。

可以看出来这二个方法的区别是:
shutDownNow 直接调用thread.interrupt,不会等正在执行中的任务线程执行结束,返回任务队列中的任务。
shutDown 等待执行中的线程执行结束,没有返回值。

2.BlockingQueue

队列是作为线程池暂存提交任务的地方,先来看下队列这块的整体结构:

从整体上看ConcurrentLinkedQueue的功能相对较少,只具备Queue接口、abstractQueue类的功能。其它6类在此之上还具备blockingQueue或/blockingDeque接口的功能。

先看初始化线程池时,最常用的LinkedBlockingQueue。

LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE(231-1)的容量 。它的内部实现是一个链表。

上面走读线程池源码的时候说到,如果当前线程数大于corePoolSize并且线程池状态是RUNNING,就会添加到阻塞任务队列中,调用了workQueue.offer(command)。

public boolean offer(E e) {if (e == null) throw new NullPointerException();//count表示当前队列中元素个数,AtomicInteger类型确保并发条件下的更新操作是原子的。final AtomicInteger count = this.count;//如果队列满了,直接返回falseif (count.get() == capacity)return false;int c = -1;final ReentrantLock putLock = this.putLock;//如果队列没有满,则获取该队列putLock锁putLock.lock();try {if (count.get() < capacity) {enqueue(e);c = count.getAndIncrement();if (c + 1 < capacity)//如果放入后,队列没有满,则唤醒notFull条件等待线程。notFull.signal();}} finally {putLock.unlock();}if (c == 0)signalNotEmpty();return c >= 0;}```当worker在执行时,线程池状态state == SHUTDOWN 或者 poolSize > corePoolSize 或者allowCoreThreadTimeOut有设置时, 线程池获取任务队列都会调用poll() : ```java
public E poll() {final AtomicInteger count = this.count;if (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;takeLock.lock();try {if (count.get() > 0) {//获取队列首项,并从队列中移除x = dequeue();c = count.getAndDecrement();if (c > 1)//移除后,队列非空,则唤醒notEmpty条件等待线程notEmpty.signal();}} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}//获取队列首项,并从队列中移除private E dequeue() {// assert takeLock.isHeldByCurrentThread();Node<E> h = head;Node<E> first = h.next;h.next = h; // help GChead = first;E x = first.item;first.item = null;return x;}```
可以看出调用poll方法时,如果获取到锁,则不会堵塞,队列为空则直接返回null。所以线程池为 SHUTDOWN状态时,不会堵塞任务队列的获取,能速度结束线程执行。但是如果线程池状态为RUNNING切poolSize<=corePoolSize时,会调用阻塞队列的workQueue.take()获取任务:```java
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//获取锁时,可中断等待takeLock.lockInterruptibly();try {   // 防止虚假唤醒,所以用whilewhile (count.get() == 0) {//队列为空时,等待notEmpty条件唤醒notEmpty.await();}//否则返回队首任务,并从队列中移除。x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}

可见看出,当队列任务为空时,take方法会堵塞当前任务线程,同时等待队列任务添加,这样就能保持当前线程不会结束从而被回收。

ThreadPoolExecutor源码走读相关推荐

  1. Apache Spark源码走读之16 -- spark repl实现详解

    欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码 ...

  2. JAVA线程池(ThreadPoolExecutor)源码分析

    JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!):     http://xtu-xiaoxin.ite ...

  3. [并发编程] - Executor框架#ThreadPoolExecutor源码解读03

    文章目录 Pre execute源码分析 addWorker()解读 Worker解读 Pre [并发编程] - Executor框架#ThreadPoolExecutor源码解读02 说了一堆结论性 ...

  4. [并发编程] - Executor框架#ThreadPoolExecutor源码解读02

    文章目录 Pre 线程池的具体实现 线程池的创建 参数解读 corePoolSize maximumPoolSize keepAliveTime unit workQueue threadFactor ...

  5. [并发编程] - Executor框架#ThreadPoolExecutor源码解读01

    文章目录 Pre Thread Java线程与OS线程 生命状态 状态切换 线程池 why use case Advantage Executor框架 ThreadPoolExecutor 源码分析 ...

  6. 多线程与高并发(八):ThreadPoolExecutor源码解析, SingleThreadPool,CachedPool,FixedThreadPool,ForkJoinPoll 等

    线程池 今天我们来看看JDK给我们提供的默认的线程池的实现. ThreadPoolExecutor:我们通常所说的线程池.多个线程共享同一个任务队列. SingleThreadPool CachedP ...

  7. 面试官系统精讲Java源码及大厂真题 - 37 ThreadPoolExecutor 源码解析

    37 ThreadPoolExecutor 源码解析 当你做成功一件事,千万不要等待着享受荣誉,应该再做那些需要的事. -- 巴斯德 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可 ...

  8. spark on yarn yarn-client模式实现源码走读

    Spark版本2.4.0 在SparkContext的初始化过程中,将会根据配置的启动模式来选择不同的任务调度器TaskScheduler,而这个不同模式的实现也是在这里根据选择的TaskSchedu ...

  9. Java 1.7 ThreadPoolExecutor源码解析

    Java中使用线程池技术一般都是使用Executors这个工厂类,它提供了非常简单方法来创建各种类型的线程池: public static ExecutorService newFixedThread ...

最新文章

  1. FILE文件流的中fopen、fread、fseek、fclose的使用
  2. SQL Server 字符串操作
  3. 计算机 维修 pdf,简单计算机维修..pdf
  4. LeetCode 669. Trim a Binary Search Tree修剪二叉搜索树 (C++)
  5. latex的基本使用
  6. 浏览器css bug及bug解决方法
  7. 使用scala使用fastjson将map转json报错
  8. WinRAR安装程序打包教程
  9. VS+QT多语言实现(中文乱码问题、tr()包含不生成问题)
  10. 数据库事务隔离级别与锁
  11. 图片太大怎么压缩变小?教你四招快捷压缩图片
  12. hourglass论文_Stacked Hourglass networks
  13. 2.模仿小米通讯录的快速索引demo
  14. Android端恶意锁屏勒索应用分析
  15. 业务流程管理工具的概览和比较分析
  16. 你知道豆瓣电影是怎么评分的吗?(实战篇—手把手教你分析豆瓣电影)
  17. 加入购物车里面的商品被商家调整价格以后如何处理金额问题
  18. android手机几大厂商排行榜,各大手机厂商的核心产品UI排行榜出炉
  19. RVM算法的matlab实现
  20. 数字图像matlab边缘检测(一)

热门文章

  1. rufus最新版本3.14 格式化和创建可引导USB闪存盘工具
  2. 计算机启动的四种方式,电脑有几种开机方式
  3. PDF文件一键压缩工具V1.0-免费版
  4. C# 建一个Windows 服务 定时发邮件
  5. Presto (三) --------- Presto 优化
  6. 完美的正方形分割(二)
  7. excel最强教科书电子版_Excel最强教科书(完全版全彩印刷)
  8. [经验总结]Perl模块使用 = 简短例子代码集合
  9. 张小龙首次全面阐述小程序,宣布1月9日上线(附微信公开课演讲全文)
  10. 图像处理计算机基本配置,图形图像工作的电脑配置推荐_DIY攒机硬件郎中-中关村在线...