3.3.3 消费者拉取线程

消费者拉取管理器在创建拉取线程时,会将表示分区及其分区信息对象的全局partitionMap作为类级别的变量传给每个拉取线程,但每个拉取线程在拉取时实际上只会负责一部分的分区。拉取线程在拉取到分区数据后,需要将拉取结果保存到分区信息的队列中。因为每个拉取线程都持有全局的partitionMap引用,所以processPartitionData()方法在处理拉取结果时,可以获取到分区信息中的队列,并将拉取结果填充到队列中。相关代码如下:

注意:拉取线程的抽象类在拉取工作中也有一个partitionMap变量,但这个变量表示的是:分配给当前消费者的所有分区的拉取状态。它的作用域仅仅在抽象拉取线程中,和消费者拉取线程没有关联。

消费者拉取线程构建拉取请求后,通过Si111pleConsu111er代表和服务端的网络连接。Si111pleConsu111er使用同步类型的阻塞通道发送请求和接收响应。相关代码如下:

消费者和备份剧本的拉取线程在收到拉取消息后处理方式不同,比如备份副本会把数据写到自己本地的日志文件中,消费者则会把数据填充到分区信息对象的队歹lj中供消费者客户端应用程序获取。

注意:除了拉取请求(FetchRequest),Si111pleConsu111er还提供了其他两种请求的发送:提交偏移量的请求(OffsetC0111111itRequest)、获取偏移量的请求(OffsetFetchRequest)。这些方法不仅作为高级A凹的内直实现,也可以提供给低级API进行手动控制。

  1. 分区信息的队列保存拉取的消息

消费者拉取线程的fetch()方法,通过Si111pleConsu111er向服务端发起请求并返回所有分区及其数据(PartitionData),然后处理每个分区的拉取结果。ProcessPartitionData()方法的参数是分区数据的底层消息集,即从服务端拉取到的分区消息对象。它会根据分区得到分区信息对象,调用其enqueue()方法,将消息集包装成数据块(FetchedDataChunk)放入分区信息对象的队列中。相关代码如下:


如图3-20所示,分区信息对象作为消费者应用程序和拉取线程的中间桥梁,保存了“拉垠偏移盘”和“队列”两个重要的信息。拉取偏移量用在拉取钱程中,表示要从分区的什么位置拉取消息,拉取钱程拉取到数据后将拉取结果填充到队列中。回顾一下消费者连接器在一开始创建了队列和消息流时,队列是空的。现在,分区信息对象的队列有数据后,消费者应用程序可以通过消息流从队列中取得数据。

2.拉取出现错误的处理方式

拉取线程向服务端发送拉取请求如果收到UoFFSET_our_oF_RANGE错误码表示扣,取请求的拉取偏移iiJ超H-\服务端分区的范围,拉取线程就要根据消费者设置的重置策略设置拉取偏移盐,并且更新分区的拉取状态。下一次发送拉取请求时,拉取线程使用重茸的偏移盐拉取分区的消息。相关代码如下:


消费者拉取线程拉取消息过程中还可能遇到其他的错误,通常是分区的主副本发生变化,导致拉取线程不能再从之前的节点上读取数据。此时,拉取线程会调用handlePartitionsWithErrOrs()抽象方法进行处理。首先,这个分区不应该继续拉取,所以要将其从拉取状态集合中移除,这样下次拉取请求就不会存在这个错误的分区了。然后,将分区加入到消费者拉取管理器的noleaderPartitionset中,这样LeaderFinderThread就会重新选择分区的主副本,让拉取线程连接最新的节点。相关代码如下:

如图3-21所示,总结从分配分区给消费者,到拉取线程拉取消息返回给消费者的具体步骤如下。

(1)再平衡操作将分区分配给消费者,读取ZK的偏移量作为分区信息的拉取偏移量。
(2)分区信息的队列用来存储结果数据,拉取偏移量作为拉取线程初始的拉取位置。
(3)拉取线程拉取分区的数据,初始时从拉取偏移量开始拉取消息。
(4)partitionMap表示分区的最新拉取状态,每次拉取数据后都要更新拉取状态。
(5)拉取线程创建拉取请求,并通过SiMpleConsL』附r发送请求和接收响应结果。
(6)拉取钱程技取到分区消息后,将分区数据的消息集填充到分区信息对象的队列。
(7)创建消费者连接对象时,会创建队列和消息流,一个队列关联了一个消息流。
(8)消费者客户端从消息流中迭代读取结果数据,实际上就是从队列中拉取消息。

