AbstractFetcherThread:拉取消息的步骤

副本机制是Kafka实现数据高可靠性的基础:同一个分区下的多个副本分散在不同的Broker机器上,它们保存相同的消息数据以实现高可靠性。那如何确保所有副本上的数据一致性呢?最常见方案当属Leader/Follower备份机制(Leader/Follower Replication)。Kafka分区的:

  • 某个副本会被指定为Leader,负责响应客户端的读、写请求

  • 其他副本自动成为Follower,被动同步Leader副本中的数据

    被动同步:Follower副本不断向Leader副本发送读取请求,以获取Leader处写入的最新消息数据

本文就研究Follower副本如何通过拉取线程实现这一目标。Follower副本在副本同步过程中,还可能发生截断(Truncation),其原理又是为何?

案例

这部分源码贴近底层设计架构原理。阅读它对我实际有啥用?

生产环境曾发现,一旦Broker上副本数过多,Broker内存占用就会很高。HeapDump后,发现在于ReplicaFetcherThread#buildFetch有这么一行代码:

val builder = fetchSessionHandler.newBuilder()

内部会实例化一个LinkedHashMap。若分区数很多,该Map会被扩容数次,带来大量不必要的数据拷贝,既增加内存Footprint,又浪费CPU。后续通过将负载转移到其他Broker解决该问题。

Kafka社区也发现了这个Bug,所以现在变成:

修改后语句直接传入FETCH请求中总的分区数,并直接将其传给LinkedHashMap,避免再执行扩容。

说回Follower副本从Leader副本拉取数据。Kafka就是通过ReplicaFetcherThread,副本获取线程实现的消息拉取及处理。

本文先从抽象基类AbstractFetcherThread研究,最终彻底搞明白Follower端同步Leader端消息的原理。

AbstractFetcherThread

抽象类,从Broker获取多个分区的消息数据,至于获取之后如何对这些数据进行处理,则交由子类来实现。

类定义及字段

除了构造器的这几个字段,AbstractFetcherThread还定义了两个type类型。关键字type定义一个类型,可当做一个快捷方式,如FetchData:

type FetchData = FetchResponse.PartitionData[Records]

类似快捷方式:凡源码用到FetchResponse.PartitionData[Records],都可使用FetchData替换,EpochData同理。

FetchData定义里的PartitionData类型,是客户端clients工程中FetchResponse类的嵌套类。FetchResponse类封装的是FETCH请求的Response对象,其内PartitionData是个POJO,保存Response中单个分区数据拉取的各项数据:

  • 从该分区的Leader副本拉取回来的消息

  • 该分区的高水位值

  • 日志起始位移值

在PartitionData中,最需关注的是recordSet,保存了实际的消息集合。

  • 注意到EpochData定义位置,它也是PartitionData类型,但EpochData的PartitionData是OffsetsForLeaderEpochRequest的PartitionData类型

    Kafka源码有很多名为PartitionData的嵌套类。很多请求类型中的数据都是按分区层级分组,因此源码很自然地在这些请求类中创建同名嵌套类。所以,注意区分PartitionData嵌套类是定义在哪类请求中的!

分区读取状态类

AbstractFetcherThread构造器中,还有个**PartitionStates[PartitionFetchState]**类型的字段:

  • 泛型参数类型PartitionFetchState类,表征分区读取状态,保存分区的已读取位移值和对应副本状态。

这里的状态有二:

副本读取状态

副本读取状态由ReplicaState接口表示:

分区读取状态:

  • 可获取,表明副本获取线程当前能够读取数据。
  • 截断中,表明分区副本正在执行截断操作(比如该副本刚刚成为Follower副本)。
  • 被推迟,表明副本获取线程获取数据时出现错误,需要等待一段时间后重试。

分区读取状态中的【可获取、截断中】与副本读取状态的【获取中、截断中】并非严格对应。副本读取状态处获取中,并不一定表示分区读取状态就是可获取状态。对于分区,它是否能被获取的条件要比副本严格。

