【多线程】线程池拒绝策略详解与自定义拒绝策略
线程池的拒绝策略
ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略
- CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
- AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
- DiscardPolicy:直接抛弃任务,不做任何处理
- DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交
线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数
当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。
拒绝策略的源码
CallerRunsPolicy
/*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.* 用于拒绝任务的处理程序,* 可以直接在{@code execute}方法的调用线程中运行被拒绝的任务* 除非执行器已被关闭,否则将丢弃该任务。*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.* 创建一个{@code CallerRunsPolicy}。*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 除非执行器已关闭,否则在调用者线程中执行任务,* r 在这种情况下,该任务将被丢弃。** @param r the runnable task requested to be executed* r 请求执行的可运行任务* @param e the executor attempting to execute this task* e 尝试执行此任务的执行者*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
分析:
CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
这样生产者虽然没有被阻塞,但提交任务也会被暂停。
但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。
AbortPolicy
/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.* 抛出{@code RejectedExecutionException}的拒绝任务处理程序。*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 总是抛出RejectedExecutionException* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
分析:
该策略是默认饱和策略。
使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。
DiscardPolicy
/*** A handler for rejected tasks that silently discards the* rejected task.* 拒绝任务的处理程序,默认丢弃拒绝任务。*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不执行任何操作,这具有丢弃任务 r 的作用。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
分析:
如代码所示,不做任何处理直接抛弃任务
DiscardOldestPolicy
/*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.* 处理被拒绝任务的处理程序,它丢弃最旧的未处理请求,* 然后重试{@code execute},* 除非执行器*被关闭,在这种情况下,该任务将被丢弃。*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.* 获取并忽略执行者*会立即执行的下一个任务(如果一个任务立即可用),* 然后重试任务r的执行,除非执行者*被关闭,在这种情况下,任务r会被丢弃。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
分析:
如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。
自定义策略
看完发现默认的几个拒绝策略并不是特别的友好,那么可不可以咱们自己搞个呢?
可以发现,所有的拒绝策略都是实现了 RejectedExecutionHandler 接口
public interface RejectedExecutionHandler {/*** Method that may be invoked by a {@link ThreadPoolExecutor} when* {@link ThreadPoolExecutor#execute execute} cannot accept a* task. This may occur when no more threads or queue slots are* available because their bounds would be exceeded, or upon* shutdown of the Executor.** <p>In the absence of other alternatives, the method may throw* an unchecked {@link RejectedExecutionException}, which will be* propagated to the caller of {@code execute}.** @param r the runnable task requested to be executed* @param executor the executor attempting to execute this task* @throws RejectedExecutionException if there is no remedy*/void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
这个接口只有一个 rejectedExecution 方法。
r 为待执行任务;executor 为线程池;方法可能会抛出拒绝异常。
那么咱们就可以通过实现 RejectedExecutionHandler 接口扩展
两个栗子:一
netty自己实现的线程池里面私有的一个拒绝策略。单独启动一个新的临时线程来执行任务。
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}}
两个栗子:二
dubbo的一个例子,它直接继承的 AbortPolicy ,加强了日志输出,并且输出dump文件
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());logger.warn(msg);dumpJStack();throw new RejectedExecutionException(msg);}
}
自己玩
参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:
new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}}};
这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。
相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。
总结
四种线程池拒绝策略,具体使用哪种策略,还得根据实际业务场景才能做出抉择。
【多线程】线程池拒绝策略详解与自定义拒绝策略相关推荐
- 并发编程五:java并发线程池底层原理详解和源码分析
文章目录 java并发线程池底层原理详解和源码分析 线程和线程池性能对比 Executors创建的三种线程池分析 自定义线程池分析 线程池源码分析 继承关系 ThreadPoolExecutor源码分 ...
- 线程池invokeAll方法详解
线程池invokeAll方法详解 问题起源与抽象 问题排查与猜测 猜测一:invokeAll 在异步执行后会不会同步等待线程执行完毕获取最终结果 猜测二:队列里面可能存在第一次调用 invokeAll ...
- java线程池ThreadPoolExecutor类详解
线程池有哪些状态 1. RUNNING: 接收新的任务,且执行等待队列中的任务 Accept new tasks and process queued tasks 2. SHUTDOWN: 不接收 ...
- 线程与线程池(一条龙详解)
一:前言 一个问题引出的学习笔记 并发类库提供的线程池实现有哪些? 其实Executors已经为我们封装好了 4 种常见的功能线程池,如下: 定长线程池(FixedThreadPool) 定时线程池( ...
- Java线程池七大参数详解和配置
目录 一.corePoolSize核心线程数 二.maximunPoolSize最大线程数 三.keepAliveTime空闲线程存活时间 四.unit空闲线程存活时间的单位 五.workQueue线 ...
- ThreadPoolExecutor线程池核心参数详解
理解ThreadPoolExecutor线程池的corePoolSize.maximumPoolSize和poolSize 我们知道,受限于硬件.内存和性能,我们不可能无限制的创建任意数量的线程,因为 ...
- python队列线程池_实例详解:python高级编程之消息队列(Queue)与进程池(Pool)
今天为大家带来的内容是:python高级编程之消息队列(Queue)与进程池(Pool),结合了实例的形式详细分析了Python消息队列与进程池的相关原理.使用技巧与操作注意事项!!! Queue消息 ...
- 线程池之ThreadPoolExecutor详解
转自:https://thinkwon.blog.csdn.net/article/details/102541900
- Redis的淘汰策略详解
接上一篇Redis的过期策略详解 Redis的过期策略详解 所谓的淘汰策略就是: 我们redis中的数据都没有过期,但是内存有大小,所以我们得淘汰一些没有过期的数据!! 那么怎么去淘汰了,我们上一篇讲 ...
最新文章
- Android 删除无用的导包
- jTessBoxEditor工具进行Tesseract3.02.02样本训练
- GridView中使用DataFromatString
- Java网络编程之IP地址和InetAddress类
- VMWare虚拟机转换成KVM
- 不愿意和别人打交道_最不愿与陌生人打交道的星座
- activiti 生命周期_一文让你读懂什么是Activiti工作流
- strstr不区分大小写_Excel find 函数竟然还能这么用!奖金梯次计算,连 if 都不需要...
- idea war包和jar包区别 以及用maven如何打包
- 动态加载html 添加样式表,使页面动态加载不同CSS样式表,从而实现不同风格模板的方法...
- [poj1222]EXTENDED LIGHTS OUT(高斯消元)
- GalleryView禁止选中项目向中间滑动
- “用户体验及可用性测试”读后感
- CISSP认证科普,涨姿势
- JS 无形装逼,最为致命
- 中国家庭收入调查(CHIP)数据88-13年
- Invoking “make cmake_check_build_system“ failed
- vivox50支持鸿蒙,vivoX50pro—好马配好鞍,强大的微云台相机
- 再聊面试,这次关于钱,关于培训,关于内卷
- 互联网小拼,这一生的故事,你要看看吗《打工人的那些事》