前面对kafka的学习中已经了解到KafkaProducer通过设定参数retries,如果发送消息到broker时抛出异常,且是允许重试的异常,那么就会最大重试retries参数指定的次数。

本片文章主要分析几个问题:
- 哪些异常可以重试
- 如何实现重试

接下来通过分析一一解开这些问题的答案。

1.哪些异常可以重试

org.apache.kafka.clients.producer.internals.Sender类中有如下方法:

private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response) {return batch.attempts() < this.retries &&((response.error.exception() instanceof RetriableException) ||(transactionManager != null && transactionManager.canRetry(response, batch)));
}

通过方法名可知,其作用是判断是否能重试,由方法体内的实现可知,允许重试需要满足两个条件:
1. 重试次数少于参数retries指定的值;
2. 异常是RetriableException类型或者TransactionManager允许重试;

transactionManager.canRetry()后面会分析;先看看哪些异常是RetriableException类型异常。

  • RetriableException类型异常

kafka对RetriableException异常注释是:短暂性的通过重试可以成功的异常;通过RetriableException类关系图可知,可重试异常有图中RetriableException的子类那些异常(可以通过异常是否继承自RetriableException判断是否可重试异常):

  • TransactionManager允许重试

如果异常不属于RetriableException类型,但是只要满足(transactionManager != null && transactionManager.canRetry(response, batch))就允许重试,所以,首先需要满足transactionManager不为null。transactionManager是在KafkaProducer中构造Sender传入的。构造TransactionManager的核心源码如下:

private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) {TransactionManager transactionManager = null;boolean userConfiguredIdempotence = false;// 用户设置的Properties参数中是否有'enable.idempotence',如果有的话, 就用用户配置的if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) {userConfiguredIdempotence = true;}// 用户设置的Properties参数中是否有'transactional.id',如果有的话, 就用用户配置的boolean userConfiguredTransactions = false;if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {userConfiguredTransactions = true;}// 得到参数'enable.idempotence'的值boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);// 如果用户显示配置enable.idempotence为false,并且又配置了transactional.id,就会抛出这个异常if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions) {throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");} // 如果用户配置了transactional.id,那么idempotenceEnabled就认为是true(与)if (userConfiguredTransactions) {idempotenceEnabled = true;}// 只有用户配置了transactional.id,且enable.idempotence没有设置为false,这里才为true,就会构造一个有效的TransactionManager;从这里可知,如果用户没有配置transactional.id,那么TransactionManager为nullif (idempotenceEnabled) {// 构造TransactionManager的几个重要参数String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);transactionManager = new TransactionManager(logContext, transactionalId, transactionTimeoutMs, retryBackoffMs);... ...}return transactionManager;
}

根据上面源码分析可知,只要用户配置了transactional.id,且没有显示配置enable.idempotence为false,那么TransactionManager就不会为null;

接下来还要满足transactionManager.canRetry(response, batch)才允许重试,主要包括下面几种情况:
- 碰到OutOfOrderSequenceException异常
- broker的响应报文中没有logStartOffset(正常的响应信息:”T0-0” -> “{error: NONE,offset: 0,logAppendTime: -1, logStartOffset: 0}”)

2.如何实现重试

上面说明了什么情况下允许重试,接下来分析kafka是如何实现重试的。

2.1原理图

本打算把原理图放在最后,但是最后还是决定放在前面。对重试机制有一定的了解后,再看后面的分析就容易很多。kafka发送&重试机制如下图所示:

说明:
1. new KafkaProducer()后创建一个后台线程KafkaThread扫描RecordAccumulator中是否有消息;
2. 调用KafkaProducer.send()发送消息,实际上只是把消息保存到RecordAccumulator中;
3. 后台线程KafkaThread扫描到RecordAccumulator中有消息后,将消息发送到kafka集群;
4. 如果发送成功,那么返回成功;
5. 如果发送失败,那么判断是否允许重试。如果不允许重试,那么返回失败的结果;如果允许重试,把消息再保存到RecordAccumulator中,等待后台线程KafkaThread扫描再次发送;

初步了解整个发送&重试过程后,再根据源码进行更深入的分析。

2.2后台线程

分析kafka如何实现重试之前,先看一下发送消息到broker前做的主要事情:

  1. 构造KafkaProducer时,构造Send并启动一个异步线程:
this.sender = new Sender(... ...);
String ioThreadName = "kafka-producer-network-thread" + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

且从这段代码可知,每个KafkaProducer会启动一个线程处理消息,这个线程命名为:kafka-producer-network-thread | ${clientId}。

笔者某个实例查看KafkaProducer启动的线程结果如下:

[afei@kafka ~]$ jstack -l 23715 | grep "kafka-producer-network-thread"
"kafka-producer-network-thread | producer-2" #109 daemon prio=5 os_prio=0 tid=0x00007fe081921000 nid=0x5dcb runnable [0x00007fdfeb92b000]
"kafka-producer-network-thread | producer-1" #46 daemon prio=5 os_prio=0 tid=0x00007fe081f5a800 nid=0x5d66 runnable [0x00007fe024d20000]
  1. 调用KafkaProducer的send()方法时,先把发送的消息存储在accumulator中:
RecordAccumulator.RecordAppendResult result = accumulator.append(
tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);

2.3**RecordAccumulator**

RecordAccumulator是保存需要发送的消息或者重试消息的核心。发送消息之前先把消息存放在这里,异步线程KafkaThread启动后从这里取消息然后发送到broker。当发送出错且允许重试时,又会把这些需要重试的消息保存到这里再进行重试。

当调用KafkaProducer的send()方法发送消息时,会调用append()方法将消息暂时存放,核心源码如下:

 // 获得deque或者创建deque。因为核心数据结构是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,所以生产者批次消息是按照分区区分的。如果根据分区拿不到deque的话,就创建一个deque。
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
// 把需要发送的消息放入队列中,
dq.addLast(batch);

当发送出错且允许重试时,会调用reenqueue()方法将消息暂时存放,核心源码如下:

public void reenqueue(ProducerBatch batch, long now) {Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);synchronized (deque) {// 把需要重试的消息放入队列中,等到重试deque.addFirst(batch);}
}

RecordAccumulator简单总结:通过这两段代码的分析可知,保存需要发送的(重试)消息的核心数据结构是Deque。且创建队列时是new ArrayDeque(),没有指定初始容量。这里不打算深入分析Deque,只是简单介绍一下,Deque是Double ended queue (双端队列) 的缩写。首尾都可写入可读取。

2.3发送&重试

下面分析kafka是如何发送并如何重试的。(TransactionManager相关代码被省略,其的作用后面有机会单独一篇文章分析);发送消息核心代码在Sender.java中, Sender.java实现了Runnable接口, 所以是后台线程异步发送消息到kafka集群:

public class Sender implements Runnable {public void run() {// KafkaProducer发送消息的线程启动后,一直运行,直到KafkaProducer.close()将running置为falsewhile (running) {run(time.milliseconds());}// 根据日志可知,接下来是KafkaProducer关闭后的逻辑log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");// 当非强制关闭时,可能依然有请求堆积在accumulator中, 我们需要将这些剩余的请求处理完成while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {run(time.milliseconds());}if (forceClose) {// 如果强制关闭,且有未处理完的消息,那么让这些消息的发送失败,并抛出异常new IllegalStateException("Producer is closed forcefully.").log.debug("Aborting incomplete batches due to forced shutdown");this.accumulator.abortIncompleteBatches();}... ...}
}

KafkaProducer关闭有方式有两种:close();close(long timeout, TimeUnit timeUnit),第一种是友好的关闭且设置timeout为Long.MAX_VALUE,第二种如果设置timeout为0,就是强制关闭,即forceClose=true。
备注:drained: 流干,耗尽,undrained则表示未耗尽。

准备发送消息前需要尝试去accumulator中获取消息:

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,this.maxRequestSize, now);

accumulator.drain()本质就是:Deque<ProducerBatch> deque = getDeque(tp);ProducerBatch batch = deque.pollFirst();,即根据分区信息得到Deque,然后不断获取ProducerBatch,即封装后的要发送的消息。

run(long)方法中往broker发送消息的部分核心代码(位于Sender.java中)如下:

private void sendProduceRequest(... ...){ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,produceRecordsByPartition, transactionalId);RequestCompletionHandler callback = new RequestCompletionHandler() {public void onComplete(ClientResponse response) {// 这里是处理响应消息的地方handleProduceResponse(response, recordsByPartition, time.milliseconds());}};// 省略发送消息到broker的代码... ...
}   

handleProduceResponse()中收到的响应,如何是网络断开,那么构造响应:new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION)。如果有版本不匹配问题,那么构造响应:new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION)。还有一种特殊情况,如果指定了acks=0,那么构造响应new ProduceResponse.PartitionResponse(Errors.NONE),因为这种情况下只需要发送即可,不需要响应结果。接下来调用下面的方法–完成或者重试请求:

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) {Errors error = response.error;if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&(batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}", ...);// 如果是'MESSAGE_TOO_LARGE'的错误,且是批量消息(recordCount>1),那么切割消息后再发送this.accumulator.splitAndReenqueue(batch);this.accumulator.deallocate(batch);this.sensors.recordBatchSplit();} else if (error != Errors.NONE) {// 如果响应有错误,判断是否允许重试if (canRetry(batch, response)) {// 如果允许重试,会输出warn日志log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}"... ...);if (transactionManager == null) {// 重新把消息放到队列中reenqueueBatch(batch, now);} ... ...} else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {// 接收到这种错误,就认为返回成功。completeBatch(batch, response);} else {... ...}// 到这里如果是UnknownTopicOrPartitionException异常,说明producer缓存的元数据信息可能已经过期,所以需要请求更新,代码省略} else {completeBatch(batch, response);}... ...
}

