服务异步通信

  • 目录
    • 1. 初识MQ
    • 2. RabbitMQ快速入门
    • 3. SpringAMQP
    • 最后

目录

1. 初识MQ

  1. 同步通讯
    直播、微服务间基于Feign的调用
    问题:耦合高、性能下降(调用耗时、等待响应(资源浪费)、吞吐量下降、服务阻塞、级联失败)
  2. 异步通讯
    微信聊天
    事件驱动模式

    优势一:服务解耦
    优势二:性能提升,吞吐量提高
    优势三:服务没有强依赖,解决级联失败问题
    优势四:流量消峰
    问题一:依赖于Broker的可靠性、安全性、吞吐能力
    问题二:架构复杂了,业务没有明显的流水线,不好追踪管理
  3. 异步通讯框架
    MQ,存放消息的队列,也就是事件驱动架构中的Broker。

2. RabbitMQ快速入门

  1. RabbitMQ概述与安装
    基于Erlang语言开发的开源消息通信中间件
    使用docker安装
1.1拉取
docker pull rabbitmq:3-management
1.2导入后加载镜像
docker load -i mq.tar
2 启动MQ
docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=admin \--name mq \--hostname mq1 \ -p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-managment

登录管理界面

  1. 常见消息模型
    publisher:消息发布者,将消息发送到队列queue
    queue:消息队列,负责接收并缓存消息
    consumer:订阅队列,处理队列中的消息
    subscribe:订阅
  • 基本消息队列 BasicQueue
    HelloWorld
  • 工作消息队列 WorkQueue
    可以提高消息处理速度,避免队列消息堆积

    消息预取机制:在处理消息之前,取出所有消息
    解决:每次只能获取一条消息,处理完成才能获取下一个消息
spring:rabbitmq:listener:simple:prefetch: 1
  • 发布订阅
    允许将同一消息发送给多个消费者。
    Exchange:只负责消息的转发
    广播 Fanout Exchange
    将收到的消息路由到每一个跟其绑定的queue

    publisher
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;......
String exchangeName = "name.fanout";
String message = "Hello World";
rabbitTemplate.convertAndSend(exchangeName, "", message);
......

consumer

@Configration
public class FanoutConfig {@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("name.fanout");}@Beanpublic Queue fanoutQueue1(){return new Queue("name.queue1");}@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2(){return new Queue("name.queue2");}@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

路由 Direct Exchange
将接收到的消息根据规则路由到指定的Queue。

每个Queue都与Exchange设置一个BindingKey
发布者发送消息时,指定消息的RoutingKey
Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
BindingKey可以相同

consumer

//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {@RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称"), exchange = @Exchange(name = "交换机名称1", type = ExchangeTypes.DIRECT), key = {"key1","key2"}))public void listenerDirectQueue1(String msg){// 业务逻辑}@RabbitListener(binding= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.DIRECT), key = {"key1","key2"}))public void listenerDirectQueue2(String msg){// 业务逻辑}
}

publisher

//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "key1";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......

主题 Topic Exchange
与DirectExchange类似,但Topic Exchange 的RoutingKey必须时多个单词列表,并且以 . 分割
Queue与Exchange指定BindingKey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词

consumer

//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {@RabbitListener(bindings= @QueueBinding(value = @Queue(name = "队列名称2"), exchange = @Exchange(name = "交换机名称", type = ExchangeTypes.TOPIC), key = "xxx.#"))public void listenerTopicQueue2(String msg){// 业务逻辑}
}

publisher

//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;......
String exchangeName = "name.direct";
String message = "Hello World";
String routingKey = "xxx.aaa";
rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
......
  1. 快速入门
