【kafka】Kafka consumer处理大消息数据过大导致消费停止问题
文章目录
- 1.概述
- 2.案例分析
- 3.kafka的设计初衷
- 3.1 broker 配置
- 3.2 Consumer 配置
- M.扩展
1.概述
转载:https://www.cnblogs.com/wynjauu/articles/9409686.html
昨天线上发现一个现象:kafka一个topic有数据生产,但是没有消费了,各种排查es集群、内存使用情况等都正常,但是就是消费线程死了一样,重启jboss也没有用,最后发现是因为消费该topic的kafka八的partition因为遇到了大数据超过1M而都被堵塞(我们是四台机器每个机器两个partation消费)。
下面是网上看到到一个解释博客:
2.案例分析
处理kafka consumer的程序的时候,发现如下错误:
ERROR [2017-01-12 07:16:02,466] com.flow.kafka.consumer.main.KafkaConsumer: Unexpected Error Occurred
! kafka.common.MessageSizeTooLargeException: Found a message larger than the maximum fetch size of this consumer on topic codeTopic partition 3 at fetch offset 94. Increase the fetch size, or decrease the maximum message size the broker will allow.
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:91) ~[pip-kafka-consumer.jar:na]
! at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) ~[pip-kafka-consumer.jar:na]
! at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) ~[pip-kafka-consumer.jar:na]
! at com.flow.kafka.consumer.main.KafkaConsumer$KafkaRiverFetcher.run(KafkaConsumer.java:291) ~[original-pip-kafka-consumer.jar:na]
! at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
! at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
! at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
如上log可以看出,问题就是有一个较大的消息数据在codeTopic的partition 3上,然后consumer未能消费,提示我可以减小broker允许进入的消息数据的大小,或者增大consumer程序消费数据的大小。
从log上来看一目了然,如果要解决当前问题的话,
- 减小broker消息体大小(设置message.max.bytes参数);
- 增大consumer获取数据信息大小(设置
fetch.message.max.bytes
参数)。默认broker消息体大小为1000000字节即为1M大小。
消费者方面:fetch.message.max.bytes——>这将决定消费者可以获取的数据大小。
broker方面:replica.fetch.max.bytes——>这将允许broker的副本发送消息在集群并确保消息被正确地复制。如果这是太小,则消息不会被复制,因此,消费者永远不会看到的消息,因为消息永远不会承诺(完全复制)。
broker方面:message.max.bytes——>可以接受数据生产者最大消息数据大小。
由我的场景来看较大的消息体已经进入到了kafka,我这里要解决这个问题,只需要增加consumer的fetch.message.max.bytes数值就好。我单独把那条数据消费出来,写到一个文件中发现那条消息大小为1.5M左右,为了避免再次发生这种问题我把consumer程序的fetch.message.max.bytes参数调节为了3072000即为3M,重启consumer程序,查看log一切正常,解决这个消费错误到此结束,下面介绍一下kafka针对大数据处理的思考。
3.kafka的设计初衷
Kafka设计的初衷是迅速处理小量的消息,一般10K大小的消息吞吐性能最好(可参见LinkedIn的kafka性能测试)。但有时候,我们需要处理更大的消息,比如XML文档或JSON内容,一个消息差不多有10-100M,这种情况下,Kakfa应该如何处理?
针对这个问题,可以参考如下建议:
最好的方法是不直接传送这些大的数据。如果有共享存储,如NAS, HDFS, S3等,可以把这些大的文件存放到共享存储,然后使用Kafka来传送文件的位置信息。
第二个方法是,将大的消息数据切片或切块,在生产端将数据切片为10K大小,使用分区主键确保一个大消息的所有部分会被发送到同一个kafka分区(这样每一部分的拆分顺序得以保留),如此以来,当消费端使用时会将这些部分重新还原为原始的消息。
第三,Kafka的生产端可以压缩消息,如果原始消息是XML,当通过压缩之后,消息可能会变得不那么大。在生产端的配置参数中使用compression.codec和commpressed.topics可以开启压缩功能,压缩算法可以使用GZip或Snappy。
不过如果上述方法都不是你需要的,而你最终还是希望传送大的消息,那么,则可以在kafka中设置下面一些参数:
3.1 broker 配置
message.max.bytes
(默认:1000000) – broker能接收消息的最大字节数,这个值应该比消费端的fetch.message.max.bytes更小才对,否则broker就会因为消费端无法使用这个消息而挂起。log.segment.bytes
(默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。replica.fetch.max.bytes
(默认: 1MB) – broker可复制的消息的最大字节数。这个值应该比message.max.bytes大,否则broker会接收此消息,但无法将此消息复制出去,从而造成数据丢失。
3.2 Consumer 配置
fetch.message.max.bytes
(默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。所以,如果你一定要选择kafka来传送大的消息,还有些事项需要考虑。要传送大的消息,不是当出现问题之后再来考虑如何解决,而是在一开始设计的时候,就要考虑到大消息对集群和主题的影响。
性能: 根据前面提到的性能测试,kafka在消息为10K时吞吐量达到最大,更大的消息会降低吞吐量,在设计集群的容量时,尤其要考虑这点。
可用的内存和分区数:Brokers会为每个分区分配replica.fetch.max.bytes
参数指定的内存空间,假设replica.fetch.max.bytes=1M
,且有1000个分区,则需要差不多1G的内存,确保 分区数最大的消息不会超过服务器的内存,否则会报OOM错误。
同样地,消费端的fetch.message.max.bytes
指定了最大消息需要的内存空间,同样,分区数最大需要内存空间 不能超过服务器的内存。所以,如果你有大的消息要传送,则在内存一定的情况下,只能使用较少的分区数或者使用更大内存的服务器。
垃圾回收:到现在为止,我在kafka的使用中还没发现过此问题,但这应该是一个需要考虑的潜在问题。更大的消息会让GC的时间更长(因为broker需要分配更大的块),随时关注GC的日志和服务器的日志信息。如果长时间的GC导致kafka丢失了zookeeper的会话,则需要配置zookeeper.session.timeout.ms参数为更大的超时时间。
M.扩展
【kafka】控制台默认不能发送大数据 4096 4095 4129
消息在Apache Kafka中被截断
【kafka】Kafka consumer处理大消息数据过大导致消费停止问题相关推荐
- 大数据是如何定义,多大的数据是大数据?
大数据是什么? 多大的数据叫大数据? 很多没有接触过大数据的人,都很难清楚地知道,究竟多大的数据量才可以称之为大数据.那么,根据数据收集的端口,企业端与个人端之间,大数据的数量级别是不同的. 企业端( ...
- Mysql学习总结(75)——并发量大、数据量大的互联网业务数据库设计军规
一.基础规范 (1)必须使用InnoDB存储引擎 解读:支持事务.行级锁.并发性能更好.CPU及内存缓存页优化使得资源利用率更高 (2)新库使用utf8mb4字符集 解读:万国码,无需转码,无乱码风险 ...
- Mysql学习总结(60)——并发量大、数据量大的互联网业务数据库设计规范总结
一.基础规范 (1)必须使用InnoDB存储引擎 解读:支持事务.行级锁.并发性能更好.CPU及内存缓存页优化使得资源利用率更高 (2)表字符集默认使用utf8,必要时候使用utf8mb4 解读:(1 ...
- 10亿数据找出前100大的数据(网易大数据面试算法题)
精华在评论区.... 当时去面试的时候现场现写,憋了将近一个小时,用递归实现了,估计问题很多,不是人家怎么可能不要我,哈哈哈,开个玩笑: 思路就是新建一个长度为100数组array1,把前100个元素 ...
- 配置Kafka发送大消息
Apache Kafka是一个强大开源.分布式容错的事件流平台.然而,当我们使用Kafka发送大于配置的大小限制的消息时,它会给出一个错误.前文介绍了Spring集成Kafka,本文研究如何使用Kaf ...
- (转)超全面设计指南:如何做大屏数据可视化设计?
数据可视化是一门庞大系统的科学,本文所有讨论仅针对大屏数据可视化这一特定领域.管中窥豹,如有遗漏或不足之处欢迎大家讨论交流. 文章结构及思维导图: 一.基础概念 1. 什么是数据可视化 把相对复杂.抽 ...
- 什么是大数据 究竟多大才算是大数据,大数据怎么学习?
大数据,什么是大数据呢?多大的数据叫大数据?红火一时的数据分析走向了我们,纷纷称不分析数据企业将长久不了,可是究竟什么样的数据才是大数据呢,什么样的数据才是最大的呢? 如果你没有接触过大数据,那么你就 ...
- 大屏数据可视化设计指南!
基础概念 1. 什么是数据可视化 把相对复杂.抽象的数据通过可视的方式以人们更易理解的形式展示出来的一系列手段叫做数据可视化,数据可视化是为了更形象地表达数据内在的信息和规律,促进数据信息的传播和应用 ...
- 聊聊大数据(一)——大数据的存储
"大数据"现在可谓越来越火了,不管是什么行业,也不敢是不是搞计算机的,都要赶个集,借着这股热潮,亦或炒作,亦或大干一番.尤其是从事IT行业的,不跟"大数据"沾点 ...
最新文章
- Matlab数据的可视化 -- 线性图函数plot
- python简单代码hello-小白学 Python(1):开篇
- head first servlet jsp 学习笔记
- 差点就被联通客服给营销了...
- SAP Fiori和Vue的结合会给企业级应用软件的UI开发带来什么?
- MySQL查询之聚合查询
- 优秀学生专栏——董超
- 基于LINQ to SQL的WEB开发三层架构(1)
- 如何为Linux安装Go语言
- python名称空间_python基础:名称空间与作用域
- python发音模块-python 利用pyttsx3文字转语音
- Spark Accumulator累加器
- UML概要基础知识(待完善)
- pyecharts本地文件_pyecharts 模块的简单使用(可视化神器)
- 今日头条面试——iOS开发面试题
- 7-4 计算存款利息(10 分)
- html5调用papy支付,Payment
- Linux定时任务与开机自启动脚本(cron与crontab)
- linux pam 解锁_linux pam锁定用户
- linux系统怎样安装驱动程序,linux下网卡驱动安装全过程