前言

谈到 Java 的线程池最熟悉的莫过于 ExecutorService 接口了,jdk1.5 新增的 java.util.concurrent 包下的这个 api,大大的简化了多线程代码的开发。而不论你用 FixedThreadPool 还是 CachedThreadPool 其背后实现都是ThreadPoolExecutor。ThreadPoolExecutor 是一个典型的缓存池化设计的产物,因为池子有大小,当池子体积不够承载时,就涉及到拒绝策略。JDK 中已经预设了 4 种线程池拒绝策略,下面结合场景详细聊聊这些策略的使用场景,以及我们还能扩展哪些拒绝策略。

池化设计思想

池话设计应该不是一个新名词。我们常见的如 Java 线程池、JDBC 连接池、Redis 连接池等就是这类设计的代表实现。这种设计会初始预设资源,解决的问题就是抵消每次获取资源的消耗,如创建线程的开销,获取远程连接的开销等。就好比你去食堂打饭,打饭的大妈会先把饭盛好几份放那里,你来了就直接拿着饭盒加菜即可,不用再临时又盛饭又打菜,效率就高了。除了初始化资源,池化设计还包括如下这些特征:池子的初始值、池子的活跃值、池子的最大值等,这些特征可以直接映射到 Java 线程池和数据库连接池的成员属性中。

线程池触发拒绝策略的时机

和数据源连接池不一样,线程池除了初始大小和池子最大值,还多了一个阻塞队列来缓冲。数据源连接池一般请求的连接数超过连接池的最大值的时候就会触发拒绝策略,策略一般是阻塞等待设置的时间或者直接抛异常。而线程池的触发时机如下图:

img

如图,想要了解线程池什么时候触发拒绝粗略,需要明确上面三个参数的具体含义,是这三个参数总体协调的结果,而不是简单的超过最大线程数就会触发线程拒绝粗略,当提交的任务数大于 corePoolSize 时,会优先放到队列缓冲区,只有填满了缓冲区后,才会判断当前运行的任务是否大于 maxPoolSize,小于时会新建线程处理。大于时就触发了拒绝策略,总结就是:当前提交任务数大于(maxPoolSize + queueCapacity)时就会触发线程池的拒绝策略了。

JDK内置4种线程池拒绝策略

拒绝策略接口定义

在分析 JDK 自带的线程池拒绝策略前,先看下 JDK 定义的 拒绝策略接口,如下:

1public interface RejectedExecutionHandler {
2    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
3}

接口定义很明确,当触发拒绝策略时,线程池会调用你设置的具体的策略,将当前提交的任务以及线程池实例本身传递给你处理,具体作何处理,不同场景会有不同的考虑,下面看 JDK 为我们内置了哪些实现:

CallerRunsPolicy(调用者运行策略)

 1    public static class CallerRunsPolicy implements RejectedExecutionHandler {23        public CallerRunsPolicy() { }45        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {6            if (!e.isShutdown()) {7                r.run();8            }9        }
10    }

功能:当触发拒绝策略时,只要线程池没有关闭,就由提交任务的当前线程处理。

使用场景:一般在不允许失败的、对性能要求不高、并发量较小的场景下使用,因为线程池一般情况下不会关闭,也就是提交的任务一定会被运行,但是由于是调用者线程自己执行的,当多次提交任务时,就会阻塞后续任务执行,性能和效率自然就慢了。

AbortPolicy(中止策略)

 1    public static class AbortPolicy implements RejectedExecutionHandler {23        public AbortPolicy() { }45        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {6            throw new RejectedExecutionException("Task " + r.toString() +7                                                 " rejected from " +8                                                 e.toString());9        }
10    }

功能:当触发拒绝策略时,直接抛出拒绝执行的异常,中止策略的意思也就是打断当前执行流程

使用场景:这个就没有特殊的场景了,但是一点要正确处理抛出的异常。ThreadPoolExecutor 中默认的策略就是AbortPolicy,ExecutorService 接口的系列 ThreadPoolExecutor 因为都没有显示的设置拒绝策略,所以默认的都是这个。但是请注意,ExecutorService 中的线程池实例队列都是无界的,也就是说把内存撑爆了都不会触发拒绝策略。当自己自定义线程池实例时,使用这个策略一定要处理好触发策略时抛的异常,因为他会打断当前的执行流程。

