转自:

SpringBoot连接多RabbitMQ源 - 掘金在实际开发中,很多场景需要异步处理,这时就需要用到RabbitMQ,而且随着场景的增多程序可能需要连接多个RabbitMQ。SpringBoot本身提供了默认的配置可以快速配置连接RabbitMQ,但是只能连接一个RabbitMQ,当需要连接多个RabbitMQ时,默认的配置就…https://juejin.cn/post/6844904039797243917


在实际开发中,很多场景需要异步处理,这时就需要用到RabbitMQ,而且随着场景的增多程序可能需要连接多个RabbitMQ。SpringBoot本身提供了默认的配置可以快速配置连接RabbitMQ,但是只能连接一个RabbitMQ,当需要连接多个RabbitMQ时,默认的配置就不太适用了,需要单独编写每个连接。

在SpringBoot框架中,我们常用的两个类一般是:

  • RabbitTemplate:作为生产、消费消息使用;
  • RabbitAdmin:作为申明、删除交换机和队列,绑定和解绑队列和交换机的绑定关系使用。

所以我们连接多个RabbitMQ就需要重新建立连接、重新实现这两个类。 代码如下:

配置

application.properties配置文件需要配置两个连接:

server.port=8080# rabbitmq
v2.spring.rabbitmq.host=host
v2.spring.rabbitmq.port=5672
v2.spring.rabbitmq.username=username
v2.spring.rabbitmq.password=password
v2.spring.rabbitmq.virtual-host=virtual-host
#consume 手动 ack
v2.spring.rabbitmq.listener.simple.acknowledge-mode=manual
#1.当mandatory标志位设置为true时,
#   如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
#   那么broker会调用basic.return方法将消息返还给生产者;
#2.当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,
#   mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
#   否则就将消息return给发送者;
v2.spring.rabbitmq.template.mandatory=true
#publisher confirms 发送确认
v2.spring.rabbitmq.publisher-confirms=true
#returns callback :
#   1.未送达exchange
#   2.送达exchange却未送道queue的消息 回调returnCallback.(注意)出现2情况时,publisher-confirms 回调的是true
v2.spring.rabbitmq.publisher-returns=true
v2.spring.rabbitmq.listener.simple.prefetch=5# rabbitmq
v1.spring.rabbitmq.host=host
v1.spring.rabbitmq.port=5672
v1.spring.rabbitmq.username=username
v1.spring.rabbitmq.password=password
v1.spring.rabbitmq.virtual-host=virtual-host
#consume 手动 ack
v1.spring.rabbitmq.listener.simple.acknowledge-mode=manual
#1.当mandatory标志位设置为true时,
#   如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,
#   那么broker会调用basic.return方法将消息返还给生产者;
#2.当mandatory设置为false时,出现上述情况broker会直接将消息丢弃;通俗的讲,
#   mandatory标志告诉broker代理服务器至少将消息route到一个队列中,
#   否则就将消息return给发送者;
v1.spring.rabbitmq.template.mandatory=true
#publisher confirms 发送确认
v1.spring.rabbitmq.publisher-confirms=true
#returns callback :
#   1.未送达exchange
#   2.送达exchange却未送道queue的消息 回调returnCallback.(注意)出现2情况时,publisher-confirms 回调的是true
v1.spring.rabbitmq.publisher-returns=true
v1.spring.rabbitmq.listener.simple.prefetch=5

重写连接工厂

需要注意的是,在多源的情况下,需要在某个连接加上@Primary注解,表示主连接,默认使用这个连接;

