文章目录

  • 概述
  • ThreadPoolExecutor
    • ThreadPoolExecutor 的主要属性
    • Worker 主要属性
  • 线程池的状态
    • 线程池的状态流转
  • 线程池提交任务的执行流程
  • 线程数量的设置
  • 线程池的种类
    • FixedThreadPool
    • CachedThreadPool
    • SingleThreadExecutor
    • ScheduledThreadPoolExecutor
    • SingleThreadScheduledExecutor
    • ForkJoinPool
      • work-stealing
      • ForkJoinPool 注意事项

概述

线程池,是资源池化思想的一种实现。线程是一种宝贵且有限的 CPU 资源,一个线程的创建跟销毁的成本是比较高的。
所以创建线程池,主要有以下两个目的:

  • 复用线程:单个线程创建使用完毕后,可以不用立马销毁,而是把这个线程放入到线程池中,等待下次执行任务
  • 管理线程:线程是一种宝贵且有限的 CPU 资源,其数量并不是无上限的
    • 可以通过线程池来限制创建线程的数量,也可以通过线程池来决定何时销毁冗余创建的线程
    • 在线程池中存在多个线程时,可以通过线程池来协调每个线程的任务执行情况,从而避免出现线程处于空闲状态,造成线程资源的浪费

ThreadPoolExecutor

ThreadPoolExecutor 是一个通用的线程池的实现,类图如下所示:

可以看到,ThreadPoolExecutor 主要实现了 ExecutorExecutorServiceAbstractExecutorService

ThreadPoolExecutor 的主要属性

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/private final HashSet<Worker> workers = new HashSet<Worker>();/*** Wait condition to support awaitTermination*/private final Condition termination = mainLock.newCondition();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;private volatile ThreadFactory threadFactory;/*** Handler called when saturated or shutdown in execute.*/private volatile RejectedExecutionHandler handler;/*** Timeout in nanoseconds for idle threads waiting for work.* Threads use this timeout when there are more than corePoolSize* present or if allowCoreThreadTimeOut. Otherwise they wait* forever for new work.*/private volatile long keepAliveTime;/*** If false (default), core threads stay alive even when idle.* If true, core threads use keepAliveTime to time out waiting* for work.*/private volatile boolean allowCoreThreadTimeOut;/*** Core pool size is the minimum number of workers to keep alive* (and not allow to time out etc) unless allowCoreThreadTimeOut* is set, in which case the minimum is zero.*/private volatile int corePoolSize;/*** Maximum pool size. Note that the actual maximum is internally* bounded by CAPACITY.*/private volatile int maximumPoolSize;/*** The default rejected execution handler*/private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();
  • 内置一个 AtomicInteger,高 3 位用于表示线程池当前状态,低 29 位用于表示线程个数
  • 内置一个阻塞队列,用于核心线程都处于执行状态后保存任务
  • 内置一个 ReentrantLock 独占锁,以及一个由锁对象创建出来的条件对象 Condition termination
  • 一个去重集合 HashSet<Worker> workers,用于保存线程
  • 历史出现过的最多线程数量 int largestPoolSize
  • 已完成的任务数量 long completedTaskCount
  • volatile 关键字修饰的线程组 ThreadFactory threadFactory
  • volatile 关键字修饰的拒绝策略 RejectedExecutionHandler handler
  • volatile 关键字修饰的空闲线程最大存活时间 long keepAliveTime
  • volatile 关键字修饰的是否允许核心线程超时 boolean allowCoreThreadTimeOut
  • volatile 关键字修饰的核心线程数量 int corePoolSize
  • volatile 关键字修饰的最大线程数量 int maximumPoolSize
  • 默认的拒绝策略 new AbortPolicy(),即默认的拒绝策略是抛出异常

Worker 主要属性

Worker 即工作线程。Worker 继承了 AQS 并实现了 Runnable 接口,主要有以下几个属性:

  • Thread thread:线程对象
  • Runnable firstTask:首次执行的任务

