作者:未完成交响曲,资深Java工程师!目前在某一线互联网公司任职,架构师社区合伙人!

一、基本概念

1.体系架构

  • Producer:生产者

  • Consumber:消费者

  • Broker:服务代理节点(kafka实例)

2.消息存储

  • 主题(Topic):kafka消息以topic为单位进行归类,逻辑概念

  • 分区(Partition):

    • Topic-Partition为一对多

    • 分区在存储层面可看做是一个可追加的日志文件

    • 消息在追加到分区时会分配一个特定的偏移量(offset)作为在此分区的唯一标示

    • kafka通过offset保证消息在分区内的顺序性,但只保证分区有序而不保证主题有序

    • 每条消息发送到broker前,会根据分区规则分配到具体的哪个分区

3.容灾设计

  • 多副本机制(Replica):

    • 一个分区会在多个副本中保存相同的消息

    • 副本之间是一主多从关系

    • leader副本负责读写操作,follower副本只负责同步消息(主动拉取)

    • leader副本故障时,从follower副本重新选举新leader

同步状态

  • 分区中所有副本统称为 AR(Assigned Replicas)

  • 所有与leader副本保持一定程度同步的副本(包括leader)组成 ISR(In-Sync Replicas)

  • 同步之后过多的副本组成 OSR(Out-of-Sync Replicas)

特殊偏移量

  • LEO(Log End Offset):标识当前分区下一条代写入消息的offset

  • HW(High Watermark):高水位,标识了一个特定的offset,消费者只能拉渠道这个offset之前的消息(不含HW)

  • 所有副本都同步了的消息才能被消费,HW的位置取决于所有follower中同步最慢的分区的offset

二、生产者

1.客户端开发

  • 消息发送步骤

    • 配置生产者客户端参数及创建相应的生产者实例

    • Properties

    • KafkaProducer

    • 构建待发送的消息:ProducerRecord

    • 发送消息:send( ),flush( )

    • 关闭生产者实例:close( )

  • 必要参数配置:

    • bootstrap.servers:设置kafka集群地址,并非需要所有broker地址,因为生产者会从给定的broker中获取其他broker信息

    • key.serializer、value.serializer:转换字节数组到所需对象的序列化器,填写全限类名

  • 发送模式

    • 发后即忘(fire-and-forget):只管往kafka发送而不关心消息是否正确到达,不对发送结果进行判断处理;

    • 同步(sync):KafkaProducer.send()返回的是一个Future对象,使用Future.get()来阻塞获取任务发送的结果,来对发送结果进行相应的处理;

    • 异步(async):向send()返回的Future对象注册一个Callback回调函数,来实现异步的发送确认逻辑。

  • 拦截器

    • 实现ProducerInterceptor接口,在消息发送的不同阶段调用

    • configure():完成生产者配置时

    • onSend():调用send()后,消息序列化和计算分区之前

    • onAcknowledgement():消息被应答之前或消息发送失败时

    • close():生产者关闭时

    • 通过 interceptor.classes 配置指定

  • 序列化

    • 自定义序列化器:实现Serializer接口

  • 分区器

    • 在消息发送到kafka前,需要先计算出分区号,默认使用DefaultPartitioner(采用MurmurHash2算法)

    • 自定义分区器:实现Partitioner接口

    • 通过partitioner.class配置指定

2.原理分析

整体架构

  • 主线程KafkaProducer创建消息,通过可能的拦截器、序列化器和分区器之后缓存到消息累加器(RecordAccumulatro)

  • 消息在RecordAccumulator被包装成ProducerBatch,以便Sender线程可以批量发送,缓存的消息发送过慢时,send()方法会被阻塞或抛异常

  • 缓存的大小通过buffer.memory配置,阻塞时间通过max.block.ms配置

  • Kafka生产者客户端中,通过ByteBuffer实现消息内存的创建和释放,而RecordAccumulator内部有一个BufferPool用来实现ByteBuffer的复用

  • Sender从RecordAccumulator中获取缓存的消息后,将ProducerBatch按Node分组,Node代表broker节点。也就是说sender只向具体broker节点发送消息,而不关注属于哪个分区,这里是应用逻辑层面到网络层面的转换

  • Sender发往Kafka前,还会保存到InFlightRequests中,其主要作用是缓存已经发出去但还没收到相应的请求,也是以Node分组。

  • 每个连接最大缓存未响应的请求数通过max.in.flight.requests.per.connection配置(默认5)

元数据的更新

  • InFlightRequests可以获得leastLoadedNode,即所有Node中负载最小的。leastLoadedNode一般用于元数据请求、消费者组播协议等交互。

  • 当客户端中没有需要使用的元数据信息或唱过metadata.max.age.ms没有更新元数据时,就会引起元数据更新操作。

