ScheduledThreadPoolExecutor概述

ScheduledThreadPoolExecutor下文简称 STPE.

  1 public class ScheduledThreadPoolExecutor
  2         extends ThreadPoolExecutor
  3         implements ScheduledExecutorService

STPE 可以看到继承线程池类,从名字也可以看出这个STPE是一个执行周期任务的线程池。STPE的几个特点 :

  • 继承 ThreadPoolExecutor
  • 内部使用DelayedQueue 即是任务达到某个时间要求才返回的任务队列
  • 在运行之前,取消一个已提交的任务(task调用Cancel 方法),那么该任务不会执行,默认情况下,这样一个已经取消的任务不会自动从任务队列移除,直到延迟时间到了才移除,为了防止队列中保持已经取消的任务,使用 setRemoveOnCancelPolicy 设置true ,会在取消后立即移除队列。

下面看看几个周期方法 :

  • schedule(); : 任务开始前延时A秒执行
  • scheduleAtFixedRate();  (例如: 首个任务开始前延时A秒,时间为B秒的任务)
  • scheduleWithFixedDelay();   (例如: 首个任务开始前延时A秒,执行任务后延时B秒再进行下一个任务)

内部结构 :

  • DelayedWorkerQueue 内部类
  • ScheduledFutureTask 内部类
  • 几个控制变量

源码分析

重要的方法

先跟着流程走一遍,把工作思路先走一遍

  1     public ScheduledFuture<?> schedule(Runnable command,
  2                                        long delay,
  3                                        TimeUnit unit) {
  4         if (command == null || unit == null)
  5             throw new NullPointerException();
  6         RunnableScheduledFuture<?> t = decorateTask(command,
  7             new ScheduledFutureTask<Void>(command, null,
  8                                           triggerTime(delay, unit)));
  9         delayedExecute(t);
 10         return t;
 11     }
 12
 13
 14
 15
 16     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
 17                                                   long initialDelay,
 18                                                   long period,
 19                                                   TimeUnit unit) {
 20         if (command == null || unit == null)
 21             throw new NullPointerException();
 22         if (period <= 0)
 23             throw new IllegalArgumentException();
 24         //包装成  ScheduledFutureTask
 25         ScheduledFutureTask<Void> sft =
 26             new ScheduledFutureTask<Void>(command,
 27                                           null,
 28                                           triggerTime(initialDelay, unit),
 29                                           unit.toNanos(period));
 30         //装饰成  RunnableScheduledFuture
 31         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 32         sft.outerTask = t;
 33         delayedExecute(t);
 34         return t;
 35     }
 36
 37
 38
 39
 40     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 41                                                      long initialDelay,
 42                                                      long delay,
 43                                                      TimeUnit unit) {
 44         if (command == null || unit == null)
 45             throw new NullPointerException();
 46         if (delay <= 0)
 47             throw new IllegalArgumentException();
 48         ScheduledFutureTask<Void> sft =
 49             new ScheduledFutureTask<Void>(command,
 50                                           null,
 51                                           triggerTime(initialDelay, unit),
 52                                           unit.toNanos(-delay));
 53         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
 54         sft.outerTask = t;
 55         delayedExecute(t);
 56         return t;
 57     }

可以看到三个方法最后都会调用 delayedExecute 方法。

  1     private void delayedExecute(RunnableScheduledFuture<?> task) {
  2         if (isShutdown())
  3             reject(task);
  4         else {
  5             //任务队列加入任务
  6             super.getQueue().add(task);
  7             // 此时任务有可能在执行了,判断是不是 Running 状态,
  8             if (isShutdown() &&
  9                 !canRunInCurrentRunState(task.isPeriodic()) &&
 10                 remove(task))
 11                 task.cancel(false);
 12             else
 13                 ensurePrestart();
 14         }
 15     }
 16
 17
 18     /**
 19      * Same as prestartCoreThread except arranges that at least one
 20      * thread is started even if corePoolSize is 0.
 21      *
 22      * 此时任务已入列,即使是 corePoolSize 为 0 ,也要开线程执行
 23      *
 24      */
 25     void ensurePrestart() {
 26         int wc = workerCountOf(ctl.get());
 27         if (wc < corePoolSize)
 28             //以core线程数量为上限,增加线程执行
 29             addWorker(null, true);
 30         else if (wc == 0)
 31             //不以core线程数量为上限,增加线程执行
 32             addWorker(null, false);
 33     }

