来自:z小赵

前言

经过上篇文章的简单实战之后,今天来聊聊生产者将消息从客户端发送到 Broker 上背后发生了哪些故事,看不看由你,但是我保证可以本篇文章你一定可以学到应用背后的一些实质东西。

本文我们从以下 4 个方面来探讨下一条消息如何被准确的发送到 Broker 的 partition 上。

1. 客户端组件

2. 客户端缓存存储模型

3. 确定消息的 partition 位置

4. 发送线程的工作原理


客户端组件

  • KafkaProducer:

KafkaProducer 是一个生产者客户端的进程,通过该对象启动生产者来发送消息。

  • RecordAccumulator:

RecordAccumulator 是一个记录收集器,用于收集客户端发送的消息,并将收集到的消息暂存到客户端缓存中。

  • Sender:

Sender 是一个发送线程,负责读取记录收集器中缓存的批量消息,经过一些中间转换操作,将要发送的数据准备好,然后交由 Selector 进行网络传输。

  • Selector:

Selector 是一个选择器,用于处理网络连接和读写处理,使用网络连接处理客户端上的网络请求。

通过使用以上四大组件即可完成客户端消息的发送工作。消息在网络中传输的方式只能通过二级制的方式,所以首先需要将消息序列化为二进制形式缓存在客户端,kafka 使用了双端队列的方式将消息缓存起来,然后使用发送线程(Sender)读取队列中的消息交给 Selector 进行网络传输发送给服务端(Broker)

主流程

以上为发送消息的主流程,附上部分源码供大家参考,接下来分析下几个非常重要流程的具体实现原理。


客户端缓存存储模型

客户端缓存模型

从上图可以看出,一条消息首先需要确定要被存储到那个 partition 对应的双端队列上;其次,存储消息的双端队列是以批的维度存储的,即 N 条消息组成一批,一批消息最多存储 N 条,超过后则新建一个组来存储新消息;其次,新来的消息总是从左侧写入,即越靠左侧的消息产生的时间越晚;最后,只有当一批消息凑够 N 条后才会发送给 Broker,否则不会发送到 Broker 上。

了解了客户端存储模型后,来探讨下确定消息的 partition(分区)位置?


确定消息的 partition 位置

消息可分为两种,一种是指定了 key 的消息,一种是没有指定 key 的消息。

对于指定了 key 的消息,partition 位置的计算方式为:Utils.murmur2(key) % numPartitions,即先对 key 进行哈希计算,然后在于 partition 个数求余,从而得到该条消息应该被存储在哪个 partition 上。

对于没有指定 key 的消息,partition 位置的计算方式为:采用 round-robin 方式确定 partition 位置,即采用轮询的方式,平均的将消息分布到不同的 partition 上,从而避免某些 partition 数据量过大影响 Broker 和消费端性能。

注意

由于 partition 有主副的区分,此处参与计算的 partition 数量是当前有主 partition 的数量,即如果某个 partition 无主的时候,则此 partition 是不能够进行数据写入的。

稍微解释一下,主副 partition 的机制是为了提高 kafka 系统的容错性的,即当某个 Broker 意外宕机时,在此 Broker 上的主 partition 状态为不可读写时(只有主 partition 可对外提供读写服务,副 partition 只有数据备份的功能),kafka 会从主 partition 对应的 N 个副 partition 中挑选一个,并将其状态改为主 partition,从而继续对外提供读写操作。

消息被确定分配到某个 partition 对应记录收集器(即双端队列)后,接下来,发送线程(Sender)从记录收集器中收集满足条件的批数据发送给 Broker,那么发送线程是如何收集满足条件的批数据的?批数据是按照 partition 维度发送的还是按照 Broker 维度发送数据的?


发送线程的工作原理

Sender 线程的主要工作是收集满足条件的批数据,何为满足条件的批数据?缓存数据是以批维度存储的,当一批数据量达到指定的 N 条时,就满足发送给 Broker 的条件了。

partition 维度和 Broker 维度发送消息模型对比。

模型对比图

从图中可以看出,左侧按照 partition 维度发送消息,每个 partition 都需要和 Broker 建连,总共发生了四次网络连接。而右侧将分布在同一个 Broker 的 partition 按组聚合后在与 Broker 建连,只需要两次网络连接即可。所以 Kafka 选择右侧的方式。

Sender 的主要工作

第一步:扫描记录收集器中满足条件的批数据,然后将 partition -> 批数据映射转换成 BrokerId -> N 批数据的映射。第二步:Sender 线程会为每个 BrokerId 创建一个客户端请求,然后将请求交给 NetWorkClient,由 NetWrokClient 去真正发送网络请求到 Broker。

NetWorkClient 的工作内容

Sender 线程准备好要发送的数据后,交由 NetWorkClient 来进行网络相关操作。主要包括客户端与服务端的建连、发送客户端请求、接受服务端响应。完成如上一系列的工作主要由如下方法完成。

  1. reday()方法。从记录收集器获取准备完毕的节点,并连接所有准备好的节点。

  2. send()方法。为每个节点创建一个客户端请求,然后将请求暂时存到节点对应的 Channel(通道)中。

  3. poll()方法。该方法会真正轮询网络请求,发送请求给服务端节点和接受服务端的响应。


总结

