承上启下:上一篇文章小豹子讲了线程池的实例化过程,粗略介绍了线程池的状态转换;这篇文章主要讲了我运行线程池时遇到的小问题,以及 execute 方法的源码理解。

4 并不算疑难的 Bug

按照我们的规划,下一步就应该提交任务,探究线程池执行任务时的内部行为,但首先,我要提交一个任务嘛。于是,接着上一篇文章的代码,我提交了一个任务:

@Test
public void submitTest() {// 创建线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread();}}, new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("拒绝服务");}});// 提交任务,该任务为睡眠 1 秒后打印 HellothreadPoolExecutor.submit(new Callable<String>() {@Overridepublic String call() throws InterruptedException {Thread.sleep(1000L);System.out.println("Hello");return null;}});
}
复制代码

而我并没有看到任何输出,程序也并没有睡眠一秒,而是马上结束了。哦对,我想起来,我们创建的线程默认是守护线程,当所有用户线程结束之后,程序就会结束了,并不会理会是否还有守护线程在运行。那么我们用一个简单易行的办法来解决这个问题 —— 不让用户线程结束,让它多睡一会:

@Test
public void submitTest() throws InterruptedException {// 创建线程池ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread();}}, new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("拒绝服务");}});// 提交任务,该任务为睡眠 1 秒后打印 HellothreadPoolExecutor.submit(new Callable<String>() {@Overridepublic String call() throws InterruptedException {Thread.sleep(1000L);System.out.println("Hello");return null;}});// 使主线程休眠 5 秒,防止守护线程意外退出Thread.sleep(5000L);
}
复制代码

然而,程序等待 5 秒之后,依旧没有输出。我的第一个反应是,我对于线程池的用法不对。是不是还需要调用某个方法来“激活”或者“启动”线程池?而无论在文档中,还是各博客的例子中,我都没有找到类似的方法。我们仔细思考一下这个 Bug,产生这样问题的可能原因有三:

  1. ThreadPoolExecutor 内部代码有问题
  2. 我对 ThreadPoolExecutor 的使用方法不对
  3. 我设计的 ThreadFactoryRejectedExecutionHandler 有问题

原因 1,可能性太小,几乎没有。那么原因2、3,我们现在没法排除,于是我尝试构建一个最小可重现错误,将 ThreadPoolExecutor 剥离出来,看 Bug 是否重现:

最小可重现(minimal reproducible)这个思想是我在翻译《使用 Rust 开发一个简单的 Web 应用,第 4 部分 —— CLI 选项解析》时,作者用到的思想。就是在我们无法定位 Bug 时,剥离出当前代码中我们认为无关的部分,剥离后观察 Bug 是否重现,一步步缩小 Bug 的范围。通俗的说,就是排除法。

