Kafka Broker

工作流程

Zookeeper存储的Kafka信息

Kafka Broker总体工作流程

Broker参数

参数名称 描述
replica.lag.time.max.ms ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则将该Follower踢出ISR中,该时间阈值,默认30s
auto.leader.rebalance.enable 默认true,自动Leader Partition平衡
leader.imbalance.per.broker.percentage 默认10%,每个broker允许的不平衡的leader比率。如果每个broker超过了这个值,控制器会触发leader平衡
leader.imbalance.check.interval.seconds 默认300s,检查Leader负载是否平衡的间隔时间
log.segment.bytes kafka中的日志是分成一块块存储的,此配置是指log日志划分成块大小,默认值1g
log,index.interval,bytes 默认4kb,kafka每当写入4kb大小的日志,然后就往index文件里面记录一个索引
log.retention.hours kafka中数据保存的时间,默认7天
log. retention.minutes kafka中数据保存的时间,分钟级别,默认关闭
log. retention.ms kafka中数据保存的时间,毫秒级别,默认关闭
log.retention.check.interval.ms 检查数据是否超时的间隔,默认是5分钟
log.cleanup.policy 默认是delete,表示数据启用删除策略;如果设置为compact,表示所有的数据启用压缩策略

节点服役和退役

节点服役

新节点添加
1,启动新节点

./kafka-server-start.sh -daemon  ../config/server.properties

2,执行负载均衡

  • 创建一个要均衡的主题 vim topics-to-move.json
{"topics":[{"topic":first}],"version": 1
}
  • 生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh --bootstrap-server ah101:9092 --topics-to-move-json-file   topics-to-move.json --broker-list "1001,1002,1003" --generate
  • 创建副本负载计划 vim increase-replication-factor.json
{"partitions": [{"topic": "first","partition": 0,"replicas": [1001,1002,1003]},{"topic": "first","partition": 1,"replicas": [1002,1001,1003]},{"topic": "first","partition": 2,"replicas": [1001,1002,1003]},{"topic": "first","partition": 3,"replicas": [1003,1002,1001]},{"topic": "first","partition": 4,"replicas": [1001,1002,1003]},{"topic": "first","partition": 5,"replicas": [1003,1001,1002]},{"topic": "first","partition": 6,"replicas": [1002,1001,1003]},{"topic": "first","partition": 7,"replicas": [1001,1003,1002]}],"version": 1
}
  • 执行副本存储计划
 ./kafka-reassign-partitions.sh --bootstrap-server ah101:9092  --reassignment-json-file increase-replication-factor.json --execute

  • 验证副本存储计划
./kafka-reassign-partitions.sh --bootstrap-server ah101:9092   --reassignment-json-file increase-replication-factor.json --verify

节点退出
  • 创建一个要均衡的主题 vim topics-to-move.json
{"topics":[{"topic":first}],"version": 1
}
  • 生成一个负载均衡的计划
bin/kafka-reassign-partitions.sh --bootstrap-server ah101:9092 --topics-to-move-json-file   topics-to-move.json --broker-list "1001,1002" --generate
  • 创建副本负载计划 vim remove-replication-factor.json
  • 执行副本存储计划
 ./kafka-reassign-partitions.sh --bootstrap-server ah101:9092  --reassignment-json-file remove-replication-factor.json --execute
  • 验证副本存储计划
./kafka-reassign-partitions.sh --bootstrap-server ah101:9092   --reassignment-json-file remove-replication-factor.json  --verify

Kafka副本

副本的基本信息

  1. 作用 :提高数据的可靠性
  2. 默认副本1个,生产环境一般配置为2个,保证数据的可靠性,太多副本会增加磁盘存储空间,增加网络上的数据传输,降低效率
  3. 副本分为 Leader和Follower 。kafka生产者只会将数据发送给leader,然后follower会从leader进行同步数据
  4. kafka分区所有的副本称为AR

