Spring AMQP 源码分析 04 - MessageListener

## 测试代码

gordon.study.rabbitmq.springamqp.AsyncConsumer.java

### 分析

## MessageListener

org.springframework.amqp.core.MessageListener 是 Spring AMQP 异步消息投递的监听器接口,它只有一个方法 onMessage,用于处理消息队列推送来的消息。

MessageListener 大概对应 amqp client 中的 Consumer 类。onMessage 方法大概对应 Consumer 类的 handleDelivery 方法。

从这也可以看出,Spring AMQP 的 Message 类至少包含 consumer tag、envelope、basic properties 和 message body 等信息

## MessageListenerContainer

org.springframework.amqp.rabbit.listener.MessageListenerContainer 可以看作 message linstener 的容器。但这个 Container 的语义并不是指它包含多个 message listener,实际上从方法注释和实现代码可以看出,MessageListenerContainer 只包含一个 MessageListener 。那 Container 的语义是什么呢?

一方面,Container 是指虽然只有一个 MessageListener 指定消息消费的逻辑,但是可以生成多个线程使用相同的 MessageListener 同时消费消息。代码第19行 setConcurrentConsumers 方法就是用来指定并发消费者的数量。可以把 MessageListenerContainer 看成下图中的 Subscriber group

另一方面,Container 代表生命周期管理的职责。MessageListener 仅仅实现消息消费逻辑,而整个消息消费何时开始、何时结束、如何设置、状态怎样等等问题全都是由 MessageListenerContainer(及其实现类)负责的。实际上,MessageListenerContainer 继承自 SmartLifecycle 接口,该接口是 Spring 容器提供的与生命周期管理相关的接口,实现该接口的类一般情况下会由 Spring 容器负责启动与停止。由于本例没有启用 Spring 容器环境,所以代码第26行需要主动调用 start 方法,消息消费才会开始执行。

## 内部实现思路

我们知道,amqp client 中的 Consumer 接口实际上只定义了回调方法,我们在回调方法(主要是 handleDelivery 方法)中实现自己的业务逻辑(对消息的消费)。Consumer 接口的回调方法实际上是在一个独立线程中执行的,当我们调用 Channel 的 basicConsume方法时,amqp client 会创建线程处理消息、创建队列缓存从 broker 推送来的消息。然而这些内部实现并没有暴露出来,导致 Spring AMQP 必须自己重新编写一套类似的实现以获得最大的灵活度。

按照前面的分析,我们可以想象 Spring AMQP 为了实现自己的 message listener,需要哪些组件:

  • MessageListenerContainer 的实现类,即 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer。它作为整个异步消息投递的核心类存在。
  • 因为 MessageListenerContainer 实际上管理了一个消费者线程组,因此需要相关线程类与线程调度类。Spring AMQP 中该线程类为 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer,调度类当然就是 SimpleMessageListenerContainer,其 start 方法会启动线程
  • 消息队列推送过来的消息需要一个本地队列缓存。
  • 需要实现 amqp client 的 Consumer 接口。在该接口实现类中,我们简单的把消息放到本地队列中。org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer 负责这件事
  • 根据单一职责原则,线程类只负责异步消费者的创建与(无限循环)消息消费;InternalConsumer 只负责实现 amqp client 的 Consumer 接口,与 amqp client 原生的异步消息投递实现对接,将消息放入本地队列。那么,我们还需要一个真正的异步消费者模型,用来管理消费行为与状态。org.springframework.amqp.rabbit.listener.BlockingQueueConsumer 承担这部分责任。从名字可以看出,BlockingQueueConsumer 采用 BlockingQueue 作为本地队列缓存消息。
  • 用户的业务逻辑是在 MessageListener 接口中实现的,框架的主要处理过程为:创建合适的连接与信道,从 amqp client 中获取消息暂存到本地缓存,从本地缓存读取消息并调用 MessageListener 接口的 onMessage 方法消费消息。

## 内部流程分析

