• 概述
  • OrdinaryConsumer类
  • ConsumerWorker.java
  • MultiThreadedConsumer.java
  • MultiThreadedRebalanceListener.java
  • Test.java

上一篇《Kafka Consumer多线程实例续篇》修正了多线程提交位移的问题,但依然可能出现数据丢失的情况,原因在于多个线程可能拿到相同分区的数据,而消费的顺序会破坏消息本身在分区中的顺序,因而扰乱位移的提交。这次我使用KafkaConsumer的pause和resume方法来防止这种情形的发生。另外,本次我会编写一个测试类用于验证消费相同数量消息时,单线程消费速度要远逊于多线程消费。

回到顶部

概述

这一次,我编写了5个java文件,它们分别是:

  • OrdinaryConsumer.java:普通的单线程Consumer,用于后面进行性能测试对比用。
  • ConsumerWorker.java:多线程消息处理类,本质上就是一个Runnable。会被提交给线程池用于实际消息处理。
  • MultiThreadedConsumer.java:多线程Consumer主控类,用于将消息分配给不同的ConsumerWorker,并且管理位移的提交。
  • MultiThreadedRebalanceListener.java:为多线程Consumer服务的Rebalance监听器。
  • Test.java:用于测试单线程和多线程性能。

回到顶部

OrdinaryConsumer类

单线程的Consumer最简单,我首先给出它的代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;

import java.util.Collections;

import java.util.Properties;

import java.util.concurrent.ThreadLocalRandom;

/**

 * 单线程Consumer

 */

public class OrdinaryConsumer {

    private final Consumer<String, String> consumer;

    private final int expectedCount; // 用于测试的消息数量

