如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要等待ISR 集合中的所有副本都确认收到消息之后才能正确地收到响应的结果,或者捕获超时异常。

那么这里等待消息写入follower副本井返回相应的响应结果给生产者客户端的动作是由谁来执行的呢?在将消息写入leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所有副本或超时的情况,以返回相应的响应结果给生产者客户端。

延时消息分为延时生产消息延时拉取消息

1. 延时生产消息

生产者客户端生产消息的时候,消息写入leader副本,如果acks参数设置为-1,Kafka会创建一个延时的生产操作,并将延时生产操作加入延时操作管理器,触发事件事件包括外部事件触发超时触发

1.1 外部事件触发 – HW增长

介绍HW之前,先介绍一下LEO – Log End Offse,它标识当前日志文件中下一条待写入消息的offset,分区ISR集合中的每个副本都会维护自身的LEO。HW(高水位标记)对应的是ISR集合中的最小LEO,所以HW增长则表示分区中多数副本已经拉取到了最新消息,此时延时生产操作便被触发,返回成功给生产者客户端。

1.2 超时触发

超时比较容易理解,当超过指定时间,HW未发生变化,则认为消息仍未被follower拉取,便同样触发延时生产操作,只不过这里返回错误给生产者客户端。

2. 延时拉取消息

延时拉取操作同样由延时操作管理器管理,触发事件包括外部事件触发超时触发

2.1 外部事件触发

延时拉取操作的外部事件不同于延时生产操作,因为拉取操作的请求方可能是消费者客户端和follower副本,所以需要考虑两种场景。

  • 消费者客户端拉取:
    当消费者客户端发出延时拉取操作时,broker并不是立即返回,而是将延时拉取操作添加到延时操作管理器管理,并等待外部事件(HW增长)的到来。随着生产者客户端消息的写入,HW增长,延时拉取操作被触发,broker随即返回成功给消费者客户端。

  • follower副本拉取:
    follower副本通过从leader副本消费消息的方式保持和leader副本的消息同步。当follower副本和leader副本的消息保持一致,而leader副本又无新消息写入时,follower副本向leader副本发出拉取操作将无意义,并且消耗资源,显然不合理。此时leader副本是如何做的呢?

    原来,Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多的消息,那么就会创建一个延时拉取操作以等待拉取到足够数量的消息。延时操作创建完成同样被添加到延时操作管理器,等待外部事件(leader副本的日志文件新增消息)到来。随着生产者客户端消息的写入,消息追加到leader副本的日志文件,延时拉取操作被触发,此时,会再读取一次日志文件,然后将拉取结果返回给follower副本。

2.2 超时触发

超时触发比较简单,就是等到超时时间触发第二次读取日志文件的操作。

3. 延时消息实现原理

Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka 并没有使用JDK自带的Timer 或DelayQueue来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。JDK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为O (nlogn) 并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。

3.1 时间轮

时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList见下图) 。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务。下图是一个包括三层结构的时间轮,中心的tickMs、wheelSize、interval分别表示该层的时间的刻度、格数、总跨度。三层时间轮的时间区间分别是[0,20)、[20,400)、[400,160000)。

举个例子,比如现在有一个450ms的任务,那么,该任务最终将被插入第三层时间轮中时间格0(时间区间[400,800))的TimerTaskList。注意到在该时间格内的可能存在多个任务(比如446ms、455ms和473ms的定时任务),时间格0对应的TimerTaskList的超时时间expired为400ms。随着时间的流逝,当此TimerTaskList到期之时,原本定时为450ms的任务还剩下50ms的时间,还不能执行这个任务的到期操作。这里就有一个时间轮降级的操作,会将这个剩余时间为50ms的定时任务重新提交到层级时间轮中,此时,该任务被放到第二层时间轮中时间格1到期时间为[40ms, 60ms)的时间格中。再经历40ms之后,此时这个任务又被“ 察觉”,过还剩余10ms,还是不能立即执行到期操作。 所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮的时间格10(时间区间[10ms,11ms))中,之后再经历10ms后,此任务真正到期,最终执行相应的到期操作。

时间轮的设计源于生活,生活中常见的钟表就是一个三层结构的时间轮。第一层:秒针(tickMs=1s、wheelSize=60、interval=1min);第二层:分针(tickMs=1min、wheelSize=60、interval=1hour);第三层:时针(tickMs=1hour、wheelSize=12、interval=12hour);

3.2 任务推进

有了定时任务,Kafka是如何来推进这些任务的呢?Kafka中的定时器借了JDK中的DelayQueue1 来协助推进时间轮。 具体做法是:

  • 对于每个使用到的TimerTaskList 调用delayQueue.offer加入DelayQueue,超时时间为TimerTaskList对应的expired;
  • DelayQueue会根据TimerTaskList 对应的超时时间expiration来排序, 最短expiration 的TimerTaskList会被排在DelayQueue的队头。
  • Kafka 中会有一个线程通过调用delayQueue.take来获取DelayQueue中到期的任务列表,这个线程叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”。
  • 对获取到的任务列表,执行具体的任务。

  1. DelayQueue 不再在这里介绍,可参考大佬文章:Java多线程进阶(三六)—— J.U.C之collections框架:DelayQueue ↩︎

