在kafka中,有许多请求并不是立即返回,而且处理完一些异步操作或者等待某些条件达成后才返回,这些请求一般都会带有timeout参数,表示如果timeout时间后服务端还不满足返回的条件,就判定此次请求为超时,这时候kafka同样要返回超时的响应给客户端,这样客户端才知道此次请求超时了。比如ack=-1的producer请求,就需要等待所有的isr备份完成了才可以返回给客户端,或者到达timeout时间了返回超时响应给客户端。

上面的场景,可以用延迟任务来实现。也就是定义一个任务,在timeout时间后执行,执行的内容一般就是先检查返回条件是否满足,满足的话就返回客户端需要的响应,如果还是不满足,就发送超时响应给客户端。

对于延迟操作,java自带的实现有Timer和ScheduledThreadPoolExecutor。这两个的底层数据结构都是基于一个延迟队列,在准备执行一个延迟任务时,将其插入到延迟队列中。这些延迟队列其实就是一个用最小堆实现的优先级队列,因此,插入一个任务的时间复杂度是O(logN),取出一个任务执行后调整堆的时间也是O(logN)。

如果要执行的延迟任务不多,O(logN)的速度已经够快了。但是对于kafka这样一个高吞吐量的系统来说,O(logN)的速度还不够,为了追求更快的速度,kafka的设计者使用了Timing Wheel的数据结构,让任务的插入时间复杂度达到了O(1)。

Timing Wheel

image.png

上面是时间轮的一个结构图,该时间轮有8个槽,当前时间指向0号槽。

我们再看一下Kafka里面TimingWheel的数据结构

private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {private[this] val interval = tickMs * wheelSizeprivate[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
}

tickMs:表示一个槽所代表的时间范围,kafka的默认值的1ms

wheelSize:表示该时间轮有多少个槽,kafka的默认值是20

startMs:表示该时间轮的开始时间

taskCounter:表示该时间轮的任务总数

queue:是一个TimerTaskList的延迟队列。每个槽都有它一个对应的TimerTaskList,TimerTaskList是一个双向链表,有一个expireTime的值,这些TimerTaskList都被加到这个延迟队列中,expireTime最小的槽会排在队列的最前面。

interval:时间轮所能表示的时间跨度,也就是tickMs*wheelSize

buckets:表示TimerTaskList的数组,即各个槽。

currentTime:表示当前时间,也就是时间轮指针指向的时间

运行原理

当新增一个延迟任务时,通过buckets[expiration / tickMs % wheelSize]先计算出它应该属于哪个槽。比如延迟任务的delayMs=2ms,当前时间currentTime是0ms,则expiration=delayMs+startMs=2ms,通过前面的公式算出它应该落于2号槽。并把任务封装成TimerTaskEntry然后加入到TimerTaskList链表中。

之后,kafka会启动一个线程,去推动时间轮的指针转动。其实现原理其实就是通过queue.poll()取出放在最前面的槽的TimerTaskList。由于queue是一个延迟队列,如果队列中的expireTime没有到达,该操作会阻塞住,直到expireTime到达。如果通过queue.poll()取到了TimerTaskList,说明该槽里面的任务时间都已经到达。这时候就可以遍历该TimerTaskList中的任务,然后执行对应的操作了。

针对上面的例子,就2号槽有任务,所以当取出2号槽的TimerTaskList后,会先将currentTime = timeMs - (timeMs % tickMs),其中timeMs也就是该TimerTaskList的expireTime,也就是2Ms。所以,这时currentTime=2ms,也就是时间轮指针指向2Ms。

时间溢出处理

在kafka的默认实现中,tickMs=1Ms,wheelSize=20,这就表示该时间轮所能表示的延迟时间范围是0~20Ms,那如果延迟时间超过20Ms要如何处理呢?Kafka对时间轮做了一层改进,使时间轮变成层级的时间轮。

一开始,第一层的时间轮所能表示时间范围是0~20Ms之间,假设现在出现一个任务的延迟时间是200Ms,那么kafka会再创建一层时间轮,我们称之为第二层时间轮。

第二层时间轮的创建代码如下

overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue
)

也就是第二层时间轮每一个槽所能表示的时间是第一层时间轮所能表示的时间范围,也就是20Ms。槽的数量还是一样,其他的属性也是继承自第一层时间轮。这时第二层时间轮所能表示的时间范围就是0~400Ms了。

之后通过buckets[expiration / tickMs % wheelSize]算出延迟时间为200Ms的任务应该位于第二层时间轮的10号槽位。

