业务背景
项目有个需求需要统计IM聊天相关数据,原设计思想是在聊天产生时通过消息队列进行数据记录,利用rocketMQ实现。上线后发现由于内部IM活跃用户量级较大,MQ生产者生产消息过多,消费者实时消费会造成服务器CPU和硬盘读写压力,在改不了硬件配置的情况下,笔者通过了解到kafka批量消费的实现可解决这个问题,记录下该方案。

环境

kafka、Springboot、JDK8

依赖

使用的是Springboot v2.1.5.RELEASE版本,pom依赖如下:

        <!-- kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency>

配置文件

生产者配置

核心配置是:

#设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=batch
# 手动
spring.kafka.listener.ack-mode=manual_immediate
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=10000

单条消费和提交有时候会影响性能,spring-kafka提供了批量拉取数据和手动提交的策略

#设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=batch
# 手动
spring.kafka.listener.ack-mode=manual_immediate
# 集群地址
spring.kafka.bootstrap-servers=192.168.2.135:9092
# 重试次数
spring.kafka.producer.retries=3
# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
spring.kafka.producer.acks=all
# 批量处理的最大大小 单位 byte
spring.kafka.producer.batch-size=4096
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.buffer-memory=33554432
# 客户端ID
spring.kafka.producer.client-id=im-kafka
# Key 序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Value 序列化类
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消息压缩:none、lz4、gzip、snappy,默认为 none。
spring.kafka.producer.compression-type=gzip
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=1000
# KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
spring.kafka.producer.properties.max.block.ms=6000

消费者配置

核心配置是:

kafka:listener:# 手动ack-mode: manual_immediate#设置是否批量消费,默认 single(单条),batch(批量)type: batch# 自动提交 offset 默认 trueenable-auto-commit: false# 批量消费最大数量max-poll-records: 100

在配置文件中关闭自动提交,开启手动提交和批量消费就可以批量消费了,但是最后需要手动提交offset

  kafka:listener:# 手动ack-mode: manual_immediate#设置是否批量消费,默认 single(单条),batch(批量)type: batch# 集群地址bootstrap-servers: 192.168.2.135:9092# 消费者配置consumer:# 默认消费者组group-id: imStatisticsConsumerGroup# 自动提交 offset 默认 trueenable-auto-commit: false# 自动提交的频率 单位 msauto-commit-interval: 1000# 批量消费最大数量max-poll-records: 100# Key 反序列化类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value 反序列化类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 当kafka中没有初始offset或offset超出范围时将自动重置offset# earliest:重置为分区中最小的offset# latest:重置为分区中最新的offset(消费分区中新产生的数据)# none:只要有一个分区不存在已提交的offset,就抛出异常auto-offset-reset: latestproperties:session:timeout:# session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作ms: 120000request:timeout:# 请求超时ms: 120000

生产者端代码

    public void sendToImStatistics(List<ImChatStatistics> statistics) {kafkaTemplate.send(KAFKA_IM_CHAT_STATISTICS, JsonUtils.toString(statistics));}

