RabbitMq的六种模式分析详解
AMQP
即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
它可以使对应的客户端(client)与对应的消息中间件(broker)进行交互。消息中间件从发布者(publisher)那里收到消息(发布消息的应用,也称为producer),然后将他们转发给消费者(consumers,处理消息的应用)。由于AMQP是一个网络协议,所以发布者、消费者以及消息中间件可以部署到不同的物理机器上面。
SpringBoot整合RabbitMq
应用依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency>
application.yml
server:port: 8090
spring:rabbitmq:addresses: 192.168.100.10:5672,192.168.100.11:5672username: xiaobaopassword: xiaobao
简单配置简单使用
生产者
@Controller
public class ProducerDemo {@Autowiredprivate AmqpTemplate rabbitTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {String context = "hello==========" + new Date(); //往"queueName"队列里面发送消息(先在mq的控制台创建一个queueName队列)this.rabbitTemplate.convertAndSend("queueName", context);return "success";}
}
消费者
@Component
@RabbitListener(queues = "queueName") //监听queueName这个队列
public class ConsumerDemo {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver ===================: " + hello);}
}
springboot配置是不是很简单,整个整合都非常简单。
用代码创建一个队列
用代码创建一个队列,而不是在mq管理控制台手动创建。so easy.
@Configuration
public class RabbitMQConfig {@Beanpublic Queue queue() {return new Queue("queueName");}
}
spring在启动时会扫描到Configuration这个注解是一个配置文件的注解。在发送者发送消息时,发现没有这个队列,才会创建这个队列。
高级玩法
时候我们有多个mq,自己组的mq队列啊,交换机啊,虚拟内存之类的,但有时候会用到其他组的mq,他们的配置信息和我们的完全不一样,这样时候直接使用spring自带的集成模式就难于满足我们。但是spring没那么傻,它提供了配置文件,你可以通过注解配置来实现多个mq不同的mq来做设置。
@Configuration
public class RabbitConfig {@Value("${rabbitmq.queue.group}")private String groupQueueName;@Value("${rabbitmq.exchange}")private String exchangeName;@Value("${rabbitmq.addresses}")private String address;@Value("${rabbitmq.port}")private Integer port;@Value("${rabbitmq.username}")private String userName;@Value("${rabbitmq.pw}")private String password;@Value("${rabbitmq.virtual}")private String virtualHost;//创建连接工厂@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setPort(port);connectionFactory.setAddresses(address);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setConnectionTimeout(1000);return connectionFactory;}//创建连接工厂的一个ampg管理@Beanpublic AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {return new RabbitAdmin(connectionFactory);}@BeanQueue queue() {return new Queue(groupQueueName, true);}//创建一个topic交换机,使用的是amqpAdmin来管理。@BeanTopicExchange exchange(AmqpAdmin amqpAdmin) {TopicExchange exchange = new TopicExchange(exchangeName);exchange.setAdminsThatShouldDeclare(amqpAdmin);return exchange;}//创建一个模版,绑定的是connectionFactory这个工厂。@BeanRabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {return new RabbitTemplate(connectionFactory);}//创建第二个连接工厂@Beanpublic ConnectionFactory tempConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setPort(port);connectionFactory.setAddresses(address);connectionFactory.setUsername(userName);connectionFactory.setPassword(password);connectionFactory.setVirtualHost("temp");connectionFactory.setConnectionTimeout(1000);return connectionFactory;}//第二个管理@Beanpublic AmqpAdmin tempAdmin(ConnectionFactory tempConnectionFactory) {return new RabbitAdmin(tempConnectionFactory);}//创建一个交换机,关联到tempAdmin这个上面@BeanTopicExchange tempExchange(AmqpAdmin tempAdmin) {TopicExchange exchange = new TopicExchange("topic.A");exchange.setAdminsThatShouldDeclare(tempAdmin);return exchange;}//创建第二个template@BeanRabbitTemplate tempRabbitTemplate(ConnectionFactory tempConnectionFactory) {return new RabbitTemplate(tempConnectionFactory);}//设置一个简单监听工厂。@Beanpublic SimpleRabbitListenerContainerFactory tempListenerContainerFactory(ConnectionFactory tempConnectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(tempConnectionFactory);return factory;}}
第二种模式下面的监听者
@RabbitListener(containerFactory = "tempListenerContainerFactory", bindings = {@QueueBinding(value = @Queue(value = "A.queue"),exchange = @Exchange(value = "topic.A", type = ExchangeTypes.TOPIC),key = "user.message.up")}) public void process(Message message) {}}
注意:VirtualHost这个参数。虚拟host,我们创建的所有队列,交换机之类的东西都是在虚拟host下面进行的,在不同的虚拟host下面,他们之间互不通信,我们可以创建2个一摸一样的队列,只需要在不同的虚拟host下面。虚拟host下面就相当于物理隔绝差不多。
上面这个例子在项目中,我们组的队列放到了一个虚拟host下面,其他组的队列放到放到了另外一个虚拟host中,导致了在开发过程中互不通信,所以需要这么配置。
RabbitMq的六种模式分析详解
P:生产者,X:交换机,Q:队列,C:消费者
Hello Word
controller@Controller public class ProducerDemo {@Autowiredprivate AmqpTemplate rabbitTemplate;@RequestMapping("/send")@ResponseBodypublic String send() {String context = "hello==========" + new Date();log.info("Sender : " + context);//生产者,正在往hello这个路由规则中发送,由于没有交换机,所以路由规则就是队列名称this.rabbitTemplate.convertAndSend("hello", context);return "success";}}
消费者
@Component
//监听hello这个队列
@RabbitListener(queues = "hello")
public class ConsumerDemo {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver ===================: " + hello);}
}
- 工作模式
一个队列有两个消费者
在上面的基础中,添加一个消费者就OK了。
消费者:
@Component
@RabbitListener(queues = "hello")//监听hello这个队列
public class ConsumerDemo1{@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver ===================: " + hello);}
}
//两个消费者
@Component
@RabbitListener(queues = "hello")//监听hello这个队列
public class ConsumerDemo2{@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver ===================: " + hello);}
}
当两个消费者同时监听一个队列时,他们并不能同时消费一条消息,而是随机消费消息。1,2,3,4,5,6消息来了,consumer1消费了1,3,5;consumer2消费了2,4,6。这个数据是随机的哦,别理解为奇偶数。可以自己测试一下。
一个队列中一条消息,只能被一个消费者消费
- 订阅与发布模式
生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,来消费消息。
1.定义一个订阅模式的交换机:FanoutExchange交换机。
2.创建2个队列helloA,helloB,然后将这两个队列绑定到交换机上面。
@Configuration
public class RabbitMQConfig {@Beanpublic Queue queueA() {return new Queue("helloA", true);}@Beanpublic Queue queueB() {return new Queue("helloB", true);}//创建一个fanoutExchange交换机@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}//将queueA队列绑定到fanoutExchange交换机上面@BeanBinding bindingExchangeMessageFanoutA(Queue queueA, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueA).to(fanoutExchange);}//将queueB队列绑定到fanoutExchange交换机上面@BeanBinding bindingExchangeMessageFanoutB(Queue queueB, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueB).to(fanoutExchange);}}
注意一个细节哦。bindingExchangeMessageFanoutA这种参数重的queueA与创建队列的方法queueA()名字要相同哦。这样才知道queueA绑定了该交换机哦。交换机的名称也同样。fanoutExchange参数的名字和fanoutExchange()名字要一样哦。
生产者:this.rabbitTemplate.convertAndSend(“fanoutExchange”,”“, context);
@Component
@RabbitListener(queues = "queueA")//监听queueA这个队列
public class ConsumerDemo1{@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver ===================: " + hello);}
}//两个消费者
@Component
@RabbitListener(queues = "queueB")//监听queueB这个队列
public class ConsumerDemo2{@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver ===================: " + hello);}
}
现在生产者发送了一条消息,会发现consumer1,2都会收到。之前不是说过一个队列里面的一条消息,只能被一个消费者消费吗?怎么现在一条消息被两个消费者消费了。要知道这里对于生产者来说是只生产了一条消息,但是它发送给了交换机,交换机会根据绑定的队列来发送。现在绑定了queueA,queueB队列,所以两个队列里面都有消息了。而消费者关注的也是两个队列,就看到了一条消息被两个消费者消费的情况了。
路由模式
@Configurationpublic class RabbitMQConfig {public static final String DIRECT_EXCHANGE = "directExchange";public static final String QUEUE_DIRECT_A = "direct.A";public static final String QUEUE_DIRECT_B = "direct.B";//创建一个direct交换机@BeanDirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE);}@BeanQueue queueDirectNameA() {return new Queue(QUEUE_DIRECT_A);}//创建队列@BeanQueue queueDirectNameB() {return new Queue(QUEUE_DIRECT_B);}//将direct.A队列绑定到directExchange交换机中,使用direct.a.key作为路由规则@BeanBinding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("direct.a.key");}@BeanBinding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("direct.b.key");}}
消费者一样,只需要监听队列就OK了。
@Component
public class ConsumerDemo {@RabbitListener(queues = "topic.A")@RabbitHandlerpublic void processtopicA(String hello) {System.out.println("Receiver Exchanges topic.A ===================: " + hello);}@RabbitListener(queues = "topic.B")@RabbitHandlerpublic void processtopicB(String hello) {System.out.println("Receiver Exchanges topic.B ===================: " + hello);}
}
//往directExchange交换机中发送消息,使用direct.a.key作为路由规则。
生产者:rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, “direct.a.key”, context);
topic.A,topic.B两个队列都绑定了交换机directExchange,但他们的路由规则不同,a队列用了direct.a.key,b队列用了direct.b.key,这种情况下,生产者使用direct.a.key作为路由规则,就只有a队列能收到消息,b队列则收不到消息。
//如果a,b队列都关联了direct.a.key路由规则,则上面的生产者发送消息时,a,b队列都能收到消息,这样就有类似fanout交换机的功能了。
@BeanBinding bindingExchangeMessageDirectA(Queue queueDirectNameA, DirectExchange directExchange) {return BindingBuilder.bind(queueDirectNameA).to(directExchange).with("direct.a.key");}@BeanBinding bindingExchangeMessageDirectB(Queue queueDirectNameB, DirectExchange directExchange) {return BindingBuilder.bind(queueDirectNameB).to(directExchange).with("direct.a.key");}
topic主题模式
public static final String TOPIC_EXCHANGE = "topicExchange";public static final String QUEUE_TOPIC_KEY = "topic.#";public static final String QUEUE_TOPIC_KEY_B = "topic.b.key";public static final String QUEUE_TOPIC_A = "topic.A";public static final String QUEUE_TOPIC_B = "topic.B";//创建一个topic交换机@BeanTopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);}@BeanQueue queueTopicNameA() {return new Queue(QUEUE_TOPIC_A);}@BeanQueue queueTopicNameB() {return new Queue(QUEUE_TOPIC_B);}
//队列topic.A绑定交换机并且关联了topic.#正则路由规则。就是说只要topic.开头的,topic.A队列都将收到消息
@BeanBinding bindingExchangeMessageTopicA(Queue queueTopicNameA, TopicExchange topicExchange) {return BindingBuilder.bind(queueTopicNameA).to(topicExchange).with(QUEUE_TOPIC_KEY);}
//队列topic.B绑定交换机并且关联了topic.b.key正则路由规则。就是说必须是topic.b.key,topic.B队列才能收到消息,和directExchange类型一样了。
@BeanBinding bindingExchangeMessageTopicB(Queue queueTopicNameB, TopicExchange topicExchange) {return BindingBuilder.bind(queueTopicNameB).to(topicExchange).with(QUEUE_TOPIC_KEY_B);}
生产者
@RequestMapping("/topic/send")@ResponseBodypublic String sendTopicExchange() {String context = "Exchange==topic-->b====== " + new Date();log.info("Sender : " + context);this.rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, "topic.b.key", context);return "success";}
这里发送消息时,往topicExchange这个交换机中发送,并且路由规则为topic.b.key。由于b队列绑定了交换机和路由规则就是它,所以队列b能收到消息。
但是由于A队列的过滤规则为topic.#,就是说只要topic开头的就的路由规则,交换机就会往这个队列里面发送消息。所以a队列也能收到消息,topic.b.key是topic开头的。
对于a队列来说,路由规则为topic.adsf,topic.b.key,topic.a等等,a队列都将收到消息,因为它的路由规则就是topic开头就可以。
消费者:监听队列就OK了,其他不用关心。
- Rpc模式
小小总结
订阅模式,路由模式,主题模式,他们三种都非常类似。而且主题模式可以随时变成两外两种模式。
在主题模式下:路由规则不为正则表达式的时候,他就和路由模式一样。当路由规则不为表达式,且路由规则一样时,就变成了订阅模式。是不是很厉害的样子。
在路由模式下:路由规则一样时,就变成了订阅模式。
简单总结六种模式
简单模式:生产者,一个消费者,一个队列 工作模式:生产者,多个消费者,一个队列
订阅与发布模式(fanout):生产者,一个交换机(fanoutExchange),没有路由规则,多个消费者,多个队列
路由模式(direct):生产者,一个交换机(directExchange),路由规则,多个消费者,多个队列
主题模式(topic):生产者,一个交换机(topicExchange),模糊匹配路由规则,多个消费者,多个队列
RPC模式:生产者,多个消费者,路由规则,多个队列 总结 一个队列,一条消息只会被一个消费者消费(有多个消费者的情况也是一样的)。
订阅模式,路由模式,主题模式,他们的相同点就是都使用了交换机,只不过在发送消息给队列时,添加了不同的路由规则。订阅模式没有路由规则,路由模式为完全匹配规则,主题模式有正则表达式,完全匹配规则。
在订阅模式中可以看到一条消息被多个消费者消费了,不违背第一条总结,因为一条消息被发送到了多个队列中去了。
在交换机模式下:队列和路由规则有很大关系 在有交换机的模式下:3,4,5模式下,生产者只用关心交换机与路由规则即可,无需关心队列
消费者不管在什么模式下:永远不用关心交换机和路由规则,消费者永远只关心队列,消费者直接和队列交互
中文版文档【简略】
英文版文档【详细】
RabbitMq的六种模式分析详解相关推荐
- RabbitMQ 相关概念和方法详解
名词解释 ConnectionFactory: 与 RabbitMQ 服务器连接的管理器. Connection: 与 RabbitMQ 服务器的连接. Channel: 与 Exchange 的连接 ...
- 谷粒商城RabbitMQ锁库存逻辑详解--新理解(长文警告)
前言 不废话,上来就说,代码我会放挺多,写过这个项目的自然能懂,如果真的像理解的请认真看哦 分析 /*出现的问题:扣减库存成功了,但是由于网络原因超时,出现异常,导致订单事务回滚,库存事务不回滚(解决 ...
- UML类图-六种关系详解
UML基本介绍 UML--Unified modeling language UML(统一建模语言),是一种用于软件系统分析和设计的语言工具,它用于帮助软件开发人员进行思考和记录思路的结果 UML本身 ...
- 点击每个兄弟节点获取对应节点下标的六种方案详解
点击每个兄弟节点获取对应节点下标的五种方案详解 一.前言 二.示例 三.解决方案 四.总结 一.前言 在 DOM 节点中,或者在循环引用中,如何点击每个兄弟节点获取对应节点下标,比如 ul 下有 3个 ...
- Java中多线程的六种状态详解
在我们的Thread类中,我们可以看到多线程有六种状态. NEW:初始状态 RUNNABLE:运行状态 BLOCKED:阻塞状态 WAITING:等待状态 TIMED_WAITING:超时等待状态 T ...
- RabbitMQ集群故障恢复详解
RabbitMQ的mirror queue(镜像队列)机制是最简单的队列HA(High Available高可用)方案, 它通过在cluster的基础上增加ha-mode.ha-param等polic ...
- RabbitMQ的三大交换器详解
pom文件都是相同的 <?xml version="1.0" encoding="UTF-8"?> <project xmlns=" ...
- rabbitMQ消息队列 consume详解
实现消费者的方法就是 $channel->basic_consume("TestQueue", "", false, false, false, fals ...
- 六轴机器人直角坐标系建立_工业机器人六种坐标系详解(图)
召唤大国工匠 助力强国建设 坐标系是为确定机器人的位置和姿态而在机器人或其他空间上设定的位姿指标系统. 大地坐标系(World Coordinate System) 基坐标系(Base Coor ...
最新文章
- 正多边形中心到各边的向量合
- linux NFS配置:NFS相关概念及其配置与查看
- LeetCode1262 可被三整除的最大和(动态规划)
- [crypto]-01-对称加解密AES原理概念详解
- C++之多重继承引发的重复调用
- webapi 找到了与请求匹配的多个操作(ajax报500,4的错误)
- C语言字符串的输入和输出
- CSS 盒子模型 第三节
- GICv3软件overview手册之配置GIC
- python 评论分析_python分析评论内容是积极的还是消极的(应用朴素做分词处理及情感识别)...
- php中关于qq第三方登录
- 【小5聊】sql server基础之查询经纬度范围,10公里范围的经纬度标注点
- Ubuntu18中安装Nvidia驱动和CUDA和cuDNN库加速
- Python绘制地理图--Cartopy基础
- Excel中的LEN和LENB,VBA中的Len和LenB
- Java 字母飘落小游戏
- 数字签名标准(DSS)
- 计算机键盘fn,USB键盘Fn功能键调节方法
- 初学graphiql 查询操作
- python基础-PyYaml操作yaml文件
热门文章
- ENVI遥感数字图像处理方法(一)
- LeetCode--分发饼干(贪心)
- zigbee3.0@学习笔记@TI STACK@串口发送
- 数学计算机小论文范文,小学数学小论文范文
- 关于SetCapture() 和 ReleaseCapture()的使用方法
- 教你用笔记本破解无线路由器密码
- 菊风云 | 在线音乐教育市场前路漫漫,看音视频技术如何为其“保驾护航”!
- ThinkPad笔记本切换F功能键方法
- 第十二届蓝桥杯省赛Python--暴力破解
- Python中print用法里面的%s,%d,%f,%2s,%7s,%-7s,%.2s,%.7s