RibbatMQ(详解)
RibbatMQ
文章目录
- RibbatMQ
- 同步通讯和异步通讯:
- 同步通讯:
- 异步通讯:
- 技术对比:
- RibbatMQ入门:
- 安装RibbatMQ:
- RibbatMQ的基本结构:
- RabbitMQ消息模型:
- 简单队列模式的模型图:
- SpringAMQP:
- AMQP:
- SpringAMQP提供了三个功能:
- Basic Queue 简单队列模型:
- 消息发送:
- 消息接收:
- WorkQueue工作队列模型:
- 能者多劳:
- 消息发送:
- 消息接收:
- 发布和订阅:
- Fanout:广播
- 声明队列和交换机:
- 消息发送:
- 消息接收:
- Direct:路由
- 声明队列和交换机:
- 消息发送:
- 消息接收:
- Topic:话题
- 消息发送:
- 消息接收:
- 消息转换器:
- 配置JSON转换器:
同步通讯和异步通讯:
同步通讯:
- 同步通讯需要实时回复
- 优点:
- 时效性强,可以立即得到结果
- 缺点:
- 耦合性高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
异步通讯:
- 事件发布者(publisher)发布事件,事件订阅者(Consumer)接收消息,无需及时回复。
- 为了解决事假发布者和事件订阅者之间的耦合,两方不是直接通信,而是引入一个中间人(Broker),发布者将消息发布到broker,不需要等待回复,订阅者从broker订阅事件,不需要知道是谁发的
- 优点:
- 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
- 缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能
技术对比:
MQ,中文就是消息队列(MessageQueue),字面意思就是存放消息的队列,也就是事件驱动架构中的broker
比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比:
RabbitMQ ActiveMQ RocketMQ Kafka 公司/社区 Rabbit Apache 阿里 Apache 开发语言 Erlang Java Java Scala&Java 协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议 可用性 高 一般 高 高 单机吞吐量 一般 差 高 非常高 消息延迟 微秒级 毫秒级 毫秒级 毫秒以内 消息可靠性 高 一般 高 一般 追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
RibbatMQ入门:
安装RibbatMQ:
单机部署
下载镜像:
docker pull rabbitmq:3.8-management
安装MQ:
docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management
RibbatMQ的基本结构:
- RibbatMQ中的一些角色:
- publisher:生产者
- consumer:消费者
- exchange个:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
RabbitMQ消息模型:
- RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型
简单队列模式的模型图:
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
实现publisher
建立连接
//建立连接ConnectionFactory factory = new ConnectionFactory();//设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");Connection connection = factory.newConnection();
创建Channel
//创建通道ChannelChannel channel = connection.createChannel();
声明队列
//创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);
发送消息
//发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");
关闭连接和channel
//关闭通道和连接channel.close();connection.close();
实现consumer
建立连接
// 建立连接ConnectionFactory factory = new ConnectionFactory();//设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");Connection connection = factory.newConnection();
创建Channel
//创建通道ChannelChannel channel = connection.createChannel();
声明队列
//创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);
订阅消息
//订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {//处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");
总结:
基本消息队列的消息发送流程:
建立connection
创建channel
利用channel声明队列
利用channel向队列发送消息
基本消息队列的消息接收流程:
建立connection
创建channel
利用channel声明队列
定义consumer的消费行为handleDelivery()
利用channel将消费者与队列绑定
SpringAMQP:
仅仅一个简单队列模式,写着就如此复杂,所以我们可以使用SpringAMOP来进行
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
AMQP:
- Advanced Message Queuing Protocol ,是用于在应用程序之间传递业务消息的开放标准,该协议与语言无关,更符合微服务中独立性的要求
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
Basic Queue 简单队列模型:
- 引入依赖:
<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息发送:
- 在配置文件中添加配置:
spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: root # 用户名password: 123456 # 密码
- 利用RabbitTemplate发送消息
@Autowired
private RabbitTemplate rabbitTemplate;@Test
public void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);
}
消息接收:
- 在配置文件中配置:
spring:rabbitmq:host: 192.168.150.101 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: root # 用户名password: 123456 # 密码
- 新建一个类SpringRabbitListener
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("spring 消费者接收到消息:【" + msg + "】");}
}
WorkQueue工作队列模型:
- 简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。
- 同一条消息只会被一个消费者处理,阅后即焚。
- 当发送消息的速度远远大于消费速度,长此以往会有大量的消息无法解决,此时就可以使用工作队列
能者多劳:
- 如果没有配置如下代码,两个消费者就会预取,均分两个消息,如果消费者1能力不行,一秒只能处理5条,消费者2一秒可以处理50条,就会出现消费者2早早处理完,在哪儿等着消息来,消费者1还有大把的消息没有处理,浪费资源,时间长了可能会出现栈溢出问题。
spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
消息发送:
- 没有新的API,和简单队列模型一样
消息接收:
- 没有新的API,只是多创建了两个消费者来绑定同一个队列。
发布和订阅:
可以看到,在订阅模型中,多了一个exchange,过程也有略微变化。
- Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
- Consumer:消费者,与以前一样,订阅队列,没有变化
- Queue:消息队列也与以前一样,接收消息、缓存消息。
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExchange的会将消息路由到每个绑定的队列
声明队列、交换机、绑定关系的Bean是什么?
- Queue
- FanoutExchange
- Binding
Fanout:广播
在广播模式下,消息发送流程是这样的:
- 可以有多个队列
- 每个队列都要绑定到Exchange(交换机)
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
在广播模式下,交换机会将消息转发给所有绑定的队列
声明队列和交换机:
在消费者创建一个config类,用来声明交换机和队列
@Configuration public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);} }
消息发送:
// 队列名称String exchangeName = "itcast.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
消息接收:
@RabbitListener(queues = "fanout.queue1") public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到Fanout消息:【" + msg + "】"); } @RabbitListener(queues = "fanout.queue2") public void listenFanoutQueue2(String msg) {System.out.println("消费者2接收到Fanout消息:【" + msg + "】"); }
Direct:路由
- Fanout交换机将消息路由给每一个与之绑定的队列
- Direct交换机根据RoutingKey判断路由给哪个队列
- 如果多个队列具有相同的RoutingKey,则与Fanout功能类似
声明队列和交换机:
- 用配置类来声明略显麻烦,spring还提供了注解方式来声明,在消费者中的SpringRabbitListener中添加消费者时,同事用注解来声明队列和交换机
//消费者1
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
//消费者2
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
消息发送:
- 与简单队列模型发送消息差不多,代码如下:
// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
//在发送时,需要指定一个key
消息接收:
- 在声明队列和交换机的时候就写好了
Topic:话题
话题的key可以由多个单词组成,只有一个key,用.分割
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
消息发送:
// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "喜报!孙悟空大战哥斯拉,胜!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
消息接收:
//消费者1 @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#" )) public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】"); } //消费者2 @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news" )) public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】"); }
消息转换器:
Spring会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
配置JSON转换器:
在publisher和consumer两个服务中都引入依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version> </dependency>
在启动类中添加一个Bean即可:
@Bean public MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter(); }
RibbatMQ(详解)相关推荐
- 从命令行到IDE,版本管理工具Git详解(远程仓库创建+命令行讲解+IDEA集成使用)
首先,Git已经并不只是GitHub,而是所有基于Git的平台,只要在你的电脑上面下载了Git,你就可以通过Git去管理"基于Git的平台"上的代码,常用的平台有GitHub.Gi ...
- JVM年轻代,老年代,永久代详解
秉承不重复造轮子的原则,查看印象笔记分享连接↓↓↓↓ 传送门:JVM年轻代,老年代,永久代详解 速读摘要 最近被问到了这个问题,解释的不是很清晰,有一些概念略微模糊,在此进行整理和记录,分享给大家.在 ...
- docker常用命令详解
docker常用命令详解 本文只记录docker命令在大部分情境下的使用,如果想了解每一个选项的细节,请参考官方文档,这里只作为自己以后的备忘记录下来. 根据自己的理解,总的来说分为以下几种: Doc ...
- 通俗易懂word2vec详解词嵌入-深度学习
https://blog.csdn.net/just_so_so_fnc/article/details/103304995 skip-gram 原理没看完 https://blog.csdn.net ...
- 深度学习优化函数详解(5)-- Nesterov accelerated gradient (NAG) 优化算法
深度学习优化函数详解系列目录 深度学习优化函数详解(0)– 线性回归问题 深度学习优化函数详解(1)– Gradient Descent 梯度下降法 深度学习优化函数详解(2)– SGD 随机梯度下降 ...
- CUDA之nvidia-smi命令详解---gpu
nvidia-smi是用来查看GPU使用情况的.我常用这个命令判断哪几块GPU空闲,但是最近的GPU使用状态让我很困惑,于是把nvidia-smi命令显示的GPU使用表中各个内容的具体含义解释一下. ...
- Bert代码详解(一)重点详细
这是bert的pytorch版本(与tensorflow一样的,这个更简单些,这个看懂了,tf也能看懂),地址:https://github.com/huggingface/pytorch-pretr ...
- CRF(条件随机场)与Viterbi(维特比)算法原理详解
摘自:https://mp.weixin.qq.com/s/GXbFxlExDtjtQe-OPwfokA https://www.cnblogs.com/zhibei/p/9391014.html C ...
- pytorch nn.LSTM()参数详解
输入数据格式: input(seq_len, batch, input_size) h0(num_layers * num_directions, batch, hidden_size) c0(num ...
最新文章
- 为什么我的DevOps落地过程跟别人不一样?
- 面试官:说说什么是Java内存模型?
- 《是男人就下100层》真的有隐藏剧情!B站up主数月破解,原作者点赞致谢
- Python基础——PyCharm版本——第三章、数据类型和变量(超详细)
- FineReport——获取控件值和单元格值
- 信息学奥赛一本通(1020:打印ASCII码)
- Spark 2.1.0集成CarbonData 1.1.0
- mysql for rhel7_MySQL5.7.18 for Linux7.2(二进制安装)
- android viewdraghelper 点击移动,ViewDragHelper使用时遇到的问题
- 【iCore4 双核心板_ARM】例程十四:FATFS实验——文件操作
- 解析AMD品牌的中国元素
- 增程式串联混合动力实际项目模型,本模型基于Cruise软件和Simulink软件共同搭建完成,并实际应用,本资料包包含所有源文件
- macOS Monterey 12.0 Beta版 With Clover 5136 and OC 0.7.0 and PE 三EFI分区原版黑苹果镜像
- 《码出高效:Java 开发手册》“码” 出高效的同时编写出高质量的代“码”。PDF文档资料免费开放下载!
- ZIGBEE通过协议栈点对点通信流程
- 电影文件的合并与分割
- 软件工程中国大学慕课mooc北京大学 答案
- 3D世界 ORGE SceneManager GetStart
- 听说 TCC 不支持 OpenFeign?这个坑松哥必须给大家填了
- C++ 中的隐含 *this
热门文章
- sleep()和wait()的区别
- EmbeddedServletContainerException: Unable to start embedded Tomcat 内嵌Tomcat启动失败
- ie打开后不是主页(转载)
- Python联动Excel入门教程(3--数据处理)
- Linux笔记2_Linux图形界面简介
- 看楼房 | C++ | 栈
- Python | 用相关系数进行Kmeans聚类,利用利润率、打折率、销售额、毛利润得到商品价格弹性标签,建立价格折扣力度模型
- dumpbin的使用方法_DUMPBIN命令使用详解
- dumpbin的使用方法_[Windows]_[中级]_[使用命令行工具dumpbin分析文件]
- 【Debian11】win10+VMware16安装linux虚拟机踩过的坑