AR=ISR+OSR
ISR: 表示和Leader保持同步的Follower的集合。如果Follower长时间未向Leader发送通信请求或者同步数据,则该数据会被踢出ISR列表,该时间阈值由replica.lag.time.max,ms参数设定,默认30s。Leader发生故障的时候,就会从ISR中选举新的Leader。
OSR:表示Follower与Leader副本同步的时候,延迟过多的副本。

Leader选举流程

Kafka集群中有一个broker 的Controller会被选举为Controller Leader,负责管理集群broker的上下线,所有的topic的分区副本分配和Leader选举等工作。

Follower故障处理细节

LEO(log end offset):每个副本的最后offset,LEO其实就是最新的offset+1
HW(high watermark): 所有副本中最小的LEO。

1)Follower故障

  • Follower发生故障后会被踢出ISR
  • 这个期间 Leader和Follower继续接受数据
  • 待该Follower恢复后,Follower会读取本地磁盘记录的上次HW,并将log文件高于HW部分截取掉,从HW开始向Leader进行同步
  • 等该Follwer的LEO大于等于该partitionde HW,则将Follower重新加入ISR

2)Leader故障

  • Leader发生故障的时候,会从ISR中选出一个新的Leader
  • 为保证多个副本之间数据一致性,其余的Follower会先将各自的log文件高于HW的部分裁掉,然后从新的Leader同步
    注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复

分区副本分配
如果一个topic 创建16个分区 3个副本
那么AR规律

特点:
1)第一列为每个broker id 的排序 比如0,1,2,3,比如 1001 1002,1003
2)根据broker的数量作为一组进行副本分配
比如如果是4个broker组成的集群 那么leader 是0,1,2,3 ,AR 和ISR的列表的第一个也必定的是0,1,2,3
3) 一把第一个是间隔为0 第2个间隔为1 ,第三个间隔为2,这个间隔数和副本设置的数量有关系的
比如设置副本数为3 那么最后的间隔最大数为2 (3-1)

手动调整分区副本存储

生产环境中,每台服务器的配置和性能不一致,但是kafka只会根据自己的代码规则创建对应的副本,就会导致个别服务器存储压力比较大。所以需要手动调整分区副本的存储。

需求: 创建一个新的topic (testTopic) ,4个分区,2个副本,。将该topic的所有副本都存储到broker0和broker1

步骤
1)创建topic

bin/kafka-topics.sh --bootstrap-server  ah101:9092 --create --partitions 4 --replication-factor 2 -topic testTopic
  1. 创建副本存储计划 (所有的副本存储在broker0 ,broker1)vim assign.json
{
"version":1,
"partitions":[{"topic":"testTopic","partition":0,"replicas":[0,1]},
{"topic":"testTopic","partition":1,"replicas":[0,1]},
{"topic":"testTopic","partition":2,"replicas":[1,0]},
{"topic":"testTopic","partition":3,"replicas":[1,0]}]
}

3) 执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server ah101:9092 --reassignment-json-file assign.json --execute

4) 验证副本存储计划执行的情况

bin/kafka-reassign-partitions.sh --bootstrap-server ah101:9092 --reassignment-json-file assign.json --verify

5)查看副本存储情况

bin/kafka-topics.sh --bootstrap-server   ah101:9092 --describe --topic testTopic

Leader Partition 负载均衡

正常情况下,Kafka本身会自动把Leader Partition均匀的分散在各个机器上,来保证每台机器的读写和吞吐量都是均匀的。但是如果某些broker宕机,会导致leader partition过于集中在其他少部分的几台broker上,这会导致这几台的broker读写请求压力过高,其他宕机的broker重启只后都是follower,读写请求很低,造成集群负载不均衡

参数名称 描述
auto.leader.rebalance.enable 默认true,自动Leader partition平衡。生产环境中国,Leader重选举代价比较大,可能会带来性能影响,建议设置false关闭
leader.imbalance.per,broker,percentage 默认10%,每个broker允许的不平衡的leader的比率。如果每个broker超过这个值,控制器会触发重平衡
leader.imbalance.check.interval.time 默认值100,检查Leader负载是否平衡的间隔时间

文件存储

