1.ScheduledThreadPoolExecutor 整体结构剖析。

1.1类图介绍

根据上面类图图可以看到Executor其实是一个工具类,里面提供了好多静态方法,根据用户选择返回不同的线程池实例。可以看到ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor 并实现 ScheduledExecutorService接口。线程池队列是 DelayedWorkQueue,和 DelayedQueue 类似是一个延迟队列。

ScheduledFutureTask 是具有返回值的任务,继承自 FutureTask,FutureTask 内部有个变量 state 用来表示任务的状态,一开始状态为 NEW,所有状态为:

    private static final int NEW          = 0;//初始状态private static final int COMPLETING   = 1;//执行中状态private static final int NORMAL       = 2;//正常运行结束状态private static final int EXCEPTIONAL  = 3;//运行中异常private static final int CANCELLED    = 4;//任务被取消private static final int INTERRUPTING = 5;//任务正在被中断private static final int INTERRUPTED  = 6;//任务已经被中断

FutureTask可能的任务状态转换路径如下所示:

    NEW -> COMPLETING -> NORMAL //初始状态->执行中->正常结束NEW -> COMPLETING -> EXCEPTIONAL//初始状态->执行中->执行异常NEW -> CANCELLED//初始状态->任务取消NEW -> INTERRUPTING -> INTERRUPTED//初始状态->被中断中->被中断

其实ScheduledFutureTask 内部还有个变量 period 用来表示任务的类型,其任务类型如下:

  • period=0,说明当前任务是一次性的,执行完毕后就退出了。

  • period 为负数,说明当前任务为 fixed-delay 任务,是定时可重复执行任务。

  • period 为整数,说明当前任务为 fixed-rate 任务,是定时可重复执行任务。

接下来我们可以看到ScheduledThreadPoolExecutor 的造函数如下

    //使用改造后的Delayqueue.public ScheduledThreadPoolExecutor(int corePoolSize) {//调用父类ThreadPoolExecutor的构造函数super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue());}public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}

根据上面代码可以看到线程池队列是 DelayedWorkQueue

2、原理分析

我们主要看三个重要的函数,如下所示:

schedule(Runnable command, long delay,TimeUnit unit)scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

2.1、schedule(Runnable command, long delay,TimeUnit unit)方法

该方法作用是提交一个延迟执行的任务,任务从提交时间算起延迟 unit 单位的 delay 时间后开始执行,提交的任务不是周期性任务,任务只会执行一次,代码如下:

public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {//(1)参数校验if (command == null || unit == null)throw new NullPointerException();//(2)任务转换RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));//(3)添加任务到延迟队列
    delayedExecute(t);return t;
}

可以看到上面代码所示,代码(1)参数校验,如果 command 或者 unit 为 null,抛出 NPE 异常。

代码(2)装饰任务,把提交的 command 任务转换为 ScheduledFutureTaskScheduledFutureTask 是具体放入到延迟队列里面的东西,由于是延迟任务,所以 ScheduledFutureTask 实现了 long getDelay(TimeUnit unit) 和 int compareTo(Delayed other) 方法,triggerTime 方法转换延迟时间为绝对时间,也就是把当前时间的纳秒数加上延迟的纳秒数后的 long 型值。

接下来我们需要看 ScheduledFutureTask的构造函数,如下所示:

ScheduledFutureTask(Runnable r, V result, long ns) {//调用父类FutureTask的构造函数super(r, result);this.time = ns;this.period = 0;//period为0,说明为一次性任务this.sequenceNumber = sequencer.getAndIncrement();
}

根据构造函数可以看到内部首先调用了父类 FutureTask 的构造函数,父类 FutureTask 的构造函数代码如下:

//通过适配器把runnable转换为callable
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       //设置当前任务状态为NEW
}

根据上面代码可以看到FutureTask 中任务又被转换为了 Callable 类型后,保存到了变量 this.callable 里面,并设置 FutureTask 的任务状态为 NEW。

