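A while ago, one queue on our RabbitMQ broker kept accumulating messages, so in the early morning ops scaled out the pod hosting the broker and restarted RabbitMQ. The next morning we found that our service had been throwing exceptions ever since the restart and had stopped consuming, which affected the business. The broker had come back up, but the consumers never reconnected. This section walks through the spring-rabbit source code to find the cause and a fix.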

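Contents

I. The problem

II. spring-rabbit consumer source code analysis

III. Fixing the stopped consumer

I. The problem

Let's first look at the exceptions that were reported. The main stack traces are excerpted below.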

o.s.a.r.l.BlockingQueueConsumer - Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException
Failed to declare queue(s):[work_queue]

...............................................

Consumer received fatal=false exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.handleDeclarationException(BlockingQueueConsumer.java:661)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:601)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:581)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1196)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1041)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[work_queue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:710)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:594)
    ... 4 common frames omitted
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52)
    at sun.reflect.GeneratedMethodAccessor175.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1110)
    at com.sun.proxy.$Proxy285.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:689)
    ... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
    ... 13 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
    at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670)
    at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597)
    ... 1 common frames omitted

...........................

o.s.a.r.l.SimpleMessageListenerContainer  message:  Stopping container from aborted consumer

The key error messages are:

  1. Failed to declare queue(s):[work_queue]
  2. Consumer received fatal=false exception on startup org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
  3. Stopping container from aborted consumer

II. spring-rabbit consumer source code analysis

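Searching the spring-rabbit source shows that the "Consumer received fatal=false exception on startup" message is logged in the run() method of AsyncMessageProcessingConsumer, an inner class of SimpleMessageListenerContainer.

Here is a brief walk-through of the consumer code path in spring-rabbit:

spring-rabbit creates one SimpleMessageListenerContainer for each listener we define (a listener may consume one or more queues). SimpleMessageListenerContainer extends AbstractMessageListenerContainer, which implements the start() method of the Lifecycle interface; start() is called when the application starts up.

The start() method of AbstractMessageListenerContainer is as follows: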

/**
 * Start this container.
 * @see #doStart
 */
@Override
public void start() {
    if (isRunning()) {
        return;
    }
    if (!this.initialized) {
        synchronized (this.lifecycleMonitor) {
            if (!this.initialized) {
                afterPropertiesSet();
            }
        }
    }
    try {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting Rabbit listener container.");
        }
        configureAdminIfNeeded();
        checkMismatchedQueues();
        doStart();
    }
    catch (Exception ex) {
        throw convertRabbitAccessException(ex);
    }
}

start() calls doStart():

/**
 * Start this container, and notify all invoker tasks.
 */
protected void doStart() {
    // Reschedule paused tasks, if any.
    synchronized (this.lifecycleMonitor) {
        this.active = true;
        this.running = true;
        this.lifecycleMonitor.notifyAll();
    }
}

What actually runs is the overriding doStart() in the subclass SimpleMessageListenerContainer:

/**
 * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
 * to this container's task executor.
 */
@Override
protected void doStart() {
    checkListenerContainerAware();
    super.doStart();
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        int newConsumers = initializeConsumers();
        if (this.consumers == null) {
            logger.info("Consumers were initialized and then cleared " +
                    "(presumably the container was stopped concurrently)");
            return;
        }
        if (newConsumers <= 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Consumers are already running");
            }
            return;
        }
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        waitForConsumersToStart(processors);
    }
}

initializeConsumers() creates as many consumers as the configured concurrentConsumers value; the actual consuming logic lives in BlockingQueueConsumer.

The method then iterates over the BlockingQueueConsumer set and wraps each one in an AsyncMessageProcessingConsumer, which implements Runnable.

getTaskExecutor().execute(processor) submits that task to the container's task executor, after which an AsyncConsumerStartedEvent is published.

protected int initializeConsumers() {
    int count = 0;
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            this.cancellationLock.reset();
            this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
            for (int i = 0; i < this.concurrentConsumers; i++) {
                BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                this.consumers.add(consumer);
                count++;
            }
        }
    }
    return count;
}
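The start-up pattern described above (create one consumer per configured thread, wrap each in a Runnable, hand it to an executor, then wait for all of them to start) can be modeled without Spring. This is only a minimal sketch: ConsumerStartupSketch, FakeConsumer and startAll are hypothetical names, and the real processor would declare queues and enter its main loop where the comment indicates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerStartupSketch {

    /** Hypothetical stand-in for BlockingQueueConsumer. */
    public static final class FakeConsumer {
        final int id;

        FakeConsumer(int id) {
            this.id = id;
        }
    }

    /** Mirrors initializeConsumers(): one consumer object per configured thread. */
    public static List<FakeConsumer> initializeConsumers(int concurrentConsumers) {
        List<FakeConsumer> consumers = new ArrayList<>();
        for (int i = 0; i < concurrentConsumers; i++) {
            consumers.add(new FakeConsumer(i));
        }
        return consumers;
    }

    /** Submits one task per consumer and returns how many reported start-up within 5 seconds. */
    public static int startAll(List<FakeConsumer> consumers, ExecutorService executor) {
        CountDownLatch started = new CountDownLatch(consumers.size());
        for (FakeConsumer consumer : consumers) {
            executor.execute(() -> {
                // The real processor would declare queues and run its main loop here.
                started.countDown();
            });
        }
        try {
            started.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumers.size() - (int) started.getCount();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            System.out.println("started consumers: " + startAll(initializeConsumers(3), executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

The latch plays the role that waitForConsumersToStart(processors) plays in the container: the starting thread does not return until every submitted task has reported in or the wait times out.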

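Now for the core of the consuming logic, the run() method of AsyncMessageProcessingConsumer:

Inside its try/catch block there is a while loop that repeatedly calls mainLoop(), which is where messages are fetched, and a long list of exceptions is caught. A boolean variable aborted is declared above the loop with a default of false; some of the catch blocks set it to true, and its value directly determines what killOrRestart() does afterwards.

In the catch block for the QueuesNotAvailableException seen earlier, the code checks the missingQueuesFatal property. If it is true, aborted is set to true; publishConsumerFailedEvent is then called in either case. Note that the log statement prints isMismatchedQueuesFatal() rather than isMissingQueuesFatal(), which would explain why the log above reads fatal=false even though the missing queue was treated as fatal and the container was stopped.

missingQueuesFatal defaults to true. It controls whether an unavailable (or deleted) queue is treated as a fatal failure: if true, the message listener container is stopped and does not restart by itself; if false, the consumer is restarted automatically when the queue is unavailable. Setting missingQueuesFatal to false is therefore one way to fix the stopped consumer.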

spring.rabbitmq.listener.simple.missing-queues-fatal=true

protected boolean isMissingQueuesFatal() {
    return this.missingQueuesFatal;
}
......
catch (QueuesNotAvailableException ex) {
    logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
    if (isMissingQueuesFatal()) {
        this.startupException = ex;
        // Fatal, but no point re-throwing, so just abort.
        aborted = true;
    }
    publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}

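publishConsumerFailedEvent receives aborted (true at this point) as its fatal argument. Because the event is fatal and the container is still running, a ListenerContainerConsumerFailedEvent with fatal=true is placed on the abortEvents BlockingQueue instead of being published immediately.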

private final BlockingQueue<ListenerContainerConsumerFailedEvent> abortEvents = new LinkedBlockingQueue<>();
......
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) {
    if (!fatal || !isRunning()) {
        super.publishConsumerFailedEvent(reason, fatal, t);
    }
    else {
        try {
            this.abortEvents.put(new ListenerContainerConsumerFailedEvent(this, reason, t, fatal));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
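The effect of parking fatal events on a queue can be illustrated with a small stand-alone model. This is a sketch under assumptions, not the library code: DeferredFailureEventSketch, FailedEvent and stopAndDrain are hypothetical names, and the published list stands in for Spring's event publisher. The point it tries to show is that a fatal event raised while the container is running only reaches listeners after the container has been stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DeferredFailureEventSketch {

    /** Hypothetical stand-in for ListenerContainerConsumerFailedEvent. */
    public static final class FailedEvent {
        final String reason;
        final boolean fatal;

        FailedEvent(String reason, boolean fatal) {
            this.reason = reason;
            this.fatal = fatal;
        }
    }

    private final BlockingQueue<FailedEvent> abortEvents = new LinkedBlockingQueue<>();
    private final List<FailedEvent> published = new ArrayList<>();
    private volatile boolean running = true;

    /** Non-fatal events, or any event once stopped, go out at once; fatal ones are parked. */
    public void publishConsumerFailedEvent(String reason, boolean fatal) {
        FailedEvent event = new FailedEvent(reason, fatal);
        if (!fatal || !running) {
            published.add(event);
        } else {
            abortEvents.add(event); // unbounded queue, so add() does not block
        }
    }

    /** Stops first, then drains the parked events and publishes them. */
    public void stopAndDrain(long waitMillis) {
        running = false;
        try {
            FailedEvent event;
            while ((event = abortEvents.poll(waitMillis, TimeUnit.MILLISECONDS)) != null) {
                publishConsumerFailedEvent(event.reason, event.fatal);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public List<FailedEvent> published() {
        return published;
    }

    public static void main(String[] args) {
        DeferredFailureEventSketch container = new DeferredFailureEventSketch();
        container.publishConsumerFailedEvent("Consumer queue(s) not available", true);
        System.out.println("published before stop: " + container.published().size()); // 0
        container.stopAndDrain(100);
        System.out.println("published after stop: " + container.published().size()); // 1
    }
}
```

One consequence of this ordering, if the model is faithful, is that a listener reacting to the fatal event should find the container already stopped, which matters for any handler that checks isRunning() before calling start().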

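killOrRestart(): the key places are commented. When aborted is true, the container is stopped, and the ListenerContainerConsumerFailedEvent is then taken from the abortEvents blocking queue and published; when aborted is false and the consumer is still active, restart() is called to restart the consumer.

Note: if one of the catch blocks above set aborted to true, or the consumer is no longer active, the consumer is not restarted automatically. When aborted is true, the container only publishes a ListenerContainerConsumerFailedEvent. In all other cases the consumer restarts automatically.

This ListenerContainerConsumerFailedEvent is therefore where we can hook in our handling for a stopped consumer.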

private void killOrRestart(boolean aborted) {
    // The consumer is no longer active, or aborted == true
    if (!isActive(this.consumer) || aborted) {
        logger.debug("Cancelling " + this.consumer);
        try {
            this.consumer.stop();
            SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(
                        new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
            }
        }
        catch (AmqpException e) {
            logger.info("Could not cancel message consumer", e);
        }
        if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
                .compareAndSet(null, Thread.currentThread())) {
            logger.error("Stopping container from aborted consumer");
            stop();
            SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
            ListenerContainerConsumerFailedEvent event = null;
            do {
                try {
                    // Take the ListenerContainerConsumerFailedEvent from the abortEvents blocking queue
                    event = SimpleMessageListenerContainer.this.abortEvents.poll(ABORT_EVENT_WAIT_SECONDS,
                            TimeUnit.SECONDS);
                    if (event != null) {
                        // An event was queued, so publish it
                        SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
                                event.getReason(), event.isFatal(), event.getThrowable());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            while (event != null);
        }
    }
    else {
        logger.info("Restarting " + this.consumer);
        // Restart the consumer
        restart(this.consumer);
    }
}

restart() creates a new consumer, publishes an AsyncConsumerRestartedEvent, and submits a new AsyncMessageProcessingConsumer task to the task executor.

private void restart(BlockingQueueConsumer oldConsumer) {
    BlockingQueueConsumer consumer = oldConsumer;
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            try {
                // Need to recycle the channel in this consumer
                consumer.stop();
                // Ensure consumer counts are correct (another is going
                // to start because of the exception, but
                // we haven't counted down yet)
                this.cancellationLock.release(consumer);
                this.consumers.remove(consumer);
                if (!isActive()) {
                    // Do not restart - container is stopping
                    return;
                }
                // Create a new BlockingQueueConsumer
                BlockingQueueConsumer newConsumer = createBlockingQueueConsumer();
                newConsumer.setBackOffExecution(consumer.getBackOffExecution());
                consumer = newConsumer;
                this.consumers.add(consumer);
                if (getApplicationEventPublisher() != null) {
                    // Publish the consumer-restarted event AsyncConsumerRestartedEvent
                    getApplicationEventPublisher()
                            .publishEvent(new AsyncConsumerRestartedEvent(this, oldConsumer, newConsumer));
                }
            }
            catch (RuntimeException e) {
                logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage());
                // Re-throw and have it logged properly by the caller.
                throw e;
            }
            // Run the AsyncMessageProcessingConsumer task asynchronously on the task executor
            getTaskExecutor().execute(new AsyncMessageProcessingConsumer(consumer));
        }
    }
}
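To summarize the branch logic walked through in this part, the decision can be reduced to two small functions. This is a simplified sketch with hypothetical names (KillOrRestartSketch, Outcome, abortedOnMissingQueue); it ignores the containerStoppingForAbort guard and everything else the real method does, and only captures how missingQueuesFatal feeds the aborted flag and how that flag selects the branch.

```java
public class KillOrRestartSketch {

    public enum Outcome { RESTART_CONSUMER, STOP_CONSUMER, STOP_CONTAINER }

    /** Mirrors the catch block: a missing queue aborts only when missingQueuesFatal is true. */
    public static boolean abortedOnMissingQueue(boolean missingQueuesFatal) {
        return missingQueuesFatal;
    }

    /** Mirrors the branch structure of killOrRestart(). */
    public static Outcome killOrRestart(boolean consumerActive, boolean aborted) {
        if (!consumerActive || aborted) {
            // Aborted consumers take the whole container down and publish the fatal event.
            return aborted ? Outcome.STOP_CONTAINER : Outcome.STOP_CONSUMER;
        }
        return Outcome.RESTART_CONSUMER;
    }

    public static void main(String[] args) {
        for (boolean fatal : new boolean[] {true, false}) {
            boolean aborted = abortedOnMissingQueue(fatal);
            System.out.println("missingQueuesFatal=" + fatal + " -> " + killOrRestart(true, aborted));
        }
        // prints:
        // missingQueuesFatal=true -> STOP_CONTAINER
        // missingQueuesFatal=false -> RESTART_CONSUMER
    }
}
```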

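III. Fixing the stopped consumer

From the source analysis above we know that certain failures (for example QueuesNotAvailableException) cause a ListenerContainerConsumerFailedEvent to be published. We can listen for this event and restart the listener container.

In Spring, RabbitMQ-related events are subclasses of AmqpEvent.

By publishing events, Spring notifies observers (event listeners) of consumer activity. The consumer-related events are:

  • AsyncConsumerStartedEvent: a new consumer has started
  • AsyncConsumerStoppedEvent: a consumer has stopped
  • AsyncConsumerRestartedEvent: a consumer has been restarted
  • ListenerContainerConsumerFailedEvent: a listener container's consumer has failed

We can listen for ListenerContainerConsumerFailedEvent, defined as shown below. Its fatal property, mentioned above, is true when the consumer hit a fatal error; in that case the consumer does not retry on its own and we must restart the container in our event handler. When fatal is false we can ignore the event, because the container retries by itself.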

public class ListenerContainerConsumerFailedEvent extends AmqpEvent {
    private final String reason;
    private final boolean fatal;
    private final Throwable throwable;
}

The handler: when the event's fatal flag is true, it checks whether the container is running, calls start() if it is not, and then sends an alert.

import java.util.Arrays;
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("Consumer failed event received: {}", event);
        if (event.isFatal()) {
            log.error("Stopping container from aborted consumer. Reason: {}", event.getReason(), event.getThrowable());
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());
            try {
                // Give the broker some time to come back before restarting.
                Thread.sleep(30000);
                // Check whether the listener container is already running
                Assert.state(!container.isRunning(), String.format("Listener container %s is already running!", container));
                // The container is not running, so start it
                container.start();
                log.info("Restarted the listener for queues {}", queueNames);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it; skip the restart.
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting to restart the listener for queues {}", queueNames, e);
            } catch (Exception e) {
                log.error("Failed to restart the listener for queues {}", queueNames, e);
            }
            // TODO: SMS/email/DingTalk alert including the queue names, the reason the listener
            // stopped, the exception at that time, whether the restart succeeded, etc.
        }
    }
}
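The listener above sleeps for a fixed 30 seconds on the calling thread and tries start() once. One possible variation, sketched here without Spring and under stated assumptions, is to schedule the restart on a separate executor and retry a bounded number of times. DelayedRestartSketch, Restartable and runDemo are hypothetical names; Restartable only abstracts the isRunning() and start() calls the listener already uses, and the sketch assumes start() signals failure by throwing a RuntimeException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DelayedRestartSketch {

    /** Hypothetical abstraction over the two container methods the listener uses. */
    public interface Restartable {
        boolean isRunning();
        void start();
    }

    private final ScheduledExecutorService scheduler;
    private final long delayMillis;
    private final int maxAttempts;

    public DelayedRestartSketch(ScheduledExecutorService scheduler, long delayMillis, int maxAttempts) {
        this.scheduler = scheduler;
        this.delayMillis = delayMillis;
        this.maxAttempts = maxAttempts;
    }

    /** Schedules restart attempts off the calling thread; gives up after maxAttempts failures. */
    public void scheduleRestart(Restartable container) {
        attempt(container, 1);
    }

    private void attempt(Restartable container, int attemptNo) {
        scheduler.schedule(() -> {
            if (container.isRunning()) {
                return; // already restarted elsewhere
            }
            try {
                container.start();
            } catch (RuntimeException e) {
                if (attemptNo < maxAttempts) {
                    attempt(container, attemptNo + 1);
                }
                // Otherwise give up; this is where an alert would be raised.
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Demo: a fake container whose first two start() calls fail. Returns the number of start() calls. */
    public static int runDemo() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger startCalls = new AtomicInteger();
        AtomicBoolean running = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);
        Restartable flaky = new Restartable() {
            public boolean isRunning() {
                return running.get();
            }

            public void start() {
                if (startCalls.incrementAndGet() < 3) {
                    throw new IllegalStateException("queue not available yet");
                }
                running.set(true);
                done.countDown();
            }
        };
        new DelayedRestartSketch(scheduler, 10, 5).scheduleRestart(flaky);
        try {
            done.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return startCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("start() calls: " + runDemo());
    }
}
```

In a real listener the Restartable could be a thin lambda-style wrapper around the container taken from event.getSource(), and the delay and attempt limit would need tuning to how long the broker typically takes to come back.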

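Alternatively, set missingQueuesFatal to false. A QueuesNotAvailableException then leaves aborted unchanged, so killOrRestart() calls restart() automatically. This approach only covers QueuesNotAvailableException, however; it is not as general as the event listener above.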

spring.rabbitmq.listener.simple.missing-queues-fatal=false
