文章目录

  • 继承体系
  • 七个参数
  • RejectedExecutionHandler
    • CallerRunsPolicy
    • AbortPolicy
    • DiscardPolicy
    • DiscardOldestPolicy
    • 自定义handler
  • 扩展:核心线程满了以后,如何将任务先交由非核心线程处理
    • 源码解析
    • 扩展源码
      • 自定义 Queue,重写 offer 方法
      • 自定义线程池,继承 ThreadPoolExecutor,修改核心逻辑
    • 总结
  • 参考资料

阿里巴巴Java开发手册中强制规定,线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

所以下面我们就康康 ThreadPoolExecutor 的7个参数。

继承体系

七个参数

    /*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @param handler the handler to use when execution is blocked*        because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {}
  • corePoolSize 核心线程数,不会被回收,除非设置了 allowCoreThreadTimeOut
  • maximumPoolSize 线程池中允许最大的线程数量。
    • corePoolSize和maximumPoolSize的关系类似于工厂的编制员工和临时工。corePoolSize是编制员工的数量,(maximumPoolSize - corePoolSize)的数量是工厂订单旺季时,可以招的临时工的数量。等到工厂淡季的时候,就把这些临时工解散,但是编制员工淡季也不解散。
  • keepAliveTime 空闲线程被终止前的等待时间
  • unit 等待时间的单位
  • workQueue 任务队列。线程池提交任务时,先交由空闲的核心线程处理,没有空闲的核心线程以后就放到阻塞队列里。阻塞队列也满了就创建非核心线程来处理。但是非核心线程的数量加核心线程的数量不能超过 maximumPoolSize。
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • SynchronousQueue
    • PriorityBlockQueue
  • threadFactory 创建线程的工厂
  • handler 拒绝策略。当线程池中线程数量达到maximumPoolSize以后采取的策略,是丢掉还是让线程去执行,也可以自定义现实。

RejectedExecutionHandler

默认实现的拒绝策略

CallerRunsPolicy

提交任务的线程去处理

    /*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.*/public static class CallerRunsPolicy implements RejectedExecutionHandler {}

这个有点不好理解,举个栗子,比如现在线程池满了,main方法中又来了一个线程,那么这个任务就交给main线程去处理掉。

AbortPolicy

抛个异常出去

    /*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.*/public static class AbortPolicy implements RejectedExecutionHandler {}

DiscardPolicy

默默丢掉新来的任务

    /*** A handler for rejected tasks that silently discards the rejected task.*/public static class DiscardPolicy implements RejectedExecutionHandler {}

DiscardOldestPolicy

