Fixing Consumers That Stop Consuming After a RabbitMQ Restart: A Source-Code Analysis
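Some time ago, one queue on our RabbitMQ broker kept piling up messages, so early one morning the ops team scaled out the MQ server pods and restarted RabbitMQ. Later that morning we found that, ever since the restart, our service had been throwing exceptions and had stopped consuming, which disrupted the business. The broker itself restarted successfully, but the consumers never reconnected. This article walks through the spring-rabbit source code to find the cause and the fix.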
Contents
1. The Problem
2. Analysis of the spring-rabbit Consumer Source
3. Fixing Consumers That Stop Consuming
1. The Problem
First, the exceptions that were logged. Here are the most relevant stack traces:
o.s.a.r.l.BlockingQueueConsumer - Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException
Failed to declare queue(s):[work_queue]...............................................
Consumer received fatal=false exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.handleDeclarationException(BlockingQueueConsumer.java:661)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:601)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:581)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1196)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1041)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[work_queue]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:710)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:594)
... 4 common frames omitted
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52)
at sun.reflect.GeneratedMethodAccessor175.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1110)
at com.sun.proxy.$Proxy285.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:689)
... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
... 13 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597)
... 1 common frames omitted...........................
o.s.a.r.l.SimpleMessageListenerContainer message: Stopping container from aborted consumer
The key error messages are:
- Failed to declare queue(s):[work_queue]
- Consumer received fatal=false exception on startup org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
- Stopping container from aborted consumer
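To make the "retries left=3" line in the log concrete, here is a small JDK-only model of a passive queue declaration that is tried once and then retried a fixed number of times before the consumer gives up. It is a sketch, not spring-rabbit code: PassiveDeclarationSketch, QueueMissingException and the queueExists predicate are hypothetical, the retry count of 3 is taken from the log line, and the pause the real consumer makes between attempts is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for spring-rabbit's QueuesNotAvailableException
class QueueMissingException extends RuntimeException {
    QueueMissingException(String queue) {
        super("Cannot prepare queue for listener: " + queue);
    }
}

// Hypothetical model of a passive declaration with a fixed number of retries
class PassiveDeclarationSketch {
    // Log lines produced by the sketch, mimicking "retries left=N"
    final List<String> log = new ArrayList<>();

    // Try once, then retry up to `retries` more times; the delay between attempts is omitted
    void declarePassive(String queue, Predicate<String> queueExists, int retries) {
        int retriesLeft = retries;
        while (true) {
            if (queueExists.test(queue)) {
                return; // the broker knows the queue: declaration succeeded
            }
            if (retriesLeft == 0) {
                throw new QueueMissingException(queue); // out of retries: give up
            }
            log.add("Queue declaration failed; retries left=" + retriesLeft);
            retriesLeft--;
        }
    }
}
```

With a queue that never reappears, the model logs three "retries left" lines and throws after the fourth failed attempt, roughly where the real consumer ends up raising QueuesNotAvailableException.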
2. Analysis of the spring-rabbit Consumer Source
Searching the spring-rabbit source shows that the message "Consumer received fatal=false exception on startup" is logged in the run() method of AsyncMessageProcessingConsumer, an inner class of SimpleMessageListenerContainer.
Here is a brief walk through the spring-rabbit consumer flow:
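spring-rabbit creates a SimpleMessageListenerContainer for each of our consumers (a consumer may listen to one or more queues). SimpleMessageListenerContainer extends AbstractMessageListenerContainer, which implements Spring's Lifecycle interface, so its start() method is called when the container is started.
The start() method, defined in AbstractMessageListenerContainer, is as follows: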
/**
 * Start this container.
 * @see #doStart
 */
@Override
public void start() {
    if (isRunning()) {
        return;
    }
    if (!this.initialized) {
        synchronized (this.lifecycleMonitor) {
            if (!this.initialized) {
                afterPropertiesSet();
            }
        }
    }
    try {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting Rabbit listener container.");
        }
        configureAdminIfNeeded();
        checkMismatchedQueues();
        doStart();
    }
    catch (Exception ex) {
        throw convertRabbitAccessException(ex);
    }
}
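start() calls doStart(). In AbstractMessageListenerContainer, doStart() only sets the active and running flags and wakes up any waiting threads: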
/**
 * Start this container, and notify all invoker tasks.
 */
protected void doStart() {
    // Reschedule paused tasks, if any.
    synchronized (this.lifecycleMonitor) {
        this.active = true;
        this.running = true;
        this.lifecycleMonitor.notifyAll();
    }
}
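The start()/doStart() split is a template method: start() performs the common checks and calls doStart(), and a subclass overrides doStart() and calls super.doStart() first. The JDK-only sketch below shows just that call order; ContainerSketch and SimpleContainerSketch are hypothetical stand-ins, not the Spring classes, and they omit locking, initialization and error handling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down model of AbstractMessageListenerContainer's start()/doStart()
abstract class ContainerSketch {
    protected volatile boolean active;
    protected volatile boolean running;
    final List<String> calls = new ArrayList<>();

    public void start() {
        if (running) {
            return; // already started: a second start() is a no-op
        }
        calls.add("start");
        doStart();
    }

    // Base implementation: only flips the lifecycle flags
    protected void doStart() {
        calls.add("base.doStart");
        active = true;
        running = true;
    }

    boolean isRunning() {
        return running;
    }
}

// Model of SimpleMessageListenerContainer: override doStart() and call super first
class SimpleContainerSketch extends ContainerSketch {
    @Override
    protected void doStart() {
        super.doStart();
        calls.add("simple.doStart"); // real code: initializeConsumers() and submit them to the executor
    }
}
```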
The doStart() that actually runs, however, is the override in the subclass SimpleMessageListenerContainer, which calls super.doStart() first:
/**
 * Re-initializes this container's Rabbit message consumers, if not initialized already. Then submits each consumer
 * to this container's task executor.
 */
@Override
protected void doStart() {
    checkListenerContainerAware();
    super.doStart();
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        int newConsumers = initializeConsumers();
        if (this.consumers == null) {
            logger.info("Consumers were initialized and then cleared " +
                    "(presumably the container was stopped concurrently)");
            return;
        }
        if (newConsumers <= 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Consumers are already running");
            }
            return;
        }
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        waitForConsumersToStart(processors);
    }
}
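initializeConsumers() creates as many consumers as the configured concurrentConsumers value; the actual consumption logic lives in BlockingQueueConsumer.
doStart() then iterates over the BlockingQueueConsumer set and wraps each one in an AsyncMessageProcessingConsumer (which implements Runnable).
getTaskExecutor().execute(processor) runs each task on the container's task executor, and an AsyncConsumerStartedEvent is published for each consumer.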
protected int initializeConsumers() {
    int count = 0;
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            this.cancellationLock.reset();
            this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
            for (int i = 0; i < this.concurrentConsumers; i++) {
                BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                this.consumers.add(consumer);
                count++;
            }
        }
    }
    return count;
}
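The startup path (create concurrentConsumers consumers, submit each one to the task executor, then wait for them to start) can be modeled with the JDK alone. In this sketch the consumers are just names, an ExecutorService plays the task executor, and a CountDownLatch stands in for waitForConsumersToStart(); ConsumerStartupSketch and its method are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of initializeConsumers() plus the submit loop of doStart()
class ConsumerStartupSketch {
    final Set<String> startedNames = ConcurrentHashMap.newKeySet();

    // Returns true if every consumer signalled "started" within the timeout
    boolean startConsumers(int concurrentConsumers, ExecutorService executor, long timeoutMs)
            throws InterruptedException {
        // Like initializeConsumers(): one consumer per configured concurrentConsumers
        Set<String> consumers = new HashSet<>();
        for (int i = 0; i < concurrentConsumers; i++) {
            consumers.add("consumer-" + i);
        }
        // Like waitForConsumersToStart(): each consumer counts down once it has started
        CountDownLatch startLatch = new CountDownLatch(consumers.size());
        for (String name : consumers) {
            executor.execute(() -> {
                startedNames.add(name); // a real consumer would open a channel and start consuming here
                startLatch.countDown();
            });
        }
        return startLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```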
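Next comes the core of the consumption logic: the run() method implemented by AsyncMessageProcessingConsumer.
Inside its try/catch block, a while loop repeatedly calls mainLoop(), which contains the logic that fetches messages, and many exception types are caught. Before the block, a boolean variable aborted is declared with a default of false; some of the catch branches set it to true, and its value directly determines what killOrRestart() does afterwards.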
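The interplay between the catch branches and the aborted flag can be sketched roughly as follows. This is a simplified model, not the real run() method: RunLoopSketch and QueuesMissingException are hypothetical names, only the missing-queue branch is modeled (the real method catches many more exception types), and killOrRestart() here simply records the flag it receives.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for QueuesNotAvailableException
class QueuesMissingException extends RuntimeException {
    QueuesMissingException(String msg) {
        super(msg);
    }
}

// Rough model of AsyncMessageProcessingConsumer.run(): the aborted flag decides what happens afterwards
class RunLoopSketch {
    final boolean missingQueuesFatal;
    final List<String> published = new ArrayList<>();
    Boolean killOrRestartArg; // records what killOrRestart() was called with

    RunLoopSketch(boolean missingQueuesFatal) {
        this.missingQueuesFatal = missingQueuesFatal;
    }

    void run(Runnable initialize, Runnable mainLoop, int iterations) {
        boolean aborted = false;
        try {
            initialize.run(); // may throw QueuesMissingException (passive declaration failed)
            for (int i = 0; i < iterations; i++) {
                mainLoop.run(); // fetch and dispatch messages
            }
        }
        catch (QueuesMissingException ex) {
            if (missingQueuesFatal) {
                aborted = true; // fatal: abort instead of retrying
            }
            published.add("ConsumerFailed(fatal=" + aborted + ")");
        }
        killOrRestart(aborted);
    }

    void killOrRestart(boolean aborted) {
        killOrRestartArg = aborted;
    }
}
```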
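Consider the catch branch for the QueuesNotAvailableException seen above: it checks whether missingQueuesFatal is true, and if so sets aborted to true; in either case it then calls publishConsumerFailedEvent().
missingQueuesFatal defaults to true and controls whether an unavailable (for example, deleted) queue is treated as a fatal error. When it is true, the listener container (MessageListenerContainer) is stopped and not restarted automatically; when it is false, the consumer is restarted automatically and keeps trying. Setting missingQueuesFatal to false is therefore one way to keep consumers from stopping. The default, expressed as a Spring Boot property, is: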
spring.rabbitmq.listener.simple.missing-queues-fatal=true
protected boolean isMissingQueuesFatal() {
    return this.missingQueuesFatal;
}
......
catch (QueuesNotAvailableException ex) {
    logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
    if (isMissingQueuesFatal()) {
        this.startupException = ex;
        // Fatal, but no point re-throwing, so just abort.
        aborted = true;
    }
    publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}
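publishConsumerFailedEvent() is called with aborted (now true) as its fatal argument. Because fatal is true and the container is still running, the event is not published right away; instead, a ListenerContainerConsumerFailedEvent with fatal=true is put into the abortEvents BlockingQueue. If fatal were false, or the container were no longer running, the event would be published immediately through the superclass. Note also that the log statement in the catch block above prints isMismatchedQueuesFatal() rather than isMissingQueuesFatal(); that is why the log at the top of this article says "fatal=false" even though the consumer was aborted.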
private final BlockingQueue<ListenerContainerConsumerFailedEvent> abortEvents = new LinkedBlockingQueue<>();
......
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) {
    if (!fatal || !isRunning()) {
        super.publishConsumerFailedEvent(reason, fatal, t);
    }
    else {
        try {
            this.abortEvents.put(new ListenerContainerConsumerFailedEvent(this, reason, t, fatal));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
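killOrRestart(): the key points are commented in the code below. If the consumer is no longer active, or aborted is true, the consumer is stopped. If aborted is true, the whole container is stopped as well, and the ListenerContainerConsumerFailedEvent events are then taken from the abortEvents blocking queue and published. Because stop() has already been called at that point, isRunning() returns false, so this second call to publishConsumerFailedEvent() goes through the superclass and actually publishes the event. Only when the consumer is still active and aborted is false does killOrRestart() call restart() to restart the consumer.
Note: when an earlier catch branch has set aborted to true, or the consumer is no longer active, nothing is restarted automatically; in the aborted case the container is stopped and all that happens is that a ListenerContainerConsumerFailedEvent is published. In all other cases the consumer is restarted automatically.
So our fix for consumers that stop consuming can start from the ListenerContainerConsumerFailedEvent.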
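The branching described above boils down to a small decision function. The enum and class below are hypothetical names used only to summarize the three outcomes of killOrRestart(); they are not part of spring-rabbit.

```java
// Hypothetical summary of the three outcomes of killOrRestart()
enum AfterRunAction {
    RESTART_CONSUMER, // consumer still active and not aborted: restart(this.consumer)
    CANCEL_CONSUMER,  // consumer no longer active (e.g. container stopping): stop this consumer only
    STOP_CONTAINER    // aborted: stop the consumer, then (first aborting thread only) stop() the container and publish queued events
}

class KillOrRestartSketch {
    static AfterRunAction decide(boolean consumerActive, boolean aborted) {
        if (!consumerActive || aborted) {
            return aborted ? AfterRunAction.STOP_CONTAINER : AfterRunAction.CANCEL_CONSUMER;
        }
        return AfterRunAction.RESTART_CONSUMER;
    }
}
```

Only the first row leads to an automatic restart, which is why an aborted consumer needs outside help to come back.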
private void killOrRestart(boolean aborted) {
    // The consumer is no longer active || aborted == true
    if (!isActive(this.consumer) || aborted) {
        logger.debug("Cancelling " + this.consumer);
        try {
            this.consumer.stop();
            SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(
                        new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
            }
        }
        catch (AmqpException e) {
            logger.info("Could not cancel message consumer", e);
        }
        if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
                .compareAndSet(null, Thread.currentThread())) {
            logger.error("Stopping container from aborted consumer");
            stop();
            SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
            ListenerContainerConsumerFailedEvent event = null;
            do {
                try {
                    // Take a ListenerContainerConsumerFailedEvent from the abortEvents blocking queue
                    event = SimpleMessageListenerContainer.this.abortEvents.poll(ABORT_EVENT_WAIT_SECONDS,
                            TimeUnit.SECONDS);
                    if (event != null) {
                        // If there is one, publish (broadcast) it
                        SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
                                event.getReason(), event.isFatal(), event.getThrowable());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            while (event != null);
        }
    }
    else {
        logger.info("Restarting " + this.consumer);
        // Call restart() to restart this consumer
        restart(this.consumer);
    }
}
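The "queue now, publish after stopping" interplay between publishConsumerFailedEvent() and the drain loop in killOrRestart() can be modeled with a LinkedBlockingQueue. In this JDK-only sketch the events are plain strings, a list stands in for the ApplicationEventPublisher, and the 100 ms poll timeout is an arbitrary stand-in for ABORT_EVENT_WAIT_SECONDS; AbortEventsSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of publishConsumerFailedEvent() plus the drain loop in killOrRestart()
class AbortEventsSketch {
    volatile boolean running = true;
    final BlockingQueue<String> abortEvents = new LinkedBlockingQueue<>();
    final List<String> published = new ArrayList<>(); // stands in for the ApplicationEventPublisher

    void publishConsumerFailedEvent(String reason, boolean fatal) throws InterruptedException {
        if (!fatal || !running) {
            published.add(reason); // published immediately
        }
        else {
            abortEvents.put(reason); // held back until the container has stopped
        }
    }

    // After stop(): running is false, so re-publishing the queued events really publishes them
    void stopAndDrain() throws InterruptedException {
        running = false;
        String event;
        while ((event = abortEvents.poll(100, TimeUnit.MILLISECONDS)) != null) {
            publishConsumerFailedEvent(event, true);
        }
    }
}
```

One consequence of this ordering is that a listener receiving the fatal event already sees the container as stopped, which is what the restart listener in section 3 relies on.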
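restart() stops the old consumer, creates a new BlockingQueueConsumer that inherits the old one's BackOffExecution, publishes an AsyncConsumerRestartedEvent, and submits a new AsyncMessageProcessingConsumer task to the task executor. If the container is no longer active, it returns without restarting.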
private void restart(BlockingQueueConsumer oldConsumer) {
    BlockingQueueConsumer consumer = oldConsumer;
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            try {
                // Need to recycle the channel in this consumer
                consumer.stop();
                // Ensure consumer counts are correct (another is going
                // to start because of the exception, but
                // we haven't counted down yet)
                this.cancellationLock.release(consumer);
                this.consumers.remove(consumer);
                if (!isActive()) {
                    // Do not restart - container is stopping
                    return;
                }
                // Create a new BlockingQueueConsumer
                BlockingQueueConsumer newConsumer = createBlockingQueueConsumer();
                newConsumer.setBackOffExecution(consumer.getBackOffExecution());
                consumer = newConsumer;
                this.consumers.add(consumer);
                if (getApplicationEventPublisher() != null) {
                    // Publish an AsyncConsumerRestartedEvent
                    getApplicationEventPublisher()
                            .publishEvent(new AsyncConsumerRestartedEvent(this, oldConsumer, newConsumer));
                }
            }
            catch (RuntimeException e) {
                logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage());
                // Re-throw and have it logged properly by the caller.
                throw e;
            }
            // Run a new AsyncMessageProcessingConsumer on the task executor
            getTaskExecutor().execute(new AsyncMessageProcessingConsumer(consumer));
        }
    }
}
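restart() replaces the old consumer with a new one that inherits its back-off state and hands it to the task executor again, unless the container is no longer active. The JDK-only model below captures only that bookkeeping; RestartSketch, its nested Consumer class and the backOffAttempt field (standing in for BackOffExecution) are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;

// Hypothetical model of the bookkeeping in restart()
class RestartSketch {
    // Stand-in for BlockingQueueConsumer; backOffAttempt stands in for its BackOffExecution
    static final class Consumer {
        final int generation;
        final int backOffAttempt;
        boolean stopped;

        Consumer(int generation, int backOffAttempt) {
            this.generation = generation;
            this.backOffAttempt = backOffAttempt;
        }
    }

    final Set<Consumer> consumers = new HashSet<>();
    volatile boolean active = true;

    // Returns the replacement consumer, or null if the container is stopping
    Consumer restart(Consumer old, Executor executor) {
        synchronized (consumers) {
            old.stopped = true; // the real code stops the old consumer to recycle its channel
            consumers.remove(old);
            if (!active) {
                return null; // do not restart: container is stopping
            }
            // The new consumer keeps the old one's back-off progress
            Consumer fresh = new Consumer(old.generation + 1, old.backOffAttempt);
            consumers.add(fresh);
            // The real code submits a new AsyncMessageProcessingConsumer here
            executor.execute(() -> { });
            return fresh;
        }
    }
}
```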
3. Fixing Consumers That Stop Consuming
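The analysis above shows that certain exceptions (such as QueuesNotAvailableException) cause a ListenerContainerConsumerFailedEvent to be published, so we can listen for that event and restart the listener container ourselves.
In Spring AMQP, the RabbitMQ-related events are subclasses of AmqpEvent.
Spring publishes these events to notify observers (event listeners) of what the consumers are doing. The consumer-related events are: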
- AsyncConsumerStartedEvent: a new consumer has started
- AsyncConsumerStoppedEvent: a consumer has stopped
- AsyncConsumerRestartedEvent: a consumer has been restarted
- ListenerContainerConsumerFailedEvent: a listener container's consumer has failed
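We listen for ListenerContainerConsumerFailedEvent, whose definition is shown below. Its fatal property, mentioned earlier, is true when the consumer hit a fatal error; in that case the consumer is not retried or restarted automatically, so our event handler has to restart it. When fatal is false the event can be ignored, because the container retries on its own.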
public class ListenerContainerConsumerFailedEvent extends AmqpEvent {
    private final String reason;
    private final boolean fatal;
    private final Throwable throwable;
}
Handler code: when the event is fatal, wait a while, make sure the container is not running, call start() to restart it, and then send an alert.
import java.util.Arrays;

import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("Consumer failed event received: {}", event);
        // Non-fatal failures are retried by the container itself
        if (!event.isFatal()) {
            return;
        }
        log.error("Stopping container from aborted consumer. Reason: {}", event.getReason(), event.getThrowable());
        // The event source is the container that published the event
        if (!(event.getSource() instanceof SimpleMessageListenerContainer)) {
            return;
        }
        SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
        String queueNames = Arrays.toString(container.getQueueNames());
        try {
            // Wait 30 s so the broker has a chance to come back before restarting
            Thread.sleep(30000);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag and give up (e.g. the application is shutting down)
            Thread.currentThread().interrupt();
            return;
        }
        try {
            // Only start a container that is not already running
            Assert.state(!container.isRunning(), String.format("Listener container %s is already running!", container));
            container.start();
            log.info("Restarted the listener for queues {}", queueNames);
        }
        catch (Exception e) {
            log.error("Failed to restart the listener for queues {}", queueNames, e);
        }
        // TODO send an SMS/email/DingTalk alert with the queue names, the disconnect reason,
        // the exception, and whether the restart succeeded
    }
}
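The listener above sleeps for 30 seconds on the thread that published the event. One possible variation, sketched here with only the JDK, hands the delayed restart to a ScheduledExecutorService so the publishing thread returns at once. RestartableContainer is a hypothetical interface standing in for the two SimpleMessageListenerContainer methods the listener uses (isRunning() and start()); it is not a Spring API, and DelayedRestarter is likewise a made-up name.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical abstraction over the two container methods the listener needs
interface RestartableContainer {
    boolean isRunning();

    void start();
}

class DelayedRestarter {
    private final ScheduledExecutorService scheduler;

    DelayedRestarter(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    // Called from the failed-event handler: ignore non-fatal events, schedule a restart for fatal ones
    ScheduledFuture<?> onConsumerFailed(RestartableContainer container, boolean fatal, long delay, TimeUnit unit) {
        if (!fatal) {
            return null; // the container retries non-fatal failures by itself
        }
        return scheduler.schedule(() -> {
            if (!container.isRunning()) {
                container.start(); // only restart a container that is still stopped
            }
        }, delay, unit);
    }
}
```

In an application the scheduler would be a long-lived bean that is shut down with the context; the trade-off is one extra thread in exchange for not blocking the consumer thread during the wait.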
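Alternatively, set missingQueuesFatal to false. A QueuesNotAvailableException then leaves aborted unchanged (false), so killOrRestart() restarts the consumer automatically. However, this only covers QueuesNotAvailableException, so it is not as general as the event-listener approach above.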
spring.rabbitmq.listener.simple.missing-queues-fatal=false
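The practical difference between the two settings shows up in a toy simulation where the queue only becomes available again after a few attempts (for example, once it has been re-created after the broker restart). RecoverySimulation is hypothetical, and each "attempt" abstracts one consumer start; it ignores the declaration retries and the recovery interval the real container applies between attempts.

```java
// Hypothetical simulation: the queue exists again only from attempt `queueBackAt` onwards
class RecoverySimulation {
    // Returns the attempt at which consumption resumed, or -1 if the container gave up
    static int consumeAfterRestart(boolean missingQueuesFatal, int queueBackAt, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            boolean queueExists = attempt >= queueBackAt;
            if (queueExists) {
                return attempt; // passive declaration succeeds, consumer starts
            }
            if (missingQueuesFatal) {
                return -1; // aborted: container stopped, nothing restarts it automatically
            }
            // not fatal: killOrRestart(false) restarts the consumer and the loop tries again
        }
        return -1;
    }
}
```

With missingQueuesFatal=true the first failure stops the container for good (-1), while with false the consumer keeps being restarted and resumes once the queue exists.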