1.前言

基于SpringBoot我搭建了一个模拟购买商品订单下单并发送消息使用RabbitMQ消息队列的场景来分析实现不同模式下的场景。

也是对于SpringBoot整合RabbitMQ的一种总结。

使用到的模型如下图所示,在下订单处理的同时,采用消息队列生产者向MQ消息中间件中生产消息发送给对应的队列,创建消费者来消费队列中的消息调用服务。

2.基于SpringBoot配置类构建消息队列

项目构建我采用的是IDEA中Spring Initializr构建器创建的SpringBoot Maven项目,这部分主要是使用到了Spring RabbitMQ与SpringBoot Web的依赖组件。

由于原生支持,在IDEA中勾选对应的选项即可,非常简单,无需考虑多余的Maven Repository引入。

创建SpringBoot项目主要有springboot-order-rabbitmq-consumer与springboot-order-rabbitmq-producer两个Module。

这里还是简单说明一下pom.xml与application.yml配置:

pom.xml

<dependencies><!--rabbitmq starter依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency></dependencies>
# 服务端口
server:port: 8080
# 配置rabbitmq服务
spring:rabbitmq:username: adminpassword: adminvirtual-host: /host: 127.0.0.1  #基于本地windows RabbitMQ测试,云服务填写对应地址即可port: 5672

2.1.生产者配置类

RabbitMQ中消息队列模式主要常用的模式就是:fanout、direct、topic模式,这里我主要讲解fanout与direct进行配置类构建生产者消费者。

整合生成消息队列(交换机、Queues及绑定关系、Routing key)可以从生产者端也可从消费者端进行。

主要构建方式有两种:

①配置类生成交换机与队列

②注解形式绑定交换机队列关系(topic使用注解方式构建)

这里先说第一种配置类方式:

使用配置类生成消息生产者队列主要配置类说明:

主要配置类XxxTypeRabbitConfig

//注意:XxxType表示是交换机类型:可以是Fanout/Direct/Topic/Headers
@Configuration
public class XxxTypeRabbitConfig {//使用注入方式声明对应的Queue@Beanpublic Queue emailQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("email.xxxType.queue", true);}@Beanpublic Queue smsQueue() {return new Queue("sms.xxxType.queue", true);}@Beanpublic Queue weixinQueue() {return new Queue("weixin.xxxType.queue", true);}//声明交换机,不同的交换机类型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange@Beanpublic XxxTypeExchange xxxTypeOrderExchange() {return new XxxTypeExchange("xxxType_order_exchange", true, false);}//绑定关系:将队列和交换机绑定, 并设置用于匹配键:routingKey@Beanpublic Binding bindingXxxType1() {return BindingBuilder.bind(weixinQueue())  //绑定哪个Queue.to(fanoutOrderExchange());  //是哪个交换机}@Beanpublic Binding bindingXxxType2() {return BindingBuilder.bind(smsQueue()).to(xxxTypeOrderExchange());}@Beanpublic Binding bindingXxxType3() {return BindingBuilder.bind(emailQueue()).to(xxxTypeOrderExchange());}
}

消息发送类,主要给创建的队列填充消息,这里主要用到RabbitTemplate类调用convertAndSend方法进行对应交换机消息队列的发送:

@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 1: 定义交换机private String exchangeName = "";// 2: 路由keyprivate String routeKey = "";//XxxType类型交换机public void makeOrderXxxType(Long userId, Long productId, int num) {exchangeName = "xxxType_order_exchange";routeKey = "";// 1: 模拟用户下单String orderNumer = UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQ xxxTyperabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}}

2.2.Fanout模式消息生产者

①创建交换机与队列生成配置类,注意fanout这里绑定Queues的时候不要设置routing key,是采用广播订阅发送的方式:

/*** @Description:  fanout交换机类型就是对应的消息采用广播订阅模式,订阅绑定交换机的队列都应该收到消息* @Author: fengye* @Date: 2021/4/16 14:29*/
@Configuration
public class FanoutRabbitConfig {//使用注入方式声明对应的Queue@Beanpublic Queue emailQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("email.fanout.queue", true);}@Beanpublic Queue smsQueue() {return new Queue("sms.fanout.queue", true);}@Beanpublic Queue weixinQueue() {return new Queue("weixin.fanout.queue", true);}//声明交换机,不同的交换机类型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange@Beanpublic FanoutExchange fanoutOrderExchange() {return new FanoutExchange("fanout_order_exchange", true, false);}//绑定关系:将队列和交换机绑定, 并设置用于匹配键:routingKey@Beanpublic Binding bindingFanout1() {return BindingBuilder.bind(weixinQueue())  //绑定哪个Queue.to(fanoutOrderExchange());  //是哪个交换机}@Beanpublic Binding bindingFanout2() {return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());}@Beanpublic Binding bindingFanout3() {return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());}
}

②消息队列发送到Queue,使用OrderService进行发送,主要用到了RabbitTemplate:

@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 1: 定义交换机private String exchangeName = "";// 2: 路由keyprivate String routeKey = "";//Fanout类型交换机public void makeOrderFanout(Long userId, Long productId, int num) {exchangeName = "fanout_order_exchange";routeKey = "";// 1: 模拟用户下单String orderNumer = UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQ fanoutrabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}
}

③生产者方启动测试类向fanout_order_exchange交换机队列发送消息,存储到消息队列中:

@SpringBootTest
class RabbitmqApplicationTests {@Autowiredprivate OrderService orderService;@Testvoid fanoutTest() throws InterruptedException {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Long userId = 100L + i;Long productId = 10001L + i;int num = 10;orderService.makeOrderFanout(userId, productId, num);}}
}

运行结果:

生成队列并存储10条消息。

2.3.Fanout模式消息消费者

①配置类实现消息消费者队列比较简单,主要就是使用@RabbitListener绑定对应的队列,并使用@RabbitHandler接收消息对应中的参数信息即可,注意选择合适的数据类型接收:

对应消息队列类配置:

//通过@RabbitListener绑定队列接收消息
@RabbitListener(queues = {"weixin.fanout.queue"})
@Component
public class FanoutDuanxinConsumer {//队列中的消息会通过@RabbitHandler注解注入到方法参数中,就可以获取到队列中的消息@RabbitHandlerpublic void reviceMessage(String message){System.out.println("weixin fanout----接收到了订单信息是:->" + message);}
}@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class FanoutEmailConsumer {@RabbitHandlerpublic void reviceMessage(String message){System.out.println("email fanout----接收到了订单信息是:->" + message);}
}@RabbitListener(queues = {"sms.fanout.queue"})
@Component
public class FanoutSMSConsumer {@RabbitHandlerpublic void reviceMessage(String message){System.out.println("sms fanout----接收到了订单信息是:->" + message);}
}

启动消息接收者consumer SpringBoot项目:

可以看到消息队列存储消息已被消费,控制台打印出了对应的消息信息。

2.4.Direct模式消息生产者

Direct模式消息生产者基于配置类构建与Fanout一样,这里简单说明一下配置类的增加的代码就行:

修改XxxTypeConfig基类为DirectExchange:

/*** @Description:  direct交换机类型采用routing key与Queue进行绑定,通过key不同一对一进行消息传递* @Author: fengye* @Date: 2021/4/16 14:29*/
@Configuration
public class DirectRabbitConfig {//使用注入方式声明对应的Queue@Beanpublic Queue emailQueue() {// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("email.direct.queue", true);}@Beanpublic Queue smsQueue() {return new Queue("sms.direct.queue", true);}@Beanpublic Queue weixinQueue() {return new Queue("weixin.direct.queue", true);}//声明交换机,不同的交换机类型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange@Beanpublic DirectExchange directOrderExchange() {return new DirectExchange("direct_order_exchange", true, false);}//绑定关系:将队列和交换机绑定, 并设置用于匹配键:routingKey@Beanpublic Binding bindingFanout1() {return BindingBuilder.bind(weixinQueue())  //绑定哪个Queue.to(directOrderExchange())  //是哪个交换机.with("weixin");   //对应什么key}@Beanpublic Binding bindingFanout2() {return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("sms");}@Beanpublic Binding bindingFanout3() {return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("email");}
}