线程池的状态

线程池一共有 5 个状态,分别是:

  • RUNNING:运行状态,接受新任务并且处理阻塞队列里的任务
  • SHUTDOWN:标记关闭状态,拒绝新任务,但是处理正在执行和阻塞队列里的任务
  • STOP:关闭状态,拒绝新任务并且清空阻塞队列,同时会中断所有线程
  • TIDYING:清场状态,线程池和阻塞队列都为空,将要调用 terminated() 方法

线程池的状态流转

  • 线程池一旦被创建,就是 RUNNING 状态,可以正常地执行任务,以及接受新任务
  • RUNNING 状态调用 shutdown() 方法后,线程池就会切换到 SHUTDOWN 状态,标记关闭状态下
    • 遍历线程池,当前所有空闲线程都会被调用 interrupt() 方法设置中断( 被设置中断的线程将会从线程池中移除)
    • 不再接受新任务,新提交的任务将直接丢弃
    • 当前正在执行任务的线程把正在执行的任务继续完成,且这些线程将会继续从阻塞队列中获取任务执行
  • RUNNING 或者 SHUTDOWN 状态调用 shutdownNow() 方法后,线程池就会切换到 STOP 状态
    • 不再接受新任务
    • 遍历线程池,调用每个线程的 interrupt() 方法设置中断
    • 遍历阻塞队列,移除所有任务
  • SHUTDOWN 或者 STOP 状态持续一段时间后,当线程池中没有线程,并且阻塞队列也为空后,线程池将会切换到 TIDYING 状态
  • TIDYING 状态将会做最后的收尾工作,确保所有资源都被释放

线程池提交任务的执行流程

  • 当有新任务提交时,判断当前核心线程数是否小于最大核心线程数

    • 如果小于最大核心线程数,则创建一个新线程执行任务
  • 如果大于核心线程数,则尝试将任务提交到阻塞队列中
    • 如果阻塞队列未满,则将任务提交到阻塞队列中
  • 如果阻塞队列已满,则尝试开启新线程
    • 如果当前线程数量小于最大线程数,则创建一个新线程执行任务
    • 如果当前线程数量等于最大线程数,则执行拒绝策略
      • AbortPolicy:直接抛出异常
      • CallerRunsPolicy:由提交任务的线程执行
      • DiscardPolicy:直接丢弃任务
      • DiscardOldestPolicy:将阻塞队列队头的任务取出并丢弃,然后重试提交

线程数量的设置

线程数量的经验计算公式为:

线程数 = CPU 核心数 *(1 + 平均等待时间 / 平均工作时间)

线程池的种类

FixedThreadPool

FixedThreadPool,即固定数量线程池,特点是核心线程数量 = 最大线程数量,且阻塞队列容量较大甚至无界

  • 固定数量,意味着线程不会频繁地创建和销毁,在线程数量达到核心线程数量后,在执行任务过程中没有发生异常的情况下,线程数量将不会再发生变化
  • 阻塞队列容量非常大,代表着任务可以被无条件地,一直不断地添加进来,可能会引发 GC 问题
    • 当任务的执行速度远小于任务的添加速度时,可能会导致阻塞队列中的节点数量特别巨大,吃光堆内存空间,导致 OOM

FixedThreadExecutor 适用于计算密集型任务, 确保 CPU 在长时间被单个工作线程使用的情况下, 尽可能少地创建、分配、销毁线程, 即适用于长期且数量可控的任务

CachedThreadPool

CachedThreadPool,即缓存线程池,特点是

  • 核心线程数量为 0,意味着在线程池空闲时,不会占用任何的线程资源
  • 最大线程数非常巨大,意味着可以创建非常多的线程,可能会导致系统线程资源耗尽的问题
  • 线程有较短的最大存活时间(如 60s),意味着每个工作线程的存活时间比较短
  • 阻塞队列不存储元素,意味着每次有新任务提交时,要么可以立马被一个空闲线程执行,要么可以立马创建一个新线程执行去执行

