文章目录

  • 引言
  • 一、ScheduledThreadPool 使用示例
    • 1. 延时类的定时任务 `schedule`
    • 2. 延时类,固定周期执行任务 `scheduleAtFixedRate`
    • 3. 延迟开始,固定间隔周期性任务 `scheduleWithFixedDelay`
  • 二、初始化
  • 三、任务执行
    • 1. 源码对照
    • 2. 线程的创建与启动
    • 3. 周期性执行
    • 4. 延时的控制
    • 5. 如何修改任务的 time 属性
  • 四、总结

引言

之前的文章《线程池源码分析》,详细介绍了线程池的原理。

这篇在此基础上聊聊 定时类的线程池 ScheduledThreadPool

假定线程池那篇你已经看过了哈!

项目中,经常会用到定时任务,

比如基于注解的 @Scheduled,配合 cron 表达式。

那源码级别,若要使用定时任务,不大可能再集成 spring 框架来实现

比如 Eureka 的心跳机制,典型的定时任务,

它的实现就是应用的 定时类线程池,

也就是今天所说的 ScheduledThreadPool

今天详细剖析下它的源码及使用。


一、ScheduledThreadPool 使用示例

1. 延时类的定时任务 schedule

public static void main(String[] args) throws Exception {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);long start = System.currentTimeMillis();log.info("延迟的 定时任务执,开始时间:{}", start);scheduledExecutorService.schedule(new Runnable() {@Overridepublic void run() {long running = System.currentTimeMillis();log.info(" 执行任务,时间:{}", running);log.info("相隔毫秒数:{}", running - start);}},3, TimeUnit.SECONDS);scheduledExecutorService.shutdown();}

这个示例,任务提交后,不是立马执行。按指定的延迟时间,过了这个时间再执行。

比如示例中,任务提交后,延迟3秒再执行。

2. 延时类,固定周期执行任务 scheduleAtFixedRate

public static void main(String[] args) throws Exception {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);long start = System.currentTimeMillis();log.info("延迟类固定周期任务,开始时间:{}", start);scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@SneakyThrows@Overridepublic void run() {long running = System.currentTimeMillis();long dif = running - start;log.info("与任务提交时间,相隔毫秒数:{}", dif);if(dif > 20*1000){scheduledExecutorService.shutdown();log.info("延迟类固定周期任务 end");}Thread.sleep(1000);}},5, 2,  TimeUnit.SECONDS);}

这个示例是,任务提交5秒之后,才开始执行。之后每隔 2 秒,周期性执行。(每次任务执行时间耗时1秒)

即相邻两个任务,任务开始的时间,相隔 2 秒 (不考虑溢出)


执行的效果如上图。误差及其小。每两秒执行一次。

3. 延迟开始,固定间隔周期性任务 scheduleWithFixedDelay

public static void main(String[] args) throws Exception {ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);long start = System.currentTimeMillis();log.info("延迟类固定周期任务,开始时间:{}", start);scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {@SneakyThrows@Overridepublic void run() {long running = System.currentTimeMillis();long dif = running - start;log.info("begin 与任务提交时间,相隔毫秒数:{}", dif);if(dif > 20*1000){scheduledExecutorService.shutdown();log.info("延迟类固定周期任务 end");}Thread.sleep(1000);log.info("end 与任务提交时间,相隔毫秒数:{}", System.currentTimeMillis() - start);}},5, 2,  TimeUnit.SECONDS);}

这个示例是,任务提交5秒之后,才开始执行。每次任务执行完成后,等 2 秒后,再次执行

即上个任务结束,到下次任务执行开始,之间的时间间隔是 2 秒。

固定间隔的,执行效果如图

这个和前一个周期性任务,有一点差异,我画个图,差别很容易理解。

二、初始化

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}

这里的 super 指的是 ThreadPoolExecutor,也就是《线程池源码分析》,文章中提到的构造方法。

也就是说,

  • corePoolSize 线程池的核心线程数量,是调用者传入的
  • maximumPoolSize 最大线程数量,是Integer的最大值
  • keepAliveTime 线程阻塞时,存活时间是 0
  • workQueue 阻塞的队列是 DelayedWorkQueue

这里需要说说,这个DelayedWorkQueue,它是 ScheduledThreadPoolExecutor 的内部类。

《DelayQueue源码解析》,这篇文章里讲过延时队列。

只要把这篇文章看明白了,就知道 DelayedWorkQueue 是做什么了,实现的逻辑几乎一模一样。

三、任务执行

1. 源码对照

public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {if (callable == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));delayedExecute(t);return t;}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}

仔细看下,这三个方法的实现,基本上是一样的,代码都差不多。

大概就是,将任务包装成 ScheduledFutureTask,然后执行 delayedExecute(t)

