kafka 心跳机制

Kafka是通过心跳机制来控制消费超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。心跳超时会导致消息重复消费

在org.apache.kafka.clients.consumer.internals.AbstractCoordinator中会启动一个HeartbeatThread线程来定时发送心跳和检测消费者的状态。每个消费者都有个org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每个ConsumerCoordinator都会启动一个HeartbeatThread线程来维护心跳,心跳信息存放在org.apache.kafka.clients.consumer.internals.Heartbeat中,声明的Schema如下所示:

    private final int sessionTimeoutMs;private final int heartbeatIntervalMs;private final int maxPollIntervalMs;private final long retryBackoffMs;private volatile long lastHeartbeatSend; private long lastHeartbeatReceive;private long lastSessionReset;private long lastPoll;private boolean heartbeatFailed;

心跳线程实现方法

public void run() {try {log.debug("Heartbeat thread started");while (true) {synchronized (AbstractCoordinator.this) {if (closed)return;if (!enabled) {AbstractCoordinator.this.wait();continue;}if (state != MemberState.STABLE) {// the group is not stable (perhaps because we left the group or because the coordinator// kicked us out), so disable heartbeats and wait for the main thread to rejoin.disable();continue;}client.pollNoWakeup();long now = time.milliseconds();if (coordinatorUnknown()) {if (findCoordinatorFuture != null || lookupCoordinator().failed())// the immediate future check ensures that we backoff properly in the case that no// brokers are available to connect to.AbstractCoordinator.this.wait(retryBackoffMs);} else if (heartbeat.sessionTimeoutExpired(now)) {// the session timeout has expired without seeing a successful heartbeat, so we should// probably make sure the coordinator is still healthy.markCoordinatorUnknown();} else if (heartbeat.pollTimeoutExpired(now)) {// the poll timeout has expired, which means that the foreground thread has stalled// in between calls to poll(), so we explicitly leave the group.maybeLeaveGroup();} else if (!heartbeat.shouldHeartbeat(now)) {// poll again after waiting for the retry backoff in case the heartbeat failed or the// coordinator disconnectedAbstractCoordinator.this.wait(retryBackoffMs);} else {heartbeat.sentHeartbeat(now);sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {@Overridepublic void onSuccess(Void value) {synchronized (AbstractCoordinator.this) {heartbeat.receiveHeartbeat(time.milliseconds());}}@Overridepublic void onFailure(RuntimeException e) {synchronized (AbstractCoordinator.this) {if (e instanceof RebalanceInProgressException) {// it is valid to continue heartbeating while the group is rebalancing. This// ensures that the coordinator keeps the member in the group for as long// as the duration of the rebalance timeout. If we stop sending heartbeats,// however, then the session timeout may expire before we can rejoin.heartbeat.receiveHeartbeat(time.milliseconds());} else {heartbeat.failHeartbeat();// wake up the thread if it's sleeping to reschedule the heartbeatAbstractCoordinator.this.notify();}}}});}}}} catch (AuthenticationException e) {log.error("An authentication error occurred in the heartbeat thread", e);this.failed.set(e);} catch (GroupAuthorizationException e) {log.error("A group authorization error occurred in the heartbeat thread", e);this.failed.set(e);} catch (InterruptedException | InterruptException e) {Thread.interrupted();log.error("Unexpected interrupt received in heartbeat thread", e);this.failed.set(new RuntimeException(e));} catch (Throwable e) {log.error("Heartbeat thread failed due to unexpected error", e);if (e instanceof RuntimeException)this.failed.set((RuntimeException) e);elsethis.failed.set(new RuntimeException(e));} finally {log.debug("Heartbeat thread has closed");}}

在心跳线程中这里面包含两个最重要的超时函数,分别是sessionTimeoutExpired() 和 pollTimeoutExpired()。

public boolean sessionTimeoutExpired(long now) {return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
}public boolean pollTimeoutExpired(long now) {return now - lastPoll > maxPollIntervalMs;}
  • sessionTimeoutExpired

如果sessionTimeout超时,则会被标记为当前协调器处理断开, 即将将消费者移除,重新分配分区和消费者的对应关系。在Kafka Broker Server中,Consumer Group定义了5中(如果算上Unknown,应该是6种状态)状态,org.apache.kafka.common.ConsumerGroupState,如下图所示:

  • pollTimeoutExpired

如果触发了poll超时,此时消费者客户端会退出ConsumerGroup,当再次poll的时候,会重新加入到ConsumerGroup,触发消费者再平衡策略 RebalanceGroup。而KafkaConsumer Client是不会帮我们重复poll的,需要我们自己在实现的消费逻辑中不停的调用poll方法

Kafka消费者负载均衡策略 消费者再平衡 consumer rebalance

Kafka 如何解决消息重复

Kafka 心跳机制 重复消费相关推荐

  1. 6张图阐述Kafka心跳机制(时间轮算法的具体运用)

    Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重 ...

  2. kafka 如何避免重复消费

    为什么会出现重复消费 1.kafka是通过offset来标记消费的.默认情况下,消费完成后会自动提交offset,避免重复消费. Kafka消费端的自动提交逻辑有一个默认的5秒间隔,也就是说在5秒之后 ...

  3. 什么?搞不定Kafka重复消费?

    来自:架构之美 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ????如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序的健壮性,在使用 Kafka 的 ...

  4. 三张表有重复字段_什么?搞不定Kafka重复消费?

    点戳蓝字"架构之美"关注我们哦! 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ?如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序 ...

  5. Kafka不丢失数据与不重复消费数据

    文章目录 一.不丢失数据 1.生产者数据不丢失 2.消费者数据不丢失 二.不重复消费数据 一.不丢失数据 1.生产者数据不丢失 同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待). ...

  6. 总结kafka的consumer消费能力很低造成重复消费死循环的情况下的处理方案

    简介 由于项目中需要使用kafka作为消息队列,并且项目是基于spring-boot来进行构建的,所以项目采用了spring-kafka作为原生kafka的一个扩展库进行使用.先说明一下版本: spr ...

  7. kafka 重复消费和数据丢失_刨根问底,Kafka消息中间件到底会不会丢消息

    大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...

  8. 【消息队列】kafka是如何保证消息不被重复消费的

    一.kafka自带的消费机制 kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offs ...

  9. MQ问题集(kafka主从同步与高可用,MQ重复消费、幂等)

    1.kafka主从同步与高可用 https://1028826685.iteye.com/blog/2354570 http://developer.51cto.com/art/201808/5815 ...

最新文章

  1. php写网页6,基于ThinkPHP6+AdminLTE框架开发的响应式企业网站CMS系统PHP源码,ThinkPHP6开发的后台权限管理系统...
  2. 浅谈强化学习的方法及学习路线
  3. 计算器界面分析及界面程序实现
  4. EF里的默认映射以及如何使用Data Annotations和Fluent API配置数据库的映射
  5. Teamtalk源码分析
  6. 编程命名中的7+1个提示
  7. 洛谷P3371-【模板】单源最短路【SPFA】
  8. AcWing 2019. 拖拉机(双端BFS)
  9. 《测试驱动数据库开发》——2.1 TDD中类的角色
  10. changeable和changeful_change的形容词是什么?
  11. 采集新闻数据的10个经典方法
  12. 从administrators组中删除guest来宾账户时提示无法在内置账号上运行此操作的解决办法...
  13. ocr文字识别html,LEADTOOLS 创建基于HTML5的零占用OCR文字识别
  14. Java的时间格式化
  15. 柴胡加龙骨牡蛎汤去大黄合当归芍药散治顽固心悸案(李跃海)
  16. 开源中国源码学习UI篇(一)之FragmentTabHost的使用分析
  17. kill word fore out
  18. 记录 设计+制作AA机台
  19. 矩阵理论| 基础:向量范数、赋范向量空间与内积空间、重要不等式
  20. 集群、负载均衡、分布式 简介

热门文章

  1. 一头扎进springboot之捕获全局异常
  2. IQueryable VS IEnumerable
  3. 谁能分享一下PMP备考攻略?
  4. 程序员的逆袭之路---从沉迷游戏到入职腾讯的全过程
  5. 不要给技术人员做绩效
  6. 看了彭于晏解封后的照片,有网友瞬间觉得自己配得上他了
  7. linux Hadoop环境变量安装及应用
  8. Unreal中Interface接口的使用
  9. Threading lightly, Part 2: Reducing contention
  10. 【NOIP校内模拟】图论题