SimpleMessageListenerContainer 的 start 方法会根据 int concurrentConsumers 的值创建对应数量的 BlockingQueueConsumer 实例,并放入 Set<BlockingQueueConsumer> consumers 中。接着为每一个 BlockingQueueConsumer 创建对应的消息处理线程 AsyncMessageProcessingConsumer(实现了 Runnable 接口),并通过 Executor taskExecutor = new SimpleAsyncTaskExecutor() 这个自实现的线程池启动每一个 AsyncMessageProcessingConsumer 线程。最后通过判断每一个 AsyncMessageProcessingConsumer 的 FatalListenerStartupException startupException 属性是否有值来判断 SimpleMessageListenerContainer 是否正常启动了所有的消息监听器。

构建 BlockingQueueConsumer 的构造函数参数很多,其中 ConnectionFactory 是代码第17行创建的 CachingConnectionFactory,AcknowledgeMode 默认为 AUTO。

org.springframework.amqp.core.AcknowledgeMode 定义了三种确认模式:

  • NONE:不确认,相当于 amqp client 中 Channel.basicConsume 方法中 autoAck 参数值设为 true
  • MANUAL:用户通过 channel aware listener 手动控制消息确认
  • AUTO:Spring AMQP 框架根据 message listener 的 onMessage 执行过程中是否抛出异常来决定是否确认消息消费

AsyncMessageProcessingConsumer 的 run 方法比较复杂,粗略解读一下

  1. 调用 BlockingQueueConsumer 的 start 方法(不是 Runnable 接口)。
  2. start 方法先通过 ConnectionFactoryUtils.getTransactionalResourceHolder 静态方法创建出供该线程使用的 channel,该方法返回类型是 RabbitResourceHolder。这部分代码涉及到事务,很复杂,但是本文的测试代码不涉及事务,目前只要了解多个 AsyncMessageProcessingConsumer 会生成多个 RabbitResourceHolder 实例,但是由于使用了 CachingConnectionFactory 的默认缓存模式,所以这些 RabbitResourceHolder 实例共用同一个(AMQP)连接,每个 AsyncMessageProcessingConsumer 独享该连接创建的一个(AMQP)信道即可
  3. start 方法接着创建 InternalConsumer 实例,并调用刚创建的 AMQP 信道的 basicQos 和  basicConsume 方法开始接受消息。这样,当队列接受到消息时,amqp client 会主动调用 InternalConsumer 的 handleDelivery 方法。该方法调用 BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body)); 将消息放到 BlockingQueueConsumer 的 BlockingQueue<Delivery> queue 中。org.springframework.amqp.rabbit.support.Delivery 类封装了 amqp client 通过  handleDelivery 方法回送过来的所有参数。
    有两个细节值得说一下:第一,BlockingQueueConsumer 可以同时消费多个队列,对每个队列,都会调用 basicConsume 方法让 InternalConsumer 监听当前队列(即同一个信道,同一个 Consumer ,不同的队列);其二,可以通过 ConsumerTagStrategy tagStrategy 设定 Tag 命名规则。
  4. 接着,在 while 循环中调用 SimpleMessageListenerContainer 的 receiveAndExecute 方法,不停的尝试从 queue 中获取 Delivery 实例,将之转化为 Message,然后执行 MessageListener 的 onMessage 回调方法。
  5. 如果执行成功,则调用 AMQP 信道的 basicAck 方法确认消息消费成功。
  6. 如果执行过程中发生异常,则将异常转化为 ListenerExecutionFailedException 抛出。默认情况下,Spring AMQP 处理用户自定义异常的逻辑非常简单:调用 AMQP 信道的 basicReject 方法将消息退回队列,打印 warning 级别的日志,但不会打破 AsyncMessageProcessingConsumer 线程的 while 循环,消息消费继续进行。这部分内容下篇文章分析。

来源: https://www.cnblogs.com/gordonkong/p/7115155.html

