Kafka的延迟操作是一个相对独立的组件,他的主要功能是管理延迟操作,底层依赖于Kafka提供的时间轮实现。JDK本身提供的java.util.Timer也可以实现定时任务,但是如果系统请求量巨大,性能要求很高,他们底层所依赖的数据结构存取操作复杂度都是O(nlog(n))

为了将时间复杂度降为o(1),一般会使用其他方式的定时任务组件,比如zookeeper的时间桶方式处理session过期,netty也使用Hash

WheelTimer这种时间轮的实现。

Kafka时间轮的实现是TimingWheel,他是一个存储定时任务的环形队列(桶),底层使用数组实现,数组中每一个元素可以存放一个TimerTaskList对象

TimerTaskList是环形双向链表,在其中链表项TimeTaskEntry封装了真正的定时任务TimerTask。TimerTaskList使用expiration字段记录了整个TimerTaskList的超时时间。TimeTaskEntry中的expirationMs字段记录了超时时间戳,timerTask字段指向了对应的TimerTask任务.

TimerTask中的delayMs记录了任务的延迟时间,timerTaskEntry记录了TimerTaskEntry对象

TimingWheel提供了分层的概念,因为年时间跨度比较大,数量很大,单层的时间轮会造成任务的round很大,单个格子链表很长。一般情况,第一层时间跨度是最小的,第二层时间跨度比较大。

如上图所示:假设编号为0的时间格或者桶保存着到期时间为t,每一个tick的持续时间(tickDuration)为20ms,在这个格子里只能保存着到期时间为[t~t+20]ms的任务,任务到底放在哪一个时间格或者桶里面根据不同的场景可以有不同的算法,假设时间轮的时间格有n个,到期时间为m(ms),那么计算公式m%n = 所在的时间格或者桶,比如n=10,m=34ms,那么他所在桶或者时间格是4

当任务到期时间超出了当前时间所表示的时间范围时,就会尝试加到上一层时间轮,如下图所示:

其中第一层时间轮每个时间格是1ms,整个时间轮跨度是20ms,指针当前时间表示的时间是currentTime,则该时间轮跨度为currentTime

~currentTime+20,只有时间在这段范围内任务才能添加到该层时间轮等待到期。到期时间超出[currentTime~currentTime+20]这个时间范围的任务会尝试添加到上级时间轮中,通过逐层向上级尝试最终找到合适的时间轮层级

整个时间轮表示的时间跨度是不变的,随着指针的不断后移,当前时间轮能处理的时间段也在不断后移,新来的TimerTaskEntry会复用原来的已经到期的TimerTaskList,如下图所示,第一层时间轮跨度始终为20ms,指针表示的时间在不段后移。当指针指向0是时间格的时候,假设currentTime = 100,指向第三个时间格,此时指针表示的时间为当前时间104ms,整个时间轮表示的时间段是[104~ 124],但是该时间轮的时间跨度依然是20ms。此时间轮中编号为2的时间格表示的时间不再是102~103,而是123~124

假设现在有一个任务在445ms后执行,默认情况下,各个层级的时间轮的时间格个数为20,第一层时间轮每一个时间格跨度为1ms,整个时间轮跨度为20ms,跨度不够。第二层时间轮每一个时间格跨度为20ms,整个时间轮跨度为400ms,跨度依然不够,第三层时间轮每一个时间格跨度为400ms,整个时间轮跨度为8000ms,现在跨度够了,此任务就放在第三层时间轮的第一个时间格对应的TimerTaskList,等待被执行,此TimerTaskList到期时间是400ms,随着时间的流逝,当此TimerTaskList到期时,距离该任务到期时间还有45ms,不能执行该任务,我们将重新提交到时间轮,此时第一层时间轮跨度依然不够,不能执行任务,第二层时间轮时间格跨度为20,整个世间轮跨度为400,跨度足够,放在第三个时间格等待执行,如此往复几次,高层时间轮最终会慢慢移动到低层时间轮上,最终任务到期执行。

一 重要属性

