Spring Cloud全集文章目录:

零、什么是微服务?一看就会系列!

一、手把手教你搭建SpringCloud项目(一)图文详解,傻瓜式操作

二、手把手教你搭建SpringCloud项目(二)生产者与消费者

三、手把手教你搭建SpringCloud项目(三)集成Eureka服务注册中心

四、手把手教你搭建SpringCloud项目(四)Eureka集群版搭建

五、手把手教你搭建SpringCloud项目(五)生产者集群版搭建

六、手把手教你搭建SpringCloud项目(六)Eureka实现服务发现

七、手把手教你搭建SpringCloud项目(七)集成Consul服务注册中心

八、手把手教你搭建SpringCloud项目(八)集成Ribbon负载均衡器

九、手把手教你搭建SpringCloud项目(九)集成OpenFeign服务接口调用

十、手把手教你搭建SpringCloud项目(十)集成Hystrix之服务降级

十一、手把手教你搭建SpringCloud项目(十一)集成Hystrix之服务熔断

十二、手把手教你搭建SpringCloud项目(十二 )集成Hystrix之图形化Dashboard实时监控

十三、手把手教你搭建SpringCloud项目(十三 )集成Gateway新一代网关

十四、手把手教你搭建SpringCloud项目(十四 )集成Config分布式配置中心

十五、手把手教你搭建SpringCloud项目(十五)集成Bus消息总线

十六、手把手教你搭建SpringCloud项目(十六)集成Stream消息驱动

十七、手把手教你搭建SpringCloud项目(十七)集成Sleuth分布式链路跟踪

文章继续更新中,欢迎点赞关注!

一、消息驱动概述

1. 消息驱动是什么?

在实际应用中有很多消息中间件,比如现在企业里常用的有ActiveMQ、RabbitMQ、RocketMQ、Kafka等,学习所有这些消息中间件无疑需要大量时间经历成本,那有没有一种技术,使我们不再需要关注具体的消息中间件的细节,而只需要用一种适配绑定的方式,自动的在各种消息中间件内切换呢?消息驱动就是这样的技术,它能屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

SpringCloud Stream是一个构件消息驱动微服务的框架。应用程序通过inputs和outputs来与SpringCloud Stream中的绑定器(binder)对象交互,通过配置来绑定,而SpringCloud Stream的绑定器对象负责与消息中间件交互,所以,我们只需要搞清楚如何与SpringCloud Stream交互就可以方便使用消息驱动的方式。但是截至到目前时间,SpringCloud Stream目前仅支持RabbitMQ和Kafka

2. 设计思想

在经典的消息队列中,生产者/消费者之间靠消息媒介传递信息内容,消息必须走特定的通道Message Channel,消息通道里的子接口Subscribable Channel消费消息,然后MessageHandler负责收发处理。

在SpringCloud Stream中,通过定义绑定器(binder)作为中间层,实现了应用程序与消息中间件细节之间的隔离。在消息绑定器中,INPUT对应于消费者,OUTPUT对应于生产者,Stream中的消息通信方式遵循了发布—订阅模式:用Topic(主题)进行广播(RabbitMQ中对应于Exchange交换机,Kafka中就是Topic)。

3. SpringCloud Stream编码API和常用注解

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kaf
Binder Binder是应用与消息中间件之间的封装,目前实行了RabbitMQ和Kafka的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListner 监听队列,用于消费者的队列的消息接收
@EnableBinding 使信道Channel和交换机/主题(Exchange/Topic)绑定在一起

二. SpringCloud Stream 案例实操

新建三个子模块分别对应于消息的生产者和消费者:

模块名 微服务功能
cloud-stream-rabbitmq-provider8801 生产者,发送消息模块
cloud-stream-rabbitmq-consumer8802 消费者,接收消息模块
cloud-stream-rabbitmq-consumer8803 消费者,接收消息模块

1. 消息驱动之消息生产者

