女主宣言

Kafka 作为一个支持实时处理大量请求的分布式流处理平台,需要一个设计良好的定时器来处理异步任务。本文作者将基于 Kafka 1.1.0 版本的源码来介绍 Kafka 中定时器的基础数据结构——时间轮的原理和实现。

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!

1

简单时间轮

简单时间轮是时间任务桶的循环链表,又被称为桶(bucket)。令 u 为时间单元大小,一个大小为 n 的时间轮有 n 个桶,能够持有 n * u 个定时任务,每个任务的过期时间会落在一个时间间隔内。(注:下文的 u 和 n 沿用这个定义)

每个桶持有进入相应时间范围的定时任务。第一个桶持有 [0, u) 范围的任务,第二个桶持有 [u, 2u) 范围的任务……第 n 个桶持有 [u * (n - 1), u * n) 范围的任务。每过一个时间单元 u,定时器会推进并移动到下个桶,然后第一个桶中所有的定时任务都会过期。由于任务已经过期,此时定时器不会插入任务到当前桶中。定时器会立刻运行过期的任务。因为空桶在下一轮是可用的,所以如果当前的桶对应时间 t,那么它会在推进后变成 [t + u * n, t + (n + 1) * u) 的桶。

本质上时间轮就是个哈希表,通过对任务的过期时间求哈希,落到对应的位置。而每个位置对应的 bucket 是链表,因此时间轮插入/删除定时任务的时间复杂度是 O(1)。而基于优先队列的定时器,比如 java.util.concurrent.DelayQueue 和 java.util.Timer 插入/删除的时间复杂度是 O(log n)。

2

分层时间轮

简单时间轮的主要缺点是它假定定时器请求是在从当前时刻开始的 n * u 时间间隔内,如果定时器请求超出了这个间隔就会产生溢出,导致任务无法放入时间轮中。分层时间轮会处理这种溢出,它以层次来组织时间轮,最底层的精度更高,层数越高,表示的精度更低。这里用精度来指代时间单元大小。

举例说明,令 u = 1, n = 3,设起始时刻是 c,则各层次的桶为

层次 精度
1 [c,c] [c+1,c+1] [c+2,c+2]  1
2 [c,c+2] [c+3,c+5] [c+6,c+8] 3
3 [c,c+8] [c+9,c+17] [c+18,c+26] 9

PS:这里沿用了代码注释里的表示,即闭区间,而前面讲述原理时都是左闭右开区间,两者是等价的,只是表示不一致。

在 c+1 时刻,桶 [c,c]、[c,c+2]、[c,c+8]过期了,之后:

  • 1 层的时钟移动到 c+1,并且创建新的桶 [c+3,c+3];

  • 2、3 层的时钟仍然在 c 处,因为他们没完全过期。

此时各层次的桶为:

层次 精度
1 [c+1,c+1] [c+2,c+2] [c+3,c+3] 1
2 [c,c+2] [c+3,c+5] [c+6,c+8]  3
3 [c,c+8] [c+9,c+17] [c+18,c+26] 9

注意,桶 [c,c+2] 不会接收任何任务,因为此时时刻是 c+1,只有过期时间为 c+1 和 c+2 才会被分配到该桶,然而 1 层的两个桶 [c+1,c+1] [c+2,c+2] 会优先接收任务。类似地,3 层的 [c+1,c+8] 也不会接收任何任务,因为这个范围被 2 层的桶覆盖了。

对单层时间轮,插入/删除定时任务的时间复杂度都是 O(1)。对分层时间轮,令 m 是时间轮的数量,则插入的时间复杂度是 O(m),因为最多向上插 m 次。相比起系统中请求的数量,m 通常是小很多的。而删除的时间复杂度是 O(1)。

像时钟就是一个典型的三层时间轮,秒针能表示 0 到 59 秒,但是对 60 秒以上则需要分针进一步表示,再进一步即时针,一共能表示的时间范围为 0 到 43199 秒,精度为 1 秒。从秒针到分针到时针,表示精度是依次降低的,秒针精度为 1 秒,有 60 格,因此分针精度是 1 * 60 = 60 秒,类似地,时钟精度是 3600 秒。

3

TimingWheel 的实现

了解了分层时间轮的概念后,阅读代码实现也就简单了。Kafka 时间轮为 TimingWheel 类,位于 kafka.utils.timer 包。

内部字段

名称 类型 说明
tickMs Long 时间单元 u
wheelSize Int 桶的数量 n
startMs Long 毫秒级时间戳 
taskCounter AtomicInteger 任务数量,即所有桶的节点数量之和
queue DelayQueue[TimerTaskList] 标准库的延时队列

