文章目录

  • 概述
  • 添加依赖
  • 配置文件配置RabbitMQ的地址信息
  • 接口定义
  • 接收方 @EnableBinding @StreamListener
  • 测试
  • 消费组
  • 发送复杂对象
  • 消息回执
  • 代码

概述

官网 : https://spring.io/projects/spring-cloud-stream

概括来说,Spring Cloud Stream 进一步封装了消息队列,可以做到代码层面对消息队列无感知。

这里我们仅仅是做个入门级别的介绍,更多用法还是参考官网上的指导说明,毕竟最权威了。


添加依赖

无需多说,要想使用Spring Cloud Stream ,第一步肯定是添加依赖了 ,如下

这里使用的消息队列是 RabbitMQ ,如果你是用的是kafka,换成对应的spring-cloud-starter-stream-kafka依赖即可

     <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>


配置文件配置RabbitMQ的地址信息

spring-cloud-starter-stream-rabbit是Spring Cloud Stream对RabbitMQ的封装,包含了对RabbitMQ的自动化配置,比如连接的RabbitMQ的默认地址localhost,默认端口5672,默认用户guest,默认密码guest,如果采用的是如上默认配置,可以不用修改配置。

这里我把配置文件放到了远端的Git,通过config server 拉取配置。

RabbitMQ的安装 ,这里我选择了使用Docker镜像,安装如下

在Docker CE中安装RabbitMQ


接口定义

可知: Sink和Source两个接口分别定义了输入通道和输出通道,Processor通过继承Source和Sink,同时具有输入通道和输出通道。这里我们就模仿Sink和Source,自定义一个消息通道。