private class MyThreadFactory implements ThreadFactory{@Overridepublic Thread newThread(Runnable r) {return new Thread();}
}@Test
public void reproducibleTest() throws InterruptedException {new MyThreadFactory().newThread(new Runnable() {@Overridepublic void run() {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Hello");}}).start();Thread.sleep(5000L);
}
复制代码

还是没有任何输出,不过这是一个好消息,这意味着我们定位了问题所在:现在问题只可能出现在 MyThreadFactory 中,短短 6 行代码会有什么问题呢?哎呦(拍大腿),我没有把 Runnable r 传给 new Thread() 啊,我一直在执行一个空线程啊,怎么可能有任何输出!于是:return new Thread(r); 这样一改就好了。

5 重构

上面的问题看似简单,但能出现这么低级的错误,值得我思考。我因为产生该错误的原因有二:

  1. 我不了解 ThreadPoolExecutor 的原理,从语法上看 ThreadFactory 的实现类只需要传出一个 Thread 实例就行了,却不知 Runnable r 不可或缺。
  2. 测试代码结构凌乱不堪。即便是测试代码,也不应该写成面条,自己看尚不能清楚明了,何谈读者?

于是,我决定对测试代码进行重构。这次重构,一要使线程工厂产生非守护线程,防止因为主进程的退出导致线程池中线程全部意外退出;二要对每个操作打日志,我们要能直观的观察到线程池在做什么,值得一提的是,对于阻塞队列的日志操作,我使用了动态代理的方式对每一个方法打日志,不熟悉动态代理的童鞋可以戳我之前写的小豹子带你看源码:JDK 动态代理。

// import...public class ThreadPoolExecutorTest {/*** 记录启动时间*/private final static long START_TIME = System.currentTimeMillis();/*** 自定义线程工厂,产生非守护线程,并打印日志*/private class MyThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(false);debug("创建线程 - %s", thread.getName());return thread;}}/*** 自定义拒绝服务异常处理器,打印拒绝服务信息*/private class MyRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {debug("拒绝请求,Runnable:%s,ThreadPoolExecutor:%s", r, executor);}}/*** 自定义任务,休眠 1 秒后打印当前线程名,并返回线程名*/private class MyTask implements Callable<String> {@Overridepublic String call() throws InterruptedException {Thread.sleep(1000L);String threadName = Thread.currentThread().getName();debug("MyTask - %s", threadName);return threadName;}}/*** 对 BlockingQueue 的动态代理,实现对 BlockingQueue 的所有方法调用打 Log*/private class PrintInvocationHandler implements InvocationHandler {private final BlockingQueue<?> blockingQueue;private PrintInvocationHandler(BlockingQueue<?> blockingQueue) {this.blockingQueue = blockingQueue;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {debug("BlockingQueue - %s,参数为:%s", method.getName(), Arrays.toString(args));Object result = method.invoke(blockingQueue, args);debug("BlockingQueue - %s 执行完毕,返回值为:%s", method.getName(), String.valueOf(result));return result;}}/*** 产生 BlockingQueue 代理类* @param blockingQueue 原 BlockingQueue* @param <E> 任意类型* @return 动态代理 BlockingQueue,执行任何方法时会打 Log*/@SuppressWarnings("unchecked")private <E> BlockingQueue<E> debugQueue(BlockingQueue<E> blockingQueue) {return (BlockingQueue<E>) Proxy.newProxyInstance(this.getClass().getClassLoader(),new Class<?>[]{BlockingQueue.class},new PrintInvocationHandler(blockingQueue));}/*** 实例化一个 核心池为 3,最大池为 5,存活时间为 20s,利用上述阻塞队列、线程工厂、拒绝服务处理器的线程池实例* @return 返回 ThreadPoolExecutor 实例*/private ThreadPoolExecutor newTestPoolInstance() {return new ThreadPoolExecutor(3, 5, 20,TimeUnit.SECONDS, debugQueue(new LinkedBlockingQueue<>()),new MyThreadFactory(), new MyRejectedExecutionHandler());}/*** 向控制台打印日志,自动输出时间,线程等信息* @param info* @param arg*/private void debug(String info, Object... arg) {long time = System.currentTimeMillis() - START_TIME;System.out.println(String.format(((double) time / 1000) + "-" + Thread.currentThread().getName() + "-" + info, arg));}/*** 测试实例化操作*/private void newInstanceTest() {newTestPoolInstance();}/*** 测试提交操作,提交 10 次任务*/private void submitTest() {ThreadPoolExecutor threadPool = newTestPoolInstance();for (int i = 0; i < 10; i++) {threadPool.submit(new MyTask());}}public static void main(String[] args) {ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();test.submitTest();}
}
复制代码

编译,运行 =>

0.047-main-创建线程 - Thread-0
0.064-main-创建线程 - Thread-1
0.064-main-创建线程 - Thread-2
0.064-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@4d7e1886]
0.064-main-BlockingQueue - offer 执行完毕,返回值为:true
0.064-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@3cd1a2f1]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@2f0e140b]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@7440e464]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@49476842]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@78308db1]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@27c170f0]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
1.065-Thread-1-MyTask - Thread-1
1.065-Thread-0-MyTask - Thread-0
1.065-Thread-2-MyTask - Thread-2
1.065-Thread-1-BlockingQueue - take,参数为:null
1.065-Thread-0-BlockingQueue - take,参数为:null
1.065-Thread-2-BlockingQueue - take,参数为:null
1.065-Thread-0-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@3cd1a2f1
1.065-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@2f0e140b
1.065-Thread-1-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@4d7e1886
2.065-Thread-1-MyTask - Thread-1
2.065-Thread-2-MyTask - Thread-2
2.065-Thread-0-MyTask - Thread-0
2.065-Thread-1-BlockingQueue - take,参数为:null
2.065-Thread-2-BlockingQueue - take,参数为:null
2.065-Thread-0-BlockingQueue - take,参数为:null
2.065-Thread-1-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@7440e464
2.065-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@49476842
2.065-Thread-0-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@78308db1
3.066-Thread-1-MyTask - Thread-1
3.066-Thread-2-MyTask - Thread-2
3.066-Thread-0-MyTask - Thread-0
3.066-Thread-2-BlockingQueue - take,参数为:null
3.066-Thread-1-BlockingQueue - take,参数为:null
3.066-Thread-0-BlockingQueue - take,参数为:null
3.066-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@27c170f0
4.067-Thread-2-MyTask - Thread-2
4.067-Thread-2-BlockingQueue - take,参数为:null
复制代码

