网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:【弄nèng - Kafka】应用篇(三) —— Springboot整合Kafka(批量消费)_司马缸砸缸了-CSDN博客_kafka springboot批量消费https://blog.csdn.net/yy756127197/article/details/103895413),但是感觉对代码的改动比较大。因此花了点时间研究了一下如何简单的集成。

maven依赖

    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><!-- 自动引入kafka版本2.2.5.RELEASE --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

配置文件

#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092
#spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
#=============== consumer  =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=mygroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=latest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.poll-timeout=1000
spring.kafka.listener.type=batch
spring.kafka.listener.concurrency=1
spring.kafka.consumer.max-poll-records=1000

最后几个批量设置的关键配置:

  • spring.kafka.listener.poll-timeout=1000  #看了说明也没搞懂这个参数什么作用,无论设置大小对后续的测试没有任何影响,有知道的大神还望点拨下
  • spring.kafka.listener.type=batch             #指定监听的模式,好多文章里没有此项
  • spring.kafka.listener.concurrency=1       #同时处理线程数,应设置与brocker数量一致,由于测试服务器没有多个brocker,因此不知道影响
  • spring.kafka.consumer.max-poll-records=1000 #每批最大条数,默认500

消费者代码:仅观察接收到的集合大小

    @KafkaListener(topics = "MY_TOPIC")public void myListener(List<ConsumerRecord<Integer, JSONObject>> list){System.out.print(list.size()+",");/* list.forEach( it -> {System.out.println(it.value());});*/}//参数可以使用泛型 List<ConsumerRecord<?, ?>>

生产者测试

public static void myTest(){Properties p = new Properties();p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);Random random = new Random();for(int i=0;i<20000;i++){JSONObject jsonObject = new JSONObject();jsonObject.put("productName", random.nextInt(10));jsonObject.put("couponCount", random.nextInt(50));jsonObject.put("siteId",random.nextInt(5));ProducerRecord<String, String> recordTopic5 =  new ProducerRecord<String, String>("MY_TOPIC", jsonObject.toString());kafkaProducer.send(recordTopic5);/*try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}*/}kafkaProducer.close();}

测试结果

模拟生产者发消息时的不连续性,对消费者批量接受消息数量的影响

没有sleep时:

sleep=1时:

sleep=10时:

SpringBoot 集成 kafka,基于注解批量消费设置相关推荐

  1. springboot集成kafka消费手动启动停止

    项目场景: 在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 解决分析 KafkaList ...

  2. SpringBoot集成kafka全面实战

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章 ...

  3. 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例

    从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...

  4. SpringBoot集成Kafka低版本和高版本

    SpringBoot集成Kafka低版本和高版本 说明 地址 低版本SpringBoot集成Kafka代码 代码 kafka生产者配置 kafka消费者配置 发送消息给kafka的Controller ...

  5. springboot集成kafka及kafka web UI的使用

    springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...

  6. SpringBoot集成Kafka

    SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...

  7. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  8. Springboot 集成kafka

    一.创建项目并导入pom依赖 <dependency><groupId>org.springframework.kafka</groupId><artifac ...

  9. 【无废话】SpringBoot集成Kafka消息队列

    0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...

最新文章

  1. python cx oracle安装_python3.6的安装及cx_oracle安装
  2. AIphaCode 并不能取代程序员,而是开发者的工具
  3. Drug Discovery Today | 频繁命中化合物机制探究:PAINS规则的局限性
  4. 增加堆内存的大小 - 提防眼镜蛇效应
  5. 学习python用哪个app-Python和R:学哪个好?
  6. The Excel Connection Manager is not supported in the 64-bit version of SSIS, as no OLE DB provider i
  7. 20165333第一次课堂测试补漏
  8. Opencv一维直方图的绘制
  9. LeetCode 1653. 使字符串平衡的最少删除次数(DP)
  10. C# 将PDF文件转换为word格式
  11. CSDN博文下载器(JAVA)
  12. C++编写的常用软件(找找方向)
  13. 电脑的脉搏---时钟频率的来龙去脉
  14. ElasticSearch Index Settings
  15. linux无线8179,编译安装0bda 8179无线网卡
  16. 《信息学奥赛》1354:括弧匹配检验
  17. openssl加密base64编码
  18. 绝对路径和相对路径的优缺点
  19. PicoScope 4425A新能源车诊断套装(型号:PQ196)
  20. Martin Fowler关于微服务的原文翻译

热门文章

  1. pdf限制打印了怎么办
  2. kubernetes continually evict pod when node's inode exhausted
  3. 2017-12-22 日语编程语言抚子-第三版实现初探 1
  4. 股票波动率 python_如何统计投资品种波动率(python)?
  5. Entity层、DAO层、Service层、Controller层 先后顺序
  6. 3DMAX一键屋顶建模插件MW RoofGen使用教程
  7. 十大管理之项目人力资源管理知识点
  8. 水电表、工控、医用电子设备等超低功耗段码LCD液晶显示驱动IC-VKL144A/B,TSSOP48/QFN48,工作电流<10微安,可完全兼容替代PCF8551、MCP144、BU9792、9B92等
  9. 揭秘“0 day漏洞”:一款强大却脆弱的武器
  10. wow_32_64 汇编调试器/注入器/汇编指令书写神器 V1.7