springboot 集成rabbitmq 实例
springboot 集成rabbitmq 实例
个人在学习rabbitmq时发现网上很少有系统性介绍springboot和rabbitmq如何集成的,其他人总结的都片段化,所以结合个人调研过程,整理此篇文章。
本文章共分为以下部分:
- rabbitmq简介
- springboot配置
- rabbitmq生产者配置
- rabbitmq消费者配置
- 问题补充
一、rabbitmq简介
目前流程的消息队列主要有:ActivityMQ/kafka/redis/rabbitmq等,各有各自的应用场景,关于各个框架的介绍,大家可自行百度,网上很多文章介绍~其中rabbit因为其ack特性以及还算不错的性能被大多数公司采用。
概念:
- 生产者 消息的产生方,负责将消息推送到消息队列
- 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
- 队列 消息的寄存器,负责存放生产者发送的消息
- 交换机 负责根据一定规则分发生产者产生的消息
- 绑定 完成交换机和队列之间的绑定
模式:
- direct
直连模式,用于实例间的任务分发 - topic
话题模式,通过可配置的规则分发给绑定在该exchange上的队列 - headers
适用规则复杂的分发,用headers里的参数表达规则 - fanout
分发给所有绑定到该exchange上的队列,忽略routing key
安装
单机版安装很简单,大概步骤如下:
# 安装erlang包yum install erlang
# 安装socatyum install socat
# 安装rabbit rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm
# 启动服务rabbitmq-server start
# 增加管理控制功能rabbitmq-plugins enable rabbitmq_management
# 增加用户:sudo rabbitmqctl add_user root passwordrabbitmqctl set_user_tags root administrator rabbitmqctl set_permissions -p / root '.*' '.*' '.*'
集群安装,可参考以下博客:
rabbitmq集群安装
二、springboot配置
废话少说直接上代码:
配置参数
application.yml:
spring:rabbitmq:addresses: 192.168.1.1:5672username: usernamepassword: passwordpublisher-confirms: truevirtual-host: /
java config读取参数
/*** RabbitMq配置文件读取类** @author chenhf* @create 2017-10-23 上午9:31**/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {@Value("${spring.rabbitmq.addresses}")private String addresses;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.publisher-confirms}")private Boolean publisherConfirms;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;// 构建mq实例工厂@Beanpublic ConnectionFactory connectionFactory(){CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses(addresses);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setPublisherConfirms(publisherConfirms);connectionFactory.setVirtualHost(virtualHost);return connectionFactory;}@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){return new RabbitAdmin(connectionFactory);}@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)public RabbitTemplate rabbitTemplate(){RabbitTemplate template = new RabbitTemplate(connectionFactory());return template;}
}
三、rabbitmq生产者配置
主要配置了直连和话题模式,其中话题模式设置两个队列(queueTopicTest1、queueTopicTest2),此两个队列在和交换机绑定时分别设置不同的routingkey(.TEST.以及lazy.#)来验证匹配模式。
/*** 用于配置交换机和队列对应关系* 新增消息队列应该按照如下步骤* 1、增加queue bean,参见queueXXXX方法* 2、增加queue和exchange的binding* @author chenhf* @create 2017-10-23 上午10:33**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {/** logger */private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);/*** @Author:chenhf* @Description: 主题型交换机* @Date:下午5:49 2017/10/23* @param* @return*/@BeanTopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());rabbitAdmin.declareExchange(contractTopicExchange);logger.debug("完成主题型交换机bean实例化");return contractTopicExchange;}/*** 直连型交换机*/@BeanDirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());rabbitAdmin.declareExchange(contractDirectExchange);logger.debug("完成直连型交换机bean实例化");return contractDirectExchange;}//在此可以定义队列@BeanQueue queueTest(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());rabbitAdmin.declareQueue(queue);logger.debug("测试队列实例化完成");return queue;}//topic 1@BeanQueue queueTopicTest1(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());rabbitAdmin.declareQueue(queue);logger.debug("话题测试队列1实例化完成");return queue;}//topic 2@BeanQueue queueTopicTest2(RabbitAdmin rabbitAdmin){Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());rabbitAdmin.declareQueue(queue);logger.debug("话题测试队列2实例化完成");return queue;}//在此处完成队列和交换机绑定@BeanBinding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());rabbitAdmin.declareBinding(binding);logger.debug("测试队列与直连型交换机绑定完成");return binding;}//topic binding1@BeanBinding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());rabbitAdmin.declareBinding(binding);logger.debug("测试队列与话题交换机1绑定完成");return binding;}//topic binding2@BeanBinding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());rabbitAdmin.declareBinding(binding);logger.debug("测试队列与话题交换机2绑定完成");return binding;}}
在这里用到枚举类:RabbitMqEnum
/*** 定义rabbitMq需要的常量** @author chenhf* @create 2017-10-23 下午4:07**/
public class RabbitMqEnum {/*** @param* @Author:chenhf* @Description:定义数据交换方式* @Date:下午4:08 2017/10/23* @return*/public enum Exchange {CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分发"),CONTRACT_TOPIC("CONTRACT_TOPIC", "消息订阅"),CONTRACT_DIRECT("CONTRACT_DIRECT", "点对点");private String code;private String name;Exchange(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定义队列名称* creat_user: chenhf* creat_date: 2017/10/31**/public enum QueueName {TESTQUEUE("TESTQUEUE", "测试队列"),TOPICTEST1("TOPICTEST1", "topic测试队列"),TOPICTEST2("TOPICTEST2", "topic测试队列");private String code;private String name;QueueName(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}/*** describe: 定义routing_key* creat_user: chenhf* creat_date: 2017/10/31**/public enum QueueEnum {TESTQUEUE("TESTQUEUE1", "测试队列key"),TESTTOPICQUEUE1("*.TEST.*", "topic测试队列key"),TESTTOPICQUEUE2("lazy.#", "topic测试队列key");private String code;private String name;QueueEnum(String code, String name) {this.code = code;this.name = name;}public String getCode() {return code;}public String getName() {return name;}}}
以上完成消息生产者的定义,下面封装调用接口
测试时直接调用此工具类,testUser类需自己实现
rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
/*** rabbitmq发送消息工具类** @author chenhf* @create 2017-10-26 上午11:10**/@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{/** logger */private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);private RabbitTemplate rabbitTemplate;@Autowiredpublic RabbitMqSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;this.rabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean b, String s) {logger.info("confirm: " + correlationData.getId());}/*** 发送到 指定routekey的指定queue* @param routeKey* @param obj*/public void sendRabbitmqDirect(String routeKey,Object obj) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info("send: " + correlationData.getId());this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);}/*** 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定Topic的Queue上* @param routeKey* @param obj*/public void sendRabbitmqTopic(String routeKey,Object obj) {CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());logger.info("send: " + correlationData.getId());this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);}
}
四、rabbitmq消费者配置
springboot注解方式监听队列,无法手动指定回调,所以采用了实现ChannelAwareMessageListener接口,重写onMessage来进行手动回调,详见以下代码,详细介绍可以在spring的官网上找amqp相关章节阅读
直连消费者
通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费
/*** 消费者配置** @author chenhf* @create 2017-10-30 下午3:14**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {@Bean("testQueueContainer")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TESTQUEUE");container.setMessageListener(exampleListener());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("testQueueListener")public ChannelAwareMessageListener exampleListener() {return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());//通过设置TestUser的name来测试回调,分别发两条消息,一条UserName为1,一条为2,查看控制台中队列中消息是否被消费if ("2".equals(testUser.getUserName())){System.out.println(testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}if ("1".equals(testUser.getUserName())){System.out.println(testUser.toString());channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}};}}
topic消费者1
/*** 消费者配置** @author chenhf* @create 2017-10-30 下午3:14**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {@Bean("topicTest1Container")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TOPICTEST1");container.setMessageListener(exampleListener1());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("topicTest1Listener")public ChannelAwareMessageListener exampleListener1(){return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());System.out.println("TOPICTEST1:"+testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}};}}
topic消费者2
/*** 消费者配置** @author chenhf* @create 2017-10-30 下午3:14**/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {@Bean("topicTest2Container")public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames("TOPICTEST2");container.setMessageListener(exampleListener());container.setAcknowledgeMode(AcknowledgeMode.MANUAL);return container;}@Bean("topicTest2Listener")public ChannelAwareMessageListener exampleListener() {return new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());System.out.println("TOPICTEST2:"+testUser.toString());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}};}}
问题补充
使用过程中可能出现的坑参考此篇文章
https://segmentfault.com/a/11...
springboot 集成rabbitmq 实例相关推荐
- SpringBoot集成RabbitMQ消息队列搭建与ACK消息确认入门
1.Windows下安装RabbitMQ的步骤详解+图解(erlang+RabbitMQ) 2.SpringBoot集成RabbitMQ参考文章 1.RabbitMQ介绍 RabbitMQ是实现AMQ ...
- Springboot集成RabbitMQ一个完整案例
springboot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,springboot 提供了 spring-boot-starter-amqp 对消息各种支持. 1.配置pom文 ...
- springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式
springboot 集成 RabbitMQ confirm 确认模式和 return 回退模式以及Consumer Ack模式 说明: RabbitMQ消息的可靠投递 在使用 RabbitMQ 的时 ...
- Springboot集成rabbitmq实现延时队列
Springboot集成rabbitmq实现延时队列 什么是延时队列? 列举几个使用场景: 常见的种类有: 延时任务-实现方式: 详细信息:[https://www.cnblogs.com/JonaL ...
- Springboot集成rabbitMQ之mandatory和备份交换机
Springboot集成rabbitMQ之mandatory和备份交换机 mandatory 之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认 ...
- RabbitMQ——SpringBoot集成RabbitMQ
文章目录: 1.创建一个SpringBoot工程--消息发送者 1.创建一个SpringBoot工程--消息接收者 3.测试结果 3.1 direct 3.2 fanout 3.3 topic 3.4 ...
- springboot集成rabbitmq商品秒杀业务实战(流量削峰)
消息队列如何实现流量削峰? 要对流量进行削峰,最容易想到的解决方案就是用消息队列来缓冲瞬时流量,把同步的直接调用转换成异步的间接推送,中间通过一个队列在一端承接瞬时的流量洪峰,在另一端平滑地将消息推送 ...
- SpringBoot集成rabbitmq错误:org.springframework.amqp.AmqpConnectException: java.net.ConnectException的解决办法
在集成rabbitmq后,运行项目,报错日志: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Co ...
- springboot集成rabbitMQ安装+代码
环境安装 本机电脑是mac,所以直接在终端下(无论在哪个目录下都不影响)输入 brew install erlang,先装erlang,不然rabbitmq装了跑不了,接下来就是等待的时刻.... 装 ...
最新文章
- Silverlight中文件的生成操作与其对应的获取方法
- mybaits十二:使用collection嵌套结果集查询
- Machine Learning week 2 quiz: Linear Regression with Multiple Variables
- 数据库设计与查询语句的优化
- 时间序列 线性回归 区别_时间序列分析的完整介绍(带R)::线性过程I
- JPG PNG GIF BMP图片格式的区别
- 玩转Java注解:元注解、内置注解、自定义注解的原理和实现
- Zara精讲C#.Cache、它和Redis区别是什么???
- 考虑SOC蓄电池 双向DC/DC 充放电控制 matlab仿真模型 buck boost
- 美洽客服报表功能:用数据驱动企业业绩增长
- 玩转基因组浏览器之使用IGV查看基因结构信息
- KKB:二进制知识:15瓶水,其中只有一瓶水有毒,请问至少需要几只小白鼠可以一次性喝出来?
- ddr走线教程_DDR3 Fly By走线精讲
- 关于下载表格数据乱码的解决方案
- 论文翻译:混合维在庞加莱几何三维骨架的动作识别
- linux分区管理,Linux下磁盘分区管理
- 关于MySQL错误提示ERROR 1265 (01000)的问题分析及解决方案
- 皮肤结构走向运用仿制图章
- win11 / win10 彻底删除系统“快速访问”中自动添加的文件夹 - 不再自动添加
- C++ GDAL/OGR 图层求交集Intersection
热门文章
- 使用.NET中的XML注释(一) -- XML注释标签讲解
- 新版上线时发现的数据库优化问题
- java 构造函数抛出异常,构造函数抛出异常;嵌套异常是java.lang.NoClassDefFoundError:javax/servlet/ServletContext...
- 近期低分纯生信友好的期刊简介
- 毕业论文选题三步搞定!
- mysql gtid 5.7_MySQL5.7之GTID复制
- 获取map第一个的key和value_谁要是再敢用Map传参,我过去就是一JIO
- Ubuntu | ubuntu 中配置静态 IP
- SQLite学习手册(实例代码一)
- 在html中引入css内部样式表使用,CSS样式学习笔记(三)html文件引入CSS的方法(2)...