同理,如果第二层时间轮的时间范围还容纳不了新的延迟任务,就会创建第三层、第四层...

值得注意的是,只有当前时间轮无法容纳目标延迟任务所能表示的时间时,才需要创建更高一级的时间轮,或者说把该任务加到更高一级的时间轮中(如果该时间轮已创建)。

一些细节

  1. 当时间轮的指针指向1号槽时,即currentTime=1Ms,说明0号槽的任务都已经到期了,这时0号槽就会被拿出来复用,可以容纳20~21Ms延迟时间的任务。也就是说,如果currentTime=0Ms时进来一个21Ms的延迟任务,就需要创建更高一级的时间轮,但是如果currentTime=1Ms时进来一个21Ms的延迟任务,就可以直接把它放到0号槽中,当currentTime=21时,指针又指向0号槽
  2. 细心的同学可能发现,第一层的0号槽所能表示的任务延迟时间范围是01Ms,对应的TimerTaskList的expireTime是0Ms。第二层的0号槽锁能表示的任务延迟时间范围是020Ms,对应的TimerTaskList的expireTime也是0Ms。他们的TimerTaskList又都是放在一个延迟队列中。这时候执行queue.poll()会把这两个TimerTaskList都取出来,然后遍历链表的时候还会判断该任务是否达到执行时间了,如果没有的话,这些任务还会被塞回时间轮中。这时由于第一层指针的转动,原先处于第二层时间轮中的任务可能会重新落到第一层时间轮上面。

源码解析

添加新的延迟任务

//SystemTimer.scala
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelledif (!timerTaskEntry.cancelled)taskExecutor.submit(timerTaskEntry.timerTask)}}

往时间轮添加新的任务

