目录

  • `RabbitMQ` 概述
    • `RabbitMQ` 消息流动过程
    • 交换机的模式
      • `Topic` 匹配订阅模式
  • `RabbitMQ` 与 `springboot` 整合
    • `DirectExchange` 模式
      • 消息 `provider` 端
        • `Maven` 主要依赖
        • 配置文件
        • 配置类
        • 发送消息接口
          • 测试
      • 消息 `consumer` 端
        • 接收消息类
      • 测试
    • `TopicExchange` 模式
      • 消息 `provider` 端
        • 配置类
        • 发送消息接口
      • 消息 `consumer` 端
        • 接收消息类
      • 测试
        • 测试一
        • 测试二
    • `FanoutExchange` 模式
      • 消息 `provider` 端
        • 配置类
        • 发送消息接口
      • 消息 `consumer` 端
        • 接收消息类
      • 测试

RabbitMQ 概述

RabbitMQ 消息流动过程

RabbitMQ 的一个消息从推送到接收的流程


黄色的圈圈就是我们的消息推送服务,将消息推送到中间方框里面也就是 RabbitMQ 的服务器,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息

交换机的模式

常用的交换机有以下 3 种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送,接收模式也会有以下几种

  • DirectExchange:精确匹配 RoutingKey,将消息发送到绑定到 Exchange 交换机的指定 Queue 队列中
  • FanoutExchange:忽略 RoutingKey,将消息发送到绑定到 Exchange 交换机的所有 Queue 队列中
  • TopicExchange:模糊匹配 RoutingKey,将消息发送到绑定到 Exchange 交换机的指定 Queue 队列中

Topic 匹配订阅模式

  • *:表示一个单词 (必须出现的)
  • #:表示任意数量(零个或多个)单词

通配的绑定键是跟队列进行绑定的。例如,队列 Q1 绑定键为 *.TT.*,队列 Q2 绑定键为 TT.#

  • 如果一条消息携带的路由键为 A.TT.B,那么队列 Q1 将会收到;
  • 如果一条消息携带的路由键为 TT.AA.BB,那么队列 Q2 将会收到

当一个队列的绑定键为 # 的时候,这个队列将会无视消息的路由键,接收所有的消息。当 *# 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为

RabbitMQspringboot 整合

  • springboot 版本:2.0.6.RELEASE
  • rabbitmq 版本:3.8.3

我们需要创建 2springboot 项目,一个 rabbitmq-provider,一个rabbitmq-consumer。首先搭建 rabbitmq-provider

DirectExchange 模式

消息 provider

Maven 主要依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

server.port=8080spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

配置类

@Configuration
public class DirectRabbitConfig {@Beanpublic Queue DirectQueue() {/*durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在*//*exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除*//*autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除*//*一般设置一下队列的持久化就好,其余两个就是默认false*/return new Queue("DirectQueue", true);}@Beanpublic DirectExchange directExchange() {return new DirectExchange("directExchange", true, false);}/*将队列和交换机绑定, 并设置用于匹配键:DirectRouting*/@Beanpublic Binding binding() {return BindingBuilder.bind(DirectQueue()).to(directExchange()).with("DirectRouting");}
}

发送消息接口

@Controller
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")@ResponseBodypublic String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);/*将消息携带绑定键值DirectRouting 发送到交换机directExchange上*/rabbitTemplate.convertAndSend("directExchange", "DirectRouting", map);return "消息已发送至rabbitmq server";}
}
测试

启动 rabbitmq 服务,再启动项目 rabbitmq-provider,调用该 http://localhost:8080/sendDirectMessage 接口,再打开 rabbitmq 管理页面 http://localhost:15672/#/



消息 consumer

接下来,创建 rabbitmq-consumer 项目

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server.port=8081

接收消息类

@Slf4j
@Component
@RabbitListener(queues = {"DirectQueue"})/*监听的队列*/
public class DirectReceiverService1 {@RabbitHandlerpublic void process(@NotNull Map<String, Object> testMessage) {log.info("DirectReceiver1消费者收到消息: " + testMessage.toString());}
}

测试

分别启动 rabbitmq-providerrabbitmq-consumer 两个项目,可以看到 consumer 端的消息消费日志


然后可以再继续调用 rabbitmq-provider 项目的推送消息接口,可以看到消费者即时消费消息


现在是交换机与队列是一对一,那如果有多个消费端监听同一个队列,会怎么样?在 consumer端再增加一个配置类如下

