一、背景

今天在浏览代码发现前辈使用了ThreadPoolTaskExecutor,一时间有点懵,因为并不属于任何一个jdk下的线程池。后面浏览资料发现它属于Spring自带,所以根据网上博客来学习下:
首先在学习Spring自带的ThreadPoolTaskExecutor之前,我们先来回顾下老朋友:jdk下的ThreadPoolExecutor,很多人容易把这两个搞混。

二、ThreadPoolExecutor

这个类是JDK中的线程池类,继承自Executor, Executor 顾名思义是专门用来处理多线程相关的一个接口,所有线程相关的类都实现了这个接口,里面有一个execute()方法,用来执行线程,线程池主要提供一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁的额外开销,提高了响应的速度。相关的继承实现类图例如ScheduledThreadPoolExecutor。

2.1、线程池接口

ExecutorService为线程池接口,提供了线程池生命周期方法,继承自Executor接口ThreadPoolExecutor为线程池实现类,提供了线程池的维护操作等相关方法,继承自AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口。

2.2、java.util.concurrent.Executor 负责线程的使用和调度的根接口

    |--ExecutorService 子接口: 线程池的主要接口|--ThreadPoolExecutor 线程池的实现类|--ScheduledExceutorService 子接口: 负责线程的调度|--ScheduledThreadPoolExecutor : 继承ThreadPoolExecutor,实现了ScheduledExecutorService

2.3、工具类 : Executors

Executors为线程池工具类,相当于一个工厂类,用来创建合适的线程池,返回ExecutorService类型的线程池。有人如下方法。
ExecutorService newFixedThreadPool() : 创建固定大小的线程池
ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。
ExecutorService newSingleThreadExecutor() : 创建单个线程池。 线程池中只有一个线程

ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务

其中AbstractExecutorService是他的抽象父类,继承自ExecutorService,ExecutorService 接口扩展Executor接口,增加了生命周期方法。

实际应用中我一般都比较喜欢使用Exectuors工厂类来创建线程池,里面有五个方法,分别创建不同的线程池,如上,创建一个制定大小的线程池,Exectuors工厂实际上就是调用的ExectuorPoolService的构造方法,传入默认参数。

public class Executors {/*** Creates a thread pool that reuses a fixed number of threads* operating off a shared unbounded queue.  At any point, at most* {@code nThreads} threads will be active processing tasks.* If additional tasks are submitted when all threads are active,* they will wait in the queue until a thread is available.* If any thread terminates due to a failure during execution* prior to shutdown, a new one will take its place if needed to* execute subsequent tasks.  The threads in the pool will exist* until it is explicitly {@link ExecutorService#shutdown shutdown}.** @param nThreads the number of threads in the pool* @return the newly created thread pool* @throws IllegalArgumentException if {@code nThreads <= 0}*/public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}/*** Creates a thread pool that maintains enough threads to support* the given parallelism level, and may use multiple queues to* reduce contention. The parallelism level corresponds to the* maximum number of threads actively engaged in, or available to* engage in, task processing. The actual number of threads may* grow and shrink dynamically. A work-stealing pool makes no* guarantees about the order in which submitted tasks are* executed.** @param parallelism the targeted parallelism level* @return the newly created thread pool* @throws IllegalArgumentException if {@code parallelism <= 0}* @since 1.8*/public static ExecutorService newWorkStealingPool(int parallelism) {return new ForkJoinPool(parallelism,ForkJoinPool.defaultForkJoinWorkerThreadFactory,null, true);}

2.4、ThreadPoolExecutor

当然,例如阿里的规范。是不允许直接使用Executors去创建线程池的,我们可以使用ThreadPoolExecutor

package com.mbw.design_rule.figure;import java.util.concurrent.*;public class ThreadDemo {public static void main(String[] args) {ExecutorService es = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());}
}

首先是静态方法newSingleThreadExecutor()、newFixedThreadPool(int nThreads)、newCachedThreadPool()。我们来看一下其源码实现(基于JDK8)。

public class Executors {public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}}

