作者:JingQ

https://www.sevenyuan.cn

在java中,使用线程时通过new Thread实现很简单,但是如果并发数量很多时,频繁地创建线程就会大大降低系统的效率。

所以可以通过线程池,使得线程可以复用,每执行完一个任务,并不是被销毁,而是可以继续执行其他任务。

花了两天时间去看了高洪岩写的《JAVA并发编程》,是想要知其然,知其所以然,在使用的情况下,了解学习了一下原理记录下java.util.concurrent并发包下的ThreadPoolExecutor特性和实现

使用示例

粗暴点,我们直接看如何使用吧

(一)使用Executors

简单举个????:

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

具体实现逻辑:

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

通过该Executors的静态方法进行线程池的创建,而且从具体实现来看,还是调用了new ThreadPoolExecutor(),只是内部参数已经帮我们配置好了。

(二) 使用ThreadPoolExecutor

既然真正实现都是用ThreadPoolExecutor,那就自己设定好方法的参数吧。

public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.HOURS, new LinkedBlockingDeque<>());for(int i=0;i<10;i++){MyTask myTask = new MyTask(i);executor.execute(myTask);System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+executor.getQueue().size()+",已执行完别的任务数目:"+executor.getCompletedTaskCount());}executor.shutdown();}static class MyTask implements Runnable {private int taskNum;public MyTask(int num) {this.taskNum = num;}@Overridepublic void run() {System.out.println("正在执行task "+taskNum);try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task "+taskNum+"执行完毕");}}

打印效果如下:

正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完别的任务数目:0
task 2执行完毕
task 0执行完毕
task 3执行完毕
task 1执行完毕
正在执行task 8
task 4执行完毕
正在执行task 7
正在执行task 6
正在执行task 5
正在执行task 9
task 8执行完毕
task 6执行完毕
task 7执行完毕
task 5执行完毕
task 9执行完毕

任务Task提交之后,由于是多线程状态下,所以打印效果并不是同步的,可以看出任务都已经顺利执行。

我这个实现参数是5个corePoolSize核心线程数和5个maximumPoolSize最大线程数,当线程池中的线程数超过5个的时候,将新来的任务放进缓存队列中,小伙伴可以试下把任务数(for循环的个数)提高一点,让缓存等待的任务数超过5个,看看默认的任务拒绝策略(AbortPolicy)会抛出什么错误hhh

下面来看看ThreadPoolExecutor的庐山真面目吧~


ThreadPoolExecutor

它有以下四个构造方法:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler);}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

从构造方法可以看出,前三个方法最终都是调用第四个构造器进行初始化工作的。

参数解释:

  • corePoolSize:池中保持的线程数,包括空闲的线程,也就是核心池的大小

  • maximumPoolSize:池中锁允许最大线程数

  • keepAliveTime:当线程数量超过corePoolSize,在没有超过指定的时间内不从线程池中删除,如果超过该时间,则删除

  • unit:keepAliveTime的时间单位

  • workQueue:执行前用来保存任务的队列,此队列只保存由execute方法提交的Runnable任务

workQueue(任务队列,是一个阻塞队列)

ArrayBlockingQueue:

public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =  lock.newCondition();}

LinkedBlockingDeque:(支持列头和列尾操作,pollFirst/pollLast)

public LinkedBlockingDeque() {this(Integer.MAX_VALUE);}public LinkedBlockingDeque(int capacity) {if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;}

从源码构造函数可以看到,不传参数的时候,默认阻塞队列中的大小是Integer.MAX_VALUE;

SynchronousQueue:

public SynchronousQueue() {this(false);}public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}

Array和Linked在传入大小小于0时将会报错,比较常用的是LinkedBlockingDeque和SynchronousQueue,线程池的排队策略与BlockingQueue有关。

ThreadFactory:线程工厂

主要用来创建线程,可以在newThread()方法中自定义线程名字和设置线程异常情况的处理逻辑。

举个????:

static class MyThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread();thread.setName("JingQ" + new Date());thread.setUncaughtExceptionHandler((t, e) -> {doSomething();e.printStackTrace();});return thread;}}
handler:拒绝策略

有以下四种:

  • ThreadPoolExecutor.AbortPolicy:当任务添加到线程中被拒绝时,它会抛出RejectedExecutionException异常。

  • ThreadPoolExecutor.DiscardPolicy:任务被拒绝时,线程池丢弃被拒绝的任务

  • ThreadPoolExecutor.DiscardOldestPolicy:任务被拒绝时,线程池会放弃等待队列中最旧的未处理文物,然后将被拒绝的任务添加到等待队列中

  • ThreadPoolExecutor.CallerRunsPolicy:任务被拒绝时,会使用调用线程池的Thread线程对象处理被拒绝的任务


ThreadPoolExecutor继承结构

可以看出,实际上ThreadPoolExecutor是继承了AbstractExecutorService类和引用了ExecutorService、Executor接口。

AbstractExecutorService

public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };public Future<?> submit(Runnable task) {};public <T> Future<T> submit(Runnable task, T result) { };public <T> Future<T> submit(Callable<T> task) { };private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {};
}

AbstarctExecutorService是一个抽象类,它实现的是ExecutorService接口

ExecutorService