新建Module:cloud-stream-rabbitmq-provider8801作为消息的生产者用来发送消息,在其POM文件中除引入web、actuator、eureka-client等必要启动器外,还需要引入SpringCloud Stream对应实现RabbitMQ的启动器依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

编写其配置文件application.yml:

server:port: 8801spring:application:name: cloud-stream-providercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理output: # 这个名字是一个通道的名称,OUTPUT表示这是消息的发送方destination: testExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: send-8801.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

编写其主启动类后,然后编写业务类,在业务来中分别要编写发送消息接口及其实现类,并在发送接口消息的实现类中添加**@EnableBinding注解用来绑定消息的推送管道,消息生产者绑定的消息推送管道为org.springframework.cloud.stream.messaging.Source**:

package cn.sher6j.sprincloud.service;/*** 发送消息接口* @author sher6j* @create 2020-05-25-12:20*/
public interface IMessageProvider {public String send();
}
----------------------------------------------------------------------
package cn.sher6j.sprincloud.service.impl;import cn.sher6j.sprincloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import java.util.UUID;/*** 发送消息接口实现类* @author sher6j* @create 2020-05-25-12:21*/
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {@Autowiredprivate MessageChannel output; //消息发送管道@Overridepublic String send() {String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());//发送消息System.out.println("========serial:" + serial);return null;}
}

注意我们在service的实现类中不再需要@Service注解,因为这个service不再是传统意义上的和Controller、DAO数据等进行交互的service,而是要绑定绑定器打交道的service。

然后编写其业务层的Controller:

@RestController
public class SendMessageController {@Autowiredprivate IMessageProvider messageProvider;@GetMapping("/sendMessage")public String sendMessage() {return messageProvider.send();}
}

启动服务注册中心后和RabbitMQ后,启动消息生产者微服务,我们在RabbitMQ的控制面板中可以看见多出了一个名为testExchange的交换机,这个交换机恰恰就是我们之前在配置文件中配置的交换机名字:

然后我们访问 http://localhost:8801/sendMessage 使用消息生产者微服务发送消息,在其微服务后台我们看到了打印的消息:

在RabbitMQ的控制面板中我们也看到了确实发送了消息。

2. 消息驱动之消息消费者

新建Module:cloud-stream-rabbitmq-consumer8802/8803作为消息的生产者用来接收消息,其POM文件中引入的启动器依赖和消息生产者微服务的依赖几乎相同,然后编写其配置文件application.yml,其配置文件的书写和消息生产者的几乎一致,特别需要注意的是,消息生产者微服务用到的通道为OUTPUT,而消息消费者微服务用到的通道为INPUT,其他的配置文件信息就只需要注意端口号、注册服务名的区别即可:

spring:cloud:bindings: input: # 这个名字是一个通道的名称,INPUT表示消息消费者

编写完主启动类,编写消息消费者的业务类,由于是消费者,所以只需要编写其Controller即可,在其Controller上同样需要添加**@EnableBinding注解用来绑定消息的推送管道,消息消费者绑定的消息推送管道为import org.springframework.cloud.stream.messaging.Sink,在接收消息的方法中需要使用@StreamListner**注解来监听其绑定的消息推送管道:

package cn.sher6j.springcloud.controller;import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** @author sher6j* @create 2020-05-25-12:58*/
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message) {System.out.println("消费者" + serverPort + "号,收到消息:" + message.getPayload());}
}

然后启动消息发送消费者服务,用生产者发送消息,我们可以发现在消费者端可以成功接收到消息:

3. 分组消费和持久化

1. 重复消费问题

当生产者发送消息后,此时的我们的消费者都接受了消息并进行了消费,也就是说同一条消息被多个消息消费者所消费:

