Java定时任务(2)


上次浅显的分析了Timer及TimerTask的调度原理,这里我们再来看一下另一种定时调度方式ScheduledThreadPoolExecutor的内部执行原理。

  • ScheduledThreadPoolExecutor调度方式
  • 阻塞队列DelayedWorkQueue的实现
  • ScheduledFutureTask控制调度过程
  • 总结

ScheduledThreadPoolExecutor调度方式

类似于Timer,它的调度方式也有4种,我们抽出其中一种来具体分析。

//一次性任务,无返回值
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {...}//一次性任务,有返回值
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {...}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {...}public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit){...}

我们取scheduleWithFixedDelay作为切入点来进行分析。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}//将Runnable适配成FutureTask
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));//由于是将一个无返回值的Runnable包装成一个FutureTask,所以传入一个null作为返回值ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time = ns;         //设置执行时间this.period = period;   //设置时间片this.sequenceNumber = sequencer.getAndIncrement();}//Executors.callable()是真正进行Runnable与FutureTask适配的函数
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable}public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);}    //call()就不多做介绍了,由FutureTask的call来掉内部Runnable的run()
static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run();return result;}}//修改或替换用于执行一个可运行的任务。//这个方法可以用来覆盖具体类以管理内部任务。//默认实现仅返回给定的任务。RunnableScheduledFuture<Void> t = decorateTask(command, sft);protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {return task;}//这里之所以先将task放入队列,再由worker去取出并运行,是因为task有可能并不期望当下就被运行,而是期望一段时间的延迟后再开始运行。
//同时一个task被创建后直接运行,也是不合理的一种行为。
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())       //线程池状态是否正常reject(task);       //拒绝任务,根据拒绝策略,通常有1.直接丢弃 2.抛异常 3.新建一个线程来执行 4.类似于LRU,丢弃队列中最久的一个任务else {super.getQueue().add(task);         //先加入BlockingQueue,该队列默认在ThreadPoolExecutor中实现,但这里提供了DelayedWorkQueue代替默认实现if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&      //当task已经被加入队列,但是线程池挂了,根据status来决定,取消并删除在队列中的task。remove(task))task.cancel(false);elseensurePrestart();}}//其实就是保证至少有一个worker已经启动void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)addWorker(null, false);}

至此,一个定时任务就被加入DelayedWorkQueue队列了,而调度任务的线程由ThreadPoolExecutor提供。
但是到此为此还并没有实现延迟调度的功能。
接下来说说DelayedWorkQueue是如何实现延迟调度的。


阻塞队列DelayedWorkQueue的实现

