SpringCloud(十一)Bus消息总线、Stream消息驱动
一、Bus消息总线
需求:分布式自动刷新配置功能;
解决:SpringCloud Bus配合Spring cloud Config使用可以实现配置的动态刷新。
1、概述
定义:Spring Cloud Bus是用来将分布式系统的节点与轻量级消息系统链接起来的框架,它整合了Java的事件处理机制和消息中间件的功能。
Spring Clud Bus目前支持RabbitMQ和Kafka。
通俗定义:bus称之为springcloud中消息总线,主要用来在微服务系统中实现远端配置更新时通过广播形式通过所有客户端刷新配置信息,避免手动重启服务的工作。
能干什么?
Spring Cloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器, 可用于广播状态更改、事件推送等,也可以当作微服务间的通信通道。
为何被称为总线?
在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题, 并让该系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该 主题上的实例都知道的消息。
基本原理
ConfigClient实例都监听MQ中同一个topic(默认是springCloudBus)。当一个服务刷新数据的时候,它会把这个信息放入到Topic中,这样其它监听同一Topic的服务就能得到通知,然后去更新自身的配置。
2、RabbitMQ环境配置
- 安装Erlang,下载地址:opt_win64_21.3.exe
- 安装RabbitMQ,下载地址:rabbitmq-server 3.7.14
- 进入RabbitMQ安装目录下的sbin目录
- 输入以下命令启动管理功能
- 在sbin目录下运行命令行窗口cmd
- 输入命令
rabbitmq-plugins enable rabbitmq management
,这样就可以添加可视化插件了。 - 这样就添加了rabbitmq界面,只需要启动rabbitmq即可
- 访问地址查询是否安装成功:
在浏览器上输入 http://127.0.0.1:15672
- 输入账号密码并登录:guest guest
3、SpringCloud Bus动态刷新全局广播
必须先具备良好的RabbitMQ环境
为了实现广播效果,我们新建3366微服务(同3355模块)
- cloud-config-client-3366
- POM
- YML
- 启动类
- controller
设计思想:
1)利用消息总线触发一个客 户端/bus/refresh,而刷新所有客户端的配置
2)利用消息总线触发一个服务端ConfigServer的/bus/refresh端点,而刷新所有客户端的配置
图二的架构显然更加适合,图一不适合的原因如下:
- 打破了微服务的职责单一-性,因为微服务本身是业务模块,它本不应该承担配置刷新的职责。
- 破坏了微服务各节点的对等性。
- 有一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,那就会增加更多的修改
具体实现步骤:
- 给cloud-config-center-3344配置中心服务端添加消息总线支持
- POM
<!--添加消息总线RbbitMQ支持-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
- YML:注意写的位置要对齐
#rabbit相关配置rabbitmq:host: localhostport: 5672username: guestpassword: guest#rabbitmq相关配置,暴露bus刷新配置的端点
management:endpoints: #暴露bus刷新配置的端点web:exposure:include: 'bus-refresh' #凡是暴露监控、刷新的都要有actuator依赖,bus-refresh就是actuator的刷新操作
- 给cloud-config-client-3355客户端添加消息总线支持
- POM
<!--添加消息总线RbbitMQ支持-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
- YML
#rabbit相关配置 15672是web管理界面的端口,5672是MQ访问的端口rabbitmq:host: localhostport: 5672username: guestpassword: guest
- 给cloud-config-client-3366客户端添加消息总线支持
- 操作同上
- 测试如下
- 修改GitHub上配置文件新增版本号
- 只需要3344发一次,其他处处生效:
curl -X POST "http://localhost:3344/actuator/bus-refresh"
- 查询配置中心:http://config-3344:3344/config-dev.yml
- 查看客户端:http://localhost:3355/configInfo;http://localhost:3366/configInfo
此时获取配置信息,发现后已经刷新了
一次修改,广播通知,处处生效
4、SpringCloud Bus动态刷新定点通知
需求:不想全部通知,只想定点通知。只通知3355,不通知3366
简单来说:
- 指定具体某一个实例生效而不是全部
- 公式:
http://localhost:配置中心的端口号/actuator/bus-refresh/{destination}
- /bus/refresh请求不再发送到具体的服务实例上,而是发给config server并通过destination参数类指定需要更新配置的服务或实例
具体实现:
curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"
总结:
二、Stream消息驱动
1、消息驱动概述
定义:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
1. 什么是SpringCloud Stream?
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
应用程序通过inputs或者outputs与Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka
官网文档
SpringCloud Steam中文指导手册
2. 设计思想
标准MQ(没有引出Springcloud):
- 生产者/消费者之间靠消息媒介传递信息内容:Message
- 消息必须走特定的通道:消息通道MessageChannel
- 消息通道里的消息如何被消费呢,谁负责收发处理:消息通道MessageChannel的子接口SubscribleChannel,由MessageHandle消息处理器锁订阅
为什么要用Cloud Stream?
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
Stream凭什么可以统一底层差异?
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。
通过定义绑定器Binder
作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Binder
- INPUT对应于消费者
- OUTPUT对应于生产者
通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。
Stream中的消息通信方式遵循了发布-订阅模式:Topic主题进行广播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic
3. Springcloud Stream标准流程套路
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Mueue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
4. 编码API和常用注解
2、案例说明
RabbitMQ环境是通的;
工程中创建三个子模块;
1)cloud-stream-rabbitmq-provider8801:作为生产者进行发消息模块
2)cloud-stream-rabbitmq-consumer8802:作为消息接收模块
3)cloud-stream-rabbitmq-consumer8803:作为消息接收模块
3、消息驱动之生产者
新建cloud-stream-rabbitmq-provider8801
POM
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
- YML
server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitMQ的服务信息defaultRabbit: # 表示定义的名称,用于binding的整合type: rabbit # 消息中间件类型environment: # 设置rabbitMQ的相关环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理ouput: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain#binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client:service-url:defaultZone: http://eureka7001.com:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90instance-id: send-8801.com # 主机名prefer-ip-address: true # 显示ip
- 主启动类
@SpringBootApplication
public class StreamMQMain8801 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8801.class, args);}
}
- 业务类
发送接口实现类
//这不是传统的service,这是和rabbitmq打交道的,不需要加注解@Service
//这里不调用dao,调用消息中间件的service
//信道channel和exchange绑定在一起
@EnableBinding(Source.class) // 定义消息推动管道
public class MessageProviderImpl implements MessageProvider {// 消息发送管道@Resourceprivate MessageChannel output;@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("serial = " + serial);return null;}
}
Controller
@RestController
public class SendMessageController {@Resourceprivate MessageProvider messageProvider;@GetMapping("/sendMessage")public String sendMessage(){return messageProvider.send();}
}
- 测试
1)启动7001
2)rabbitMQ:http://localhost:15672
3)启动8801
4)访问:http://localhost:8801/sendMessage
4、消息驱动之消费者
- 新建cloud-stream-rabbitmq-consumer8802
- POM
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
</dependencies>
- YML
server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitMQ的服务信息defaultRabbit: # 表示定义的名称,用于binding的整合type: rabbit # 消息中间件类型environment: # 设置rabbitMQ的相关环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain#binder: defaultRabbit # 设置要绑定的消息服务的具体设置#group: atguiguA
eureka:client:service-url:defaultZone: http://eureka7001.com:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90instance-id: receive-8802.com #主机名prefer-ip-address: true # 显示ip
- 主启动类
@SpringBootApplication
public class StreamMQMain8802 {public static void main(String[] args) {SpringApplication.run(StreamMQMain8802.class, args);}
}
- 业务类
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message){System.out.println("消费者1号,------->接收到的消息: "+message.getPayload()+"\t port: "+serverPort);}
}
- 测试8801发送8802接收消息
访问:http://localhost:8801/sendMessage
5、分组消费与持久化
- 依照8802,clon出来一个运行8803:cloud-stream-rabbitmq-consumer8803
- 启动
RabbitMQ,7001(服务注册),8801(消息生产),8802(消息消费),8803(消息消费)
- 运行后有两个问题
- 重复消费问题
- 消息持久化问题
- 消费
目前8802/8803 同时都接收到了,存在重复消费问题
如何解决:分组和持久化
生产中实际案例:
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。
这时我们就可以使用Stream中的消息分组来解决。
注意:在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。
- 分组
原理:
微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。
不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
1)8802/8803都变成不同组,group两个不同
修改8002YML
group: atguiguA
修改8003YML
group: atguiguB
结论:还是重复消费。
所以:8802/8803实现了轮询分组,每次只有一个消费者;8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。
2)8802/8803都变成相同组,group两个相同
修改8002YML
group: atguiguA
修改8003YML
group: atguiguA
结论:同一个组的多个微服务案例,每次只会有一个能拿到。
- 持久化
- 通过上述,解决了重复消费问题,再看看持久化
- 停止8802/8803并去除掉8802的分组group: atguiguA,8803的分组group:atguiguA没有去掉。
- 8801先发送4条消息到rabbitmq
- 先启动8802,无分组属性配置,后台没有打出来消息。
- 再启动8803,有分组属性配置,后台打出来了MQ上的消息。
如果有收获!!! 希望老铁们来个三连,点赞、收藏、转发。
创作不易,别忘点个赞,可以让更多的人看到这篇文章,顺便鼓励我写出更好的博客
SpringCloud(十一)Bus消息总线、Stream消息驱动相关推荐
- SpringCloud微服务架构,Config 分布式配置中心,Bus 消息总线, Stream 消息驱动,Sleuth+Zipkin 链路追踪
Config分布式配置中心 Config 概述 概述 • Spring Cloud Config 解决了在分布式场景下多环境配置文件的管理和维护. • 好处: • 集中管理配置文件 • 不同环境不同配 ...
- Spring Cloud(十一)高可用的分布式配置中心 Spring Cloud Bus 消息总线集成(RabbitMQ)
上一篇文章,留了一个悬念,Config Client 实现配置的实时更新,我们可以使用 /refresh 接口触发,如果所有客户端的配置的更改,都需要手动触发客户端 /refresh ,当服务越来越多 ...
- SpringCloud学习笔记 - 消息总线 - Spring Cloud Bus
1. 消息总线简介 消息代理中间件构建了一个共用的消息主题让所有微服务实例订阅,当该消息主题产生消息时会被所有微服务实例监听和消费. 消息代理又是什么?消息代理是一个消息验证.传输.路由的架构模式,主 ...
- Spring Cloud学习:07消息总线(Spring Cloud Bus)
2019独角兽企业重金招聘Python工程师标准>>> 1 消息总线介绍 消息总线是一种通信工具,可以在机器之间互相传输消息.文件等.消息总线扮演着一种消息路由的角色,拥有一套完备的 ...
- SpringCloud07_消息总线(Bus)
1.消息总线Bus (1)基本概念与原理 总线: 在为服务架构中使用轻量级的消息代理来构建一个公用的消息主题,并让系统中所有的微服务实例都连接,该主题中产生的消息会被所有的实例监听和消费,所以称为消息 ...
- springcloud微服务架构开发实战:分布式消息总线
消息总线的定义 前面在1.4.2节中强调过,在微服务架构中,经常会使用REST 服务或基于消息的通信机制. 在3.6节中也详细介绍了消息通信的实现方式.消息总线就是一种基于消息的通信机制. 消息总线是 ...
- (转发)一个通用的C++ 消息总线框架
注:转自https://www.cnblogs.com/qicosmos/archive/2013/04/28/3048919.html 应用开发过程中经常会处理对象间通信的问题,一般都是对象或接口的 ...
- Linux进程内消息总线设计
文章目录 Windows平台进程内消息总线 如果没有消息总线,会产生什么问题 死循环包含关系 高耦合.低内聚 消息总线 结构图 原理 生产者与总线的关系 总线与消费者的关系 Linux进程内消息总线设 ...
- 消息总线扩展之集成Thrift-RPC
本文主要探讨了消息总线支持Thrift RPC的实现过程.鉴于RabbitMQ官方的Java Client提供了基于RabbitMQ的JSON-RPC,消息总线也顺道提供了JSON-RPC的API.然 ...
最新文章
- 简明Python3教程 10.模块
- [扫盲] Salesforce.com: 业界云计算(Cloud Computing)的主要倡导者之一
- 最快捷的阅读实训新闻
- 人人都应该知道的身体小窍门!
- 有赞统一日志平台初探
- Django(part34)--一对多映射
- [css] 怎么让body高度自适应屏幕?为什么?
- 华为申请注册鸿蒙商标,华为申请“鸿蒙商标”,企业注册商标有什么价值?
- 加密解密_作业-加密解密程序
- C++ rand()函数和srand()函数
- 一维数组去重处理法一(C语言)
- 开源项目面试重要吗_开源是最重要项目的骨干
- 读《世界是数字的》笔记
- 连接MySql出现Client does not support authentication protocol requested by server错误
- 【SQL基础】SQL增删改查基本语句
- 我的世界java能开光追吗_《我的世界》开光追是怎样一种体验
- 大厂都有哪些●快速上手●项目管理秘籍?
- 代码详解:手把手教你建立自己的视频分类模型
- Adobe 安装程序无法初始化,请下载Adobe Support Advisor检测该问题
- zabbix-agent配置详解
热门文章
- 若int a = 0, b = 1, c = 2,则逻辑表达式a++ b++ || (c -= 2)执行之后
- OSI模型的传输层,会话层,表示层,
- Windows沙盒——系统自带的一次性虚拟机
- cdf表格_高速公路表格一览表
- 软件工程会议、期刊、学术组织等
- EMI/RFI (开关电源外壳带高压?)
- GitHub图标SVG版本
- Airbnb数据分析(数据来源:kaggle)
- 传世基本架构-客户端(传世文件格式分析)
- 语义化版本控制模块-Semver