1)topic数据存储机制
topic是逻辑上的概念,而partition是物理上的概念。每个partition对应一个log文件,该log文件存储的就是Producer生产的数据。Producer生产的数据会被不断的追加到该log文件的末端。为防止log文件过大导致数据定位效率低下,kafka采取分片和索引机制,将每个partition分为多个segement。每个segment包括 “.index”,“.log”,".timeindex"等文件。该文件命名规则 topic名称+分区序号,列如first-0。

说明: index和log文件以当前segment第一条消息的offset命名

通过工具可以查看index和log文件信息

kafka-run-class.sh  kafka.tools.DumpLogSegments --file  ./000000000000000000000000.indexkafka-run-class.sh  kafka.tools.DumpLogSegments --file  ./000000000000000000000000.log

2)如何在log文件中定位到offset=100的record?

  • 根据目标offset定位segment文件
  • 找到小于等于目标offset的最大offset对应的索引项
  • 定位log文件
  • 向下遍历找到目标record记录

3) 文件清理策略
kafka的日志默认保存时间7天,可以通过调整参数修改保存时间。
log.retention.hours 最低优先级小时 ,默认7天
log.retention.check.interval.ms 负责设置检查周期,默认5分钟

4)日志一旦超过设置的时间,怎么处理?
kafka提供的日志清理策略有两种
delete和compact两种

  • log.clean.policy=delete 所有的数据启用删除策略
    (1)基于时间:默认打开,以segment中所有记录中最大的时间戳作为该文件的时间戳
    (2)基于大小:默认关闭。超过设置所有日志总大小,删除最早的segment。 log.retention.bytes 默认-1,表示无穷大
  • log.clean.policy=compact
    对于相同key 不同value值,只保留最后一个版本。
    压缩后offset数据不是连续的。
    这种策略只适合特殊场景,比如消息的key为用户id,value是用户资料,通过这中压缩策略,整个消息就保存所有用户的最新资料。

5)高效读写数据

  • kafka本身是分布式集群,采用分区技术,并行度高
  • 读数据采取稀疏索引,可以快速的定位要消费数据的位置
  • 顺序写磁盘(省去大量磁头寻址时间)
  • 页缓存+0拷贝技术
    PageCahe:kafka重度依赖底层操作系统提供的pagecahce功能。当上层有写操作的时候,操作系统只是将数据写入pagecache,当读操作发生的时候,先从pagecache中读取,如果找不到,在从磁盘中读取

Kafka 消费者

Kafka的消费方式

pull模式:
consumer采用从broker主动拉取数据

push模式:
broker推送数据到consumer

kafka采用的pull方式,因为由broker决定消息发送速率,很难适应所有的消费者的消费速率。
pull模式不足之处:如果kafka没有数据,消费者可能会陷入死循环中,一直返回空数据。

消费的工作流程


消费者组:由多个消费者组成。形成一个消费者组的条件,所有的消费者的groupid相同

  • 消费者组内每个消费者负责消费不同的分区数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响。所有的消费者都属于某个消费者组。即消费者组是逻辑上一个订阅者
  • 如果消费者组中添加更多消费者,超过主题分区的数量,则有一部分消费者会闲置。不会接受任何消息

消费者组的初始化流程:

coordinator:辅助实现消费者组初始化和分区分配。
coordinator节点选择=groupId的hashcode值%50(__consumer_offsets的分区数量)

  • 每个consumer都发送给JoinGroup
  • 选出一个consumer作为leader
  • 把消费的topic情况发送给leader消费者
  • leader会负责指定消费方案
  • 把消费方案发给coodinator
  • Coodinator把消费方案发给各个consumer,
  • 每个消费者都会和coordinator保持心跳(3s),一旦超时(session.time.out=45s),该消费者会被移除,并触发再平衡,或者消费者处理消息时间过长(max.poll.interval.ms=5分钟),也会触发再平衡

消费者重要参数