目前为止,虽然拉取线程从服务端成功拉取到了最新消息,并放到分区信息对象的队列里,但是客户端其实“还没有开始读取队列中的消息”。消费者的客户端应用程序需要通过“迭代消息流”,才能从队列中读取IH消息。而只有消费者客户端成功消费到数据,才表示消息已经到达客户端。否则在这之前尽管数据已经在客户端进程中,但是还没有到达客户端应用程序,就不算做被消费,只能说“正在等待被消费”。

3.3.3 消费者拉取线程相关推荐

  1. Consumer消息拉取和消费流程分析

    1. 前言 MQConsumer是RocketMQ提供的消费者接口,从接口定义上可以看到,它主要的功能是订阅感兴趣的Topic.注册消息监听器.启动生产者开始消费消息. ​ 消费者获取消息的模式有两种 ...

  2. 3.1.2 消费者客户端的线程模型

    3.1.2 消费者客户端的线程模型 消费者连接器的createMessageStreams()方法会调用consume()方法,但consume()方法并不真正消费数据,而只是为消费消息做准备工作,具 ...

  3. 4.4.2 将拉取偏移量作为提交偏移量

    4.4.2 将拉取偏移量作为提交偏移量 旧API中,当客户端迭代消费消息时会更新分区信息的已消费偏移量,并且有一个后台线程定时将分区信息的已消费偏移量作为已提交偏移量发送给协调者节点. 新API中,订 ...

  4. RocketMQ源码(十七)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码

    转载来源: RocketMQ源码(19)-Broker处理DefaultMQPushConsumer发起的拉取消息请求源码[一万字]_刘Java的博客-CSDN博客 此前我们学习了RocketMQ源码 ...

  5. RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】

    基于RocketMQ release-4.9.3,深入的介绍了Broker处理DefaultMQPushConsumer发起的拉取消息请求源码. 此前我们学习了RocketMQ源码(18)-Defau ...

  6. 如果可以,我想并行消费Kafka拉取的数据库Binlog

    关注 "Java艺术" 我们一起成长! 本篇所述内容只在试用阶段,可行性有待考验! 笔者在上一篇提到:由于Binlog需要顺序消费,所以阿里数据订阅服务DTS只将Binlog放入t ...

  7. rocketmq中的消息拉取及并发消费理解

    消息拉取采用单线程形式,便于消息的顺序拉取 默认批量取32个,出现性能考虑,减少网络请求.不能保证会拉取到32个,因为消息队列中的存放的是topic-queueid对应的索引,会包含多个tag,而消息 ...

  8. kafka consumer配置拉取速度慢_Kafka消费者的使用和原理

    这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public static void main(String[] args) ...

  9. Kafka实战 - 06 Kafka消费者:从指定Topic拉取工单处置记录信息并存入MongoDB数据库

    文章目录 1. 处置记录表 t_disposal_record 2. kafka 主题和消费者配置 3. 定义一个线程任务 KafkaTask 1. kafka Topic中的数据:KafkaDisp ...

最新文章

  1. png图片IDAT块异常
  2. 5年没有工资收入,他如何支撑世界上最大的免费编程社区?
  3. Angular全套知识讲解,错过必悔!
  4. 02-继承的本质-Objective-C基础
  5. 没有写入hosts文件权限
  6. PHP笔记-AES加解密(PHP7)
  7. 电脑位数怎么看_看完了这篇文章你就知道怎么选电脑了...
  8. 中芯国际发布2021Q1财报:55/65纳米工艺依旧为营收主力
  9. Spark 任务参数配置
  10. lay-verify=required 没生效_眼睛一闭一睁,20万没了!|侧翻|交通事故|半挂车|追尾...
  11. java格林威治时间转换_JAVA 格式化格林威治时间(Wed Aug 01 00:00:00 CST 2012)格式转换...
  12. android手机碎片管理,安卓手机如何进行系统碎片整理
  13. Java语言的特点有哪些?有什么作用?
  14. 小型机与PC服务器的区别(phpc)
  15. 南阳理工ACM 题4《ASCII码排序》
  16. echarts实现多y轴情况下多个元素共用一个y轴
  17. 使用CMAKE和交叉编译工具链
  18. Unity 3D游戏-消消乐(三消类)教程和源码
  19. 利用深度学习进行时间序列预测
  20. php运行方式fpm fcgi,php如何从fpm-fcgi切换运行模式到cli

热门文章

  1. RDF查询语言SPARQL
  2. 字符编码在项目中的应用(一)
  3. 基于微信小程序的家政服务预约系统的设计与实现
  4. 在Linux中卸载Refind
  5. 【Arduino】入门篇——火焰报警器
  6. 机器学习入门(3)-——多元回归
  7. 实用epub阅读器分享
  8. 封神台旧靶场-kali系列
  9. php 小时,php - 将秒转换为小时:分钟:秒
  10. 年薪20万招java讲师