Kafka 心跳机制 重复消费
kafka 心跳机制
Kafka是通过心跳机制来控制消费超时,心跳机制对于消费者客户端来说是无感的,它是一个异步线程,当我们启动一个消费者实例时,心跳线程就开始工作了。心跳超时会导致消息重复消费。
在org.apache.kafka.clients.consumer.internals.AbstractCoordinator中会启动一个HeartbeatThread线程来定时发送心跳和检测消费者的状态。每个消费者都有个org.apache.kafka.clients.consumer.internals.ConsumerCoordinator,而每个ConsumerCoordinator都会启动一个HeartbeatThread线程来维护心跳,心跳信息存放在org.apache.kafka.clients.consumer.internals.Heartbeat中,声明的Schema如下所示:
private final int sessionTimeoutMs;private final int heartbeatIntervalMs;private final int maxPollIntervalMs;private final long retryBackoffMs;private volatile long lastHeartbeatSend; private long lastHeartbeatReceive;private long lastSessionReset;private long lastPoll;private boolean heartbeatFailed;
心跳线程实现方法
public void run() {try {log.debug("Heartbeat thread started");while (true) {synchronized (AbstractCoordinator.this) {if (closed)return;if (!enabled) {AbstractCoordinator.this.wait();continue;}if (state != MemberState.STABLE) {// the group is not stable (perhaps because we left the group or because the coordinator// kicked us out), so disable heartbeats and wait for the main thread to rejoin.disable();continue;}client.pollNoWakeup();long now = time.milliseconds();if (coordinatorUnknown()) {if (findCoordinatorFuture != null || lookupCoordinator().failed())// the immediate future check ensures that we backoff properly in the case that no// brokers are available to connect to.AbstractCoordinator.this.wait(retryBackoffMs);} else if (heartbeat.sessionTimeoutExpired(now)) {// the session timeout has expired without seeing a successful heartbeat, so we should// probably make sure the coordinator is still healthy.markCoordinatorUnknown();} else if (heartbeat.pollTimeoutExpired(now)) {// the poll timeout has expired, which means that the foreground thread has stalled// in between calls to poll(), so we explicitly leave the group.maybeLeaveGroup();} else if (!heartbeat.shouldHeartbeat(now)) {// poll again after waiting for the retry backoff in case the heartbeat failed or the// coordinator disconnectedAbstractCoordinator.this.wait(retryBackoffMs);} else {heartbeat.sentHeartbeat(now);sendHeartbeatRequest().addListener(new RequestFutureListener<Void>() {@Overridepublic void onSuccess(Void value) {synchronized (AbstractCoordinator.this) {heartbeat.receiveHeartbeat(time.milliseconds());}}@Overridepublic void onFailure(RuntimeException e) {synchronized (AbstractCoordinator.this) {if (e instanceof RebalanceInProgressException) {// it is valid to continue heartbeating while the group is rebalancing. This// ensures that the coordinator keeps the member in the group for as long// as the duration of the rebalance timeout. If we stop sending heartbeats,// however, then the session timeout may expire before we can rejoin.heartbeat.receiveHeartbeat(time.milliseconds());} else {heartbeat.failHeartbeat();// wake up the thread if it's sleeping to reschedule the heartbeatAbstractCoordinator.this.notify();}}}});}}}} catch (AuthenticationException e) {log.error("An authentication error occurred in the heartbeat thread", e);this.failed.set(e);} catch (GroupAuthorizationException e) {log.error("A group authorization error occurred in the heartbeat thread", e);this.failed.set(e);} catch (InterruptedException | InterruptException e) {Thread.interrupted();log.error("Unexpected interrupt received in heartbeat thread", e);this.failed.set(new RuntimeException(e));} catch (Throwable e) {log.error("Heartbeat thread failed due to unexpected error", e);if (e instanceof RuntimeException)this.failed.set((RuntimeException) e);elsethis.failed.set(new RuntimeException(e));} finally {log.debug("Heartbeat thread has closed");}}
在心跳线程中这里面包含两个最重要的超时函数,分别是sessionTimeoutExpired() 和 pollTimeoutExpired()。
public boolean sessionTimeoutExpired(long now) {return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeoutMs;
}public boolean pollTimeoutExpired(long now) {return now - lastPoll > maxPollIntervalMs;}
sessionTimeoutExpired
如果sessionTimeout超时,则会被标记为当前协调器处理断开, 即将将消费者移除,重新分配分区和消费者的对应关系。在Kafka Broker Server中,Consumer Group定义了5中(如果算上Unknown,应该是6种状态)状态,org.apache.kafka.common.ConsumerGroupState,如下图所示:
pollTimeoutExpired
如果触发了poll超时,此时消费者客户端会退出ConsumerGroup,当再次poll的时候,会重新加入到ConsumerGroup,触发消费者再平衡策略 RebalanceGroup。而KafkaConsumer Client是不会帮我们重复poll的,需要我们自己在实现的消费逻辑中不停的调用poll方法。
Kafka消费者负载均衡策略 消费者再平衡 consumer rebalance
Kafka 如何解决消息重复
Kafka 心跳机制 重复消费相关推荐
- 6张图阐述Kafka心跳机制(时间轮算法的具体运用)
Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重 ...
- kafka 如何避免重复消费
为什么会出现重复消费 1.kafka是通过offset来标记消费的.默认情况下,消费完成后会自动提交offset,避免重复消费. Kafka消费端的自动提交逻辑有一个默认的5秒间隔,也就是说在5秒之后 ...
- 什么?搞不定Kafka重复消费?
来自:架构之美 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ????如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序的健壮性,在使用 Kafka 的 ...
- 三张表有重复字段_什么?搞不定Kafka重复消费?
点戳蓝字"架构之美"关注我们哦! 前言 今天我们聊一个话题,这个话题大家可能在面试过程中,或者是工作当中经常遇到 ?如何保证 Kafka 消息不重复消费?我们在做开发的时候为了程序 ...
- Kafka不丢失数据与不重复消费数据
文章目录 一.不丢失数据 1.生产者数据不丢失 2.消费者数据不丢失 二.不重复消费数据 一.不丢失数据 1.生产者数据不丢失 同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待). ...
- 总结kafka的consumer消费能力很低造成重复消费死循环的情况下的处理方案
简介 由于项目中需要使用kafka作为消息队列,并且项目是基于spring-boot来进行构建的,所以项目采用了spring-kafka作为原生kafka的一个扩展库进行使用.先说明一下版本: spr ...
- kafka 重复消费和数据丢失_刨根问底,Kafka消息中间件到底会不会丢消息
大型互联网公司一般都会要求消息传递最大限度的不丢失,比如用户服务给代金券服务发送一个消息,如果消息丢失会造成用户未收到应得的代金券,最终用户会投诉. 为避免上面类似情况的发生,除了做好补偿措施,更应该 ...
- 【消息队列】kafka是如何保证消息不被重复消费的
一.kafka自带的消费机制 kafka有个offset的概念,当每个消息被写进去后,都有一个offset,代表他的序号,然后consumer消费该数据之后,隔一段时间,会把自己消费过的消息的offs ...
- MQ问题集(kafka主从同步与高可用,MQ重复消费、幂等)
1.kafka主从同步与高可用 https://1028826685.iteye.com/blog/2354570 http://developer.51cto.com/art/201808/5815 ...
最新文章
- php写网页6,基于ThinkPHP6+AdminLTE框架开发的响应式企业网站CMS系统PHP源码,ThinkPHP6开发的后台权限管理系统...
- 浅谈强化学习的方法及学习路线
- 计算器界面分析及界面程序实现
- EF里的默认映射以及如何使用Data Annotations和Fluent API配置数据库的映射
- Teamtalk源码分析
- 编程命名中的7+1个提示
- 洛谷P3371-【模板】单源最短路【SPFA】
- AcWing 2019. 拖拉机(双端BFS)
- 《测试驱动数据库开发》——2.1 TDD中类的角色
- changeable和changeful_change的形容词是什么?
- 采集新闻数据的10个经典方法
- 从administrators组中删除guest来宾账户时提示无法在内置账号上运行此操作的解决办法...
- ocr文字识别html,LEADTOOLS 创建基于HTML5的零占用OCR文字识别
- Java的时间格式化
- 柴胡加龙骨牡蛎汤去大黄合当归芍药散治顽固心悸案(李跃海)
- 开源中国源码学习UI篇(一)之FragmentTabHost的使用分析
- kill word fore out
- 记录 设计+制作AA机台
- 矩阵理论| 基础:向量范数、赋范向量空间与内积空间、重要不等式
- 集群、负载均衡、分布式 简介