然后 ScheduledFutureTask 构造函数内部设置 time 为上面说的绝对时间,需要注意这里 period 的值为 0,这说明当前任务为一次性任务,不是定时反复执行任务。

其中 long getDelay(TimeUnit unit) 方法代码如下,用来获取当前任务还有多少时间就过期了,代码如下所示:

//元素过期算法,装饰后时间-当前时间,就是即将过期剩余时间
public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);
}

接下来接着看compareTo(Delayed other) 方法,代码如下:

public int compareTo(Delayed other) {if (other == this) // compare zero ONLY if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long d = (getDelay(TimeUnit.NANOSECONDS) -other.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

根据上面代码的执行逻辑,可以看到compareTo 作用是加入元素到延迟队列后,内部建立或者调整堆时候会使用该元素的 compareTo 方法与队列里面其他元素进行比较,让最快要过期的元素放到队首。所以无论什么时候向队列里面添加元素,队首的的元素都是最即将过期的元素。

接下来接着看代码(3)添加任务到延迟队列,delayedExecute 的代码如下:

private void delayedExecute(RunnableScheduledFuture<?> task) {//(4)如果线程池关闭了,则执行线程池拒绝策略if (isShutdown())reject(task);else {//(5)添加任务到延迟队列super.getQueue().add(task);//(6)再次检查线程池状态if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else//(7)确保至少一个线程在处理任务
            ensurePrestart();}
}

可以看到代码(4)首先判断当前线程池是否已经关闭了,如果已经关闭则执行线程池的拒绝策略(如果不知道线程池的拒绝策略可以看前一篇线程池的介绍。)

否者执行代码(5)添加任务到延迟队列。添加完毕后还要重新检查线程池是否被关闭了,如果已经关闭则从延迟队列里面删除刚才添加的任务,但是有可能线程池线程已经从任务队列里面移除了该任务,也就是该任务已经在执行了,所以还需要调用任务的 cancle 方法取消任务。

如果代码(6)判断结果为 false,则会执行代码(7)确保至少有一个线程在处理任务,即使核心线程数 corePoolSize 被设置为 0.

ensurePrestart 代码如下:

void ensurePrestart() {int wc = workerCountOf(ctl.get());//增加核心线程数if (wc < corePoolSize)addWorker(null, true);//如果初始化corePoolSize==0,则也添加一个线程。else if (wc == 0)addWorker(null, false);}
}

如上代码首先首先获取线程池中线程个数,如果线程个数小于核心线程数则新增一个线程,否者如果当前线程数为 0 则新增一个线程。

通过上面代码我们分析了如何添加任务到延迟队列,下面我们看线程池里面的线程如何获取并执行任务的,从前面讲解的 ThreadPoolExecutor 我们知道具体执行任务的线程是 Worker 线程,Worker 线程里面调用具体任务的 run 方法进行执行,由于这里任务是 ScheduledFutureTask,所以我们下面看看 ScheduledFutureTask 的 run 方法。代码如下:

public void run() {//(8)是否只执行一次boolean periodic = isPeriodic();//(9)取消任务if (!canRunInCurrentRunState(periodic))cancel(false);//(10)只执行一次,调用schdule时候else if (!periodic)ScheduledFutureTask.super.run();//(11)定时执行else if (ScheduledFutureTask.super.runAndReset()) {//(11.1)设置time=time+period
        setNextRunTime();//(11.2)重新加入该任务到delay队列
        reExecutePeriodic(outerTask);}
}   

可以看到代码(8)isPeriodic 的作用是判断当前任务是一次性任务还是可重复执行的任务,isPeriodic 的代码如下:

public boolean isPeriodic() {return period != 0;
}

可知内部是通过 period 的值来判断,由于转换任务创建 ScheduledFutureTask 时候传递的 period 为 0 ,所以这里 isPeriodic 返回 false。

