2019独角兽企业重金招聘Python工程师标准>>>

现象:

kafka producer一开始写入正常,慢慢的会大量超时异常,偶尔正常写入;在callback函数打印异常信息为

org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for testTopic-0 due to 64534 ms has passed since batch creation plus linger time

原因:

producer send方法的callback函数执行缓慢导致;

producer往broker发送数据时是串行的,只有上次batch全部写入broker,并且全部callback函数执行完毕后,才会继续下一次发送。如果上一次发送全部callback函数执行时间超过了request.timeout.ms(30s),就会导致后续batch的message发送时间大于创建时间30s以上,然后被producer丢弃并抛出异常;

方法:

1.加大request.timeout.ms值;

2.调整callback,使得batch的callback执行时间在request.timeout.ms之内;

转载于:https://my.oschina.net/u/134474/blog/1531556

kafka producer写入超时相关推荐

  1. Kafka Producer 实现源码分析

    前言 拥抱变化接手了 Kafka 平台,遂学习 0.10.0 线上版本的设计与实现.限于篇幅,本文不会逐行解析源码,而是从逻辑流程.设计模式.并发安全等方面学习各组件,笔记仅供个人 Review 一: ...

  2. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  3. 6张图为你分析Kafka Producer 消息缓存模型

    本文分享自华为云社区<图解Kafka Producer 消息缓存模型>,作者:石臻臻的杂货铺. 在阅读本文之前, 希望你可以思考一下下面几个问题, 带着问题去阅读文章会获得更好的效果. 发 ...

  4. 1.30.Flink SQL案例将Kafka数据写入hive

    1.30.Flink SQL案例将Kafka数据写入hive 1.30.1.1.场景,环境,配置准备 1.30.1.2.案例代码 1.30.1.2.1.编写pom.xml文件 1.30.1.2.2.M ...

  5. Kafka Producer幂等性

    Producer幂等性 在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer.它其实是 0.11.0.0 版本引入的新功能.在此之前,Kafka 向分区发送数 ...

  6. Kafka Producer拦截器

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  7. SAP Data Intelligence Modeler里的Kafka Producer和Kafka Consumer

    首先本地将kafka的docker容器镜像下载到本地并运行: docker search kafka docker pull spotify/kafka docker run --name kafka ...

  8. 【kafka】Kafka Producer Sticky Partitioner kafka 生产者 粘性 分区器

    1.概述 转载:[译]Kafka Producer Sticky Partitioner 最近事情多有点犯懒,依然带来一篇译文:Apache Kafka Producer Improvements w ...

  9. tcp实时传输kafka数据_关于Kafka producer管理TCP连接的讨论

    在Kafka中,TCP连接的管理交由底层的Selector类(org.apache.kafka.common.network)来维护.Selector类定义了很多数据结构,其中最核心的当属java.n ...

最新文章

  1. mysql 判断是否日期类型_MySQL:日期类型
  2. 博世豪掷10亿欧元德国建半导体工厂,要掌握自动驾驶芯片化核心竞争力?
  3. Thread.join(), CountDownLatch、CyclicBarrier和 Semaphore区别,联系及应用
  4. 基于mycat的mysql_基于Mycat中间件的MySQL读写分离
  5. linux查看系统后台,求助,如何查看后台服务
  6. MaxCompute,基于Serverless的高可用大数据服务 1
  7. Testbench编写
  8. windows10应用商店下安装kali子系统
  9. RestClient
  10. C++内存泄漏和内存碎片的产生及避免策略
  11. 机器学习算法原理与实践(二)、meanshift算法图解以及在图像聚类、目标跟踪中的应用
  12. kubernetes 二进制安装(v1.20.16)(四)部署 master
  13. NOIP2017提高组模拟赛4 (总结)
  14. 把Ubuntu主机加入Window工作组
  15. orchestrator配置参数详解-Ⅱ
  16. javascript和jQurey增删元素
  17. Error: request for member ‘xxx’ in something not a structure or union。
  18. 不装软件查看Mac的CPU温度、风扇转速等信息
  19. 用python中If-Else做奇偶数的判断
  20. 注册卫星地图下载器2008

热门文章

  1. C#中通过Selenium定位a标签的问题
  2. 使用JWT保护你的Spring Boot应用 - Spring Security实战
  3. innoDB索引使用和优化汇总
  4. Android -- 屏幕适配
  5. nhibernate配置教程
  6. python语音属于什么语言_python到底是什么类型的语言
  7. ASP人事工资管理系统毕设
  8. executorservice 重启_iPhone7使用久了突然手机自动重启,多半原因出在这儿,进来看看...
  9. linux tasklet 实例,Linux tasklet 分析笔记Chapter 2
  10. java8 重复注解_Java8新特性_重复注解与类型注解