可以看到,任务加入队列后就进行判断是否线程池shutdown , 最后 addWorker 方法,创建线程就OK了(addWorker 在ThreadPoolExecutor 我们上节已经分析过了,那么此时就等待线程从队列中获取任务就可以了)。

DelayedWorkQueue

下面贴出它的注释,这个队列从名字就可以看出是和 Delay 相关,同时是基于堆操作的,既然是堆,那么应该要知道二叉堆的上浮和下沉操作。

加入元素的操作。

  1     static class DelayedWorkQueue extends AbstractQueue<Runnable>
  2         implements BlockingQueue<Runnable>

  1         public void put(Runnable e) {
  2             offer(e);
  3         }
  4
  5         //插入一个元素
  6         public boolean offer(Runnable x) {
  7             if (x == null)
  8                 throw new NullPointerException();
  9             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
 10             final ReentrantLock lock = this.lock;
 11             lock.lock();
 12             try {
 13                 int i = size;
 14                 if (i >= queue.length)
 15                     //扩容
 16                     grow();
 17                 size = i + 1;
 18
 19                 //当前队列中没元素
 20                 if (i == 0) {
 21                     queue[0] = e;
 22                     setIndex(e, 0);
 23                 } else {
 24                     //已有元素可,二叉堆上浮操作(上浮意味着都尾部添加再上浮上去)
 25                     siftUp(i, e);
 26                 }
 27                 //无元素队列加入时的情况 ,即是说有元素时是不会唤醒的
 28                 if (queue[0] == e) {
 29                     leader = null;
 30                     //唤醒第一个
 31                     available.signal();
 32                 }
 33             } finally {
 34                 lock.unlock();
 35             }
 36             return true;
 37         }
 38
 39
 40         /**
 41          * Sifts element added at bottom up to its heap-ordered spot.
 42          * Call only when holding lock.
 43          *
 44          * 必须加锁
 45          *
 46          */
 47         private void siftUp(int k, RunnableScheduledFuture<?> key) {
 48             while (k > 0) {
 49                 int parent = (k - 1) >>> 1;
 50                 RunnableScheduledFuture<?> e = queue[parent];
 51                 if (key.compareTo(e) >= 0)
 52                     break;
 53                 // 父类元素大于下面的子类元素 ,交换
 54                 queue[k] = e;
 55                 setIndex(e, k);
 56                 k = parent;
 57             }
 58             queue[k] = key;
 59             setIndex(key, k);
 60         }

上面的 offer 方法是加锁操作。

下面是take 方法,为什么要看take 方法呢?这是因为在ThreadPoolExecute 执行任务,线程获取队列中的方法,调用的是队列的take 方法或是 poll 方法。

  1         public RunnableScheduledFuture<?> take() throws InterruptedException {
  2             //加锁,所以只有一个线程可以进来
  3             final ReentrantLock lock = this.lock;
  4             lock.lockInterruptibly();
  5             try {
  6                 for (;;) {
  7                     RunnableScheduledFuture<?> first = queue[0];
  8                     if (first == null)
  9                         //没有了就阻塞, await方法会释放所有的锁,其他的也会进来的
 10                         //唤醒之后,继续for循环
 11                         available.await();
 12                     else {
 13                         //拿到任务,要是到了延迟的时间就返回任务
 14                         long delay = first.getDelay(NANOSECONDS);
 15                         if (delay <= 0)
 16                             return finishPoll(first);
 17
 18                         //拿到任务,但是还没到时间执行任务
 19                         first = null; // don't retain ref while waiting
 20                         //下面的代码就是先来的拿到 leader ,然后执行 awaitNanos();其它的就 await
 21                         if (leader != null)
 22                             available.await();
 23                         else {
 24                             Thread thisThread = Thread.currentThread();
 25                             leader = thisThread;
 26                             try {
 27                                 available.awaitNanos(delay);
 28                             } finally {
 29                                 if (leader == thisThread)
 30                                     leader = null;
 31                             }
 32                         }
 33                     }
 34                 }
 35             } finally {
 36                 //唤醒后面的
 37                 if (leader == null && queue[0] != null)
 38                     available.signal();
 39                 lock.unlock();
 40             }
 41         }

