本文讲述一下如何自定义spring kafka的consumer线程池

KafkaMessageListenerContainer

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties = getContainerProperties();if (!this.consumerFactory.isAutoCommit()) {AckMode ackMode = containerProperties.getAckMode();if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");}if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))&& containerProperties.getAckTime() == 0) {containerProperties.setAckTime(5000);}}Object messageListener = containerProperties.getMessageListener();Assert.state(messageListener != null, "A MessageListener is required");if (messageListener instanceof GenericAcknowledgingMessageListener) {this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;}else if (messageListener instanceof GenericMessageListener) {this.listener = (GenericMessageListener<?>) messageListener;}else {throw new IllegalStateException("messageListener must be 'MessageListener' "+ "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());}if (containerProperties.getConsumerTaskExecutor() == null) {SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}if (containerProperties.getListenerTaskExecutor() == null) {SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-L-");containerProperties.setListenerTaskExecutor(listenerExecutor);}this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);setRunning(true);this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);}

这里涉及到了两个线程池,一个是ConsumerTaskExecutor,一个是ListenerTaskExecutor,都在containerProperties里头配置
如果没有默认配置,则分别创建带"-C-"和"-L-"的SimpleAsyncTaskExecutor
ConsumerTaskExecutor用来去poll kafka消息
ListenerTaskExecutor用来调用@KafkaListener标注的方法

自定义

自定义executor,将其托管给spring容器的好处就是可以跟随容器的生命周期,在容器销毁之前优雅关闭线程池

扩展ConcurrentKafkaListenerContainerFactory

可以重写spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java的initializeContainer方法,然后进行设置

public class CustomConcurrentKafkaListenerContainerFactory<K,V> extends ConcurrentKafkaListenerContainerFactory<K,V> {/*** The executor for threads that poll the consumer.*/private AsyncListenableTaskExecutor consumerTaskExecutor;/*** The executor for threads that invoke the listener.*/private AsyncListenableTaskExecutor listenerTaskExecutor;public CustomConcurrentKafkaListenerContainerFactory(AsyncListenableTaskExecutor consumerTaskExecutor, AsyncListenableTaskExecutor listenerTaskExecutor) {this.consumerTaskExecutor = consumerTaskExecutor;this.listenerTaskExecutor = listenerTaskExecutor;}@Overrideprotected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance) {super.initializeContainer(instance);instance.getContainerProperties().setConsumerTaskExecutor(consumerTaskExecutor);instance.getContainerProperties().setListenerTaskExecutor(listenerTaskExecutor);}
}

设置

应用自定义kafkaListenerContainerFactory,替换为自己扩展的ConcurrentKafkaListenerContainerFactory即可。

@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
public class KafkaExecutorConfig {@Bean(name = "consumerTaskExecutor")public ThreadPoolTaskExecutor consumerTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());executor.setMaxPoolSize(5);executor.setQueueCapacity(100);executor.setThreadNamePrefix("my-C-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();return executor;}@Bean(name = "listenerTaskExecutor")public ThreadPoolTaskExecutor listenerTaskExecutor(){ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setMaxPoolSize(10);executor.setQueueCapacity(100);executor.setThreadNamePrefix("my-L-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setWaitForTasksToCompleteOnShutdown(true);executor.initialize();return executor;}@Bean("kafkaListenerContainerFactory")public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer,ConsumerFactory<Object, Object> kafkaConsumerFactory) {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new CustomConcurrentKafkaListenerContainerFactory<Object, Object>(consumerTaskExecutor(),listenerTaskExecutor());configurer.configure(factory, kafkaConsumerFactory);return factory;}
}

这样就大功告成了