    public OrdinaryConsumer(String brokerId, String topic, String groupID, int expectedCount) {

        Properties props = new Properties();

        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);

        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);

        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList(topic));

        this.expectedCount = expectedCount;

    }

    public void run() {

        try {

            int alreadyConsumed = 0;

            while (alreadyConsumed < expectedCount) {

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                alreadyConsumed += records.count();

                records.forEach(this::handleRecord);

            }

        finally {

            consumer.close();

        }

    }

    private void handleRecord(ConsumerRecord<String, String> record) {

        try {

            // 模拟每条消息10毫秒处理

            Thread.sleep(ThreadLocalRandom.current().nextInt(10));

        catch (InterruptedException ignored) {

            Thread.currentThread().interrupt();

        }

        System.out.println(Thread.currentThread().getName() + " finished message processed. Record offset = " + record.offset());

    }

} 

代码很简单,没什么可说的。唯一要说的是Consumer会模拟10毫秒处理一条事件。后面多线程Consumer我们也会使用相同的标准。

回到顶部

ConsumerWorker.java

接下来是消息处理的Runnable类:ConsumerWorker。和上一篇相比,这次最大的不同在于每个Worker只处理相同分区下的消息,而不是向之前那样处理多个分区中的消息。这样做的好处在于一旦某个分区的消息分配给了这个Worker,我可以暂停这个分区的可消费状态,直到这个Worker全部处理完成。如果是混着多个分区的消息一起处理,实现这个就比较困难。ConsumerWorker代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

package huxihx.mtc;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.List;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.ThreadLocalRandom;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicLong;

import java.util.concurrent.locks.ReentrantLock;

public class ConsumerWorker<K, V> {

    private final List<ConsumerRecord<K, V>> recordsOfSamePartition;

    private volatile boolean started = false;

    private volatile boolean stopped = false;

    private final ReentrantLock lock = new ReentrantLock();

    private final long INVALID_COMMITTED_OFFSET = -1L;

    private final AtomicLong latestProcessedOffset = new AtomicLong(INVALID_COMMITTED_OFFSET);

    private final CompletableFuture<Long> future = new CompletableFuture<>();

    public ConsumerWorker(List<ConsumerRecord<K, V>> recordsOfSamePartition) {

        this.recordsOfSamePartition = recordsOfSamePartition;

    }

    public boolean run() {

        lock.lock();

        if (stopped)

            return false;

        started = true;

        lock.unlock();

        for (ConsumerRecord<K, V> record : recordsOfSamePartition) {

            if (stopped)

                break;

            handleRecord(record);

            if (latestProcessedOffset.get() < record.offset() + 1)

                latestProcessedOffset.set(record.offset() + 1);

        }

        return future.complete(latestProcessedOffset.get());

    }

    public long getLatestProcessedOffset() {

        return latestProcessedOffset.get();

    }

    private void handleRecord(ConsumerRecord<K, V> record) {

        try {

            Thread.sleep(ThreadLocalRandom.current().nextInt(10));

        catch (InterruptedException ignored) {

            Thread.currentThread().interrupt();

        }

        System.out.println(Thread.currentThread().getName() + " finished message processed. Record offset = " + record.offset());

    }

    public void close() {

        lock.lock();

        this.stopped = true;

        if (!started) {

            future.complete(latestProcessedOffset.get());

        }

        lock.unlock();

    }

    public boolean isFinished() {

        return future.isDone();

    }

    public long waitForCompletion(long timeout, TimeUnit timeUnit) {

        try {

            return future.get(timeout, timeUnit);

        catch (Exception e) {

            if (e instanceof InterruptedException)

                Thread.currentThread().interrupt();

            return INVALID_COMMITTED_OFFSET;

        }

    }

}

需要说明的地方有以下几点:

  • latestProcessedOffset:使用这个变量保存该Worker当前已消费的最新位移。
  • future:使用CompletableFuture来保存Worker要提交的位移。
  • Worker成功操作与否的标志就是看这个future是否将latestProcessedOffset值封装到结果中。
  • handleRecord和单线程Consumer中的一致,模拟10ms处理消息。

回到顶部

MultiThreadedConsumer.java

构建好了ConsumerWorker类之后,下面是编写多线程Consumer的主控类,该类循环执行:1、创建Consumer;2、读取订阅分区的消息;3、将消息按照不同分区进行归组分发给不同的线程;4、暂停这些分区的后续消费,同时等待Worker线程完成消息处理;5、提交这些分区的位移;6、恢复这些分区的消费。

以下代码是MultiThreadedConsumer类的完整代码:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;

import java.util.Collections;

import java.util.HashMap;

import java.util.HashSet;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import java.util.Set;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.Executor;

import java.util.concurrent.Executors;

public class MultiThreadedConsumer {

    private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers = new HashMap<>();

    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

    private long lastCommitTime = System.currentTimeMillis();

    private final Consumer<String, String> consumer;

    private final int DEFAULT_COMMIT_INTERVAL = 3000;

    private final Map<TopicPartition, Long> currentConsumedOffsets = new HashMap<>();

    private final long expectedCount;

    private final static Executor executor = Executors.newFixedThreadPool(

            Runtime.getRuntime().availableProcessors() * 10, r -> {

                Thread t = new Thread(r);

                t.setDaemon(true);

                return t;

            });

    public MultiThreadedConsumer(String brokerId, String topic, String groupID, long expectedCount) {

        Properties props = new Properties();

        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerId);

        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);

        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Collections.singletonList(topic), new MultiThreadedRebalanceListener(consumer, outstandingWorkers, offsetsToCommit));

        this.expectedCount = expectedCount;

    }

    public void run() {

        try {

            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

                distributeRecords(records);

                checkOutstandingWorkers();

                commitOffsets();

                if (currentConsumedOffsets.values().stream().mapToLong(Long::longValue).sum() >= expectedCount) {

                    break;

                }

            }

        finally {

            consumer.close();

        }

    }

    /**

     * 对已完成消息处理并提交位移的分区执行resume操作

     */

    private void checkOutstandingWorkers() {

        Set<TopicPartition> completedPartitions = new HashSet<>();

        outstandingWorkers.forEach((tp, worker) -> {

            if (worker.isFinished()) {

                completedPartitions.add(tp);

            }

            long offset = worker.getLatestProcessedOffset();

            currentConsumedOffsets.put(tp, offset);

            if (offset > 0L) {

                offsetsToCommit.put(tp, new OffsetAndMetadata(offset));

            }

        });

        completedPartitions.forEach(outstandingWorkers::remove);

        consumer.resume(completedPartitions);

    }

    /**

     * 提交位移

     */

    private void commitOffsets() {

        try {

            long currentTime = System.currentTimeMillis();

            if (currentTime - lastCommitTime > DEFAULT_COMMIT_INTERVAL && !offsetsToCommit.isEmpty()) {

                consumer.commitSync(offsetsToCommit);

                offsetsToCommit.clear();

            }

            lastCommitTime = currentTime;

        catch (Exception e) {

            e.printStackTrace();

        }

    }

    /**

     * 将不同分区的消息交由不同的线程,同时暂停该分区消息消费

     * @param records

     */

    private void distributeRecords(ConsumerRecords<String, String> records) {

        if (records.isEmpty())

            return;

        Set<TopicPartition> pausedPartitions = new HashSet<>();

        records.partitions().forEach(tp -> {

            List<ConsumerRecord<String, String>> partitionedRecords = records.records(tp);

            pausedPartitions.add(tp);

            final ConsumerWorker<String, String> worker = new ConsumerWorker<>(partitionedRecords);

            CompletableFuture.supplyAsync(worker::run, executor);

            outstandingWorkers.put(tp, worker);

        });

        consumer.pause(pausedPartitions);

    }

}  

该类代码需要说明的地方包括:

  • executor:我创建了一个包含10倍CPU核数的线程数。具体线程数根据你自己的业务需求而定。如果你的事件处理逻辑是I/O密集型操作(比如写入外部系统),那么设置一个大一点的线程数通常都是有意义的。当然,我个人觉得最好不要超过Consumer分配到的总分区数。
  • 一定要将自动提交位移的参数设置为false。多线程Consumer的一个关键设计就是要手动提交位移。
  • Rebalance监听器设置为MultiThreadedRebalanceListener。这个类如何响应分区的回收与分配我们稍后讨论。
  • run方法的逻辑基本上遵循了上面提到的流程:消息获取 -> 分发 -> 检查消费进度 -> 提交位移
  • expectedCount:这是为了后面进行性能测试比对用到的总消息消费数。

回到顶部

MultiThreadedRebalanceListener.java

多线程Consumer在Rebalance操作开启后要小心处理。首先,主线程的poll方法与Worker线程处理消息是并行执行的。此时如果发生Rebalance,那么有些分区就会被分配给其他Consumer,但Worker线程依然可能正在处理这些分区。因此,就可能出现这样的场景:两个Consumer都会处理这些分区中的消息。这就破坏了消费者组的设计理念。针对这种情况,我们必须要确保要被回收的那些分区的处理必须首先完成,之后才能被重新分配。

总体而言,在要回收分区前,多线程Consumer必须完成:

  1. 停止对应的Worker线程
  2. 提交位移

当然,一旦分区被重新分配后,事情就变得简单了,我们调用resume恢复这些分区的可消费状态即可。如果这些分区之前就是可以消费的,那么调用resume方法就没有任何效果,总之是一个“无害”操作。MultiThreadedRebalanceListener类完整代码如下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

package huxihx.mtc;

import org.apache.kafka.clients.consumer.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.TopicPartition;

import java.util.Collection;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.TimeUnit;

public class MultiThreadedRebalanceListener implements ConsumerRebalanceListener {

    private final Consumer<String, String> consumer;

    private final Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers;

    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public MultiThreadedRebalanceListener(Consumer<String, String> consumer,

                                          Map<TopicPartition, ConsumerWorker<String, String>> outstandingWorkers,

                                          Map<TopicPartition, OffsetAndMetadata> offsets) {

        this.consumer = consumer;

        this.outstandingWorkers = outstandingWorkers;

        this.offsets = offsets;

    }

    @Override

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

        Map<TopicPartition, ConsumerWorker<String, String>> stoppedWorkers = new HashMap<>();

        for (TopicPartition tp : partitions) {

            ConsumerWorker<String, String> worker = outstandingWorkers.remove(tp);

            if (worker != null) {

                worker.close();

                stoppedWorkers.put(tp, worker);

            }

        }

        stoppedWorkers.forEach((tp, worker) -> {

            long offset = worker.waitForCompletion(1, TimeUnit.SECONDS);

            if (offset > 0L) {

                offsets.put(tp, new OffsetAndMetadata(offset));

            }

        });

        Map<TopicPartition, OffsetAndMetadata> revokedOffsets = new HashMap<>();

        partitions.forEach(tp -> {

            OffsetAndMetadata offset = offsets.remove(tp);

            if (offset != null) {

                revokedOffsets.put(tp, offset);

            }

        });

        try {

            consumer.commitSync(revokedOffsets);

        catch (Exception e) {

            e.printStackTrace();

        }

    }

    @Override

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

        consumer.resume(partitions);

    }

}