@Slf4j
@Component
@RabbitListener(queues = {"DirectQueue"})
public class DirectReceiverService2 {@RabbitHandlerpublic void process(@NotNull Map<String, Object> testMessage) {log.info("DirectReceiver2消费者收到消息: " + testMessage.toString());}
}

此时是一个交换机,相当于有两个消费端监听同一个队列。再次调用 rabbitmq-provider 项目的推送消息接口,结果如下


可以看到是实现了轮询的方式对消息进行消费,而且不存在重复消费

TopicExchange 模式

消息 provider

配置类

接着,我们使用 TopicExchange 主题交换机。在 rabbitmq-provider 项目里面创建配置类 TopicRabbitConfig

@Configuration
public class TopicRabbitConfig {/*绑定键*/public final static String MAN = "topic.man";public final static String WOMAN = "topic.woman";@Beanpublic Queue firstQueue() {return new Queue(TopicRabbitConfig.MAN, true);}@Beanpublic Queue secondQueue() {return new Queue(TopicRabbitConfig.WOMAN, true);}@Beanpublic TopicExchange topicExchange() {return new TopicExchange("topicExchange", true, false);}/*将firstQueue和topicExchange绑定,而且绑定的键值为topic.man*//*这样只要是消息携带的路由键是topic.man,才会分发到该队列*/@Beanpublic Binding bindingExchangeMessage1(){return BindingBuilder.bind(firstQueue()).to(topicExchange()).with("topic.man");}/*将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#*//*这样只要是消息携带的路由键是以topic.开头,都会分发到该队列*/@Beanpublic Binding bindingExchangeMessage2() {return BindingBuilder.bind(secondQueue()).to(topicExchange()).with("topic.#");}
}

发送消息接口

添加 2 个接口,用于推送消息到主题交换机

@Controller
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")@ResponseBodypublic String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);/*将消息携带绑定键值DirectRouting 发送到交换机directExchange上*/rabbitTemplate.convertAndSend("directExchange", "DirectRouting", map);return "消息已发送至rabbitmq server";}@GetMapping("/sendTopicMessage1")@ResponseBodypublic String sendTopicMessage1() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: M A N ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> manMap = new HashMap<>();manMap.put("messageId", messageId);manMap.put("messageData", messageData);manMap.put("createTime", createTime);rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);return "消息已发送至rabbitmq server";}@GetMapping("/sendTopicMessage2")@ResponseBodypublic String sendTopicMessage2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: woman is all ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> womanMap = new HashMap<>();womanMap.put("messageId", messageId);womanMap.put("messageData", messageData);womanMap.put("createTime", createTime);rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);return "消息已发送至rabbitmq server";}
}

消息 consumer

接收消息类

@Slf4j
@Component
@RabbitListener(queues = {"topic.man"})
public class TopicManReceiver {@RabbitHandlerpublic void process(@NotNull Map<String, Object> testMessage) {log.info("TopicManReceiver消费者收到消息: " + testMessage.toString());}
}@Slf4j
@Component
@RabbitListener(queues = {"topic.woman"})
public class TopicTotalReceiver {@RabbitHandlerpublic void process(@NotNull Map<String, Object> testMessage) {log.info("TopicTotalReceiver消费者收到消息: " + testMessage.toString());}
}

测试

测试一

启动 rabbitmq 服务,rabbitmq-provider,rabbitmq-consumer 两个项目都跑起来,首先调用接口 http://localhost:8080/sendTopicMessage1


消费者 rabbitmq-consumer 的日志输出情况

  • TopicManReceiver 监听队列 firstQueue,绑定键为:topic.man
  • TopicTotalReceiver 监听队列 secondQueue,绑定键为:topic.#
  • 而当前推送的消息,携带的路由键为:topic.man,所以可以看到两个监听消费者receiver 都成功消费到了消息,因为这两个 recevier 监听的队列的绑定键都能与这条消息携带的路由键匹配上

测试二

接下来调用接口 http://localhost:8080/sendTopicMessage2


然后看消费者 rabbitmq-consumer 的日志输出情况

  • TopicManReceiver 监听队列 firstQueue,绑定键为:topic.man
  • TopicTotalReceiver 监听队列 secondQueue,绑定键为:topic.#
  • 而当前推送的消息,携带的路由键为:topic.woman,所以可以看到两个监听消费者只有 TopicTotalReceiver 成功消费到了消息

此时打开 rabbitmq 管理页面,可以看到如下


