多线程报了个java.util.concurrent.RejectedExecutionException: event executor terminated

线程池的拒绝策略

ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略

  • CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
  • AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
  • DiscardPolicy:直接抛弃任务,不做任何处理
  • DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交

线程池中,有三个重要的参数,决定影响了拒绝策略:corePoolSize - 核心线程数,也即最小的线程数。workQueue - 阻塞队列 。 maximumPoolSize - 最大线程数

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

总结起来,也就是一句话,当提交的任务数大于(workQueue.size() + maximumPoolSize ),就会触发线程池的拒绝策略。

当你的线程跑满,并且等待队列也满了以后,再执行任务,就会报错,你也可以抛弃掉新增的任务
你的线程池最大是100,队列是200,所以你任务跑多了,就会报这个错误

拒绝策略的源码

CallerRunsPolicy

 /*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.* 用于拒绝任务的处理程序,* 可以直接在{@code execute}方法的调用线程中运行被拒绝的任务* 除非执行器已被关闭,否则将丢弃该任务。*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.* 创建一个{@code CallerRunsPolicy}。*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 除非执行器已关闭,否则在调用者线程中执行任务,* r 在这种情况下,该任务将被丢弃。** @param r the runnable task requested to be executed*          r 请求执行的可运行任务* @param e the executor attempting to execute this task*          e 尝试执行此任务的执行者*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}

分析:

CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)

这样生产者虽然没有被阻塞,但提交任务也会被暂停。

但这种策略也有隐患,当生产者较少时,生产者消费任务的时间里,消费者可能已经把任务都消费完了,队列处于空状态,当生产者执行完任务后才能再继续生产任务,这个过程中可能导致消费者线程的饥饿。

AbortPolicy

 /*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.* 抛出{@code RejectedExecutionException}的拒绝任务处理程序。*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 总是抛出RejectedExecutionException* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}

分析:

该策略是默认饱和策略。

使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

DiscardPolicy

   /*** A handler for rejected tasks that silently discards the* rejected task.* 拒绝任务的处理程序,默认丢弃拒绝任务。*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不执行任何操作,这具有丢弃任务 r 的作用。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}

分析:

如代码所示,不做任何处理直接抛弃任务

DiscardOldestPolicy

   /*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.* 处理被拒绝任务的处理程序,它丢弃最旧的未处理请求,* 然后重试{@code execute},* 除非执行器*被关闭,在这种情况下,该任务将被丢弃。*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.* 获取并忽略执行者*会立即执行的下一个任务(如果一个任务立即可用),* 然后重试任务r的执行,除非执行者*被关闭,在这种情况下,任务r会被丢弃。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}

分析:

如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

自定义策略

看完发现默认的几个拒绝策略并不是特别的友好,那么可不可以咱们自己搞个呢?

可以发现,所有的拒绝策略都是实现了 RejectedExecutionHandler 接口

public interface RejectedExecutionHandler {/*** Method that may be invoked by a {@link ThreadPoolExecutor} when* {@link ThreadPoolExecutor#execute execute} cannot accept a* task.  This may occur when no more threads or queue slots are* available because their bounds would be exceeded, or upon* shutdown of the Executor.** <p>In the absence of other alternatives, the method may throw* an unchecked {@link RejectedExecutionException}, which will be* propagated to the caller of {@code execute}.** @param r the runnable task requested to be executed* @param executor the executor attempting to execute this task* @throws RejectedExecutionException if there is no remedy*/void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

这个接口只有一个 rejectedExecution 方法。

r 为待执行任务;executor 为线程池;方法可能会抛出拒绝异常。

那么咱们就可以通过实现 RejectedExecutionHandler 接口扩展

两个栗子:一

netty自己实现的线程池里面私有的一个拒绝策略。单独启动一个新的临时线程来执行任务。

  private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}}

两个栗子:二

dubbo的一个例子,它直接继承的 AbortPolicy ,加强了日志输出,并且输出dump文件

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());logger.warn(msg);dumpJStack();throw new RejectedExecutionException(msg);}
}

自己玩

参考类似的思路,最简单的做法,我们可以直接定义一个RejectedExecutionHandler,当队列满时改为调用BlockingQueue.put来实现生产者的阻塞:

 new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}}};

这样,我们就无需再关心Queue和Consumer的逻辑,只要把精力集中在生产者和消费者线程的实现逻辑上,只管往线程池提交任务就行了。