深入Kafka-延时消息相关推荐

  1. 计算发送延时与传播延迟_消息队列——延时消息应用解析及实践

    简介:在大部分场景下业务系统如果只需要实现异步解耦.削峰填谷等能力,常规的普通消息就可以满足此类需求.除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况.这就需要消息队列服务本身支持 ...

  2. 延时消息_手把手实现一条延时消息

    前言 近期在维护公司的调度平台,其中有个关键功能那就是定时任务:定时任务大家平时肯定接触的不少,比如 JDK 中的 Timer.ScheduledExecutorService.调度框架 Quartz ...

  3. 手把手实现一条延时消息

    近期在维护公司的调度平台,其中有个关键功能那就是定时任务:定时任务大家平时肯定接触的不少,比如 JDK 中的 Timer.ScheduledExecutorService.调度框架 Quartz 等. ...

  4. Kafka 的消息异常情况~追日

    Kafka 的消息异常情况 1.消息丢失情况 消息发送端 Producer (1) acks=0: 表示 Producer 不需要等待任何 broker 确认收到消息的回复, 就可以继续发送下一条消息 ...

  5. java开发中常见的延时消息解决方案

    前言 延时消息(定时消息)指的在分布式异步消息场景下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费. 延时消息适用的业务场景非常的广泛,在分布式系统环境下,延时消息 ...

  6. 延时消息常见实现方案

    前言 延时消息(定时消息)指的在分布式异步消息场景下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费. 延时消息适用的业务场景非常的广泛,在分布式系统环境下,延时消息 ...

  7. 实现延时消息的6种方案

    延时消息(定时消息)指的在分布式异步消息场景下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费. 延时消息适用的业务场景非常的广泛,在分布式系统环境下,延时消息的功能 ...

  8. 骑士卡:基于Kafka搭建消息中心,上亿消息推送轻松完成

    全球购骑士卡是国内领先的会员制特权电商平台,汇聚国内外"吃喝玩乐买"超 300 项会员专属优惠特权.全球购骑士卡基于移动互联生活方式,打通线上.线下消费场景,汇集时下热门.高频的商 ...

  9. 构建企业级业务高可用的延时消息中台

    来自:架构之美 1.业务场景剖析 公司业务系统(比如:电商系统)中有大量涉及定时任务的业务场景,例如:实现买卖双方在线沟通的IM系统,为了确保接收方能够收到消息,服务端一般都会有重试策略,即服务端在消 ...

  10. rocktmq 消息延时清空_使用Kotlin+RocketMQ实现延时消息的示例代码

    一. 延时消息 延时消息是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 使用延时消息的典型场景,例如: 在电商系统中,用户下完订单30分钟内没支付, ...

最新文章

  1. 信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言——1091:求阶乘的和
  2. HWM和delete,drop,truncate的关系
  3. oracle 月累计,oracle 可有什么方法 统计每月累积购买人数?
  4. win10如何提高电脑画质_win10电脑怎么提高画质 | 手游网游页游攻略大全
  5. X64-CL iPro 采集卡OC-64EO-IPRO0简介
  6. 如何获取foreach循环当前迭代的索引?
  7. Ubuntu安装jdk10
  8. 【优化分类】基于matlab GA优化GRNN超参数分类【含Matlab源码 1399期】
  9. 魔兽世界服务端开服架设服务器搭建教程Centos系统
  10. js html 测反应速度游戏,利用JS测试目标网站的打开响应速度
  11. 困境下的SEO,站长如何自渡?
  12. 点餐APP 冲刺一总结
  13. IDEA+Java控制台实现宠物管理系统
  14. Android 11.0 手动安装Persistent app失败的解决方案
  15. 一种求周期二元线性序列的极小多项式的方法
  16. arpspoof实现内网欺骗
  17. 如何向妈妈解释什么是爬虫
  18. prism EventAggregator(事件聚合器)
  19. 二分答案(超级详细)
  20. pwscf与wannier90 Hands-On实战训练(一)——费米面计算为例

热门文章

  1. 渗透——目录扫描神器DirBuster用法
  2. [附源码]java毕业设计高校新生报到管理系统
  3. 大学生软件课程设计之新生报到管理系统
  4. android 文本表情,把文本内容变为表情包,Android开发还可以这样 玩?!
  5. 外部浏览器打开微信小程序指定页面
  6. Linux下更高级的网络配置(网络桥接、bond及team网络接口的配置)
  7. 优化问题中:如何得到对偶目标函数
  8. 自动计算税金 CALCULATE_TAX_FROM_GROSSAMOUNT
  9. Eclipse Kepler (4.3.2) SR2 安装 Web Tools Platform (WTP)
  10. GBase 8s(GDCA)课堂练习题及答案总结