丢掉workQueue里排队最久还没被处理掉的任务,然后重复去excute这个任务。除非,excutor被shut down了,那这个任务就被丢弃了。

    /*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {}

自定义handler

import java.util.concurrent.*;public class T14_MyRejectedHandler {public static void main(String[] args) {ExecutorService service = new ThreadPoolExecutor(4, 4,0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),Executors.defaultThreadFactory(),new MyHandler());}static class MyHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {//自定义处理方式,可以先打个日志 xxx 被我拒绝了 然后我把它放到了 redis 里 ...//log("r rejected")//save r in kafka、mysql、redis}}
}

扩展:核心线程满了以后,如何将任务先交由非核心线程处理

上文我们介绍线程池的核心参数 workQueue 时讲到,线程池提交任务时,先交由空闲的核心线程处理,没有空闲的核心线程以后就放到阻塞队列里。阻塞队列也满了就创建非核心线程来处理。

但是有没有一种可能,没有空闲的核心线程以后,先创建非核心线程,直到核心线程数 + 非核心线程达到 maximumPoolSize 再将任务放到队列里。

答案是有的,在 Dubbo 源码中就有现成的方案。Github issue :Extension: Eager Thread Pool #1568

看最后一句,Reference: Tomcat’s thread pool design 其实他也是借鉴了 Tomcat 的设计。

源码解析

先一起看下扩展之前,ThreadPoolExecutor 的 execute() 方法的逻辑,此处保留了源码中的注释。

    /*** Executes the given task sometime in the future.  The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@link RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of*         {@code RejectedExecutionHandler}, if the task*         cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {// 创建线程if (addWorker(command, true))return;c = ctl.get();}// 线程数量达到了 corePoolSize,新任务放入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 入队失败,创建非核心线程else if (!addWorker(command, false))// 创建新线程失败执行拒绝策略reject(command);}

扩展源码

自定义 Queue,重写 offer 方法

package eagerthreadpool;import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {private static final long serialVersionUID = -2635853580887179627L;// 自定义的线程池类,继承自ThreadPoolExecutorprivate EagerThreadPoolExecutor executor;public TaskQueue(int capacity) {super(capacity);}public void setExecutor(EagerThreadPoolExecutor exec) {executor = exec;}// offer方法的含义是:将任务提交到队列中,返回值为true/false,分别代表提交成功/提交失败@Overridepublic boolean offer(Runnable runnable) {if (executor == null) {throw new RejectedExecutionException("The task queue does not have executor!");}// 线程池的当前线程数int currentPoolThreadSize = executor.getPoolSize();if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {// 已提交的任务数量小于当前线程数,意味着线程池中有空闲线程,直接扔进队列里,让线程去处理return super.offer(runnable);}// return false to let executor create new worker.if (currentPoolThreadSize < executor.getMaximumPoolSize()) {// 重点: 当前线程数小于 最大线程数 ,返回false,暗含入队失败,让线程池去创建新的线程return false;}// 重点: 代码运行到此处,说明当前线程数 >= 最大线程数,需要真正的提交到队列中return super.offer(runnable);}public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if (executor.isShutdown()) {throw new RejectedExecutionException("Executor is shutdown!");}return super.offer(o, timeout, unit);}
}

自定义线程池,继承 ThreadPoolExecutor,修改核心逻辑

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;public class EagerThreadPoolExecutor extends ThreadPoolExecutor {/*** 定义一个成员变量,用于记录当前线程池中已提交的任务数量,在队列的 offer() 方法中要用*/private final AtomicInteger submittedTaskCount = new AtomicInteger(0);public EagerThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, TaskQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}public int getSubmittedTaskCount() {return submittedTaskCount.get();}@Overrideprotected void afterExecute(Runnable r, Throwable t) {// ThreadPoolExecutor的勾子方法,在task执行完后需要将池中已提交的任务数 - 1submittedTaskCount.decrementAndGet();}@Overridepublic void execute(Runnable command) {if (command == null) {throw new NullPointerException();}// do not increment in method beforeExecute!// 将池中已提交的任务数 + 1submittedTaskCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {// retry to offer the task into queue.final TaskQueue queue = (TaskQueue) super.getQueue();try {if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.", rx);}} catch (InterruptedException x) {submittedTaskCount.decrementAndGet();throw new RejectedExecutionException(x);}} catch (Throwable t) {// decrease any waysubmittedTaskCount.decrementAndGet();throw t;}}
}

总结

自定义的 EagerThreadPoolExecutor 依赖自定义的 TaskQueue 的 offer 返回值来决定是否创建更多的线程,达到先判断 maximumPoolSize 再判断队列的目的。

参考资料

雄哥:Java如何让线程池满后再放队列
ThreadPoolExecutor 源码解析

