调用kafka producer发送数据时,发现延迟级别在10-200ms不等,与正常的kafka写入速度不匹配,于是开始找问题~

一.场景:

一批数据,需要遍历每个数据并发送数据细节的信息到kafka,下面是我原始代码,每个人发送后执行一次flush操作。

val results = Array[DataObject](...)
results.foreach(data => {val info = new ProducerRecord[String, String](topic, message)producer.send(info)
})
kafka.flush()

服务器执行延迟在10-200ms不等

二.可能原因分析:

1.send 函数造成阻塞

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {return this.send(record, (Callback)null);}public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);return this.doSend(interceptedRecord, callback);}

查看源码的send逻辑,一种有回调函数,一种没有回调函数,所以这里send是异步执行,不会造成堵塞,排除

2.flush 函数造成阻塞

    public void flush() {log.trace("Flushing accumulated records in producer.");this.accumulator.beginFlush();this.sender.wakeup();try {this.accumulator.awaitFlushCompletion();} catch (InterruptedException var2) {throw new InterruptException("Flush interrupted.", var2);}}

flush 这里accumulator会调用await相关方法,查看官方API的解释是:

flush()
Invoking this method makes all buffered records immediately available to send (even if linger.ms is greater than 0) and blocks on the completion of the requests associated with these records.

调用此方法可使所有缓冲记录立即可用于发送(即使linger.ms大于0)并在与这些记录关联的请求完成时发生阻塞。 ok,找到问题

三.Flush 原理

基于flush引起的延迟,首先看一下kafka生产的过程

Step1:异步调用send发送日志,根据Properties的配置对kv进行序列化

Step2::根据k hash 得到分区信息,追加到对应topic下的partition,这里先会写入到本地缓存区

Step3: 本地缓存写入后,有独立的线程传送向producer发送ACK

1.分析:

flush 是将第二步写到缓存区的数据强制推送发送,正常情况下清空缓存区操作通过参数配置实现:

batch.size 离线缓存达到该size时执行一次flush

linger.ms 达到该时间间隔时,执行一次flush

调用flush时,会清空缓存区内存,调用 awaitFlushCompletion 时需要等待缓存区清空,这里会造成线程的堵塞

    public void awaitFlushCompletion() throws InterruptedException {try {Iterator i$ = this.incomplete.all().iterator();while(i$.hasNext()) {RecordBatch batch = (RecordBatch)i$.next();batch.produceFuture.await();}} finally {this.flushesInProgress.decrementAndGet();}}

awaitFlushCompletion 将当前缓存区数据构造迭代器循环发送,并在finally阶段调整offset。

这里我设置发送延迟时间为1000ms

我的实际发送时间在1000ms以内,所以每次发送调用 flush 都会造成延迟,相当于手动调用频繁的刷新缓存区,增加的IO等待的时间,违背了批处理减少IO的规则,所以造成kafka写入时长增加,这里取消flush,通过参数控制 producer 生产解决问题。

第一次时间长是因为初始化kafka服务端,和最一开始添加 flush 相比,时间消耗基本可以忽略。

