ScheduledThreadPool 源码解析——定时类线程池是如何工作的
文章目录
- 引言
- 一、ScheduledThreadPool 使用示例
- 1. 延时类的定时任务 `schedule`
- 2. 延时类,固定周期执行任务 `scheduleAtFixedRate`
- 3. 延迟开始,固定间隔周期性任务 `scheduleWithFixedDelay`
- 二、初始化
- 三、任务执行
- 1. 源码对照
- 2. 线程的创建与启动
- 3. 周期性执行
- 4. 延时的控制
- 5. 如何修改任务的 time 属性
- 四、总结
引言
之前的文章《线程池源码分析》,详细介绍了线程池的原理。
这篇在此基础上聊聊 定时类的线程池 ScheduledThreadPool
,
假定线程池那篇你已经看过了哈!
项目中,经常会用到定时任务,
比如基于注解的 @Scheduled
,配合 cron
表达式。
那源码级别,若要使用定时任务,不大可能再集成 spring 框架来实现
比如 Eureka 的心跳机制,典型的定时任务,
它的实现就是应用的 定时类线程池,
也就是今天所说的 ScheduledThreadPool
。
今天详细剖析下它的源码及使用。
一、ScheduledThreadPool 使用示例
1. 延时类的定时任务 schedule
public static void main(String[] args) throws Exception {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);long start = System.currentTimeMillis();log.info("延迟的 定时任务执,开始时间:{}", start);scheduledExecutorService.schedule(new Runnable() {@Overridepublic void run() {long running = System.currentTimeMillis();log.info(" 执行任务,时间:{}", running);log.info("相隔毫秒数:{}", running - start);}},3, TimeUnit.SECONDS);scheduledExecutorService.shutdown();}
这个示例,任务提交后,不是立马执行。按指定的延迟时间,过了这个时间再执行。
比如示例中,任务提交后,延迟3秒再执行。
2. 延时类,固定周期执行任务 scheduleAtFixedRate
public static void main(String[] args) throws Exception {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);long start = System.currentTimeMillis();log.info("延迟类固定周期任务,开始时间:{}", start);scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@SneakyThrows@Overridepublic void run() {long running = System.currentTimeMillis();long dif = running - start;log.info("与任务提交时间,相隔毫秒数:{}", dif);if(dif > 20*1000){scheduledExecutorService.shutdown();log.info("延迟类固定周期任务 end");}Thread.sleep(1000);}},5, 2, TimeUnit.SECONDS);}
这个示例是,任务提交5秒之后,才开始执行。之后每隔 2 秒,周期性执行。(每次任务执行时间耗时1秒)
即相邻两个任务,任务开始的时间,相隔 2 秒 (不考虑溢出)
执行的效果如上图。误差及其小。每两秒执行一次。
3. 延迟开始,固定间隔周期性任务 scheduleWithFixedDelay
public static void main(String[] args) throws Exception {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);long start = System.currentTimeMillis();log.info("延迟类固定周期任务,开始时间:{}", start);scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {@SneakyThrows@Overridepublic void run() {long running = System.currentTimeMillis();long dif = running - start;log.info("begin 与任务提交时间,相隔毫秒数:{}", dif);if(dif > 20*1000){scheduledExecutorService.shutdown();log.info("延迟类固定周期任务 end");}Thread.sleep(1000);log.info("end 与任务提交时间,相隔毫秒数:{}", System.currentTimeMillis() - start);}},5, 2, TimeUnit.SECONDS);}
这个示例是,任务提交5秒之后,才开始执行。每次任务执行完成后,等 2 秒后,再次执行。
即上个任务结束,到下次任务执行开始,之间的时间间隔是 2 秒。
固定间隔的,执行效果如图
这个和前一个周期性任务,有一点差异,我画个图,差别很容易理解。
二、初始化
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
这里的 super
指的是 ThreadPoolExecutor
,也就是《线程池源码分析》,文章中提到的构造方法。
也就是说,
corePoolSize
线程池的核心线程数量,是调用者传入的maximumPoolSize
最大线程数量,是Integer的最大值keepAliveTime
线程阻塞时,存活时间是 0workQueue
阻塞的队列是DelayedWorkQueue
这里需要说说,这个DelayedWorkQueue
,它是 ScheduledThreadPoolExecutor
的内部类。
《DelayQueue源码解析》,这篇文章里讲过延时队列。
只要把这篇文章看明白了,就知道 DelayedWorkQueue
是做什么了,实现的逻辑几乎一模一样。
三、任务执行
1. 源码对照
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {if (callable == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));delayedExecute(t);return t;}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}
仔细看下,这三个方法的实现,基本上是一样的,代码都差不多。
大概就是,将任务包装成 ScheduledFutureTask
,然后执行 delayedExecute(t)
。
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown()) // 如果线程池已关闭,拒绝执行任务reject(task);else {super.getQueue().add(task); // 将任务放入阻塞队列if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false); // 符合某些条件,任务直接取消elseensurePrestart(); // 确保任务执行}}void ensurePrestart() { // 此方法,可以保证一定有线程来执行任务int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)addWorker(null, false);}
delayedExecute
这个方法看着也很简单,将任务入队,然后调用 ensurePrestart()
,
ensurePrestart()
这个方法更简单,就是调用 addWorker(null, true)
方法。
在《线程池源码分析》,这篇文章中,
详细分析过 addWorker(null, true)
方法,本文不再赘述。
看到这里,有没有这样的感觉:
这周期性的任务,怎么就执行了?代码中也没体现出来呀!
网上的文章,也没有讲怎么执行的,为啥?有点讲不清楚。
.
2. 线程的创建与启动
执行scheduleWithFixedDelay
方法时, 调用 delayedExecute
。
delayedExecute
方法将任务放入阻塞队列,第一次执行时,会调用 addWorker
方法。
ThreadPoolExecutor
这在类中的 addWorker
方法会创建一个线程,并会 调用 start(),即启动线程。
这部分内容在《线程池源码分析》中,详细说过。
.
3. 周期性执行
线程启动后,JVM 会在合适的时候,调用 ThreadPoolExecutor
中的 run()
方法,
其实现是 runWorker()
方法,简化代码如下。
while (task != null || (task = getTask()) != null) {w.lock();....task.run();}
task.run()
时,会调用ScheduledThreadPoolExecutor
类中的 run()
方法
public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic)) // 线程池关闭时,会进入这个分支cancel(false);else if (!periodic)ScheduledFutureTask.super.run(); // 不是周期性任务,直接执行任务else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime(); // 重置下次执行的时间reExecutePeriodic(outerTask); // 将任务放进阻塞队列}}
ScheduledFutureTask.super.runAndReset()
这个方法会调用示例中,重写的 run()
方法。
setNextRunTime()
这个方法会修改 任务中的 time 属性。
reExecutePeriodic
这个方法,做了两件事,
void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {super.getQueue().add(task); // 将任务重新放入阻塞队列if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false); // 某些情况下,取消任务elseensurePrestart(); // 保证有一个线程在工作}}
一是把任务放入阻塞队列。二是,若没有工作线程,创建一个线程。
看到这儿,应该还是觉得很懵才对,我结合图来解释。
这四步我大概解释下,
从阻塞队列中取任务,是否会阻塞,与其属性 time 的值有关,这个等下再解释。
执行任务,这个不需要多解释,在本文开头的几个示例中,就是打印了日志。
修改任务的time属性,这个是控制任务下次什么时候执行。即下次什么时候能从队列中取出来。
将任务放入队列,这个不需要多解释。
runWorker
的简化代码,上面给了出来,是一个 while 循环,每次循环都会执行这四步,
以上就是周期性执行的基本原因。
.
4. 延时的控制
《DelayQueue源码解析》在这篇文章中,介绍了延时阻塞队列的工作原理。
如果这个不太懂,下文的理解会很吃力。
runWorker 方法从队列中取任务时,会判断 getDelay() 方法的返回值,
假设返回值是 N,若 N 大于 0,则阻塞。阻塞的时间为N,时间到了之后,
自我唤醒,取到任务,就开始执行任务。
public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}final long now() {return System.nanoTime();}
getDelay() 方法中,time - now(),其中 time 的原始值,是 now() + delay。
比如本文开头的示例,延迟 5 秒执行。time 的原始值,就是 now() + 5 秒(最终单位统一为纳秒)。
延迟执行就是这么控制的。
.
5. 如何修改任务的 time 属性
上面说过,runWorker 的四步,第二步是执行任务,第三步是修改 time 属性。
修改 time 属性就是下面这个方法。
private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}
本文开头示例二, scheduleAtFixedRate
这个方法 period 是正数。 示例中是 2 秒。
那么setNextRunTime
方法中,会执行 time += p
这行,翻译一下就是:
time - now()
的值会在 2 秒后变为负数,也就是2秒后会任务可以被再次取出。
这个我表达能力有限,只能说到这个层次了。
scheduleWithFixedDelay
这个方法 period
是负数。 示例中是 2 秒,即 这= -2。
会执行 time = triggerTime(-p) 这个方法。
long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}
也就是在任务结束时,time
会被设置为 now() + 2 秒(最终结果以纳秒来计算)。
翻译一下就是,任务结束了,再过2秒,任务会被再次取出执行。
也就是说,通过 time 发生的修改,就控制了任务什么时候能被执行。
四、总结
1、本文是线程池进阶的内容,要理解本文,需要提前做两个功课。
《线程池工作原理》、《DelayQueue工作原理》这两篇文章要先理解。
2、scheduleWithFixedDelay
和 scheduleWithFixedDelay
工作原理相同。即 rumWorker
方法重复执行以下四个步骤
- 阻塞队列中取任务
- 执行任务
- 修改任务的 time 属性
- 重新将任务放回阻塞队列
ScheduledThreadPool 源码解析——定时类线程池是如何工作的相关推荐
- Java Executor源码解析(7)—Executors线程池工厂以及四大内置线程池
详细介绍了Executors线程池工具类的使用,以及四大内置线程池. 系列文章: Java Executor源码解析(1)-Executor执行框架的概述 Java Executor源码解析(2)-T ...
- Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】
基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码! 上一篇文章中,我们介绍了:Java Executor源码解析(2)-ThreadPoolExecutor ...
- 【EventBus】EventBus 源码解析 ( 事件发送 | 线程池中执行订阅方法 )
文章目录 一.EventBus 中主线程支持类 二.EventBus 中 AsyncPoster 分析 三.AsyncPoster 线程池 Runnable 任务类 一.EventBus 中主线程支持 ...
- Executors源码解读——创建ExecutorService线程池
Executors源码解读--创建ExecutorService线程池 〇.[源码版本] jdk 1.8 一.线程池概述 二.线程池创建 三.Executors源码解读 newFixedThreadP ...
- JDK源码解析 InputStream类就使用了模板方法模式
JDK源码解析 InputStream类就使用了模板方法模式. 在InputStream类中定义了多个 read() 方法,如下: public abstract class InputStream ...
- JDK源码解析 Integer类使用了享元模式
JDK源码解析 Integer类使用了享元模式. 我们先看下面的例子: public class Demo {public static void main(String[] args) {Integ ...
- Java线程池状态判断源码_深入浅出Java线程池:源码篇
前言 在上一篇文章深入浅出Java线程池:理论篇中,已经介绍了什么是线程池以及基本的使用.(本来写作的思路是使用篇,但经网友建议后,感觉改为理论篇会更加合适).本文则深入线程池的源码,主要是介绍Thr ...
- JDK源码解析--Object类
作为一名java开发,肯定会知道object类,object类是所有类的基类,当一个类没有直接继承任何类时,默认继承object类,所以也被称之为"上帝类". 目录 一.继承Obj ...
- HashMap 1.8 源码解析以及非线程安全分析
2019独角兽企业重金招聘Python工程师标准>>> 1.首先看下HashMap的put方法. final V putVal(int hash, K key, V value, b ...
最新文章
- SpringBoot 编写ajax可以访问的接口
- Tesseract——OCR图像识别 入门篇
- 在Ubuntu中实验环境配置《操作系统原理与实践-李治军》
- 虚拟局域网Vlan与单臂路由、三层交换、链路聚合技术
- java 压缩 乱码_如何解决java压缩文件乱码问题
- [转载] 1.1Java使用JDBC原生方式连接MySql数据库
- MacOS12.3M1出现程序killed的一些想法
- Spring框架----Spring的bean之三种创建bean对象的方式
- css未生效,css不生效是什么原因
- Docker Image执行流程
- 高性能ORM数据访问组件Deft,比dapper快20%以上
- 微信小程序时间轴demo_微信小程序时间轴实现方法示例
- 尤雨溪: 2022 Web 前端生态趋势
- JQuery源码分析 - 闭包机制在jQuery中的使用及冲突解决
- node aws 内存溢出_如何使用Node和AWS S3设置简单的图像上传
- Python3.7.4入门-0/1To Begin/数据类型与结构
- mq5 EA模板及双均线交叉策略EAdemo
- 留用户、补内容,在线音乐暗战不停
- pytorch中torch.isnan()和torch.isfinite()
- java 阴历阳历转换