public boolean offer(Runnable x) {if (x == null)throw new NullPointerException();RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;final ReentrantLock lock = this.lock;lock.lock();try {int i = size;if (i >= queue.length)grow();     //重建数组,长度增加50%size = i + 1;   if (i == 0) {queue[0] = e;setIndex(e, 0);} else {siftUp(i, e);}if (queue[0] == e) {leader = null;available.signal();}} finally {lock.unlock();}return true;}//这点跟Timer中的操作比较类似,做了一个相对排序而非绝对意义的排序。
//内部做了一个比较接口,根据下次执行的时间来排队。
private void siftUp(int k, RunnableScheduledFuture<?> key) {while (k > 0) {int parent = (k - 1) >>> 1;RunnableScheduledFuture<?> e = queue[parent];if (key.compareTo(e) >= 0)break;queue[k] = e;setIndex(e, k);k = parent;}queue[k] = key;setIndex(key, k);}//但是显然offer并没有来提供延迟调度的实现,真正的延迟调度的实现是在take中完成的。public RunnableScheduledFuture<?> take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture<?> first = queue[0];    //第一个元素,也就是优先级最高,最应该被执行的任务if (first == null)available.await();  //进入这里说明队列中没有待执行的任务,所以来取任务的worker进入等待状态else {long delay = first.getDelay(NANOSECONDS);   //取到任务,getDelay定义于内部类ScheduledFutureTask中if (delay <= 0)return finishPoll(first);               //下面有说明,建议先看下面first = null; // don't retain ref while waitingif (leader != null)available.await();else {Thread thisThread = Thread.currentThread();leader = thisThread;try {available.awaitNanos(delay);        //进入这里说明,当前还未到该任务下次执行的时间,所以进入等待,等待时长为当前到执行时间的时差} finally {if (leader == thisThread)leader = null;}}}}} finally {if (leader == null && queue[0] != null)available.signal();lock.unlock();}}//其目的是计算出下次将要执行任务的时间与当前系统时间的差值。
//若小于等于0,则说明该立即被执行。
//否则说明仍需等待。public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}//返回第一个task,并将第一个元素与最后一个元素互换位置。
//通常因为我们在offer一个task的时候已经进行过相对排序,使得这个队列在大体上是相对有序的。
//而这里在take走一个元素后却将最后一个元素(理论上来说,无论是优先级是相对低的,下次执行时间都是最久的)互换。private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {int s = --size;     RunnableScheduledFuture<?> x = queue[s];queue[s] = null;if (s != 0)siftDown(0, x);setIndex(f, -1);return f;}private void siftDown(int k, RunnableScheduledFuture<?> key) {int half = size >>> 1;while (k < half) {int child = (k << 1) + 1;RunnableScheduledFuture<?> c = queue[child];int right = child + 1;if (right < size && c.compareTo(queue[right]) > 0)c = queue[child = right];if (key.compareTo(c) <= 0)break;queue[k] = c;setIndex(c, k);k = child;}queue[k] = key;setIndex(key, k);}

siftDown()大体上是一个自顶向下的一个相对排序。
首先half = size >>> 1 = size/2。
其次,关注whlie(k < half),我们假设永不会走到break,下面我简单的列出部分数组下标变化情况。
假设 size = 32。

k child right
0 1 2
1 3 4
3 7 8
7 15 16
15 31 32

由于条件是k < half
所以至多循环log2(size)次。
类比于siftUp,其实是一次自顶向下的相对排序,结果也会是相对有序,我猜可能是为了弥补siftUp()时的不足,所以再进行了一次相对排序。
至此,已经从大体上了解了DelayedWorkQueue是如何实现延迟调度的。

最后,在来看下worker取到task之后如何run
由于task被ScheduledFutureTask包装,所以我们看的自然是ScheduledFutureTask的run()。


ScheduledFutureTask控制调度过程

public void run() {boolean periodic = isPeriodic();    //判断是否是一个循环任务if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();    //若不是循环任务,当做一个普通任务来处理else if (ScheduledFutureTask.super.runAndReset()) { //若是循环任务,在runAndReset()中实际运行,运行完毕后同时将status重新置为NEW,也就是相当于一个新tasksetNextRunTime();       //计算下次任务执行的时间,这里与Timer有所区别,Timer是先计算出来在运行,而这里是先运行在计算reExecutePeriodic(outerTask);       //自然,它完成的就是讲task重新放入队列,并做一些线程池状态的相关判断}}

总结

1.延迟调度由DelayedWorkQueue来实现。
2.Executors.callable()完成Runnable到Callable的转换(适配)。
3.ScheduledFutureTask包装task,实现循环任务的“循环”处理,延迟计算,调度控制等相关操作。
4.ThreadPoolExecutor实现多线程并发操作。
5.相比于Time,最为明显的几点:
  1.Timer单线程,ScheduledThreadPoolExecutor多线程(显而易见)。
  2.Timer先计算下次调度时间,在运行,ScheduledThreadPoolExecutor相反。
  3.Timer之间无法实现TimerTask共享,ScheduledThreadPoolExecutor通过ThreadPoolExecutor可以实现。
  4.Timer与ScheduledThreadPoolExecutor使用的BlockingQueue不同。

Java定时任务(2)相关推荐

  1. java定时任务,每天定时执行任务

    java定时任务,每天定时执行任务.以下是这个例子的全部代码. public class TimerManager {//时间间隔private static final long PERIOD_DA ...

  2. java定时任务框架elasticjob详解

    这篇文章主要介绍了java定时任务框架elasticjob详解,Elastic-Job是ddframe中dd-job的作业模块中分离出来的分布式弹性作业框架.该项目基于成熟的开源产品Quartz和Zo ...

  3. Springboot2 Quartz实现JAVA定时任务的动态配置

    动态配置Quartz.没接触过定时任务的同学可以先看下此篇:JAVA定时任务实现的几种方式 文章目录 一.需求背景 1. 问题现象 2. 问题分析 3. 解决方案 二.需求背景 2.1. maven依 ...

  4. Java定时任务解决方案

    Java定时任务解决方案 参考文章: (1)Java定时任务解决方案 (2)https://www.cnblogs.com/zhixiang-org-cn/p/9490877.html (3)http ...

  5. Java定时任务(一) Timer及TimerTask的案例解析及源码分析

    Java定时任务(一)  Timer及TimerTask的案例解析及源码分析 一.概述: 定时任务这个概念在Java的学习以及项目的开发中并不陌生,应用场景也是多种多样.比如我们会注意到12306网站 ...

  6. JAVA定时任务的简单实现

    Java定时任务的简单实现 2011-01-02 18:34:43|  分类: 软件开发 |  标签:void  timer  import  param  dateutil   |字号大中小 订阅 ...

  7. Java定时任务技术分析

    <从零打造项目>系列文章 工具 比MyBatis Generator更强大的代码生成器 ORM框架选型 SpringBoot项目基础设施搭建 SpringBoot集成Mybatis项目实操 ...

  8. Java 定时任务详解

    文章目录 单机定时任务技术选型 Timer ScheduledExecutorService Spring Task 时间轮 分布式定时任务技术选型 Quartz Elastic-Job XXL-JO ...

  9. Java定时任务手工触发-使用Arthas

    1. 前言 在测试环境经常需要手工触发Java应用中的定时任务,如果定时任务没有使用Quartz,Java应用中也没有提供其他方法手工触发定时任务,可以使用Arthas快速实现以上目的. 以下使用Ar ...

  10. JAVA定时任务时间配置

    JAVA定时任务时间配置 每个位置配置解释 参数说明 通配符说明 常用示例 每个位置配置解释 秒 分 时 日 月 周 年* * * * * * * 参数说明 序号 说明 是否必填 允许填写的值 允许的 ...

最新文章

  1. 2013年上半年网络工程师真题
  2. 3.2 进阶-好多鱼
  3. Java反射实现几种方式
  4. linux根目录挂载到2440开发板,飞凌2440开发板挂载NFS
  5. java 数组 push pop_JavaScript学习笔记:数组的push()、pop()、shift()和unshift()方法
  6. 填坑-十万个为什么?(22)
  7. mysql中的tablefamily_Mysql中的表操作
  8. 蒂姆·库克:给好人留的后门同时也是给坏人留的
  9. 一个较完整的关键字过滤解决方案(上)
  10. Linux PCI 设备驱动基本框架(一)
  11. Team Foundation Server (TFS) 2015 安装指导
  12. Android 应用内更新 Support in-app updates [GP官方支持]
  13. 7. Swift 基于Xmpp和openfire实现一个简单的登录注册
  14. C#反混淆脱壳工具de4dot的使用(转)
  15. go技巧-json转map
  16. 微大夫感冒舒缓仪亮相“2018健康陕西发展大会”回顾
  17. 学大伟业 Day 4 培训总结
  18. HTML+CSS+JS实现 ❤️ 立方体旋转图片切换特效❤️
  19. C#入门学习——飞行棋
  20. ormpp 项目新版本规划概述

热门文章

  1. android多渠道打包(动态改变地址打包,只需改下版本号)
  2. 程序员能干一辈子吗?性格内向的老实人,更适合程序员职业,工资高且不限年龄!
  3. 雪橇、雪车、钢架雪车傻傻分不清?一文教你弄明白
  4. 《未来简史》九、我们编造一堆故事,框了自己一辈子
  5. mysql setinc_thinkphp3.2.0 setInc方法
  6. three 天空球_three.js添加场景背景和天空盒(skybox)
  7. iOS -- block one
  8. 幼儿应不应该使用计算机,该不该对幼儿进行电脑教育?
  9. 一步步分析百度音乐的播放地址,利用Python爬虫批量下载
  10. Linux下Apache下载安装