近日我们项目组采用 Kafka 来做系统日志统一管理,但是天降横祸的让 Kafka 集群(3台服务器)都挂了,堪比中大奖的节奏,随之而来的是使用 Kafka 发送消息日志的服务全部卡死,经过排查发现居然是 Kafka 当机导致了调用 Kafka 发送日志服务一直处于阻塞状态。最后我们在检查代码的时候发现,如果无法连接 Kafka 服务,则会出现一分钟的阻塞。以上问题有两种解决方案:

一、开启异步模式 ( @EnableAsync )

@EnableAsync
@Configuration
public class KafkaProducerConfig {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);@Value("${kafka.brokers}")private String servers;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return props;}@Beanpublic ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));}@Beanpublic KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));}@Beanpublic Producer producer() {return new Producer();}
}
public class Producer {public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);@Autowiredprivate KafkaTemplate<String, GenericMessage> kafkaTemplate;@Asyncpublic void send(String topic, GenericMessage message) {ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {@Overridepublic void onSuccess(final SendResult<String, GenericMessage> message) {LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());}@Overridepublic void onFailure(final Throwable throwable) {LOGGER.error("unable to send message= " + message, throwable);}});}
}

二、如果使用同步模式,可以通过修改配置参数 MAX_BLOCK_MS_CONFIG ( max.block.ms / 默认 60s ) 来缩短阻塞时间

package com.havent.demo.logger.config;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.scheduling.annotation.EnableAsync;import java.util.HashMap;
import java.util.Map;@EnableAsync
@Configuration
@EnableKafka
public class KafkaConfiguration {@Value("${spring.kafka.producer.bootstrap-servers}")private String serverAddress;public Map<String, Object> producerConfigs() {System.out.println("HH > serverAddress: " + serverAddress);Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// 如果请求失败,生产者会自动重试,我们指定是0次,如果启用重试,则会有重复消息的可能性props.put(ProducerConfig.RETRIES_CONFIG, 0);// Request发送请求,即Batch批处理,以减少请求次数,该值即为每次批处理的大小props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);/*** 这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。这类似于TCP的算法,例如上面的代码段,* 可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,* 这个设置将增加1毫秒的延迟请求以等待更多的消息。 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是* linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的,更有效的请求。*/props.put(ProducerConfig.LINGER_MS_CONFIG, 2000);/*** 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。* 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定, 之后它将抛出一个TimeoutException。*/props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);// 用于配置send数据或partitionFor函数得到对应的leader时,最大的等待时间,默认值为60秒// HH 警告:如无法连接 kafka 会导致程序卡住,尽量不要设置等待太久props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100);// 消息发送的最长等待时间props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100);// 0:不保证消息的到达确认,只管发送,低延迟但是会出现消息的丢失,在某个server失败的情况下,有点像TCP// 1:发送消息,并会等待leader 收到确认后,一定的可靠性// -1:发送消息,等待leader收到确认,并进行复制操作后,才返回,最高的可靠性props.put(ProducerConfig.ACKS_CONFIG, "0");System.out.println(props);return props;}public ProducerFactory<String, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());return kafkaTemplate;}}

谨以此献给那些被 Spring Kafka 同步模式坑害又苦无出路的同胞!

Spring Boot + Spring-Kafka 异步配置相关推荐

  1. Spring boot 项目Kafka Error connecting to node xxx:xxx Kafka项目启动异常 Failed to construct kafka consumer

    Spring boot 项目Kafka Error connecting to node xxx:xxx Spring boot Kafka项目启动异常 新建了一个springBoot集成Kafka的 ...

  2. Spring Boot 中的异步调用

    Spring Boot 中的异步调用 通常我们开发的程序都是同步调用的,即程序按照代码的顺序一行一行的逐步往下执行,每一行代码都必须等待上一行代码执行完毕才能开始执行.而异步编程则没有这个限制,代码的 ...

  3. Spring Security 实战:Spring Boot 下的自动配置

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | 公众号「码农小胖哥」 1. 前言 我们在前几篇 ...

