原文地址:https://blog.csdn.net/russle/article/details/81258590

Spring Boot 中使用@KafkaListener并发批量接收消息

2018年07月28日 10:53:37 russle 阅读数:6996

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/russle/article/details/81258590

kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到Kafka消息队列拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个相同名称的consumer-group来加快消息消费(经测试,如果该topic只有一个分区,实际上再启动一个新的消费者,没有作用)。

完整的代码在这里,欢迎加星号、fork。

官方文档在https://docs.spring.io/spring-kafka/reference/html/_reference.html

第一步,并发消费

先看代码,重点是这我们使用的是ConcurrentKafkaListenerContainerFactory并且设置了factory.setConcurrency(4); (我的topic有4个分区,为了加快消费将并发设置为4,也就是有4个KafkaMessageListenerContainer)

    @BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(4);factory.setBatchListener(true);factory.getContainerProperties().setPollTimeout(3000);return factory;}

第二步,批量消费

然后是批量消费。重点是factory.setBatchListener(true); 
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); 
一个设启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

重点说明一下,我们设置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是说如果没有达到50条消息,我们就一直等待。官方的解释是”The maximum number of records returned in a single call to poll().”, 也就是50表示的是一次poll最多返回的记录数。

从启动日志中可以看到还有个 max.poll.interval.ms = 300000, 也就说每间隔max.poll.interval.ms我们就调用一次poll。每次poll最多返回50条记录。

max.poll.interval.ms官方解释是”The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. “;

    @BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(4);factory.setBatchListener(true);factory.getContainerProperties().setPollTimeout(3000);return factory;}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);return propsMap;}

启动日志截图 

关于max.poll.records和max.poll.interval.ms官方解释截图: 

第三步,分区消费

对于只有一个分区的topic,不需要分区消费,因为没有意义。下面的例子是针对有2个分区的情况(我的完整代码中有4个listenPartitionX方法,我的topic设置了4个分区),读者可以根据自己的情况进行调整。

public class MyListener {private static final String TPOIC = "topic02";@KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })public void listenPartition0(List<ConsumerRecord<?, ?>> records) {log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());log.info("Id0 records size " +  records.size());for (ConsumerRecord<?, ?> record : records) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());log.info("Received: " + record);if (kafkaMessage.isPresent()) {Object message = record.value();String topic = record.topic();log.info("p0 Received message={}",  message);}}}@KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })public void listenPartition1(List<ConsumerRecord<?, ?>> records) {log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());log.info("Id1 records size " +  records.size());for (ConsumerRecord<?, ?> record : records) {Optional<?> kafkaMessage = Optional.ofNullable(record.value());log.info("Received: " + record);if (kafkaMessage.isPresent()) {Object message = record.value();String topic = record.topic();log.info("p1 Received message={}",  message);}}
}

关于分区和消费者关系,后面会补充,先摘录如下: 
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

最后,总结,如果我们的topic有多个分区,经过以上步骤可以很好的加快消息消费。如果只有一个分区,因为已经有一个同名group id在消费了,新启动的一个基本上没有作用(本人测试结果)。