FanoutExchange 模式

消息 provider

配置类

@Configuration
public class FanoutRabbitConfig {/*创建三个队列*/@Beanpublic Queue queueA() {return new Queue("fanout.A", true);}@Beanpublic Queue queueB() {return new Queue("fanout.B", true);}@Beanpublic Queue queueC() {return new Queue("fanout.C", true);}/*创建FanoutExchange交换机*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange", true, false);}/*将三个队列都绑定在交换机fanoutExchange上,因为是fanoutExchange, 路由键无需配置,配置也不起作用*/@Beanpublic Binding bindingExchangeA(){return BindingBuilder.bind(queueA()).to(fanoutExchange());}@Beanpublic Binding bindingExchangeB(){return BindingBuilder.bind(queueB()).to(fanoutExchange());}@Beanpublic Binding bindingExchangeC(){return BindingBuilder.bind(queueC()).to(fanoutExchange());}
}

发送消息接口

添加一个接口

@Controller
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")@ResponseBodypublic String sendDirectMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "test message, hello!";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);/*将消息携带绑定键值DirectRouting 发送到交换机directExchange上*/rabbitTemplate.convertAndSend("directExchange", "DirectRouting", map);return "消息已发送至rabbitmq server";}@GetMapping("/sendTopicMessage1")@ResponseBodypublic String sendTopicMessage1() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: M A N ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> manMap = new HashMap<>();manMap.put("messageId", messageId);manMap.put("messageData", messageData);manMap.put("createTime", createTime);rabbitTemplate.convertAndSend("topicExchange", "topic.man", manMap);return "消息已发送至rabbitmq server";}@GetMapping("/sendTopicMessage2")@ResponseBodypublic String sendTopicMessage2() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: woman is all ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> womanMap = new HashMap<>();womanMap.put("messageId", messageId);womanMap.put("messageData", messageData);womanMap.put("createTime", createTime);rabbitTemplate.convertAndSend("topicExchange", "topic.woman", womanMap);return "消息已发送至rabbitmq server";}@GetMapping("/sendFanoutMessage")@ResponseBodypublic String sendFanoutMessage() {String messageId = String.valueOf(UUID.randomUUID());String messageData = "message: testFanoutMessage ";String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String, Object> map = new HashMap<>();map.put("messageId", messageId);map.put("messageData", messageData);map.put("createTime", createTime);rabbitTemplate.convertAndSend("fanoutExchange", null, map);return "消息已发送至rabbitmq server";}
}

消息 consumer

接收消息类

@Slf4j
@Component
@RabbitListener(queues = {"fanout.A"})
public class FanoutReceiverA {@RabbitHandlerpublic void process(@NotNull Map<String, Object> testMessage) {log.info("FanoutReceiverA消费者收到消息  : " + testMessage.toString());}
}@Slf4j
@Component
@RabbitListener(queues = {"fanout.B"})
public class FanoutReceiverB {@RabbitHandlerpublic void process(@NotNull Map<String, Object> testMessage) {log.info("FanoutReceiverB消费者收到消息  : " + testMessage.toString());}
}@Slf4j
@Component
@RabbitListener(queues = {"fanout.C"})
public class FanoutReceiverC {@RabbitHandlerpublic void process(@NotNull Map<String, Object> testMessage) {log.info("FanoutReceiverC消费者收到消息  : " + testMessage.toString());}
}

测试

启动 rabbitmq 服务,rabbitmq-provider,rabbitmq-consumer 两个项目都跑起来,首先调用接口 http://localhost:8080/sendFanoutMessage


然后看看 rabbitmq-consumer 项目的日志情况


可以看到只要发送到 fanoutExchange 这个 FanoutExchange 的消息, 三个队列都绑定这个交换机,所以三个消息接收类都监听到了这条消息

此时打开 rabbitmq 管理页面,可以看到如下


本文未完,接着下一篇 springboot整合rabbitmq(二)

源码:https://gitee.com/chaojiangcj/springboot-rabbitmq

