线程池的拒绝策略

ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略

  • CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
  • AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
  • DiscardPolicy:直接抛弃任务,不做任何处理
  • DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交

线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

拒绝策略的源码

CallerRunsPolicy

    /*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.* 用于拒绝任务的处理程序,* 可以直接在{@code execute}方法的调用线程中运行被拒绝的任务* 除非执行器已被关闭,否则将丢弃该任务。*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.* 创建一个{@code CallerRunsPolicy}。*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 除非执行器已关闭,否则在调用者线程中执行任务,* r 在这种情况下,该任务将被丢弃。** @param r the runnable task requested to be executed*          r 请求执行的可运行任务* @param e the executor attempting to execute this task*          e 尝试执行此任务的执行者*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
分析:

CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)

这样生产者虽然没有被阻塞,但提交任务也会被暂停。

但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

AbortPolicy

    /*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.* 抛出{@code RejectedExecutionException}的拒绝任务处理程序。*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 总是抛出RejectedExecutionException* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
分析:

该策略是默认饱和策略。

使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

DiscardPolicy

    /*** A handler for rejected tasks that silently discards the* rejected task.* 拒绝任务的处理程序,默认丢弃拒绝任务。*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不执行任何操作,这具有丢弃任务 r 的作用。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
分析:

如代码所示,不做任何处理直接抛弃任务

DiscardOldestPolicy

    /*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.* 处理被拒绝任务的处理程序,它丢弃最旧的未处理请求,* 然后重试{@code execute},* 除非执行器*被关闭,在这种情况下,该任务将被丢弃。*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.* 获取并忽略执行者*会立即执行的下一个任务(如果一个任务立即可用),* 然后重试任务r的执行,除非执行者*被关闭,在这种情况下,任务r会被丢弃。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
分析:

如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

自定义策略

看完发现默认的几个拒绝策略并不是特别的友好,那么可不可以咱们自己搞个呢?

可以发现,所有的拒绝策略都是实现了 RejectedExecutionHandler 接口

public interface RejectedExecutionHandler {/*** Method that may be invoked by a {@link ThreadPoolExecutor} when* {@link ThreadPoolExecutor#execute execute} cannot accept a* task.  This may occur when no more threads or queue slots are* available because their bounds would be exceeded, or upon* shutdown of the Executor.** <p>In the absence of other alternatives, the method may throw* an unchecked {@link RejectedExecutionException}, which will be* propagated to the caller of {@code execute}.** @param r the runnable task requested to be executed* @param executor the executor attempting to execute this task* @throws RejectedExecutionException if there is no remedy*/void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

这个接口只有一个 rejectedExecution 方法。

r 为待执行任务;executor 为线程池;方法可能会抛出拒绝异常。

那么咱们就可以通过实现 RejectedExecutionHandler 接口扩展

两个栗子:一

netty自己实现的线程池里面私有的一个拒绝策略。单独启动一个新的临时线程来执行任务。

    private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}}

两个栗子:二

dubbo的一个例子,它直接继承的 AbortPolicy ,加强了日志输出,并且输出dump文件

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());logger.warn(msg);dumpJStack();throw new RejectedExecutionException(msg);}
}

自己玩

参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

    new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}}};

这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。

总结

四种线程池拒绝策略,具体使用哪种策略,还得根据实际业务场景才能做出抉择。

