前言

理解一下Kafka的读的自动提交功能。

找到了一篇专门介绍这个功能的文章,选择主要的内容进行一下翻译和做笔记。

自动提交参数auto.commit的设置

Understanding the ‘enable.auto.commit’ Kafka Consumer property

Kafka Consumers read messages from a Kafka topic, its not a hard concept to get your head around. But behind the scenes there’s a lot more going on than meets the eye.

Say we’re consuming messages from a Topic and our Consumer crashes. Once we realise that the world isn't ending, we recover from the crash and we start consuming again. We start receiving messages exactly where we left off from, its kinda neat.

假设我们正在从一个 Topic 中消费消息,这个时候我们的这个消费者(客户端)宕机了。我们意识到这不是世界的末日,我们可以从宕机中恢复,重新开始消费。我们可以从我们上一次离开的地方重新接收消息,这非常灵巧。

There’s two reasons as to why this happens. One is something referred to as the “Offset” and the other is a couple of default Consumer values.

发生这样的事情是因为两个原因。一个是一个叫 “Offset” 的东西,另外一个是一些 Consumer 的默认的值。

So whats an Offset?

The Offset is a piece of metadata, an integer value that continually increases for each message that is received in a partition. Each message will have a unique Offset value in a partition.

Offset 是一块元数据,一个整数,会针对每一个 partition 上接收到的消息而持续增长。每一个消息在一个 partition 上将会有唯一的一个Offset。

I use Keys in some of my projects, some of them I don’t ;)

So as you can see here, each message has a unique Offset, and that Offset represents the position of that message in that particular partition.

上面介绍了一下Kafka的offset是什么,offset是记录每条消息在partition里面的位置的。

When a Consumer reads the messages from the Partition it lets Kafka know the Offset of the last consumed message. This Offset is stored in a Topic named _consumer_offsets, in doing this a consumer can stop and restart without forgetting which messages it has consumed.

这里讲,offset会被存在一个叫做_consumer_offsets的主题中,这样来帮助消费者记录处理到哪里了。

When we create our Consumers, they have a set of default properties which we can override or we can just leave the default values in effect.

There are two properties that are driving this behaviour.

有两个属性需要关注。

  1. enable.auto.commit

  2. auto.commit.interval.ms

The first property enable.auto.commit has a default value of true and the second property auto.commit.interval.ms has a default value of 5000. These values are correct for Blizzards node-rdkafka client and the Java KafkaConsumer client but other libraries may differ.

enable.auto.commit 的默认值是 true;就是默认采用自动提交的机制。

auto.commit.interval.ms 的默认值是 5000,单位是毫秒。

So by default every 5 seconds a Consumer is going to commit its Offset to Kafka or every time data is fetched from the specified Topic it will commit the latest Offset.

这样,默认5秒钟,一个 Consumer 将会提交它的 Offset 给 Kafka,或者每一次数据从指定的 Topic 取回时,将会提交最后一次的 Offset。

Now in some scenarios this is the ideal behaviour but on other scenarios its not.

这样,在某些场景下,这是理想的表现,但是在其他场景下,并不是。

Say our Consumer is processing a message with an Offset of 100 and whilst processing it the Consumer fetches some more data, the Offset is commit and then the Consumer crashes. Upon coming back up it will start consuming messages from the most recent committed Offset, but how can we safely say that we haven’t lost messages and the Offset of the new message isn't later then the one of the message been processed?

这么说,我们的 Consumer 正在消费一个 Offset 是100的消息,同时这个 Consumer 取回了一些数据,这个 Offset 提交了,然后 Consumer 崩溃了。在我们回来的时候,我们会重新从最新提交的 Offset 去进行消息的消费,但是我们如何能安全地说,我们没有丢失消息,并且这个新消息的 Offset 不会比刚刚被处理的那个消息靠后呢?

What we can do is commit the Offset of messages manually after processing them. This give us full control over when we consider a message dealt with, processed and ready to let Kafka know that.

解决这个问题的方案就是我们手动地提交这个 Offset,在处理完这些消息之后。这给与了我们完全的控制,什么时候去处理一个消息,什么时候去让 Kafka 知道这个。

Firstly we have to change the value of the enable.auto.commit property.

enable.auto.commit: false

When we change this property the auto.commit.interval.ms value isnt taken into consideration.

So now we can commit our Offset manually after the processing has taken place and if the Consumer crashes whilst processing a message it will start consuming from that same Offset, no messages lost.

我们把这个参数设置为 false ,就会由我们自己手动地来处理这个事情。

Both the clients mentioned earlier in this article have methods exposed to commit the Offset.

