线程池大家都## 标题很熟悉,无论是平时的业务开发还是框架中间件都会用到,大部分都是基于JDK线程池ThreadPoolExecutor做的封装,

都会牵涉到这几个核心参数的设置:核心线程数,等待(任务)队列,最大线程数,拒绝策略等。

但如果线程池设置不当就会引起一系列问题, 下面就说下我最近碰到的问题。

案件还原

比如你有一个项目中有个接口部分功能使用了线程池,这个功能会去调用多个第三方接口,都有一定的耗时,为了不影响主流程的性能,不增加整体响应时间,所以放在线程池里和主线程并行执行,等线程池里的任务执行完通过future.get的方式获取线程池里的线程执行结果,然后合并到主流程的结果里返回,大致流程如下:


线程池参数为:

  • coresize:50
  • max:200
  • queuesize:1
  • keepalivetime:60s
  • 拒绝策略为reject

假设每次请求提交5个task到线程池,平均每个task是耗时50ms

没过一会就收到了线程池满了走了拒绝策略的报错

结合你对线程池的了解,先思考下为什么

线程池的工作流程如下:

根据这个我们来列一个时间线

  1. 项目刚启动 第1次请求(每次5个task提交到线程池),创建5个核心线程
  2. 第2次请求 继续创建5个(共10个核心线程了)
  3. 直到第10次 核心线程数会达满50个
  4. 核心线程处理完之后核心线程会干嘛呢

根据 jdk1.8的线程池的源码:
线程池的线程处理处理了交给它的task之后,它会去getTask()

源码如下:

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}
//加入Java开发交流君样:756584822一起吹水聊天int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//注意这段Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

请注意上面代码中的bool类型的timed的赋值逻辑,

由于allowCoreThreadTimeOut默认为false,也就是说:

只要创建的线程数量超过了核心线程数,那么干完手上活后的线程(不管是核心线程,还是超过队列后新开的线程)就会走进

//线程状态为 timedwaiting
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

由于我们上面步骤里面还没有超过coresize所以会走进

//线程状态为 waiting
workQueue.take()

所以答案是:上面步骤干活的核心线程处理完之后核心线程会进入waiting状态,
只要队列一有活就会被唤醒去干活。

  1. 到第11次的时候
    好家伙,到这步骤的时候 ,核心线程数已满,那么就往队列里面塞,但是设置的queuesize=1,
    每次有5个task,那就是说往队列里面塞1个,剩下4个(别较真我懂你意思)要创建新的max线程了。

结果:

核心线程数:50
队列:1
max线程:4个
因为50个核心线程在waiting中,所以队列只要一add,就会立马被消费,假设消费的这个核心线程名字是小A。

这里要细品一下:

这里已经总线程数大于核心线程数了,那么getTask()里面

// timed=trueboolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

那么小A干完活就会走进

//线程状态为 timedwaiting
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)

此处核心线程小A就会变成timedwaiting的状态(keepalive设置的是60s)

  1. 到第12次的时候
    继续往队列塞1个,创建4个max线程,max线程已经有8个了

这里 又会有一个新的核心线程小B ,会变成timedwaiting状态了

max线程们干完手上的活后,也会去调用getTask() 也会进入timedwaiting状态

因为queuesize=1,狼多肉少

  1. 继续下去,那么最终会变成
    max满了,线程们都在timedwaiting(keepalive设置的是60s)

新的提交就会走拒绝策略了

问题总结

其实核心与非核心对于线程池来说都是一样的,只要一旦线程数超过了核心线程数,那么线程就会走进timewaiting

把queuesize调大就好了?
这里又有一个新的注意点:
上面举例的是I/O密集型业务,queuesize不是越大越好的,
因为:

线程池新创建的线程会优先处理新请求进来的任务,而不是去处理队列里的任务,队列里的任务只能等核心线程数忙完了才能被执行,这样可能造成队列里的任务长时间等待,导致队列积压,尤其是I/O密集场景

