(转)ThreadPoolExecutor最佳实践--如何选择队列
转自: https://blog.hufeifei.cn/2018/08/12/Java/ThreadPoolExecutor%E6%9C%80%E4%BD%B3%E5%AE%9E%E8%B7%B5--%E5%A6%82%E4%BD%95%E9%80%89%E6%8B%A9%E9%98%9F%E5%88%97/
前一篇文章《如何选择线程数》讲了如何决定线程池中线程个数,这篇文章讨论“如何选择工作队列”。
再次强调一下,ThreadPoolExecutor最核心的四点:
1、当有任务提交的时候,会创建核心线程去执行任务(即使有核心线程空闲);
2、当核心线程数达到corePoolSize时,后续提交的都会进BlockingQueue中排队;
3、当BlockingQueue满了(offer失败),就会创建临时线程(临时线程空闲超过一定时间后,会被销毁);
4、当线程总数达到maximumPoolSize时,后续提交的任务都会被RejectedExecutionHandler拒绝。
1、BlockingQueue
线程池中工作队列由BlockingQueue实现类提供功能,BlockingQueue定义了这么几组方法:
Throws exception | Special value | Blocks | Times out | |
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
阻塞队列是最典型的“生产者消费者”模型:
- 生产者调用put()方法将生产的元素入队,消费者调用take()方法;
- 当队列满了,生产者调用的put()方法会阻塞,直到队列有空间可入队;
- 当队列为空,消费者调用的get()方法会阻塞,直到队列有元素可消费;
但是需要十分注意的是:ThreadPoolExecutor提交任务时使用offer方法(不阻塞),工作线程从队列取任务使用take方法(阻塞)。正是因为ThreadPoolExecutor使用了不阻塞的offer方法,所以当队列容量已满,线程池会去创建新的临时线程;同样因为工作线程使用take()方法取任务,所以当没有任务可取的时候线程池的线程将会空闲阻塞。
事实上,工作线程的超时销毁是调用
offer(e, time, unit)
实现的。
2、JDK提供的阻塞队列实现
JDK中提供了以下几个BlockingQueue实现类:
2.1、ArrayBlockingQueue
这是一个由数组实现的容量固定的有界阻塞队列。这个队列的实现非常简单:
Copy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
private void enqueue(E x) {final Object[] items = this.items;items[putIndex] = x; // 入队if (++putIndex == items.length) // 如果指针到了末尾putIndex = 0; // 下一个入队的位置变为0count++;notEmpty.signal(); // 提醒消费者线程消费 } private E dequeue() {final Object[] items = this.items;@SuppressWarnings("unchecked")E x = (E) items[takeIndex];items[takeIndex] = null; // 出队置空if (++takeIndex == items.length) // 如果指针到了末尾takeIndex = 0; // 下一个出队的位置变为0count--;if (itrs != null)itrs.elementDequeued();notFull.signal(); // 提醒生产者线程生产return x; } |
通过简单的指针循环实现了一个环形队列:
下面有一张维基百科关于环形缓冲区的的动画,虽然动画描述内容与ArrayBlockingQueue实现有所差异,但贵在生动形象(着实找不到更好的动画了)。
ArrayBlockingQueue主要复杂在迭代,允许迭代中修改队列(删除元素时会更新迭代器),并不会抛出ConcurrentModificationException;好在大多数场景中我们不会迭代阻塞队列。
2.2、SynchronousQueue
这是一个非常有意思的集合,更准确的说它并不是一个集合容器,因为它没有容量。你可以“偷偷地”把它看作new ArrayBlockingQueue(0)
,之所以用”偷偷地”这么龌龊的词,首先是因为ArrayBlockingQueue
在capacity<1
时会抛异常,其次ArrayBlockingQueue(0)
并不能实现SynchronousQueue
这么强大的功能。
正如SynchronousQueue的名字所描述一样——“同步队列”,它专门用于生产者线程与消费者线程之间的同步:
- 因为它任何时候都是空的,所以消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回。
- 同样的,你也可以认为任何时候都是满的,所以生产者线程调用put()方法的时候就会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
另外还有几点需要注意:
- SynchronousQueue不能遍历,因为它没有元素可以遍历;
- 所有的阻塞队列都不允许插入null元素,因为当生产者生产了一个null的时候,消费者调用poll()返回null,无法判断是生产者生产了一个null元素,还是队列本身就是空。
CachedThreadPool使用的就是同步队列:
Copy
1 2 3 4 5 |
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); } |
因为SynchronousQueue无容量的特性,所以CachedThreadPool不会对任务进行排队,如果线程池中没有空闲线程,CachedThreadPool会立即创建一个新线程来接收这个任务。
所以使用CachedThreadPool要注意避免提交长时间阻塞的任务,可能会由于线程数过多而导致内存溢出(OutOfOutOfMemoryError)。
2.3、LinkedBlockingQueue
这是一个由单链表实现的默认无界的阻塞队列。LinkedBlockingQueue提供了一个可选有界的构造函数,而在未指明容量时,容量默认为Integer.MAX_VALUE。
按照官方文档的说法LinkedBlockingQueue是一种可选有界(optionally-bounded)阻塞队列。
SingleThreadPool和FixedThreadPool使用的就是LinkedBlockingQueue
Copy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory); } public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory)); } |
因为FixedThreadPool使用无界的LinkedBlockingQueue,所以当没有线程空闲时,新提交的任务都会提交到阻塞队列中,由于队列永远也不会满,FixedThreadPool永远也不会创建新的临时线程。
但是需要注意的是,不要往FixedThreadPool提交过多的任务,因为所有未处理的任务都会到LinkedBlockingQueue中排队,队列中任务过多也可能会导致内存溢出。虽然这个过程会比较缓慢,因为队列中的请求所占用的资源比线程占用的资源要少得多。
2.4、其他队列
DelayQueue和PriorityBlockingQueue底层都是使用二叉堆实现的优先级阻塞队列。
区别在于:
- 前者要求队列中的元素实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来;
- 后者对元素没有要求,可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较,跟时间没有任何关系,仅仅是按照优先级取任务。
当我们提交的任务有优先顺序时可以考虑选用这两种队列
事实上ScheduledThreadPoolExecutor内部实现了一个类似于DelayQueue的队列。
除了这两个,BlockingQueue还有两个子接口BlockingDeque(双端阻塞队列),TransferQueue(传输队列)
并且两个接口都有自己唯一的实现类:
- LinkedBlockingDeque:使用双向队列实现的双端阻塞队列,双端意味着可以像普通队列一样FIFO(先进先出),可以以像栈一样FILO(先进后出)
- LinkedTransferQueue:它是ConcurrentLinkedQueue、LinkedBlockingQueue和SynchronousQueue的结合体,但是把它用在ThreadPoolExecutor中,和无限制的LinkedBlockingQueue行为一致。
3、让生产者阻塞的线程池
前面说到CachedThreadPool和FixedThreadPool都有可能导致内存溢出,前者是由于线程数过多,后者是由于队列任务过多。而究其根本就是因为任务生产速度远大于线程池处理任务的速度。
所以有一个想法就是让生产任务的线程在任务处理不过来的时候休息一会儿——也就是阻塞住任务生产者。
但是前面提到过ThreadPoolExecutor内部将任务提交到队列时,使用的是不阻塞的offer方法。
我提供的第一种方式是:重写offer方法把它变成阻塞式。
3.1、重写BlockingQueue的offer
这种处理方式是将原来非阻塞的offer覆盖,使用阻塞的put方法实现。
Copy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
public class ThreadPoolTest {private static class Task implements Runnable {private int taskId;Task(int taskId) {this.taskId = taskId;}@Override public void run() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException ignore) {}System.out.println("task " + taskId + " end");}}public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(2) {@Override public boolean offer(Runnable runnable) {try {super.put(runnable); // 使用阻塞的put重写offer方法} catch (InterruptedException e) {e.printStackTrace();}return true;}});threadPool.submit(new Task(1));System.out.println("task 1 submitted");threadPool.submit(new Task(2));System.out.println("task 2 submitted");threadPool.submit(new Task(3));System.out.println("task 3 submitted");threadPool.submit(new Task(4));System.out.println("task 4 submitted");threadPool.submit(new Task(5));System.out.println("task 5 submitted");threadPool.submit(new Task(6));System.out.println("task 6 submitted");threadPool.shutdown();}} |
执行的过程中会发现Task5要等到线程池中的一个任务执行完成后,才能提交成功。
这种方式把BlockingQueue的行为修改了,这时线程池的maximumPoolSize形同虚设,因为ThreadPoolExecutor调用offer入队失败返回false后才会创建临时线程。现在offer改成了阻塞式的,实际上永远是返回true,所以永远都不会创建临时线程,maximumPoolSize的限制也就没有什么意义了。
3.2、重写拒绝策略
在介绍第二种方式之前,先简单介绍JDK中提供了四种拒绝策略:
- AbortPolicy——抛出RejectedExecutionException异常的方式拒绝任务。
- DiscardPolicy——什么都不干,静默地丢弃任务
- DiscardOldestPolicy——把队列中最老的任务拿出来扔掉
- CallerRunsPolicy——在任务提交的线程把任务给执行了
ThreadPoolExecutor默认使用AbortPolicy
DiscardPolicy和DiscardOldestPolicy两种策略看上去都不怎么靠谱,除非真有这种特别的需求,比如客户端应用中网络请求拥堵(服务端宕机或网络不通畅)的话可以选择抛弃最老的请求,大多数情况还是使用默认的拒绝策略。
我们的第二种做法就是写一个自己的RejectedExecutionHandler。这种方式相对“温柔”一些,在线程池提交任务的最后一步——被线程池拒绝的任务,可以在拒绝后调用队列的put()
方法,让任务的提交者阻塞,直到队列中任务被被线程池执行后,队列有了多余空间,调用方才返回。
Copy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
public class ThreadPoolTest {private static class Task implements Runnable {private int taskId;Task(int taskId) {this.taskId = taskId;}@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException ignore) {}System.out.println("task " + taskId + " end");}}private static class BlockCallerPolicy implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {executor.getQueue().put(r);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(2), new BlockCallerPolicy());threadPool.submit(new Task(1));System.out.println("task 1 submitted");threadPool.submit(new Task(2));System.out.println("task 2 submitted");threadPool.submit(new Task(3));System.out.println("task 3 submitted");threadPool.submit(new Task(4));System.out.println("task 4 submitted");threadPool.submit(new Task(5));System.out.println("task 5 submitted");threadPool.submit(new Task(6));System.out.println("task 6 submitted");threadPool.shutdown();}} |
使用这种方式的好处是线程池仍可以设置maximumPoolSize,当任务入队失败仍可以创建临时线程执行任务,只有当线程总数大于maximumPoolSize时,任务才会被拒绝。
4、Tomcat中的线程池
作为一个最常用的Java应用服务器之一,Tomcat中线程池还是值得我们借鉴学习的。
注意下面代码来自Tomcat8.5.27,版本不同实现可能略有差异
org.apache.catelina.core.StandardThreadExecutor
Copy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
public class StandardThreadExecutor extends LifecycleMBeanBaseimplements Executor, ResizableExecutor {// Tomcat线程池默认的配置protected int threadPriority = Thread.NORM_PRIORITY;protected boolean daemon = true;protected String namePrefix = "tomcat-exec-";protected int maxThreads = 200;protected int minSpareThreads = 25;protected int maxIdleTime = 60000;...protected boolean prestartminSpareThreads = false;protected int maxQueueSize = Integer.MAX_VALUE;protected void startInternal() throws LifecycleException {// 任务队列:这里你看到的是一个无界队列,但是队列里面进行了特殊处理taskqueue = new TaskQueue(maxQueueSize);TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());// 创建线程池,这里的ThreadPoolExecutor是Tomcat继承自JDK的ThreadPoolExecutorexecutor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), // 核心线程数与最大线程数maxIdleTime, TimeUnit.MILLISECONDS, // 默认6万毫秒的超时时间,也就是一分钟taskqueue, tf); // 玄机在任务队列的设置executor.setThreadRenewalDelay(threadRenewalDelay);if (prestartminSpareThreads) {executor.prestartAllCoreThreads(); // 预热所有的核心线程}taskqueue.setParent(executor);setState(LifecycleState.STARTING);}... } |
org.apache.tomcat.util.threads.ThreadPoolExecutor
Copy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {private final AtomicInteger submittedCount = new AtomicInteger(0);private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);@Overrideprotected void afterExecute(Runnable r, Throwable t) {submittedCount.decrementAndGet(); // 执行完成后提交数量减一if (t == null) {// 如果有必要抛个异常让线程终止stopCurrentThreadIfNeeded();}}@Overridepublic void execute(Runnable command) {execute(command,0,TimeUnit.MILLISECONDS);}public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet(); // 提交时数量加一try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {// 如果任务被拒绝,则强制入队if (!queue.force(command, timeout, unit)) {// 由于TaskQueue默认无界,所以默认强制入队会成功submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet(); // 任务被拒绝,任务数减一throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet(); // 任务被拒绝,任务数减一throw rx;}}} } |
org.apache.tomcat.util.threads.TaskQueue
Copy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
public class TaskQueue extends LinkedBlockingQueue<Runnable> {private volatile ThreadPoolExecutor parent = null;public boolean force(Runnable o) {if ( parent==null || parent.isShutdown() )throw new RejectedExecutionException("Executor not running, can't force a command into the queue");// 因为LinkedBlockingQueue无界,所以调用offer强制入队return super.offer(o);}public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent==null || parent.isShutdown() )throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit);}@Overridepublic boolean offer(Runnable o) {// 不是上面Tomcat中定义地ThreadPoolExecutor,不做任何检查if (parent==null) return super.offer(o);// 线程数达到最大线程数,尝试入队if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);// 提交的任务数小于线程数,也就是有空余线程,入队让空闲线程取任务if (parent.getSubmittedCount() < parent.getPoolSize()) return super.offer(o);// 走到这说明线程池没有空闲线程// 这里返回false,改变了LinkedBlockingQueue默认的行为// 使得Tomcat可以创建临时线程if (parent.getPoolSize() < parent.getMaximumPoolSize()) return false;// 到这里说明临时线程也没有空闲,只能排队了return super.offer(o);} } |
Tomcat的线程池扩展了JDK线程池的功能,主要体现在两点:
- Tomcat的ThreadPoolExecutor使用的TaskQueue,是无界的LinkedBlockingQueue,但是通过taskQueue的offer方法覆盖了LinkedBlockingQueue的offer方法,改写了规则,使得线程池能在任务较多的情况下增长线程池数量——JDK是先排队再涨线程池,Tomcat则是先涨线程池再排队。
- Tomcat的ThreadPoolExecutor改写了execute方法,当任务被reject时,捕获异常,并强制入队。
参考链接:
支持生产阻塞的线程池 :http://ifeve.com/blocking-threadpool-executor/
Disruptor框架:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf
线程池调整的重要性:https://blog.bramp.net/post/2015/12/17/the-importance-of-tuning-your-thread-pools/
线程池调整的重要性(译):http://www.importnew.com/17633.html
SynchronousQueue与TransferQueue的区别:https://stackoverflow.com/questions/7317579/difference-between-blockingqueue-and-transferqueue/7317650
Tomcat配置线程池:https://tomcat.apache.org/tomcat-8.5-doc/config/executor.html
(转)ThreadPoolExecutor最佳实践--如何选择队列相关推荐
- ThreadPoolExecutor最佳实践--如何选择线程数
去年的一篇<ThreadPoolExecutor详解>大致讲了ThreadPoolExecutor内部的代码实现. 总结一下,主要有以下四点: 当有任务提交的时候,会创建核心线程去执行任务 ...
- websphere mq_最佳实践:WebSphere MQ共享队列和应用程序
websphere mq 关于IBM®WebSphere®MQ共享队列的优点,尤其是它们提供消息的连续可用性的能力,已有很多论述. 但是,在规划和实施过程中,出现了一些有关最佳使用共享队列及其对应用程 ...
- 分布式 PostgreSQL 集群(Citus),分布式表中的分布列选择最佳实践
确定应用程序类型 在 Citus 集群上运行高效查询要求数据在机器之间正确分布.这因应用程序类型及其查询模式而异. 大致上有两种应用程序在 Citus 上运行良好.数据建模的第一步是确定哪些应用程序类 ...
- 选择嵌入式分析提供商的五个最佳实践
越来越多的用户都期望在企业应用程序中嵌入数据分析和可视化.然而并非所有的分析平台都可以直接嵌入到其他应用之中.企业应用的技术产品经理应使用本文介绍的最佳实践,选择合适的嵌入式数据分析提供商. 摘要 关 ...
- 活动目录最佳实践分析器
这个功能在Windows Server 2008 R2 上就有了,尽管不可能经常使用它,但这个功能还是不错的,尤其对刚建的域: 1.打开"服务器管理器":选择"AD DS ...
- 如何用深度学习做自然语言处理?这里有份最佳实践清单
如何用深度学习做自然语言处理?这里有份最佳实践清单 By 机器之心2017年7月26日 14:16 对于如何使用深度学习进行自然语言处理,本文作者 Sebastian Ruder 给出了一份详细的最佳 ...
- 基于消息队列 RocketMQ 的大型分布式应用上云最佳实践
简介:Apache RocketMQ 作为阿里巴巴开源的支撑万亿级数据洪峰的分布式消息中间件,在众多行业广泛应用.在选型过程中,开发者一定会关注开源版与商业版的业务价值对比. 那么,今天就围绕着商业版 ...
- JAVA应用开发MQ实战最佳实践——Series2:消息队列RocketMQ性能测试案例
简介:JAVA应用开发MQ实战最佳实践--Series2:消息队列RocketMQ性能测试案例 往期内容 JAVA应用开发MQ实战最佳实践--Series1:RocketMQ综述及代码设计 1. 消息 ...
- Elastic Stack最佳实践系列:Beats->ES,一个更轻型的架构选择
作者:李捷,Elastic首席云解决方案架构师 ELK生态下,构建日志分析系统的选择 说起开源的日志分析系统,ELK几乎无人不晓,这个生态并非是Elastic特意而为,毕竟Elasticsearch的 ...
最新文章
- numpy——hsplit()、vsplit()函数的详细使用
- windows系统下实现Redis的配置与连接操作
- ORA-01940:cannot drop a user that is currently connected
- 浪潮服务器系统套件,浪潮服务器随机套件版本列表
- 20、淘宝技术这十年
- Spring Security基本原理
- passwd命令限制用户密码到期时间
- 噪音通道模型_噪声信道模型(NoiseChannelModel)
- UE4搭建场景与特效文档—地形、水体、植被、雨雾效果
- 功率因数 matlab,基于Matlab的功率因数校正电路的仿真分析
- VBA:获取工作簿中所有表的名称、地址
- java poi 段落行间距,Apache POI Word - 段落( Paragraph)
- java jar包资源文件_深入jar包:从jar包中读取资源文件
- MapReduce任务卡在Running Job状态的多种解决方法
- 小伙因家人“催催催” 欲轻生 民警苦劝将其救下
- Stata数据处理:xtbalance-非平衡面板之转换
- ubuntu 18.04 新配主机 无wifi适配器 找不到wifi问题解决
- WEB网页短信系统建设方案
- 中柏ezpad4s_大块头双重性格 中柏EZpad 4s时尚版评测
- html 的table表格和iview的Table加斑马纹
热门文章
- LG P4899 [IOI2018] werewolf 狼人(kruskal重构树,二维数点)
- Loj #6089. 小 Y 的背包计数问题
- D - ABC Conjecture Gym - 102798D
- P2607 [ZJOI2008]骑士
- [CQOI2015]选数(数论分块+杜教筛)
- 洛谷P2056:[ZJOI2007]捉迷藏(点分树、STL)
- CF39C-Moon Craters【dp】
- CF9D-How many trees?【dp】
- P1447-[NOI2010]能量采集【GCD,数论,容斥】
- P1090-合并果子【离散化,队列,时间复杂度O(n)】