这个地方使用到了ConditionObject 的 await 唤醒的操作在 有元素加入到队列中,调用 signal 方法,释放锁后就会唤醒下一个元素。同时我们可以看到,leader 的作用在这里起到了 “leader---只能有一个获取得到”的作用。

ScheduledFutureTask

  1     private class ScheduledFutureTask<V>
  2             extends FutureTask<V> implements RunnableScheduledFuture<V>

ScheduledFutureTask 继承Runnable并又线程执行,我们直接看run方法。

  1
  2         /**
  3          * Overrides FutureTask version so as to reset/requeue if periodic.
  4          */
  5     public void run() {
  6             boolean periodic = isPeriodic();
  7             if (!canRunInCurrentRunState(periodic))
  8                 cancel(false);
  9             //是不是时间周期任务
 10             else if (!periodic)
 11                 ScheduledFutureTask.super.run();
 12             //周期任务
 13             else if (ScheduledFutureTask.super.runAndReset()) {
 14                 //设置下次执行的时间周期
 15                 setNextRunTime();
 16                 //重新执行这个任务
 17                 reExecutePeriodic(outerTask);
 18             }
 19     }
 20
 21
 22
 23     /**
 24      * Executes the computation without setting its result, and then
 25      * resets this future to initial state, failing to do so if the
 26      * computation encounters an exception or is cancelled.  This is
 27      * designed for use with tasks that intrinsically execute more
 28      * than once.
 29      *
 30      * @return {@code true} if successfully run and reset
 31      */
 32     //父类的 runAndReset
 33     protected boolean runAndReset() {
 34         //对变量 runner 进行CAS 操作 ,
 35         if (state != NEW ||
 36             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 37                                          null, Thread.currentThread()))
 38             return false;
 39         boolean ran = false;
 40         int s = state;
 41         try {
 42             Callable<V> c = callable;
 43             if (c != null && s == NEW) {
 44                 try {
 45                     //上面的call 方法调用的 run 方法
 46                     c.call(); // don't set result
 47                     ran = true;
 48                 } catch (Throwable ex) {
 49                     setException(ex);
 50                 }
 51             }
 52         } finally {
 53             // runner must be non-null until state is settled to
 54             // prevent concurrent calls to run()
 55             runner = null;
 56             // state must be re-read after nulling runner to prevent
 57             // leaked interrupts
 58             s = state;
 59             if (s >= INTERRUPTING)
 60                 handlePossibleCancellationInterrupt(s);
 61         }
 62         return ran && s == NEW;
 63     }
 64
 65
 66     //父类的 run 方法
 67     public void run() {
 68         if (state != NEW ||
 69             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 70                                          null, Thread.currentThread()))
 71             return;
 72         try {
 73             Callable<V> c = callable;
 74             if (c != null && state == NEW) {
 75                 V result;
 76                 boolean ran;
 77                 try {
 78                     //这是调用了 call 方法
 79                     result = c.call();
 80                     ran = true;
 81                 } catch (Throwable ex) {
 82                     result = null;
 83                     ran = false;
 84                     setException(ex);
 85                 }
 86                 if (ran)
 87                     set(result);
 88             }
 89         } finally {
 90             // runner must be non-null until state is settled to
 91             // prevent concurrent calls to run()
 92             runner = null;
 93             // state must be re-read after nulling runner to prevent
 94             // leaked interrupts
 95             int s = state;
 96             if (s >= INTERRUPTING)
 97                 handlePossibleCancellationInterrupt(s);
 98         }
 99     }