buckets : Array.tabulate[TimerTaskList] 类型,其每一个项都对应时间轮中一个时间格,用于保存TimerTaskList的数组

tickMs:Long 当前时间轮中一个时间格表示的时间跨度

wheelSize: Int 当前时间轮的大小也就是总的时间格数量

taskCounter:AtomicInteger 各层级时间轮中任务的总数

startMs:Long 当期时间轮的创建时间

queue:DelayQueue[TimerTaskList] 整个层级的时间轮公用一个任务队列,其元素类型是TimerTaskList

currentTime:时间轮的指针,将整个时间轮划分为到期部分和未到期部分。在初始化的时候,currentTime被修剪成tickMs的倍数startMs - (startMs % tickMs)

interval:Long 当前时间轮的时间跨度即tickMs * wheelSize,当前时间轮只能处理时间范围在currentTime~currentTime+tickMs*WheelSize之间的定时任务,超过这个范围则需要添加任务到上层时间轮

overflowWheel: TimingWheel 上层时间轮的引用

二 核心方法

2.1 addOverflowWheel 主要用于创建上层时间轮

private[this] def addOverflowWheel(): Unit = {
  synchronized {
    if (overflowWheel == null) {
      overflowWheel = new TimingWheel(
        tickMs = interval, // 上层时间轮的时间格跨度等于下一层时间轮的跨度
       
wheelSize = wheelSize,// 大小不变
       
startMs = currentTime,// 初始化当前时间轮的创建时间
       
taskCounter= taskCounter,//时间轮中任务的总数
       
queue
      )
    }
  }
}

2.2 add 向时间轮中添加定时任务,同时也会检测添加的任务是否已经到期

def add(timerTaskEntry: TimerTaskEntry): Boolean = {// 获取定时任务的超时时间戳val expiration = timerTaskEntry.expirationMs// 如果任务已经被取消if (timerTaskEntry.cancelled) {false // 返回添加失败} else if (expiration < currentTime + tickMs) { // 如果时间指针现在指向的时间+时间格跨度 > 需要添加的定时任务的超时时间戳表示已经到期// 举个例子:currentTime=102,时间格跨度为10ms,那么假设添加的任务超时时间戳为105 < 102+10false // 返回添加失败} else if (expiration < currentTime + interval) {// 如果时间指针现在指向的时间+当前时间轮跨度 > 需要添加的定时任务的超时时间戳表示没有到期// 然后把当前任务放进循环数组里面// 得到任务到期时间戳/时间格跨度的余数val virtualId = expiration / tickMs// 获取放在哪一个桶里 (到期时间戳/时间格跨度)%时间轮跨度val bucket = buckets((virtualId % wheelSize.toLong).toInt)bucket.add(timerTaskEntry)// 设置时间格的到期时间if (bucket.setExpiration(virtualId * tickMs)) {/** 整个时间轮表示的跨度是不变的,随着指针的后移,当前时间轮能够处理的时间段也在不段后移,新的TimerTaskEntry会复用原来的已经清理过的* TimerTaskList,此时你需要重新设置TimerTaskList的到期时间,并将桶重新入队*/queue.offer(bucket)}true} else { // 如果超出了时间的跨度范围,则将其添加到上层时间轮来处理if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry)}
}

2.3 advanceClock 尝试推进当前时间轮的指针,同时也会尝试推进上层时间轮的指针,随着当前时间轮的不断推进,上层时间轮指针早晚会被推进成功

def advanceClock(timeMs: Long): Unit = {// 尝试移动指针currentTimeif (timeMs >= currentTime + tickMs) {currentTime = timeMs - (timeMs % tickMs)// 尝试秃顶上层时间轮指针currentTimeif (overflowWheel != null) overflowWheel.advanceClock(currentTime)}
}