参数名称 描述
bootstrap.servers 向kafka集群建立初始连接用到的host/port列表
key.deserializer value.deserializer 指定接受消息的key和value的反序列化类型,一定要写全类名
group.id 标记消费者所属的消费者组
enable.auto.commit 默认为true,消费者会自动周期性的向服务器提交偏移量
auto.commit.interval.ms 如果设置了 enable.auto.commit=true,则该值定义了消费者偏移量向kafka提交的频率 默认5s
auto.offset.reset 当kafka中没有初始偏移量或者当前偏移量再服务器中不存在,该如何处理?earlist:自动重置偏移量到最早的偏移量;latest:默认,自动重置偏移量为最新的偏移量。none:如果消费者组原来偏移量不存在,则向消费者抛异常,anything:向消费者抛异常
offsets.topic.num.patitions _consumer_offsets的分区数,默认50
hearbeat.inteval.ms kafka消费者与coordinator之间的心跳时间,默认3s,该值必须夏鸥session.out.time
session.time.out kafka消费者与coordinator之间的心跳时间,默认45s
max.pooll.interval.ms 消费者处理消息的最大时长,默认5min
fetch.min.bytes 默认一字节,消费者获取服务端一批消息最小字节
featch.max,wait,ms 默认500ms,如果没有从服务器端获取到一批数据最小字节,时间到了,仍然返回数据
featch.max.bytes 默认 50m,消费者获取服务器端一批消息最大的字节数。
max.poll.records 一次poll拉取数据返回的消息的最大条数,默认500s

分区平衡以及再平衡

一个消费者组有多个消费者,一个topic有多个分区,那么问题是到底由哪个消费者来消费哪个分区的数据

kafka有四种主流分区分配策略: Range,RoundRobin,Sticky,CooperativeSticky
可以通过参数partition.assignment.strategy修改分区策略。默认策略是Range+CooperativeSticky

Range
Range是对每一个topic而言的

首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序排序。
假如有7个分区,3个消费者,排序后将会是0,1,2,3,4,5,6。消费者是c0,c1,c2。
通过partition/consumers来决定每个消费者消费几个分区,如果除不尽,那么前面的几个消费者将会多消费1个分区。

例如: 7/3=2余1 那么 c0将会多消费一个消费者


注意
如果只是针对一个topic而言,c0消费者多消费一个分区影响不大,但是如果有多个topic,那么针对每个topic消费者c0都会将多消费一个分区,topic越多,c0消费的分区就会越多,容易产生数据倾斜

RoundRobin
RoundRobin针对集群中所有的topic而言

RoundRobin轮询分区策略,是把所有的partition和所有的consumer都列出来,然后按照hashcode进行排序,最后通过轮询算法来分发patition给到各个消费者。

Sticky
粘性分区: 在执行一次新的分配前,考虑上次分配的结果,尽量减少分配调动,可以节省大量的开销。
kafka0.11.x引入这种分配策略。首先会尽量均衡的放置分区掉消费上面,在出现统一消费组内消费者出现问题的时候,会尽量保持原有的分配的分区不变化

Offset位移

offset的默认维护位置
从0.9开始 ,consumer默认将offset保存在kafka一个内置topic中,该topic为_consumer_offsets。
0.9以前,consumer默认将offset保存在zookeeper。

_consumer_offsets 主题采用key value的形式存储数据。key是groupid+topic+分区号,value是当前offset的值。每隔一段时间内,kafka内部会对这个topic进行compact,也就是每个groupid+topic+分区号就保留最新的数据。

配置文件 config/consumer.properties,添加配置exclude.internal.topics=false,默认true,表示不能消费该系统主题,为了查看该系统主题数据,可以将参数修改为false

