Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖。社区最近也在探讨正式用这套consumer API替换Scala版本的consumer的计划。鉴于目前这方面的资料并不是很多,本文将尝试给出一个利用KafkaConsumer编写的多线程消费者实例,希望对大家有所帮助。

这套API最重要的入口就是KafkaConsumer(o.a.k.clients.consumer.KafkaConsumer),普通的单线程使用方法官网API已有介绍,这里不再赘述了。因此,我们直奔主题——讨论一下如何创建多线程的方式来使用KafkaConsumer。KafkaConsumer和KafkaProducer不同,后者是线程安全的,因此我们鼓励用户在多个线程中共享一个KafkaProducer实例,这样通常都要比每个线程维护一个KafkaProducer实例效率要高。但对于KafkaConsumer而言,它不是线程安全的,所以实现多线程时通常由两种实现方法:

1 每个线程维护一个KafkaConsumer

2  维护一个或多个KafkaConsumer,同时维护多个事件处理线程(worker thread)

当然,这种方法还可以有多个变种:比如每个worker线程有自己的处理队列。consumer根据某种规则或逻辑将消息放入不同的队列。不过总体思想还是相同的,故这里不做过多展开讨论了。

  下表总结了两种方法的优缺点:

  优点 缺点
方法1(每个线程维护一个KafkaConsumer) 方便实现
速度较快,因为不需要任何线程间交互
易于维护分区内的消息顺序
更多的TCP连接开销(每个线程都要维护若干个TCP连接)
consumer数受限于topic分区数,扩展性差
频繁请求导致吞吐量下降
线程自己处理消费到的消息可能会导致超时,从而造成rebalance
方法2 (单个(或多个)consumer,多个worker线程) 可独立扩展consumer数和worker数,伸缩性好

实现麻烦

通常难于维护分区内的消息顺序

处理链路变长,导致难以保证提交位移的语义正确性

下面我们分别实现这两种方法。需要指出的是,下面的代码都是最基本的实现,并没有考虑很多编程细节,比如如何处理错误等。

方法1

ConsumerRunnable类

 1 import org.apache.kafka.clients.consumer.ConsumerRecord;2 import org.apache.kafka.clients.consumer.ConsumerRecords;3 import org.apache.kafka.clients.consumer.KafkaConsumer;4 5 import java.util.Arrays;6 import java.util.Properties;7 8 public class ConsumerRunnable implements Runnable {9
10     // 每个线程维护私有的KafkaConsumer实例
11     private final KafkaConsumer<String, String> consumer;
12
13     public ConsumerRunnable(String brokerList, String groupId, String topic) {
14         Properties props = new Properties();
15         props.put("bootstrap.servers", brokerList);
16         props.put("group.id", groupId);
17         props.put("enable.auto.commit", "true");        //本例使用自动提交位移
18         props.put("auto.commit.interval.ms", "1000");
19         props.put("session.timeout.ms", "30000");
20         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
21         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
22         this.consumer = new KafkaConsumer<>(props);
23         consumer.subscribe(Arrays.asList(topic));   // 本例使用分区副本自动分配策略
24     }
25
26     @Override
27     public void run() {
28         while (true) {
29             ConsumerRecords<String, String> records = consumer.poll(200);   // 本例使用200ms作为获取超时时间
30             for (ConsumerRecord<String, String> record : records) {
31                 // 这里面写处理消息的逻辑,本例中只是简单地打印消息
32                 System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
33                         "th message with offset: " + record.offset());
34             }
35         }
36     }
37 }

ConsumerGroup类

 1 package com.my.kafka.test;2 3 import java.util.ArrayList;4 import java.util.List;5 6 public class ConsumerGroup {7 8     private List<ConsumerRunnable> consumers;9
10     public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
11         consumers = new ArrayList<>(consumerNum);
12         for (int i = 0; i < consumerNum; ++i) {
13             ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
14             consumers.add(consumerThread);
15         }
16     }
17
18     public void execute() {
19         for (ConsumerRunnable task : consumers) {
20             new Thread(task).start();
21         }
22     }
23 }

ConsumerMain类

 1 public class ConsumerMain {2 3     public static void main(String[] args) {4         String brokerList = "localhost:9092";5         String groupId = "testGroup1";6         String topic = "test-topic";7         int consumerNum = 3;8 9         ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
10         consumerGroup.execute();
11     }
12 }

