线上某服务 A 调用服务 B 接口完成一次交易,一次晚上的生产变更之后,系统监控发现服务 B 接口频繁超时,后续甚至返回线程池耗尽错误 Thread pool is EXHAUSTED。因为服务 B 依赖外部接口,刚开始误以为外部接口延时导致,所以临时增加服务 B dubbo 线程池线程数量。配置变更之后,重启服务,服务恢复正常。一段时间之后,服务 B 再次返回线程池耗尽错误。这次深入排查问题之后,才发现 Kafka 异步发送消息阻塞了 dubbo 线程,从而导致调用超时。

一、问题分析

Dubbo 2.6.5,Kafak maven 0.8.0-beta1

服务 A 调用服务 B,收到如下错误:

2019-08-30 09:14:52,311 WARN method [%f [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-xxxx, Pool Size: 1000 (active: 1000, core: 1000, max: 1000, largest: 1000), Task: 6491 (completed: 5491), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://xxxx!, dubbo version: 2.6.0, current host: 127.0.0.1

可以看到当前 dubbo 线程池已经满载运行,不能再接受新的调用。正常情况下 dubbo 线程可以很快完成任务,然后归还到线程池中。由于线程执行的任务发生阻塞,消费者端调用超时。而服务提供者端由于已有线程被阻塞,线程池必须不断创建新线程处理任务,直到线程数量达到最大数量,系统返回 Thread pool is EXHAUSTED

线程任务长时间被阻塞可能原因有:

  • 频繁的 fullgc,导致系统暂停。
  • 调用某些阻塞 API,如 socket 连接未设置超时时间导致阻塞。
  • 系统内部死锁

通过分析系统堆栈 dump 情况,果然发现所有 dubbo 线程都处于 WATTING 状态。

下图为应用堆栈 dump 日志:

从堆栈日志可以看到 dubbo 线程最后阻塞在 LinkedBlockingQueue#put ,而该阻塞发生在 Kafka 发送消息方法内。

这里服务 B 需要使用 Kafka 发送监控消息,为了消息发送不影响主业务,这里使用 Kafka 异步发送消息。由于 Kafka 服务端最近更换了对外的端口,而服务 B Kafka 配置未及时变更。最后服务 B 修改配置,服务重新启动,该问题得以解决。

二、Kafka 异步模式

下面分析 Kafka 异步发送消息阻塞的实际原因。

0.8.0 Kafka 默认使用同步模式发送消息,异步发送消息需要设置producer.type=async属性。同步模式需要等待 Kafka 将消息发送到消息队列,这个过程当然会阻塞主线程。而异步模式最大的优点在于无需要等待 Kafka 这个发送过程。

原本认为这里的异步是使用子线程去运行任务,但是 Kafka 异步模式并非这样。查看 Kafka 官方文档producer,可以看到对异步模式描述。

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer has an asynchronous mode that accumulates data in memory and sends out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 100 messages or 5 seconds). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. Since this buffering happens in the client it obviously reduces the durability as any data buffered in memory and not yet sent will be lost in the event of a producer crash.

从上我们可以看到,Kafka 异步模式将会把多条消息打包一块批量发送到服务端。这种模式将会先把消息放到内存队列中,直到消息到达一定数量(默认为 200)或者等待时间超限(默认为 5000ms)。

这么做最大好处在于提高消息发送的吞吐量,减少网络 I/O。当然这么做也存在明显劣势,如果生产者宕机,在内存中还未发送消息可能就会丢失。

下面从 kafka 源码分析这个阻塞过程。

三、Kafka 源码解析

Kafka 消息发送端采用如下配置:

Properties props = new Properties();props.put("metadata.broker.list", "localhost:9092");// 选择异步发送props.put("producer.type", "async");props.put("serializer.class", "kafka.serializer.StringEncoder");props.put("queue.buffering.max.messages","1");props.put("batch.num.messages","1");Producer<Integer, String> producer= new Producer(new ProducerConfig(props));producer.send(new KeyedMessage("test", "hello world"));

这里设置 producer.type=async,从而使 Kafka 异步发送消息。

send 方法源码如下

ps: 这个版本 Kafka 源码采用 Scala 编写,不过源码还是比较简单,比较容易阅读。

def send(messages: KeyedMessage[K,V]*) {if (hasShutdown.get)throw new ProducerClosedExceptionrecordStats(messages)sync match {case true => eventHandler.handle(messages)// 由于  producer.type=async 异步发送case false => asyncSend(messages)}}

由于我们上面设置 producer.type=async,这里将会使用 asyncSend 异步发送模式。

asyncSend 源码如下

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {for (message <- messages) {val added = config.queueEnqueueTimeoutMs match {case 0  =>queue.offer(message)case _  =>try {config.queueEnqueueTimeoutMs < 0 match {case true =>queue.put(message)truecase _ =>queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)}}catch {case e: InterruptedException =>false}}if(!added) {producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)}else {trace("Added to send queue an event: " + message.toString)trace("Remaining queue size: " + queue.remainingCapacity)}}}

asyncSend 将会把消息加入到 LinkedBlockingQueue 阻塞队列中。这里根据 config.queueEnqueueTimeoutMs参数使用不同方法。

config.queueEnqueueTimeoutMs=0,将会调用 LinkedBlockingQueue#offer,如果该队列未满,将会把元素插入队列队尾。如果队列未满,直接返回 false。所以如果此时队列已满,消息不再会加入队列中,然后 asyncSend 将会抛出 QueueFullException 异常。

config.queueEnqueueTimeoutMs < 0,将会调用 LinkedBlockingQueue#put 加入元素,如果该队列已满,该方法将会一直被阻塞直到队列存在可用空间。

config.queueEnqueueTimeoutMs > 0,将会调用 LinkedBlockingQueue#offer,这里与上面不同之处在于设置超时时间,如果队列已满将会阻塞知道超时。

config.queueEnqueueTimeoutMs参数通过 queue.enqueue.timeout.ms 配置生效,默认为 -1。默认情况下 LinkedBlockingQueue 最大数量为 10000,可以通过设置 queue.buffering.max.messages 改变队列最大值。

消息放到队列中后,Kafka 将会使用一个异步线程不断从队列中获取消息,批量发送消息。

异步处理消息代码如下

private def processEvents() {var lastSend = SystemTime.millisecondsvar events = new ArrayBuffer[KeyedMessage[K,V]]var full: Boolean = false// drain the queue until you get a shutdown commandStream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS)).takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {currentQueueItem =>val elapsed = (SystemTime.milliseconds - lastSend)// check if the queue time is reached. This happens when the poll method above returns after a timeout and// returns a null objectval expired = currentQueueItem == nullif(currentQueueItem != null) {trace("Dequeued item for topic %s, partition key: %s, data: %s".format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))events += currentQueueItem}// check if the batch size is reachedfull = events.size >= batchSizeif(full || expired) {if(expired)debug(elapsed + " ms elapsed. Queue time reached. Sending..")if(full)debug("Batch full. Sending..")// if either queue time has reached or batch size has reached, dispatch to event handlertryToHandle(events)lastSend = SystemTime.millisecondsevents = new ArrayBuffer[KeyedMessage[K,V]]}}// send the last batch of eventstryToHandle(events)if(queue.size > 0)throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue".format(queue.size))}

这里异步线程将会不断从队列中获取任务,一旦条件满足,就会批量发送任务。该条件为:

  1. 批量消息数量达到 200,可以设置 batch.num.messages 参数改变配置。
  2. 等待时间到达最大的超时时间,默认为 5000ms,可以设置 queue.buffering.max.ms 改变改配置。

四、问题解决办法

上面问题虽然通过更换 Kafka 正确地址解决,但是为了预防下次该问题再发生,可以采用如下方案:

  1. 改变 config.queueEnqueueTimeoutMs默认配置,像这种系统监控日志允许丢失,所以可以设置 config.queueEnqueueTimeoutMs=0
  2. 升级 Kafka 版本,最新版本 Kafka 使用 Java 重写发送端逻辑,不再使用阻塞队列存储消息。

本文首发于:studyidea.cn/kafka…

欢迎关注我的公众号:程序通事,获得日常干货推送。如果您对我的专题内容感兴趣,也可以关注我的博客:studyidea.cn

kafka实现异步发送_Kafka 异步消息也会阻塞?记一次 Dubbo 频繁超时排查过程相关推荐

  1. kafka实现异步发送_Kafka Producer 异步发送消息居然也会阻塞?

    Kafka 一直以来都以高吞吐量的特性而家喻户晓,就在上周,在一个性能监控项目中,需要使用到 Kafka 传输海量消息,在这过程中遇到了一个 Kafka Producer 异步发送消息会被阻塞的问题, ...

  2. KAFKA SpringBoot2 Nacos 消息异步发送和消费消息(进阶篇)

    文章目录 一.基础集成 1. 技术选型 2. 导入依赖 3. kafka配置 4. auto-offset-reset 简述 5. 新增一个订单类 6. 生产者(异步) 7. 消费者 8. kafka ...

  3. 异步发送,那消息可靠性怎么保证?

    消息丢失可能发生在生产者发送消息.MQ本身丢失消息.消费者丢失消息3个方面. 生产者丢失 生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消 ...

  4. 消息的同步发送,异步发送以及消息发送的可靠性

    最近写的一个通信框架中有两种最基本的消息发送方式:同步发送和异步发送. 同步方式: 消息的发送方发A送一条消息到接收端B,B收到消息之后需要对消息进行处理,然后发送ACK确认消息回A,A收到B的ACK ...

  5. kafka 脚本发送_NWPC消息平台:在ecFlow系统中发送产品事件消息

    本文属于介绍 NWPC 消息平台 系列文章. 本文介绍如何在基于 ecFlow 构建的数值预报业务系统中发送 NWPC 消息平台的 产品事件消息. 介绍 数值预报业务系统产品制作一般分为三个步骤: 监 ...

  6. kafka 异步发送阻塞_Kafka学习一

    一.github下载kafka的源码 可以看到kafka的源码开源社区是非常活跃的. 二.搭建kafka环境 构建kafka环境,首先需要安装Scala和gradle,再安装的scala插件需要和你的 ...

  7. kafka之Producer同步与异步消息发送及事务幂等性案例应用实战

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...

  8. Kafka生产者——消息发送流程,同步、异步发送API

    生产者消息发送流程 发送原理 Kafka的Producer发送消息采用的是异步发送的方式. 在消息发送的过程中,涉及到了两个线程:main线程和Sender线程,以及一个线程共享变量:RecordAc ...

  9. 【Kafka】消息的同步发送和异步发送

    文章目录 概述 1. sync vs async 1.1 java代码同步和异步 2. 可靠性机制(ack属性配置) 2.1 oneway 3. 一般配置 4. 同步异步和ack的联系和区别 参考 概 ...

最新文章

  1. Dvtm -- 平铺式终端
  2. (转)搜索Maven仓库 获取 groupid artifactId
  3. 【Qt】数据库实战之QSqlQueryModel
  4. 一句话讲清楚IOC容器以及什么时候使用@Autowired
  5. Android之从网络上获取图片的两种方式讲解:thread+handle和AsyncTask方式
  6. 【HDU - 5452】Minimum Cut(树形dp 或 最近公共祖先lca+树上差分,转化tricks,思维)
  7. load和loads的区别
  8. 第五周 项目二 建立链栈算法库
  9. 使用自然语言处理来检测电子邮件中的垃圾邮件
  10. 力扣(98.107)补9.10
  11. Java Semaphore实现高并发场景下的流量控制(附源码) | 实用代码架构
  12. 工控 组态王6.60 SP3软件7.5 SP4 授权软件狗 USB 硬件狗 分享 下载
  13. uva 10014 Simple calculations(数学推导)
  14. TP-LINK TL-WR720N 客户端模式设置
  15. DEV C++编写程序出现 [errror]Id returned 1 exit status报错可能出现的原因及解决办法
  16. 工业互联网问题分析——工业(工控)系统信息安全专题培训心得
  17. JAVA递归函数示例 - 阶乘计算
  18. js 事件阻止传播方法,准确定位事件源
  19. 查看Windows版本信息
  20. linux读取pts内容,linux pts/0的含义

热门文章

  1. 快速排序的递归和非递归实现 c语言版本
  2. bert速度提升fastbert
  3. 完全理解Python迭代对象、迭代器、生成器
  4. jieba分词流程及部分源码解读(一)
  5. https://blog.csdn.net/blmoistawinde/article/details/84329103
  6. 汇编语言中将数据、代码、栈放入不同的段
  7. 车载网络处理器带来多功能能力
  8. 什么是智能医院?如何构建自己的解决方案?
  9. 基于NVIDIA GPUs的深度学习训练新优化
  10. 多篇开源CVPR 2020 语义分割论文