该类代码需要说明的地方包括:

  • 任何Rebalance监听器都要实现ConsumerRebalanceListener接口。
  • 该类定义了3个字段,分别保存Consumer实例、要停掉的Worker线程实例以及要提交的位移数据。
  • 主要的逻辑在onPartitionsRevoked方法中实现。第一步是停掉Worker线程;第二步是手动提交位移。

回到顶部

Test.java

说完了以上4个Java类之后,现在我们编写一个测试类来比较单线程Consumer和多线程Consumer的性能对比。首先我们创建一个topic,50个分区,单副本,并使用kafka-producer-perf-test工具创建5万条消息,每个分区1000条。之后编写如下代码分别测试两个Consumer的消费耗时:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

package huxihx.mtc;

public class Test {

    public static void main(String[] args) throws InterruptedException {

        int expectedCount = 50 900;

        String brokerId = "localhost:9092";

        String groupId = "test-group";

        String topic = "test";

        OrdinaryConsumer consumer = new OrdinaryConsumer(brokerId, topic, groupId + "-single", expectedCount);

        long start = System.currentTimeMillis();

        consumer.run();

        System.out.println("Single-threaded consumer costs " + (System.currentTimeMillis() - start));

        Thread.sleep(1L);

        MultiThreadedConsumer multiThreadedConsumer =

                new MultiThreadedConsumer(brokerId, topic, groupId + "-multi", expectedCount);

        start = System.currentTimeMillis();

        multiThreadedConsumer.run();

        System.out.println("Multi-threaded consumer costs " + (System.currentTimeMillis() - start));

    }

}