方法2

Worker类

 1 import org.apache.kafka.clients.consumer.ConsumerRecord;2 3 public class Worker implements Runnable {4 5     private ConsumerRecord<String, String> consumerRecord;6 7     public Worker(ConsumerRecord record) {8         this.consumerRecord = record;9     }
10
11     @Override
12     public void run() {
13         // 这里写你的消息处理逻辑,本例中只是简单地打印消息
14         System.out.println(Thread.currentThread().getName() + " consumed " + consumerRecord.partition()
15             + "th message with offset: " + consumerRecord.offset());
16     }
17 }

ConsumerHandler类

 1 import org.apache.kafka.clients.consumer.ConsumerRecord;2 import org.apache.kafka.clients.consumer.ConsumerRecords;3 import org.apache.kafka.clients.consumer.KafkaConsumer;4 5 import java.util.Arrays;6 import java.util.Properties;7 import java.util.concurrent.ArrayBlockingQueue;8 import java.util.concurrent.ExecutorService;9 import java.util.concurrent.ThreadPoolExecutor;
10 import java.util.concurrent.TimeUnit;
11
12 public class ConsumerHandler {
13
14     // 本例中使用一个consumer将消息放入后端队列,你当然可以使用前一种方法中的多实例按照某张规则同时把消息放入后端队列
15     private final KafkaConsumer<String, String> consumer;
16     private ExecutorService executors;
17
18     public ConsumerHandler(String brokerList, String groupId, String topic) {
19         Properties props = new Properties();
20         props.put("bootstrap.servers", brokerList);
21         props.put("group.id", groupId);
22         props.put("enable.auto.commit", "true");
23         props.put("auto.commit.interval.ms", "1000");
24         props.put("session.timeout.ms", "30000");
25         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
26         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
27         consumer = new KafkaConsumer<>(props);
28         consumer.subscribe(Arrays.asList(topic));
29     }
30
31     public void execute(int workerNum) {
32         executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
33                 new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
34
35         while (true) {
36             ConsumerRecords<String, String> records = consumer.poll(200);
37             for (final ConsumerRecord record : records) {
38                 executors.submit(new Worker(record));
39             }
40         }
41     }
42
43     public void shutdown() {
44         if (consumer != null) {
45             consumer.close();
46         }
47         if (executors != null) {
48             executors.shutdown();
49         }
50         try {
51             if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
52                 System.out.println("Timeout.... Ignore for this case");
53             }
54         } catch (InterruptedException ignored) {
55             System.out.println("Other thread interrupted this shutdown, ignore for this case.");
56             Thread.currentThread().interrupt();
57         }
58     }
59
60 }

Main类

 1 public class Main {2 3     public static void main(String[] args) {4         String brokerList = "localhost:9092,localhost:9093,localhost:9094";5         String groupId = "group2";6         String topic = "test-topic";7         int workerNum = 5;8 9         ConsumerHandler consumers = new ConsumerHandler(brokerList, groupId, topic);
10         consumers.execute(workerNum);
11         try {
12             Thread.sleep(1000000);
13         } catch (InterruptedException ignored) {}
14         consumers.shutdown();
15     }
16 }

  总结一下,这两种方法或是模型都有各自的优缺点,在具体使用时需要根据自己实际的业务特点来选取对应的方法。就我个人而言,我比较推崇第二种方法以及背后的思想,即不要将很重的处理逻辑放入消费者的代码中,很多Kafka consumer使用者碰到的各种rebalance超时、coordinator重新选举、心跳无法维持等问题都来源于此。

在第二种用法中我使用的是自动提交的方式,省去了多线程提交位移的麻烦。很多人跑来问如果是手动提交应该怎么写?由于KafkaConsumer不是线程安全的,因此我们不能简单地在多个线程中直接调用consumer.commitSync来提交位移。本文将给出一个实际的例子来模拟多线程消费以及手动提交位移。

  本例中包含3个类:

  • ConsumerThreadHandler类:consumer多线程的管理类,用于创建线程池以及为每个线程分配任务。另外consumer位移的提交也在这个类中进行
  • ConsumerWorker类:本质上是一个Runnable,执行真正的消费逻辑并上报位移信息给ConsumerThreadHandler
  • Main类:测试主方法类

测试代码

ConsumerWorker类

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

package huxi.test.consumer.multithreaded;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.TopicPartition;

import java.util.List;