相比最初的设计,这种方式的代码量能减少不少,而且能避免并发环境的很多问题。当然,你也可以采用另外的手段,例如在提交时采用信号量做入口限制等,但是如果仅仅是要让生产者阻塞,那就显得复杂了。

总结

四种线程池拒绝策略,具体使用哪种策略,还得根据实际业务场景才能做出抉择。

关于java.util.concurrent.RejectedExecutionException: event executor terminated相关推荐

  1. java.util.concurrent.RejectedExecutionException: event executor terminated 错误分析

    java.util.concurrent.RejectedExecutionException: event executor terminated 错误分析

  2. 【ruoyi】java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoo

    前言 ruoyi 4.6.0 jdk1.8 错误 11:48:16.879 [http-nio-9031-exec-25] INFO c.r.f.s.r.UserRealm - [doGetAuthe ...

  3. java.util.concurrent.RejectedExecutionException

    2019独角兽企业重金招聘Python工程师标准>>> 遇到java.util.concurrent.RejectedExecutionException 目前看来,最主要有2种原因 ...

  4. 1]解决java.util.concurrent.RejectedExecutionException

    今天学习了java的并发,线程池,同一时间执行一个操作. 报错:java.util.concurrent.RejectedExecutionException,排查发现是等待队列设小了,导致 拒绝策略 ...

  5. java util下的并发包_jdk并发包下:使用java.util.concurrent.Executor线程池

    多线程,线程池Executor的接口类图: 其他都不重要,就ExecutorService是主要的: 基本上分为单纯线程池和定时任务线程池: 说白了除了ForkJoinPool所有都继承于Thread ...

  6. 线程池(java.util.concurrent.ThreadPoolExecutor)的使用

    如果大家觉得这个类还不能完全满足自己的要求的话,其实可以照搬这个源码,然后适当改动一下来适合自己的需求,也不失为一种捷径的.呵呵,本人最近在自己做的一个项目中,就来了这一手.虽然不是多好,主要是为了满 ...

  7. java.util.concurrent 包下面的所有类

    java.util.concurrent 包下面的所有类 原子操作数类: java.util.concurrent.atomic.AtomicBoolean.class java.util.concu ...

  8. 线程池java.util.concurrent.ThreadPoolExecutor总结

    http://uule.iteye.com/blog/1123185 线程池还具有提高系统性能的优点,因为创建线程和清除线程的开销比较大. 有两种不同类型的线程池:一是固定线程数量的线程池:二是可变数 ...

  9. JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor

    http://www.diybl.com/course/3_program/java/javajs/200797/70003.html 在多线程大师Doug Lea的贡献下,在JDK1.5中加入了许多 ...

最新文章

  1. C++库文件和头文件编写教程
  2. LAPJV算法学习笔记
  3. Windows文件被占用解决办法
  4. 1.NET 4.6.1向.NET core 2.0项目迁移(HelloWorld篇)
  5. 区块链BaaS云服务(12)易居(中国) 房地产 EBaaS(Estate Blockchain as a Service)
  6. boost::basic_string_view相关的测试程序
  7. boost::mpl::filter_view模块实现日历相关的测试程序
  8. DFS:C 小Y的难题(1)
  9. elementui树形复选框,element-ui checkbox 组件的树形联动
  10. Java_FileInputStream_读一个文件
  11. C# List的克隆
  12. Linux Shell变量使用
  13. C++基础::string(三)
  14. Atititi. naming spec 联系人命名与remark备注指南规范v5 r99.docx
  15. 数据结构与算法(java)
  16. python输出unicode对应字符_python unicode字符串
  17. 用户研究:用户行为分析
  18. 中控指纹仪linux驱动下载,中控uru4500指纹仪驱动
  19. pycharm清华镜像源使用
  20. 数据挖掘学习--主成分分析

热门文章

  1. iMeta | 华南农大曾振灵/熊文广等-家庭中宠物犬与主人耐药基因的共存研究
  2. Win flex-bison 的简单使用
  3. 秩和检验(秩的概念,秩和检验法)
  4. 2020.05.29
  5. OSPFv2原理详解(基于RFC2328)+配置介绍+RFC2328翻译
  6. 实时云渲染有哪些优势?
  7. 2022年电赛A题单相交流电子负载一等奖(代码工程+PCB原理图资料)
  8. Ofiice 2016 Excel 表中某一列按照另一列的数值进行排序
  9. css 文字溢出...显示,hover时显示隐藏文字
  10. 超详细Docker部署SpringBoot+Vue项目(三更博客项目部署)