private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown()) // 如果线程池已关闭,拒绝执行任务reject(task);else {super.getQueue().add(task); // 将任务放入阻塞队列if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false); // 符合某些条件,任务直接取消elseensurePrestart(); // 确保任务执行}}void ensurePrestart() { // 此方法,可以保证一定有线程来执行任务int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)addWorker(null, false);}

delayedExecute 这个方法看着也很简单,将任务入队,然后调用 ensurePrestart()

ensurePrestart() 这个方法更简单,就是调用 addWorker(null, true) 方法。

在《线程池源码分析》,这篇文章中,

详细分析过 addWorker(null, true) 方法,本文不再赘述。

看到这里,有没有这样的感觉:

这周期性的任务,怎么就执行了?代码中也没体现出来呀!

网上的文章,也没有讲怎么执行的,为啥?有点讲不清楚。
.

2. 线程的创建与启动

执行scheduleWithFixedDelay方法时, 调用 delayedExecute

delayedExecute 方法将任务放入阻塞队列,第一次执行时,会调用 addWorker 方法。

ThreadPoolExecutor 这在类中的 addWorker 方法会创建一个线程,并会 调用 start(),即启动线程。

这部分内容在《线程池源码分析》中,详细说过。

.

3. 周期性执行

线程启动后,JVM 会在合适的时候,调用 ThreadPoolExecutor 中的 run() 方法,

其实现是 runWorker() 方法,简化代码如下。

    while (task != null || (task = getTask()) != null) {w.lock();....task.run();}

task.run() 时,会调用ScheduledThreadPoolExecutor 类中的 run() 方法

public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic)) // 线程池关闭时,会进入这个分支cancel(false);else if (!periodic)ScheduledFutureTask.super.run(); // 不是周期性任务,直接执行任务else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime(); // 重置下次执行的时间reExecutePeriodic(outerTask); // 将任务放进阻塞队列}}

ScheduledFutureTask.super.runAndReset() 这个方法会调用示例中,重写的 run() 方法。

setNextRunTime() 这个方法会修改 任务中的 time 属性。

reExecutePeriodic 这个方法,做了两件事,

void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {super.getQueue().add(task); // 将任务重新放入阻塞队列if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false); // 某些情况下,取消任务elseensurePrestart(); // 保证有一个线程在工作}}

一是把任务放入阻塞队列。二是,若没有工作线程,创建一个线程。

看到这儿,应该还是觉得很懵才对,我结合图来解释。

这四步我大概解释下,

从阻塞队列中取任务,是否会阻塞,与其属性 time 的值有关,这个等下再解释。

执行任务,这个不需要多解释,在本文开头的几个示例中,就是打印了日志。

修改任务的time属性,这个是控制任务下次什么时候执行。即下次什么时候能从队列中取出来。

将任务放入队列,这个不需要多解释。

runWorker 的简化代码,上面给了出来,是一个 while 循环,每次循环都会执行这四步,

以上就是周期性执行的基本原因。

.

4. 延时的控制

《DelayQueue源码解析》在这篇文章中,介绍了延时阻塞队列的工作原理。

如果这个不太懂,下文的理解会很吃力。

runWorker 方法从队列中取任务时,会判断 getDelay() 方法的返回值,

假设返回值是 N,若 N 大于 0,则阻塞。阻塞的时间为N,时间到了之后,

自我唤醒,取到任务,就开始执行任务。

public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}final long now() {return System.nanoTime();}

getDelay() 方法中,time - now(),其中 time 的原始值,是 now() + delay。

比如本文开头的示例,延迟 5 秒执行。time 的原始值,就是 now() + 5 秒(最终单位统一为纳秒)。

延迟执行就是这么控制的。

.

5. 如何修改任务的 time 属性

上面说过,runWorker 的四步,第二步是执行任务,第三步是修改 time 属性。

修改 time 属性就是下面这个方法。

     private void setNextRunTime() {long p = period;if (p > 0)time += p;elsetime = triggerTime(-p);}

本文开头示例二, scheduleAtFixedRate 这个方法 period 是正数。 示例中是 2 秒。

那么setNextRunTime 方法中,会执行 time += p 这行,翻译一下就是:

time - now() 的值会在 2 秒后变为负数,也就是2秒后会任务可以被再次取出。

这个我表达能力有限,只能说到这个层次了。


scheduleWithFixedDelay 这个方法 period 是负数。 示例中是 2 秒,即 这= -2。

会执行 time = triggerTime(-p) 这个方法。

    long triggerTime(long delay) {return now() +((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));}

也就是在任务结束时,time 会被设置为 now() + 2 秒(最终结果以纳秒来计算)。

翻译一下就是,任务结束了,再过2秒,任务会被再次取出执行。