Kafka实战《原理2》相关推荐

  1. DataPipeline |《Apache Kafka实战》作者胡夕:Apache Kafka监控与调优

    胡夕,<Apache Kafka实战>作者,北航计算机硕士毕业,现任某互金公司计算平台总监,曾就职于IBM.搜狗.微博等公司.国内活跃的Kafka代码贡献者. 前言 虽然目前Apache ...

  2. Kafka设计原理看了又忘,忘了又看?

    什么是消息队列?简单来说,消息队列是存放消息的容器.客户端可以将消息发送到消息服务器,也可以从消息服务器获取消息. 作者:lbzhello来源:博客园|2019-07-18 09:17 什么是消息队列 ...

  3. 【kafka实战】分区重分配可能出现的问题和排查问题思路(生产环境实战,附视频)

    超强!!! Kafka高质量专栏学习大全,点我获取!!! 文章目录 前提 所有异常情况 1. TargetBroker若不在线,迁移脚本执行会失败 情景演示 2. TargetBroker在开始迁移过 ...

  4. Apache Kafka实战读书笔记(推荐指数:☆☆☆☆☆)

    Apache Kafka实战读书笔记(推荐指数:☆☆☆☆☆) 认识AK 快速入门 安装和启动 小案例 消息引擎系统 消息引擎范型 AK的概要设计 吞吐量/延时 消息持久化 负载均衡和故障转移: 伸缩性 ...

  5. Kafka消费者原理解析

    文章目录 消费者和消费组 创建Kafka消费者 rebalance 分区再均衡 rebalance触发时机 rebalance 分区分配策略 rebalance generatian rebalanc ...

  6. Kafka原理篇:图解kafka架构原理

    今天我们来深入讲解 Kafka 的架构和实现原理.[码哥]将从架构和细节入手,以生动的图深入讲解 Kafka 的实现原理. 我想很多同学之前可能已经看过很多 Kafka 原理相关的文章,但往往看时&q ...

  7. Kafka实战宝典:Kafka的控制器controller详解

    一.控制器简介 控制器组件(Controller),是 Apache Kafka 的核心组件.它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群.集群中任意一 ...

  8. Kafka实战-Flume到Kafka

    1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面 ...

  9. 《Apache Kafka实战》读书笔记-调优Kafka集群

    <Apache Kafka实战>读书笔记-调优Kafka集群 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.确定调优目标 1>.常见的非功能性要求 一.性能( ...

  10. 简单分析KafKa工作原理

    架构图 Producer:Producer即生产者,消息的产生者,是消息的入口. kafka cluster: Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我 ...

最新文章

  1. 字符设备和块设备的区别
  2. classloader
  3. 最大功率点跟踪_ADI公司推出集成最大功率点跟踪和I2C的80V降压升压电池充电控制器...
  4. 电机速度曲线规划2:S形速度曲线设计与实现
  5. HTML5 API详解(10):sessionStorage 你用过吗?
  6. LabVIEW: 无法执行该VI。
  7. (1)剑指Offer之斐波那契数列问题和跳台阶问题
  8. 统计数据库中各个表和空间使用情况
  9. 如何下载国家自然科学基金申请书的模板
  10. 2021有什么好的入耳式耳机推荐?耳机热销性价比牌子排行榜单推荐!
  11. 协议僵化 or 协议僵化
  12. 海思对接索尼ECX334 RGB OLED屏总结
  13. VS code更改背景图片和颜色
  14. SQLite的使用------图片存储
  15. 机器学习简介及常用算法
  16. 2.13学习生活总结
  17. 开发im即时通讯如何用Netty实现心跳机制、断线重连机制
  18. 农行c3开发语言,【解密】C3境外系统全面崛起,农行谱写新篇章
  19. 移动数据和软件更新系统及方法
  20. 运气指数测试软件,测一测最近的运势如何,有什么测试运势的软件

热门文章

  1. Linux系统中的磁盘管理
  2. 精仿小鸟云官网高大上模板,可做对接IDC站
  3. 周杰伦中文网登录页面
  4. University's Little_Mess Note(more) [李园7舍_404]
  5. 解决vue在IE11读取缓存的问题
  6. hive 按行打印出截止日期和开始日期之间的日期
  7. 帕斯卡计算机介绍,帕斯卡计算机:第一台被写入百科全书的计算机
  8. 夏洛克第四季第五季剧本已写好 或为最好作品!
  9. 液晶显示器原理和应用
  10. 电子屏幕的51c语言程序,单片机控制LCD液晶显示器(含程序)