通过查看源码我们知道上述三种静态方法的内部实现均使用了ThreadPoolExecutor类。难怪阿里会建议通过ThreadPoolExecutor的方式实现,原来Executors类的静态方法也是用的它,只不过帮我们配了一些参数而已
第二是ThreadPoolExecutor类的构造方法。既然现在要直接使用ThreadPoolExecutor类了,那么其中的初始化参数就要我们自己配了,了解其构造方法势在必行。

ThreadPoolExecutor类一共有四个构造方法,我们只需要了解之中的一个就可以了,因为其他三种构造方法只是帮我们配置了一些默认参数,最后还是调用了它。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {}

其中的参数含义是:
corePoolSize:线程池中的线程数量;
maximumPoolSize:线程池中的最大线程数量;
keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程会在多长时间内被销毁;
unit:keepAliveTime的时间单位;
workQueue:任务队列,被提交但是尚未被执行的任务;
threadFactory:线程工厂,用于创建线程,一般情况下使用默认的,即Executors类的静态方法defaultThreadFactory();handler:拒绝策略。当任务太多来不及处理时,如何拒绝任务。
对于这些参数要有以下了解:

①corePoolSize与maximumPoolSize的关系

首先corePoolSize肯定是 <= maximumPoolSize。

其他关系如下:

若当前线程池中线程数 < corePoolSize,则每来一个任务就创建一个线程去执行;
若当前线程池中线程数 >= corePoolSize,会尝试将任务添加到任务队列。如果添加成功,则任务会等待空闲线程将其取出并执行;

若队列已满,且当前线程池中线程数 < maximumPoolSize,创建新的线程,这类线程又叫救急线程
若当前线程池中线程数 >= maximumPoolSize,则会采用拒绝策略(JDK提供了四种,下面会介绍到)
注意:关系3是针对的有界队列,无界队列永远都不会满,所以只有前2种关系。

②workQueue

参数workQueue是指提交但未执行的任务队列。若当前线程池中线程数>=corePoolSize时,就会尝试将任务添加到任务队列中。主要有以下几种:

  • SynchronousQueue:直接提交队列。SynchronousQueue没有容量,所以实际上提交的任务不会被添加到任务队列,总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值(maximumPoolSize),则执行拒绝策略。
  • LinkedBlockingQueue:无界的任务队列。当有新的任务来到时,若系统的线程数小于corePoolSize,线程池会创建新的线程执行任务;当系统的线程数量等于corePoolSize后,因为是无界的任务队列,总是能成功将任务添加到任务队列中,所以线程数量不再增加。若任务创建的速度远大于任务处理的速度,无界队列会快速增长,直到内存耗尽。

③handler

JDK内置了四种拒绝策略

  • DiscardOldestPolicy策略:丢弃任务队列中最早添加的任务,并尝试提交当前任务;
  • CallerRunsPolicy策略:调用主线程执行被拒绝的任务,这提供了一种简单的反馈控制机制,将降低新任务的提交速度。
  • DiscardPolicy策略:默默丢弃无法处理的任务,不予任何处理。
  • AbortPolicy策略:直接抛出异常,阻止系统正常工作。

④处理流程

1.查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第二步。

2.查看任务队列是否已满,不满就将任务存储在任务队列中,否则执行第三步。

3.查看线程池是否已满,即就是是否达到最大线程池数,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。

流程图如下

三、ThreadPoolTaskExecutor

上一篇分享了JDK自带的线程池ThreadPoolExecutor的配置和参数详解,然而我们实际开发中更多的是使用SpringBoot来开发,Spring默认也是自带了一个线程池方便我们开发,它就是ThreadPoolTaskExecutor,ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理。接下来我们就来聊聊Spring的线程池吧。

3.1、@Async

在聊spring线程池之前,首先需要了解一个注解–@Async,当一个方法标上该注解时,在被调用的时候会开启一个新的线程开始异步操作,即用于异步处理
在SpringBoot环境中,要使用@Async注解,我们需要先在启动类上加上@EnableAsync注解。这个与在SpringBoot中使用@Scheduled注解需要在启动类中加上@EnableScheduling是一样的道理。加上@EnableAsync注解后,如果我们想在调用一个方法的时候开启一个新的线程开始异步操作,我们只需要在这个方法上加上@Async注解,当然前提是,这个方法所在的类必须在Spring环境中
如果对@Async底层感兴趣可以去浏览https://blog.csdn.net/BryantLmm/article/details/85129372