# publisher
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 发送消息
String message = "Hello World";
channel.basicPubulsh("", queueName, null, message.getBytes);
// 关闭通道、连接
channel.close();
connetion.close();
# consumer
// 建立连接
ConnecationFactory factory = new ConnectionFactory();
// 设置连接参数 主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
// 建立连接
Connaction connetion = factory.newConnection();
// 创建通道Channel
Channel channel = connetion.createChannel();
// 创建队列Queues
String queueName = "one.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 订阅消息
String message = "Hello World";
channel.basicPubulsh(queueName, true, new DefaultConsumer()channel{@Override
pubilc void handleDelivery(String consumerTag, Envelope envelpoe, AMQP.BasicProperties properties, byte[] body) throe IOException {// 处理消息}
});
// 关闭通道、连接
channel.close();
connetion.close();

3. SpringAMQP

  • AMQP,应用程序之间传递消息的协议,与平台无关。

  • SpringAMQP,基于AMQP协议定义的一套API。spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

    监听器容器,用于异步处理入站消息
    用于发送和接收消息的RabbitTemplate
    RabbitAdmin用于自动声明队列,交换和绑定

注意:消息一旦消费就会从队列删除,RabbitMQ没有消息回溯功能

  1. 引入依赖
<!-- 父工程中引入spring-amqp的依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. publisher 发送消息
spring:rabbitmq:host: 127.0.0.1 # rabbitMQ ip地址port: 5672virtual-host: /username: adminpassword: admin
//在publisher服务中利用RabbitTemplate发送消息到队列
@Aoutwired
private RabbitTemplate rabbitTemplate;......
String queueName = "queueName";
String message = "Hello World";
rabbitTemplate.convertAndSend(queueName, message);
......
  1. consumer 监听消息
spring:rabbitmq:host: 127.0.0.1 # 主机名port: 5672virtual-host: /username: adminpassword: admin
//在consumer服务中编写消费者逻辑,绑定队列
@Component
public class SpringRabbitListener {@RabbitListener(queue = "队列名称")public void listenerSimpleQueue(String msg){// 业务逻辑}
}

消息转换器
RabbitTemplate ,将message对象序列化为字节

修改序列化方式

父工程

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>

publisher

@Configration
public class SpringRabbitListener {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

最后

以上是学习 黑马程序员《微服务技术全栈教程》的学习笔记

服务异步通信RabbitMQ相关推荐

  1. RabbitMQ 服务异步通信 -- 初识MQ(同步通信和异步通信、MQ、几种常见MQ的对比)、RabbitMQ安装和介绍

    文章目录 1. 初识MQ 1.1 同步通信和异步通信 1.1.1 同步通信存在的问题 1.1.2 同步调用小结 1.1.3 异步通讯 1.1.4 异步调用方案 1.1.5 异步调用小结 1.2 什么是 ...

  2. RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器

    文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...

  3. Spring Cloud Alibaba 消息队列:基于 RocketMQ 实现服务异步通信

    本讲咱们将学习以下三方面内容: 介绍消息队列与 Alibaba RocketMQ: 掌握 RocketMQ 的部署方式: 讲解微服务接入 RocketMQ 的开发技巧: 首先咱们先来认识什么是消息队列 ...

  4. SpringCloud 微服务(六)-服务异步通信

    看完了黑马程序员的免费课程,感觉受益匪浅,写个笔记,记录一下 课程地址:SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,史上最全面的springcloud微服务技术栈 ...

  5. 服务异步通信-高级篇

    消息队列在使用过程中,面临着很多实际问题需要思考: 消息可靠性 消息从发送,到消费者接受,会经历多个过程: 其中的每一步都可能导致消息丢失,常见的丢失消息的原因包括: 发送时丢失: 生产者发送的消息未 ...

  6. 【微服务】RabbitMQ部署高级篇

    RabbitMQ部署高级篇 1.单机部署 1.1.下载镜像 1.2.安装MQ 2.安装DelayExchange插件 2.1.下载插件 2.2.上传插件 2.3.安装插件 3.集群部署 2.1.集群分 ...

  7. (四)RabbitMQ消息队列-服务详细配置与日常监控管理

    (四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...

  8. 使用.NET Core 2.1,RabbitMQ,SignalR,EF Core 2.1和Angular 6开发微服务

    目录 介绍 单一软件 微服务架构 微服务设计与规划 示例应用程序 示例应用程序的微服务 微服务进程间通信 微服务与消息队列之间的消息传递 RabbitMQ消息代理 消息队列体系结构目标和决策 帐户管理 ...

  9. 微服务架构集成RabbitMQ给用户推送消息(发送短信,发送邮件,发送站内信息)

    因为是分布式微服务项目,所以发送方在一个微服务,接收方在另外的一个微服务,在发送方,导入RabbitMQ依赖包 <!--RabbitMQ依赖--><dependency>< ...

最新文章

  1. flink启动命令参数_Flink调优之前,必须先看懂的TaskManager内存模型
  2. 【CodeVS1080】线段树练习
  3. 异步复位同步释放_简谈同步复位和异步复位
  4. 数据库操作的隔离级别 Transaction Isolation Levels
  5. 您的光纤电缆和测试仪是否准备好用于400G以太网?
  6. 常用的redis命令
  7. 集设分享最全字体帮合集,愿这些作品能给你带来灵感和启迪。
  8. c/c++入门教程 - 1.基础c/c++ - 1.0 Visual Studio 2019安装环境搭建
  9. windows安装补丁慢 360安全卫士和腾讯电脑管家安装同样卡住 解决办法
  10. 房地产项目动态计划管理系统
  11. 2017年信息学奥赛NOIP普及组试题
  12. 微软中国2023校招【内推】全面开启!
  13. 计算机无法读取智能卡,电脑智能卡读卡器驱动程序丢失怎么办?如何重新安装智能卡服务?...
  14. AS 编写 Xposed 插件需要修改的地方
  15. SQL Server数据并发处理
  16. 计算机JAVA相关说课稿_面向对象程序设计-java说课稿
  17. 英语时态:一般、否定疑问句、现表将来
  18. 工程建设项目全套流程,门清!
  19. 请问下这个hive beeline命令是啥意思。
  20. k8s部署微服务组件eureka

热门文章

  1. 【python】用turtle画七巧板
  2. ThreadLocal学习笔记
  3. 降维专题(一):为什么要降维?
  4. 如何用html5实现网页聊天,HTML5 WebSocket实现点对点聊天的示例代码
  5. c语言变量大全,C语言变量
  6. 【vue大师晋级之路第二集:深入了解组件】第3章——自定义事件
  7. 计算机应用技术与物联网专业介绍,《物联网应用技术》专业简介
  8. java调用帆软cpt文件_报表中心FineReport中java如何直接调用报表打印
  9. HTTP/HTTP/HTTP
  10. electron 主进程与渲染进程通讯