DiscardPolicy(丢弃策略)

1    public static class DiscardPolicy implements RejectedExecutionHandler {
2
3        public DiscardPolicy() { }
4
5        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
6        }
7    }

功能:直接静悄悄的丢弃这个任务,不触发任何动作

使用场景:如果你提交的任务无关紧要,你就可以使用它 。因为它就是个空实现,会悄无声息的吞噬你的的任务。所以这个策略基本上不用了

DiscardOldestPolicy(弃老策略)

 1    public static class DiscardOldestPolicy implements RejectedExecutionHandler {23        public DiscardOldestPolicy() { }45        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {6            if (!e.isShutdown()) {7                e.getQueue().poll();8                e.execute(r);9            }
10        }
11    }

功能:如果线程池未关闭,就弹出队列头部的元素,然后尝试执行

使用场景:这个策略还是会丢弃任务,丢弃时也是毫无声息,但是特点是丢弃的是老的未执行的任务,而且是待执行优先级较高的任务。基于这个特性,我能想到的场景就是,发布消息,和修改消息,当消息发布出去后,还未执行,此时更新的消息又来了,这个时候未执行的消息的版本比现在提交的消息版本要低就可以被丢弃了。因为队列中还有可能存在消息版本更低的消息会排队执行,所以在真正处理消息的时候一定要做好消息的版本比较

第三方实现的拒绝策略

Dubbo 中的线程拒绝策略

 1public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {23    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);45    private final String threadName;67    private final URL url;89    private static volatile long lastPrintTime = 0;
10
11    private static Semaphore guard = new Semaphore(1);
12
13    public AbortPolicyWithReport(String threadName, URL url) {
14        this.threadName = threadName;
15        this.url = url;
16    }
17
18    @Override
19    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
20        String msg = String.format("Thread pool is EXHAUSTED!" +
21                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
22                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
23                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
24                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
25                url.getProtocol(), url.getIp(), url.getPort());
26        logger.warn(msg);
27        dumpJStack();
28        throw new RejectedExecutionException(msg);
29    }
30
31    private void dumpJStack() {
32       //省略实现
33    }
34}

可以看到,当dubbo的工作线程触发了线程拒绝后,主要做了三个事情,原则就是尽量让使用者清楚触发线程拒绝策略的真实原因

  • 输出了一条警告级别的日志,日志内容为线程池的详细设置参数,以及线程池当前的状态,还有当前拒绝任务的一些详细信息。可以说,这条日志,使用dubbo的有过生产运维经验的或多或少是见过的,这个日志简直就是日志打印的典范,其他的日志打印的典范还有spring。得益于这么详细的日志,可以很容易定位到问题所在

  • 输出当前线程堆栈详情,这个太有用了,当你通过上面的日志信息还不能定位问题时,案发现场的dump线程上下文信息就是你发现问题的救命稻草,这个可以参考《dubbo线程池耗尽事件-"CyclicBarrier惹的祸"》

  • 继续抛出拒绝执行异常,使本次任务失败,这个继承了JDK默认拒绝策略的特性

Netty 中的线程池拒绝策略

 1    private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {2        NewThreadRunsPolicy() {3            super();4        }56        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {7            try {8                final Thread t = new Thread(r, "Temporary task executor");9                t.start();
10            } catch (Throwable e) {
11                throw new RejectedExecutionException(
12                        "Failed to start a new thread", e);
13            }
14        }
15    }

Netty 中的实现很像 JDK 中的 CallerRunsPolicy,舍不得丢弃任务。不同的是,CallerRunsPolicy 是直接在调用者线程执行的任务。而 Netty是新建了一个线程来处理的。所以,Netty的实现相较于调用者执行策略的使用面就可以扩展到支持高效率高性能的场景了。但是也要注意一点,Netty的实现里,在创建线程时未做任何的判断约束,也就是说只要系统还有资源就会创建新的线程来处理,直到new不出新的线程了,才会抛创建线程失败的异常

