作者 | 草捏子

来源 | 草捏子(ID:chaycao)

头图 |  CSDN 下载自视觉中国

这周我们学习下消费者,还是先从一个消费者的Hello World学起:

public class Consumer {public static void main(String[] args) {// 1. 配置参数Properties properties = new Properties();properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "group.demo");// 2. 根据参数创建KafkaConsumer实例(消费者)KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);// 3. 订阅主题consumer.subscribe(Collections.singletonList("topic-demo"));try {// 4. 轮循消费while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}}} finally {// 5. 关闭消费者consumer.close();}}}

前两步和生产者类似,配置参数然后根据参数创建实例,区别在于消费者使用的是反序列化器,以及多了一个必填参数 group.id ,用于指定消费者所属的消费组。关于消费组的概念在《图解Kafka中的基本概念》中介绍过了,消费组使得消费者的消费能力可横向扩展,这次再介绍一个新的概念 “再均衡” ,其意思是将分区的所属权进行重新分配,发生于消费者中有新的消费者加入或者有消费者宕机的时候。我们先了解再均衡的概念,至于如何再均衡不在此深究。

我们继续看上面的代码,第3步,subscribe 订阅期望消费的主题,然后进入第4步,轮循调用 poll 方法从 Kafka 服务器拉取消息。给poll方法中传递了一个Duration 对象,指定 poll 方法的超时时长,即当缓存区中没有可消费数据时的阻塞时长,避免轮循过于频繁。 poll 方法返回的是一个 ConsumerRecords 对象,其内部对多个分区的 ConsumerRecored 进行了封装,其结构如下:

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;// ...}

ConsumerRecord 则类似 ProducerRecord ,封装了消息的相关属性:

public class ConsumerRecord<K, V> {private final String topic;  // 主题private final int partition;  // 分区号private final long offset;  // 偏移量private final long timestamp;  // 时间戳private final TimestampType timestampType;  // 时间戳类型private final int serializedKeySize;  // key序列化后的大小private final int serializedValueSize;  // value序列化后的大小private final Headers headers;  // 消息头部private final K key;  // 键private final V value;  // 值private final Optional<Integer> leaderEpoch;  // leader的周期号

偏移量

相比 ProdercerRecord 的属性更多,其中重点讲下偏移量,偏移量是分区中一条消息的唯一标识。消费者在每次调用poll方法时,则是根据偏移量去分区拉取相应的消息。而当一台消费者宕机时,会发生再均衡,将其负责的分区交给其他消费者处理,这时可以根据偏移量去继续从宕机前消费的位置开始。

而为了应对消费者宕机情况,偏移量被设计成不存储在消费者的内存中,而是被持久化到一个Kafka的内部主题 __consumer_offsets 中,在 Kafka 中,将偏移量存储的操作称作提交。而消息者在每次消费消息时都将会将偏移量进行提交,提交的偏移量为下次消费的位置,例如本次消费的偏移量为x,则提交的是x+1。

在代码中我们并没有看到显示的提交代码,那么Kafka的默认提交方式是什么?默认情况下,消费者会定期以 auto_commit_interval_ms (5秒)的频率进行一次自动提交,而提交的动作发生于 poll 方法里,在进行拉取操作前会先检查是否可以进行偏移量提交,如果可以,则会提交即将拉取的偏移量。

下面我们看下这样一个场景,上次提交的偏移量为2,而当前消费者已经处理了2、3、4号消息,正准备提交5,但却宕机了。当发生再均衡时,其他消费者将继续从已提交的2开始消费,于是发生了重复消费的现象。

我们可以通过减小自动提交的时间间隔来减小重复消费的窗口大小,但这样仍然无法避免重复消费的发生。

按照线性程序的思维,由于自动提交是延迟提交,即在处理完消息之后进行提交,所以应该不会出现消息丢失的现象,也就是已提交的偏移量会大于正在处理的偏移量。但放在多线程环境中,消息丢失的现象是可能发生的。例如线程 A 负责调用 poll 方法拉取消息并放入一个队列中,由线程 B 负责处理消息。如果线程 A 已经提交了偏移量5,而线程B还未处理完2、3、4号消息,这时候发生宕机,则将丢失消息。

手动提交

从上述场景的描述,我们可以知道自动提交是存在风险的。所以Kafka除了自动提交,还提供了手动提交的方式,可以细分为同步提交异步提交,分别对应了 KafkaConsumer 中的 commitSynccommitAsync 方法。我们先尝试使用同步提交修改程序:

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}consumer.commitSync();;
}

在处理完一批消息后,都会提交偏移量,这样能减小重复消费的窗口大小,但是由于是同步提交,所以程序会阻塞等待提交成功后再继续处理下一条消息,这样会限制程序的吞吐量。那我们改为使用异步提交:

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}consumer.commitAsync();;
}

