1、Kafka写数据流程:

  1. producer先从zookeeper的broker-list的节点找到partition(分区)的leader;
  2. producer将消息发送给该leader的partition;
  3. leader将消息写入本地log;
  4. followers从leader pull消息,实现replication的副本备份机制,同样写入本地log;
  5. replication写入本地log后向leader发送ack(确认);
  6. leader收到所有ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset),并向producer发送ACK。
  7. producer收到leader的ack,证明生产的数据已被kafka成功写入

总之,producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。

  1. 存储策略
    无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
    1)基于时间:log.retention.hours=168
    2)基于大小:log.retention.bytes=1073741824
    需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

  2. 分发策略:

1) partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
2) 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
3) 如果既没指定partition,又没有设置key,则会轮询选出一个partition 。

  1. kafka高性能的原因

1)Broker NIO异步消息处理:实现了IO线程与业务线程分离; 即生产者客户端缓存消息批量发送,消费者批量从broker获取消息,减少网络io次数,充分利用磁盘顺序读写的性能,并且还会将每个消息进行压缩传输。

2)磁盘顺序写:因为卡夫卡是将消息记录持久化到磁盘的,而磁盘的顺序读写性能是很高的;

3)零拷贝:采用了sendfile方法,从而允许操作系统将数据从Page Cache直接发送到网络socket缓冲区,只需要最后copy就可将数据复制到NIC缓冲区。

4)Page Cache:卡夫卡充分利用了操作系统的page cache机制,即利用操作系统自身的内存,而不是JVM空间内存,所以卡夫卡的读写操作基本上是基于内存的;

5)分区分段+索引:卡夫卡是采用分布式系统分区分桶的设计思想做的,所以 Kafka的message消息实际上是分布式存储在一个一个小的segment中的, Kafka又默认为分段后的数据文件建立了索引文件

2、kafka消费过程分析

(1)Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取

  • 多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id。
  • 同一个消费组的消费者可以消费同一topic不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!
  • **消费者数少于分区:**会出现某个消费者消费多个partition数据的情况(此时消费的速度不及只处理一个partition的消费者的处理速度)
  • **消费者数多于分区:**多出来的消费者不消费任何partition的数据。
  • 建议消费者组的consumer的数量与partition的数量一致!

(2)消费者模型:消费的消费模型有两种:推送模型(push)和拉取(pull)

​ 1) 基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式无法很好地保证消息被处理。 消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就需要消息代理中记录所有的消费状态,这种做法显然是不可取的。

​ 2) 拉取模型 (kafka): 由消费者自己记录消费状态,每个消费者互相独立地顺序读取每个分区的消息 。 这种由消费者控制偏移量的优点是:消费者可以按照任意的顺序消费消息。比如,消费者可以重置到旧的偏移量,重新处理之前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。

(3)消费高级API:

​ 1)高级API的优点:①写起来简单;不需要自行去管理offset,系统通过zookeeper自行管理 ; 不需要管理分区,副本等情况,系统自动管理 ;

​ ② 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset) ;

​ ③ 可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响) 。

​ 2)高级API的缺点:① 不能自行控制offset(对于某些特殊需求来说);② 不能细化控制如分区、副本、zk等 。

(4)消费低级API

​ 1)低级API的优点:

​ ① 能够让开发者自己控制offset,想从哪里读取就从哪里读取 ;

​ ② 自行控制连接分区,对分区自定义进行负载均衡 ;

​ ③对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中) 。

​ 2)低级API的缺点:

​ ① 太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。

3、kafka的ACK应答机制:

  1. 在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all(-1)。(保证消息不丢失)

(1) **0:**代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
(2)**1:**代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
(3)**all:**代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

4、设定消费位置

auto.offset.reset值含义解释
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