①创建线程池

我们可以使用springBoot默认的线程池,不过一般我们会自定义线程池(因为比较灵活),配置方式有:

  • 使用 xml 文件配置的方式
  • 使用Java代码结合@Configuration进行配置(推荐使用)
    那么接下来我们是用第二种方法创建线程池
package com.mbw.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.ThreadPoolExecutor;@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {/** 核心线程数(默认线程数) */private static final int CORE_POOL_SIZE = 20;/** 最大线程数 */private static final int MAX_POOL_SIZE = 100;/** 允许线程空闲时间(单位:默认为秒) */private static final int KEEP_ALIVE_TIME = 10;/** 缓冲队列大小 */private static final int QUEUE_CAPACITY = 200;/** 线程池名前缀 */private static final String THREAD_NAME_PREFIX = "mbw-Async-";@Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名public ThreadPoolTaskExecutor taskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(CORE_POOL_SIZE);executor.setMaxPoolSize(MAX_POOL_SIZE);executor.setQueueCapacity(QUEUE_CAPACITY);executor.setKeepAliveSeconds(KEEP_ALIVE_TIME);executor.setThreadNamePrefix(THREAD_NAME_PREFIX);// 线程池对拒绝任务的处理策略// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 初始化executor.initialize();return executor;}
}

样就会申明一个线程池Bean

②编写异步方法

在使用多线程方法上标注@Async时表明调用的线程池,如下
注意:该类一定要处于spring环境中

package com.mbw.design_rule;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class AsyncTask {@Async("taskExecutor")public void doTaskThree() throws Exception {log.info("开始做任务三");long start = System.currentTimeMillis();Thread.sleep(10000);long end = System.currentTimeMillis();log.info("完成任务三,耗时:" + (end - start) + "毫秒");}}

③编写controller测试异步方法

package com.mbw.controller;import com.mbw.design_rule.AsyncDemo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class DemoController {@Autowiredpublic AsyncDemo asyncDemo;@GetMapping("/async")public void getResult() throws Exception {for (int i = 0; i < 5; i++) {asyncDemo.doTaskThree();}}
}

启动后:

3.2、Spring默认线程池simpleAsyncTaskExecutor

Spring异步线程池的接口类是TaskExecutor,本质还是java.util.concurrent.Executor,没有配置的情况下,默认使用的是simpleAsyncTaskExecutor。
@Async演示Spring默认的simpleAsyncTaskExecutor

@Component
@EnableAsync
public class ScheduleTask {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Async@Scheduled(fixedRate = 2000)public void testScheduleTask() {try {Thread.sleep(6000);System.out.println("Spring1自带的线程池" + Thread.currentThread().getName() + "-" + sdf.format(new Date()));} catch (InterruptedException e) {e.printStackTrace();}}@Async@Scheduled(cron = "*/2 * * * * ?")public void testAsyn() {try {Thread.sleep(1000);System.out.println("Spring2自带的线程池" + Thread.currentThread().getName() + "-" + sdf.format(new Date()));} catch (Exception ex) {ex.printStackTrace();}}
}

从运行结果可以看出Spring默认的@Async用线程池名字为SimpleAsyncTaskExecutor,而且每次都会重新创建一个新的线程,所以可以看到TaskExecutor-后面带的数字会一直变大。

simpleAsyncTaskExecutor的特点是,每次执行任务时,它会重新启动一个新的线程,并允许开发者控制并发线程的最大数量(concurrencyLimit),从而起到一定的资源节流作用。默认是concurrencyLimit取值为-1,即不启用资源节流。

3.3、Spring的线程池ThreadPoolTaskExecutor

上面介绍了Spring默认的线程池simpleAsyncTaskExecutor,但是Spring更加推荐我们开发者使用ThreadPoolTaskExecutor类来创建线程池,其本质是对java.util.concurrent.ThreadPoolExecutor的包装。

这个类则是spring包下的,是Spring为我们开发者提供的线程池类,这里重点讲解这个类的用法。