副本获取线程做的事情,日志截断和消息获取:

  • isReplicaInSync,副本限流,出镜率不高

  • isDelayed,判断是否需要推迟获取对应分区的消息

    源码会不断调整那些不需要推迟的分区的读取顺序,以保证读取公平性。公平性实现在partitionStates字段的PartitionStates类,定义在clients工程。会接收一组要读取的主题分区,然后轮询读取这些分区以确保公平性。

clients端源码自行查阅。

public class PartitionStates<S> {private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();......public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {map.remove(topicPartition);map.put(topicPartition, state);updateSize();}......
}

PartitionStates轮询处理要读取的多个分区,依靠LinkedHashMap保存所有主题分区,其元素有明确迭代顺序,默认为元素插入的顺序。

假设Kafka要读5个分区的消息:A、B、C、D和E。若插入顺序:ABCDE,则首先读分区A。一旦A被读取后,为确保各分区都有同等机会被读取,代码需将A插入到分区列表的最后一位,这就是updateAndMoveToEnd:把A从map中移除,再插回去,这样A自然就处于列表的最后一位了。这便是PartitionStates的作用。

core API

processPartitionData、truncate、buildFetch和doWork,涵盖拉取线程所做的最重要的3件事:

  • 构建FETCH请求

  • 执行截断操作

  • 处理拉取后的结果

doWork串联起前面的这3方法。

最重要的processPartitionData,用于处理读取回来的消息集合。它是个抽象方法,因此需子类实现它的逻辑。具体到Follower副本而言, 由ReplicaFetcherThread类实现:

protected def processPartitionData(topicPartition: TopicPartition,  // 读取哪个分区的数据fetchOffset: Long,               // 读取到的最新位移值partitionData: FetchData         // 读取到的分区消息数据
): Option[LogAppendInfo]           // 写入已读取消息数据前的元数据

返回值Option[LogAppendInfo]:

  • 对Follower副本读消息写入日志,可忽略Option,因为肯定会返回具体LogAppendInfo实例,而不是None
  • LogAppendInfo类封装了很多消息数据被写入到日志前的重要元数据信息,如首条消息的位移值、最后一条消息位移值、最大时间戳等

truncate

protected def truncate(topicPartition: TopicPartition, // 要对哪个分区下副本执行截断操作truncationState: OffsetTruncationState  // Offset + 截断状态
): Unit

OffsetTruncationState类告诉Kafka要把指定分区下副本截断到哪个位移值,封装了:

  • 一个位移值

  • 一个截断完成与否的布尔值状态

buildFetch