import java.util.Map;

public class ConsumerWorker<K, V> implements Runnable {

    private final ConsumerRecords<K, V> records;

    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    public ConsumerWorker(ConsumerRecords<K, V> record, Map<TopicPartition, OffsetAndMetadata> offsets) {

        this.records = record;

        this.offsets = offsets;

    }

    @Override

    public void run() {

        for (TopicPartition partition : records.partitions()) {

            List<ConsumerRecord<K, V>> partitionRecords = records.records(partition);

            for (ConsumerRecord<K, V> record : partitionRecords) {

                // 插入消息处理逻辑,本例只是打印消息

                System.out.println(String.format("topic=%s, partition=%d, offset=%d",

                        record.topic(), record.partition(), record.offset()));

            }

            // 上报位移信息

            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();

            synchronized (offsets) {

                if (!offsets.containsKey(partition)) {

                    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));

                else {

                    long curr = offsets.get(partition).offset();

                    if (curr <= lastOffset + 1) {

                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));

                    }

                }

            }

        }

    }

}

ConsumerThreadHandler类

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

package huxi.test.consumer.multithreaded;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

import org.apache.kafka.common.TopicPartition;

import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;

import java.util.Collection;

import java.util.Collections;

import java.util.HashMap;

import java.util.Map;

import java.util.Properties;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

public class ConsumerThreadHandler<K, V> {

    private final KafkaConsumer<K, V> consumer;

    private ExecutorService executors;

    private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

    public ConsumerThreadHandler(String brokerList, String groupId, String topic) {

        Properties props = new Properties();

        props.put("bootstrap.servers", brokerList);

        props.put("group.id", groupId);

        props.put("enable.auto.commit""false");

        props.put("auto.offset.reset""earliest");

        props.put("key.deserializer""org.apache.kafka.common.serialization.ByteArrayDeserializer");

        props.put("value.deserializer""org.apache.kafka.common.serialization.ByteArrayDeserializer");

        consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {

            @Override

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {

                consumer.commitSync(offsets);

            }

            @Override

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {

                offsets.clear();

            }

        });

    }

    /**

     * 消费主方法

     * @param threadNumber  线程池中线程数

     */

    public void consume(int threadNumber) {

        executors = new ThreadPoolExecutor(

                threadNumber,

                threadNumber,

                0L,

                TimeUnit.MILLISECONDS,

                new ArrayBlockingQueue<Runnable>(1000),

                new ThreadPoolExecutor.CallerRunsPolicy());

        try {

            while (true) {

                ConsumerRecords<K, V> records = consumer.poll(1000L);

                if (!records.isEmpty()) {

                    executors.submit(new ConsumerWorker<>(records, offsets));

                }

                commitOffsets();

            }

        catch (WakeupException e) {

            // swallow this exception

        finally {

            commitOffsets();

            consumer.close();

        }

    }

    private void commitOffsets() {

        // 尽量降低synchronized块对offsets锁定的时间

        Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;

        synchronized (offsets) {

            if (offsets.isEmpty()) {

                return;

            }

            unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));

            offsets.clear();

        }

        consumer.commitSync(unmodfiedMap);

    }

    public void close() {

        consumer.wakeup();

        executors.shutdown();

    }

}

Main类

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

package huxi.test.consumer.multithreaded;

public class Main {

    public static void main(String[] args) {

        String brokerList = "localhost:9092";

        String topic = "test-topic";

        String groupID = "test-group";

        final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupID, topic);

        final int cpuCount = Runtime.getRuntime().availableProcessors();

        Runnable runnable = new Runnable() {

            @Override

            public void run() {

                handler.consume(cpuCount);

            }

        };

        new Thread(runnable).start();

        try {

            // 20秒后自动停止该测试程序

            Thread.sleep(20000L);

        catch (InterruptedException e) {

            // swallow this exception

        }

        System.out.println("Starting to close the consumer...");

        handler.close();

    }

}  

测试步骤

1. 首先创建一个测试topic: test-topic,10个分区,并使用kafka-producer-perf-test.sh脚本生产50万条消息

2. 运行Main,假定group.id设置为test-group

3. 新开一个终端,不断地运行以下脚本监控consumer group的消费进度

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

测试结果

LAG列全部为0表示consumer group的位移提交正常。值得一提的是,各位可以通过控制consumer.poll的超时时间来控制ConsumerThreadHandler类提交位移的频率。