通过上述主构造参数可以计算出以下私有字段(private[this],可以被包内其他类访问)

  // 当前时间轮的整个时间跨度,即更高一层时间轮的 tickMs  private[this] val interval = tickMs * wheelSize  // 创建 wheelSize 个桶(定时任务链表)  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }  // 向下取整,使起始时间戳能被 tickMs 整除  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs  // 高一层时间轮,用来保存超过 interval 的任务  @volatile private[this] var overflowWheel: TimingWheel = null

通过 addOverflowWheel 创建高一层时间轮:

  private[this] def addOverflowWheel(): Unit = {    synchronized {      if (overflowWheel == null) {  // 双重检查上锁        overflowWheel = new TimingWheel(          // 仅有 tickMs 不是原封不动地转发低层时间轮的字段,因为高层时间轮的时间单元粒度更粗(即精度更低)          // 还是参考时钟,时针的 tickMs 是分针 tickMs 的 60 倍          tickMs = interval,          wheelSize = wheelSize,          startMs = currentTime,          taskCounter = taskCounter,          queue        )      }    }  }

添加定时任务

在 Kafka 中,定时任务被抽象为 TimerTaskEntry 类,而桶(定时任务链表)则被抽象为 TimerTaskList 类,在代码中命名都是 bucket(桶)。bucket 实现了 java.util.concurrent.Delayed 接口:

  def getDelay(unit: TimeUnit): Long = {    unit.convert(max(getExpiration - Time.SYSTEM.hiResClockMs, 0), TimeUnit.MILLISECONDS)  }

因此 bucket 能够被加入延时队列中,延时队列在调用 poll 时,会调用内部对象的 getDelay 方法来判断对象是否可以被弹出。再看看实际的 add 实现:

  def add(timerTaskEntry: TimerTaskEntry): Boolean = {    // 定时任务的过期时间戳    val expiration = timerTaskEntry.expirationMs    if (timerTaskEntry.cancelled) {      // Entry 绑定的 TimerTask 调用了 cancel() 方法主动将 Entry 从链表中移除      false    } else if (expiration < currentTime + tickMs) {      // 过期时间在第一个桶的范围内,表示已经过期,此时无需加入时间轮      false    } else if (expiration < currentTime + interval) {      // 过期时间在当前时间轮能表示的时间范围内,加入到其中一个桶      // 注意按照这个算法,第一个桶的时间范围是 [c+u,c+u*2),因为 [c,c+u) 范围内被视为已过期      // 而且第一个桶对应 buckets 的下标并不一定是 0,因为数组只是作为循环队列的存储方式,起始下标无所谓      val virtualId = expiration / tickMs      val bucket = buckets((virtualId % wheelSize.toLong).toInt)      bucket.add(timerTaskEntry)      // 设置过期时间,这里也取整了,即可以被 tickMs 整除      if (bucket.setExpiration(virtualId * tickMs)) { // 仅在新的过期时间和之前的不同才返回 true        // 由于进行了取整,同一个 bucket 所有节点的过期时间都相同,因此仅在 bucket 的第一个节点加入时才会进入此 if 块        // 因此保证了每个桶只会被加入一次到 queue 中,queue 存放所有包含定时任务节点的 bucket        // 借助 DelayQueue 来检测 bucket 是否过期,bucket 时遍历即可取出所有节点        queue.offer(bucket)      }      true    } else {      // 过期时间在当前时间轮表示的范围之外,即溢出,需要创建高一层时间轮来加入      if (overflowWheel == null) addOverflowWheel() // 双重检查上锁的第一层检查      overflowWheel.add(timerTaskEntry) // 注意高一层时间轮也可能无法容纳,因此可能会递归创建更高层级的时间轮    }  }

可以看到 DelayQueue 对象 queue 在时间轮的作用是,保存包含定时任务节点的桶,桶可以来自不同层次的时间轮,当然,所有层次时间轮也共享这个队列。

TimingWheel 本身没有实现推进功能,而是借助延迟队列 DelayQueue 来实现时间的推移,假设有 M 个定时任务分布在 N 个桶中,那么插入的时间复杂度为 O(M + N * log N),其中 M >= N。如果把任务全存到延迟队列中,那么插入的时间复杂度为 O(M * log M),因此 Kafka 时间轮的优化是有意义的。

时间轮的推进

  def advanceClock(timeMs: Long): Unit = {    if (timeMs >= currentTime + tickMs) { // timeMs 超过了当前 bucket 的时间范围      currentTime = timeMs - (timeMs % tickMs) // 修改当前时间,即原先的第一个桶已经失效      // 若存在更高层的时间轮,则也会向前运转      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)    }  }

仅仅是修改 currentTime,该字段决定内部的 bucket 是否过期,见前面的 add 方法实现。

4

时间轮在 Kafka 管理定时任务中的作用

Kafka 使用 kafka.server 包下的 DelayedOperationPurgatory(下文简称 purgatory)类来管理异步任务(即延时操作 DelayedOperation)。每次 Kafka 收到一个请求,都会启动一个异步任务,如果不能立刻完成(比如 acks 设为 all 的 Produce 请求),则扔给 purgatory 保存(即插入到内部的时间轮中)。purgatory 会运行一个 ExpiredOperationReaper 后台线程来检测过期的异步任务并进行处理,在线程函数中会反复调用内部定时器对象 timeoutTimer 的 advanceClock 方法向前推进,如果有过期的任务,则会将其从定时器中移除并执行回调:

  private class ExpiredOperationReaper extends ShutdownableThread(/* ... */) {    // doWork 方法会在线程函数,即基类 Thread 的 run 方法中循环调用    override def doWork() {      advanceClock(200L)  // 200 ms    }  }

而 purgatory 是使用 kafka.utils.timer 包下的 SystemTimer 类作为定时器的:

  def apply[T <: delayedoperation string ... delayedoperationpurgatory>    val timer = new SystemTimer(purgatoryName)    new DelayedOperationPurgatory[T](purgatoryName, timer, /* ... */)  }

而在 SystemTimer 中,关键字段为时间轮对象 timingWheel:

  // java.util.concurrent 包提供的延时队列  private[this] val delayQueue = new DelayQueue[TimerTaskList]()  private[this] val taskCounter = new AtomicInteger(0)  // Kafka 在 kafka.utils.timer 包中自行实现的时间轮  private[this] val timingWheel = new TimingWheel(    tickMs = tickMs,    wheelSize = wheelSize,    startMs = startMs,    taskCounter = taskCounter,    delayQueue  )

其 advanceClock 方法实际上是调用 timingWheel.advanceClock 方法:

  def advanceClock(timeoutMs: Long): Boolean = {    // 从延时队列中等待 timeout 毫秒,若有过期 bucket 则取出    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)    if (bucket != null) {  // 存在过期 bucket      writeLock.lock()      try {        while (bucket != null) {          // 推进当前时间轮,内部可能会递归推进更高一层时间轮,currentTime 被修改          timingWheel.advanceClock(bucket.getExpiration())          bucket.flush(reinsert)          // 默认 timeout 为 0,即非阻塞,也就是说尽可能取出当前时刻所有过期的 buckets          bucket = delayQueue.poll()        }      } finally {        writeLock.unlock()      }      true    } else {      false    }  }

可见,SystemTimer 对象在调用 advancedClock 推进时间时,其实是从延时队列中取出推进的时间内所有过期的 bucket,然后 flush:

  // Remove all task entries and apply the supplied function to each of them  def flush(f: (TimerTaskEntry)=>Unit): Unit = {    synchronized {      // 遍历整个 bucket(链表),remove 删除所有节点      var head = root.next      while (head ne root) {        remove(head)        f(head)        head = root.next      }      expiration.set(-1L)    }  }

注意到,传入 flush 的是 reinsert 函数:

private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

问题来了,为啥删除之后又要重新插入呢?因为如果取出的这个 bucket 是属于高层时间轮的,由于高层时间轮精度不够,此时 bucket 可能并未过期。举个两层时间轮的例子(单位:毫秒):

层次
1 [0,1) [1,2)
2 [0,2) [2,4)

初始状态下,延时为 3 的任务被加入 [2,4),调用 advanceClock(2) 后,时间轮变成了:

层次
1 [2,3) [3,4) 
2 [2,4) [4,6) 

第 2 层的 [2,4) 被取出,然后延时为 3 的任务被取出,此时调用 reinsert 就会将其加入第 1 层的 [3,4),而不是立刻判断它过期。从高层时间轮降级到底层时间轮被隐藏在了这句不起眼的 bucket.flush(reinsert) 中。

5

总结

本文讲述了简单时间轮以及分层时间轮的概念,然后通过源码阅读讲述了 Kafka 中为何以及如何实现分层时间轮的。对于大量请求,每个请求对应一个定时任务,需要大量的插入/删除操作,因此采用了多层时间轮来降低插入/删除的时间复杂度,出于避免重复造轮子的考虑,Kafka 仍然借助 Java 标准库的延时队列来推进时间轮。除了学习了时间轮之外,Kafka 对时间轮的实现也给了我们另一个启示:优化要在性能敏感的地方优化,对于性能不敏感的操作,能用现成轮子就不要自己费心思重复造轮子。

如果大家有什么建议或疑问,可以在下方留言交流。

360云计算

由360云平台团队打造的技术分享公众号,内容涉及数据库、大数据、微服务、容器、AIOps、IoT等众多技术领域,通过夯实的技术积累和丰富的一线实战经验,为你带来最有料的技术分享

java方法里面能改定时器的时间吗_Kafka 时间轮的原理和实现相关推荐

  1. java方法调用之动态调用多态(重写override)的实现原理——方法表

    转自:http://blog.csdn.net/fan2012huan/article/details/51007517 上两篇篇博文讨论了java的重载(overload)与重写(override) ...

  2. java语言say方法,简单了解Java方法的定义和使用实现详解

    简单了解Java方法的定义和使用实现详解 发布时间:2020-09-25 11:36:07 来源:脚本之家 阅读:78 作者:OLIVER_QIN 这篇文章主要介绍了简单了解Java方法的定义和使用实 ...

  3. java方法调用之单分派与多分派(二)

    上篇博文java方法调用之重载.重写的调用原理(一) 讨论了重写与重载的实现原理,这篇博文讨论下单分派与多分派. 单分派.多分派 方法的接收者和方法的参数统称为方法的宗量. 根据分派基于宗量多少(接收 ...

  4. 【Java】【问题记录】 解决通过new Date()方法获取时间与当前时间时区不一致问题

    问题出现原因 项目部署到linux服务器,通过new Date()获取时间和在windows本地时间不一致,通过排查问题,发现是new Date()的问题 linux系统上通过new Date()获取 ...

  5. 计算机改计数器的方法,第五章定时器计数器(修改)-计算机原理及应用资源共享课.ppt...

    第五章定时器计数器(修改)-计算机原理及应用资源共享课.ppt 1 第5章 MCS-51单片机定时器/计数器 主要内容 定时/计数器的工作原理模式 定时/计数器的工作方式 定时/计数器的应用 2 (1 ...

  6. java中的System.currentTimeMillis()是什么?时间的单位转换以及方法的使用

    在开发过程中,通常很多人都习惯使用new Date()来获取当前时间.new Date()所做的事情其实就是调用了System.currentTimeMillis().如果仅仅是需要或者毫秒数,那么完 ...

  7. Java UTC时间转CST时间的方法

    有时,从数据库查询到时间是String类型的UTC时间,需要手动转为CST时间,以下是转换方法: import java.text.ParseException; import java.text.S ...

  8. UTC时间转北京时间JAVA方法

    public static void UTCtoCST(String utc){//CST可视为美国.澳大利亚.古巴或中国的标准时间,北京时间与utc时间相差8小时ZonedDateTime zdt ...

  9. Java 中Timer和TimerTask 定时器和定时任务使用的例子

    转载自  Java 中Timer和TimerTask 定时器和定时任务使用的例子 这两个类使用起来非常方便,可以完成我们对定时器的绝大多数需求 Timer类是用来执行任务的类,它接受一个TimerTa ...

最新文章

  1. ASP.NET MVC使用Bootstrap系列(1)——开始使用Bootstrap
  2. 什么是python-马哥教育官网-专业Linux培训班,Python培训机构
  3. 1122 Hamiltonian Cycle (25 分)【难度: 一般 / 知识点: 模拟 哈密顿回路】
  4. macOS zip 打包加密和 unzip 解压
  5. AOP拦截器 表达式写法
  6. PHP表单常用正则表达式(URL、HTTP、手机、邮箱等)
  7. zoj 3640 Help Me Escape (概率dp 递归求期望)
  8. 一次library cache pin故障的解决过程
  9. 线性表_循环链表(增减删查 + 约瑟夫环问题 代码实现 )
  10. 正在更新office,应用程序无法正常启动
  11. 语音识别算法原理文档整理(二)
  12. django慢学日常
  13. 计算机为何会自动开机,电脑自动开机是怎么回事 电脑自动开机解决方法
  14. openlayers4+中高德导航路径的实现
  15. 语音转文字软件哪个好,这三款值得收藏
  16. 冶金、水泥、化工行业自动化通信产品介绍
  17. window10安装minio
  18. 你也能成为 “最强大脑”
  19. WIN7_64 下DNW驱动
  20. 用c语言实现的FFT

热门文章

  1. OpenCV:读取与写入图片
  2. 区间DP之环形石子合并
  3. 目标检测——YOLOv5的学习笔记
  4. Python使用ffmpeg下载m3u8拼接为视频
  5. #C语言#警告:隐式声明函数‘xxx’ [-Wimplicit-function-declaration]
  6. python优先队列,队列和栈
  7. pat1057 stack
  8. I2C与SPI通信总线协议
  9. JS记坑 ----- children返回的类数组
  10. TypeError: Cannot set properties of undefined (setting ‘innerHTML‘)