protected def buildFetch(// 一组要读取的分区列表// 分区是否可读取取决于PartitionFetchState中的状态partitionMap: Map[TopicPartition, PartitionFetchState]):
// 封装FetchRequest.Builder对象
ResultWithPartitions[Option[ReplicaFetch]]

本质为指定分区构建对应FetchRequest.Builder对象,而该对象是构建FetchRequest的核心组件。Kafka中任何类型的消息读取,都是通过给指定Broker发送FetchRequest请求来完成的。

doWork

串联前面3个方法的主要入口方法。

总结

本文研究Kafka的副本同步机制和副本管理器组件。Kafka副本间的消息同步依赖ReplicaFetcherThread线程。AbstractFetcherThread作为拉取线程的公共基类,AbstractFetcherThread类定义了很多重要方法。

  • AbstractFetcherThread类:拉取线程的抽象基类。它定义了公共方法处理所有拉取线程的共同逻辑,如执行截断操作,获取消息。

  • 拉取线程逻辑:循环执行截断操作和获取数据操作。

  • 分区读取状态:当前,源码定义了3类分区读取状态。拉取线程只能拉取处于可读取状态的分区的数据

美团二面:详细说说Kafka拉消息的过程?相关推荐

  1. 大数据之超级详细的KafKa集群搭建过程

    大数据 大数据之超级详细的KafKa集群搭建过程 文章目录 大数据 大数据之超级详细的KafKa集群搭建过程 前言 一.Kafka集群配置方法 1.1 将Kafka的安装包上传到虚拟机,并解压(三个虚 ...

  2. 【kafka连载二】windows测试kafka的消息收发

    前提 系统已启动zookeeper和kafka服务,若没有启动请执行以下操作 启动zookeeper CMD窗口进入zookeeper的bin子目录执行.\zkServer.cmd 启动kafka C ...

  3. 【kafka】消息队列设计精要

    1.概述 转载:消息队列设计精要 好文章,建议大家去看原文. 消息队列已经逐渐成为企业IT系统内部通信的核心手段.它具有低耦合.可靠投递.广播.流量控制.最终一致性等一系列功能,成为异步RPC的主要手 ...

  4. 骑士卡:基于Kafka搭建消息中心,上亿消息推送轻松完成

    全球购骑士卡是国内领先的会员制特权电商平台,汇聚国内外"吃喝玩乐买"超 300 项会员专属优惠特权.全球购骑士卡基于移动互联生活方式,打通线上.线下消费场景,汇集时下热门.高频的商 ...

  5. 2022年春招美团二面总结 凉经

    注:部分图片来自网络,如侵必删! 美团二面总结 自我介绍:在这次自我介绍中,不仅是心态上,还是语言表达上,都比一面时要进步很多. 你给我介绍一下你的项目吧 这次面试过程中,面试官倒是直接提示:结合你的 ...

  6. Kafka的消息格式

    Commit Log Kafka储存消息的文件被它叫做log,按照Kafka文档的说法是: Each partition is an ordered, immutable sequence of me ...

  7. 大数据笔记(三十二)——SparkStreaming集成Kafka与Flume

    三.集成:数据源 1.Apache Kafka:一种高吞吐量的分布式发布订阅消息系统 (1) (*)消息的类型 Topic:主题(相当于:广播) Queue:队列(相当于:点对点) (*)常见的消息系 ...

  8. Kafka生成消息时的3种分区策略

    本文分享自华为云社区<Kafka生产者3种分区分配策略>,作者:石臻臻的杂货铺. Kafka Producer在发送消息的时候,需要指定发送到哪个分区, 那么这个分区策略都有哪些呢?我们今 ...

  9. Kafka 分布式消息队列介绍

    Kafka 分布式消息队列 类似产品有JBoss.MQ 一.由Linkedln 开源,使用scala开发,有如下几个特点: (1)高吞吐 (2)分布式 (3)支持多语言客户端 (C++.Java) 二 ...

最新文章

  1. linux “大脏牛”漏洞分析(CVE-2017-1000405)
  2. 收发电子邮件属于计算机在方面的应用,计算机应用基础复习题(供参考).doc
  3. Nginx —— ngx_http_core_module 模块提供的变量
  4. [译] 理解数组在 PHP 内部的实现(给PHP开发者的PHP源码-第四部分)
  5. boost::mp11::mp_insert_c相关用法的测试程序
  6. 【转】Ubuntu 16.04 远程桌面
  7. 吊炸天的 Kafka 图形化工具 Eagle,必须推荐给你!
  8. ECShop 自定义函数以及调用
  9. linux下定时执行sh
  10. 创业指南:如何快速拿到天使投资?
  11. 六爪机器人_六爪机器人
  12. 电子邮件的地址格式是怎样的?请说明各部分的意思。
  13. java获取本地磁盘文件_java如何读取本地磁盘目录下的所有文件或者文件夹
  14. nand flash基础——读写擦操作
  15. linux qemu-nbd介绍
  16. WinForm的控件二次开发
  17. 雨天-一万个理由-LRC歌词下载
  18. Pytorch生成对抗网络(GAN)官方入门教程
  19. 纯js实现在线文字识别,从图片中提取文本信息
  20. 经典量化选股方法——没有秘密的多因子

热门文章

  1. 浏览器缓存——HTTP缓存
  2. mysql里字典是什么意思_解析MySQL数据字典中的一些疑问
  3. 中位数(又称中值,英语:Median)
  4. 深入浅出Java中参数传递的原理
  5. OSChina 周四乱弹 —— 十大炒股禁忌
  6. jupyterlab教程
  7. 利用附加调频测向-多普勒法
  8. 计算机进制、原码、反码、补码、移码相关知识
  9. 南京市秦淮区税务局走访云创,并在央视频进行宣传报道
  10. 是否可以手动调用析构函数