For further reading on the clients check out the links below.

如果 enable.auto.commit 设置成 false,那么 auto.commit.interval.ms 也就不被再考虑了

JSDoc: Class: KafkaConsumerKafkaConsumer class for reading messages from Kafka This is the main entry point for reading data from Kafka. You…blizzard.github.io

KafkaConsumer (kafka 0.10.2.1 API)To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the…kafka.apache.org

If anyone wants any more information on Kafka or Consumers get in touch on Twitter.

Cheers,

Danny

https://twitter.com/danieljameskay

举例

与 kafka auto commit 两个配置:

  • enable.auto.commit:是否开启自动提交
  • auto.commit.interval.ms:自动提交时间间隔

假设 enable.auto.commit 设置为 true,auto.commit.interval.ms 设置为 3000,试想一下会不会出现这样的问题:

poll 方法返回了 500 条数据,需要 5 秒钟才能处理完,假设在第 4 秒的时候应用挂了,offset 是不是在第 3 秒的时候已经被自动提交了,从而导致第 4 秒之后的数据“丢失”了?

正确答案是:不会的!虽然 auto.commit.interval.ms 设置为 3000,但是检查时间间隔是否过了 3 秒是由 poll 方法去触发的,所以只要在记录还没处理完之前我们没有主动去调用 poll 方法,就算时间间隔到了,也不会去自动提交。

自动提交是在哪里执行的

kafka consumer offset 的提交是有 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator 来完成的,真正执行提交的有两个方法:

  • 同步提交:org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsSync
  • 异步提交:org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync

同步提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void maybeAutoCommitOffsetsSync(Timer timer) {if (autoCommitEnabled) {Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();try {log.debug("Sending synchronous auto-commit of offsets {}", allConsumedOffsets);if (!commitOffsetsSync(allConsumedOffsets, timer))log.debug("Auto-commit of offsets {} timed out before completion", allConsumedOffsets);} catch (WakeupException | InterruptException e) {log.debug("Auto-commit of offsets {} was interrupted before completion", allConsumedOffsets);// rethrow wakeups since they are triggered by the userthrow e;} catch (Exception e) {// consistent with async auto-commit failures, we do not propagate the exceptionlog.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumedOffsets, e.getMessage());}}
}

调用这个方法,当我们开启了自动提交,就会触发一个同步提交。那么哪里会调用这个方法?

  • 加入一个消费者组之前:org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#onJoinPrepare
  • 关闭一个消费者之前:org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#close

这两个触发点都跟我们要讨论的 auto.commit.interval.ms 问题无关,所以这里就不展开了。

异步提交

1
2
3
4
5
6
7
8
9
public void maybeAutoCommitOffsetsAsync(long now) {if (autoCommitEnabled) {nextAutoCommitTimer.update(now);if (nextAutoCommitTimer.isExpired()) {nextAutoCommitTimer.reset(autoCommitIntervalMs);doAutoCommitOffsetsAsync();}}
}

当 nextAutoCommitTimer 到期了就会执行 doAutoCommitOffsetsAsync() 方法进行异步提交,这个到期时间间隔就是 auto.commit.interval.ms 设置的间隔,所以我们只要跟踪 maybeAutoCommitOffsetsAsync 方法的调用方就知道什么时候会检查是否已经到期,从而进行自动异步提交。

通过 IDEA 快捷键查看,也有两个地方调用:

  • 手动分配分区时:org.apache.kafka.clients.consumer.KafkaConsumer#assign
  • 拉取数据时:org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)

手动分配分区时调用是确保消费者之前分配的老分区 offset 的提交,也和 auto.commit.interval.ms 无关。所以,无论同步提交还是异步提交,跟 auto.commit.interval.ms 有关的只剩下 org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration) 方法了,只有这个方法在正常情况下会被多次调用的。

这就验证了文章开头的问题,只要我们没有去调用 poll 方法,就算时间间隔到了,也无法触发自动提交。

注意

如果auto_commit_interval_ms的值设置的过大,当消费者在自动提交偏移量之前异常退出,将导致kafka未提交偏移量,进而出现重复消费的问题,所以建议auto_commit_interval_ms的值越小越好

enable.auto.commit
如果为true,则将在后台定期提交消费者的offset。
默认值为true;

总结
offset自动提交,要注意可能引起重复消费的问题

参考

https://medium.com/@danieljameskay/understanding-the-enable-auto-commit-kafka-consumer-property-12fa0ade7b65

KafkaConsumer (kafka 0.10.2.1 API) 这里是官网介绍如何使用consumer