感谢QQ群友的提醒,这种方式有丢失数据的时间窗口——假设T1线程在t0时间消费分区0的位移=100的消息M1,而T2线程在t1时间消费分区0的位移=101的消息M2。现在假设t3时T2线程先完成处理,于是上报位移101给Handler,但此时T1线程尚未处理完成。t4时handler提交位移101,之后T1线程发生错误,抛出异常导致位移100的消息消费失败,但由于位移已经提交到101,故消息丢失~。

Kafka Consumer多线程实例相关推荐

  1. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  2. Kafka consumer多线程下not safe for multi-threaded access问题

    Kafka consumer多线程下not safe for multi-threaded access问题 默认配置下kafka consumer的offset的commit是自动的,如需改成手动提 ...

  3. 学习笔记Kafka(六)—— Kafka Consumer API及开发实例

    一.Kafka Consumer API 1.1.Consumer 1.2.KafkaConsumer 1.3.ConsumerRecords 1.4.ConsumerRecord 1.5.Kafka ...

  4. 【kafka】浅谈Kafka的多线程消费的设计

    1.概述 转载:浅谈Kafka的多线程消费的设计 看原文去... 一.前言 跟RabbitMQ相比,Kafka的分区机制(Partition)使其支持对同一个"队列"分片并行读取, ...

  5. Kafka consumer

    Kafka consumer consumer概览 消费者组 消费者组定义:消费者使用一个消费者组名(即group.id)来标记自己,topic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者 ...

  6. 读Kafka Consumer源码

    最近一直在关注阿里的一个开源项目:OpenMessaging OpenMessaging, which includes the establishment of industry guideline ...

  7. Kafka设计解析(四):Kafka Consumer解析--转

    原文地址:http://www.infoq.com/cn/articles/kafka-analysis-part-4?utm_source=infoq&utm_campaign=user_p ...

  8. Kafka设计解析(五): Kafka Consumer设计解析

    Kafka设计解析(五)- Kafka Consumer设计解析 大数据架构(郭俊_Jason) · 2015-09-18 08:24 点击上方 大数据架构   快速关注 Kafka Consumer ...

  9. Kafka设计解析(四):Kafka Consumer解析

    High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...

最新文章

  1. java函数求方程,Commons Math学习笔记——函数方程求解
  2. 【Python】直接赋值、浅拷贝和深度拷贝解析
  3. saspython知乎_python学习笔记---linux/windows调用sas程序
  4. php中的preg_replace函数,PHP函数preg_replace()
  5. 有网友提问,关于本地XML转JSON的小工具
  6. RTMP协议中文翻译(首发)(转)
  7. 一步步编写操作系统 51 加载内核4
  8. 襄阳社区招聘计算机考什么时候,襄阳招聘网格员什么时候报名?
  9. 消息中间件学习总结(1)——RocketMQ之专访RocketMQ联合创始人:项目思路、技术细节和未来规划
  10. Python strip()与split()方法
  11. 帆软报表设计器菜单栏介绍之一
  12. Python 装饰器实例
  13. Leetcode-233-数字1的个数
  14. 查看linux操作系统版本信息
  15. Mtk Camera Hal到驱动的流程(2)
  16. 了解new一个对象具体过程
  17. Android 应用商店评分+APP分享
  18. 【金猿人物展】龙盈智达首席数据科学家王彦博:量子科技为AI大数据创新发展注入新动能...
  19. 【读论文】基于深度学习的铁路道岔转辙机故障诊断(2INTRO)
  20. android垃圾清理动画,[Android开发实战]金山清理大师(猎豹清理大师)一键加速快捷方式动画实现...

热门文章

  1. 25行代码AC——习题5-7 打印队列(Printer Queue,UVa 12100)——解题报告
  2. Python数据结构学习笔记——栈
  3. mysql 定时器停止_mysql事件【定时器】
  4. STP/RSTP/MSTP协议简介
  5. PHP 运动会,运动会成绩管理系统
  6. linux中断响应时间太慢_linux+arm系统学习与基础学习
  7. qpython numpy_Python-Numpy全面精简教程
  8. android studio turn off hyperv,Android Studio 无法运行模拟器
  9. php链接mysql验证用户登录,PHP连接mysql验证用户名是否存在
  10. linux性能并发 带机量,性能测试笔记(一):吞吐量与并发数