package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;public interface ArtisanSink {// 同一个服务里面的通道名字不能一样,在不同的服务里可以相同名字的通道// 否则启动抛出如下异常  bean definition with this name already existsString INPUT = "MyMsgInput";@Input(ArtisanSink.INPUT)SubscribableChannel input();}

如上定义了一个名为MyMsgInput的消息输入通道,@Input注解的参数则表示了消息通道的名称


接收方 @EnableBinding @StreamListener

StreamReceive 用来接收RabbitMQ发送来的消息

package com.artisan.order.message;import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;/*** 接收方*/@Component
// Step1 注解  绑定刚才的接口
@EnableBinding(ArtisanSink.class)
@Slf4j
public class StreamReceive {// Step2  @StreamListener 绑定对象的名称@StreamListener(ArtisanSink.INPUT)public void processStreamMsg(Object msg){log.info("StreamReceive: {}",msg);}}
  • 第一步: 使用了@EnableBinding注解实现对消息通道的绑定,我们在该注解中还传入了一个参数ArtisanSink.class,ArtisanSink是一个自定义接口,主要功能是实现对输入消息通道绑定的定义。
  • 第二步:在StreamReceive 类中定义了processStreamMsg方法,重点是在该方法上添加了@StreamListener注解,该注解表示该方法为消息中间件上数据流的事件监听器,ArtisanSink.INPUT参数表示这是input消息通道上的监听处理器。

测试

模拟发送发发送消息,方便起见,我们直接在Controller层写个方法吧

package com.artisan.order.controller;import com.artisan.order.message.Sink;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@Slf4j
public class MsgStreamController {@Autowiredprivate ArtisanSink sink;@GetMapping("/sendMsgByStream")public void sendMsgByStream(){String message = "I am one msg sent by Spring Cloud Stream";sink.input().send(MessageBuilder.withPayload(message).build());}
}

通过 @Autowired自动注入刚才的Sink接口,然后调用 sink.input().send方法发送消息即可。

启动服务,观察RabbitMQ上的队列 ,自动创建了一个

点进去看下

MyMsgInput和 在接口中的定义一致 。

访问: http://localhost:8081/sendMsgByStream

观察日志:

2019-04-13 10:56:32.749  INFO 820 --- [nio-8081-exec-4] com.artisan.order.message.StreamReceive  : StreamReceive: I am one msg sent by Spring Cloud Stream

接收方收到了一条消息如上,OK。


消费组

需求: 由于服务可能会有多个实例同时在运行,我们只希望消息被一个实例所接收

先来改造下项目,启动多个服务实例

为了多启动几个节点,我们需要把定义在远端Git上的要加载到bootstrap.yml中的端口信息给注释掉,否则第二个端口因端口冲突起不来。

然后通过如下方式在JVM参数中指定启动端口
第一个app 启动端口 -Dserver.port=8082
第一个app 启动端口 -Dserver.port=5656

启动后查看在Eureka Server上的注册情况


再看看RabbitMQ的消息队列情况,两个 OK


旧版本中 ,如果不做任何设置,此时发送一条消息将会被所有的实例接收到 ,但是可以通过消息分组来解决 。
具体可参考: https://segmentfault.com/a/1190000011796459

主要是配置分组

spring:cloud:stream:bindings:# MyMsgInput 自定义   order消费组MyMsgInput:# 消息组的名称group: order#输入通道的主题名destination: MyMsgInput#存在消息队列中的消息,如果是复杂对象,则以JSON的形式展示content-type: application/json

新版本:
Spring Boot : 2.0.3.RELEASE
Spring Cloud : Finchley.RELEASE

经过测试 不存在这个问题

把这俩节点的日志信息都清空掉,重新发送个消息

我们就用5656这个节点好了 ,http://localhost:5656/sendMsgByStream
经过验证只有5656这一个节点收到了消息。无需设置分组。


发送复杂对象

上面的例子中我们发送的是一个字符串,

如果是复杂对象呢? 来测试下

 @GetMapping("/sendMsgByStream2")public void sendMsgByStream2(){OrderDTO orderDTO = new OrderDTO();orderDTO.setOrderId("11111");orderDTO.setOrderAmount(new BigDecimal(9999));sink.input().send(MessageBuilder.withPayload(orderDTO).build());}

启动5656端口的服务,访问 http://localhost:5656/sendMsgByStream2

观察日志:

2019-04-13 17:06:47.438  INFO 13764 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive  : StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)

OK。

这是我们如果把消息消费方注释掉,让消息累计在消息队列中,我们去看下消息队列中存储的复杂对象的格式

启动5656端口的服务,访问 http://localhost:5656/sendMsgByStream2

org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers

消息回执

消费者收到消息后给发送方一个ACK确认,该如何做呢?

比如接收到消息后,返回给ArtisanSource.OUTPUT一个消息,直接使用@SendTo直接即可,就会将返回的字符串发送给ArtisanSource.OUTPUT通道

定义一个

package com.artisan.order.message;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;public interface ArtisanSource {String OUTPUT = "MyMsgOutput";@Output(ArtisanSource.OUTPUT)MessageChannel output();
}

写一个该消息的接收方

package com.artisan.order.message;import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;/*** 接收方*/@Component
// Step1 注解  绑定刚才的接口
@EnableBinding(ArtisanSource.class)
@Slf4j
public class StreamReceive2 {// Step2  @StreamListener 绑定对象的名称@StreamListener(ArtisanSource.OUTPUT)public void processStreamMsg2(String msg){log.info("OUTPUT StreamReceive: {}",msg);}}

启动微服务,访问 http://localhost:5656/sendMsgByStream2

2019-04-13 18:06:51.817  INFO 972 --- [nio-5656-exec-1] com.artisan.order.message.StreamReceive  : INPUT StreamReceive: OrderDTO(orderId=11111, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=9999, orderStatus=null, payStatus=null, orderDetailList=null)
2019-04-13 18:06:51.823  INFO 972 --- [nio-5656-exec-1] c.artisan.order.message.StreamReceive2   : OUTPUT StreamReceive: received OK

代码

https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order

Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务相关推荐

  1. Spring Cloud Streams Messaging消息驱动微服务实践

    作者:禅与计算机程序设计艺术 1.简介 消息驱动微服务是一个新的分布式架构模式,它基于异步通信和事件驱动的消息传递机制,通过轻量级的消息代理与集成框架实现分布式系统的解耦合.弹性伸缩和可靠性保证.Sp ...

  2. java B2B2C Springboot多租户电子商城系统-Spring Cloud Stream(消息驱动)

    1.什么是Spring Cloud Stream 愿意了解源码的朋友直接企鹅求求:二一四七七七五六三三 Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架.它可以基于 ...

  3. spring解耦_云端时代的解耦:使用Spring Cloud Azure构建云端原生微服务

    spring解耦 重要要点 云本机应用程序应充分利用云的优势,而不仅仅是迁移到云中 通过在云计算环境上运行,微服务与云原生环境并驾齐驱 集中配置,服务发现,异步消息驱动和分布式跟踪是微服务基础架构 S ...

  4. 微服务平台(Micro Service Platform : MSP)旨在提供一个集开发、测试、运维于一体的开发者专属平台,让开发者能快速构建或使用微服务,让开发更简单,让运维更高效。...

    微服务平台(Micro Service Platform : MSP)旨在提供一个集开发.测试.运维于一体的开发者专属平台,让开发者能快速构建或使用微服务,让开发更简单,让运维更高效. MSP采用业界 ...

  5. 如何构建成功的微服务架构?带你洞悉微服务构建流程,以实战角度出发,详解微服务架构

    前言 随着技术变得更加复杂,许多团队正在评估他们的架构如何最好地支持未来的业务.其中一种架构,微服务正在成为前瞻性技术部门越来越流行的选择.微服务架构可能是释放业务潜力的关键,但如何实现呢? 微服务是 ...

  6. nodejs 调用微服务器_无服务器NodeJS:构建下一个微服务的快速,廉价方法

    nodejs 调用微服务器 by Filipe Tavares 由Filipe Tavares 无服务器NodeJS:构建下一个微服务的快速,廉价方法 (Serverless NodeJS: the ...

  7. 项目是采用目前比较流行的 SpringBoot/SpringCloudAlibaba构建新零售微服务电商项目

    简介: 技术架构 项目是采用目前比较流行的 SpringBoot/SpringCloudAlibaba构建新零售微服务电商项目,从项目中台架构技术选型.模块设计.基础设施的构建.分布式解决方 案.互联 ...

  8. jq项目如何启服务_用小项目详解我们应该如何去构建我们的微服务

    前言 关于微服务的概念,说到底,很多人看了之后会认为没有什么意思,因为没有实际的东西说明,即使每个概念都明白了,也很难付之实践.所以这次,我来用一个实际的例子去说明,在实际的项目过程中我们会如何去构建 ...

  9. 面对微服务的N种坑,我们需要构建综合的微服务治理能力

    这几年微服务的热度持续居高不下,企业纷纷向微服务架构转型.但在微服务落地时,大家更多是在技术架构层面发力,以为所谓的微服务化就是简单的引入一套微服务框架,却忽略了微服务架构带来的影响是全方位的,它会对 ...

最新文章

  1. edge浏览器如何把网页放到桌面_最强桌面浏览器
  2. Vue 学习 第六天学习笔记
  3. ActionBar之style出现Cannot resolve symbol 'Theme' 错误
  4. nginx源码分析—模块及其初始化
  5. todo项目开发_Facebook的TODO项目,巴西的Coursera,Drupal等
  6. python xlwt设置单元格的自定义背景颜色
  7. # JDK7+ MethodHandle
  8. 【人脸表情识别】基于matlab GUI稀疏表示人脸表情识别【含Matlab源码 786期】
  9. 双击计算机显示远程调用失败,win10提示远程调用过程失败且未执行的修复办法...
  10. 基于JavaEE的人力资源管理系统的设计与实现任务书与开题报告
  11. excel文件运行报错(xx.xlsx)不是有效的win32应用程序
  12. Java的JDK在哪里下载,如何下载?
  13. cors数据类型_CORS账号和南方RTK连接怎样操作使用?步骤说明
  14. 18岁少年盗取1500万日元萌乃币, 逼交易所关停, 引发日本史上第一次加密币盗窃法律诉讼...
  15. Mathematica13.1的安装与使用配置
  16. 如何在Excel表格中给某一列添加固定筛选项
  17. ESP8266-Arduino网络编程实例-WiFi连接丢失解决方法
  18. Chrome浏览器自带截长图功能,只需两个快捷键!
  19. matlab 函数,matlab 语法1
  20. 一、python入门整体快速学习

热门文章

  1. 小型邮件服务器,windows 2003 server小型邮件服务器架设
  2. 各数据结构算法时间复杂度图【笔记自用】
  3. Tensorflow实现MNIST数据自编码(1)
  4. 卷积神经网络(CNN)模型结构
  5. Hadoop 在关机重启后,namenode启动报错
  6. 推荐30个用于微服务的顶级工具
  7. Kafka剖析(一):Kafka背景及架构介绍--转
  8. GitHub上那些值得一试的JAVA开源库--转
  9. 高并发高流量网站架构详解--转载
  10. 大数据读书笔记(2)-流式计算