//TimingWheel
def add(timerTaskEntry: TimerTaskEntry): Boolean = {//获取任务的延迟时间val expiration = timerTaskEntry.expirationMs//先判断任务是否已经完成if (timerTaskEntry.cancelled) {false//如果任务已经到期} else if (expiration < currentTime + tickMs) {false//判断当前时间轮所能表示的时间范围是否可以容纳该任务} else if (expiration < currentTime + interval) {// 根据任务的延迟时间算出应该位于哪个槽val virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt)bucket.add(timerTaskEntry)// 设置TimerTaskList的expireTimeif (bucket.setExpiration(virtualId * tickMs)) {//把TimerTaskList加入到延迟队列queue.offer(bucket)}true} else {//如果时间超出当前所能表示的最大范围,则创建新的时间轮,并把任务添加到那个时间轮上面if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}}private[this] def addOverflowWheel(): Unit = {synchronized {if (overflowWheel == null) {overflowWheel = new TimingWheel(tickMs = interval,wheelSize = wheelSize,startMs = currentTime,taskCounter = taskCounter,queue)}}}

从上面的代码可以看出,对于当前时间轮是否可以容纳目标任务,是通过expiration < currentTime + interval来计算的,也就是根据时间轮的指针往后推interval时间就是时间轮所能表示的时间范围。

时间轮指针的推进

 //SystemTimer.scala
def advanceClock(timeoutMs: Long): Boolean = {//从延迟队列中取出最近的一个槽,如果槽的expireTime没到,此操作会阻塞timeoutMsvar bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)if (bucket != null) {writeLock.lock()try {while (bucket != null) {//推进时间轮的指针timingWheel.advanceClock(bucket.getExpiration())//把TimerTaskList的任务都取出来重新add一遍,add的时候会检查任务是否已经到期bucket.flush(reinsert)bucket = delayQueue.poll()}} finally {writeLock.unlock()}true} else {false}}
//TimingWheel
def advanceClock(timeMs: Long): Unit = {if (timeMs >= currentTime + tickMs) {//推进时间轮的指针currentTime = timeMs - (timeMs % tickMs)// 推进上层时间轮的指针if (overflowWheel != null) overflowWheel.advanceClock(currentTime)}}

总结

相比于常用的DelayQueue的时间复杂度O(logN),TimingWheel的数据结构在插入任务时只要O(1),获取到达任务的时间复杂度也远低于O(logN)。另外,kafka的TimingWheel在插入任务之前还会先检查任务是否完成,对于那些在任务超时直接就完成指定操作的场景,TimingWheel的表现更加优秀。

作者:疯狂的哈丘
链接:https://www.jianshu.com/p/0f0fec47a0ad
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

TimingWheel 时间轮详解相关推荐

  1. mysql 获取当前整点_Oracle 生成未来三天的整点时间(步骤详解)

    需求:X坐标轴时间都为整点时间,展示未来三天内的预测(x轴展示未来三天的整点时间),每3个小时一个刻度,横坐标共计24个刻度 步骤一:取当前时间 SELECT SYSDATE FROM DUAL 步骤 ...

  2. linux脚本日期时间,Linux 日期和时间操作详解

    Linux 日期和时间操作详解 发布时间:2012-11-27 15:10:07   作者:佚名   我要评论 Linux将时钟分为系统时钟(System Clock)和硬件(Real Time Cl ...

  3. SQL Server时间粒度系列----第4节季、年时间粒度详解

    本文目录列表: 1.SQL Server季时间粒度 2.SQL Server年时间粒度 3.总结语 4.参考清单列表 SQL Serve季时间粒度   季时间粒度也即是季度时间粒度.一年每3个月是一个 ...

  4. TOEIC考试时间分配详解

    TOEIC考试时间分配详解 托业考试题全部为单项选择 涂答题卡 答错不扣分 除听力第二部分为3选1外其余皆为4选1 每一类题型前都会有详细的答题指引 考试时间 120分钟(45+75) 原始分换算为标 ...

  5. 如何打印time_t时间类型? time_t时间类型详解

    linux下存储时间常见的有两种存储方式,一个是从1970年到现在经过了多少秒,一个是用一个结构来分别存储年月日时分秒的. time_t 这种类型就是用来存储从1970年到现在经过了多少秒,要想更精确 ...

  6. Asp 时间函数详解

    now() 获取当前系统日期和时间,ASP输出可以这样写:<%=now()%> Year(now()) 获取年份, ASP输出:<%=Year(now())%> Month(n ...

  7. Pygame Time时间控制详解

    Pygame Time时间控制详解 pygame.time 时间控制模块,是 Pygame 中使用频率较高的模块,其主要功能是管理时间和游戏帧数率(即 FPS). 时间在游戏开发中承担着非常重要的作用 ...

  8. tomcat线程释放时间_详解tomcat的连接数与线程池

    前言 在使用tomcat时,经常会遇到连接数.线程数之类的配置问题,要真正理解这些概念,必须先了解Tomcat的连接器(Connector). 在前面的文章 详解Tomcat配置文件server.xm ...

  9. python时间函数详解_Python:Numpy库基础分析——详解datetime类型的处理

    原标题:Python:Numpy库基础分析--详解datetime类型的处理 Python:Numpy库基础分析--详解datetime类型的处理 关于时间的处理,Python中自带的处理时间的模块就 ...

最新文章

  1. 4 年开发 43 款软件,这位乡村教师火了
  2. mysql bin的过期时间_Mysql设置binlog过期时间并自动删除
  3. TS流 PS流 ES流
  4. 全球及中国膀胱癌药物行业“十四五”专项规划及市场调研分析报告2021-2027年
  5. python循环套循环_零基础学python 14 循环套循环:循环的嵌套
  6. 黑苹果2k屏_一加手机屏幕新特性:120Hz、2K+ OLED、10bit
  7. Python 编码问题:‘ascii‘ codec can‘t encode characters in position 的解决方案(中文乱码终极解决方案)
  8. VOC和COCO数据集标注格式的介绍
  9. linux 系统硬件信息检测工具,在Linux上查找系统hwinfo硬件信息工具
  10. PowerBuilder快速入门实践
  11. html js 鼠标变图片大小,js之图片变大变小效果
  12. 解析Linux商业应用现状
  13. Python中numpy的np.where()函数
  14. kindle中html笔记,Kindle教程:如何导出笔记
  15. 关于win11兼容性视图模式
  16. Java自定义导出列_后台生成EXCEL文档,自定义列
  17. linux修改tomcat默认的8080端口号
  18. Lunix基础终端控制器操作
  19. http client 短链接复用(变为长连接)
  20. 短片连载(6),记录这段 电脑爱好者 生涯,从四年级开始接触计算机,直到大二寒假决定步入IT行业。...

热门文章

  1. Python 的文件IO相关操作说明
  2. BZOJ3028食物——生成函数+泰勒展开
  3. React中props与state的区别
  4. 前端学习 -- 颜色
  5. Python-统计svn变更代码行数
  6. Javascript基础系列之(六)循环语句(do while循环)
  7. 不常用≠没用 Win7容易忽略的四个功能
  8. php 设置curl不超时时间,curl命令的超时时间
  9. java消费者中url找不到,java – URL可以使用浏览器访问,但是仍然是具有URLConnection的FileNotFoundException...
  10. html 让表格在右侧显示不出来,css中怎么解决表格边框不显示的问题?