对应消息发送Service类:

@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 1: 定义交换机private String exchangeName = "";// 2: 路由keyprivate String routeKey = "";//Direct类型交换机public void makeOrderDirect(Long userId, Long productId, int num) {exchangeName = "direct_order_exchange";routeKey = "weixin";// 1: 模拟用户下单String orderNumer = UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQ fanoutrabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}
}

执行测试类进行测试:

@SpringBootTest
class RabbitmqApplicationTests {@Autowiredprivate OrderService orderService;@Testvoid directTest() throws InterruptedException {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Long userId = 100L + i;Long productId = 10001L + i;int num = 10;orderService.makeOrderDirect(userId, productId, num);}}
}

运行结果:

可以看到DirectQueue消息队列已经生成并存储到对应的weixin路由Key的队列中:

2.5.Direct模式消息消费者

①创建对应的消息队列消费者类,使用@RabbitListener、@RabbitHandler进行监听并绑定消息获取结果,这部分与上面的Fanout模式消费者是一样的:

//通过@RabbitListener绑定队列接收消息
@RabbitListener(queues = {"weixin.direct.queue"})
@Component
public class DirectDuanxinConsumer {//队列中的消息会通过@RabbitHandler注解注入到方法参数中,就可以获取到队列中的消息@RabbitHandlerpublic void reviceMessage(String message){System.out.println("duanxin direct queue----接收到了订单信息是:->" + message);}
}@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailConsumer {@RabbitHandlerpublic void reviceMessage(String message){System.out.println("email direct----接收到了订单信息是:->" + message);}
}@RabbitListener(queues = {"sms.direct.queue"})
@Component
public class DirectSMSConsumer {@RabbitHandlerpublic void reviceMessage(String message){System.out.println("sms direct----接收到了订单信息是:->" + message);}
}

②启动SpringBoot项目进行消费测试:

可以看到消息队列中绑定weixin端队列收到了10条消息。

3.基于SpringBoot注解类构建消息队列

使用注解方式实现消息队列主要是从消费者进行交换机与Queues队列的绑定关系建立,并使用@Component进行注入,可以比较简单地处理交换机与队列之间的绑定关系,随SpringBoot项目一启动就同时创建Exchange与Queues队列的关系。

下面总的说一下主要的注解:

//通过@RabbitListener绑定队列接收消息
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings = @QueueBinding(//队列名字,绑定对应的队列接收消息value = @Queue(value = "weixin.xxxType.queue", autoDelete = "false"),//交换机名字,必须和生产者中交换机名相同;指定绑定的交换机类型exchange = @Exchange(value = "xxxType_order_exchange", type = ExchangeTypes.XXXType),key = "com.#"
))
//队列中的消息会通过@RabbitHandler注解注入到方法参数中,就可以获取到队列中的消息
@RabbitHandler

3.1.Topic模式消息消费者

topic模式这里从消息消费者Springboot项目入手,优先创建出RabbitMQ上的消息队列与交换机进行绑定,基于@RabbitListener与@QueueBinding会随项目启动自动创建消息队列:

//通过@RabbitListener绑定队列接收消息
// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(bindings = @QueueBinding(//队列名字,绑定对应的队列接收消息value = @Queue(value = "weixin.topic.queue", autoDelete = "false"),//交换机名字,必须和生产者中交换机名相同;指定绑定的交换机类型exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),key = "com.#"
))
@Component
public class TopicDuanxinConsumer {//队列中的消息会通过@RabbitHandler注解注入到方法参数中,就可以获取到队列中的消息@RabbitHandlerpublic void reviceMessage(String message){System.out.println("duanxin topic----接收到了订单信息是:->" + message);}
}@RabbitListener(bindings = @QueueBinding(//队列名字,绑定对应的队列接收消息value = @Queue(value = "email.topic.queue", autoDelete = "false"),//交换机名字,必须和生产者中交换机名相同;指定绑定的交换机类型exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),key = "#.order.#"
))
@Component
public class TopicEmailConsumer {//队列中的消息会通过@RabbitHandler注解注入到方法参数中,就可以获取到队列中的消息@RabbitHandlerpublic void reviceMessage(String message){System.out.println("email topic----接收到了订单信息是:->" + message);}}@RabbitListener(bindings = @QueueBinding(//队列名字,绑定对应的队列接收消息value = @Queue(value = "sms.topic.queue", autoDelete = "false"),//交换机名字,必须和生产者中交换机名相同;指定绑定的交换机类型exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),key = "*.course.*"
))
@Component
public class TopicSMSConsumer {//队列中的消息会通过@RabbitHandler注解注入到方法参数中,就可以获取到队列中的消息@RabbitHandlerpublic void reviceMessage(String message){System.out.println("sms topic----接收到了订单信息是:->" + message);}
}