  4. boot spring 接口接收数据_在 Spring Boot 中使用 Dataway 配置数据查询接口

    Dataway介绍 Dataway 是基于 DataQL 服务聚合能力,为应用提供的一个接口配置工具.使得使用者无需开发任何代码就配置一个满足需求的接口. 整个接口配置.测试.冒烟.发布.一站式都通过 ...

  5. spring boot通过命令行配置属性

    spring boot通过命令行配置属性 命令:java -jar xxx.jar --server.port=8888,通过使用–-server.port属性来设置xxx.jar应用的端口为8888 ...

  6. springboot 读取配置文件_使用 @ConfigurationProperties 在 Spring Boot 中加载配置

    本文地址: 使用 @ConfigurationProperties 在 Spring Boot 中加载配置 使用 Spring Boot 加载配置文件的配置非常便利,我们只需要使用一些注解配置一下就能 ...

  7. Spring Boot 2.0 的配置详解(图文教程)

    本文来自作者 泥瓦匠 @ bysocket.com 在 GitChat 上分享 「Spring Boot 2.0 的配置详解(图文教程)」 编辑 | 哈比 Spring Boot 配置,包括自动配置和 ...

  8. Spring Boot Validation提示信息国际化配置

    引言 之前介绍过Spring Boot Validation的使用及扩展,可参见:<SpringBoot Validation> 本文在此基础上重点讲解下Spring Boot Valid ...

  9. Spring Boot 灵活实现自动配置背后的故事~用起来更香了

    最近一直忙着在做新应届生的员工技术培训和面试 ,培训的则是Spring Boot部分的内容,这部分也是面试常问的点,于是想到了各位读者大大,特地的把内容分享大家一份. 不知道大家第一次搭Spring ...

  10. Spring Boot如何实现异步执行任务

    所谓异步任务,其实就是异步执行程序,有些时候遇到一些耗时的的任务,如果一直卡等待,肯定会影响其他程序的执行,所以就让这些程序需要以异步的方式去执行.那么下面就来介绍Spring Boot 如何实现异步 ...

最新文章

  1. 想转行?零基础该如何学Python?这些一定要明白
  2. 5G年终盘点:2018,意难平
  3. kali之metasploit基本使用
  4. 1标志图片_这四种情况将不再扣分罚款!11月起,全国高速统一限速标志
  5. 代码评审会议_如何将电话会议(和访问代码)另存为联系人
  6. [汇编语言]实验:更灵活的寻址方式 -应用si和di
  7. android9.0原生字体,iOS 9原生字体看腻了?不如学着去替换吧
  8. 《Python入门到精通》Python基础语法
  9. [ios]ios读写文件本地数据
  10. [ 安装 ] Hadoop安装步骤!
  11. c语言仿宋gb2312字体,仿宋gb2312字体官方下载|仿宋gb2312字体下载官方版 - 维维软件园...
  12. Mac系统内存越来越大?Mac内存清理技巧
  13. 《人生七年》纪录片总结
  14. 网页设计 颜色搭配
  15. 谷粒商城P46 gulimall-gateway刷新验证码出现503错误
  16. TryHackMe - Thompson靶场
  17. Unity学习推荐书籍
  18. #笔记(三十二)#dvwa漏洞wp
  19. 人工智能/虚拟现实技术的工程伦理分析:以电影《头号玩家》为例
  20. 【MySQL】测试题01

热门文章

  1. 1. Browser 对象 - Window 对象
  2. 15. Magento路由分发过程解析(四):请求重写
  3. mysql 双1设置_2020-10-15:mysql的双1设置是什么?
  4. css中的background的几个属性(background-attachment/background-origin,background-clip等)
  5. K8S 通过 yaml 文件创建资源
  6. Struts2 中的值栈的理解
  7. 基于C#的MongoDB数据库开发应用(2)--MongoDB数据库的C#开发
  8. HttpClient 学习整理【转】
  9. SQL WITH AS
  10. Qt - QVariant