代码(9)判断当前任务是否应该被取消,canRunInCurrentRunState 的代码如下:

boolean canRunInCurrentRunState(boolean periodic) {return isRunningOrShutdown(periodic ?continueExistingPeriodicTasksAfterShutdown :executeExistingDelayedTasksAfterShutdown);
}

这里传递的 periodic 为 false,所以 isRunningOrShutdown 的参数为 executeExistingDelayedTasksAfterShutdownexecuteExistingDelayedTasksAfterShutdown 默认是 true 标示当其它线程调用了 shutdown 命令关闭了线程池后,当前任务还是要执行,否者如果为 false,标示当前任务要被取消。

由于 periodic 为 false,所以执行代码(10)调用父类 FutureTask 的 run 方法具体执行任务,FutureTask 的 run 方法代码如下:

public void run() {//(12)if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;//(13)try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;//(13.1)
                    setException(ex);}//(13.2)if (ran)set(result);}} finally {...省略        }}

可以看到代码(12)如果任务状态不是 NEW 则直接返回,或者如果当前任务状态为NEW但是使用 CAS 设置当然任务的持有者为当前线程失败则直接返回。代码(13)具体调用 callable 的 call 方法执行任务,这里在调用前又判断了任务的状态是否为 NEW 是为了避免在执行代码(12)后其他线程修改了任务的状态(比如取消了该任务)。

如果任务执行成功则执行代码(13.2)修改任务状态,set 方法代码如下:

 protected void set(V v) {//如果当前任务状态为NEW,则设置为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;//设置当前任务终状为NORMAL,也就是任务正常结束UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();}}

如上代码首先 CAS 设置当前任务状态从 NEW 转换到 COMPLETING,这里多个线程调用时候只有一个线程会成功,成功的线程在通过 UNSAFE.putOrderedInt 设置任务的状态为正常结束状态,这里没有用 CAS 是因为同一个任务只可能有一个线程可以运行到这里,这里使用 putOrderedInt 比使用 CAS 函数或者 putLongVolatile 效率要高,并且这里的场景不要求其它线程马上对设置的状态值可见。

这里思考个问题,这里什么时候多个线程会同时执行 CAS 设置任务状态从态从 NEW 到 COMPLETING?其实当同一个 comand 被多次提交到线程池时候就会存在这样的情况,由于同一个任务共享一个状态值 state。

如果任务执行失败,则执行代码(13.1),setException 的代码如下,可见与 set 函数类似,代码如下:

protected void setException(Throwable t) {//如果当前任务状态为NEW,则设置为COMPLETINGif (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = t;//设置当前任务终态为EXCEPTIONAL,也就是任务非正常结束UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);             finishCompletion();}
}

到这里代码(10)逻辑执行完毕,一次性任务也就执行完毕了,

下面会讲到如果任务是可重复执行的,则不会执行步骤(10)而是执行代码(11)。

2.2  scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)方法

当任务执行完毕后,延迟固定间隔时间后再次运行(fixed-delay 任务):其中 initialDelay 说明提交任务后延迟多少时间开始执行任务 command,delay 表示当任务执行完毕后延长多少时间后再次运行 command 任务,unit 是 initialDelay 和 delay 的时间单位。任务会一直重复运行直到任务运行时候抛出了异常或者取消了任务,或者关闭了线程池。scheduleWithFixedDelay 的代码如下:

 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {//(14)参数校验if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();//(15)任务转换,注意这里是period=-delay<0ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,                                    triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;//(16)添加任务到队列
        delayedExecute(t);return t;}

如上代码(14)进行参数校验,校验失败则抛出异常,代码(15)转换 command 任务为 ScheduledFutureTask,这里需要注意的是这里传递给 ScheduledFutureTask 的 period 变量的值为 -delay,period < 0 这个说明该任务为可重复执行的任务。然后代码(16)添加任务到延迟队列后返回。

