一 SystemTimer

Kafka中定时器的实现,它在时间轮的基础上添加了执行到期任务,阻塞等待最近到期任务的功能

tickMs: Long 时间格

executorName: String 线程名字

wheelSize: Int 时间轮

startMs: Long 创建时间

taskExecutor:ExecutorService  固定线程池,由此线程执行定时到期任务

delayQueue:DelayQueue[TimerTaskList]  各个层级时间轮公用的DelayQueue,主要作用是阻塞推进时间轮指针的线程ExpiredOperationReaper,等待最近到期的任务

taskCounter:AtomicInteger 各个层级时间轮公用的任务个数计数器

timingWheel:TimingWheel 层级时间轮中对底层的时间轮

readWriteLock:ReentrantReadWriteLock 用来同步时间轮指针的currentTime修改后的读写锁

add 再添加过程中,如果发现任务已经到期,则将任务提交到taskExecutor执行;如果任务未到期,则调用TimerWheel.add提交到时间轮等待后期执行

def add(timerTask:TimerTask): Unit = {
  readLock.lock()
  try {
    // 将TimerTask封装成TimerTaskEntry,并计算到期时间
   
addTimerTaskEntry(newTimerTaskEntry(timerTask,timerTask.delayMs + SystemTime.hiResClockMs))
  } finally {
    readLock.unlock()
  }
}

addTimerTaskEntry 向时间轮提交添加任务

private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {// 向时间轮提交添加任务失败,任务可能到期或者取消if (!timingWheel.add(timerTaskEntry)) {// 如果不是取消,则提交给线程池if (!timerTaskEntry.cancelled)taskExecutor.submit(timerTaskEntry.timerTask)}
}

advanceClock 完成了时间轮指针的推进,同时对到期TimerTaskList的任务进行处理;如果TimerTaskList到期,但其中某些任务未到期 会将未到期的任务进行降级,添加到低层次的时间轮继续等待,如果任务到期了则提交到taskExecutor执行

def advanceClock(timeoutMs: Long): Boolean = {// 从队列取出第一个元素,如果不能立即取出,可以等待timeoutMs毫秒再取,还是没有取到返回空var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {// 表示在队列阻塞期间有任务到期writeLock.lock()try {while (bucket != null) {// 尝试推进当前时间轮的指针,同时也会尝试推进上层时间轮的指针,随着当前时间轮的不断推进,上层时间轮指针早晚会被推进成功timingWheel.advanceClock(bucket.getExpiration())// 调用reinsert,尝试将bucket中的任务重新添加到时间轮,此过程可能会提交到taskExecutor线程执行,未到期的任务进行降级bucket.flush(reinsert)bucket = delayQueue.poll()// 非阻塞的取出第一个元素}} finally {writeLock.unlock()}true} else {false}
}

二 TimerTaskList源码分析

