问题解决过程

线上一个界面发现老是没有数据,排查下来时生产者没有成功发送消息所致,报错如下:

org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 0 ms.

我们发现配置里有一个很奇怪的参数

    config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 0);

我们找到相关源码:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {throwIfProducerClosed();// first make sure the metadata for the topic is availableClusterAndWaitTime clusterAndWaitTime;try {// -------------------------maxBlockTimeMs最终会传入ProducerConfig.MAX_BLOCK_MS_CONFIG对应值0clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);.......  private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {....do {log.trace("Requesting metadata update for topic {}.", topic);metadata.add(topic);int version = metadata.requestUpdate();sender.wakeup();try {metadata.awaitUpdate(version, remainingWaitMs);} catch (TimeoutException ex) {// ------------------主要是在这里报错了------------// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMsthrow new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");}....

我们找到max.block.ms的作用

public static final String MAX_BLOCK_MS_CONFIG = "max.block.ms";private static final String MAX_BLOCK_MS_DOC = "The configuration controls how long <code>KafkaProducer.send()</code> and <code>KafkaProducer.partitionsFor()</code> will block."+ "These methods can be blocked either because the buffer is full or metadata unavailable."+ "Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.";

问题很清晰了, send时发现要更新metadata,更新metadata时0ms超时报错了!

故把该值改为1000即可

    config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);

深入思考这个问题

这个功能是由界面点击时触发的,但运营人员很少会去使用这个功能,故猜测太长时间没用,比如超过9分钟(connections.max.idle.ms), 导致连接关闭,下次send时需要重新拉取metadata导致,但实际是这样的吗?no

我们把connections.max.idle.ms=1000,期待1s后send失败,但实际上还是需要空闲好几分钟后send才会失败,说明connections.max.idle.ms这个参数不是报错的直接原因!

通过DEBUG日志, 我们发现超时连接断开时,会触发metadata的更新;当metadata.max.age.ms超时时也会触发metadate更新。我们再深入研究下去

调试时发现抛出报错的日志前面,有如下所示日志

[2021-08-25 22:28:15,039] [kafka-producer-network-thread | producer-1] [DEBUG] org.apache.kafka.clients.Metadata - Removing unused topic InnerProcess from the metadata list, expiryMs 1629901694499 now 1629901695039

这句话是在下面这个地方打印的

org.apache.kafka.clients.Metadata#updatepublic static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {....// producer默认为true,consumer默认为falseif (topicExpiryEnabled) {// Handle expiry of topics from the metadata refresh set.for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {Map.Entry<String, Long> entry = it.next();long expireMs = entry.getValue();// 第一次需要立即更新为-1if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)// 第一次设置了5分钟后失效entry.setValue(now + TOPIC_EXPIRY_MS);else if (expireMs <= now) {// 达到失效时间后,topic就会删除,下次send需要重新更新metadata了it.remove();log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);}}}.....

我们可以看到topic其实是设置了失效时间的,默认5分钟,当第一次调用send(),失效时间为-1,并且会触发更新metadata,即会修改为entry.setValue(now + TOPIC_EXPIRY_MS),5分钟后失效,但5分钟后不会立即失效,若等到9分钟连接断开时,会走到上面的代码:

  if (expireMs <= now) {// 达到失效时间后,topic就会删除,下次send需要重新更新metadata了it.remove();log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);}

即会将topic移除了,故9分钟后再次send时,需要更新metadata,但max.block.ms=0导致超时报错,即发送失败!

我们可以做这样的试验, 修改下面2个参数

        // 在topic失效之前停掉config.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 4 * 60 * 1000);// 10小时才更新一次metadataconfig.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 10 * 60 * 60 * 1000);

然后每大于5分钟的时间间隔,send一次

@Scheduled(fixedDelay = 9 * 60 * 1000 + 10000)public void sendKafkaASync() {kafkaService.sendKafkaASync();}

发现除了第一次失败后,就不会失败了!!!

因此,试验下来,是由于max.block.ms设置过小,导致更新metadata超时,导致send失败;更深入的:
1)首次发送没有topic数据,send调用时需要首先更新metadata时间不够,导致超时;
2)后面发送失败,是因为topic已超时,并在之后,connections.max.idle.ms或metadata.max.age.ms到时间触发metadata更新,而把超时的topic移除了。topic移除后,再进行send,有需要先更新metadata,进而还是会失败;