Spring Boot 中使用@KafkaListener并发批量接收消息(转载)相关推荐

  1. Spring Boot实战解决高并发数据入库: Redis 缓存+MySQL 批量入库

    前言 最近在做阅读类的业务,需要记录用户的PV,UV: 项目状况:前期尝试业务阶段: 特点: 快速实现(不需要做太重,满足初期推广运营即可) 快速投入市场去运营 收集用户的原始数据,三要素: 谁 在什 ...

  2. boot spring 接口接收数据_在 Spring Boot 中使用 Dataway 配置数据查询接口

    Dataway介绍 Dataway 是基于 DataQL 服务聚合能力,为应用提供的一个接口配置工具.使得使用者无需开发任何代码就配置一个满足需求的接口. 整个接口配置.测试.冒烟.发布.一站式都通过 ...

  3. java 消息服务框架_Java消息服务 在 Spring Boot 中的使用

    原标题:Java消息服务 在 Spring Boot 中的使用 当前环境 Mac OS 10.11.x docker 1.12.1 JDK 1.8 SpringBoot 1.5 前言 基于之前一篇&q ...

  4. Spring Boot中声明式数据库事务使用与理解

    JDBC的数据库事务 传统JDBC的数据库事务的一个示例如下代码所示,该示例仅为一个insertUser方法的数据库事务过程.可以看到,如果还存在很多其他的数据库事务需要,则需要编写很多类似于如下的代 ...

  5. Spring Boot 中使用@Async实现异步调用,加速任务执行!

    欢迎关注方志朋的博客,回复"666"获面试宝典 什么是"异步调用"?"异步调用"对应的是"同步调用",同步调用指程序按照 ...

  6. Spring Boot中使用PostgreSQL数据库

    在如今的关系型数据库中,有两个开源产品是你必须知道的.其中一个是MySQL,相信关注我的小伙伴们一定都不陌生,因为之前的Spring Boot关于关系型数据库的所有例子都是对MySQL来介绍的.而今天 ...

  7. Spring Boot中如何扩展XML请求和响应的支持

    在之前的所有Spring Boot教程中,我们都只提到和用到了针对HTML和JSON格式的请求与响应处理.那么对于XML格式的请求要如何快速的在Controller中包装成对象,以及如何以XML的格式 ...

  8. Spring Boot中使用Flyway来管理数据库版本

    久违了的Spring Boot系列,今天抽空更新一篇.之前写过很多篇关于数据访问的文章了,比如下面这些: 使用JdbcTemplate 使用Spring-data-jpa简化数据访问层(推荐) 多数据 ...

  9. Spring Boot中使用RabbitMQ

    很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Me ...

  10. Spring Boot中使用Swagger2构建RESTful APIs

    关于 Swagger Swagger能成为最受欢迎的REST APIs文档生成工具之一,有以下几个原因: Swagger 可以生成一个具有互动性的API控制台,开发者可以用来快速学习和尝试API. S ...

最新文章

  1. VC++6.0如何删除文件
  2. 【转载】JDBC连接各种数据库的字符串
  3. 独家 | 数据科学家的必备读物:从零开始用 Python 构建循环神经网络(附代码)...
  4. pandas模块学习笔记2--基本功能
  5. linux sh文件case,Shell脚本case语句简明教程
  6. 如何修改容器的一些参数
  7. linux tomcat apr安装,Linux下Tomcat安装并开启APR模式-Go语言中文社区
  8. python mongodb查询_Python MongoDB 查找
  9. 这65条工作和成长建议,你将受用终生!
  10. 15 —— npm —— package.json 与 package-lock.json 的作用
  11. 不能右键新建html文件,win10无法新建文件夹怎么办 win10右键新建菜单设置方法图文教程...
  12. vue3 src/main.js文件配置
  13. 语音社交app源码中音频混音的实现步骤
  14. 吴恩达深度学习课程第二章第一周编程作业
  15. win7打开或关闭windows功能 提示“出现错误,并非所有的功能被更改”,管理员权限惹的祸...
  16. php生成五星红旗,php基于GD库画五星红旗的方法_PHP
  17. 未来计算机教师职业愿景展望,教师愿景与职业规划
  18. Redhat克隆及其配置
  19. java反射的优缺点_Java反射机制的优缺点
  20. php下载地址转换工具,PHP迅雷、快车、旋风下载专用链转换代码

热门文章

  1. Writing udev rules
  2. shell读取用户输入
  3. 联通积分兑换的Q币怎么兑换到QQ上
  4. android实现应用程序仅仅有在第一次启动时显示引导界面
  5. 在用户控件中动态添加控件及事件
  6. CPU负载均衡之EAS
  7. ffmpeg超详细综合教程(二)——为直播流添加滤镜
  8. windows客户端连接linux服务器上的postmaster
  9. 在cmd下安装Scrapy怎么解决方案python3
  10. html自动get提交,html跳转,获取get提交参数