private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {// TimerTaskList 是一个双向的循环列表// root.next 指向下一个// root.prev 指向前一个// 创建一个TimerTaskEntry 节点,初始化next prev 都等于root,这是一个虚拟的TimerTaskEntry,并不存储任何任务private[this] val root = new TimerTaskEntry(null, -1)root.next = rootroot.prev = root// 初始化该环形双向链表到期时间private[this] val expiration = new AtomicLong(-1L)// 设置该环形双向链表的到期时间,如果里面的任务到期则交给线程执行,否则进行降级// 如果到期时间改变返回truedef setExpiration(expirationMs: Long): Boolean = {expiration.getAndSet(expirationMs) != expirationMs}// 获取该环形双向链表的到期时间def getExpiration(): Long = {expiration.get()}// Apply the supplied function to each of tasks in this listdef foreach(f: (TimerTask)=>Unit): Unit = {synchronized {// 获取下一个TimerTaskEntry节点var entry = root.next// 如果TimerTaskEntry不等于rootwhile (entry ne root) {// 获取下一个TimerTaskEntry下一个节点val nextEntry = entry.next// 如果下一个entry没有被取消,调用函数fif (!entry.cancelled) f(entry.timerTask)entry = nextEntry}}}// Add a timer task entry to this listdef add(timerTaskEntry: TimerTaskEntry): Unit = {var done = falsewhile (!done) {// Remove the timer task entry if it is already in any other list// We do this outside of the sync block below to avoid deadlocking.// We may retry until timerTaskEntry.list becomes null.// 如果其他TimerTaskList也存在这个timerTaskEntry则删除它,避免死锁timerTaskEntry.remove()synchronized {timerTaskEntry.synchronized {// timerTaskEntry所在的TimerTaskList如果为空if (timerTaskEntry.list == null) {// put the timer task entry to the end of the list. (root.prev points to the tail entry)// 把当前timerTaskEntry放在队列尾部,并且前一个节点指向尾部的entry,下一个节点指向头部也val tail = root.prevtimerTaskEntry.next = roottimerTaskEntry.prev = tailtimerTaskEntry.list = thistail.next = timerTaskEntryroot.prev = timerTaskEntrytaskCounter.incrementAndGet()// 时间轮任务总数+1done = true}}}}}// 从TimerTaskLKList删除指定的TimerTaskEntrydef remove(timerTaskEntry: TimerTaskEntry): Unit = {synchronized {timerTaskEntry.synchronized {// 如果timerTaskEntry所在的TimerTaskList就是当前的TimerTaskList// 删除这个timerTaskEntryif (timerTaskEntry.list eq this) {timerTaskEntry.next.prev = timerTaskEntry.prevtimerTaskEntry.prev.next = timerTaskEntry.nexttimerTaskEntry.next = nulltimerTaskEntry.prev = nulltimerTaskEntry.list = nulltaskCounter.decrementAndGet()// 时间轮任务总数-1}}}}// 删除所有的task entry,并且每一个task entry应用函数fdef flush(f: (TimerTaskEntry)=>Unit): Unit = {synchronized {var head = root.nextwhile (head ne root) {remove(head)f(head)head = root.next}expiration.set(-1L)}}def getDelay(unit: TimeUnit): Long = {unit.convert(max(getExpiration - SystemTime.hiResClockMs, 0), TimeUnit.MILLISECONDS)}def compareTo(d: Delayed): Int = {val other = d.asInstanceOf[TimerTaskList]if(getExpiration < other.getExpiration) -1else if(getExpiration > other.getExpiration) 1else 0}}

三 TimerTaskEntry源码分析

private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: Long) extends Ordered[TimerTaskEntry] {@volatilevar list: TimerTaskList = null // TimerTaskEntry所在的TimerTaskList双向循环链表var next: TimerTaskEntry = null // 下一个TimerTaskEntryvar prev: TimerTaskEntry = null // 前一个TimerTaskEntry// 给当前TimerTaskEntry设置timer taskif (timerTask != null) timerTask.setTimerTaskEntry(this)// 当前timer task entry是否取消了timer taskdef cancelled: Boolean = {timerTask.getTimerTaskEntry != this}def remove(): Unit = {var currentList = list// remove 被调用,其他的线程将会移动这个entry到一个其他的TimeTaskList,因此我们会重试直到TimeTaskList为空// 删除这个entry可能会失败,因为TimeTaskList的值的改变while (currentList != null) {// 从TimerTaskLKList删除指定的TimerTaskEntrycurrentList.remove(this)// TimerTaskLKList重新赋给currentListcurrentList = list}}override def compare(that: TimerTaskEntry): Int = {this.expirationMs compare that.expirationMs}
}

四 TimerTask分析

trait TimerTask extends Runnable {val delayMs: Long // timestamp in millisecondprivate[this] var timerTaskEntry: TimerTaskEntry = nulldef cancel(): Unit = {synchronized {if (timerTaskEntry != null) timerTaskEntry.remove()timerTaskEntry = null}}// 即某一个timer task entry希望持有这个timer task,但是其他的timer task正持有它,则就把当前的这个timer task entry给删掉private[timer] def setTimerTaskEntry(entry: TimerTaskEntry): Unit = {synchronized {// 如果某一个timer task被已经存在的timer task entry所持有,我们首先会删除这个timer task entryif (timerTaskEntry != null && timerTaskEntry != entry)timerTaskEntry.remove()timerTaskEntry = entry}}private[timer] def getTimerTaskEntry(): TimerTaskEntry = {timerTaskEntry}}

SystemTimer,TimerTaskList等源码分析相关推荐

