Kafka Consumer多线程消费
- 概述
- OrdinaryConsumer类
- ConsumerWorker.java
- MultiThreadedConsumer.java
- MultiThreadedRebalanceListener.java
- Test.java
上一篇《Kafka Consumer多线程实例续篇》修正了多线程提交位移的问题,但依然可能出现数据丢失的情况,原因在于多个线程可能拿到相同分区的数据,而消费的顺序会破坏消息本身在分区中的顺序,因而扰乱位移的提交。这次我使用KafkaConsumer的pause和resume方法来防止这种情形的发生。另外,本次我会编写一个测试类用于验证消费相同数量消息时,单线程消费速度要远逊于多线程消费。
回到顶部
概述
这一次,我编写了5个java文件,它们分别是:
- OrdinaryConsumer.java:普通的单线程Consumer,用于后面进行性能测试对比用。
- ConsumerWorker.java:多线程消息处理类,本质上就是一个Runnable。会被提交给线程池用于实际消息处理。
- MultiThreadedConsumer.java:多线程Consumer主控类,用于将消息分配给不同的ConsumerWorker,并且管理位移的提交。
- MultiThreadedRebalanceListener.java:为多线程Consumer服务的Rebalance监听器。
- Test.java:用于测试单线程和多线程性能。
回到顶部
OrdinaryConsumer类
单线程的Consumer最简单,我首先给出它的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
|
代码很简单,没什么可说的。唯一要说的是Consumer会模拟10毫秒处理一条事件。后面多线程Consumer我们也会使用相同的标准。
回到顶部
ConsumerWorker.java
接下来是消息处理的Runnable类:ConsumerWorker。和上一篇相比,这次最大的不同在于每个Worker只处理相同分区下的消息,而不是向之前那样处理多个分区中的消息。这样做的好处在于一旦某个分区的消息分配给了这个Worker,我可以暂停这个分区的可消费状态,直到这个Worker全部处理完成。如果是混着多个分区的消息一起处理,实现这个就比较困难。ConsumerWorker代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
|
需要说明的地方有以下几点:
- latestProcessedOffset:使用这个变量保存该Worker当前已消费的最新位移。
- future:使用CompletableFuture来保存Worker要提交的位移。
- Worker成功操作与否的标志就是看这个future是否将latestProcessedOffset值封装到结果中。
- handleRecord和单线程Consumer中的一致,模拟10ms处理消息。
回到顶部
MultiThreadedConsumer.java
构建好了ConsumerWorker类之后,下面是编写多线程Consumer的主控类,该类循环执行:1、创建Consumer;2、读取订阅分区的消息;3、将消息按照不同分区进行归组分发给不同的线程;4、暂停这些分区的后续消费,同时等待Worker线程完成消息处理;5、提交这些分区的位移;6、恢复这些分区的消费。
以下代码是MultiThreadedConsumer类的完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
|
该类代码需要说明的地方包括:
- executor:我创建了一个包含10倍CPU核数的线程数。具体线程数根据你自己的业务需求而定。如果你的事件处理逻辑是I/O密集型操作(比如写入外部系统),那么设置一个大一点的线程数通常都是有意义的。当然,我个人觉得最好不要超过Consumer分配到的总分区数。
- 一定要将自动提交位移的参数设置为false。多线程Consumer的一个关键设计就是要手动提交位移。
- Rebalance监听器设置为MultiThreadedRebalanceListener。这个类如何响应分区的回收与分配我们稍后讨论。
- run方法的逻辑基本上遵循了上面提到的流程:消息获取 -> 分发 -> 检查消费进度 -> 提交位移
- expectedCount:这是为了后面进行性能测试比对用到的总消息消费数。
回到顶部
MultiThreadedRebalanceListener.java
多线程Consumer在Rebalance操作开启后要小心处理。首先,主线程的poll方法与Worker线程处理消息是并行执行的。此时如果发生Rebalance,那么有些分区就会被分配给其他Consumer,但Worker线程依然可能正在处理这些分区。因此,就可能出现这样的场景:两个Consumer都会处理这些分区中的消息。这就破坏了消费者组的设计理念。针对这种情况,我们必须要确保要被回收的那些分区的处理必须首先完成,之后才能被重新分配。
总体而言,在要回收分区前,多线程Consumer必须完成:
- 停止对应的Worker线程
- 提交位移
当然,一旦分区被重新分配后,事情就变得简单了,我们调用resume恢复这些分区的可消费状态即可。如果这些分区之前就是可以消费的,那么调用resume方法就没有任何效果,总之是一个“无害”操作。MultiThreadedRebalanceListener类完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
|
该类代码需要说明的地方包括:
- 任何Rebalance监听器都要实现ConsumerRebalanceListener接口。
- 该类定义了3个字段,分别保存Consumer实例、要停掉的Worker线程实例以及要提交的位移数据。
- 主要的逻辑在onPartitionsRevoked方法中实现。第一步是停掉Worker线程;第二步是手动提交位移。
回到顶部
Test.java
说完了以上4个Java类之后,现在我们编写一个测试类来比较单线程Consumer和多线程Consumer的性能对比。首先我们创建一个topic,50个分区,单副本,并使用kafka-producer-perf-test工具创建5万条消息,每个分区1000条。之后编写如下代码分别测试两个Consumer的消费耗时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
最后结果显示。单线程Consumer消费45000条消息共耗时232秒,而多线程Consumer耗时6.2秒,如下:
Single-threaded consumer costs 232336
Multi-threaded consumer costs 6246
显然,采用多线程Consumer的消费性能大约是单线程Consumer的37倍。当然实际的提升效果依具体环境而定。不过结论是肯定的,多线程Consumer在CPU核数很多且消息处理逻辑为I/O密集型操作的情形下会比单线程Consumer表现更好。
Kafka Consumer多线程消费相关推荐
- 【kafka】浅谈Kafka的多线程消费的设计
1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...
- Kafka consumer多线程下not safe for multi-threaded access问题
Kafka consumer多线程下not safe for multi-threaded access问题 默认配置下kafka consumer的offset的commit是自动的,如需改成手动提 ...
- kafka consumer 停止消费topic
现象 在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令 ...
- Kafka Consumer多线程实例
Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖.社区最近也在探讨正式用这套consumer API替换Scala ...
- java kafka consumer不消费,报错marking the coordinator (id rack null) dead for group
问题描述:在linux系统,通过 kafka 命令行客户端测试消费正常,但通过Java consumer客户端无法正常接收队列消息,启动后输出如下日志信息: 15:21:34.864 [concurr ...
- Flink Kafka consumer的消费策略配置
val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello" ...
- kafka Java客户端之 consumer API 多线程消费消息
kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...
- kafka消费者(Consumer)端多线程消费的实现方案
kafka消费者(Consumer)端多线程消费的实现方案 kafka Java consumer设计原理 设计原理 为什么用单线程设计 多线程方案: 方案一: 方案二: 两个方案的优缺点: kafk ...
- 正确处理kafka多线程消费的姿势
最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...
最新文章
- hosts文件配置不生效的解决办法
- mesos资源动态分配测试
- abp vnext2.0核心组件之.Net Core默认DI组件切换到AutoFac源码解析
- go语言web开发 排坑指南
- [蓝桥杯]基础练习 十六进制转八进制
- FFmpeg的编解码(二)
- [渝粤教育] 南京航空航天大学 航空航天材料概论 参考 资料
- ET框架——demo与自定义登录
- 绿色到黄色到红色的颜色渐变
- spire.office for.net 的Crack
- python通过线程实现定时器timer的方法
- App开屏页如何设计?来看这五个常用的方法
- 剑指offer做题记录
- Microsoft 文本转语音应用
- Quartus中jtagserver找不到指定文件的解决方法
- 整理学习之深度迁移学习
- git clone时需要密码
- 计算机视觉论文-2021-07-12
- 算法导论课后题和思考题 第3章
- Grep命令常见用法
热门文章
- 单片机原理及其应用——单片机外部中断实验(八段数码管通过按键依次显示0~9数字)
- android adb 环境,Android安卓环境搭建及ADB常用命令
- python3android版_Android QPython3 简易 SL4A 服务:android.py
- 基于python的查重系统_答案在这!如何快速的通过论文查重检测?
- rust布料怎么弄_布料“难弄”,你需要从这六方面解决!
- 程序员吐槽_某程序员吐槽一程序员大佬竟然放弃百度offer,回老家进烟草公司!是不是脑子有坑?网友:你才脑子有坑!...
- openfire java集群_优化openfire服务器,达到单机20万,集群50万
- 谈谈对python 和其他语言的区别_谈谈Python和其他语言的区别
- python定义一个字典、存储雇员号和姓名_【一点资讯】python后端开发工程师考证试题...
- mac mysql 报错_mac os mysql 配置?报错-问答-阿里云开发者社区-阿里云