启动SpringBoot消费者项目,进行验证:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Kqk3vyMr-1618649142644)(https://upload-images.jianshu.io/upload_images/26087315-22e48ccd79b6643f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

3.2.Topic模式消息生产者

使用注解配置无需再创建对应的配置类Config来绑定Exchange与Queues的关系了。

直接使用Sevice调用服务发送消息即可。

①服务调用、向队列中发送消息:

@Service
public class OrderService {@Autowiredprivate RabbitTemplate rabbitTemplate;// 1: 定义交换机private String exchangeName = "";// 2: 路由keyprivate String routeKey = "";//Topic类型交换机public void makeOrderTopic(Long userId, Long productId, int num) {exchangeName = "topic_order_exchange";routeKey = "com.course.user";// 1: 模拟用户下单String orderNumer = UUID.randomUUID().toString();// 2: 根据商品id productId 去查询商品的库存// int numstore = productSerivce.getProductNum(productId);// 3:判断库存是否充足// if(num >  numstore ){ return  "商品库存不足..."; }// 4: 下单逻辑// orderService.saveOrder(order);// 5: 下单成功要扣减库存// 6: 下单完成以后System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);// 发送订单信息给RabbitMQ fanoutrabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);}
}

②服务测试:

@SpringBootTest
class RabbitmqApplicationTests {@Autowiredprivate OrderService orderService;@Testvoid topicTest() throws InterruptedException {for (int i = 0; i < 10; i++) {Thread.sleep(1000);Long userId = 100L + i;Long productId = 10001L + i;int num = 10;orderService.makeOrderTopic(userId, productId, num);}}
}

消息发送:

消费方consumer服务(消费者服务不停止)接收消息:

MQ系列SpringBoot快速整合RabbitMQ相关推荐

  1. SpringBoot 2 快速整合 RabbitMQ

    前言 本文介绍了通过最简单方法使用 SpringBoot 2 整合 RabbitMQ,带你快速上手 RabbitMQ 的操作. 操作前需要先安装 RabbitMQ 服务.Windows 系统可以参考 ...

  2. 【SpringBoot】43、SpringBoot中整合RabbitMQ实现延时队列(延时插件篇)

    死信队列实现篇,参考文章:[SpringBoot]60.SpringBoot中整合RabbitMQ实现延时队列(死信队列篇) 一.介绍 1.什么是延时队列? 延时队列即就是放置在该队列里面的消息是不需 ...

  3. Rabbitmq专题:springboot如何整合Rabbitmq?Rabbitmq有哪些工作模式?

    文章目录 1. Rabbitmq的安装 2. Rabbitmq的基本概念 3. RabbitMQ的工作模式 3.1 "Hello World!" 简单模式 3.2 Work que ...

  4. RabbitMQ的6种工作模式的学习记录,普通MAVEN和springboot项目整合rabbitmq的API详解

    1.RabbitMQ后台管理页面 2.RabbitMQ 核心(自我理解) 3.RabbitMQ6种工作模式介绍 4. RabbitMQ的消息可靠性 5.RabbitMQ普通MAVEN项目使用 6.Sp ...

  5. SpringBoot——快速整合EasyExcel实现Excel的上传下载

    文章目录: 1.EasyExcel 2.Excel的上传(读Excel) 3.Excel的下载(写Excel) 4.结语 1.EasyExcel Hello,大家好啊,好久不见了,自工作之后真的很难腾 ...

  6. java xml快捷注释_详解SpringBoot 快速整合Mybatis(去XML化+注解进阶)

    序言:使用MyBatis3提供的注解可以逐步取代XML,例如使用@Select注解直接编写SQL完成数据查询,使用@SelectProvider高级注解还可以编写动态SQL,以应对复杂的业务需求. 一 ...

  7. Springboot快速整合通用Mapper

    前言 后端业务开发,每个表都要用到单表的增删改查等通用方法,而配置了通用Mapper可以极大的方便使用Mybatis单表的增删改查操作. 通用mapper配置 1.添加maven: <depen ...

  8. RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ

    RabbitMQ [黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战] 文章目录 RabbitMQ 第一天 基础 6 SpringBoot 整合RabbitMQ 6.1 Sprin ...

  9. springboot+security整合(1)

    说明 springboot 版本 2.0.3 源码地址:点击跳转 系列 springboot+security 整合(1) springboot+security 整合(2) springboot+s ...

  10. SpringBoot项目整合Retrofit最佳实践,这才是最优雅的HTTP客户端工具!

    作者:六点半起床 juejin.im/post/6854573211426750472 大家都知道okhttp是一款由square公司开源的java版本http客户端工具.实际上,square公司还开 ...

最新文章

  1. 陈勋教授的脑电信号降噪视频与讲座总结
  2. 使用python开发网页游戏_如何用python开发游戏
  3. 猿团专访云信CTO阙杭宁——网易云信“稳定”背后的秘密
  4. 华为鸿蒙系统不卡,华为鸿蒙系统,到底能不能取代安卓?网友:细节决定成败...
  5. c#winform演练 ktv项目 制作歌曲播放列表
  6. JDBC之用元数据将结果集封装为List对象
  7. “参与 Debian 项目 20 年后,被降级的我选择退出”
  8. 用脚本实现FTP的上传和下载
  9. Spring AOP异常处理(error at ::0 formal unbound in pointcut)
  10. javascript 阮一峰入门教程
  11. Scintilla教程(7): 多选以及滚动
  12. 手机远程控制电脑如何做到?
  13. 微信公号DIY:一小时搭建微信聊天机器人
  14. mysql 实现api接口_一套免费MySQL数据库数据接口API,让项目开发更简单
  15. python floor是什么意思_python里floor怎么用
  16. c语言入门1.2.3 百度云,C语言入门1.2.3--一个老鸟的C语言学习心得(附光盘)
  17. 第二届全国智能制造(中国制造2025)创新创业大赛华南人工智能专项赛决赛圆满举办
  18. linux-scp上传下载
  19. PHY6252超低功耗物联网蓝牙无线通信芯片
  20. 【C语言】时间转换24小时制转12小时制

热门文章

  1. 分享一个简单免费查询你手机注册过的网站的方法/app
  2. 【《编码(Coding)》读后感】隐匿在计算机软硬件背后的语言
  3. 车牌识别对于智慧城市的重要性
  4. 利用matlab设计矩形脉冲信号,信号课程设计
  5. 阿里云服务器使用步骤详解
  6. ESP32使用I2C数字电阻AD5254做PT100仿真
  7. eNSP华为路由器与交换机连接
  8. 读者提问:如何提高效率?
  9. Ubuntu挂载ISO文件
  10. 全国多年太阳辐射空间分布数据1981-2022年、气温分布数据、蒸散量数据、蒸发量数据、降雨量分布数据、日照数据、风速数据