TimingWheel[时间轮]介绍相关推荐

  1. TimingWheel 时间轮详解

    在kafka中,有许多请求并不是立即返回,而且处理完一些异步操作或者等待某些条件达成后才返回,这些请求一般都会带有timeout参数,表示如果timeout时间后服务端还不满足返回的条件,就判定此次请 ...

  2. Netty时间轮调度原理分析,再不了解你就out啦

    一.时间轮介绍 之前公司内部搭建的延迟队列服务有用到时间轮,但是一直没有了解过它的实现原理. 最近有个和支付宝对接的项目,支付宝接口有流量控制,一定的时间内只允许 N 次接口调用,针对一些业务我们需要 ...

  3. 心跳超时时间设置_定时器实现之时间轮算法

    前言 在看这篇文章的时候对其中超时控制一块儿有点好奇.通过时间轮来控制超时?啥是时间轮?怎么控制的?文章会先介绍常见的计时超时处理,再引入时间轮介绍及 netty 在实现时的一些细节,最后总结下实现的 ...

  4. Java时间轮算法的实现

    考虑这样一个场景,现在有5000个任务,要让这5000个任务每隔5分中触发某个操作,怎么去实现这个需求.大部分人首先想到的是使用定时器,但是5000个任务,你就要用5000个定时器,一个定时器就是一个 ...

  5. 【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)

    承接上文 承接上一篇文章[算法数据结构专题]「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(上)]我们基本上对层级时间轮算法的基本原理有了一定的认识, ...

  6. Kafka解惑之时间轮(TimingWheel)

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. Kafka中存在大量的延迟操作,比如延 ...

  7. kafka时间轮linux时间轮,Kafka解惑之时间轮 (TimingWheel)

    Kafka中存在大量的延迟操作,比如延迟生产.延迟拉取以及延迟删除等.Kafka并没有使用JDK自带的Timer或者DelayQueue来实现延迟的功能,而是基于时间轮自定义了一个用于实现延迟功能的定 ...

  8. linux时间轮 Timing-Wheel的实现

    过一段时间上传更新自己的心得,以及linux的时间轮实现 现在git上传自己的C++代码 git@github.com:pbymw8iwm/Timing-Wheel.git

  9. 时间轮(TimingWheel)

    一.什么是时间轮 时间轮其实就是一种环形的数据结构,可以想象成时钟,分成很多格子,一个格子代表一段时间(这个时间越短,Timer的精度越高).并用一个双向链表存储放在该格子上的延时任务,同时一个指针随 ...

最新文章

  1. 5G NGC — AF 的 Service information
  2. foreach_and_函数
  3. 一个线程池 bug 引发的 GC 思考!
  4. 服务链路追踪配置mysql_学习微服务的服务链路追踪——Spring Cloud Sleuth+zipkin
  5. 手把手安装flownet2-pytorch
  6. Guava 相关文章
  7. ubuntu 16.04安装redis群集zz
  8. 重磅!吴恩达新书《机器学习训练秘籍》中文版来了(附PDF下载)
  9. awstats的简单配置
  10. Mac UE各版本破解方法
  11. 【LaTeX】LaTeX常见括号总结
  12. DataSheet IFI9486
  13. Windows 2008 Server R2 桌面体验
  14. mysql 签到_签到功能,用mysql还是redis?
  15. 为何iPad 2充电快 但用不了多久?
  16. 使用Beautifulsoup解析网页遇到的问题
  17. Fresh gizmo
  18. 前端——将png图片做成icon
  19. 忍者必须死3突然服务器维修,《忍者必须死3》3月25日停服维护公告
  20. C语言 主动判别int型出界

热门文章

  1. 第三篇:稳定性之借风险之力驱动架构演进
  2. 3-17Pytorch与线性代数运算
  3. lsqnonneg函数_matlab中线性最小二乘问题求解
  4. 用matlab做元胞自动机预测,元胞自动机(Cellular Automata)与城市规划及其MATLAB实现——莆田市城市发展预测...
  5. SWAT 学习相关基础知识(一)---Mr.Zhang
  6. c语言操作access数据类型,2016计算机二级《ACCESS》基本操作题及答案
  7. mysql 表锁的概念_MySQL 锁的一些简单概念
  8. python list大小_4个python常用高阶函数的使用方法
  9. python读取yaml文件
  10. django设置models.Model数据可以为空