Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务
文章目录
- 概述
- 添加依赖
- 配置文件配置RabbitMQ的地址信息
- 接口定义
- 接收方 @EnableBinding @StreamListener
- 测试
- 消费组
- 发送复杂对象
- 消息回执
- 代码
概述
官网 : https://spring.io/projects/spring-cloud-stream
概括来说,Spring Cloud Stream 进一步封装了消息队列,可以做到代码层面对消息队列无感知。
这里我们仅仅是做个入门级别的介绍,更多用法还是参考官网上的指导说明,毕竟最权威了。
添加依赖
无需多说,要想使用Spring Cloud Stream ,第一步肯定是添加依赖了 ,如下
这里使用的消息队列是 RabbitMQ ,如果你是用的是kafka,换成对应的spring-cloud-starter-stream-kafka
依赖即可
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>
配置文件配置RabbitMQ的地址信息
spring-cloud-starter-stream-rabbit是Spring Cloud Stream对RabbitMQ的封装,包含了对RabbitMQ的自动化配置,比如连接的RabbitMQ的默认地址localhost,默认端口5672,默认用户guest,默认密码guest,如果采用的是如上默认配置,可以不用修改配置。
这里我把配置文件放到了远端的Git,通过config server 拉取配置。
RabbitMQ的安装 ,这里我选择了使用Docker镜像,安装如下
在Docker CE中安装RabbitMQ
接口定义
可知: Sink和Source两个接口分别定义了输入通道和输出通道,Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,自定义一个消息通道。
package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;public interface ArtisanSink {// 同一个服务里面的通道名字不能一样,在不同的服务里可以相同名字的通道// 否则启动抛出如下异常 bean definition with this name already existsString INPUT = "MyMsgInput";@Input(ArtisanSink.INPUT)SubscribableChannel input();}
如上定义了一个名为MyMsgInput的消息输入通道,@Input注解的参数则表示了消息通道的名称
接收方 @EnableBinding @StreamListener
StreamReceive 用来接收RabbitMQ发送来的消息
package com.artisan.order.message;import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;/*** 接收方*/@Component
// Step1 注解 绑定刚才的接口
@EnableBinding(ArtisanSink.class)
@Slf4j
public class StreamReceive {// Step2 @StreamListener 绑定对象的名称@StreamListener(ArtisanSink.INPUT)public void processStreamMsg(Object msg){log.info("StreamReceive: {}",msg);}}
- 第一步: 使用了
@EnableBinding
注解实现对消息通道的绑定,我们在该注解中还传入了一个参数ArtisanSink.class,ArtisanSink是一个自定义接口,主要功能是实现对输入消息通道绑定的定义。 - 第二步:在StreamReceive 类中定义了processStreamMsg方法,重点是在该方法上添加了
@StreamListener
注解,该注解表示该方法为消息中间件上数据流的事件监听器,ArtisanSink.INPUT参数表示这是input消息通道上的监听处理器。
测试
模拟发送发发送消息,方便起见,我们直接在Controller层写个方法吧
package com.artisan.order.controller;import com.artisan.order.message.Sink;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@Slf4j
public class MsgStreamController {@Autowiredprivate ArtisanSink sink;@GetMapping("/sendMsgByStream")public void sendMsgByStream(){String message = "I am one msg sent by Spring Cloud Stream";sink.input().send(MessageBuilder.withPayload(message).build());}
}
通过 @Autowired自动注入刚才的Sink接口,然后调用 sink.input().send方法发送消息即可。
启动服务,观察RabbitMQ上的队列 ,自动创建了一个
点进去看下
MyMsgInput和 在接口中的定义一致 。
访问: http://localhost:8081/sendMsgByStream
观察日志:
2019-04-13 10:56:32.749 INFO 820 --- [nio-8081-exec-4] com.artisan.order.message.StreamReceive : StreamReceive: I am one msg sent by Spring Cloud Stream
接收方收到了一条消息如上,OK。
消费组
需求: 由于服务可能会有多个实例同时在运行,我们只希望消息被一个实例所接收
先来改造下项目,启动多个服务实例
为了多启动几个节点,我们需要把定义在远端Git上的要加载到bootstrap.yml中的端口信息给注释掉,否则第二个端口因端口冲突起不来。
然后通过如下方式在JVM参数中指定启动端口
第一个app 启动端口 -Dserver.port=8082
第一个app 启动端口 -Dserver.port=5656
启动后查看在Eureka Server上的注册情况
再看看RabbitMQ的消息队列情况,两个 OK
旧版本中 ,如果不做任何设置,此时发送一条消息将会被所有的实例接收到 ,但是可以通过消息分组来解决 。
具体可参考: https://segmentfault.com/a/1190000011796459
主要是配置分组
spring:cloud:stream:bindings:# MyMsgInput 自定义 order消费组MyMsgInput:# 消息组的名称group: order#输入通道的主题名destination: MyMsgInput#存在消息队列中的消息,如果是复杂对象,则以JSON的形式展示content-type: application/json
新版本:
Spring Boot : 2.0.3.RELEASE
Spring Cloud : Finchley.RELEASE
经过测试 不存在这个问题
把这俩节点的日志信息都清空掉,重新发送个消息
我们就用5656这个节点好了 ,http://localhost:5656/sendMsgByStream
经过验证只有5656这一个节点收到了消息。无需设置分组。
发送复杂对象
上面的例子中我们发送的是一个字符串,
如果是复杂对象呢? 来测试下
@GetMapping("/sendMsgByStream2")public void sendMsgByStream2(){OrderDTO orderDTO = new OrderDTO();orderDTO.setOrderId("11111");orderDTO.setOrderAmount(new BigDecimal(9999));sink.input().send(MessageBuilder.withPayload(orderDTO).build());}
启动5656端口的服务,访问 http://localhost:5656/sendMsgByStream2
观察日志:
2019-04-13 17:06:47.438 INFO 13764 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)
OK。
这是我们如果把消息消费方注释掉,让消息累计在消息队列中,我们去看下消息队列中存储的复杂对象的格式
启动5656端口的服务,访问 http://localhost:5656/sendMsgByStream2
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
消息回执
消费者收到消息后给发送方一个ACK确认,该如何做呢?
比如接收到消息后,返回给ArtisanSource.OUTPUT一个消息,直接使用@SendTo直接即可,就会将返回的字符串发送给ArtisanSource.OUTPUT通道
定义一个
package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;public interface ArtisanSource {String OUTPUT = "MyMsgOutput";@Output(ArtisanSource.OUTPUT)MessageChannel output();
}
写一个该消息的接收方
package com.artisan.order.message;import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;/*** 接收方*/@Component
// Step1 注解 绑定刚才的接口
@EnableBinding(ArtisanSource.class)
@Slf4j
public class StreamReceive2 {// Step2 @StreamListener 绑定对象的名称@StreamListener(ArtisanSource.OUTPUT)public void processStreamMsg2(String msg){log.info("OUTPUT StreamReceive: {}",msg);}}
启动微服务,访问 http://localhost:5656/sendMsgByStream2
2019-04-13 18:06:51.817 INFO 972 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive : INPUT StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)
2019-04-13 18:06:51.823 INFO 972 --- [nio-5656-exec-1] c.artisan.order.message.StreamReceive2 : OUTPUT StreamReceive: received OK
代码
https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order
Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务相关推荐
- Spring Cloud Streams Messaging消息驱动微服务实践
作者:禅与计算机程序设计艺术 1.简介 消息驱动微服务是一个新的分布式架构模式,它基于异步通信和事件驱动的消息传递机制,通过轻量级的消息代理与集成框架实现分布式系统的解耦合.弹性伸缩和可靠性保证.Sp ...
- java B2B2C Springboot多租户电子商城系统-Spring Cloud Stream(消息驱动)
1.什么是Spring Cloud Stream 愿意了解源码的朋友直接企鹅求求:二一四七七七五六三三 Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架.它可以基于 ...
- spring解耦_云端时代的解耦:使用Spring Cloud Azure构建云端原生微服务
spring解耦 重要要点 云本机应用程序应充分利用云的优势,而不仅仅是迁移到云中 通过在云计算环境上运行,微服务与云原生环境并驾齐驱 集中配置,服务发现,异步消息驱动和分布式跟踪是微服务基础架构 S ...
- 微服务平台(Micro Service Platform : MSP)旨在提供一个集开发、测试、运维于一体的开发者专属平台,让开发者能快速构建或使用微服务,让开发更简单,让运维更高效。...
微服务平台(Micro Service Platform : MSP)旨在提供一个集开发.测试.运维于一体的开发者专属平台,让开发者能快速构建或使用微服务,让开发更简单,让运维更高效. MSP采用业界 ...
- 如何构建成功的微服务架构?带你洞悉微服务构建流程,以实战角度出发,详解微服务架构
前言 随着技术变得更加复杂,许多团队正在评估他们的架构如何最好地支持未来的业务.其中一种架构,微服务正在成为前瞻性技术部门越来越流行的选择.微服务架构可能是释放业务潜力的关键,但如何实现呢? 微服务是 ...
- nodejs 调用微服务器_无服务器NodeJS:构建下一个微服务的快速,廉价方法
nodejs 调用微服务器 by Filipe Tavares 由Filipe Tavares 无服务器NodeJS:构建下一个微服务的快速,廉价方法 (Serverless NodeJS: the ...
- 项目是采用目前比较流行的 SpringBoot/SpringCloudAlibaba构建新零售微服务电商项目
简介: 技术架构 项目是采用目前比较流行的 SpringBoot/SpringCloudAlibaba构建新零售微服务电商项目,从项目中台架构技术选型.模块设计.基础设施的构建.分布式解决方 案.互联 ...
- jq项目如何启服务_用小项目详解我们应该如何去构建我们的微服务
前言 关于微服务的概念,说到底,很多人看了之后会认为没有什么意思,因为没有实际的东西说明,即使每个概念都明白了,也很难付之实践.所以这次,我来用一个实际的例子去说明,在实际的项目过程中我们会如何去构建 ...
- 面对微服务的N种坑,我们需要构建综合的微服务治理能力
这几年微服务的热度持续居高不下,企业纷纷向微服务架构转型.但在微服务落地时,大家更多是在技术架构层面发力,以为所谓的微服务化就是简单的引入一套微服务框架,却忽略了微服务架构带来的影响是全方位的,它会对 ...
最新文章
- edge浏览器如何把网页放到桌面_最强桌面浏览器
- Vue 学习 第六天学习笔记
- ActionBar之style出现Cannot resolve symbol 'Theme' 错误
- nginx源码分析—模块及其初始化
- todo项目开发_Facebook的TODO项目,巴西的Coursera,Drupal等
- python xlwt设置单元格的自定义背景颜色
- # JDK7+ MethodHandle
- 【人脸表情识别】基于matlab GUI稀疏表示人脸表情识别【含Matlab源码 786期】
- 双击计算机显示远程调用失败,win10提示远程调用过程失败且未执行的修复办法...
- 基于JavaEE的人力资源管理系统的设计与实现任务书与开题报告
- excel文件运行报错(xx.xlsx)不是有效的win32应用程序
- Java的JDK在哪里下载,如何下载?
- cors数据类型_CORS账号和南方RTK连接怎样操作使用?步骤说明
- 18岁少年盗取1500万日元萌乃币, 逼交易所关停, 引发日本史上第一次加密币盗窃法律诉讼...
- Mathematica13.1的安装与使用配置
- 如何在Excel表格中给某一列添加固定筛选项
- ESP8266-Arduino网络编程实例-WiFi连接丢失解决方法
- Chrome浏览器自带截长图功能,只需两个快捷键!
- matlab 函数,matlab 语法1
- 一、python入门整体快速学习