上述的问题就是消息的重复消费问题,那么这个问题为什么如此重要呢?其实重复消费这个问题本身不可怕,可怕的是没考虑到重复消费之后,怎么保证幂等性。幂等性,通俗的说,就一个数据,或者一个请求,重复很多次,需要确保对应的数据是不会改变的,不能出错)。分布式微服务应用为了实现高可用和负载均衡,实际上同一功能的服务都会部署多个具体的服务实例。举个例子,假设有一个系统,有一条消息要求往数据库里插入一条数据,要是这个消息重复消费两次,结果就是向数据库里插入了两条数据,这样数据就错了,就违背了幂等性原则,但是要是该消息消费到第二次的时候,可以判断一下已经消费过了,然后直接将该消息丢弃,这就实现了只插入一条数据,一条消息重复出现了两次,但是只有第一次真正被消费了,数据库里也就只插入了一条数据,这就保证了系统的幂等性。

上面简单的介绍了消息的重复消费问题,那如何解决这种重复消费问题呢,那就需要我们进行分组和持久化属性组操作,利用SpringCloud Stream中的消息分组来解决这个问题,需要注意的是在Stream中处于同一组中的多个消息消费者是竞争关系,也就是保证生产者所发送的同一个消息只会被其中一个消费者消费一次。在不同组的消费者是是可以对消息进行全面消费(重复消费)的,只有同一组内才会发生竞争关系。

在RabbitMQ中,默认分组是不同的,组流水号不一样,被认为不同组,我们查看testExchange交换机,可以发现8802和8803两个消息消费者处于不同的组,所以8801消息生产者发送的消息可以被这两个消费者重复消费:

2. 分组解决重复消费问题

上面在RabbitMQ控制面板中我们看到的组流水号是系统随机分配的,这样无疑不好控制,所以我们应该自定义配置分组,将8802/8803两个消息消费者微服务分为同一个组,以此来解决消息的重复消费问题,先来演示如何自定义分组。

在8802/8803微服务中的配置文件中分别添加组名属性:

spring:cloud:stream:bindings:input:group: A/B ## 分组名称

这里我们将8802设置为A组,8803设置为B组,然后我们将消息消费方的两个微服务重启,我们再次查看其组流水号,发现不再是长长的随机组流水号,而变成了我们自定义的分组:

此时由于8802/8803位于两个不同分组下,所以没有竞争关系,消息生产者发送消息后,仍然可以重复消费。

下面我们将这两个消息消费方微服务分到相同的消费组中,这样每次就只有一个消费者,消息生产者发送的消息只能被8802或8803其中一个接受到,这样就避免了重复消费,将8802和8803的分组名都改为A,再次重启两个消息消费方微服务,此时我们可以看到在分组A下已经有了两个消费者:

再用生产者发送5条消息,我们发现8802/8803分别消费了3条和2条不同的消息,而没有出现重复消费的问题:

由于时间原因就转发的其他小伙伴六甲横宝的文章,点击查看原文!

我们Spring Cloud Stream到这里就学习完毕了,下一篇文章我们学习Spring Cloud Stream分布式链路跟踪,持续学习,持续更新,下一节更精彩!欢迎朋友们点赞关注!感谢!