总体上而言,send的间隔不要超过5分钟最好,超过5分钟不调用send, 存在一定的可能导致topic被移除,从而下次send需要重新拉去metadata

附录:相关代码

pom.xml

<parent><artifactId>spring-boot-starter-parent</artifactId><groupId>org.springframework.boot</groupId><version>2.1.5.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><!-- logback to log4j2 --><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.5.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.12</version></dependency></dependencies>

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- status指定log4j本身的日志打印级别 -->
<!-- 配置文件发生修改,在monitorInterval时间即5s内,重新加载,无需重启 -->
<Configuration status="DEBUG" monitorInterval="5">.....<Loggers><Logger name="org.springframework" level="INFO" /><Logger name="com.ydfind" level="DEBUG" /><Logger name="org.apache.kafka.clients.consumer" level="WARN"/><Logger name="org.apache.kafka.clients.FetchSessionHandler" level="WARN" /><Logger name="org.apache.kafka.clients.producer.KafkaProducer" level="TRACE" /></Loggers>
</Configuration>

application.yml

logging:config: classpath:log4j2.xml

KafkaConfig.java

package com.ydfind.config;import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.web.context.request.FacesRequestAttributes;import java.awt.datatransfer.StringSelection;
import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaConfig {@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}private ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}private Map<String, Object> producerConfigs() {Map<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//
//        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
//        config.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\"");
//        config.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");config.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 0);
//        config.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 1 * 60 * 1000);
//        config.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 30 * 60 * 1000);//        config.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 100 * 60 * 1000);
//        config.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1 * 60 * 1000);// ----貌似下面会10小时内都成功!!!!---------------------------// 4min时connect断掉,5min时topic失效,6min时send()成功,10min时把topic失效时间弄从15min!!!!!// 是send()导致entry.getValue() == TOPIC_EXPIRY_NEEDS_UPDATE,还是connect断开的更新topic导致呢!!!// topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE)-------------send时会把topic的失效时间弄成-1----相当于续命,不会失效// 在topic失效之前停掉config.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 4 * 60 * 1000);// 10小时才更新一次metadataconfig.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 10 * 60 * 60 * 1000);return config;}
}

KafkaService

package com.ydfind.service;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;@Component
@Slf4j
public class KafkaService {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public String sendKafkaSync() {ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("InnerProcess", "sendKafkaSync");String msg = "";try {SendResult<String, String> stringStringSendResult = send.get();msg += stringStringSendResult.getRecordMetadata().offset();} catch (InterruptedException e) {e.printStackTrace();return e.getMessage();} catch (ExecutionException e) {e.printStackTrace();return e.getMessage();}return msg;}private AtomicInteger total = new AtomicInteger();private AtomicInteger success = new AtomicInteger();public void sendKafkaASync() {ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send("InnerProcess", "sendKafkaSync");send.completable().whenCompleteAsync((n, e) -> {total.getAndIncrement();if (null != e) {e.printStackTrace();} else {success.getAndIncrement();}log.info("\n-------------------total = {}, success = {}", total.get(), success.get());});}
}

MainApp.java

package com.ydfind;import com.ydfind.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@SpringBootApplication
@RestController
@EnableScheduling
public class MainApp {public static void main(String[] args) {SpringApplication.run(MainApp.class, args);}@Autowiredprivate KafkaService kafkaService;@GetMapping("sendKafka")public String sendKafka() {return kafkaService.sendKafkaSync();}@Scheduled(fixedDelay = 9 * 60 * 1000 + 10000)public void sendKafkaASync() {kafkaService.sendKafkaASync();}
}

