2019独角兽企业重金招聘Python工程师标准>>>

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

AMQP,即Advanced message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

常用概念

通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

准备

环境安装

任选其一

CentOs7.3 搭建 RabbitMQ 3.6 单机服务与使用

http://www.ymq.io/2017/08/16/rabbit-install

CentOs7.3 搭建 RabbitMQ 3.6 Cluster 集群服务与使用

http://www.ymq.io/2017/08/17/rabbit-install-cluster

Github 代码

代码我已放到 Github ,导入spring-boot-rabbitmq 项目

github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq

添加依赖

在项目中添加 spring-boot-starter-amqp 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

参数配置

spring.application.name=ymq-rabbitmq-spring-bootspring.rabbitmq.host=10.4.98.15
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

交换机(Exchange)

1.Direct Exchange 根据route key 直接找到队列
2.Topic Exchange 根据route key 匹配队列
3.Topic Exchange 不处理route key 全网发送,所有绑定的队列都发送

Direct Exchange

Direct ExchangeRabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

任何发送到Direct Exchange的消息都会被转发到RouteKey中指定的Queue

1.一般情况可以使用rabbitMQ自带的Exchange:""(该Exchange的名字为空字符串,下文称其为default Exchange)。
2.这种模式下不需要将Exchange进行任何绑定(binding)操作
3.消息传递时需要一个RouteKey,可以简单的理解为要发送到的队列名字。
4.如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

配置队列

@Configuration
public class RabbitDirectConfig {@Beanpublic Queue helloQueue() {return new Queue("hello");}@Beanpublic Queue directQueue() {return new Queue("direct");}//-------------------配置默认的交换机模式,可以不需要配置以下-----------------------------------@BeanDirectExchange directExchange() {return new DirectExchange("directExchange");}//绑定一个key "direct",当消息匹配到就会放到这个队列中@BeanBinding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with("direct");}// 推荐使用 helloQueue() 方法写法,这种方式在 Direct Exchange 模式 多此一举,没必要这样写//---------------------------------------------------------------------------------------------
}

监听队列

@Component
@RabbitListener(queues = "hello")
public class helloReceiver {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 helloReceiver," + message);}
}
@Component
@RabbitListener(queues = "direct")
public class DirectReceiver {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 DirectReceiver," + message);}
}

发送消息

package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 默认的交换机模式** @author: yanpenglei* @create: 2017/10/25 1:03*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitDirectTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendHelloTest() {String context = "此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到";String routeKey = "hello";context = "routeKey:" + routeKey + ",context:" + context;System.out.println("sendHelloTest : " + context);this.rabbitTemplate.convertAndSend(routeKey, context);}@Testpublic void sendDirectTest() {String context = "此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到";String routeKey = "direct";String exchange = "directExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendDirectTest : " + context);// 推荐使用 sendHello() 方法写法,这种方式在 Direct Exchange 多此一举,没必要这样写this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}
}

按顺序执行:响应

接收者 helloReceiver,routeKey:hello,context:此消息在,默认的交换机模式队列下,有 helloReceiver 可以收到接收者 DirectReceiver,context:directExchange,routeKey:direct,context:此消息在,默认的交换机模式队列下,有 DirectReceiver 可以收到

Fanout Exchange

任何发送到Fanout Exchange 的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上

1.可以理解为路由表的模式
2.这种模式不需要 RouteKey
3.这种模式需要提前将ExchangeQueue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
4.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

配置队列

@Configuration
public class RabbitFanoutConfig {final static String PENGLEI = "fanout.penglei.net";final static String SOUYUNKU = "fanout.souyunku.com";@Beanpublic Queue queuePenglei() {return new Queue(RabbitFanoutConfig.PENGLEI);}@Beanpublic Queue queueSouyunku() {return new Queue(RabbitFanoutConfig.SOUYUNKU);}/*** 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有队列上。*/@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeQueuePenglei(Queue queuePenglei, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queuePenglei).to(fanoutExchange);}@BeanBinding bindingExchangeQueueSouyunku(Queue queueSouyunku, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueSouyunku).to(fanoutExchange);}}

监听队列

@Component
@RabbitListener(queues = "fanout.penglei.net")
public class FanoutReceiver1 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 FanoutReceiver1," + message);}
}
@Component
@RabbitListener(queues = "fanout.souyunku.com")
public class FanoutReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 FanoutReceiver2," + message);}
}

发送消息

package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 广播模式或者订阅模式队列** @author: yanpenglei* @create: 2017/10/25 1:08*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitFanoutTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendPengleiTest() {String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";String routeKey = "topic.penglei.net";String exchange = "fanoutExchange";System.out.println("sendPengleiTest : " + context);context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendSouyunkuTest() {String context = "此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到";String routeKey = "topic.souyunku.com";String exchange = "fanoutExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendSouyunkuTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}
}