任务添加到延迟队列后线程池线程会从队列里面获取任务,然后调用 ScheduledFutureTask的 run 方法执行,由于这里 period<0 所以 isPeriodic 返回 true,所以执行代码(11),runAndReset 的代码如下:

protected boolean runAndReset() {//(17)if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;//(18)boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // don't set resultran = true;} catch (Throwable ex) {setException(ex);}}} finally {...        }return ran && s == NEW;//(19)
}

该代码和 FutureTask 的 run 类似,只是任务正常执行完毕后不会设置任务的状态,这样做是为了让任务成为可重复执行的任务,这里多了代码(19)如果当前任务正常执行完毕并且任务状态为 NEW 则返回 true 否者返回 false。

如果返回了 true 则执行代码(11.1)setNextRunTime 方法设置该任务下一次的执行时间,setNextRunTime 的代码如下:

private void setNextRunTime() {long p = period;if (p > 0)//fixed-rate类型任务time += p;else//fixed-delay类型任务time = triggerTime(-p);}

如上代码这里 p < 0 说明当前任务为 fixed-delay 类型任务,然后设置 time 为当前时间加上 -p 的时间,也就是延迟 -p 时间后在次执行。

总结:本节介绍的 fixed-delay 类型的任务的执行实现原理如下,当添加一个任务到延迟队列后,等 initialDelay 时间后,任务就会过期,过期的任务就会被从队列移除,并执行,执行完毕后,会重新设置任务的延迟时间,然后在把任务放入延迟队列实现的,依次往复。需要注意的是如果一个任务在执行某一个次时候抛出了异常,那么这个任务就结束了,但是不影响其它任务的执行。

2.3、scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)方法

相对起始时间点固定频率调用指定的任务(fixed-rate 任务):当提交任务到线程池后延迟 initialDelay 个时间单位为 unit 的时间后开始执行任务 comand ,然后 initialDelay + period 时间点再次执行,然后在 initialDelay + 2 * period 时间点再次执行,依次往复,直到抛出异常或者调用了任务的 cancel 方法取消了任务在结束或者关闭了线程池。

scheduleAtFixedRate 的原理与 scheduleWithFixedDelay 类似,下面我们讲下不同点,首先调用 scheduleAtFixedRate 时候代码如下:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {...//装饰任务类,注意period=period>0,不是负的ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));...return t;
}

如上代码 fixed-rate 类型的任务在转换 command 任务为 ScheduledFutureTask 的时候设置的 period=period 不在是 -period

所以当前任务执行完毕后,调用 setNextRunTime 设置任务下次执行的时间时候执行的是 time += p 而不在是 time = triggerTime(-p);

总结:相对于 fixed-delay 任务来说,fixed-rate 方式执行规则为时间为 initdelday + n*period; 时候启动任务,但是如果当前任务还没有执行完,下一次要执行任务的时间到了,不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后在执行一个任务。

3、总结

ScheduledThreadPoolExecutor 的实现原理,其内部使用的 DelayQueue来存放具体任务,其中任务分为三种,其中一次性执行任务执行完毕就结束了,fixed-delay任务保证同一个任务多次执行之间间隔固定时间,fixed-rate 任务保证任务执行按照固定的频率执行,其中任务类型使用 period 的值来区分。

转载于:https://www.cnblogs.com/huangjuncong/p/11029893.html