【多线程】线程池拒绝策略详解与自定义拒绝策略相关推荐

  1. 并发编程五:java并发线程池底层原理详解和源码分析

    文章目录 java并发线程池底层原理详解和源码分析 线程和线程池性能对比 Executors创建的三种线程池分析 自定义线程池分析 线程池源码分析 继承关系 ThreadPoolExecutor源码分 ...

  2. 线程池invokeAll方法详解

    线程池invokeAll方法详解 问题起源与抽象 问题排查与猜测 猜测一:invokeAll 在异步执行后会不会同步等待线程执行完毕获取最终结果 猜测二:队列里面可能存在第一次调用 invokeAll ...

  3. java线程池ThreadPoolExecutor类详解

    线程池有哪些状态 1. RUNNING:  接收新的任务,且执行等待队列中的任务 Accept new tasks and process queued tasks  2. SHUTDOWN: 不接收 ...

  4. 线程与线程池(一条龙详解)

    一:前言 一个问题引出的学习笔记 并发类库提供的线程池实现有哪些? 其实Executors已经为我们封装好了 4 种常见的功能线程池,如下: 定长线程池(FixedThreadPool) 定时线程池( ...

  5. Java线程池七大参数详解和配置

    目录 一.corePoolSize核心线程数 二.maximunPoolSize最大线程数 三.keepAliveTime空闲线程存活时间 四.unit空闲线程存活时间的单位 五.workQueue线 ...

  6. ThreadPoolExecutor线程池核心参数详解

    理解ThreadPoolExecutor线程池的corePoolSize.maximumPoolSize和poolSize 我们知道,受限于硬件.内存和性能,我们不可能无限制的创建任意数量的线程,因为 ...

  7. python队列线程池_实例详解:python高级编程之消息队列(Queue)与进程池(Pool)

    今天为大家带来的内容是:python高级编程之消息队列(Queue)与进程池(Pool),结合了实例的形式详细分析了Python消息队列与进程池的相关原理.使用技巧与操作注意事项!!! Queue消息 ...

  8. 线程池之ThreadPoolExecutor详解

    转自:https://thinkwon.blog.csdn.net/article/details/102541900

  9. Redis的淘汰策略详解

    接上一篇Redis的过期策略详解 Redis的过期策略详解 所谓的淘汰策略就是: 我们redis中的数据都没有过期,但是内存有大小,所以我们得淘汰一些没有过期的数据!! 那么怎么去淘汰了,我们上一篇讲 ...

最新文章

  1. Android 删除无用的导包
  2. jTessBoxEditor工具进行Tesseract3.02.02样本训练
  3. GridView中使用DataFromatString
  4. Java网络编程之IP地址和InetAddress类
  5. VMWare虚拟机转换成KVM
  6. 不愿意和别人打交道_最不愿与陌生人打交道的星座
  7. activiti 生命周期_一文让你读懂什么是Activiti工作流
  8. strstr不区分大小写_Excel find 函数竟然还能这么用!奖金梯次计算,连 if 都不需要...
  9. idea war包和jar包区别 以及用maven如何打包
  10. 动态加载html 添加样式表,使页面动态加载不同CSS样式表,从而实现不同风格模板的方法...
  11. [poj1222]EXTENDED LIGHTS OUT(高斯消元)
  12. GalleryView禁止选中项目向中间滑动
  13. “用户体验及可用性测试”读后感
  14. CISSP认证科普,涨姿势
  15. JS 无形装逼,最为致命
  16. 中国家庭收入调查(CHIP)数据88-13年
  17. Invoking “make cmake_check_build_system“ failed
  18. vivox50支持鸿蒙,vivoX50pro—好马配好鞍,强大的微云台相机
  19. 再聊面试,这次关于钱,关于培训,关于内卷
  20. 互联网小拼,这一生的故事,你要看看吗《打工人的那些事》

热门文章

  1. 延迟加载图片并监听图片加载完成
  2. java.lang.NullPointerException错误分析
  3. HTML的数据 转成 JSON数据中的 因HTML有大量及其它特殊符号会把JSON字符串截断该怎么...
  4. 【Visual C++】游戏开发笔记十四 游戏画面绘图(四) 华丽的CImage类
  5. 超声检查预测一年后RA病人的MRI侵蚀进展
  6. NASM汇编程序中的宏定义
  7. JS学习之Node类型
  8. 如何利用抽象工厂更换数据库
  9. java源码分析之ArrayList
  10. tcp/ip详解--分层