100
101         /**
102          * Sets the next time to run for a periodic task.
103          */
104     private void setNextRunTime() {
105         long p = period;
106         if (p > 0)
107             time += p;
108         else
109             time = triggerTime(-p);
110     }
111
112
113
114     /**
115      * Requeues a periodic task unless current run state precludes it.
116      * Same idea as delayedExecute except drops task rather than rejecting.
117      *
118      * @param task the task
119      */
120     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
121         if (canRunInCurrentRunState(true)) {
122             //又再一次加入队列
123             super.getQueue().add(task);
124             if (!canRunInCurrentRunState(true) && remove(task))
125                 task.cancel(false);
126             else
127                 //再次开始执行
128                 ensurePrestart();
129         }
130     }

我们看到了最终都会执行任务父类里,一个变量的 call方法,我们现在看看这个 call 方法。

  1     /**
  2     * Creates a periodic action with given nano time and period.
  3     */
  4     ScheduledFutureTask(Runnable r, V result, long ns, long period) {
  5             super(r, result);
  6             this.time = ns;
  7             this.period = period;
  8             this.sequenceNumber = sequencer.getAndIncrement();
  9     }
 10
 11     period : 正数(时间周期),负数(固定延迟后执行任务),0(不是可重复的任务)
 12
 13     //父类构造函数
 14     //上面的super(r, result);
 15     public FutureTask(Runnable runnable, V result) {
 16         this.callable = Executors.callable(runnable, result);
 17         this.state = NEW;       // ensure visibility of callable
 18     }
 19
 20
 21     //上面的Executors.callable
 22     public static <T> Callable<T> callable(Runnable task, T result) {
 23         if (task == null)
 24             throw new NullPointerException();
 25         return new RunnableAdapter<T>(task, result);
 26     }
 27
 28
 29
 30     static final class RunnableAdapter<T> implements Callable<T> {
 31         final Runnable task;
 32         final T result;
 33         RunnableAdapter(Runnable task, T result) {
 34             this.task = task;
 35             this.result = result;
 36         }
 37         public T call() {
 38             //实际调用的是 run方法
 39             task.run();
 40             return result;
 41         }
 42     }

可以看到 call 方法里其实执行的是 任务的 run 方法,然后返回个 result .

另外还有几个控制变量,他们在 delayedExecute 和 重写父类的方法 onShutdown 有关,主要的作用是取消任务后是否立即从队列中删除。

  1    /**
  2      * False if should cancel/suppress periodic tasks on shutdown.
  3      */
  4     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
  5
  6     /**
  7      * False if should cancel non-periodic tasks on shutdown.
  8      */
  9     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
 10
 11     /**
 12      * True if ScheduledFutureTask.cancel should remove from queue
 13      */
 14     private volatile boolean removeOnCancel = false;

  1     /**
  2      * Cancels and clears the queue of all tasks that should not be run
  3      * due to shutdown policy.  Invoked within super.shutdown.
  4      */
  5     @Override void onShutdown() {
  6         BlockingQueue<Runnable> q = super.getQueue();
  7         boolean keepDelayed =
  8             getExecuteExistingDelayedTasksAfterShutdownPolicy();
  9         boolean keepPeriodic =
 10             getContinueExistingPeriodicTasksAfterShutdownPolicy();
 11         if (!keepDelayed && !keepPeriodic) {
 12             for (Object e : q.toArray())
 13                 if (e instanceof RunnableScheduledFuture<?>)
 14                     ((RunnableScheduledFuture<?>) e).cancel(false);
 15             q.clear();
 16         }
 17         else {
 18             // Traverse snapshot to avoid iterator exceptions
 19             for (Object e : q.toArray()) {
 20                 if (e instanceof RunnableScheduledFuture) {
 21                     RunnableScheduledFuture<?> t =
 22                         (RunnableScheduledFuture<?>)e;
 23                     if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
 24                         t.isCancelled()) { // also remove if already cancelled
 25                         if (q.remove(t))
 26                             t.cancel(false);
 27                     }
 28                 }
 29             }
 30         }
 31         tryTerminate();
 32     }

参考资料 :

转载于:https://www.cnblogs.com/Benjious/p/10220268.html

