服务异步通信RabbitMQ
服务异步通信
- 目录
- 1. 初识MQ
- 2. RabbitMQ快速入门
- 3. SpringAMQP
- 最后
目录
1. 初识MQ
- 同步通讯
直播、微服务间基于Feign的调用
问题:耦合高、性能下降(调用耗时、等待响应(资源浪费)、吞吐量下降、服务阻塞、级联失败) - 异步通讯
微信聊天
事件驱动模式
优势一:服务解耦
优势二:性能提升,吞吐量提高
优势三:服务没有强依赖,解决级联失败问题
优势四:流量消峰
问题一:依赖于Broker的可靠性、安全性、吞吐能力
问题二:架构复杂了,业务没有明显的流水线,不好追踪管理 - 异步通讯框架
MQ,存放消息的队列,也就是事件驱动架构中的Broker。
2. RabbitMQ快速入门
- RabbitMQ概述与安装
基于Erlang语言开发的开源消息通信中间件
使用docker安装
1.1拉取
docker pull rabbitmq:3-management
1.2导入后加载镜像
docker load -i mq.tar
2 启动MQ
docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin \--name mq \--hostname mq1 \ -p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-managment
登录管理界面
- 常见消息模型
publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接收并缓存消息
consumer:订阅队列,处理队列中的消息
subscribe:订阅
- 基本消息队列 BasicQueue
HelloWorld
- 工作消息队列 WorkQueue
可以提高消息处理速度,避免队列消息堆积
消息预取机制:在处理消息之前,取出所有消息
解决:每次只能获取一条消息,处理完成才能获取下一个消息
spring:rabbitmq:listener:simple:prefetch: 1
- 发布订阅
允许将同一消息发送给多个消费者。
Exchange:只负责消息的转发
广播 Fanout Exchange
将收到的消息路由到每一个跟其绑定的queue
publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;......
String exchangeName = "name.fanout";
String message = "Hello World";
rabbitTemplate.convertAndSend(exchangeName, "", message);
......
consumer
@Configration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("name.fanout");}@Beanpublic Queue fanoutQueue1(){return new Queue("name.queue1");}@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2(){return new Queue("name.queue2");}@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
路由 Direct Exchange
将接收到的消息根据规则路由到指定的Queue。
每个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
BindingKey可以相同
consumer
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {@RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称"), exchange = @Exchange(name = "交换机名称1", type = ExchangeTypes.DIRECT), key = {"key1","key2"}))public void listenerDirectQueue1(String msg){// 业务逻辑}@RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.DIRECT), key = {"key1","key2"}))public void listenerDirectQueue2(String msg){// 业务逻辑}
}
publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "key1";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......
主题 Topic Exchange
与DirectExchange类似,但Topic Exchange 的RoutingKey必须时多个单词列表,并且以 . 分割
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
consumer
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {@RabbitListener(bindings= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.TOPIC), key = "xxx.#"))public void listenerTopicQueue2(String msg){// 业务逻辑}
}
publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "xxx.aaa";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......
- 快速入门
# publisher
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
String message = "Hello World";
channel.basicPubulsh("", queueName, null, message.getBytes);
// 关闭通道、连接
channel.close();
connetion.close();
# consumer
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅消息
String message = "Hello World";
channel.basicPubulsh(queueName, true, new DefaultConsumer()channel{@Override
pubilc void handleDelivery(String consumerTag, Envelope envelpoe, AMQP.BasicProperties properties, byte[] body) throe IOException {// 处理消息}
});
// 关闭通道、连接
channel.close();
connetion.close();
3. SpringAMQP
AMQP,应用程序之间传递消息的协议,与平台无关。
SpringAMQP,基于AMQP协议定义的一套API。spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
监听器容器,用于异步处理入站消息
用于发送和接收消息的RabbitTemplate
RabbitAdmin用于自动声明队列,交换和绑定
注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能
- 引入依赖
<!-- 父工程中引入spring-amqp的依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- publisher 发送消息
spring:rabbitmq:host: 127.0.0.1 # rabbitMQ ip地址port: 5672virtual-host: /username: adminpassword: admin
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;......
String queueName = "queueName";
String message = "Hello World";
rabbitTemplate.convertAndSend(queueName, message);
......
- consumer 监听消息
spring:rabbitmq:host: 127.0.0.1 # 主机名port: 5672virtual-host: /username: adminpassword: admin
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {@RabbitListener(queue = "队列名称")public void listenerSimpleQueue(String msg){// 业务逻辑}
}
消息转换器
RabbitTemplate ,将message对象序列化为字节
修改序列化方式
父工程
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
publisher
@Configration
public class SpringRabbitListener {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
最后
以上是学习 黑马程序员《微服务技术全栈教程》的学习笔记
服务异步通信RabbitMQ相关推荐
- RabbitMQ 服务异步通信 -- 初识MQ(同步通信和异步通信、MQ、几种常见MQ的对比)、RabbitMQ安装和介绍
文章目录 1. 初识MQ 1.1 同步通信和异步通信 1.1.1 同步通信存在的问题 1.1.2 同步调用小结 1.1.3 异步通讯 1.1.4 异步调用方案 1.1.5 异步调用小结 1.2 什么是 ...
- RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器
文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...
- Spring Cloud Alibaba 消息队列:基于 RocketMQ 实现服务异步通信
本讲咱们将学习以下三方面内容: 介绍消息队列与 Alibaba RocketMQ: 掌握 RocketMQ 的部署方式: 讲解微服务接入 RocketMQ 的开发技巧: 首先咱们先来认识什么是消息队列 ...
- SpringCloud 微服务(六)-服务异步通信
看完了黑马程序员的免费课程,感觉受益匪浅,写个笔记,记录一下 课程地址:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,史上最全面的springcloud微服务技术栈 ...
- 服务异步通信-高级篇
消息队列在使用过程中,面临着很多实际问题需要思考: 消息可靠性 消息从发送,到消费者接受,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失消息的原因包括: 发送时丢失: 生产者发送的消息未 ...
- 【微服务】RabbitMQ部署高级篇
RabbitMQ部署高级篇 1.单机部署 1.1.下载镜像 1.2.安装MQ 2.安装DelayExchange插件 2.1.下载插件 2.2.上传插件 2.3.安装插件 3.集群部署 2.1.集群分 ...
- (四)RabbitMQ消息队列-服务详细配置与日常监控管理
(四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...
- 使用.NET Core 2.1,RabbitMQ,SignalR,EF Core 2.1和Angular 6开发微服务
目录 介绍 单一软件 微服务架构 微服务设计与规划 示例应用程序 示例应用程序的微服务 微服务进程间通信 微服务与消息队列之间的消息传递 RabbitMQ消息代理 消息队列体系结构目标和决策 帐户管理 ...
- 微服务架构集成RabbitMQ给用户推送消息(发送短信,发送邮件,发送站内信息)
因为是分布式微服务项目,所以发送方在一个微服务,接收方在另外的一个微服务,在发送方,导入RabbitMQ依赖包 <!--RabbitMQ依赖--><dependency>< ...
最新文章
- flink启动命令参数_Flink调优之前,必须先看懂的TaskManager内存模型
- 【CodeVS1080】线段树练习
- 异步复位同步释放_简谈同步复位和异步复位
- 数据库操作的隔离级别 Transaction Isolation Levels
- 您的光纤电缆和测试仪是否准备好用于400G以太网?
- 常用的redis命令
- 集设分享最全字体帮合集,愿这些作品能给你带来灵感和启迪。
- c/c++入门教程 - 1.基础c/c++ - 1.0 Visual Studio 2019安装环境搭建
- windows安装补丁慢 360安全卫士和腾讯电脑管家安装同样卡住 解决办法
- 房地产项目动态计划管理系统
- 2017年信息学奥赛NOIP普及组试题
- 微软中国2023校招【内推】全面开启!
- 计算机无法读取智能卡,电脑智能卡读卡器驱动程序丢失怎么办?如何重新安装智能卡服务?...
- AS 编写 Xposed 插件需要修改的地方
- SQL Server数据并发处理
- 计算机JAVA相关说课稿_面向对象程序设计-java说课稿
- 英语时态:一般、否定疑问句、现表将来
- 工程建设项目全套流程,门清!
- 请问下这个hive beeline命令是啥意思。
- k8s部署微服务组件eureka