RabbitMQ系列-MessageListener相关推荐

  1. RabbitMQ系列之【启动过程中遇到问题及解决方案】

    RabbitMQ系列之[启动过程中遇到问题及解决方案] 参考文章: (1)RabbitMQ系列之[启动过程中遇到问题及解决方案] (2)https://www.cnblogs.com/feixiabl ...

  2. rabbitmq系列问题解决:406, “PRECONDITION_FAILED - inequivalent arg ‘durable‘

    rabbitmq系列问题解决:406, "PRECONDITION_FAILED - inequivalent arg 'durable' 参考文章: (1)rabbitmq系列问题解决:4 ...

  3. RabbitMQ系列教程之四:路由(Routing)

    在上一个教程中,我们构建了一个简单的日志系统,我们能够向许多消息接受者广播发送日志消息. 在本教程中,我们将为其添加一项功能 ,这个功能是我们将只订阅消息的一个子集成为可能. 例如,我们可以只将关键的 ...

  4. RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)

    在前一个教程中,我们创建了一个工作队列.工作队列背后的假设是每个任务会被交付给一个[工人].在这一部分我们将做一些完全不同的事情--我们将向多个[消费者]传递信息.这种模式被称为"发布/订阅 ...

  5. RabbitMQ系列教程之二:工作队列(Work Queues)

    今天开始RabbitMQ教程的第二讲,废话不多说,直接进入话题.   (使用.NET 客户端 进行事例演示) 在第一个教程中,我们编写了一个从命名队列中发送和接收消息的程序.在本教程中,我们将创建一个 ...

  6. RabbitMQ系列(三)RabbitMQ交换器Exchange介绍与实践

    RabbitMQ交换器Exchange介绍与实践 RabbitMQ系列文章 RabbitMQ在Ubuntu上的环境搭建 深入了解RabbitMQ工作原理及简单使用 RabbitMQ交换器Exchang ...

  7. RabbitMQ系列——Rabbitmq Plugin configuration unchanged. 解决方案

    RabbitMQ系列--Rabbitmq Plugin configuration unchanged. 解决方案 一.问题概述 cmd命令窗口执行rabbitmq-plugins enable ra ...

  8. 工作队列模式(任务队列)| RabbitMQ系列(二)

    相关文章 RabbitMQ系列汇总:RabbitMQ系列 前言 工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成. 相反我们安排任务在之后执行.我们把任务封装为消息并 ...

  9. 【外行也能看懂的RabbitMQ系列(一)】—— RabbitMQ快速入门篇(内含丰富实例)

    系列文章目录 准备篇 RabbitMQ安装文档 第一章 RabbitMQ快速入门篇 第二章 RabbitMQ的Web管理界面详解 第三章 RabbitMQ进阶篇之死信队列 第四章 RabbitMQ进阶 ...

最新文章

  1. 【FFmpeg】警告:[hls] pkt.duration = 0, maybe the hls segment duration will not precise
  2. 2018年Python开源项目Top100!只在这里!
  3. 字节跳动一面:i++ 是线程安全的吗?
  4. 变分自编码器VAE代码
  5. java 生成 防伪码,优秀的生成防伪码的代码应该如何写?百万千万量级别的。
  6. [转]七大.NET开源框架
  7. 使用xmlhttp结合asp,实现网页的异步调用_asp实例
  8. python深度学习NER任务中:对段落的分割
  9. 页面加载完后立刻执行JS的两种方法
  10. 基于curl的php多线程类(异步请求)
  11. proxool java_Java应用中使用Proxool
  12. LeetCode 1798. 你能构造出连续值的最大数目
  13. remoting 中事件找不到订阅者时引发异常的解决办法
  14. ansys2017安装教程_ANSYS Proucts 18.1安装激活教程
  15. 气象专业文件nc的读取、裁剪与输出(python)
  16. 编写程序求解百鸡百钱问题。公鸡5元一只,母鸡3元一只,小鸡一元3只,问100元钱买100只鸡,可买公鸡、母鸡、小鸡各多少只?
  17. Navicat Premium 12 中文版v12.1.19
  18. mysql 触发器 sql server_喜忧参半的SQL Server触发器
  19. 一文总结 Shiro 实战教程
  20. Binary Cross Entropy

热门文章

  1. 怎样高枕无忧的使用阻力线
  2. Python|并发编程|爬虫|单线程|多线程|异步I/O|360图片|Selenium及JavaScript|Scrapy框架|BOM 和 DOM 操作简介|语言基础50课:学习(12)
  3. 报错:Since Spark 2.3,the queries from raw JSON/CSV files are disallowed when thereferenced columns
  4. 畅谈知识表示与知识图谱
  5. 情感成熟的20个标志,你知道吗?
  6. 苹果手机微信分身怎么弄?学会这学会这招不求人!
  7. [Java]finalized方法
  8. Onvif —— onvif 详细介绍
  9. 将excel中的多个工作表sheet合成一个工作表
  10. SpringBoot使用RestTemplate访问第三方接口