5、kafka几种消费方式

  1. 自动提交offset: 在创建KafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。自动提交策略由消费者协调器(ConsumerCoordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交。

  2. 手动提交offset: 生产环境中,需要在数据消费完全后再提交offset,也就是说在数据从kafka的topic取出来后并被逻辑处理后,才算是数据被消费掉,此时需要手动去提交topic的offset。

    手动提交方式一:同步提交
    consumer.commitSync()方式提交

    手动提交方式二:异步提交

    consumer.commitAsync(callback)方式提交

  3. 手动提交partition的offset:

6、kafka的性能分析

producer

单线程:

原始数据:

0.0.0.0 0 dat    2019-03-11 22:00:00 2019-03-11 22:00:00 11.184.32.234   1   4   1940  L:0,P:2   52.229.174.233  443 223.146.131.254 5636    F<-C  unknown type: 259 2 L:1,P:2    52.229.174.233  443 223.146.131.254 5636    F->C  unknown type: 259 2 mac: 56:00:03:0d:c0:02<-00:04:19:00:02:98 unknown type: 259 2 6 1345    10  7136    2019-03-11 22:00:00 2019-03-11 22:00:00  sni 38 .array709-prod.do.dsp.mp.microsoft.com

每条消息会在生产后加上id进行标注,以防每条数据都长一个样子

改进前:

数据量 发送用时(ms)
10000 1 300
100000 16 000
1000000 150 000

改进后:

数据量 发送用时(ms)
1000 386
10000 1 364
100000 13 160
1000000 35 139
数据量 发送用时
1000 186ms
10000 1 079ms
100000 11 018ms
1000000 34 670sm
10000000 305 043ms

每个测试都经过超过30次的测试,

我们可以看到效率提高了接近5倍的生产效率

目前的问题:

  1. 发送百万数据的时候前面4-5秒速度很快50-60m/s,后面会降到1m/s
  2. 发送千万数据前面4-5秒速度很快50-60m/s,后面会降到3-4m/s

10个线程:

1万数据:总时间:2411ms

10万数据:总时间:19924ms

100万数据:总时间:180000ms

结论

producer端增加线程数量并不能提高发送速率,单线程producer不存在线程切换生产速率更快

开启压缩后

数据量 发送用时
10000 730ms
100000 794ms
1000000 3 592ms
10000000 33 260ms

速率再次提高了10倍左右

consumer

基于partitions为50的topic进行测试

消费可以达到千兆网卡满带宽:111M/s 持续消费无问题

总数据量:8580万
总时间:228674 ms
平均每秒处理数据:375000

总数据量:3560万
总时间:35390 ms
平均每秒处理数据:1005000

小结

  1. 生产者端不管是使用单例的producer多线程生产还是使用多个producer同时时生产数据均不会提高生产性能
  2. 最初到现在生产性能提高42.8倍,应该还有比较大的提升空间,
  3. consumer的速率提高主要的方法是增加partition数量
  4. consumer端不存在开启压缩的选项也就是topic是否开启压缩不会影响到consuemr 的消费速率。
  5. consumer的消费速率目前可以达到千兆网卡满带宽。稳定在112M/S的数据传输

topic开启压缩的方法

在集群中运行:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --add-config compression.type=lz4

这里的 --alter 命令可以对已经创建的topic进行增加配置修改配置等

在producer中增加:

compression.type=lz4

Broker配置

broker.id=107
listeners=PLAINTEXT://172.16.2.107:9092
num.network.threads=25
num.io.threads=24
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/hadoop/soft/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=slave106:2181,slave107:2181,slave108:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
auto.create.topics.enable = false
delete.topic.enable=true

Producer配置

#指定接受消息的代理
bootstrap.servers=172.16.2.107:9092
#当至少有一个follow拷贝到leader中的消息后再进行提交消息已发送
acks=0
#重试次数
retries=3
#控制批量发送消息的大小,减少访问服务端的次数提高性能,16KB
batch.size=16384
#batch.size=32768
#控制消息每隔多少时间进行发送,减少访问服务端次数
linger.ms=500
#设置缓冲区大小,发送速度大于这个值的话会导致缓冲被耗尽,32MB
#buffer.memory=134217728
buffer.memory=268435456
# 设置消息压缩
compression.type=lz4
#设置键,值以什么序列的形式进行发送
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

Consumer配置

#只需要指定kafka集群中的一个ip就可以了
bootstrap.servers=172.16.2.107:9092
#每个group id都会收到一份kafka 内部的消息
group.id=test
#自动提交 offset
enable.auto.commit=false
#发送的每条消息的key,和value使用的序列化方式
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer= org.apache.kafka.common.serialization.StringDeserializer

kafka原理和性能分析测试相关推荐

  1. 车载安全计算机是列控车载,毕业论文:列控车载设备动态监测系统的原理及性能分析...

    毕业论文:列控车载设备动态监测系统的原理及性能分析 发表时间:2013-4-26 23:00:09 毕业论文:列控车载设备动态监测系统的原理及性能分析 提示:本文原版含图表word版全文下载地址附后( ...

  2. 由浅入深探究mysql索引结构原理_性能分析与优化_由浅入深探究mysql索引结构原理、性能分析与优化...

    由浅入深探究mysql索引结构原理.性能分析与优化 第一部分:基础知识第二部分:MYISAM和INNODB索引结构1, 简单介绍B-tree B+ tree树 2, MyisAM索引结构 3, Ann ...

  3. php函数的实现原理及性能分析

    2019独角兽企业重金招聘Python工程师标准>>> 前言 在任何语言中,函数都是最基本的技术单元之一.对于php的函数,它具有哪些特点?函数调用是怎么实现?php函数的性能如何, ...

  4. mysql索引结构原理、性能分析与优化

    摘要: 第一部分:基础知识 第二部分:MYISAM和INNODB索引结构 1.简单介绍B-tree B+ tree树 2.MyisAM索引结构 3.Annode索引结构 4.MyisAM索引与Inno ...

  5. DevTools 实现原理与性能分析实战

    作者:vivo 互联网浏览器内核团队-Li Qingmei 一.引言 从 2008 年 Google 释放出第一版的 Chrome 后,整个 Web 开发领域仿佛被注入了一股新鲜血液,渐渐打破了 IE ...

  6. virtio+ovs转发原理和性能分析

    女主宣言 virtio 是一种 I/O 半虚拟化解决方案,ovs是一个虚拟交换机,利用软件的方式实现交换功能.本文将对virtio+ovs的转发原理进行介绍和并对其性能展开分析. PS:丰富的一线技术 ...

  7. 【王道计组笔记】高速缓存器:局部性原理及性能分析

    背景: 随着CPU的工作速度成指数级增长,但是主存速度跟不上,所以要提升主存速度非常重要. m个模块采用低位交叉编址的方式可以基本上将主存的带宽提升m倍,但是这依旧与CPU差距很大. [王道计组笔记] ...

  8. Chrome DevTools 实现原理与性能分析实战

    点击上方 前端Q,关注公众号 回复加群,加入前端Q技术交流群 作者:vivo 互联网浏览器内核团队-Li Qingmei 一.引言 从 2008 年 Google 释放出第一版的 Chrome 后,整 ...

  9. kafka性能测试、性能分析与性能调优

    前言:最近在做kafka.mq.redis.fink.kudu等在中间件性能压测,压测kafka的时候参考了这篇文章,大家可以借鉴下! 一.测试环境 测试使用到三台机器,机器配置如下: 共同配置: I ...

最新文章

  1. [WPF]winfom中ShowWPF新窗口时TextBox等控件无法输入问题解决方法 .
  2. Python编程基础:第二十节 函数Function
  3. pip list和pip freeze的区别(列出所有包,列出包的requirements格式)
  4. ES5和ES6中对于继承的实现方法
  5. MyBatis 插件原理与自定义插件-用代理模式我们就要解决几个问题
  6. 02-1.CSS边框,边界,布局相关笔记
  7. 郑州it java_郑州Java网站开发
  8. 最佳海报样机模板|让作品脱颖而出
  9. 一个简单的登陆功能模块
  10. 南方cass快捷键命令修改在哪_南方CASS快捷命令
  11. 业余无线电通信_如何办理业余无线电台执照
  12. VS2010开发的winform程序在XP系统打不开的原因(与ico图标像素有关)
  13. JAVA的file.separate
  14. 打开文件安全警告怎么关闭?
  15. jfinal中json字符串转对象类
  16. assertThat断言测试方法
  17. in和exist区别
  18. 100的阶乘实现方法(C语言)
  19. centos7内核默认包含在操作系统镜像中_定制centos7.7安装镜像默认内核5.5去除默认3.10内核
  20. 毕业后来到富士康(南京)软件公司

热门文章

  1. 哦天,原来锐角三角形还可以这么画!
  2. ppt中垂直V形列表添加多个列表
  3. eclipse 导出可运行jar包时三种Library handling的区别
  4. 《惢客创业日记》2020.11.28(周六)谁有谁的身不由己?
  5. 2010年6月刊:微博
  6. 分享几个小技巧教你图片怎么加边框
  7. Web or Native 谁才是元宇宙的未来(上)?
  8. 网管转身变“黑客”,企业该怎么保护自身的数据资产?
  9. halcon缺陷检测
  10. oracle增量获取数据