Kafka produce flush 引起的性能分析相关推荐

  1. kafka原理和性能分析测试

    1.Kafka写数据流程: producer先从zookeeper的broker-list的节点找到partition(分区)的leader: producer将消息发送给该leader的partit ...

  2. 关于 Rocksdb 性能分析 需要知道的一些“小技巧“ -- perf_context的“内功” ,systemtap、perf、 ftrace的颜值

    文章目录 内部工具 包含头文件 接口使用 核心指标 Perf Context IOStats Context 外部工具 Systemtap 工具 Perf工具 Ftrace 工具 2020.8.20 ...

  3. 从 ES 到 Kylin,斗鱼客户端性能分析平台进化之旅

    一.背景 斗鱼是一家面向大众用户的在线直播平台,每天都有超大量的终端用户在使用斗鱼各客户端参与线上互动.伴随业务的迅猛发展,斗鱼需要对客户端采集到的性能数据进行统计和分析,开发出具有多维度分析图表和数 ...

  4. mysql索引结构原理、性能分析与优化

    摘要: 第一部分:基础知识 第二部分:MYISAM和INNODB索引结构 1.简单介绍B-tree B+ tree树 2.MyisAM索引结构 3.Annode索引结构 4.MyisAM索引与Inno ...

  5. golang调用java的函数_大话golang性能分析(一):profile基本原理

    引言:好久没分享了,不多废话了,准备一个专题分三期来分享下golang的性能分析. O 专题目标 理解profile基本原理 熟悉go常用性能分析工具pprof 快速对线上服务的cpu.内存.goro ...

  6. go pprof 性能分析

    Go 的运行时性能分析接口都位于 runtime/pprof 包中.只需要调用 runtime/pprof 库即可得到我们想要的数据. CPU 性能分析 编写代码,生成分析报告 假设我们实现了这么一个 ...

  7. mysql监控和性能分析工具

    mysql监控和性能分析工具 Mysql作为广泛应用的数据库系统,平时运维工作中对她的监控必不可少,现在把我对Mysql数据库的监控体会写成下文,欢迎拍砖. 无论是DBA或是SA,监控的目标都很明确, ...

  8. 性能分析之两个性能瓶颈分析案例

    最近处理了几个项目中的性能问题,来跟大家唠唠. 这几个问题是非常常见的. 性能瓶颈就有这么个特点,大部分瓶颈分析到最后,都给人有一种猛拍大腿突然醒悟的感觉.但是在分析到具体的原因之前,都是抓耳挠腮,百 ...

  9. Go 学习笔记(81)— Go 性能分析工具 pprof

    Go 语言工具链中的 go pprof 可以帮助开发者快速分析及定位各种性能问题,如 CPU消耗 .内存分配及阻塞分析 .具体作用如下: 性能分析首先需要使用 runtime.pprof 包嵌入到待分 ...

最新文章

  1. RepVGG:极简架构,SOTA性能,论文解读
  2. 如何训练大学生的工程实战能力
  3. java mysql防重复提交_防止数据重复提交的6种方法(超简单)!
  4. graphpad数据小数点_GraphPad Prism 统计指南 | 关于异常值(Outlier),你真的了解吗?...
  5. mysql打错了怎么办_数据库出错了怎么办?
  6. form表单通过checkbox_HTML表单标签基础
  7. Illustrator 教程,如何在 Illustrator 中创建颜色渐变?
  8. 恢复误删文件 Windows File Recovery
  9. Redisson + Lettuce实现
  10. Process Explorer简易图文教程(下)
  11. loadrunner 11 的下载和安装
  12. 用css和js分别实现三级导航菜单
  13. 这个是什么错误,求大神解答
  14. 如何将Nios II硬件和软件合成一个文件(NIOS II)(硬件)(软件)(合并)
  15. 深度学习之一---什么是神经网络
  16. @vue/composition-api/dist/vue-composition-api.mjs in ./node_modules/vue-demi/lib/index.mjs 报错
  17. Ubuntu burg
  18. 搜索引擎优化(SEO)之 前端性能优化技巧
  19. mysql高级 tigger触发器 --[1]
  20. 谱半径一定大于0_图的无符号拉普拉斯谱半径与最大度

热门文章

  1. 根据年份判断属相php,自动算出生肖属相的c程序编写程序,输入一个年份,判断该年属相.(提示switch-case......
  2. 51单片机 AT24C02 PROTEUS 读写程序 源码
  3. android WindowManager 应用内部悬浮窗口总结
  4. Android车机初体验
  5. Python统计文件夹大小
  6. 手机芯片研发有多难_手机芯片设计很简单?详解手机芯片设计的难点
  7. 【CST】-01软件安装及熟悉
  8. ubuntu18.04 搭建ffmpeg踩坑
  9. python中美元人汇率_Python爬虫练习:爬取美元历史汇率
  10. python中函数的定义_Python函数是什么_如何定义和调用函数?