Apache Kafka是一个强大开源、分布式容错的事件流平台。然而,当我们使用Kafka发送大于配置的大小限制的消息时,它会给出一个错误。前文介绍了Spring集成Kafka,本文研究如何使用Kafka发送大消息。

问题描述

Kafka配置限制允许发送消息大小,默认为1M。然而,如果需要发送大消息,需要调整这些参数,本文使用Kafka2.5,在进入配置部分之前,首先需要安装Kafka。

安装

这里搭建单节点kafka代理,生产者应用发送消息给指定主题,该主题为单分区主题。

我们看到整个过程涉及多个环节,kafka生产者、kafka代理、主题、kafka消费者。因此,所有这些配置需要调整,以适用大消息传输。我们的目标是调整参数能够发送20M大消息。

Kafka生产者配置

第一步是消息产生地,我们使用Spring Kafka从应用发送消息给Kafka服务器。因此,首先需要更新 max.request.size 属性。详细细节可参考官方文档,该有效值为常量,对于Kafka Client库中的ProducerConfig.MAX_REQUEST_SIZE_CONFIG 定义,需要增加Spring Kafka依赖。我们配置其值为20971520字节:

public ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");return new DefaultKafkaProducerFactory<>(configProps);
}

Kafka主题配置

生产者发送消息给Kafka代理上的主题。因此,接下来选哟配置kafka主题,这意味着需要更新max.message.bytes属性,其默认值为1M。

该参数控制kafka压缩后(如果启用了压缩)最大记录批次大小,详细内容参考官方文档。我们可以通过cli命令手动配置该属性:

./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic longMessage --partitions 1 \
--replication-factor 1 --config max.message.bytes=20971520

当然也可以通过Kafka Client进行配置:

public NewTopic topic() {NewTopic newTopic = new NewTopic(longMsgTopicName, 1, (short) 1);Map<String, String> configs = new HashMap<>();configs.put("max.message.bytes", "20971520");newTopic.configs(configs);return newTopic;
}

要发送大消息,至少需要配置这两个参数。

Kafka 代理配置

一个可选配置属性为:message.max.bytes,用于允许所有在代理上的主题接收大于1M的消息。该属性控制kafka压缩后(如何启用了压缩)允许的最大记录批次大小,详细内容参考官方文档。

通过在server.properties配置文件中增加下列属性:

message.max.bytes=20971520

另外,该属性将使用message.max.bytesmax.message.bytes 两者中的最大值。

Kafka消费者配置

下面讨论Kafka消费端配置属性。虽然这些变化对消费大消息不是必须的,为了避免消费端程序性能问题,最好也调整相应参数:

  • max.partition.fetch.bytes: 该属性限制从主题分区能消费的字节数,详细内容可参考官方文档。在Kafka Client 库中对应为ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG常量。

  • fetch.max.bytes: 该属性限制消费者从kafka服务器获取的字节数,kafka能够监听多个分区,详细内容参考官方文档。在Kafka Client 库中对应为ConsumerConfig.FETCH_MAX_BYTES_CONFIG 常量。

因此对配置消费者,需要创建KafkaConsumerFactory。另外需要说明的是:应该配置比topic/broker配置较大的值。

public ConsumerFactory<String, String> consumerFactory(String groupId) {Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");return new DefaultKafkaConsumerFactory<>(props);
}

这里配置值于前面一样,是因为创建的主题仅有一个分区。但是FETCH_MAX_BYTES_CONFIG 值应该高于MAX_PARTITION_FETCH_BYTES_CONFIG。当消费者监听多个分区时,FETCH_MAX_BYTES_CONFIG 表示从多个分区获取消息的大小,而MAX_PARTITION_FETCH_BYTES_CONFIG表示从单个分区获取消息的大小。

其他选项

前面提及配置Kafka生产者、主题、代理和Kafka消费者中的不同参数,以实现发送、接收大消息。单通常应该避免使用Kafka发送大消息,处理大消息会消耗生产者和消费者更多的CPU和内存,最终在一定程度上限制了它们处理其他任务的能,还有可能导致终端用户高延迟问题。

还有其他一些调优方式:

  • Kafka生产者提供了压缩消息选项,它支持不同的压缩方法。我们可以compression.type property属性配置。
  • 可以将大消息存储在共享存储位置,并通过Kafka消息发送该位置。这可能是更快的方式,具有最小的处理开销。
  • 另一种选择是在生产者端将大消息拆分为每个大小为1KB的小消息。之后,我们可以使用分区键将所有这些消息发送到单个分区,以确保正确的顺序。然后在消费者端可以从较小消息重构为完整大消息。

总结