线程池之ScheduledThreadPoolExecutor线程池源码分析笔记相关推荐

  1. Clamav杀毒软件源码分析笔记 六

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! Clam ...

  2. Retrofit源码分析笔记(一)

    如遇图片无法加载请点击此链接 我们先从最简单的Retrofit使用方法看 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n7bbuois-1665971394319)( ...

  3. SharpDevelop源码分析笔记(一)

    SharpDevelop自动命令启动UI部分(看SharpDevelop源码分析笔记随想) 参见:Fbt2008的大作  SharpDevelop源码分析笔记(一) 源文档 <http://ww ...

  4. Clamav杀毒软件源码分析笔记 九

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! Clam ...

  5. Clamav杀毒软件源码分析笔记[九]

    Clamav杀毒软件源码分析笔记[九] 刺猬@http://blog.csdn.net/littlehedgehog [数据流病毒扫描] 数据流病毒扫描,听上去貌似很牛逼的称呼,其实就是一个传送数据流 ...

  6. 创建线程的三种方法_Netty源码分析系列之NioEventLoop的创建与启动

    前言 前三篇文章分别分析了 Netty 服务端 channel 的初始化.注册以及绑定过程的源码,理论上这篇文章应该开始分析新连接接入过程的源码了,但是在看源码的过程中,发现有一个非常重要的组件:Ni ...

  7. Dubbo源码分析笔记-一(工程目录介绍)

    Dubbo 是阿里开发的分布式服务调用框架,提供了它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现. 工程目录 模块介绍 dubbo-common   Dubb ...

  8. openmp官方源码_MNN推理过程源码分析笔记(一)主流程

    在正式开始推理代码分析之前, 回顾下 MNN整体结构 推理分为三个大部分 Engine Backends Runtime Optimize 那么问题来了,从哪里开始,怎么入手呢? 我的心得是源码分析不 ...

  9. 基于Linux的UART驱动框架源码分析笔记

    文章目录 前言 一.I.MX6ULL串口接收和发送方式 1.非DMA方式 1.1.接收方式 1.2 发送方式 2.DMA方式 2.1.接收方式 2.2 发送方式 二.UART驱动注册 1.uart_r ...

  10. vue实现消息badge 标记_Badge组件_element-ui源码分析笔记 - SegmentFault 思否

    Badge组件主要用于数字或状态的标记,对于消息类的提醒功能,使用这组件还是很常见的.具体显示效果如下图: 不管组件复杂还是简单,编码实现这个组件的都不是源码分析目的. 源码分析,在于通过一步步的实现 ...

最新文章

  1. AIX 修 炼 之 路
  2. 将Ojective-C代码移植转换为Swift代码
  3. 数据库面试题【十八、优化关联查询优化子查询优化LIMIT分页优化UNION查询优化WHERE子句】
  4. 上传文件显示进度条_文件上传带进度条进阶-断点续传
  5. 猜51CTO的人群结构
  6. (附源码)计算机毕业设计Java远程健康数据管理系统
  7. java课程 数独 文库_数独java代码
  8. 【12306抢票神器】抢票啦—请收下这份2022元旦春节抢票攻略
  9. 能上QQ不能上浏览器处理方法(win11版)
  10. 关于一句英文句子的词数的判断
  11. 通过phpstudy(小皮面板)搭建DVWA靶场教程
  12. win10系统下,屏幕录制专家如何录制耳机里面的声音
  13. BIT2023 智慧社区综合管理系统-一周目
  14. Finished with error:Navicat 运行SQL文件 报错
  15. 软件工程——系统流程图符号及案例
  16. InnoDB之redo log
  17. oracle 对象的授权
  18. 正则筛选图片url(js)
  19. 无线电技术 | 关于无线定位技术TDOA的综合论述(一)
  20. 曙光服务器如何重新设置u盘启动_u盘装系统设置u盘启动的两种方法

热门文章

  1. 阿里云控制台门户升级,V2.0 乘风破浪来了!
  2. Apache RocketMQ 4.8.0,DLedger 模式全面提升!
  3. 一台古老电脑之维修记
  4. API网关-apisix源码剖析,初始化依赖
  5. 腾讯云为小游戏开发者升级工具箱 小游戏联机对战引擎免费用
  6. Python基础——PyCharm版本——第四章、基础语法-分支语句(条件判断if语句)
  7. gg参数:dblogreader
  8. Oracle 估算数据库大小的方法
  9. 【转】flannel网络的VXLAN及host-gw
  10. Android app:transformNativeLibsWithStripDebugSymbolForDebug错误分析