ActiveMQ 中的线程池拒绝策略

 1 new RejectedExecutionHandler() {2                @Override3                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {4                    try {5                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);6                    } catch (InterruptedException e) {7                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");8                    }9
10                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
11                }
12            });

activeMq中的策略属于最大努力执行任务型,当触发拒绝策略时,在尝试一分钟的时间重新将任务塞进任务队列,当一分钟超时还没成功时,就抛出异常

PinPoint 中的线程池拒绝策略

 1public class RejectedExecutionHandlerChain implements RejectedExecutionHandler {2    private final RejectedExecutionHandler[] handlerChain;34    public static RejectedExecutionHandler build(List<RejectedExecutionHandler> chain) {5        Objects.requireNonNull(chain, "handlerChain must not be null");6        RejectedExecutionHandler[] handlerChain = chain.toArray(new RejectedExecutionHandler[0]);7        return new RejectedExecutionHandlerChain(handlerChain);8    }9
10    private RejectedExecutionHandlerChain(RejectedExecutionHandler[] handlerChain) {
11        this.handlerChain = Objects.requireNonNull(handlerChain, "handlerChain must not be null");
12    }
13
14    @Override
15    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
16        for (RejectedExecutionHandler rejectedExecutionHandler : handlerChain) {
17            rejectedExecutionHandler.rejectedExecution(r, executor);
18        }
19    }
20}

pinpoint的拒绝策略实现很有特点,和其他的实现都不同。他定义了一个拒绝策略链,包装了一个拒绝策略列表,当触发拒绝策略时,会将策略链中的rejectedExecution依次执行一遍

结语

前文从线程池设计思想,以及线程池触发拒绝策略的时机引出java线程池拒绝策略接口的定义。并辅以JDK内置4种以及四个第三方开源软件的拒绝策略定义描述了线程池拒绝策略实现的各种思路和使用场景。希望阅读此文后能让你对java线程池拒绝策略有更加深刻的认识,能够根据不同的使用场景更加灵活的应用。

作者:KL

链接:http://www.kailing.pub/article/index/arcid/255.html

END

关注我

公众号(zhisheng)里回复 面经、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章

Flink 实战

1、《从0到1学习Flink》—— Apache Flink 介绍2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门3、《从0到1学习Flink》—— Flink 配置文件详解4、《从0到1学习Flink》—— Data Source 介绍5、《从0到1学习Flink》—— 如何自定义 Data Source ?6、《从0到1学习Flink》—— Data Sink 介绍7、《从0到1学习Flink》—— 如何自定义 Data Sink ?8、《从0到1学习Flink》—— Flink Data transformation(转换)9、《从0到1学习Flink》—— 介绍 Flink 中的 Stream Windows10、《从0到1学习Flink》—— Flink 中的几种 Time 详解11、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 ElasticSearch12、《从0到1学习Flink》—— Flink 项目如何运行?13、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 Kafka14、《从0到1学习Flink》—— Flink JobManager 高可用性配置15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了19、大数据“重磅炸弹”——实时计算框架 Flink20、《Flink 源码解析》—— 源码编译运行21、为什么说流处理即未来?22、OPPO数据中台之基石:基于Flink SQL构建实数据仓库23、流计算框架 Flink 与 Storm 的性能对比24、Flink状态管理和容错机制介绍25、原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理26、Apache Flink 是如何管理好内存的?27、《从0到1学习Flink》——Flink 中这样管理配置,你知道?28、《从0到1学习Flink》——Flink 不可以连续 Split(分流)?29、Flink 从0到1学习—— 分享四本 Flink 的书和二十多篇 Paper 论文30、360深度实践:Flink与Storm协议级对比31、Apache Flink 1.9 重大特性提前解读32、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了33、美团点评基于 Flink 的实时数仓建设实践34、Flink 灵魂两百问,这谁顶得住?35、一文搞懂 Flink 的 Exactly Once 和 At Least Once36、你公司到底需不需要引入实时计算引擎?37、Flink 从0到1学习 —— 如何使用 Side Output 来分流?

Flink 源码解析

知识星球里面可以看到下面文章

喜欢就点个"在看"呗^_^