自定义spring kafka consumer 线程池相关推荐

  1. 你也被Spring的这个“线程池”坑过吗?

    前两天一个晚上,正当我沉浸在敲代码的快乐中时,听到隔壁的同事传来一声不可置信的惊呼:线程池提交命令怎么可能会执行一秒多? 线程池提交方法执行一秒多?那不对啊,线程池提交应该是一个很快的操作,一般情况下 ...

  2. Spring 定时器结合线程池

    需求:Spring 定时器结合线程池处理工单 a.定时扫库查出一定数量的需要处理的工单 b.开启线程处理查出的工单 1,创建处理工单的task @Component("AppWorkOrde ...

  3. spring提供的线程池

    SPRING中的线程池ThreadPoolTaskExecutor 分类: JAVA Spring2013-07-12 10:36 14896人阅读 评论(9) 收藏 举报 Spring线程池多线程 ...

  4. Spring Boot系列二 Spring @Async异步线程池用法总结

    转载 自 https://blog.csdn.net/hry2015/article/details/67640534 1. TaskExecutor Spring异步线程池的接口类,其实质是java ...

  5. 自定义简单版本python线程池

    python未提供线程池模块,在python3上用threading和queue模块自定义简单线程池,代码如下: 1 #用threading queue 做线程池 2 import queue 3 i ...

  6. Spring Boot 配置线程池使用多线程插入数据

    点击上方 好好学java ,选择 星标 公众号 重磅资讯.干货,第一时间送达 今日推荐:牛人 20000 字的 Spring Cloud 总结,太硬核了~个人原创+1博客:点击前往,查看更多 来源:h ...

  7. spring async 默认线程池_springboot:异步调用@Async

    在后端开发中经常遇到一些耗时或者第三方系统调用的情况,我们知道Java程序一般的执行流程是顺序执行(不考虑多线程并发的情况),但是顺序执行的效率肯定是无法达到我们的预期的,这时就期望可以并行执行,常规 ...

  8. Spring Boot笔记-线程池调度计划仅运行一次

    这里是有这样的一个需求,启动spring boot后用一个新线程,跑一次就可以了,首先是线程池申请和配置: @Configuration @EnableAsync public class Async ...

  9. Spring Thread Pool 线程池的应用

    Spring and Java Thread example 扫扫关注"茶爸爸"微信公众号 坚持最初的执着,从不曾有半点懈怠,为优秀而努力,为证明自己而活. Download it ...

最新文章

  1. linux路由介绍,Linux的路由表详细介绍
  2. 机器人也来玩“踢瓶盖挑战”了,你动他就动,靠脑电控制,路人也能玩丨MIT出品...
  3. 关于MonoBehaviour的单例通用规则
  4. 信创产业发展应不忘初心牢记使命
  5. C++应用过程中使用知识点
  6. 树形结构递归初始化(父节点,统计字段等)
  7. uos系统虚拟机_体验中兴深度联合推出的「UOS」统一操作系统
  8. 最近完成的一个可伸缩性的WEB开发框架
  9. HDU1202 The calculation of GPA【水题】
  10. linux python虚拟环境 error_阿里云Linux系统配置python3-虚拟环境-mysql --踩坑实践(Ubuntu系统转centOS7)...
  11. Unity的序列化机制探索
  12. 哀其不幸的墨西哥人工智能
  13. java启动临时文件_File.createTempFile创建临时文件的示例详解
  14. 关于#1-D:last line of file ends without a newline警告的解决办法(stm32)
  15. 真·电子二胡 (ESP32配合库乐队APP实现的电子制作)
  16. 广西国家级自然保护区功能区划图(展示)
  17. 记一次简单高效的吸血鬼算法
  18. ubuntu使用ntfslabel 修改磁盘分区卷标
  19. 用递归及非递归方式实现树状结构的遍历函数
  20. VGGNet网络结构

热门文章

  1. 请编写出一个html页面 令其输出,javaweb程序设计案例教程_课后习题1.pdf
  2. java Executor实例_Executor框架+实例
  3. python 遍历文件夹 提取文件内信息 存为新文件名_python获取遍历文件名称并分别保存为XLSX和CSV格式...
  4. 计算机开启时提示键盘错误,电脑开机出现异常提示keyboard not found的故障原因及解决方法_电脑故障...
  5. git rollback代码都没了_Git使用总结
  6. linux vim编辑器主要作用,Linux-vim编辑器
  7. linux中daemonize用法,daemonize Unix系统后台守护进程管理软件
  8. 四路服务器芯片组,四路服务器主板配置
  9. mysql数据库的字符集设置_mysql数据库字符集设置
  10. 2020年人工神经网络第二次作业-参考答案第三题