消费者端代码

    @KafkaListener(topics = {"imChatStatistics"}, groupId = "{imStatisticsConsumerGroup}")public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {try {if (CollectionUtils.isEmpty(consumerRecords)) {return;}LogUtils.info("KafkaImStatisticsListener 处理推送消息[data大小: {}]", consumerRecords.size());List<ImChatStatistics> totalList = new ArrayList<>();for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {List<ImChatStatistics> list = JSON.parseArray(consumerRecord.value(), ImChatStatistics.class);list.stream().forEach(item -> {item.setWeek(DateUtils.getWeek(item.getDate()));});totalList.addAll(list);}imChatStatisticsMapper.batchInsertOrUpdate(totalList);// 手动提交offsetacknowledgment.acknowledge();} catch (Exception e) {LogUtils.error("ImChartConsumer 消息消费失败 :" + e.getMessage(), e);}}

Kafka 批量消费相关推荐

  1. Kafka 批量消费消息

    改成批量消费消息有两个需要注意的地方: 1.配置文件修改 # 批量消费每次最多消费条目数 max-poll-records: 20 listener:# 关闭监听topic不存在的话项目启动报错mis ...

  2. python kafka offset自动提交_Spring-Kafka —— 实现批量消费和手动提交offset

    spring-kafka的官方文档介绍,可以知道自1.1版本之后, @KafkaListener开始支持批量消费,只需要设置batchListener参数为true 把application.yml中 ...

  3. SpringBoot 集成 kafka,基于注解批量消费设置

    网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:[弄nèng - Kafka]应用篇(三) -- Springboot整合Kafka(批量消费)_司马缸砸缸了- ...

  4. springboot整合kafka实现批量消费

    linux安装kafka:https://blog.csdn.net/qq_37936542/article/details/109453249 kafka版本:kafka_2.12-2.6.0.tg ...

  5. flume系列之:使用通配符批量消费kafka的Topic

    flume系列之:使用通配符批量消费kafka的Topic #指定kafka topic使用注释的这个 #kafka_topics: "optics-production-data" ...

  6. kafka 主动消费_SpringBoot2 整合Kafka组件,应用案例和流程详解

    本文源码:Git || Gitee 一.搭建Kafka环境 1.下载解压 -- 下载wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.1 ...

  7. Apache Kafka-消费端_批量消费消息的核心参数及功能实现

    文章目录 概述 参数设置 Code POM依赖 配置文件 生产者 消费者 单元测试 测试结果 源码地址 概述 kafka提供了一些参数可以用于设置在消费端,用于提高消费的速度. 参数设置 https: ...

  8. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  9. Kafka批量消费模式

    目录 批量消费全局配置 单独为消费者开启批量消费配置 Kafka默认的listener消费模式为single单消息模式,即一次消费一条消息. 批量消费可以一次性消费到多条消息,如果是顺序不敏感的业务, ...

最新文章

  1. spring-data-jpa Repository的基本知识
  2. 《Windows Vista for Developers》系列
  3. 听说你想去大厂看学姐,带你看看京东软件产品经理岗长啥样?
  4. 语音识别 | GMM-HMM、DNN-HMM等主流算法及前沿技术
  5. 第三次学JAVA再学不好就吃翔(part108)--带缓冲的字符流
  6. Android绘制(三):Path结合属性动画, 让图标动起来!
  7. CImage类的用法(转帖)
  8. 重新初始化_关窗,也有大学问!宝马车窗初始化设置步骤方法...
  9. java中文乱码decode_JAVA中文字符乱码解决详解
  10. 别再问我阿里面试流程了!!!P8 面试官 花了一个月整理了这份 4000 字的 面试流程
  11. 抖音上很火的 立方体相册和旋转时钟,基于人脸识别实现程序员的专属相册和专属时钟,包含15套相册模板和9套时钟风格,可以直接替换成自己的图片,部署生成自己的个性化专属相册
  12. POJ3658Matrix( 双重二分+负数+死循环)
  13. 基于python的数字图像处理--学习笔记(二)
  14. 小米手机显示无法连接服务器错误代码,来电转接出现连接问题或MMI码无效的解决方法...
  15. Python那些让我疑惑许久的代码--2
  16. 配置 Windows Server 2008 R2 DNS 服务器
  17. 了解一下,Android 10 Build系统
  18. MySQL数据库文件转化为Word表格做论文/报告
  19. python 读取合并单元格的数据_Python使用xlrd实现读取合并单元格
  20. java 大于当前日期_java判断某日期 是否超过今天

热门文章

  1. 光伏数据采集分析系统
  2. 安装:Microsoft Project 2010
  3. ImportError: _C.cpython-37m-x86_64-linux-gnu.so: undefined symbol:_ZN3c107Warning4warnENS_14SourceL
  4. mats显存测试软件linux环境,显卡检测工具Mats-显存检测软件Mats下载 2017 免费版|显存检测软件Mats 2017 免费版 - 爱学府软件园...
  5. 计算机专业就业率最低: 正规军干不过游击队收藏 面对当今的研究生教育——只有无奈我国教育中令人揪心的若干个不等式...
  6. 渡一教育公开课web前端开发JavaScript精英课学习笔记(一)前言
  7. Jenkins Maven打包Jar,部署远程服务器
  8. 路测里程超过350万英里之后,Waymo想在冬天进行「冰雪测试」
  9. 赛门铁克linux安装教程,SEP14在linux下安装失败……
  10. rust睡觉按键没反应_腐蚀Rust实用技巧大全 Rust新手上手指南