最后结果显示。单线程Consumer消费45000条消息共耗时232秒,而多线程Consumer耗时6.2秒,如下:

Single-threaded consumer costs 232336

Multi-threaded consumer costs 6246

显然,采用多线程Consumer的消费性能大约是单线程Consumer的37倍。当然实际的提升效果依具体环境而定。不过结论是肯定的,多线程Consumer在CPU核数很多且消息处理逻辑为I/O密集型操作的情形下会比单线程Consumer表现更好。

Kafka Consumer多线程消费相关推荐

  1. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

  2. Kafka consumer多线程下not safe for multi-threaded access问题

    Kafka consumer多线程下not safe for multi-threaded access问题 默认配置下kafka consumer的offset的commit是自动的,如需改成手动提 ...

  3. kafka consumer 停止消费topic

    现象 在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令 ...

  4. Kafka Consumer多线程实例

    Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖.社区最近也在探讨正式用这套consumer API替换Scala ...

  5. java kafka consumer不消费,报错marking the coordinator (id rack null) dead for group

    问题描述:在linux系统,通过 kafka 命令行客户端测试消费正常,但通过Java consumer客户端无法正常接收队列消息,启动后输出如下日志信息: 15:21:34.864 [concurr ...

  6. Flink Kafka consumer的消费策略配置

    val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello" ...

  7. kafka Java客户端之 consumer API 多线程消费消息

    kafka consumer 线程设计 Kafka Java Consumer采用的是单线程的设计.其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程. 用户主线程,指的是启 ...

  8. kafka消费者(Consumer)端多线程消费的实现方案

    kafka消费者(Consumer)端多线程消费的实现方案 kafka Java consumer设计原理 设计原理 为什么用单线程设计 多线程方案: 方案一: 方案二: 两个方案的优缺点: kafk ...

  9. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

最新文章

  1. hosts文件配置不生效的解决办法
  2. mesos资源动态分配测试
  3. abp vnext2.0核心组件之.Net Core默认DI组件切换到AutoFac源码解析
  4. go语言web开发 排坑指南
  5. [蓝桥杯]基础练习 十六进制转八进制
  6. FFmpeg的编解码(二)
  7. [渝粤教育] 南京航空航天大学 航空航天材料概论 参考 资料
  8. ET框架——demo与自定义登录
  9. 绿色到黄色到红色的颜色渐变
  10. spire.office for.net 的Crack
  11. python通过线程实现定时器timer的方法
  12. App开屏页如何设计?来看这五个常用的方法
  13. 剑指offer做题记录
  14. Microsoft 文本转语音应用
  15. Quartus中jtagserver找不到指定文件的解决方法
  16. 整理学习之深度迁移学习
  17. git clone时需要密码
  18. 计算机视觉论文-2021-07-12
  19. 算法导论课后题和思考题 第3章
  20. Grep命令常见用法

热门文章

  1. 单片机原理及其应用——单片机外部中断实验(八段数码管通过按键依次显示0~9数字)
  2. android adb 环境,Android安卓环境搭建及ADB常用命令
  3. python3android版_Android QPython3 简易 SL4A 服务:android.py
  4. 基于python的查重系统_答案在这!如何快速的通过论文查重检测?
  5. rust布料怎么弄_布料“难弄”,你需要从这六方面解决!
  6. 程序员吐槽_某程序员吐槽一程序员大佬竟然放弃百度offer,回老家进烟草公司!是不是脑子有坑?网友:你才脑子有坑!...
  7. openfire java集群_优化openfire服务器,达到单机20万,集群50万
  8. 谈谈对python 和其他语言的区别_谈谈Python和其他语言的区别
  9. python定义一个字典、存储雇员号和姓名_【一点资讯】python后端开发工程师考证试题...
  10. mac mysql 报错_mac os mysql 配置?报错-问答-阿里云开发者社区-阿里云