Spring提供了xml给我们配置ThreadPoolTaskExecutor线程池,但是现在普遍都在用SpringBoot开发项目,所以直接上yaml或者properties配置即可,或者也可以使用@Configuration配置也行,关于使用在3.1讲解@Async的时候其实已经使用,这里在通过配置文件的形式回顾下:
application.properties

# 核心线程池数
spring.task.execution.pool.core-size=5
# 最大线程池数
spring.task.execution.pool.max-size=10
# 任务队列的容量
spring.task.execution.pool.queue-capacity=5
# 非核心线程的存活时间
spring.task.execution.pool.keep-alive=60
# 线程池的前缀名称
spring.task.execution.thread-name-prefix=god-jiang-task-

AsyncScheduledTaskConfig.java

@Configuration
public class AsyncScheduledTaskConfig {@Value("${spring.task.execution.pool.core-size}")private int corePoolSize;@Value("${spring.task.execution.pool.max-size}")private int maxPoolSize;@Value("${spring.task.execution.pool.queue-capacity}")private int queueCapacity;@Value("${spring.task.execution.thread-name-prefix}")private String namePrefix;@Value("${spring.task.execution.pool.keep-alive}")private int keepAliveSeconds;@Beanpublic Executor myAsync() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//最大线程数executor.setMaxPoolSize(maxPoolSize);//核心线程数executor.setCorePoolSize(corePoolSize);//任务队列的大小executor.setQueueCapacity(queueCapacity);//线程前缀名executor.setThreadNamePrefix(namePrefix);//线程存活时间executor.setKeepAliveSeconds(keepAliveSeconds);/*** 拒绝处理策略* CallerRunsPolicy():交由调用方线程运行,比如 main 线程。* AbortPolicy():直接抛出异常。* DiscardPolicy():直接丢弃。* DiscardOldestPolicy():丢弃队列中最老的任务。*/executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());//线程初始化executor.initialize();return executor;}
}

在方法上添加@Async注解,然后还需要在@SpringBootApplication启动类或者@Configuration注解类上 添加注解@EnableAsync启动多线程注解,@Async就会对标注的方法开启异步多线程调用,注意,这个方法的类一定要交给Spring容器来管理

@Component
@EnableAsync
public class ScheduleTask {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");@Async("myAsync")@Scheduled(fixedRate = 2000)public void testScheduleTask() {try {Thread.sleep(6000);System.out.println("Spring1自带的线程池" + Thread.currentThread().getName() + "-" + sdf.format(new Date()));} catch (InterruptedException e) {e.printStackTrace();}}@Async("myAsync")@Scheduled(cron = "*/2 * * * * ?")public void testAsyn() {try {Thread.sleep(1000);System.out.println("Spring2自带的线程池" + Thread.currentThread().getName() + "-" + sdf.format(new Date()));} catch (Exception ex) {ex.printStackTrace();}}
}


以上从运行结果可以看出,自定义ThreadPoolTaskExecutor可以实现线程的复用,而且还能控制好线程数,写出更好的多线程并发程序。

另外需要注意的是:关于注解失效需要注意以下几点

  • 注解的方法必须是public方法
  • 方法一定要从另一个类中调用,也就是从类的外部调用,类的内部调用是无效的,因为**@Transactional和@Async注解的实现都是基于Spring的AOP,而AOP的实现是基于动态代理模式实现的**。那么注解失效的原因就很明显了,有可能因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器。
  • 异步方法使用注解@Async的返回值只能为void或者Future
    而关于ThreadPoolTaskExecutor的拒绝策略和处理流程其实和ThreadPoolExecutor是一致的,可以从ThreadPoolTaskExecutor配置类配置拒绝策略的时候看出:本质上还是使用ThreadPoolExecutor的拒绝策略

四、参考链接

https://zhuanlan.zhihu.com/p/346086161
https://blog.csdn.net/weixin_43168010/article/details/97613895

https://blog.csdn.net/weixin_43168010/article/details/94436901