package com.example.config.rabbitmq;import com.alibaba.fastjson.JSON;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;/*** Created by shuai on 2019/4/23.*/
@Configuration
public class MultipleRabbitMQConfig {@Bean(name = "v2ConnectionFactory")public CachingConnectionFactory hospSyncConnectionFactory(@Value("${v2.spring.rabbitmq.host}") String host,@Value("${v2.spring.rabbitmq.port}") int port,@Value("${v2.spring.rabbitmq.username}") String username,@Value("${v2.spring.rabbitmq.password}") String password,@Value("${v2.spring.rabbitmq.virtual-host}") String virtualHost,@Value("${v2.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,@Value("${v2.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setPublisherReturns(publisherReturns);return connectionFactory;}@Bean(name = "v2RabbitTemplate")public RabbitTemplate firstRabbitTemplate(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v2.spring.rabbitmq.template.mandatory}") Boolean mandatory) {RabbitTemplate v2RabbitTemplate = new RabbitTemplate(connectionFactory);v2RabbitTemplate.setMandatory(mandatory);v2RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {if (!ack) {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));} else {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));}});v2RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});});return v2RabbitTemplate;}@Bean(name = "v2ContainerFactory")public SimpleRabbitListenerContainerFactory hospSyncFactory(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v2.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,@Value("${v2.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));factory.setPrefetchCount(prefetch);return factory;}@Bean(name = "v2RabbitAdmin")public RabbitAdmin iqianzhanRabbitAdmin(@Qualifier("v2ConnectionFactory") ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// mq主连接@Bean(name = "v1ConnectionFactory")@Primarypublic CachingConnectionFactory publicConnectionFactory(@Value("${v1.spring.rabbitmq.host}") String host,@Value("${v1.spring.rabbitmq.port}") int port,@Value("${v1.spring.rabbitmq.username}") String username,@Value("${v1.spring.rabbitmq.password}") String password,@Value("${v1.spring.rabbitmq.virtual-host}") String virtualHost,@Value("${v1.spring.rabbitmq.publisher-confirms}") Boolean publisherConfirms,@Value("${v1.spring.rabbitmq.publisher-returns}") Boolean publisherReturns) {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setPublisherReturns(publisherReturns);return connectionFactory;}@Bean(name = "v1RabbitTemplate")@Primarypublic RabbitTemplate publicRabbitTemplate(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v1.spring.rabbitmq.template.mandatory}") Boolean mandatory) {RabbitTemplate v1RabbitTemplate = new RabbitTemplate(connectionFactory);v1RabbitTemplate.setMandatory(mandatory);v1RabbitTemplate.setConfirmCallback((correlationData, ack, s) -> {if (!ack) {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 失败: [{}]", this.name, JSON.toJSONString(object));} else {
//                    LOGGER.info("{} 发送RabbitMQ消息 ack确认 成功: [{}]", this.name, JSON.toJSONString(object));}});v1RabbitTemplate.setReturnCallback((message, code, s, exchange, routingKey) -> {
//                LOGGER.error("{} 发送RabbitMQ消息returnedMessage,出现异常,Exchange不存在或发送至Exchange却没有发送到Queue中,message:[{}], code[{}], s[{}], exchange[{}], routingKey[{}]", new Object[]{this.name, JSON.toJSONString(message), JSON.toJSONString(code), JSON.toJSONString(s), JSON.toJSONString(exchange), JSON.toJSONString(routingKey)});});return v1RabbitTemplate;}@Bean(name = "v1ContainerFactory")@Primarypublic SimpleRabbitListenerContainerFactory insMessageListenerContainer(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory,@Value("${v1.spring.rabbitmq.listener.simple.acknowledge-mode}") String acknowledge,@Value("${v1.spring.rabbitmq.listener.simple.prefetch}") Integer prefetch) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.valueOf(acknowledge.toUpperCase()));factory.setPrefetchCount(prefetch);return factory;}@Bean(name = "v1RabbitAdmin")@Primarypublic RabbitAdmin publicRabbitAdmin(@Qualifier("v1ConnectionFactory") ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
}

创建Exchange、Queue并绑定

再实现RabbitAdmin后,我们就需要根据RabbitAdmin创建对应的交换机和队列,并建立绑定关系

package com.example.config.rabbitmq;import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** 创建Queue、Exchange并建立绑定关系* Created by shuai on 2019/5/16.*/
@Configuration
public class MyRabbitMQCreateConfig {@Resource(name = "v2RabbitAdmin")private RabbitAdmin v2RabbitAdmin;@Resource(name = "v1RabbitAdmin")private RabbitAdmin v1RabbitAdmin;@PostConstructpublic void RabbitInit() {v2RabbitAdmin.declareExchange(new TopicExchange("exchange.topic.example.new", true, false));v2RabbitAdmin.declareQueue(new Queue("queue.example.topic.new", true));v2RabbitAdmin.declareBinding(BindingBuilder.bind(new Queue("queue.example.topic.new", true))        //直接创建队列.to(new TopicExchange("exchange.topic.example.new", true, false))    //直接创建交换机 建立关联关系.with("routing.key.example.new"));    //指定路由Key}
}

生产者

为了后续验证每个连接都建立成功,并且都能生产消息,生产者这里分别使用新生成的RabbitTemplate发送一条消息。

package com.example.topic;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.Resource;@Component
public class TopicProducer {@Resource(name = "v1RabbitTemplate")private RabbitTemplate v1RabbitTemplate;@Resource(name = "v2RabbitTemplate")private RabbitTemplate v2RabbitTemplate;public void sendMessageByTopic() {String content1 = "This is a topic type of the RabbitMQ message example from v1RabbitTemplate";v1RabbitTemplate.convertAndSend("exchange.topic.example.new","routing.key.example.new",content1);String content2 = "This is a topic type of the RabbitMQ message example from v2RabbitTemplate";v2RabbitTemplate.convertAndSend("exchange.topic.example.new","routing.key.example.new",content2);}
}

消费者

这里需要注意在配置消费队列时,需要标识ContainerFactory

package com.example.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "queue.example.topic.new", containerFactory = "v2ContainerFactory")
public class TopicConsumer {@RabbitHandlerpublic void consumer(String message) {System.out.println(message);}
}

这样就完成了SpringBoot连接多个RabbitMQ源的示例了,再写一段测试代码验证下。

测试验证

package com.example.test;import com.example.topic.TopicProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQMultipleTest {@Autowiredprivate TopicProducer topicProducer;@Testpublic void topicProducerTest() {topicProducer.sendMessageByTopic();}
}

执行测试代码,验证结果为:

验证SpringBoot连接多RabbitMQ源成功!

github地址:Spring Boot 教程、技术栈、示例代码

SpringBoot连接多RabbitMQ源相关推荐