Java 线程池 ThreadPoolExecutor 八种拒绝策略浅析相关推荐

  1. ThreadPoolExecutor 八种拒绝策略,对的,不是4种

    转载自  ThreadPoolExecutor 八种拒绝策略,对的,不是4种 前言 谈到 Java 的线程池最熟悉的莫过于 ExecutorService 接口了,jdk1.5 新增的 java.ut ...

  2. 《Java线程池》:任务拒绝策略

    <Java线程池>:任务拒绝策略 转载:https://blog.csdn.net/u010412719/article/details/52132613 在没有分析线程池原理之前先来分析 ...

  3. 线程池的四种拒绝策略

    一.前言 线程池,相信很多人都有用过,没用过相信的也有学习过.但是,线程池的拒绝策略,相信知道的人会少许多. 二.四种线程池拒绝策略 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximum ...

  4. Java多线程学习七:线程池的 4 种拒绝策略和 6 种常见的线程池

    以便在必要的时候按照我们的策略来拒绝任务,那么拒绝任务的时机是什么呢?线程池会在以下两种情况下会拒绝新提交的任务. 第一种情况是当我们调用 shutdown 等方法关闭线程池后,即便此时可能线程池内部 ...

  5. ThreadPoolExecutor 八种拒绝策略,对的,不是4种!

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达今日推荐:2020年7月程序员工资统计,平均14357元,又跌了,扎心个人原创100W+访问量博客:点击前往,查看更多 来源 ...

  6. java线程池ThreadPoolExecutor类详解

    线程池有哪些状态 1. RUNNING:  接收新的任务,且执行等待队列中的任务 Accept new tasks and process queued tasks  2. SHUTDOWN: 不接收 ...

  7. Java线程池ThreadPoolExecutor使用和分析(三) - 终止线程池原理

    相关文章目录: Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) - execute()原理 Java线程池Thr ...

  8. Java线程池ThreadPoolExecutor使用和分析

    Java线程池ThreadPoolExecutor使用和分析(一) Java线程池ThreadPoolExecutor使用和分析(二) Java线程池ThreadPoolExecutor使用和分析(三 ...

  9. Java线程池的四种创建方式

    Java线程池的四种创建方式 Java使用Thread类来表示线程,所有的线程都是Thread类或者是他的子类.Java有四种方式来创建线程. (1)继承Thread类创建线程 (2)实现Runnab ...

最新文章

  1. python开发桌面软件-python适合开发桌面软件吗?
  2. php唯一性查询,ThinkPHP5.0数据更新验证唯一性怎么验证。
  3. 关于压缩jar包时提示*.*没有这个文件或目录的问题以及解决办法:
  4. 《论语》读后颜渊第十二主要大意
  5. JAVA知识基础(二):基本语法
  6. 8本前沿技术书,助力这届「青年人」将科幻变成现实
  7. Java语言基本元素
  8. phpcmsv9 更换域名出现页面无法访问后怎么办
  9. python集成学习算法_python: 一句话说机器学习算法和调参-集成学习篇
  10. MySQL中针对大数据量常用技术
  11. 2D曲线插值拟合基础
  12. 宋森安——CHARLS中国健康与养老调查数据清洗(一)
  13. android api解析之TextWatcher(editText输入监控之一)
  14. AES解码:BadPaddingException: pad block corrupted异常
  15. Android App签名的那些事
  16. 学透for循环-传统for循环与增强for循环
  17. source not found解决方法(亲测)
  18. koa框架数据导出为excel格式
  19. 计算机投诉信英语作文,投诉信英语作文(通用5篇)
  20. Android App实战项目之实现手写签名APP功能(附源码,简单易懂 可直接实用)

热门文章

  1. excel转换 txt竖线分隔符 为 xlsx 转换为 csv
  2. 日报写作规范_写作验收标准的验收标准
  3. 自动化测试脚本统一规范模板
  4. 修改浏览器的标题和图标
  5. 中医在计算机上的应用,计算机工程管理中医院管理应用
  6. 云原生 | go-micro全量配置详解
  7. markdown字体大小颜色样式
  8. [编程语言基础] 函数式编程
  9. 在RHEL6_Oracle_Linux_6上生成正确的udev_rule_规则文件
  10. 全世界最著名的经济金融学网站