Spring Boot 中使用 RabbitMQ
2019独角兽企业重金招聘Python工程师标准>>>
RabbitMQ
是一个开源的AMQP
实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP
等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
AMQP
,即Advanced message Queuing
Protocol
,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP
的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP
等,支持AJAX
。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
常用概念
通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ
在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange
). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
准备
环境安装
任选其一
CentOs7.3 搭建 RabbitMQ 3.6 单机服务与使用
http://www.ymq.io/2017/08/16/rabbit-install
CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务与使用
http://www.ymq.io/2017/08/17/rabbit-install-cluster
Github 代码
代码我已放到 Github ,导入spring-boot-rabbitmq
项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq
添加依赖
在项目中添加 spring-boot-starter-amqp
依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
参数配置
spring.application.name=ymq-rabbitmq-spring-bootspring.rabbitmq.host=10.4.98.15
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
交换机(Exchange)
1.Direct Exchange 根据route key 直接找到队列
2.Topic Exchange 根据route key 匹配队列
3.Topic Exchange 不处理route key 全网发送,所有绑定的队列都发送
Direct Exchange
Direct Exchange
是RabbitMQ
默认的交换机模式,也是最简单的模式,根据key
全文匹配去寻找队列。
任何发送到Direct Exchange
的消息都会被转发到RouteKey
中指定的Queue
。
1.一般情况可以使用rabbitMQ
自带的Exchange:""
(该Exchange
的名字为空字符串,下文称其为default Exchange
)。
2.这种模式下不需要将Exchange
进行任何绑定(binding
)操作
3.消息传递时需要一个RouteKey
,可以简单的理解为要发送到的队列名字。
4.如果vhost
中不存在RouteKey
中指定的队列名,则该消息会被抛弃。
配置队列
@Configuration
public class RabbitDirectConfig {@Beanpublic Queue helloQueue() {return new Queue("hello");}@Beanpublic Queue directQueue() {return new Queue("direct");}//-------------------配置默认的交换机模式,可以不需要配置以下-----------------------------------@BeanDirectExchange directExchange() {return new DirectExchange("directExchange");}//绑定一个key "direct",当消息匹配到就会放到这个队列中@BeanBinding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with("direct");}// 推荐使用 helloQueue() 方法写法,这种方式在 Direct Exchange 模式 多此一举,没必要这样写//---------------------------------------------------------------------------------------------
}
监听队列
@Component
@RabbitListener(queues = "hello")
public class helloReceiver {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 helloReceiver," + message);}
}
@Component
@RabbitListener(queues = "direct")
public class DirectReceiver {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 DirectReceiver," + message);}
}
发送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 默认的交换机模式** @author: yanpenglei* @create: 2017/10/25 1:03*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitDirectTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendHelloTest() {String context = "此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到";String routeKey = "hello";context = "routeKey:" + routeKey + ",context:" + context;System.out.println("sendHelloTest : " + context);this.rabbitTemplate.convertAndSend(routeKey, context);}@Testpublic void sendDirectTest() {String context = "此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到";String routeKey = "direct";String exchange = "directExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendDirectTest : " + context);// 推荐使用 sendHello() 方法写法,这种方式在 Direct Exchange 多此一举,没必要这样写this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}
}
按顺序执行:响应
接收者 helloReceiver,routeKey:hello,context:此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到接收者 DirectReceiver,context:directExchange,routeKey:direct,context:此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到
Fanout Exchange
任何发送到Fanout Exchange
的消息都会被转发到与该Exchange
绑定(Binding)
的所有Queue上
。
1.可以理解为路由表的模式
2.这种模式不需要 RouteKey
3.这种模式需要提前将Exchange
与Queue
进行绑定,一个Exchange
可以绑定多个Queue
,一个Queue
可以同多个Exchange
进行绑定。
4.如果接受到消息的Exchange
没有与任何Queue
绑定,则消息会被抛弃。
配置队列
@Configuration
public class RabbitFanoutConfig {final static String PENGLEI = "fanout.penglei.net";final static String SOUYUNKU = "fanout.souyunku.com";@Beanpublic Queue queuePenglei() {return new Queue(RabbitFanoutConfig.PENGLEI);}@Beanpublic Queue queueSouyunku() {return new Queue(RabbitFanoutConfig.SOUYUNKU);}/*** 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上。*/@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queuePenglei).to(fanoutExchange);}@BeanBinding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);}}
监听队列
@Component
@RabbitListener(queues = "fanout.penglei.net")
public class FanoutReceiver1 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 FanoutReceiver1," + message);}
}
@Component
@RabbitListener(queues = "fanout.souyunku.com")
public class FanoutReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 FanoutReceiver2," + message);}
}
发送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 广播模式或者订阅模式队列** @author: yanpenglei* @create: 2017/10/25 1:08*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitFanoutTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendPengleiTest() {String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";String routeKey = "topic.penglei.net";String exchange = "fanoutExchange";System.out.println("sendPengleiTest : " + context);context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendSouyunkuTest() {String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";String routeKey = "topic.souyunku.com";String exchange = "fanoutExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendSouyunkuTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}
}
按顺序执行:响应
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到
接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到
Topic Exchange
任何发送到Topic Exchange
的消息都会被转发到所有关心RouteKey
中指定话题的Queue
上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个标题``(RouteKey)
,Exchange
会将消息转发到所有关注主题能与RouteKey
模糊匹配的队列。
2.这种模式需要RouteKey
,也许要提前绑定Exchange
与Queue
。
3.在进行绑定时,要提供一个该队列关心的主题,如#.log.#
表示该队列关心所有涉及log的消息(一个RouteKey为MQ.log.error
的消息会被转发到该队列)。
4.#
表示0个或若干个关键字,*
表示一个关键字。如topic.*
能与topic.warn
匹配,无法与topic.warn.timeout
匹配;但是topic.#
能与上述两者匹配。
5.同样,如果Exchange
没有发现能够与RouteKey
匹配的Queue
,则会抛弃此消息。
配置队列
@Configuration
public class RabbitTopicConfig {final static String MESSAGE = "topic.message";final static String MESSAGES = "topic.message.s";final static String YMQ = "topic.ymq";@Beanpublic Queue queueMessage() {return new Queue(RabbitTopicConfig.MESSAGE);}@Beanpublic Queue queueMessages() {return new Queue(RabbitTopicConfig.MESSAGES);}@Beanpublic Queue queueYmq() {return new Queue(RabbitTopicConfig.YMQ);}/*** 交换机(Exchange) 描述:接收消息并且转发到绑定的队列,交换机不存储消息*/@BeanTopicExchange topicExchange() {return new TopicExchange("topicExchange");}//綁定队列 queueMessages() 到 topicExchange 交换机,路由键只接受完全匹配 topic.message 的队列接受者可以收到消息@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");}//綁定队列 queueMessages() 到 topicExchange 交换机,路由键只要是以 topic.message 开头的队列接受者可以收到消息@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#");}//綁定队列 queueYmq() 到 topicExchange 交换机,路由键只要是以 topic 开头的队列接受者可以收到消息@BeanBinding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) {return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#");}}
监听队列
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver1," + message);}}
@Component
@RabbitListener(queues = "topic.message.s")
public class TopicReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver2," + message);}}
@Component
@RabbitListener(queues = "topic.ymq")
public class TopicReceiver3 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver3," + message);}}
发送消息
package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 配置转发消息模式队列** @author: yanpenglei* @create: 2017/10/25 1:20*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitTopicTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendMessageTest() {String context = "此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到";String routeKey = "topic.message";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendMessageTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendMessagesTest() {String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 可以收到";String routeKey = "topic.message.s";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendMessagesTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendYmqTest() {String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到";String routeKey = "topic.ymq";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendYmqTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}
}
按顺序执行:响应
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver1,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有 TopicReceiver2 TopicReceiver3 可以收到接收者 TopicReceiver3,context:topicExchange,routeKey:topic.ymq,context:此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到
代码我已放到 Github ,导入spring-boot-rabbitmq
项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq
Contact
- 作者:鹏磊
- 出处:http://www.ymq.io/2017/10/26/rabbitmq-spring-boot-example
- Email:admin@souyunku.com
- 版权归作者所有,转载请注明出处
- Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享
转载于:https://my.oschina.net/yanpenglei/blog/1608327
Spring Boot 中使用 RabbitMQ相关推荐
- Spring Boot中使用RabbitMQ
很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Me ...
- 新手入门教程-------Spring Boot中集成RabbitMQ
AMQP:是Advanced Message Queuing Protocol的简称,高级消息队列协议,是一个面向消息中间件的开放式标准应用层协议. 定义了以下特性: 消息方向 消息队列 消息路由(包 ...
- RabbitMQ(六)——Spring boot中消费消息的两种方式
前言 上一篇博客中,我们只是简单总结了Spring boot中整合RabbitMQ的操作,针对消息消费的两种方式只是简单给了一个实例,这篇博客,我们进一步总结关于Spring boot消息消费的相关功 ...
- rabbitmq使用_Spring Boot中使用RabbitMQ
Message Broker与AMQP简介 Message Broker是一种消息验证.传输.路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 ...
- 再谈Spring Boot中的乱码和编码问题
编码算不上一个大问题,即使你什么都不管,也有很大的可能你不会遇到任何问题,因为大部分框架都有默认的编码配置,有很多是UTF-8,那么遇到中文乱码的机会很低,所以很多人也忽视了. Spring系列产品大 ...
- 【spring boot2】第8篇:spring boot 中的 servlet 容器及如何使用war包部署
嵌入式 servlet 容器 在 spring boot 之前的web开发,我们都是把我们的应用部署到 Tomcat 等servelt容器,这些容器一般都会在我们的应用服务器上安装好环境,但是 spr ...
- Spring Boot 中使用 MongoDB 增删改查
本文快速入门,MongoDB 结合SpringBoot starter-data-mongodb 进行增删改查 1.什么是MongoDB ? MongoDB 是由C++语言编写的,是一个基于分布式文件 ...
- Spring Boot 中使用@Async实现异步调用,加速任务执行!
欢迎关注方志朋的博客,回复"666"获面试宝典 什么是"异步调用"?"异步调用"对应的是"同步调用",同步调用指程序按照 ...
- 徒手解密 Spring Boot 中的 Starter自动化配置黑魔法
我们使用 Spring Boot,基本上都是沉醉在它 Stater 的方便之中.Starter 为我们带来了众多的自动化配置,有了这些自动化配置,我们可以不费吹灰之力就能搭建一个生产级开发环境,有的小 ...
最新文章
- LeetCode实战:排序链表
- java appendchild_详解javascript appendChild()的完整功能
- u盘文件看得见却打不开_win7下u盘文件打不开怎么办 win7下u盘文件打不开解决方法...
- asp.net添加删除表格_你问我答|135编辑器使用之超链接和表格问题
- 通过OData创建C4C Lead时,遇到Account missing的错误消息
- [TJOI2013]拯救小矮人(反悔贪心证明),「ICPC World Finals 2019」Hobson 的火车(基环树,差分)
- 蛋糕是叫胚子还是坯子_教你做巧克力淋面蛋糕,掌握这个配比,好看又好吃,10分钟做一个...
- SharePoint 2013 列表启用搜索
- 解决python-kafka连接kafka时报错kafka.errors.NoBrokersAvailable: NoBrokersAvailable
- 于谦加盟高德地图 推出“哪儿都熟”相声导航
- HDU 2814 斐波那契循环节 欧拉降幂
- 哈工大SCIR Lab | EMNLP 2019 常识信息增强的事件表示学习
- 东华大学计算机学院刘国华,计算机科学与技术学院2016级迎新大会顺利举行
- linux新建虚拟机到图形化界面
- Android游戏开发LoneBall小游戏
- 75道逻辑思维趣题,含答案
- Gos —— 显示器控制
- 数据结构 斐波那契查找法(C语言)
- Echarts 实现动态地图
- 汇编语言学习之基本指令(上)