线上问题-kafka生产者发送消息总是失败相关推荐

  1. Kafka生产者发送消息的三种方式

    Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量.灵活的offset是其它消息系统所没有的. Kafka发送消息主要有三种方式: 1.发送并忘记 2.同步发送 3.异步发送+回调函数 下 ...

  2. 10 kafka生产者发送消息的原理

    1.发送原理: 在消息发送的过程中,涉及到了两个线程--main 线程和 Sender 线程.在 main 线程 中创建了一个双端队列 RecordAccumulator.main 线程将消息发送给 ...

  3. Kafka实战 - 02 Kafka生产者发送消息至topic实现数据上报

    文章目录 1. 项目背景 2. 依赖和配置 3. 生产者配置 KafkaConfiguration 4. 同步数据Topic枚举 SyncDataTopicEnum 5. 请求体 DataSyncQo ...

  4. 【kafka系列】kafka之生产者发送消息实践

    目录 一.准备工作 二.终端命令 生产者命令 消费者命令 三.Java实践 搭建项目 异步发送-无回调 异步发送-有回调 同步发送 一.准备工作 进入实战之前先熟悉一下topic的相关命令,使用终端命 ...

  5. discard connection丢失数据_python kafka 生产者发送数据的三种方式

    python kafka 生产者发送数据的三种方式 发送方式 同步发送 发送数据耗时最长 有发送数据的状态,不会丢失数据,数据可靠性高 以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断 ...

  6. 【源码篇】Kafka客户端发送消息

    在上一篇文章中,已经介绍了初始化 KafkaProducer 基本流程.当客户端对 KafkaProducer 完成完成后,可以调用 send() 方法将数据发送至kafka broker集群. 图中 ...

  7. 【Kafka生产者发消息流程】

    发送流程 首先生产者调用send方法发送消息后,会先经过拦截器,接着进入序列化器.序列化器主要用于对消息的Key和Value进行序列化.接着进入分区器选择消息的分区. 上面这几步完成之后,消息会进入到 ...

  8. RocketMQ3.2.2生产者发送消息自动创建Topic队列数无法超过4个

    问题现象 RocketMQ3.2.2版本,测试时尝试发送消息时自动创建Topic,设置了队列数量为8: producer.setDefaultTopicQueueNums(8); 同时设置broker ...

  9. 生产者发送消息的过程?

    1.Producer先连接到Broker,建立连接Connection,开启一个信道(Channel). 2.Producer声明一个交换器并设置好相关属性. 3.Producer声明一个队列并设置好 ...

最新文章

  1. python组件的react实现_React-Router动态路由设计最佳实践
  2. Spring Boot OAuth 2.0 客户端
  3. java根据pdf模板生成pdf_Java 复制、压缩PDF文档
  4. (文档挂起)打印机为什么打印失败?
  5. C#实现中国天气网XML接口测试
  6. Java IO ---学习笔记(数据流)
  7. react18并发渲染
  8. 学习3ds max插件开发过程中的一些小结
  9. CodeForces 746D Green and Black Tea 有坑
  10. 计算机无法共享打印机共享的打印机驱动,打印机不能共享_打印机不能共享怎么办?-太平洋IT百科...
  11. 麦氏细菌浊度分析仪的校准物质选择
  12. ios学习之模仿韩寒one.一个UI
  13. 工欲善其事必先利其器(一) —— VScode
  14. 项目csv文件 利用Excel分列功能 求和
  15. 计算机数学基础 周密,一位计算机牛人的心得谈计算机和数学免费.doc
  16. 阿里云大数据ACP(一)大数据开发平台 DataWorks
  17. java课程设计计算器 uml简图,计算器的用例建模
  18. 查看Oracle数据库版本号
  19. davinci可钻取图形
  20. 搞一下 AP AUTOSAR 原理及实战 | 01 AP AUTOSAR 设计思想及原理

热门文章

  1. SpringMVC 异常处理(简单异常处理器 SimpleMappingExceptionResolver;自定义异常处理需要实现HandlerExceptionResolver接口)
  2. 拓展训练之感——自己写的第一篇感想
  3. 利用VB脚本来设置Windows计划任务
  4. 一文掌握如何轻松稿定项目风险管理【静说】
  5. 【面试01】网络工程师面试
  6. python将电视剧按收视率进行排序_用 Python 检测国产电视剧后,发现了各位演员的真实水平.........
  7. SVD奇异值分解(标题重复率过高)
  8. 2022年年货节买什么好?2022年数码类好物推荐
  9. SSH 服务器拒绝了密码, 请再试一次
  10. 公交车之设计,堵车看模式