手把手教你搭建SpringCloud项目(十六)集成Stream消息驱动相关推荐

  1. 手把手教你搭建SpringCloud项目(十)集成Hystrix之服务降级

    Spring Cloud全集文章目录: 零.什么是微服务?一看就会系列! 一.手把手教你搭建SpringCloud项目(一)图文详解,傻瓜式操作 二.手把手教你搭建SpringCloud项目(二)生产 ...

  2. 手把手教你搭建SpringCloud项目(九)集成OpenFeign服务接口调用

    Spring Cloud全集文章目录: 零.什么是微服务?一看就会系列! 一.手把手教你搭建SpringCloud项目(一)图文详解,傻瓜式操作 二.手把手教你搭建SpringCloud项目(二)生产 ...

  3. 手把手教你搭建Maven项目

    大家好,我是雄雄,欢迎关注微信公众号[雄雄的小课堂]. 今天给大家分享的是"手把手教你买基金",奥!!不对,不好意思,是"手把手教你使用idea搭建Maven项目&quo ...

  4. 手把手教你搭建Jenkins+Jmeter+Ant自动化集成环境

    一.安装前准备 1.JDK:jdk-8u121-windows-x64 2.jmeter工具:apache-jmeter-2.13 3.ANT工具:apache-ant-1.9.7-bin 4.jen ...

  5. 六、手把手教你搭建SpringCloudAlibaba之Sentinel实现流量实时监控

    SpringCloud Alibaba全集文章目录: 零.手把手教你搭建SpringCloudAlibaba项目 一.手把手教你搭建SpringCloud Alibaba之生产者与消费者 二.手把手教 ...

  6. 手把手教你搭建SpringCloudAlibaba之Nacos服务配置中心

    SpringCloud Alibaba全集文章目录: 零.手把手教你搭建SpringCloudAlibaba项目 一.手把手教你搭建SpringCloud Alibaba之生产者与消费者 二.手把手教 ...

  7. 手把手教你搭建Hadoop生态系统伪分布式集群

    Hello,我是 Alex 007,一个热爱计算机编程和硬件设计的小白,为啥是007呢?因为叫 Alex 的人太多了,再加上每天007的生活,Alex 007就诞生了. 手把手教你搭建Hadoop生态 ...

  8. 手把手教你搭建属于自己的技术博客(小白教程)

    手把手教你搭建属于自己的技术博客 先放上我自己搭建的博客地址 https://sourl.cn/Tbk7yt 这里我使用的是 Hexo 框架 以及 Github pages 进行的搭建 文章目录 手把 ...

  9. 手把手教你搭建 ELK 实时日志分析平台

    来自:武培轩 本篇文章主要是手把手教你搭建 ELK 实时日志分析平台,那么,ELK 到底是什么呢? ELK 是三个开源项目的首字母缩写,这三个项目分别是:Elasticsearch.Logstash ...

最新文章

  1. 使用Spring Boot和DJL进行深度学习
  2. Java环境搭建若干问题
  3. Docker logs 查看实时日志(日志最后的N行、某刻后日志)
  4. Java设计模式学习总结(11)——结构型模式之装饰器模式
  5. [FFmpeg] 多个图片合成视频
  6. 数据库开发工程中,一些不常遇到的难题
  7. linux 下的按键精灵 xdotool
  8. 计算机电磁泄露案例,电磁泄漏
  9. Angular之生命周期函数
  10. Eclipse BIRT报表开发工具安装教程
  11. ps cc2019版为什么做图一复制图层就卡死_你所不知道的十个被藏起来的PS功能,超级实用!...
  12. mac录屏如何把声音录进去?
  13. python实现QQ邮件的自动收发
  14. python中双向索引_Python 字典支持双向索引。Python 集合也支持双向索引
  15. 2018年互联网公司市值排名
  16. 对不起,我爱你黄陈晨
  17. Android系统修改无操作进入屏保页
  18. 示例:Linux应用程序遍历当前系统的PCI设备
  19. 数据库时间出现'0000/00/00',难道我穿越了?
  20. 中信银行信用卡中心算法工程师 校招一面面经

热门文章

  1. [微信] 微信网页版扫码登录的实现
  2. 微信公众号官方文档入口
  3. 警察蜀黍,这有个装正经又不正经的App!
  4. 计算机我们一起学猫叫谱子,一起喵喵喵喵喵|〈学猫叫〉/小潘潘 小峰峰 尤克里里曲谱...
  5. 机器学习和深度学习综述
  6. 简单HQL练习-统计店铺访客数
  7. 场效应管(MOS管)
  8. 谷歌生物医学专用翻译_一个可以快速翻译浏览英文文献的工具,拿走不谢!
  9. 关于EasyX和graphics.h的那些事(上)
  10. C语言编程答案保留三位小数,如何用c语言求倒数,保留3位有效数字