  1. RabbitMq 虚拟主机 virtual-host ,Springboot 中使用 RabbitMq 虚拟主机 virtual-host

    文章目录 RabbitMq 虚拟主机 virtual-host ,Springboot 中使用 RabbitMq 虚拟主机 virtual-host 1.在RabbitMQ 控制台中创建虚拟主机 2. ...

  2. springboot 入门详细教程 源码

    转载知乎文章 gitee官方教程+开源项目 Gitee ​ 已认证的官方帐号 1,303 人赞同了该回答 推荐以 Spring Boot 教程与 Spring Cloud 教程的详细开源项目 &quo ...

  3. SpringBoot 整合实现RabbitMQ

    目录 1.RabbitMQ介绍 2.RabbitMQ的工作原理 3. SpringBoot 整合实现RabbitMQ 3.1创建mq-rabbitmq-producer(生产者)发送消息 3.1.1p ...

  4. SpringBoot连接mysql数据,写入数据

    (1)先准备好mysql数据,作为springboot的数据存储服务器. 安装和启动mysql服务器的介绍:https://zhangphil.blog.csdn.net/article/detail ...

  5. springboot连接redis集群

    开启redis服务和客户端 查看下当前redis的进程 [root@localhost ~]# ps -ef | grep redis 启动redis服务 [root@localhost ~]# cd ...

  6. SpringBoot 连接mysql踩到的坑

    首先对于用SpringBoot连接mysql我先说明一下pom文件中需要引入那些jar: <dependency><groupId>mysql</groupId>& ...

  7. SpringBoot之使用RabbitMQ实现延迟队列

    在我们的各个项目中,经常会有这样的需求. 订单模块:在订单下单后30分钟如果没有付款,就自动取消订单, 短信模块:在下单成功后60s给用户发送短信通知 支付模块:在微信/支付宝支付成功后,1分钟后去调 ...

  8. 【原创】Windows下使用 Eclipse 管理 RabbitMQ 源码之问题解决

    2019独角兽企业重金招聘Python工程师标准>>> 使用 Eclipse + Erlide 来管理 Erlang 代码是一种常见方式.本文简要说明下,本人在 Windows 下管 ...

  9. Spring Boot学习总结(28)—— springboot连接postgresql 指定模式Schema

    springboot 连接 postgresql 指定模式Schema 一般的连接方式,我们创建数据库之后,在public 的Schema(模式)下建表,这时使用连接方式 jdbc:postgresq ...

最新文章

  1. c++ 类文件的动态库生成及调用例子
  2. SAP UI5 popup弹出对话框的调试
  3. DaveGray推荐的视觉思维好书(一)
  4. java 社招 简历_招聘java简历模板
  5. React跨域解决方案
  6. 狮子鱼社区团购独立版安装方法
  7. 编程珠玑续版-chp2 关联数组-awk
  8. Bypass open_basedir
  9. 没想到 Python 中竟然还藏着这些稀奇古怪的东西...
  10. 一个记账易app开发
  11. uniapp 电商小程序 订单30分钟倒计时
  12. 拾贰SparkSQL:数据关联优化
  13. C语言C Prime总结(2-7章)
  14. inc si指令的作用_亲水作用色谱(HILIC)(二):可选的固定相有哪些?
  15. UE风格化Day9-(摆烂特辑)原神石块砖材质欣赏
  16. hp服务器重置bmc,服务器BMC(带外)
  17. 群晖、威联通NAS硬盘本地化,使用RaiDrive通过WebDAV实现内网挂载
  18. 移动开发视频资源百度网盘地址分享
  19. MES系统数据采集实现方法
  20. HTMLa标签常用的四种链接

热门文章

  1. CodeCraft-20 (Div. 2) D. Nash Matrix 构造 + dfs
  2. 牛客网 【每日一题】5月13日 加分二叉树
  3. 周末狂欢赛3(跳格子,英雄联盟,排序问题)
  4. CF1481F-AB Tree【构造,背包】
  5. P4322-[JSOI2016]最佳团体【0/1分数规划,树形背包】
  6. jzoj3852-单词接龙【0/1分数规划,负环】
  7. 【DP】滑雪场的缆车(jzoj 1257)
  8. 初一模拟赛总结(2019.3.9)
  9. P2468 [SDOI2010]粟粟的书架 动态规划,主席树,二分答案
  10. 三个好用的并发工具类