日志的格式是:时间(秒)-线程名-信息

从日志输出中,我们可以获知:

  • 当队列为空,线程数少于核心线程数时,提交任务会触发创建线程,并立即执行任务
  • 当核心线程均忙,再提交的请求会被存储至阻塞队列,等待线程空闲后执行队列中的任务
  • 除主线程外,始终只有三个工作线程
  • 当队列为空,工作线程还在运行的时候,工作线程会因为阻塞队列的 take 方法阻塞(这一点由日志后几行可以看出,只有调用日志,没有调用完成的日志)

由此,我产生一个疑问:为什么始终只有三个线程?我的设置不是“核心池为 3,最大池为 5”吗?为什么只有三个线程在工作呢?

6 submit 任务

终于开始看源码了,我们以 submit 为切入点,探寻我们提交任务时,线程池做了什么,submit 方法本身很简单,就是将传入参数封装为 RunnableFuture 实例,然后调用 execute 方法,以下给出 submit 多个重载方法其中之一:

public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;
}
复制代码

那么,我们继续看 execute 的代码:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}
复制代码

我们首先解释一下 addWorker 方法,暂时我们只需要了解几件事情就可以理解 execute 代码了:

  • 该方法用于新建一个工作线程
  • 该方法线程安全
  • 该方法第一个参数是新线程要执行的第一个任务,第二个参数是是否新建核心线程
  • 该方法如果新建线程成功,则返回 true,否则返回 false

那么我们回过头来理解 execute 代码:

为了帮助理解,我根据代码逻辑画了一个流程图:

现在我明白了,只有等待队列插入失败(如达到容量上限等)情况下,才会创建非核心线程来处理任务,也就是说,我们使用的 LinkedBlockingQueue 队列来作为等待队列,那是看不到非核心线程被创建的现象的。

有心的读者可能注意到了,整个过程没有加锁啊,怎样保证并发安全呢?我们观察这段代码,其实没必要全部加锁,只需要保证 addWorkerremoveworkQueue.offer 三个方法的线程安全,该方法就没必要加锁。事实上,在 addWorker 中是有对线程池状态的 recheck 的,如果创建失败会返回 false。

系列文章

  • Java 线程池(一)缘起 & 计划
  • Java 线程池(二)实例化
  • Java 线程池(三)提交任务
  • 未完待续……

小豹子还是一个大三的学生,小豹子希望你能“批判性的”阅读本文,对本文内容中不正确、不妥当之处进行严厉的批评,小豹子感激不尽。