如果需要重试,重新入队列的源码如下:

// ProducerBatch就是发送的消息
private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {// accumulator的reenqueue前面已经分析了,本质就是调用Deque的addFirst()this.accumulator.reenqueue(batch, currentTimeMs);this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
}

11. kafka重试机制解读相关推荐

  1. Kafka设计解析(八)- Kafka事务机制与Exactly Once语义实现原理

    http://www.infoq.com/cn/articles/kafka-analysis-part-8# 写在前面的话 本文所有Kafka原理性的描述除特殊说明外均基于Kafka 1.0.0版本 ...

  2. Java基础学习总结(172)——手写Java 重试机制

    package com.zhy.common.retry;import java.util.Arrays;import org.apache.commons.lang3.StringUtils; im ...

  3. 【Spring Cloud】OpenFeign和Spring Cloud Loadbalancer调用失败后的重试机制比较

    1 概述 搭建一个微服务系统,有两个服务,Client和Server,Server有三个实例A.B.C,我让Client调用Server,Loadbalancer负载分担默认采用轮询机制,当Serve ...

  4. 【转载】Java重试机制

    重试机制在分布式系统中,或者调用外部接口中,都是十分重要的. 重试机制可以保护系统减少因网络波动.依赖服务短暂性不可用带来的影响,让系统能更稳定的运行的一种保护机制. 为了方便说明,先假设我们想要进行 ...

  5. Springboot 整合Retry 实现重试机制

    重试,在项目需求中是非常常见的,例如遇到网络波动等,要求某个接口或者是方法可以最多/最少调用几次: 实现重试机制,非得用Retry这个重试框架吗?那肯定不是,相信很多伙伴手写一下控制流程的逻辑也可以达 ...

  6. Guava-retrying 重试机制

    文章目录 Guava-retrying 1. 主要相关类 1.1 Attemp 类 1.2 Retryer 类 1.3 RetryListener 2. WaitStrategies 重试等待策略 2 ...

  7. 层层递进打造你的重试机制

    重试机制在分布式系统中,或者调用外部接口中,都是十分重要的. 重试机制可以保护系统减少因网络波动.依赖服务短暂性不可用带来的影响,让系统能更稳定的运行的一种保护机制. 为了方便说明,先假设我们想要进行 ...

  8. 6张图阐述Kafka心跳机制(时间轮算法的具体运用)

    Broker端与客户端的心跳在Kafka中非常的重要,因为一旦在一个心跳过期周期内(默认10s),Broker端的消费组组协调器(GroupCoordinator)会把消费者从消费组中移除,从而触发重 ...

  9. 手机淘宝双11全球狂欢节技术解读

    手机淘宝 双11全球狂欢节技术解读 2015双11全球狂欢节全天交易额912.17亿元!无线成交626.42亿元!无线占比68.67%!--这是消费的力量,是新经济的力量,是我们每一个人的力量,更是中 ...

最新文章

  1. router路由react_使用React Router在React中受保护的路由
  2. 理解koa-router 路由一般使用
  3. rsync服务同步,linux日志,screen工具
  4. MFC DLL对话框调用
  5. Java EE 7 Batch中传递属性/参数的2种方式
  6. 130242014037-汤毓聪-实验一
  7. 苹果cms安装 php映射,苹果cmsV10安装过程中的常见问题处理办法
  8. 1.5.7 Python匿名函数
  9. Python3常用正则表达式
  10. JellyViewPager
  11. ORK包的安装与linemod算法识别测试 (使用kinect v2 出现很奇怪的问题和解决)
  12. charles V4.2.1版本 破解码
  13. 微信小程序后端系统CMS开发笔记--04
  14. 内网即时通讯软件优点大全分享
  15. android电子指南针,Android实现电子罗盘(指南针)方向传感器的应用
  16. echarts图表销毁
  17. 软件项目管理三国启示录01 群雄争霸之项目经理的自我修养
  18. SLG游戏中大地图实现使用四叉树技术
  19. Web实现前后端分离,前后端解耦
  20. 在快充时代逆行的苹果

热门文章

  1. 基于经度坐标校正鱼眼图像---python实现
  2. 程序员面试一面、二面、三面区别
  3. openstack关闭安全组(网络端口)的限制
  4. Iphone版音乐计算机,轻松实现从iPhone上进行音乐等文件的共享复制
  5. 网页链接分享到微信里的海报制作
  6. 新手体验 kaggle上的电影评论情感分析
  7. fatal: No url found for submodule path ‘xxx‘ in .gitmodule
  8. 油猴脚本Tampermonkey的简介和安装使用,五分钟安装
  9. linux待机唤醒_Linux电源管理-休眠与唤醒
  10. Visual Studio 2019 卸载干净+下载安装方法 2021-5-7