异步提交时,程序将不会阻塞,但异步提交在提交失败时也不会进行重试,所以提交是否成功是无法保证的。因此我们可以组合使用两种提交方式。在轮循中使用异步提交,而当关闭消费者时,再通过同步提交来保证提交成功。

try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());}consumer.commitAsync();}
} finally {try {consumer.commitSync();} finally {consumer.close();}
}

上述介绍的两种无参的提交方式都是提交的 poll 返回的一个批次的数据。若未来得及提交,也会造成重复消费,如果还想更进一步减少重复消费,可以在 for 循环中为 commitAsynccommitSync 传入分区和偏移量,进行更细粒度的提交,例如每1000条消息我们提交一次:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.value());// 偏移量加1currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));if (count % 1000 == 0) {consumer.commitAsync(currentOffsets, null);}count++;}
}

poll方法

关于提交就介绍到这里。在使用消费者的代理中,我们可以看到 poll 方法是其中最为核心的方法,能够拉取到我们需要消费的消息。所以接下来,我们一起深入到消费者 API 的幕后,看看在 poll 方法中,都发生了什么,其实现如下:

public ConsumerRecords<K, V> poll(final Duration timeout) {return poll(time.timer(timeout), true);
}

在我们使用设置超时时间的 poll 方法中,会调用重载方法,第二个参数 includeMetadataInTimeout 用于标识是否把元数据的获取算在超时时间内,这里传值为 true ,也就是算入超时时间内。下面再看重载的 poll 方法的实现:

private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {// 1. 获取锁并确保消费者没有关闭acquireAndEnsureOpen();try {// 2.记录poll开始this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());// 3.检查是否有订阅主题if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");}do {// 4.安全的唤醒消费者client.maybeTriggerWakeup();// 5.更新偏移量(如果需要的话)if (includeMetadataInTimeout) {updateAssignmentMetadataIfNeeded(timer, false);} else {while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {log.warn("Still waiting for metadata");}}// 6.拉取消息final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);if (!records.isEmpty()) {// 7.如果拉取到了消息或者有未处理的请求,由于用户还需要处理未处理的消息// 所以会再次发起拉取消息的请求(异步),提高效率if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {client.transmitSends();}// 8.调用消费者拦截器处理return this.interceptors.onConsume(new ConsumerRecords<>(records));}} while (timer.notExpired());return ConsumerRecords.empty();} finally {// 9.释放锁release();// 10.记录poll结束this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());}
}

我们对上面的代码逐步分析,首先是第1步 acquireAndEnsureOpen 方法,获取锁并确保消费者没有关闭,其实现如下:

private void acquireAndEnsureOpen() {acquire();if (this.closed) {release();throw new IllegalStateException("This consumer has already been closed.");}
}

其中 acquire 方法用于获取锁,为什么这里会要上锁。这是因为 KafkaConsumer 是线程不安全的,所以需要上锁,确保只有一个线程使用 KafkaConsumer 拉取消息,其实现如下:

private static final long NO_CURRENT_THREAD = -1L;
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refcount = new AtomicInteger(0);private void acquire() {long threadId = Thread.currentThread().getId();if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");refcount.incrementAndGet();
}

用一个原子变量 currentThread 作为锁,通过 cas 操作获取锁,如果 cas 失败,即获取锁失败,表示发生了竞争,有多个线程在使用 KafkaConsumer ,则会抛出 ConcurrentModificationException 异常,如果 cas 成功,还会将 refcount 加一,用于重入。

再看第2、3步,记录 poll 的开始以及检查是否有订阅主题。然后进入 do-while 循环,如果没有拉取到消息,将在不超时的情况下一直轮循。

第4步,安全的唤醒消费者,并不是唤醒,而是检查是否有唤醒的风险,如果程序在执行不可中断的方法或是收到中断请求,会抛出异常,这里我还不是很明白,先放一下。

第5步,更新偏移量,就是我们在前文说的在进行拉取操作前会先检查是否可以进行偏移量提交。

第6步,pollForFetches 方法拉取消息,其实现如下:

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {long pollTimeout = coordinator == null ? timer.remainingMs() :Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());// 1.如果消息已经有了,则立即返回final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();if (!records.isEmpty()) {return records;}// 2.准备拉取请求fetcher.sendFetches();if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {pollTimeout = retryBackoffMs;}Timer pollTimer = time.timer(pollTimeout);// 3.发送拉取请求client.poll(pollTimer, () -> {return !fetcher.hasAvailableFetches();});timer.update(pollTimer.currentTimeMs());// 3.返回消息return fetcher.fetchedRecords();
}

如果 fetcher 已经有消息了则立即返回,这里和下面将要讲的第7步对应。如果没有消息则使用 Fetcher 准备拉取请求然后再通过 ConsumerNetworkClient 发送请求,最后返回消息。

为啥消息会已经有了呢,我们回到 poll 的第7步,如果拉取到了消息或者有未处理的请求,由于用户还需要处理未处理的消息,这时候可以使用异步的方式发起下一次的拉取消息的请求,将数据提前拉取,减少网络IO的等待时间,提高程序的效率。

第8步,调用消费者拦截器处理,就像 KafkaProducer 中有 ProducerInterceptor ,在 KafkaConsumer 中也有 ConsumerInterceptor ,用于处理返回的消息,处理完后,再返回给用户。

第9、10步,释放锁和记录 poll 结束,对应了第1、2步。

KafkaConsumerpoll 方法就分析到这里。最后用一个思维导图回顾下文中较为重要的知识点:

参考:

《Kafka权威指南》

《深入理解Kafka核心设计和实践原理》

你绝对能看懂的Kafka源代码分析-KafkaConsumer类代码分析: https://blog.csdn.net/liyiming2017/article/details/89187474

Kafka消费者源码解析之一KafkaConsumer:https://blog.csdn.net/lt793843439/article/details/89511405

更多精彩推荐
☞开启人才进阶之旅,鲲鹏开发者技术沙龙点燃计算行业激情
☞全面拥抱云原生应用研发的拐点已经到来
☞微软全球 AKS 女掌门人,这样击破云原生“怪圈”!
☞阿里动物园再添新丁,小蛮驴搞定物流最后三公里
☞中国移动云智融合峰会 与您相约揽胜九天
☞我投资比特币的3个原因
点分享点点赞点在看