public interface ExecutorService extends Executor {void shutdown();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

接口ExecutorService引用了Executor接口,Executor接口比较简单,只有一个execute方法定义

Executor

public interface Executor {void execute(Runnable command);
}

小结:

Executor是一个顶级接口,定义了一个execute方法,返回值为空,参数为Runnable。

ExecutorService继承了Executor并且定义了其它一些方法,结果如下图:

抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法。

最后ThreadPoolExecutor继承了AbstractExecutorService,我们最常用到它两个方法,submit和execute,下面介绍一下这两者:

execute():
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** (以下是个人渣翻译,有误请轻喷~)* 有以下三步流程:** 1. 如果少于核心池大小的线程正在运行,* 那么尝试以给定的命令作为它的第一个任务启动一个新线程。* 调用添加worker原子性检查运行状态和workder的数量,* 这样可以防止错误警报在不应该返回的情况下添加线程,返回false。** 2. 如果一个任务可以成功地排队,那么我们仍然需要再次检查是否应该添加一个线程* (因为现有的线程在上次检查后死亡),或者是在该方法进入后关闭了池。* 因此,我们重新检查状态,如果必要的话,如果停止的话,需要回滚队列。* 如果没有新的线程,就去启动它** 3. 如果我们不能排队任务,那么我们尝试添加一个新线程。* 如果失败了,我们知道任务队列已经被关闭或饱和,所以拒绝这个任务。*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}
submit:public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}

小结

execute()方法在ThreadPoolExecutor中进行了重写,submit()方法是在AbstractExecutorService实现的,ThreadPoolExecutor并没有重写,并且execute方法是没有返回结果的,submit的返回类型是Future,能够获得任务的结果,但是实际执行的还是execute方法。

当然,还有例如shutdown、getQueue、getActiveCount、getPoolSize等方法没有介绍到,推荐胖友们打开IDE进行查看吧~

ps:关于线程池的原理并未深入记录,有关它的任务拒绝策略、线程初始化、ThreadPoolExecutor构造之后,当任务超过设定值,它的执行策略等原理都值得去深入学习,下回记录~

推荐好文

强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!
能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

Java多线程:ThreadPoolExecutor初探相关推荐

  1. java多线程 ThreadPoolExecutor 策略的坑

    无论是使用jdk的线程池ThreadPoolExecutor 还是spring的线程池ThreadPoolTaskExecutor 都会使用到一个阻塞队列来进行存储线程任务. 当线程不够用时,则将后续 ...

  2. 转: java多线程-ThreadPoolExecutor的拒绝策略RejectedExecutionHandler

    转自:  https://blog.csdn.net/qq_25806863/article/details/71172823 概述 原文地址 http://blog.csdn.net/qq_2580 ...

  3. JAVA多线程之扩展ThreadPoolExecutor

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  4. Java多线程读取本地照片为二进制流,并根据系统核数动态确定线程数

    Java多线程读取图片内容并返回 1. ExecutorService线程池 2. 效率截图 3. 源码 1. ExecutorService线程池 ExecutorService线程池,并可根据系统 ...

  5. java多线程抽奖_java 线程池、多线程并发实战(生产者消费者模型 1 vs 10) 附案例源码...

    导读 前二天写了一篇<Java 多线程并发编程>点我直达,放国庆,在家闲着没事,继续写剩下的东西,开干! 线程池 为什么要使用线程池 例如web服务器.数据库服务器.文件服务器或邮件服务器 ...

  6. 300 行代码带你搞懂 Java 多线程!

    线程 线程的概念,百度是这样解释的: 线程(英语:Thread)是操作系统能够进行运算调度的最小单位.它被包含在进程之中,是进程中的实际运作单位.一条线程指的是进程中一个单一顺序的控制流,一个进程中可 ...

  7. Java多线程面试准备:聊聊Executor框架

    点击上方"好好学java",选择"置顶公众号" 优秀学习资源.干货第一时间送达! 精彩内容 java实战练习项目教程 2018微服务资源springboot.s ...

  8. java多线程学习-java.util.concurrent详解

    http://janeky.iteye.com/category/124727 java多线程学习-java.util.concurrent详解(一) Latch/Barrier 博客分类: java ...

  9. Java多线程知识小抄集(四)——完结

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  10. Java多线程知识小抄集(三)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

最新文章

  1. python中实现延时回调普通函数示例代码
  2. 轻松查看Internet Explorer缓存文件
  3. Entity Framework 6.3 和 EF Core 3.0 路线图
  4. TOP命令监视系统任务及掩码umask的作用
  5. matlab判断向量组线性相关性的三种方法
  6. python open函数关于w+ r+ 读写操作的理解
  7. swf文件转mp4视频格式的失败与成功
  8. c语言erf函数,erf_数值 | Numerics_C_参考手册_非常教程
  9. cocos creator js 获取屏幕宽度
  10. JPG怎么批量转换成PDF
  11. 一道我根本猜不出来的Trajan
  12. 高动态范围红外图像压缩
  13. Unity pc端内嵌网页插件Embedded Browser基本使用流程
  14. 一起学英语第二季第五期
  15. 【OpenCV】人脸旋转角度计算
  16. JAVA大华摄像头抓拍与API接口集成
  17. Win11家庭版如何安装组策略编辑器
  18. 干货分享:常见的测试类型有哪些?
  19. AI贺新年,开发者的虎年这样过才有意思
  20. 盖茨基金会:全球至少要到2108年才能实现性别平等,比期望晚了三代人 | 美通社头条...

热门文章

  1. org.springframework.dao.DataIntegrityViolationException: Error attempting to get column
  2. %.1f%%在python中格式化输出表示什么形式
  3. 准备安装knoppix
  4. 【ie兼容】判断ie版本跳转到指定页面下载浏览器
  5. TCS三步构思——基调、内容、结构
  6. 第十一章 人类复杂疾病与计算系统生物学
  7. 实时的空号检测API,稳定可靠
  8. 计算机英语backup,计算机英语:关于数据丢失
  9. hive 如何解析包含 json 的字符串字段
  10. java应用程序ui设计,Android应用UI设计流程