java 线程池(2)相关推荐

  1. 四种Java线程池用法解析

    四种Java线程池用法解析 本文为大家分析四种Java线程池用法,供大家参考,具体内容如下 http://www.jb51.net/article/81843.htm 1.new Thread的弊端 ...

  2. 面试必问---Java线程池8大拒绝策略

    前言 谈到java的线程池最熟悉的莫过于ExecutorService接口了,jdk1.5新增的java.util.concurrent包下的这个api,大大的简化了多线程代码的开发.而不论你用Fix ...

  3. Java线程池使用与原理

    线程池是什么? 我们可以利用java很容易创建一个新线程,同时操作系统创建一个线程也是一笔不小的开销.所以基于线程的复用,就提出了线程池的概念,我们使用线程池创建出若干个线程,执行完一个任务后,该线程 ...

  4. Java线程池实现原理及其在美团业务中的实践

    来自:美团技术团队 随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流.使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器.J.U.C提供的线程池ThreadPoolExecuto ...

  5. Java线程池详解学习:ThreadPoolExecutor

    Java线程池详解学习:ThreadPoolExecutor Java的源码下载参考这篇文章:Java源码下载和阅读(JDK1.8) - zhangpeterx的博客 在源码的目录java/util/ ...

  6. Java 线程池详解学习:FixedThreadPool,CachedThreadPool,ScheduledThreadPool...

    Java常用的线程池有FixedThreadPool和CachedThreadPool,我们可以通过查看他们的源码来进行学习. Java的源码下载参考这篇文章:Java源码下载和阅读(JDK1.8) ...

  7. JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue .

    2019独角兽企业重金招聘Python工程师标准>>> 从Java5开始,Java提供了自己的线程池.每次只执行指定数量的线程,java.util.concurrent.Thread ...

  8. Java线程池了解一下

    前言 马上就要过年了,还在岗位上坚守"swimming"的小伙伴们顶住.博主给大家带来一篇线程池的基本使用解解闷. 为什么需要使用线程池 1.减少线程创建与切换的开销 在没有使用线 ...

  9. java线程池拒绝策略_Java核心知识 多线程并发 线程池原理(二十三)

    线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后 启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕, 再从队列中取出任务来执行.他 ...

  10. Java 线程池必知的8 大拒绝策略

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | kailing.pub/article/ind ...

最新文章

  1. Spring MVC 解读——@Autowired
  2. 美国出手管制五家中国超算企业
  3. 山西出台法规规范社会力量认养文物 系全国首例
  4. mysql数据库系统配置文件_跟我学虚拟机系统平台相关技术及应用——在CentOS系统中的MySql数据库系统配置文件中进行配置定义...
  5. python写软件测试用例_Python单元测试框架unittest:单个测试用例编写步骤及实例...
  6. [导入]创建DataTable对象
  7. 借贷平台Liquity昨日共有超300个Troves被清算
  8. linux常用命令之文件管理
  9. C# XML文件读取
  10. OpenDDS架构说明
  11. 徐思201771010132《面向对象程序设计(java)》第二周学习总结
  12. unity自动生成敌人_Unity学习笔记一:敌人模块和敌人生成器模块脚本编写思路梳理(1)...
  13. 北京3月去哪玩 赏花踏青登山六大推荐
  14. 人工智能权威网站推荐
  15. 使用MayaLiveLink插件在UE4中预览Maya模型动作
  16. r5 3500u和r5 4500u的区别
  17. 最新ITIL考试题库(中英对照版初级)
  18. 企业微信如何提高用户粘性防止粉丝流失
  19. 武汉纺织大学计算机科学校区在哪,武汉纺织大学是一本吗 重点专业是什么 有几个校区及校区地址...
  20. Netty简单实现客户端与服务端收发消息

热门文章

  1. 机器学习性能优化全解
  2. 吴恩达:无监督学习很重要!
  3. CVPR 2019最佳论文重磅出炉!李飞飞获计算机视觉基础贡献奖
  4. 从SAP APO到SAP IBP:CIO如何实现最佳过渡?
  5. 福利丨网友授课视频分享:机器学习实战-KNN-第一部分
  6. torch.nn.functional.pad
  7. python中一些常用函数和库的介绍(getattr、id、type、sys)
  8. 问题1:U盘可以识别但无法打开;问题2:U盘成为启动盘之后如何恢复成普通U盘。
  9. torch.bmm()函数的使用
  10. invalid use of incomplete type ‘class B‘