也就是说,通过 time 发生的修改,就控制了任务什么时候能被执行。

四、总结

1、本文是线程池进阶的内容,要理解本文,需要提前做两个功课。
《线程池工作原理》、《DelayQueue工作原理》这两篇文章要先理解。

2、scheduleWithFixedDelayscheduleWithFixedDelay 工作原理相同。即 rumWorker 方法重复执行以下四个步骤

  • 阻塞队列中取任务
  • 执行任务
  • 修改任务的 time 属性
  • 重新将任务放回阻塞队列

ScheduledThreadPool 源码解析——定时类线程池是如何工作的相关推荐

  1. Java Executor源码解析(7)—Executors线程池工厂以及四大内置线程池

    详细介绍了Executors线程池工具类的使用,以及四大内置线程池. 系列文章: Java Executor源码解析(1)-Executor执行框架的概述 Java Executor源码解析(2)-T ...

  2. Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】

    基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码! 上一篇文章中,我们介绍了:Java Executor源码解析(2)-ThreadPoolExecutor ...

  3. 【EventBus】EventBus 源码解析 ( 事件发送 | 线程池中执行订阅方法 )

    文章目录 一.EventBus 中主线程支持类 二.EventBus 中 AsyncPoster 分析 三.AsyncPoster 线程池 Runnable 任务类 一.EventBus 中主线程支持 ...

  4. Executors源码解读——创建ExecutorService线程池

    Executors源码解读--创建ExecutorService线程池 〇.[源码版本] jdk 1.8 一.线程池概述 二.线程池创建 三.Executors源码解读 newFixedThreadP ...

  5. JDK源码解析 InputStream类就使用了模板方法模式

    JDK源码解析 InputStream类就使用了模板方法模式. 在InputStream类中定义了多个 read() 方法,如下: public abstract class InputStream ...

  6. JDK源码解析 Integer类使用了享元模式

    JDK源码解析 Integer类使用了享元模式. 我们先看下面的例子: public class Demo {public static void main(String[] args) {Integ ...

  7. Java线程池状态判断源码_深入浅出Java线程池:源码篇

    前言 在上一篇文章深入浅出Java线程池:理论篇中,已经介绍了什么是线程池以及基本的使用.(本来写作的思路是使用篇,但经网友建议后,感觉改为理论篇会更加合适).本文则深入线程池的源码,主要是介绍Thr ...

  8. JDK源码解析--Object类

    作为一名java开发,肯定会知道object类,object类是所有类的基类,当一个类没有直接继承任何类时,默认继承object类,所以也被称之为"上帝类". 目录 一.继承Obj ...

  9. HashMap 1.8 源码解析以及非线程安全分析

    2019独角兽企业重金招聘Python工程师标准>>> 1.首先看下HashMap的put方法. final V putVal(int hash, K key, V value, b ...

最新文章

  1. SpringBoot 编写ajax可以访问的接口
  2. Tesseract——OCR图像识别 入门篇
  3. 在Ubuntu中实验环境配置《操作系统原理与实践-李治军》
  4. 虚拟局域网Vlan与单臂路由、三层交换、链路聚合技术
  5. java 压缩 乱码_如何解决java压缩文件乱码问题
  6. [转载] 1.1Java使用JDBC原生方式连接MySql数据库
  7. MacOS12.3M1出现程序killed的一些想法
  8. Spring框架----Spring的bean之三种创建bean对象的方式
  9. css未生效,css不生效是什么原因
  10. Docker Image执行流程
  11. 高性能ORM数据访问组件Deft,比dapper快20%以上
  12. 微信小程序时间轴demo_微信小程序时间轴实现方法示例
  13. 尤雨溪: 2022 Web 前端生态趋势
  14. JQuery源码分析 - 闭包机制在jQuery中的使用及冲突解决
  15. node aws 内存溢出_如何使用Node和AWS S3设置简单的图像上传
  16. Python3.7.4入门-0/1To Begin/数据类型与结构
  17. mq5 EA模板及双均线交叉策略EAdemo
  18. 留用户、补内容,在线音乐暗战不停
  19. pytorch中torch.isnan()和torch.isfinite()
  20. java 阴历阳历转换

热门文章

  1. 论文写作-英文词汇或短语使用注意事项
  2. 联科教育为全球最大的网络供应商Cisco(思科)提供技术培训!
  3. Python实现SSH远程连接与文件传输
  4. 1计算机等级考试 12月,2017年12月计算机二级MS Office考试上机冲刺题(1)
  5. 神奇宝贝java路径_【NDS地图制作教程零一】如何打开工具——JAVA第一课
  6. WPF Expander更改样式
  7. git冲突出现的原因及解决方案
  8. Apache配置文件介绍
  9. Django Q对象
  10. 计算博弈笔记(一)博弈论