容易被误会的 Kafka 消费者属性 enable.auto.commit相关推荐

  1. springboot和kafka结合其中enable.auto.commit等于false失效

    事件描述 公司使用的是Spring Cloud工作的微服务框架.其中做了SpringBoot和kafka的结合.但是意外的是enable.auto.commit参数设置成了false,kafka的of ...

  2. 理解 Kafka 消费者属性的 enable.auto.commit

    前言 理解一下Kafka的读的自动提交功能. 找到了一篇专门介绍这个功能的文章,选择主要的内容进行一下翻译和做笔记. 正文 Understanding the 'enable.auto.commit' ...

  3. Kafka之enable.auto.commit使用解析

    通过字面意思我们不难理解这是kafka的自动提交功能. 配置消费者(配置ENABLE_AUTO_COMMIT_CONFIG为 true 配置自动提交) enable.auto.commit 的默认值是 ...

  4. kafka消费者如何读同一生产者消息_Kafka系列3:深入理解Kafka消费者

    上面两篇聊了Kafka概况和Kafka生产者,包含了Kafka的基本概念.设计原理.设计核心以及生产者的核心原理.本篇单独聊聊Kafka的消费者,包括如下内容:消费者和消费者组 如何创建消费者 如何消 ...

  5. Kafka(三):kafka消费者

    文章目录 1. 消费方式 2. 消费者总体工作流程 2.1 消费者组 2.2 消费者组初始化流程 2.3 消费者组详细消费流程 3 消费者重要参数 4. 分区的分配以及再平衡 4.1 Range以及再 ...

  6. kafka 同步提交 异步_极限MQ (5) Kafka 消费者

    要想知道如何从 Kafka 读取消息,需要先了解消费者和消费者群组的概念. 假设我们有一个应用程序需要从 Kafka 主题读取消息井验证这些消息,然后再把它们保存起来.应用程序需要创建一个消费者对象, ...

  7. kafka消费者开发方式小结

    [README] 1, 本文总结了 kafka消费者开发方式: 2, 本文使用的是最新的kafka版本 3.0.0: [1] kafka消费则 [1.1]消费者与消费者组 1)消费者: 应用程序需要创 ...

  8. (转)Kafka 消费者 Java 实现

    转自: Kafka 消费者 Java 实现 - 简书应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息,首先理解 Kafka 中消费者(consumer)和消费者组(c ...

  9. kafka基础篇(四)——kafka消费者客户端

    一.入门程序 先上代码,从代码入手,讲解kafka消费者客户端的细节. public class HelloKafkaConsumer {public static void main(String[ ...

最新文章

  1. iOS架构-cocoaPods之自制私有库及管理(17)
  2. 前SAP全球VP加盟第四范式任总裁,戴文渊:加速多行业规模化扩展
  3. PHP判断是否为手机客户端
  4. 第04讲: 基础探究,Session 与 Cookies
  5. java 线程 api_Java核心API之线程(上)
  6. centos 6.8 下安装redmine(缺陷跟踪系统)
  7. SQL 全文索引 CONTAINS
  8. 软件工程革命 三部曲 —— 前传
  9. 2019.03.21 创建表 一对多,一对一,多对多。
  10. 《推荐系统实践》算法纯享(附代码链接)(四)—— UGC推荐篇
  11. Excel技巧—几个快速填充公式更高效的小技巧
  12. python分组统计excel数据_在python中对数据进行分组并与excel进行比较
  13. MySQL事务的保证机制
  14. 流程控制语句 函数 对象和数组
  15. arm9芯片包括哪些?arm9如何应用?
  16. WebDriver Sierra 10.12.3 N卡驱动
  17. WIN10作为服务器操作系统可以吗,服务器可以装win10吗
  18. 网络热帖惹争议,程序员高薪现象你怎么看?
  19. acne scar treatment options
  20. 基于Tofino2的64X100GE高性能可编程交换机MX7636-64X

热门文章

  1. 有了它,不会JavaScript,也能写出各种精彩页面!
  2. Git Worktree 高级使用,这样清爽多了|新技能
  3. 两款自动检测代码工具与插件,开源真香
  4. 皮一皮:经历过的举手报道...
  5. Java延迟加载的最佳实践应用示例!
  6. 扬言要干掉 RESTful API 的 GraphQL 是什么鬼?
  7. 每日一皮:发现程序员经常熬夜有三个弊端!
  8. Git 2.25.0 发布,新特性:部分 clone 与稀疏 checkout
  9. 计算机硬件技术 教案,教案07-计算机硬件技术基础.doc
  10. 苏州大学9月计算机考试试题,2016年9月计算机一级考试题及答案