CachedThreadPool 适用于存在数量多且耗时少的任务场景,由于未限制线程最大数量,在任务的执行速度远小于任务的添加速度时,有可能会导致系统线程资源耗尽或引发 OOM

SingleThreadExecutor

SingleThreadExecutor,即单线程线程池,特点是:

  • 核心线程数量 = 最大线程数量 = 1,意味着线程池中最多只会有一个线程,意味着所有提交的任务都会被串行化执行,任意时刻不会有同时两个任务在被同时执行
  • 阻塞队列容量非常大甚至无界,代表着任务可以被无条件地,一直不断地添加进来,可能会引发 GC 问题

SingleThreadExecutor 适用于多个任务需要串行化执行的场景,由于阻塞队列无界,还是有可能引发 OOM
注意,Executors.newSingleThreadExecutor() 并不等价于 Executors.newFixedThreadPool(1)Executors 中的源码为:

public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

可以看到,newSingleThreadExecutor() 方法创建出来的 ThreadPoolExecutor 对象被包了一层 FinalizableDelegatedExecutorService,所以这两者不等价

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor,是定时任务线程池,可以使用它来实现定时任务,主要的构造方法为:

    public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}

可以看到,ScheduledThreadPoolExecutor 的特点是:

  • 最大线程数无界
  • 空闲线程的最大存活时间为 0,即在线程池不会存储空闲线程
  • 阻塞队列 DelayedWorkQueue 是一个底层数据结构为堆的延迟阻塞队列,无界

SingleThreadScheduledExecutor

SingleThreadScheduledExecutor 即单线程定时任务线程池,可以保证所有定时任务都会由一个线程串行执行

ForkJoinPool

ForkJoinPool,是一个用于管理 ForkJoinWrokerThread 的线程池,ForkJoin 是一种基于分治思想的并发编程框架,通过将任务分解后使用多个线程并行计算,最后合并所有子任务的计算结果得到最终的计算结果。
ForkJoinPool 线程池可以把一个大任务拆分成小任务并行执行,但是任务类必须继承自 RecursiveTask 或者 RecursiveAction

work-stealing