3.重要的生产者参数

  • acks:用来指定分区中有多少个副本收到这条消息,生产者才认为写入成功(默认”1")

  • “1":leader写入即成功、“0”:不需要等待服务端相应、”-1”/“all":ISR所有副本都写入才收到响应

  • max.request.size:限制生产者客户端能发送的消息的最大值(默认1048576,即1m)

  • retries、retry.backoff.ms:生产者重试次数(默认0)和两次重试之间的间隔(默认100)

  • compression.type:消息压缩方式,可配置为”gzip”、”snappy”、”lz4”(默认”none”)

  • connections.max.idle.ms:多久后关闭闲置的连接(默认540000,9分钟)

  • linger.ms:生产者发送ProducerBatch等待更多消息加入的时间(默认为0)

  • receive.buffer.bytes:Socket接收消息缓冲区的大小(默认32768,32k)

  • send.buffer.bytes:Socket发送消息缓冲区的大小(默认131072,128k)

  • request.timeout.ms:Producer等待请求响应的最长时间(默认30000ms),这个值需要比broker参数replica.lag.time.max.ms大

三、消费者

1.消费者与消费组

  • 每个分区只能被一个消费组的一个消费者消费

  • 消费者数大于分区数时,会有消费者分配不到分区而无法消费任何消息

  • 消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个钱程,也可以是一个进程。

2.客户端开发

  • 消费步骤

    • 配置消费者客户端参数及创建KafkaConsumer实例

    • 订阅主题

    • 拉取消息并消费

    • 提交消费位移

    • 关闭实例

  • 必要的参数配置

    • bootstrap.servers:集群broker地址清单

    • group.id:消费组名称

    • key.deserializer、value.deserializer`:反序列化器

  • 订阅主题和分区

    • subscribe():订阅主题

    • assign():订阅指定主题分区

    • 通过partitionFor()方法先获取分区列表

    • unsubscribe():取消订阅

  • 消息消费

    • poll():返回的是所订阅的主题(分区)上的一组消息,可设定timeout参数来控制阻塞时间

  • 位移提交

    • 提交的offset为 lastConsumedOffset + 1

    • lastConsumedOffset:上一次poll拉取到的最后一条消息的offset

  • 控制或关闭消费

    • pause()、resume():暂停和恢复某分区的消费

  • 指定位移消费

    • seek():指定offset消费

    • beginingOffsets(),endOffsetes(),offstesForTimes():获取开头、末尾或指定时间的offset

    • seekToBeginning、seekToEnd():从开头、末尾开始消费

  • 再均衡

    • 在subcribe()时,可以注册一个实现ConsumerRebalanceListener接口的监听器

    • onPartionsRevoked():消费者停止读取消息之后,再均衡开始之前

    • onPartitionsAssigned():重新分配分区后,开始读取消费前

  • 拦截器

    • 实现ConsumerInterceptor接口

    • poll()返回之前,会调用onConsume()方法,提交完offset后会调用onCommit()方法

  • 多线程实现

    • KafkaProducer是线程安全的,但KafkaConsumer是非线程安全的,acquire()方法可检测当前是否只有一个线程在操作,否则抛出异常

    • 推荐使用单线程消费,而消息处理用多线程

3.重要的消费者参数

  • fetch.min.bytes:一次请求能拉取的最小数据量(默认1b)

  • fetch.max.bytes:一次请求能拉取的最大数据量(默认52428800b,50m)

  • fetch.max.wait.ms:与min.bytes有关,指定kafka拉取时的等待时间(默认500ms)

  • max.partition.fetch.bytes:从每个分区里返回Consumer的最大数据量(默认1048576b,1m)

  • max.poll.records:一次请求拉取的最大消息数(默认500)

  • connections.max.idle.ms:多久后关闭闲置连接,默认(540000,9分钟)

  • receive.buffer.bytes:Socket接收消息缓冲区的大小(默认65536,64k)

  • send.buffer.bytes:Socket发送消息缓冲区的大小(默认131072,128k)

  • request.timeout.ms:Consumer等待请求响应的最长时间(默认30000ms)

  • metadata.max.age.ms:元数据过期时间(默认30000,5分钟)

  • reconnect.backoff.ms:尝试重新连接指定主机前的等待时间(默认50ms)

  • retry.backoff.ms:尝试重新发送失败请求到指定主题分区的等待时间(默认100ms)

  • isolation.level:消费者的事务隔离级别(具体查看进阶篇:事务)

四、主题与分区

1.主题的管理

  • 创建

    • broker设置auto.create.topics.enable=true时,生产者发送消息时会自动创建分区数为num.partitions(默认1),副本因子为default.replication.factor(默认1)的主题

    • 通过kafka-topics.sh创建:create指令

      • kafka-topics.sh --zookeeper <zkpath> --create --topic <topic> --partitions <N> --replication-factor <N>

      • 手动分配副本:--replica-assignment

        • --replica-assignment 2:0:1,1:2:0,0:1:2

        • partion1 AR:2,0,1

        • partion2 AR:1:2:0

        • partion3 AR:0:1:2

    • 设定参数:--config <key=value>

  • 分区副本的分配

    • 使用kafka-topics.sh创建主题内部分配逻辑按机架信息划分两种策略:

      • 未指定机架信息分配策略:assignReplicasToBrokersRackUnaware()方法

      • 指定机架分配策略:assignReplicasToBrokersRackAware()方法

    • 当创建一个主题时,不管用什么方式,实质上是在zk的/broker/topics节点下创建与该主题对应的子节点并写入分区副本分配方案,并且在/config/topics节点下创建与该主题相关的子节点并写入主题配置信息

  • 查看:kafka-topics.sh脚本的 list、describe指令

  • 修改:kafka-topics.sh脚本的 alter指令

  • 配置管理:kafka-configs.sh脚本

  • 删除:kafka-topics.sh脚本的 delete指令

2.初识KafkaAdminClient

  • KafkaAdminClient可实现以调用API的方式对Kafka进行管理

  • 主题合法性

    • 通过KafkaAdminClient创建主题可能不符合规范,可以在broker端设置create.topic.policy.class.name来指定一个类验证主题创建时的合法性,这个类需要实现ClreateTopicPolicy接口,放入Kafka源码,并重新编译

3.分区的管理

  • 优先副本(preferred replica/preferred leader)

    • 优先副本即 AR 集合中的第一个副本,当分区leader出现故障时,会直接使用优先副本作为新的leader

    • kafka-perferred-replica-election.sh可进行优先副本选举操作

  • 分区重分配

    • 解决问题:

    • 将某节点上的分区副本迁移至其他节点:宕机迁移失效副本、有计划下线节点迁移副本

    • 注意,下线前最好先关闭或重启此broker,保证不是leader节点,减少了节点间流量复制

    • 向新增节点分配原有主题分区副本

    • 集群中新增节点时,只有新创建的主题分区才有可能分配到新节点上,需要把老主体的分区分配到新节点上

    • 可使用kafka-reassign-partitions.sh脚本

  • 复制限流

    • 数据复制会占用额外的资源,如果重分配的量太大必然会严重影响整体的性能。可以通过对副本间的复制流量加以限制来保证重分配期间整体服务不会受太大的影响,可分别限制follower副本复制速度和leader副本传输速度

    • 通过kafka-config.sh或 kafka-reassign-partitions.sh配置

    • broker级别:follower/leader.replication.throttled.rate=N

    • topic级别:follower/leader.replication.throttled.replicas=N

    • 分区重分配过程中的临时限流策略

    • 原AR会应用leader限流配置

    • 分区移动的目的地会应用follower限流配置

    • 重分配所需的数据复制完成后,临时限流策略会被移除

  • 修改副本因子

    • 通过kafka-reassign-partitions.sh配置

    • 如何选择合适的分区数

  • 性能测试工具

    • 生产者性能测试:kafka-producer-perf-test.sh脚本

    • 消费者性能测试:kafka-consumer-perf-test.sh脚本

  • 分区数和吞吐量的关系

    • 在一定限度内,吞吐量随分区数增加而上升,但由于磁盘、文件系统、I/O调度策略等影响,到一定程度时吞吐量会存在瓶颈或有所下降

  • 考量因素

    • 如果分区数过多,当集群中某个broker宕机,就会有大量分区需要进行leader角色切换,这个过程会耗费一定的时间,并且在此期间这些分区不可用。分区数越多,kafka的正常启动和关闭耗时也会越长,同时也会增加日志清理的耗时

    • 建议将分区数设定为broker的倍数

参考资料:《深入理解Kafka:核心设计与实践原理总结》-朱忠华

长按订阅更多精彩▼

如有收获,点个在看,诚挚感谢

Kafka核心设计与实践原理总结:基础篇相关推荐

  1. 《深入理解Kafka:核心设计与实践原理》笔误及改进记录

    2019年2月下旬笔者的有一本新书--<深入理解Kafka:核心设计与实践原理>上架,延续上一本<RabbitMQ实战指南>的惯例,本篇博文用来记录现在发现的一些笔误,一是给购 ...

  2. 深入理解Kafka核心设计与实践原理_01

    深入理解Kafka核心设计与实践原理_01 01_初识Kafka 1.1 基本概念 1.2 安装与配置 1.3 生产与消费 1.4 服务端参数配置 01_初识Kafka 1.1 基本概念 一个典型的 ...

  3. Kafka核心设计与实践原理总结:进阶篇

    作者:未完成交响曲,资深Java工程师!目前在某一线互联网公司任职,架构师社区合伙人! kafka作为当前热门的分布式消息队列,具有高性能.持久化.多副本备份.横向扩展能力.我学习了<深入理解K ...

  4. 新书《深入理解Kafka:核心设计与实践原理》上架,感谢支持~

    新书上架 初识 Kafka 时,笔者接触的还是 0.8.1 版本,Kafka 发展到目前的 2.x 版本,笔者也见证了Kafka的蜕变,比如旧版客户端的淘汰.新版客户端的设计.Kafka 控制器的迭代 ...

  5. 项目版本管理的最佳实践:gitflow基础篇

    对于项目版本管理,你是否存在这样的痛点:项目分支多而杂不好管理,git log界面commit信息错乱复杂无规范,版本回退不知道选择什么版本合适--. 项目版本管理的最佳实践系列,笔者将以两篇文章的形 ...

  6. 深入理解 Kafka :核心设计与实践 读书笔记

    第1章 初识 Kafka Kafka 架构有什么组件? 一个典型的 Kafka 体系架构包括若干 Producer.若干 Broker .若干 Consumer,以及一个 ZooKeeper 集群. ...

  7. 调试器工作原理之一——基础篇

    转自 http://blog.csdn.net/gqb_driver/article/details/13988001 英文原文:Eli Bendersky  翻译:伯乐在线- 陈舸 本文是一系列探究 ...

  8. python arduino电子书_Arduino电子设计实战指南:零基础篇 PDF 高清版

    给大家带来的一篇关于Arduino相关的电子书资源,介绍了关于Arduino.电子设计.零基础方面的内容,本书是由机械工业出版社出版,格式为PDF,资源大小29.3 MB,程晨编写,目前豆瓣.亚马逊. ...

  9. 一看就会,效率翻倍!在线设计必会技能(基础篇)

    [文末有福利哦] 摹客在线设计作为2020年国内设计工具新秀,一经推出就斩获了一大批产品经理和设计师的青睐. 除了功能强大.能力全面的特点外,摹客团队在工具的易用性方s面也做了非常多的努力. 其中不少 ...

最新文章

  1. linux 安装jdk yum安装 源码包安装
  2. Lync 2013部署(1)—AD准备
  3. windows下安装python+eclipse
  4. 优化Image设置Tiled类型产生过多的三角形顶点
  5. 如何实现 C/C++ 与 Python 的通信?
  6. android 读后台数据,Android 模拟后台数据返回 ,便与测试
  7. mui + php,GitHub - alphaphp/mui-kidApp: 基于 MUI 构建一个具有 90 +页面的APP应用
  8. php unid,微信扫码登陆/微信公交号 登录PHP 自适应 UnionID统一用户
  9. bing翻译api php,Auto Tag Slug使用 google翻译并删除bing api key提示
  10. PHP微信多级三级分佣系统,微信三级分销系统微信三级分销系统
  11. linux内核移植imx8,iMX8模块Ubuntu移植
  12. 【数据挖掘算法】(一)MSET 算法
  13. Python tkinter库窗口化爬虫
  14. android 系统 迷你pc,Remix mini:一款真正运行安卓系统的电脑
  15. 传统企业怎么通过抖音引流到微信
  16. 电脑端(PC)按键精灵2023——入门小白 详细 教程
  17. 1053 住房空置率 (20分)
  18. Python读取和操作Excel(.xlsx)文件
  19. 私有vlan(PVLAN)实验配置步骤
  20. JVM垃圾回收——ZGC垃圾收集器

热门文章

  1. 关于学习Python的一点学习总结(19->if及相关的符号运算)
  2. 关于学习Python的一点学习总结(4->成员资格->list->列表操作)
  3. codeforces 337D:树形dp[能到最远的一定可以到其他点]
  4. 【网络流】解题报告:luogu P2740 [USACO4.2]草地排水Drainage Ditches(Edmonds-Karp增广路,最大流模板)
  5. 计算机硬件系统公开课课件,计算机硬件系统的组成(公开课).ppt.ppt
  6. 3a三次方h c语言表达式,希尔伯特曲线——第八届蓝桥杯C语言B组(国赛)第三题...
  7. 软件测试在哪个城市好找工作,职业测试:你适合在哪个城市工作?
  8. linux sed 小数点,每天进步一点点——linux——sed
  9. python json dump输出中文_Python读写文件(json.dump())中文被转成Unicode问题
  10. Centos 7.5 安装Zabbix4.0