按顺序执行:响应

接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到
接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.penglei.net,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到接收者 FanoutReceiver2,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到
接收者 FanoutReceiver1,context:fanoutExchange,routeKey:topic.souyunku.com,context:此消息在,广播模式或者订阅模式队列下,有 FanoutReceiver1 FanoutReceiver2 可以收到

Topic Exchange

任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue

1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个标题``(RouteKey)Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2.这种模式需要RouteKey,也许要提前绑定ExchangeQueue
3.在进行绑定时,要提供一个该队列关心的主题,如#.log.#表示该队列关心所有涉及log的消息(一个RouteKey为MQ.log.error的消息会被转发到该队列)。
4.#表示0个或若干个关键字,*表示一个关键字。如topic.*能与topic.warn匹配,无法与topic.warn.timeout匹配;但是topic.#能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

配置队列

@Configuration
public class RabbitTopicConfig {final static String MESSAGE = "topic.message";final static String MESSAGES = "topic.message.s";final static String YMQ = "topic.ymq";@Beanpublic Queue queueMessage() {return new Queue(RabbitTopicConfig.MESSAGE);}@Beanpublic Queue queueMessages() {return new Queue(RabbitTopicConfig.MESSAGES);}@Beanpublic Queue queueYmq() {return new Queue(RabbitTopicConfig.YMQ);}/*** 交换机(Exchange) 描述:接收消息并且转发到绑定的队列,交换机不存储消息*/@BeanTopicExchange topicExchange() {return new TopicExchange("topicExchange");}//綁定队列 queueMessages() 到 topicExchange 交换机,路由键只接受完全匹配 topic.message 的队列接受者可以收到消息@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange topicExchange) {return BindingBuilder.bind(queueMessage).to(topicExchange).with("topic.message");}//綁定队列 queueMessages() 到 topicExchange 交换机,路由键只要是以 topic.message 开头的队列接受者可以收到消息@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange topicExchange) {return BindingBuilder.bind(queueMessages).to(topicExchange).with("topic.message.#");}//綁定队列 queueYmq() 到 topicExchange 交换机,路由键只要是以 topic 开头的队列接受者可以收到消息@BeanBinding bindingExchangeYmq(Queue queueYmq, TopicExchange topicExchange) {return BindingBuilder.bind(queueYmq).to(topicExchange).with("topic.#");}}

监听队列

@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver1," + message);}}
@Component
@RabbitListener(queues = "topic.message.s")
public class TopicReceiver2 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver2," + message);}}
@Component
@RabbitListener(queues = "topic.ymq")
public class TopicReceiver3 {@RabbitHandlerpublic void process(String message) {System.out.println("接收者 TopicReceiver3," + message);}}

发送消息

package io.ymq.rabbitmq.test;import io.ymq.rabbitmq.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;/*** 描述: 配置转发消息模式队列** @author: yanpenglei* @create: 2017/10/25 1:20*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class RabbitTopicTest {@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void sendMessageTest() {String context = "此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到";String routeKey = "topic.message";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendMessageTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendMessagesTest() {String context = "此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到";String routeKey = "topic.message.s";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendMessagesTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}@Testpublic void sendYmqTest() {String context = "此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到";String routeKey = "topic.ymq";String exchange = "topicExchange";context = "context:" + exchange + ",routeKey:" + routeKey + ",context:" + context;System.out.println("sendYmqTest : " + context);this.rabbitTemplate.convertAndSend(exchange, routeKey, context);}
}

按顺序执行:响应

接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver1,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message,context:此消息在,配置转发消息模式队列下, 有 TopicReceiver1 TopicReceiver2 TopicReceiver3 可以收到接收者 TopicReceiver3,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到
接收者 TopicReceiver2,context:topicExchange,routeKey:topic.message.s,context:此消息在,配置转发消息模式队列下,有  TopicReceiver2 TopicReceiver3 可以收到接收者 TopicReceiver3,context:topicExchange,routeKey:topic.ymq,context:此消息在,配置转发消息模式队列下,有 TopicReceiver3 可以收到

代码我已放到 Github ,导入spring-boot-rabbitmq 项目

github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-rabbitmq

Contact

  • 作者:鹏磊
  • 出处:http://www.ymq.io/2017/10/26/rabbitmq-spring-boot-example
  • Email:admin@souyunku.com
  • 版权归作者所有,转载请注明出处
  • Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享

转载于:https://my.oschina.net/yanpenglei/blog/1608327

Spring Boot 中使用 RabbitMQ相关推荐

  1. Spring Boot中使用RabbitMQ

    很久没有写Spring Boot的内容了,正好最近在写Spring Cloud Bus的内容,因为内容会有一些相关性,所以先补一篇关于AMQP的整合. Message Broker与AMQP简介 Me ...

  2. 新手入门教程-------Spring Boot中集成RabbitMQ

    AMQP:是Advanced Message Queuing Protocol的简称,高级消息队列协议,是一个面向消息中间件的开放式标准应用层协议. 定义了以下特性: 消息方向 消息队列 消息路由(包 ...

  3. RabbitMQ(六)——Spring boot中消费消息的两种方式

    前言 上一篇博客中,我们只是简单总结了Spring boot中整合RabbitMQ的操作,针对消息消费的两种方式只是简单给了一个实例,这篇博客,我们进一步总结关于Spring boot消息消费的相关功 ...

  4. rabbitmq使用_Spring Boot中使用RabbitMQ

    Message Broker与AMQP简介 Message Broker是一种消息验证.传输.路由的架构模式,其设计目标主要应用于下面这些场景: 消息路由到一个或多个目的地 消息转化为其他的表现方式 ...

  5. 再谈Spring Boot中的乱码和编码问题

    编码算不上一个大问题,即使你什么都不管,也有很大的可能你不会遇到任何问题,因为大部分框架都有默认的编码配置,有很多是UTF-8,那么遇到中文乱码的机会很低,所以很多人也忽视了. Spring系列产品大 ...

  6. 【spring boot2】第8篇:spring boot 中的 servlet 容器及如何使用war包部署

    嵌入式 servlet 容器 在 spring boot 之前的web开发,我们都是把我们的应用部署到 Tomcat 等servelt容器,这些容器一般都会在我们的应用服务器上安装好环境,但是 spr ...

  7. Spring Boot 中使用 MongoDB 增删改查

    本文快速入门,MongoDB 结合SpringBoot starter-data-mongodb 进行增删改查 1.什么是MongoDB ? MongoDB 是由C++语言编写的,是一个基于分布式文件 ...

  8. Spring Boot 中使用@Async实现异步调用,加速任务执行!

    欢迎关注方志朋的博客,回复"666"获面试宝典 什么是"异步调用"?"异步调用"对应的是"同步调用",同步调用指程序按照 ...

  9. 徒手解密 Spring Boot 中的 Starter自动化配置黑魔法

    我们使用 Spring Boot,基本上都是沉醉在它 Stater 的方便之中.Starter 为我们带来了众多的自动化配置,有了这些自动化配置,我们可以不费吹灰之力就能搭建一个生产级开发环境,有的小 ...

最新文章

  1. LeetCode实战:排序链表
  2. java appendchild_详解javascript appendChild()的完整功能
  3. u盘文件看得见却打不开_win7下u盘文件打不开怎么办 win7下u盘文件打不开解决方法...
  4. asp.net添加删除表格_你问我答|135编辑器使用之超链接和表格问题
  5. 通过OData创建C4C Lead时,遇到Account missing的错误消息
  6. [TJOI2013]拯救小矮人(反悔贪心证明),「ICPC World Finals 2019」Hobson 的火车(基环树,差分)
  7. 蛋糕是叫胚子还是坯子_教你做巧克力淋面蛋糕,掌握这个配比,好看又好吃,10分钟做一个...
  8. SharePoint 2013 列表启用搜索
  9. 解决python-kafka连接kafka时报错kafka.errors.NoBrokersAvailable: NoBrokersAvailable
  10. 于谦加盟高德地图 推出“哪儿都熟”相声导航
  11. HDU 2814 斐波那契循环节 欧拉降幂
  12. 哈工大SCIR Lab | EMNLP 2019 常识信息增强的事件表示学习
  13. 东华大学计算机学院刘国华,计算机科学与技术学院2016级迎新大会顺利举行
  14. linux新建虚拟机到图形化界面
  15. Android游戏开发LoneBall小游戏
  16. 75道逻辑思维趣题,含答案
  17. Gos —— 显示器控制
  18. 数据结构 斐波那契查找法(C语言)
  19. Echarts 实现动态地图
  20. 汇编语言学习之基本指令(上)

热门文章

  1. WDK开发环境构建驱动程序入门、Windows驱动程序的Check Build和Free Build
  2. Python pip工具初步学习
  3. 认识VC++类向导的使用
  4. VC++文件编程操作实例
  5. 地图瓦片相关学习总结
  6. Linux 内核安全模块学习总结
  7. MacOS常用快捷键
  8. kubernetes实战篇之通过api-server访问dashboard
  9. php--理解PHP的依赖注入和laravel的服务容器
  10. angular5 httpclient的示例实战