Kafka消费者的使用和原理相关推荐

  1. kafka使用_Kafka 消费者的使用和原理

    继上周的<Kafka 生产者的使用和原理>,这周我们学习下消费者,仍然还是先从一个消费者的Hello World学起: public class Consumer { public sta ...

  2. Kafka消费者原理解析

    文章目录 消费者和消费组 创建Kafka消费者 rebalance 分区再均衡 rebalance触发时机 rebalance 分区分配策略 rebalance generatian rebalanc ...

  3. 2021年大数据Kafka(九):kafka消息存储及查询机制原理

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 kafka消息存储及查询机制原理 一.Kafka数据存储机制 ...

  4. Kafka核心设计与实践原理总结:进阶篇

    作者:未完成交响曲,资深Java工程师!目前在某一线互联网公司任职,架构师社区合伙人! kafka作为当前热门的分布式消息队列,具有高性能.持久化.多副本备份.横向扩展能力.我学习了<深入理解K ...

  5. Stream Processing: Apache Kafka的Exactly-once的定义 原理和实现

    2018年,Apache Kafka以一种特殊的设计和方法实现了强语义的exactly-once和事务性.热泪盈眶啊! 这篇文章将讲解kafka中exactly-once和事务操作的原理,具体为 (1 ...

  6. kafka消费者如何读同一生产者消息_Kafka系列3:深入理解Kafka消费者

    上面两篇聊了Kafka概况和Kafka生产者,包含了Kafka的基本概念.设计原理.设计核心以及生产者的核心原理.本篇单独聊聊Kafka的消费者,包括如下内容:消费者和消费者组 如何创建消费者 如何消 ...

  7. Kafka知识总结之Broker原理总结

    简介 这篇文章介绍Kafka的Broker工作流程,包括其中控制器的选举过程:kafka副本的leader选举以及leader和follower故障流程:简单讲述了生产环境中如何调整分区副本:kafk ...

  8. Kafka快速入门(Kafka消费者)

    Kafka 消费者 1. Kafka 消费方式 2 Kafka 消费者工作流程 2.1 消费者总体工作流程 2.2 消费者组原理 Consumer Group(CG):消费者组,由多个consumer ...

  9. kafka消费者(Consumer)端多线程消费的实现方案

    kafka消费者(Consumer)端多线程消费的实现方案 kafka Java consumer设计原理 设计原理 为什么用单线程设计 多线程方案: 方案一: 方案二: 两个方案的优缺点: kafk ...

最新文章

  1. 利用CSDN将图片自动存入AI Studio :pic2bml
  2. hdu 6096---String(AC自动机)
  3. python编程入门经典 评分-关于 Python 的经典入门书籍有哪些?
  4. AtomicInteger
  5. phantomjs安装所需依赖
  6. 从服务器上的数据库备份到本地
  7. (None resource)-Binary system
  8. linux awk 改写文件,批处理修改文件内容的问题,使用awk命令
  9. Java8-2-Lambda表达式实战-一句话实现Map中按照Value排序
  10. Ubuntu18.04 安装wine
  11. maven安装教程(保姆级别)
  12. WIN10计算机不支持3D游戏怎么办,教您如何在win10系统中启用3D加速?
  13. DM368串口通信调试
  14. 上百种Python炫酷可视化案例珍藏版——看完掌握~一键三连~老板都想要给你升职加薪哟!
  15. XP IIS下配置.net的问题总结与简单解决方法
  16. vue中的路由对象和路由记录
  17. java 基准测试 格式_JMH java基准测试
  18. CCF过程记录以及经验总结
  19. Word中的字体大小(几号-几磅)
  20. 联通数据能力开放平台介绍

热门文章

  1. 【python】入门第一篇
  2. HDU - 1757 A Simple Math Problem (矩阵快速幂)
  3. Scrum立会报告+燃尽图(Final阶段第二次)
  4. 学习ASP.NET Core Razor 编程系列四——Asp.Net Core Razor列表模板页面
  5. Centos7的iso everything与DVD以及Live的区别
  6. jQuery中Ajax+Spring MVC实现跨域请求
  7. SQLite学习笔记(十二)虚拟机指令
  8. 视图引擎smarty 一
  9. 02 - 用wxStreamToTextRedirector和wxTextCtrl输出std::cout
  10. 错误摘要 HTTP 错误 403.14 - Forbidden Web 服务器被配置为不列出此目录的内容。