springboot整合rabbitmq(一)相关推荐

  1. 九、springboot整合rabbitMQ

    springboot整合rabbitMQ 简介 rabbitMQ是部署最广泛的开源消息代理. rabbitMQ轻量级,易于在内部和云中部署. 它支持多种消息传递协议. RabbitMQ可以部署在分布式 ...

  2. RabbitMQ,RabbitMQ 的工作模式,Spring 整合 RabbitMQ,Springboot 整合RabbitMQ

    什么是RabbitMQ 1.1 MQ概述 MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器.多用于分布式系统之间进行通信. ⚫ MQ,消息队列,存储消息的中间件 ⚫ ...

  3. SpringBoot整合 ActiveMQ、SpringBoot整合RabbitMQ、SpringBoot整合Kafka

    1.概念:SpringBoot 整合消息服务2.具体内容对于异步消息组件在实际的应用之中会有两类:· JMS:代表作就是 ActiveMQ,但是其性能不高,因为其是用 java 程序实现的:· AMQ ...

  4. RabbitMq(九) SpringBoot整合RabbitMQ消费者示例代码

    概述 在上一篇我们介绍了SpringBoot整合RabbitMQ生产者代码,本章我们介绍SpringBoot整合RabbitMQ,实现消费者工程的代码实现.与生产者集成相比,集成消费者不需要进行添加配 ...

  5. RabbitMq(八) SpringBoot整合RabbitMQ 生产者代码实现

    在本章中我们将创建RabbitMQ的生产者工程,并实现生产者端代码实现. springboot整合RabbitMQ生产者工程步骤如下: 创建maven工程 引入springboot及RabbitMQ依 ...

  6. Springboot整合一之Springboot整合RabbitMQ

    前言 目前,springboot已然成为了最热的java开发整合框架,主要是因其简单的配置,并且本身提供了很多与第三方框架的整合,甚至可以让我们在短短的几分钟里就可以搭建一个完整的项目架构.所以,博主 ...

  7. Springboot整合RabbitMQ,包含direct,topic,fanout三种模式的整合

    一 RabbitMQ的介绍 RabbitMQ是消息中间件的一种,消息中间件即分布式系统中完成消息的发送和接收的基础软件.这些软件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿 ...

  8. Spring Boot---(10)SpringBoot整合RabbitMQ

    请参考:Spring Boot---(24)springboot整合RabbitMQ 由于docker安装非常方便,这里就用docker来安装和启动了.没接触过docker的可以参考这里:零基础学习D ...

  9. RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ

    RabbitMQ [黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战] 文章目录 RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ 6.1 Sprin ...

  10. SpringBoot整合RabbitMQ 访问保错: reply-text=NOT_ALLOWED - access to vhost '/' refused for user 'cuit'

    SpringBoot整合RabbitMQ集群时报如下错误 RabbitMQ集群配置参考:https://blog.csdn.net/weixin_42465125/article/details/88 ...

最新文章

  1. The Unique MST
  2. boost::mpl模块实现print相关的测试程序
  3. 程序员-真实学习之路
  4. node-red教程2 第一条数据流
  5. uploadify 配置后,页面显示无效果
  6. C++多进程并发框架FFLIB
  7. MACAPP中引入ffmpeg库完成具体功能
  8. 买一个二级计算机软件多少钱,计算机二级考试需要买课本吗
  9. 2021年下半年 全国计算机技术与软件专业技术资格考试 浙江省合格人员数据分布
  10. 脱壳_详细_使用的方法_03
  11. 光源发散角怎么设置_Three.js 中的光源
  12. java如何动态添加数组数据_Java动态数组添加数据的方法与应用示例
  13. 基于Wiki的知识共享平台模型架构
  14. 新浪微博PC端登陆js分析及Python实现微博post登陆
  15. 语音芯片播报方案选型补充说明
  16. 应用提交 App Store 上架被拒的原因都有哪些
  17. 服饰美妆新品 | 阿迪达斯可循环跑鞋第三代LOOP系列发布;赫丽尔斯X吃豆人跨界限定系列推出...
  18. 萧毅舟;2.22黄金原油日内走势分析及操作策略建议
  19. lms c语言,LMS算法实现自适应滤波器(C语言版)
  20. QCustomplot笔记(二)之QCustomplot 坐标轴属性设置

热门文章

  1. 自动驾驶 4-1 二维运动学建模Kinematic Modeling in 2D
  2. 阿里云云计算 36 PolarDB MySQL的管理步骤
  3. 算法:两个数的和等于指定值1. Two Sum
  4. 过拟合产生的原因有哪些
  5. #include《》和#include“”的区别
  6. python实现连接池技术
  7. Python sys.path、sys.modules模块介绍
  8. 12满秩分解与奇异值分解(2)
  9. [译]直观理解信息论
  10. 数据库基础(5)1NF,2NF,3NF,BCNF 四大范式的定义和判别