在本文中,我们介绍了配置调优Kafka选项以发送大于1MB的大消息。包括生产者端、主题、代理服务和消费者端的配置选项。其中一些选项是强制配置,一些是可选配置,虽然消费者配置是可选的,但可以避免负面的性能影响。最后,我们还介绍了发送大消息的其他可能选项。

内容参考:[Send Large Messages With Kafka](Send Large Messages With Kafka)

配置Kafka发送大消息相关推荐

  1. 利用Kafka发送/消费消息-Java示例

    利用Kafka发送/消费消息-Java示例 当使用命令行工具把基本的组件运行起来后,再使用Java client就很简单,这里是入门的第一个Java客户端程序,有很多需要深入理解的地方. 依赖配置 & ...

  2. kafka数据不丢失不重复_如何配置 KAFKA 使其消息不会丢失

    不可靠的KAFKA 这里的不可靠是指代KAFKA其设计之初就为高性能而设计,其是允许消息丢失的,但经过多个版本的升级之后,通过KAFKA的相关配置,我们可以将其作为可靠的队列(不丢消息的队列). 在本 ...

  3. kafka 解决大消息发送和接收报错问题

    kafka消息超过一定大小会报错如下: The message is 2044510 bytes when serialized which is larger than the maximum re ...

  4. python 查看kafka发送的消息格式及消息内容

    from pykafka import KafkaClientclient = KafkaClient(hosts="0.0.0.0:9092") topic = client.t ...

  5. 配置Kafka消息保留时间

    生产者发送消息给kafka,消息被追加值日志文件并保留一定周期(基于配置).本文探讨对Kafk主题配置消息保留时间. 基于时间保留 通过保留期属性,消息就有了TTL(time to live 生存时间 ...

  6. Kafka发送超过broker限定大小的消息时Client和Broker端各自会有什么异常?

    欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...

  7. 【kafka】Kafka consumer处理大消息数据过大导致消费停止问题

    文章目录 1.概述 2.案例分析 3.kafka的设计初衷 3.1 broker 配置 3.2 Consumer 配置 M.扩展 1.概述 转载:https://www.cnblogs.com/wyn ...

  8. 使用 rocketmq-spring-boot-starter 来配置、发送和消费 RocketMQ 消息

    作者 | 辽天 来源 | 阿里巴巴云原生公众号 导读:本文将 rocktmq-spring-boot 的设计实现做一个简单的介绍,读者可以通过本文了解将 RocketMQ Client 端集成为 sp ...

  9. kafka 脚本发送_NWPC消息平台:在ecFlow系统中发送产品事件消息

    本文属于介绍 NWPC 消息平台 系列文章. 本文介绍如何在基于 ecFlow 构建的数值预报业务系统中发送 NWPC 消息平台的 产品事件消息. 介绍 数值预报业务系统产品制作一般分为三个步骤: 监 ...

最新文章

  1. 如何选择视觉CV光源颜色
  2. android studio 加固和签名
  3. 程序员自学到底有没有用?网友们吵翻了...
  4. 初始化配置 libevent
  5. Centos-移动文件或目录-mv
  6. 2011阿里巴巴集团实习生招聘笔试题 CC++
  7. socket通信(1)概述
  8. Farseer.Net ORM开源框架 V0.x 教程目录
  9. SAP中国招聘内部顾问,工作职责是做客户项目,ABAP开发
  10. OpenJudge/Poj 1226 Substrings
  11. 为什么大公司一定要使用DevOps
  12. android动态更新配置文件,Android如何动态修改Manifest文件
  13. 基于ARP协议获取局域网内主机MAC地址
  14. java 根据模板,导出word并提供下载
  15. IEEE Access投稿流程经验分享
  16. 6.26 mongoDB是无法find未初始的值的,mongoose的Schema需具象化及个人商品Schema设计,租赁网登录态初次尝试,vue对对象未赋初值的监听,forEach等api深拷贝问题
  17. 毕业论文知网查重之应对办法
  18. Unity3D项目升级URP
  19. Android调用miui给权限,Android 11+MIUI12,APP任意获取用户隐私的路子被封死
  20. python如何输入一个矩阵_python怎么输入矩阵

热门文章

  1. 校园网自动登陆(河南科技学院)
  2. SEO将要苏醒,请乘坐你网络营销的班车
  3. 基础:MVC三层架构
  4. 易通眼镜店配镜记录档案管理软件 v4.04 绿色
  5. 王者荣耀服务器维护中啥意思,王者荣耀1月2日更新维护公告 王者荣耀1月2日更新了什么...
  6. 开发板的基础知识与分类
  7. 超详细轮播图,让你彻底明白轮播图!
  8. 军犬舆情:全流程服务打造舆情管理闭环
  9. konka电视怎么修改服务器,康佳网络电视如何升级 简单五步就搞定
  10. 思科交换机:vtp协议