以上,即为生产者客户端的一条消息从生产到发送到 Broker 上的全过程。现在是不是就很清晰了呢?也许有些朋友会比较疑惑它的网络请求模型是什么样的,作者就猜你会你会问,下一篇我们就来扒开它的神秘面纱看看其究竟是怎么实现的,敬请期待。

特别推荐一个分享架构+算法的优质内容,还没关注的小伙伴,可以长按关注一下:长按订阅更多精彩▼如有收获,点个在看,诚挚感谢

Kafka一条消息如何被存储到Broker上?相关推荐

  1. Redis、Kafka 和 Pulsar 消息队列对比

    点击关注公众号,Java干货及时送达 导语 | 市面上有非常多的消息中间件,rabbitMQ.kafka.rocketMQ.pulsar. redis等等,多得令人眼花缭乱.它们到底有什么异同,你应该 ...

  2. Redis、Kafka 和 Pulsar 消息队列对比,写得太好了!

    市面上有非常多的消息中间件,rabbitMQ.kafka.rocketMQ.pulsar. redis等等,多得令人眼花缭乱.它们到底有什么异同,你应该选哪个? 本文尝试通过技术演进的方式,以redi ...

  3. 面试官:请你从架构演进的角度讲讲redis、kafka和 pulsar消息队列

    导语 | 市面上有非常多的消息中间件,rabbitMQ.kafka.rocketMQ.pulsar. redis等等,多得令人眼花缭乱.它们到底有什么异同,你应该选哪个?本文尝试通过技术演进的方式,以 ...

  4. kafka查看broker上主副本_Kafka基础(一):基本概念及生产者、消费者示例

    本文章大部分内容均摘自 朱忠华老师的<深入理解Kafka:核心设计与实践原理>,也特别推荐广大读者购买阅读. 一.概述 1. 简介 Kafka 起初是由 LinkedIn 公司采用 Sca ...

  5. kafka 如何做到1秒发布百万级条消息?

    kafka 如何做到1秒发布百万级条消息 kafak 提供的生产端的API发布消息到一个 topic 或者多个 topic 的一个分区(保证消息的顺序性)或多个分区(并行处理,不能保证消息的顺序性). ...

  6. kafka是如何通过offset定位一条消息的?

    Kafka文件结构 Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic 的.topic 是逻辑上的概念,而 partition 是物理上的概念,每个 ...

  7. Kafka是如何处理Netflix每天2万亿条消息的?

    有些人喜欢使用 HTTP REST APIs,但是他们可能会碰到自身的队列问题;有些人则倾向使用诸如 RabbitMQ 之类旧的消息队列,然而他们不得不考虑扩容和运营等相关问题. 因此以 Kafka ...

  8. Kafka:分布式消息队列的抽象模型

    最基础的分布式队列编程抽象模型是点对点模型,其他抽象构架模型居于改基本模型上各角色的数量和交互变化所导致的不同拓扑图.具体而言,不同数量的发送者.分布式队列以及接收者组合形成了不同的分布式队列编程模型 ...

  9. Go语言如何操纵Kafka才能保证消息不丢?

    背景 目前一些互联网公司会使用消息队列来做核心业务,因为是核心业务,所以对数据的最后一致性比较敏感,如果中间出现数据丢失,就会引来用户的投诉,年底绩效就变成325了.之前和几个朋友聊天,他们的公司都在 ...

最新文章

  1. poj2367拓扑排序模版题
  2. shop++商品搜索出现乱码的解决方法
  3. 3.4 matlab用for语句实现循环结构
  4. shopify在哪里填写html,[Shopify开店教程]添加嵌入代码
  5. java 难题_您可以避免的6种组织成长难题
  6. 华为鸿蒙将比安卓快,任正非透露华为鸿蒙系统:将比安卓速度快60%
  7. linux命令:FTP服务
  8. 泰斯花粉阻隔剂 怎么使用
  9. python内置函数分类_注意 Python 内置函数并不是万能的!
  10. 重写FileUpload控件让它可以显示上传后的文件名
  11. was expecting double-quote to start field name错误
  12. 【thinkphp5.1】htmlentities() expects parameter 1 to be string, array given
  13. 华为服务器维护客服,服务器客服
  14. 提升方法(boosting)详解
  15. Appy Couple:婚礼策划应用 用户个性化定制
  16. 图片验证码识别-自动登录工具开发
  17. echarts树图修改连线样式颜色,树的形状曲线和折线,树图边的曲度
  18. Round 2 A - Cthulhu CodeForces - 103B -图同构,DFS
  19. 用Excel制作不一样的百分比信息图表(2)
  20. ardupilot FFT分析与使用

热门文章

  1. c语言gga字符串校验和代码,NMEA-0183协议解析(示例代码)
  2. python的HTML文件中使用和加载CSS文件
  3. 借助tkinter设计人脸检测的界面(摄像头检测,视频检测,视频检测并保存)
  4. python电影数据分析的代码_python-small-examples
  5. nginx文件服务器html美化,关于nginx:Nginx浏览目录配置及美化
  6. CF660C Hard Process(尺取法)
  7. php正则表达式2,php正则表达式(2)
  8. 征途linux编译错误,征途误事-山外メ雲ジ-ChinaUnix博客
  9. oracle时间相减得到天数_【数列】从错位相减到阿贝尔变换
  10. n维椭球体积公式_混凝土工程量计算规则及公式