java 线程池 ThreadPoolExecutor 源码扩展 支持先增加非核心线程处理任务后放任务队列相关推荐

  1. Java并发编程实战(chapter_3)(线程池ThreadPoolExecutor源码分析)

    为什么80%的码农都做不了架构师?>>>    这个系列一直没再写,很多原因,中间经历了换工作,熟悉项目,熟悉新团队等等一系列的事情.并发课题对于Java来说是一个又重要又难的一大块 ...

  2. Java并发之线程池ThreadPoolExecutor源码分析学习

    线程池学习 以下所有内容以及源码分析都是基于JDK1.8的,请知悉. ​ 我写博客就真的比较没有顺序了,这可能跟我的学习方式有关,我自己也觉得这样挺不好的,但是没办法说服自己去改变,所以也只能这样想到 ...

  3. JAVA线程池(ThreadPoolExecutor)源码分析

    JAVA5提供了多种类型的线程池,如果你对这些线程池的特点以及类型不太熟悉或者非常熟悉,请帮忙看看这篇文章(顺便帮忙解决里面存在的问题,谢谢!):     http://xtu-xiaoxin.ite ...

  4. Java线程池ThreadPoolExecutor源码分析

    继承关系 Executor接口 public interface Executor {void execute(Runnable command); } ExecutorService接口 publi ...

  5. c++ 线程池_JAVA并发编程:线程池ThreadPoolExecutor源码分析

    前面的文章已经详细分析了线程池的工作原理及其基本应用,接下来本文将从底层源码分析一下线程池的执行过程.在看源码的时候,首先带着以下两个问题去仔细阅读.一是线程池如何保证核心线程数不会被销毁,空闲线程数 ...

  6. threadpoolexecutor创建线程池_线程池ThreadPoolExecutor源码分析

    什么是线程池 创建线程要花费昂贵的资源和时间,如果任务来了才创建那么响应时间会变长,而且一个进程能创建的线程数量有限.为了避免这些问题,在程序启动的时候就创建若干线程来响应出来,它们被称为线程池,里面 ...

  7. 【java】java中的线程池 ThreadPoolExecutor源码分析

    文章目录 1.概述 4.源码 4.1 关键属性 4.2 构造函数 4.4 状态控制 4.5 ThreadLocalMap 4.6 execute方法源码分析 4.7 addWorker方法源码分析 4 ...

  8. Java线程池状态判断源码_深入浅出Java线程池:源码篇

    前言 在上一篇文章深入浅出Java线程池:理论篇中,已经介绍了什么是线程池以及基本的使用.(本来写作的思路是使用篇,但经网友建议后,感觉改为理论篇会更加合适).本文则深入线程池的源码,主要是介绍Thr ...

  9. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(一)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

  10. idea 线程内存_Java线程池系列之-Java线程池底层源码分析系列(二)

    课程简介: 课程目标:通过本课程学习,深入理解Java线程池,提升自身技术能力与价值. 适用人群:具有Java多线程基础的人群,希望深入理解线程池底层原理的人群. 课程概述:多线程的异步执行方式,虽然 ...

最新文章

  1. 软件框架设计的艺术----读书总结
  2. python opencv键盘监听
  3. 关于完美拖拽的问题三
  4. decimal转为string sql_SQL注入详解|OWASP Top 10安全风险实践(二)
  5. c++输出重定向 linux,C++ stderr/stdout 重定向到文件
  6. spring boot admin 2.2 获取日志失败_SB实战20-Spring Boot的日志和报告
  7. C语言运算符的优先级及结合性
  8. 【推荐】程序员/设计师能用上的 75 份速查表
  9. python列表去括号_python的常用序列
  10. HTML仿网易云音乐首页静态,用 Vue.js 模仿一个简单的网易云音乐
  11. windows7安装cuda10.2
  12. 最大流最小割定理 (定理,割集)
  13. 安装配置MySQL5.7详细教程
  14. 微信小程序之WeUI组件库的使用
  15. To King Cover
  16. android 蓝牙自动断开,Android蓝牙:连接()/断开()
  17. 数据库原理:了解范式(1NF、2NF、3NF、BCNF),做例题快速弄懂
  18. 达梦数据库之备份还原
  19. linux多线程编程和linux 2.6下的nptl,Linux多线程编程和Linux 2.6下的NPTL
  20. Excel Sheet Column

热门文章

  1. python正则表达式匹配数字或者逗号_将数字与正则表达式相匹配-只有数字和逗号...
  2. Python学习心得
  3. Spark Shuffle之Tungsten-Sort
  4. 【黑帽SEO系列】网页劫持
  5. C语言中钩子函数使用讲解
  6. 机器学习——马氏距离
  7. Android蜗牛睡眠技术文档,蜗牛睡眠app 问题提问集中贴 常见问题汇总
  8. android指南针校准 代码_Android:指南针的制作
  9. uniapp点击打开外部应用跳转链接,指定App打开应用市场
  10. Android架构师绩效考核表,半年绩效考核总结(7.9)