小豹子带你看源码:Java 线程池(三)提交任务相关推荐

  1. 小豹子带你看源码:Java 线程池(二)实例化

    承上启下:上一篇文章小豹子讲了我为什么想要研究线程池的代码,以及我计划要怎样阅读代码.这篇文章我主要阅读了线程池实例化相关的代码,并提出了自己的疑问. 3 千里之行,始于实例化 3.1 先创建一个线程 ...

  2. java 缘起_小豹子带你看源码:Java 线程池(一)缘起 计划

    1 缘起 怎么想起来看线程池的代码? 很简单,因为我不会用. 原先遇到用线程池一直是 Executors 直接构造一个出来.啊,newFixedThreadPool 就是创建定容线程池,线程数是固定的 ...

  3. 小豹子带你看源码:ArrayList

    世界上最牛的 Java 代码去哪找?当然是 JDK 咯-计划学习一下常见容器的源码. 我会把我觉得比较有意思或者玄学的地方更新到这里. 以下 JDK 源码及 Javadoc 均从 java versi ...

  4. cocos creator教育益智游戏《小火车运货》源码H5+安卓+IOS三端源码

    cocos creator2.4.2教育益智游戏<小火车运货>源码H5+安卓+IOS三端源码,开发脚本为typeScript方便扩展和阅读,支持cocos creator2.X版本,完整的 ...

  5. JUC源码分析-线程池篇(五):ForkJoinPool - 2

    通过上一篇(JUC源码分析-线程池篇(四):ForkJoinPool - 1)的讲解,相信同学们对 ForkJoinPool 已经有了一个大概的认识,本篇我们将通过分析源码的方式来深入了解 ForkJ ...

  6. Elasticsearch源码分析—线程池(十一) ——就是从队列里处理请求

    Elasticsearch源码分析-线程池(十一) 转自:https://www.felayman.com/articles/2017/11/10/1510291570687.html 线程池 每个节 ...

  7. 菜鸟带你看源码——看不懂你打我ArrayList源码分析(基于java 8)

    文章目录 看源码并不难 软件环境 成员变量: 构造方法 核心方法 get方法 remove方法 add方法 结束 看源码并不难 如何学好编程?如何写出优质的代码?如何快速提高自己的编程能力?等等一系列 ...

  8. 深读源码-java线程系列之自己手写一个线程池

    问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写的线程池如何测试? 简介 线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线程池呢?本文将手把手带你写一个可用的线 ...

  9. java 线程池 源码_java线程池源码分析

    我们在关闭线程池的时候会使用shutdown()和shutdownNow(),那么问题来了: 这两个方法又什么区别呢? 他们背后的原理是什么呢? 线程池中线程超过了coresize后会怎么操作呢? 为 ...

最新文章

  1. 2018.03.03、android-照虎画猫搭建简易Rest服务器
  2. 线程工具类(根据电脑逻辑处理器个数控制同时运行的线程个数)
  3. 【小白学习PyTorch教程】八、使用图像数据增强手段,提升CIFAR-10 数据集精确度...
  4. Reading——The Non-Designer's Design Book
  5. IntelliJ IDEA 2017.01配置jdk和tomcat
  6. Learning Perl chapter 4 练习题
  7. Redis单机部署、添加开机自启、配置参数
  8. java 控制jsp_JSP学习之Java Web中的安全控制实例详解
  9. Sublime Text 3新建工程
  10. java map removeall_Java删除Map中元素
  11. 开热点给电脑消耗大吗_你试过爬楼梯减肥吗?热量消耗大,选对姿势很重要!...
  12. 2020年十大开源漏洞回顾
  13. 【转载】 MySQL之用户资源限制
  14. 基于Ntrip的实时多线程RTCM数据流接收及解码
  15. java poi excel 图表_Java用POI实现根据Excel表格模板生成新的Excel并实现数据输出
  16. mac idea jrebel 激活
  17. Sublime Text 3.0汉化教程
  18. linux iscsi 发起程序,怎么查看进程的发起程序,iscsi发起程序是什么
  19. select()函数
  20. win10照片查看器_Win10 下好用的免费无广告看图软件 XnView

热门文章

  1. 代数多重网格法简介(Algebraic Multigrid)
  2. 住在储藏室的小夫妻【zt】
  3. 世界5G大会,12位专家重磅发言!
  4. 20155314 2016-2017-2 《Java程序设计》第6周学习总结
  5. 2019年中国软件业务收入100强
  6. 类脑计算机能解决,全球神经元规模最大类脑计算机问世
  7. C++/C语言sizeof关键字详解
  8. 怎么禁用Windows 10 自动更新驱动程序
  9. 16~235和0~255
  10. 久其修改服务器地址,久其软件服务器地址