ThreadPoolTaskExecutor和ThreadPoolExecutor相关推荐

  1. ThreadPoolTaskExecutor和ThreadPoolExecutor区别

    初学者很容易看错,如果没有看到spring或者JUC源码的人肯定是不太了解的. ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JD ...

  2. 线程池ThreadPoolExecutor详解(整理详细)

    ThreadPoolExecutor 1.什么是线程池? (首先要理解什么是线程) 线程池,thread pool,是一种线程使用模式,线程池维护着多个线程,等待着监督管理者分配可并发执行的任务. 通 ...

  3. Executor 与 ExecutorService 和 Executors 傻傻分不清

    转载自  Executor 与 ExecutorService 和 Executors 傻傻分不清 java.util.concurrent.Executor, java.util.concurren ...

  4. 【译】Executor, ExecutorService 和 Executors 间的不同

    Executor, ExecutorService 和 Executors 间的不同 java.util.concurrent.Executor, java.util.concurrent.Execu ...

  5. SpringCloud多线程链路追踪

    Spring Cloud项目,如果采用sleuth实现链路追踪,使用线程池时建议采用Configuration配置方式实例化线程池Bean, 这样可以用到sleuth链路追踪,输出日志有traceId ...

  6. java springboot 监控线程池的状态

    @Autowiredprivate Executor personInfoTaskExecutor;/*** 监控线程池状态* @return*/@GetMapping("asyncExce ...

  7. Executor, ExecutorService 和 Executors 间的不同

    java.util.concurrent.Executor, java.util.concurrent.ExecutorService, java.util.concurrent. Executors ...

  8. Spring中ThreadPoolTaskExecutor的线程调度及问题

    问题现象 原因分析 任务调度逻辑 汇总分析 解决方案 问题现象 在我们的系统中,使用了这样的配置来开启异步操作: spring配置 <task:annotation-driven executo ...

  9. java多线程 ThreadPoolExecutor 策略的坑

    无论是使用jdk的线程池ThreadPoolExecutor 还是spring的线程池ThreadPoolTaskExecutor 都会使用到一个阻塞队列来进行存储线程任务. 当线程不够用时,则将后续 ...

最新文章

  1. HTML5 实现手机拍照上传
  2. 半径对氢原子基态能级的影响H
  3. 领域设计基本理论知识总结(转)
  4. android 保存流媒体,Android实现使用流媒体播放远程mp3文件的方法
  5. Android学习起步 - 新建工程及相关
  6. opencore0.6.4_心灵终结3.3.4
  7. remote: GitLab: Author ‘xxx‘ is not a member of team(Git修改和配置本地用户名和邮箱)
  8. 【NOIP】关押罪犯
  9. php-fpm配置笔记
  10. php中添加一个链接,使用php在推文中链接一个标签
  11. MySQL创建字段+数据处理函数+汇总数据(聚集函数)+分组数据
  12. linux 运行选择哪个cpu核,判断Linux进程在哪个CPU核运行的方法
  13. java 查找注解_Java利用反射如何查找使用指定注解的类详解
  14. Unity上的Oculus Quest2开发(1) ——首先要空工程能在Quest上跑起来吧
  15. python锁机制_Python并发编程之谈谈线程中的“锁机制”(三)
  16. 【Opps】Navicat 15 注册机出现 “No All Pattern Found File Already Patched? ”怎么办?
  17. 流程图中的实线_流程图符号_流程图中的带箭头的线段代表什么?
  18. sql注入之时间注入
  19. 前端判断是微信浏览器还是qq还是微信浏览器
  20. 明明都保意外,定期寿险和意外险到底区别在哪里?

热门文章

  1. 软考-已得出栈序列,求栈的最小容量。
  2. python代码写开心消消乐
  3. Matlab结果性能评价---std函数(计算矩阵、数组和向量元素标准差)
  4. 转:团队协作效率低?多半是这5大障碍搞的鬼
  5. 做不喜欢但是应该做的事
  6. Lua 定义二维数组
  7. 全球及中国自行车碳纤维车架行业商业模式分析及投资风险预测2022年版
  8. 探索Kotlin的隐性成本-2
  9. 7-2 例4-3游泳池改造预算
  10. 基于51单片机光照强度检测系统