慎用CallRunnerPolicy这个拒绝策略
一定得理解这个策略会带来什么影响,

先看下这个拒绝策略的源码


如果你提交线程池的任务即时失败也没有关系的话,用这个拒绝策略是致命的,
因为一旦超过线程池的负载后开始吞噬tomcat线程。

用future.get的方式慎用DiscardPolicy这个拒绝策略

如果需要得到线程池里的线程执行结果,使用future的方式,拒绝策略不建议使用DiscardPolicy,这种丢弃策略虽然不执行子线程的任务,

但是还是会返回future对象(其实在这种情况下我们已经不需要线程池返回的结果了),然后后续代码即使判断了future!=null也没用,

这样的话还是会走到future.get()方法,如果get方法没有设置超时时间会导致一直阻塞下去

类似下面的伪代码:

// 如果线程池已满,新的请求会被直接执行拒绝策略,此时如果拒绝策略设置的是DiscardPolicy丢弃任务,
// 则还是会返回future对象, 这样的话后续流程还是可能会走到get获取结果的逻辑
Future<String> future = executor.submit(() -> {// 业务逻辑,比如调用第三方接口等操作return result;
});// 主流程调用逻辑
if(future != null) // 如果拒绝策略是DiscardPolicy还是会走到下面代码future.get(超时时间); // 调用方阻塞等待结果返回,直到超时

推荐解决方案

  1. 用动态线程池,可以动态修改coresize,maxsize,queuesize,keepalivetime
    对线程池的核心指标进行埋点监控,可以通过继承 ThreadPoolExecutor 然后Override掉beforeExecute,afterExecute,shutdown,shutdownNow方法,进行埋点记录到es
    可以埋点的数据有:
    包括线程池运行状态、核心线程数、最大线程数、任务等待数、已完成任务数、线程池异常关闭等信息

    基于以上数据,我们可以实时监控和排查定位问题

参考代码:

/*** 自定义线程池<p>* 1.监控线程池状态及异常关闭等情况<p>* 2.监控线程池运行时的各项指标, 比如:任务执行时间、任务等待数、已完成任务数、任务异常信息、核心线程数、最大线程数等<p>* author: maoyingxu*/
public class ThreadPoolExt extends ThreadPoolExecutor{private TimeUnit timeUnit;public ThreadPoolExt(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);this.timeUnit = unit;} //加入Java开发交流君样:756584822一起吹水聊天@Overrideprotected void beforeExecute(Thread t, Runnable r) {monitor("ThreadPool monitor data:"); // 监控线程池运行时的各项指标}@Overrideprotected void afterExecute(Runnable r, Throwable ex) {// 记录线程池执行任务的时间ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("ThreadPool task executeTime:{0}", executeTime));if (ex != null) { // 监控线程池中的线程执行是否异常LogUtils.warn("unknown exception caught in ThreadPool afterExecute:", ex);}}@Overridepublic void shutdown() {monitor("ThreadPool will be shutdown:"); // 线程池将要关闭事件,此方法会等待线程池中正在执行的任务和队列中等待的任务执行完毕再关闭super.shutdown();}@Overridepublic List<Runnable> shutdownNow() {monitor("ThreadPool going to immediately be shutdown:"); // 线程池立即关闭事件,此方法会立即关闭线程池,但是会返回队列中等待的任务// 记录被丢弃的任务, 目前只记录日志, 后续可根据业务场景做进一步处理List<Runnable> dropTasks = null;try {dropTasks = super.shutdownNow();ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, MessageFormat.format("{0}ThreadPool discard task count:{1}{2}",System.lineSeparator(), dropTasks!=null ? dropTasks.size() : 0, System.lineSeparator()));} catch (Exception e) {LogUtils.addClogException("ThreadPool shutdownNow error", e);}//加入Java开发交流君样:756584822一起吹水聊天return dropTasks;}/*** 监控线程池运行时的各项指标, 比如:任务等待数、任务异常信息、已完成任务数、核心线程数、最大线程数等* @param title*/private void monitor(String title){try {// 线程池监控信息记录, 这里需要注意写ES的时机,尤其是多个子线程的日志合并到主流程的记录方式String threadPoolMonitor = MessageFormat.format("{0}{1}core pool size:{2}, current pool size:{3}, queue wait size:{4}, active count:{5}, completed task count:{6}, " +"task count:{7}, largest pool size:{8}, max pool size:{9}, keep alive time:{10}, is shutdown:{11}, is terminated:{12}, " +"thread name:{13}{14}",System.lineSeparator(), title, this.getCorePoolSize(), this.getPoolSize(),this.getQueue().size(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getLargestPoolSize(),this.getMaximumPoolSize(), this.getKeepAliveTime(timeUnit != null ? timeUnit : TimeUnit.SECONDS), this.isShutdown(),this.isTerminated(), Thread.currentThread().getName(), System.lineSeparator());ELKLogUtils.addAppendedValue(StoredLogTag.RUNNING_DETAIL, threadPoolMonitor);LogUtils.info(title, threadPoolMonitor);ELKLogUtils.addFieldValue(APPIndexedLogTag.THREAD_POOL_USE_RATE, useRate); // ES埋点线程池使用率, useRate = (getActiveCount()/getMaximumPoolSize())*100Cat.logEvent(key, String.valueOf(useRate)); // 报警设置} catch (Exception e) {LogUtils.addClogException("ThreadPool monitor error", e);}}}
  1. 重写线程池拒绝策略, 拒绝策略主要参考了 Dubbo的线程池拒绝策略
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {// 省略部分代码@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "+ "%d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());logger.warn(msg); // 记录最大负载情况下线程池的核心线程数,活跃数,最大线程数等参数dumpJStack(); // 记录线程堆栈信息包括锁争用信息throw new RejectedExecutionException(msg);}private void dumpJStack() {long now = System.currentTimeMillis();//dump every 10 minutes 每隔10分钟记录一次if (now - lastPrintTime < TEN_MINUTES_MILLS) {return;}//加入Java开发交流君样:756584822一起吹水聊天if (!guard.tryAcquire()) { // 加锁访问return;}ExecutorService pool = Executors.newSingleThreadExecutor(); // 这里单独开启一个新的线程去执行(阿里的Java开发规范不允许直接调用Executors.newSingleThreadExecutor, 估计dubbo那时候还没出开发规范...)pool.execute(() -> {String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));SimpleDateFormat sdf;String os = System.getProperty(OS_NAME_KEY).toLowerCase();// window system don't support ":" in file nameif (os.contains(OS_WIN_PREFIX)) {sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);} else {sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);}String dateStr = sdf.format(new Date());//try-with-resourcestry (FileOutputStream jStackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {JVMUtil.jstack(jStackStream);} catch (Throwable t) {logger.error("dump jStack error", t);} finally {guard.release();}lastPrintTime = System.currentTimeMillis();});//must shutdown thread pool ,if not will lead to OOMpool.shutdown();}}

最后,祝大家早日学有所成,拿到满意offer

面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~相关推荐

  1. 面试官问了我几道Java基础没答上来

    面试官问了我几道Java基础没答上来 文章目录 面试官问了我几道Java基础没答上来 1.面向对象的三大特性,分别解释下? 2.说到多态,再来说下方法重载和重写的区别? 3.Java是面向对象的语言, ...

  2. 面试官:使用无界队列的线程池会导致内存飙升吗?

    Executors创建线程池方式有如下几种: Executors.newFixedThreadPool(10);//LinkedBlockingQueue 无限加入队列 Executors.newSc ...

  3. 线程池拒绝策略 开发中常用什么策略_面试官:说说你知道多少种线程池拒绝策略...

    往期文章 为什么阿里Java规约要求谨慎使用SimpleDateFormathttps://www.toutiao.com/i6696127929048367629/ 为什么我强烈推荐你用枚举来实现单 ...

  4. 面试官:说说你知道多少种线程池拒绝策略

    前言 线程池,相信很多人都有用过,没用过相信的也有学习过.但是,线程池的拒绝策略,相信知道的人会少许多. 四种线程池拒绝策略 当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPool ...

  5. 面试官问你HashMap底层你用线程安全吊打他

    面试中,HashMap可以说是必问的,既然这样,我们应该怎么准备怎么回答呢,看看这篇文章,估计你会懂点东西. 先看看这两张图,是其内部的存储结构 说起HashMap,我们可以先从底层实现说起,Hash ...

  6. 【高并发】面试官问我:为啥局部变量是线程安全的?

    写在前面 相信很多小伙伴都知道局部变量是线程安全的,那你知道为什么局部变量是线程安全的吗? 前言 多个线程同时访问共享变量时,会导致并发问题.那么,如果将变量放在方法内部,是不是还会存在并发问题呢?如 ...

  7. ‍面试官问:Mybatis和Mybatis-Plus执行插入语句后可以返回主键ID吗? ‍我:看我回答...

    一.Mybatis执行插入语句后可以返回主键ID吗? 在想写什么内容的时候,正好看到一个基础面试题上有这个问题,就把它记录下来了.

  8. go byte转uint_面试官问我go反射,我怀疑他让我写ORM框架

    该文章始发于公众号[迈莫coding] 地址:https://mp.weixin.qq.com/s/lgZykTL8ls6aG0OMNSbZMw 目录 概念 ValueOf使用格式 反射获取值对象(V ...

  9. 当面试官问 promise 的时候,他们希望听到什么(二)

    目录 前言 一.前提知识 1.JS 单线程机制 2.JS 任务队列与事件循环 3.Promise 回顾 二.题目实战 1.开头提到的题目 2.稍有难度 3.挑战升级 前言 上一篇文章,当面试官问 pr ...

最新文章

  1. 深度神经网络混合精度训练
  2. Ruby Profiler详解之stackprof
  3. 35.使用拦截器实现权限验证
  4. 鸿蒙 电视 安卓,华为鸿蒙2.0来了!打通手机、电视、PC全平台,Mate 40 整装齐发...
  5. JDBC学习笔记 day1
  6. 2011年5月18日早会资料
  7. 【Python】Python中的日志级别
  8. (转)Moblin V2活动映像安装详解
  9. 电子元器件封装知识大全(内含AD封装库下载资料)
  10. LoRa无线网络技术概述
  11. 解决ajaxSubmit无法传递自动回填和下拉框的数据
  12. AI中台——智能聊天机器人平台的架构与应用
  13. SpringBoot 项目(若依脚手架)2
  14. wav转mp3的最简单方法
  15. matlab练习程序(RGB2CMY、CMY2RGB)
  16. codeforces884D(大概是构造,优先队列)
  17. MySQL数据导入1026报错问题
  18. # 学号12 2016-2017-2 《程序设计与数据结构》第9周学习总结
  19. Visual Studio 2010安装、配置及使用
  20. CodeForces-B

热门文章

  1. 【招聘(上海)】东方财富证券招聘.net开发
  2. 在Docker中运行asp.net core 跨平台应用程序
  3. .NET Core运行时和基础类库性能提升
  4. TechEmpower 13轮测试中的ASP.NET Core性能测试
  5. 我为什么用docker-compose来打包开发环境
  6. Vue保持用户登录及权限控制
  7. js请求php文件 302,采集某个 url, js 请求 200,浏览器访问 302
  8. 数据分块加载——BigPipe 技术【类似facebook】
  9. android 特效绘图,Android绘图机制与处理技巧——Android图像处理之图形特效处理...
  10. Apache错误日志提示AH02004: SSL Proxy: Peer certificate is expired