配置Kafka发送大消息
Apache Kafka是一个强大开源、分布式容错的事件流平台。然而,当我们使用Kafka发送大于配置的大小限制的消息时,它会给出一个错误。前文介绍了Spring集成Kafka,本文研究如何使用Kafka发送大消息。
问题描述
Kafka配置限制允许发送消息大小,默认为1M。然而,如果需要发送大消息,需要调整这些参数,本文使用Kafka2.5,在进入配置部分之前,首先需要安装Kafka。
安装
这里搭建单节点kafka代理,生产者应用发送消息给指定主题,该主题为单分区主题。
我们看到整个过程涉及多个环节,kafka生产者、kafka代理、主题、kafka消费者。因此,所有这些配置需要调整,以适用大消息传输。我们的目标是调整参数能够发送20M大消息。
Kafka生产者配置
第一步是消息产生地,我们使用Spring Kafka从应用发送消息给Kafka服务器。因此,首先需要更新 max.request.size
属性。详细细节可参考官方文档,该有效值为常量,对于Kafka Client库中的ProducerConfig.MAX_REQUEST_SIZE_CONFIG 定义,需要增加Spring Kafka依赖。我们配置其值为20971520
字节:
public ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");return new DefaultKafkaProducerFactory<>(configProps);
}
Kafka主题配置
生产者发送消息给Kafka代理上的主题。因此,接下来选哟配置kafka主题,这意味着需要更新max.message.bytes
属性,其默认值为1M。
该参数控制kafka压缩后(如果启用了压缩)最大记录批次大小,详细内容参考官方文档。我们可以通过cli命令手动配置该属性:
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520
当然也可以通过Kafka Client进行配置:
public NewTopic topic() {NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);Map<String, String> configs = new HashMap<>();configs.put("max.message.bytes", "20971520");newTopic.configs(configs);return newTopic;
}
要发送大消息,至少需要配置这两个参数。
Kafka 代理配置
一个可选配置属性为:message.max.bytes
,用于允许所有在代理上的主题接收大于1M的消息。该属性控制kafka压缩后(如何启用了压缩)允许的最大记录批次大小,详细内容参考官方文档。
通过在server.properties配置文件中增加下列属性:
message.max.bytes=20971520
另外,该属性将使用message.max.bytes
和 max.message.bytes
两者中的最大值。
Kafka消费者配置
下面讨论Kafka消费端配置属性。虽然这些变化对消费大消息不是必须的,为了避免消费端程序性能问题,最好也调整相应参数:
max.partition.fetch.bytes
: 该属性限制从主题分区能消费的字节数,详细内容可参考官方文档。在Kafka Client 库中对应为ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG常量。fetch.max.bytes
: 该属性限制消费者从kafka服务器获取的字节数,kafka能够监听多个分区,详细内容参考官方文档。在Kafka Client 库中对应为ConsumerConfig.FETCH_MAX_BYTES_CONFIG 常量。
因此对配置消费者,需要创建KafkaConsumerFactory。另外需要说明的是:应该配置比topic/broker配置较大的值。
public ConsumerFactory<String, String> consumerFactory(String groupId) {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");return new DefaultKafkaConsumerFactory<>(props);
}
这里配置值于前面一样,是因为创建的主题仅有一个分区。但是FETCH_MAX_BYTES_CONFIG
值应该高于MAX_PARTITION_FETCH_BYTES_CONFIG
。当消费者监听多个分区时,FETCH_MAX_BYTES_CONFIG
表示从多个分区获取消息的大小,而MAX_PARTITION_FETCH_BYTES_CONFIG
表示从单个分区获取消息的大小。
其他选项
前面提及配置Kafka生产者、主题、代理和Kafka消费者中的不同参数,以实现发送、接收大消息。单通常应该避免使用Kafka发送大消息,处理大消息会消耗生产者和消费者更多的CPU和内存,最终在一定程度上限制了它们处理其他任务的能,还有可能导致终端用户高延迟问题。
还有其他一些调优方式:
- Kafka生产者提供了压缩消息选项,它支持不同的压缩方法。我们可以
compression.type property
属性配置。 - 可以将大消息存储在共享存储位置,并通过Kafka消息发送该位置。这可能是更快的方式,具有最小的处理开销。
- 另一种选择是在生产者端将大消息拆分为每个大小为1KB的小消息。之后,我们可以使用分区键将所有这些消息发送到单个分区,以确保正确的顺序。然后在消费者端可以从较小消息重构为完整大消息。
总结
在本文中,我们介绍了配置调优Kafka选项以发送大于1MB的大消息。包括生产者端、主题、代理服务和消费者端的配置选项。其中一些选项是强制配置,一些是可选配置,虽然消费者配置是可选的,但可以避免负面的性能影响。最后,我们还介绍了发送大消息的其他可能选项。
内容参考:[Send Large Messages With Kafka](Send Large Messages With Kafka)
配置Kafka发送大消息相关推荐
- 利用Kafka发送/消费消息-Java示例
利用Kafka发送/消费消息-Java示例 当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方. 依赖配置 & ...
- kafka数据不丢失不重复_如何配置 KAFKA 使其消息不会丢失
不可靠的KAFKA 这里的不可靠是指代KAFKA其设计之初就为高性能而设计,其是允许消息丢失的,但经过多个版本的升级之后,通过KAFKA的相关配置,我们可以将其作为可靠的队列(不丢消息的队列). 在本 ...
- kafka 解决大消息发送和接收报错问题
kafka消息超过一定大小会报错如下: The message is 2044510 bytes when serialized which is larger than the maximum re ...
- python 查看kafka发送的消息格式及消息内容
from pykafka import KafkaClientclient = KafkaClient(hosts="0.0.0.0:9092") topic = client.t ...
- 配置Kafka消息保留时间
生产者发送消息给kafka,消息被追加值日志文件并保留一定周期(基于配置).本文探讨对Kafk主题配置消息保留时间. 基于时间保留 通过保留期属性,消息就有了TTL(time to live 生存时间 ...
- Kafka发送超过broker限定大小的消息时Client和Broker端各自会有什么异常?
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- 【kafka】Kafka consumer处理大消息数据过大导致消费停止问题
文章目录 1.概述 2.案例分析 3.kafka的设计初衷 3.1 broker 配置 3.2 Consumer 配置 M.扩展 1.概述 转载:https://www.cnblogs.com/wyn ...
- 使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息
作者 | 辽天 来源 | 阿里巴巴云原生公众号 导读:本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 sp ...
- kafka 脚本发送_NWPC消息平台:在ecFlow系统中发送产品事件消息
本文属于介绍 NWPC 消息平台 系列文章. 本文介绍如何在基于 ecFlow 构建的数值预报业务系统中发送 NWPC 消息平台的 产品事件消息. 介绍 数值预报业务系统产品制作一般分为三个步骤: 监 ...
最新文章
- 如何选择视觉CV光源颜色
- android studio 加固和签名
- 程序员自学到底有没有用?网友们吵翻了...
- 初始化配置 libevent
- Centos-移动文件或目录-mv
- 2011阿里巴巴集团实习生招聘笔试题 CC++
- socket通信(1)概述
- Farseer.Net ORM开源框架 V0.x 教程目录
- SAP中国招聘内部顾问,工作职责是做客户项目,ABAP开发
- OpenJudge/Poj 1226 Substrings
- 为什么大公司一定要使用DevOps
- android动态更新配置文件,Android如何动态修改Manifest文件
- 基于ARP协议获取局域网内主机MAC地址
- java 根据模板,导出word并提供下载
- IEEE Access投稿流程经验分享
- 6.26 mongoDB是无法find未初始的值的,mongoose的Schema需具象化及个人商品Schema设计,租赁网登录态初次尝试,vue对对象未赋初值的监听,forEach等api深拷贝问题
- 毕业论文知网查重之应对办法
- Unity3D项目升级URP
- Android调用miui给权限,Android 11+MIUI12,APP任意获取用户隐私的路子被封死
- python如何输入一个矩阵_python怎么输入矩阵