工作窃取算法(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。主要实现的思路有一下几个关键点:

  • 每个工作者线程都会对应一个双端任务队列
  • 当某个工作者线程自己的任务队列中的任务执行完毕之后,将会从其他工作者线程的任务队列里窃取任务出来执行
  • 工作者线程从自己的任务队列的头部来获取任务(removeFirst
  • 工作者线程执行任务窃取时,将从其他工作者线程的任务队列的尾部来获取任务(removeLast

ForkJoinPool 注意事项

  • 任务类必须继承自 RecursiveTask 或者 RecursiveAction
  • 子任务间没有相互依赖
  • 任务最好不要包含阻塞 IO 操作

常用并发工具类(线程池)相关推荐

  1. 常用并发工具类(锁和线程间通信工具类)

    常用并发工具类总结 JUC 下的常用并发工具类(锁和线程间通信工具类),主要包括 ReentrantLock.ReentrantReadWriteLock.CountDownLatch.CyclicB ...

  2. 常用并发工具类(并发集合类)

    文章目录 概述 BlockingQueue ArrayBlockingQueue 数据存储相关属性 阻塞特性相关属性 主要方法 LinkedBlockingQueue LinkedBlockingQu ...

  3. CountDownLatch——常用并发工具类

    1)功能 CountDownLatch是一个同步的辅助类,允许一个或多个线程,等待其他一组线程完成操作,再继续执行. 2)原理: CountDownLatch是通过一个计数器来实现的,计数器的初始值为 ...

  4. JUC 常用 4 大并发工具类

    欢迎关注方志朋的博客,回复"666"获面试宝典 什么是JUC? JUC就是java.util.concurrent包,这个包俗称JUC,里面都是解决并发问题的一些东西 该包的位置位 ...

  5. day14 线程池、原子性、并发工具类

    目录 一.线程池 1.1 线程状态 1.2 线程池的基本原理 1.3 Executors默认线程池 1.4 创建指定上限的线程池 1.5 ThreadPoolExecutor(线程池执行器) 1.5. ...

  6. 第十章_多线程(2)_线程池原子性并发工具类

    目录 一.线程池 1 - 线程状态 2 - 线程池 3 - Executors线程池 二.Volatile 三.原子性 四.并发工具类 1 - 并发工具类-Hashtable 2 - 并发工具类-Co ...

  7. 线程池,Volatile,原子性类AtomicInteger,乐观锁悲观锁,并发工具类Hashtable,ConcurrentHashMap类,Semaphore类

      目录 一.线程的状态 二.线程池 1.创建线程池的方式 1.1线程池-Executors默认线程池 1.2线程池-Executors创建指定上限的线程池 1.3线程池-ThreadPoolExec ...

  8. 线程池、volatile、原子性、并发工具类

    目录 线程状态 线程池-基本原理 线程池 - Executors默认线程池 线程池 - ThreadPoolExecutor 线程池参数-拒绝策略 volatile 原子性 原子性 - AtomicI ...

  9. 《Java并发编程的艺术》——Java中的并发工具类、线程池、Execute框架(笔记)

    文章目录 八.Java中的并发工具类 8.1 等待多线程完成的CountDownLatch 8.2 同步屏障CyclicBarrier 8.2.1 CyclicBarrier简介 8.2.2 Cycl ...

最新文章

  1. 131. 分割回文串(回溯算法)
  2. 【Apache】 LXC 容器中重启 Apache 报错: Failed to set up mount namespacing: Permission denied
  3. K8S operator方式部署redis-cluster
  4. pythonanywhere使用:进入虚拟机及修改django项目的css样式
  5. js兼容安卓与ios的复制到粘贴板功能
  6. java中的values函数_巧用valueat函数(快逸免费版)
  7. 【华为云技术分享】盘点物联网常用开发板
  8. C# 中用stopwatch测试代码运行时间
  9. 用JavaScript编写COM组件的实例
  10. 条码软件如何设置十六进制并跳号打印
  11. 使用OpenCV将一个三角形图形扭曲到另一个三角形
  12. Collecting Luggage - UVALive 2397 - 蓝桥杯 算法训练
  13. evolution邮箱_b2evolution简介
  14. 教你快速随意重命名多个文件夹名称
  15. phpStudy无法打开http://localhost/
  16. 一次jvm导致线上内存占用过高问题定位
  17. pear php linux,linux下安装PEAR、Zend Debugger和Smarty
  18. java网络爬虫0基础_简单的java爬虫程序(入门)
  19. i5 13400 和 i5-12400 差距 i513400对比12400选哪个好
  20. PGE - A Representation Learning Framework for Property Graphs 属性图表示学习框架 KDD 2019

热门文章

  1. python爬虫urllib 数据处理_Python 爬虫笔记之Urllib的用法
  2. pygame render怎么显示中文_PyGame开发游戏(2D)02.基础图元
  3. 51单片机怎么显示当前时间_51单片机玩转物联网基础篇06-LCD1602液晶显示器
  4. ruby推送示例_Ruby直到示例循环
  5. 单层神经网络线性回归_单层神经网络| 使用Python的线性代数
  6. matlab拔河比赛_拔河比赛
  7. asp.net 操作ftp 通用代码[测试通过]
  8. Oracle笔记:备份还原
  9. C#中利用Expression表达式树进行多个Lambda表达式合并
  10. 蒙特卡洛法求圆周率 c语言,c++蒙特卡洛法求圆周率