  1. Netty学习十七:源码分析之HashWheelTimer

    一.常见定时任务实现 定时器的使用场景包括:成月统计报表.财务对账.会员积分结算.邮件推送等,它一般有三种表现形式:按固定周期定时执行.延迟一定时间后执行.指定某个时刻执行. 定时器的本质是设计一种数 ...

  2. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  3. SpringBoot-web开发(四): SpringMVC的拓展、接管(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) SpringBoot-web开发(二): 页面和图标定制(源码分析) SpringBo ...

  4. SpringBoot-web开发(二): 页面和图标定制(源码分析)

    [SpringBoot-web系列]前文: SpringBoot-web开发(一): 静态资源的导入(源码分析) 目录 一.首页 1. 源码分析 2. 访问首页测试 二.动态页面 1. 动态资源目录t ...

  5. SpringBoot-web开发(一): 静态资源的导入(源码分析)

    目录 方式一:通过WebJars 1. 什么是webjars? 2. webjars的使用 3. webjars结构 4. 解析源码 5. 测试访问 方式二:放入静态资源目录 1. 源码分析 2. 测 ...

  6. Yolov3Yolov4网络结构与源码分析

    Yolov3&Yolov4网络结构与源码分析 从2018年Yolov3年提出的两年后,在原作者声名放弃更新Yolo算法后,俄罗斯的Alexey大神扛起了Yolov4的大旗. 文章目录 论文汇总 ...

  7. ViewGroup的Touch事件分发(源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,View的touch事件分发相对比较简单,可参考 View的Touch事件分发(一.初步了解) View的Touch事 ...

  8. View的Touch事件分发(二.源码分析)

    Android中Touch事件的分发又分为View和ViewGroup的事件分发,先来看简单的View的touch事件分发. 主要分析View的dispatchTouchEvent()方法和onTou ...

  9. MyBatis原理分析之四:一次SQL查询的源码分析

    上回我们讲到Mybatis加载相关的配置文件进行初始化,这回我们讲一下一次SQL查询怎么进行的. 准备工作 Mybatis完成一次SQL查询需要使用的代码如下: Java代码   String res ...

最新文章

  1. 22条API设计的最佳实践
  2. matlab绘制bland-altman,制作Bland-Altman图的步骤和程序(以SPSS作图为例讲解)
  3. MySql基础教程(三)——查询训练
  4. LeetCode 695. Max Area of Island javascript解决方案
  5. 深度学习在文本领域的应用
  6. 保证一致性吗_RabbitMQ消息一致性:重要消息,请设置持久化
  7. es6 实例:使用Proxy实现观察者模式
  8. android 初始化变量,变量初始化 - Android Studio
  9. python进阶中文版_GitHub - lcm2179/Intermediate-Python: 《Python进阶》(Intermediate Python 中文版)...
  10. 会员运营五大难题与破解之道
  11. 「读书感悟系列」苏世民:我的经验与教训
  12. pion/ion搭建
  13. 关于函数不定积分的方法总结
  14. win10安装无线显示器失败
  15. java72-GUL流式布局管理器
  16. 数据时代的的企业管理 记SAP商业同略会
  17. 苏宁从面试到入职历险记
  18. 英文输入时的自动补全功能
  19. GEE学习笔记:在Google Earth Engine(GEE)中计算坡度、坡向、山体阴影
  20. arcgis中的标注和注记

热门文章

  1. java画虚线_在java中绘制虚线
  2. 500万数据mysql_mysql 单表500万数据经过处理后新增到新表
  3. 制造业数字化转型的困难_智能制造如何助力企业转型升级?百家制造业企业共谋数字化转型路...
  4. java null equals_Java null检查为什么使用==代替.equals()
  5. edge css兼容,CSS输入错误样式在Edge浏览器中无法正确显示
  6. ipv6网络使用scp,并解决No route to host与no matches found报错
  7. Ubuntu 装机必备设置与软件安装
  8. ct与x光的哪个辐射大_ct和x光哪个辐射大
  9. 动态代理和静态代理的区别_代理,是动态和静态的吗?
  10. 中3d库后接负载_500W电源横评:交叉负载放倒3款产品