SpringBoot 集成 kafka,基于注解批量消费设置
网上关于SpringBoot 集成kafka的批量消费功能需要手动创建类(这篇文章不错:【弄nèng - Kafka】应用篇(三) —— Springboot整合Kafka(批量消费)_司马缸砸缸了-CSDN博客_kafka springboot批量消费https://blog.csdn.net/yy756127197/article/details/103895413),但是感觉对代码的改动比较大。因此花了点时间研究了一下如何简单的集成。
maven依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version></parent><!-- 自动引入kafka版本2.2.5.RELEASE --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
配置文件
#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=127.0.0.1:9092
#spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
#=============== consumer =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=mygroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=latest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.poll-timeout=1000
spring.kafka.listener.type=batch
spring.kafka.listener.concurrency=1
spring.kafka.consumer.max-poll-records=1000
最后几个批量设置的关键配置:
- spring.kafka.listener.poll-timeout=1000 #看了说明也没搞懂这个参数什么作用,无论设置大小对后续的测试没有任何影响,有知道的大神还望点拨下
- spring.kafka.listener.type=batch #指定监听的模式,好多文章里没有此项
- spring.kafka.listener.concurrency=1 #同时处理线程数,应设置与brocker数量一致,由于测试服务器没有多个brocker,因此不知道影响
- spring.kafka.consumer.max-poll-records=1000 #每批最大条数,默认500
消费者代码:仅观察接收到的集合大小
@KafkaListener(topics = "MY_TOPIC")public void myListener(List<ConsumerRecord<Integer, JSONObject>> list){System.out.print(list.size()+",");/* list.forEach( it -> {System.out.println(it.value());});*/}//参数可以使用泛型 List<ConsumerRecord<?, ?>>
生产者测试
public static void myTest(){Properties p = new Properties();p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);Random random = new Random();for(int i=0;i<20000;i++){JSONObject jsonObject = new JSONObject();jsonObject.put("productName", random.nextInt(10));jsonObject.put("couponCount", random.nextInt(50));jsonObject.put("siteId",random.nextInt(5));ProducerRecord<String, String> recordTopic5 = new ProducerRecord<String, String>("MY_TOPIC", jsonObject.toString());kafkaProducer.send(recordTopic5);/*try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}*/}kafkaProducer.close();}
测试结果
模拟生产者发消息时的不连续性,对消费者批量接受消息数量的影响
没有sleep时:
sleep=1时:
sleep=10时:
SpringBoot 集成 kafka,基于注解批量消费设置相关推荐
- springboot集成kafka消费手动启动停止
项目场景: 在月结,或者某些时候,我们需要停掉kafka所有的消费端,让其暂时停止消费,而后等月结完成,再从新对消费监听恢复,进行消费,此动作不需要重启服务,最后源码下载 解决分析 KafkaList ...
- SpringBoot集成kafka全面实战
本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章 ...
- 从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例
从现在开始学 Kafka:SpringBoot 集成 Kafka,生产者与消费者示例 前言 加依赖 生产者 加配置 生产者代码示例 消费者 加配置 消费者监听器示例 调用 关于 Serializer ...
- SpringBoot集成Kafka低版本和高版本
SpringBoot集成Kafka低版本和高版本 说明 地址 低版本SpringBoot集成Kafka代码 代码 kafka生产者配置 kafka消费者配置 发送消息给kafka的Controller ...
- springboot集成kafka及kafka web UI的使用
springboot集成kafka application.properties spring.kafka.bootstrap-servers=CentOSA:9092,CentOSB:9092,Ce ...
- SpringBoot集成Kafka
SpringBoot集成Kafka 知识索引 SpringBoot集成Kafka工程搭建 SpringBoot集成Kafka配置 SpringBoot集成Kafka生产消息 SpringBoot集成K ...
- SpringBoot集成Kafka消息队列
1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...
- Springboot 集成kafka
一.创建项目并导入pom依赖 <dependency><groupId>org.springframework.kafka</groupId><artifac ...
- 【无废话】SpringBoot集成Kafka消息队列
0.前言 本人整理收藏了22年多家公司面试知识点整理 ,以及各种Java核心知识点免费分享给大家,我认为对面试与学习来说是非常有用的,想要资料的话请点白嫖这份答案←戳我** 1.说明 Spring可以 ...
最新文章
- python cx oracle安装_python3.6的安装及cx_oracle安装
- AIphaCode 并不能取代程序员,而是开发者的工具
- Drug Discovery Today | 频繁命中化合物机制探究:PAINS规则的局限性
- 增加堆内存的大小 - 提防眼镜蛇效应
- 学习python用哪个app-Python和R:学哪个好?
- The Excel Connection Manager is not supported in the 64-bit version of SSIS, as no OLE DB provider i
- 20165333第一次课堂测试补漏
- Opencv一维直方图的绘制
- LeetCode 1653. 使字符串平衡的最少删除次数(DP)
- C# 将PDF文件转换为word格式
- CSDN博文下载器(JAVA)
- C++编写的常用软件(找找方向)
- 电脑的脉搏---时钟频率的来龙去脉
- ElasticSearch Index Settings
- linux无线8179,编译安装0bda 8179无线网卡
- 《信息学奥赛》1354:括弧匹配检验
- openssl加密base64编码
- 绝对路径和相对路径的优缺点
- PicoScope 4425A新能源车诊断套装(型号:PQ196)
- Martin Fowler关于微服务的原文翻译
热门文章
- pdf限制打印了怎么办
- kubernetes continually evict pod when node's inode exhausted
- 2017-12-22 日语编程语言抚子-第三版实现初探 1
- 股票波动率 python_如何统计投资品种波动率(python)?
- Entity层、DAO层、Service层、Controller层 先后顺序
- 3DMAX一键屋顶建模插件MW RoofGen使用教程
- 十大管理之项目人力资源管理知识点
- 水电表、工控、医用电子设备等超低功耗段码LCD液晶显示驱动IC-VKL144A/B,TSSOP48/QFN48,工作电流<10微安,可完全兼容替代PCF8551、MCP144、BU9792、9B92等
- 揭秘“0 day漏洞”:一款强大却脆弱的武器
- wow_32_64 汇编调试器/注入器/汇编指令书写神器 V1.7