为啥有这个技术???

1. 这个stream是操作消息队列的,简化,学习消息队列的成本降低。
2. 可操作rabbitMQ兔子message queue,kafaka,可理解为jdbc可操作oracle, mysql..
3. spring家的技术学就完了。。

stream

  • 对消息驱动需要了解的概念
  • 消息驱动生产者,消费者
  • 再建一个服务,消息消费者,问题

对消息驱动需要了解的概念

(1) 网站 文档
https://spring.io/projects/spring-cloud-stream#overview

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.1.RELEASE/reference/html/


(2) 介绍

什么是SpringCloudStream
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
应用程序通过inputs或者outputs与Spring Cloud Stream中binder对象交互。
通过我们配置来binding(绑定),
而Spring Cloud Stream的binder对象负责与消息中间件交互。
所以,我们只需要搞清楚如何与Spring Cloud Stream
交互就可以访便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,
引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka.

(3) 标准mq

  • Message

    • 生产者/消费者之间靠消息媒介传递信息内容
  • 消息通道MessageChannel

    • 消息必须走特定的通道
  • 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器订阅

    • 消息通道里的消息如何被消费呢,谁负责收发处理

(4) 为什么用Cloud Stream

1 绑定stream凭什么可以统一底层差异

  • 在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,
    于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
    通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
    通过向应用程序暴露统- -的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。

2 binder架构

INPUT对应于消费者
OUTPUT对应于生产者

(5) Stream中的消息通信方式遵循了发布-订阅模式

Topic主题进行广播
在RabbitMQ就是Exchange 交换机
在kafka中就是Topic

(6) 常用api,注解

消息驱动生产者,消费者

  1. 生产者
    pom
        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency>

ymal

server:port: 8802spring:application:name: cloud-stream-consumercloud:stream:binders: # 在此处配置要绑定的rabbitmq的服务信息;defaultRabbit: # 表示定义的名称,用于于binding整合type: rabbit # 消息组件类型environment: # 设置rabbitmq的相关的环境配置spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestbindings: # 服务的整合处理input: # 这个名字是一个通道的名称destination: studyExchange # 表示要使用的Exchange名称定义content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”binder: defaultRabbit  # 设置要绑定的消息服务的具体设置eureka:client: # 客户端进行Eureka注册的配置service-url:defaultZone: http://localhost:7001/eurekainstance:lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)instance-id: receive-8802.com  # 在信息列表时显示主机名称prefer-ip-address: true     # 访问的路径变为IP地址

发送消息接口,实现

public interface IMessageProvider
{public String send();
}/// 实现
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import javax.annotation.Resource;
import org.springframework.cloud.stream.messaging.Source;import javax.annotation.Resource;
import java.util.UUID;@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
{@Resourceprivate MessageChannel output; // 消息发送管道@Overridepublic String send(){String serial = UUID.randomUUID().toString();output.send(MessageBuilder.withPayload(serial).build());System.out.println("*****serial: "+serial);return null;}
}
  1. 消费者
    yaml, pom同时,yaml要改端口。
    定义controller接收消息
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {@Value("${server.port}")private String serverPort;@StreamListener(Sink.INPUT)public void input(Message<String> message) {System.out.println("消费者1号,接受:"+message.getPayload()+"\t port:"+serverPort);}}

再建一个服务,消息消费者,问题

1. 消息重复。发送者发送消息,两个消费者都会接收到消息,如果是支付多个模块,
收到一条消息,多个模块会收到坏账,需要分组,只对一个支付模块发消息.
2. 消息持久化。当关掉消费者,消息丢失。

a. 新增group配置,自定义group

group: damn

b. 持久化,服务挂了,保证消息不丢失

页数配置分组解决

SpringCloud Stream消息驱动相关推荐

  1. SpringCloud2020学习笔记13——SpringCloud Stream消息驱动

    目录 一.消息驱动概述 1.简介 2.官网 2.设计思想 ① 标准MQ ② 为什么用Cloud Stream ③ Stream中的消息通信方式遵循了发布-订阅模式 3.Spring Cloud Str ...

  2. SpringCloud微服务架构,Config 分布式配置中心,Bus 消息总线, Stream 消息驱动,Sleuth+Zipkin 链路追踪

    Config分布式配置中心 Config 概述 概述 • Spring Cloud Config 解决了在分布式场景下多环境配置文件的管理和维护. • 好处: • 集中管理配置文件 • 不同环境不同配 ...

  3. 手把手教你搭建SpringCloud项目(十六)集成Stream消息驱动

    Spring Cloud全集文章目录: 零.什么是微服务?一看就会系列! 一.手把手教你搭建SpringCloud项目(一)图文详解,傻瓜式操作 二.手把手教你搭建SpringCloud项目(二)生产 ...

  4. SpringCloud(十一)Bus消息总线、Stream消息驱动

    一.Bus消息总线 需求:分布式自动刷新配置功能: 解决:SpringCloud Bus配合Spring cloud Config使用可以实现配置的动态刷新. 1.概述 定义:Spring Cloud ...

  5. Spring Cloud Stream消息驱动

    一.Spring Cloud Stream消息驱动 1.为什么引入cloud Stream MQ(消息中间件) ActiveMQ RabbitMQ RocketMQ Kafka 问题:可能中间java ...

  6. (十七)Java springcloud B2B2C o2o多用户商城 springcloud架构-消息驱动 Spring Cloud Stream...

    在使用spring cloud云架构的时候,我们不得不使用Spring cloud Stream,因为消息中间件的使用在项目中无处不在,我们公司后面做了娱乐方面的APP,在使用spring cloud ...

  7. SpringCloud (十一) --------- Stream 消息驱动框架

    目录 一.Stream 概述 二.Stream 重要概念 三.Stream 应用 四.Stream 自定义消息通道 五.Stream 分组与持久化 六.Stream 设置路由键 一.Stream 概述 ...

  8. 【Java开发】Spring Cloud 10 :Stream消息驱动

    官方定义Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架.它为一些供应商的消息中间件产品提供了个性化的自动化配置实现,Spirng Cloud Stream 本质上 ...

  9. SpringCloud学习笔记 - 消息驱动 - Spring Cloud Stream

    1. stream为什么被引入 常见MQ(消息中间件): ActiveMQ RabbitMQ RocketMQ Kafka 有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑 ...

最新文章

  1. 面试:什么是序列化,怎么序列化,为什么序列化,反序列化会遇到什么问题,如何解决?...
  2. call dword prt[eax]
  3. BigData:根据最新2018人工智能行业创新企业Top100名单,绘制AI行业VC最AI的热门领域(投资领域)
  4. java读写锁降级_java的读写锁中锁降级的问题
  5. HTTPClient系统学习
  6. BLE-NRF51822教程1-常用概念
  7. Leetcode--50. Pow(x,y)
  8. python的十大算法_Python十大排序算法
  9. win7 中出现“为了配置TCP/IP,必须安装并启动网络适配卡“问题的解决办法
  10. 本地apk安装是什么意思_Sony电视安装第三方播放器
  11. susmote个人网站博客论坛(TexTec | 关注互联网技术,传播极客精神)
  12. 由数据库连接池想到的----处理他人未释放的资源
  13. 编译内核_Linux内核编译(自己实现的网卡上面测试c1000k案例)
  14. 天涯明月刀最新服务器,天涯明月刀最新开服时间表 | 手游网游页游攻略大全
  15. Marlin代码分析一些记录
  16. Python语言入门这一篇就够了-学习笔记(十二万字)
  17. 【英语阅读】纽约时报 | 在纽约,几乎每个人身边都有人感染病毒
  18. (原创)android6.0系统 PowerManager深入分析
  19. css怎么随着鼠标移动,利用CSS sprites制作随着鼠标移动的动画背景
  20. 「JCVI」如何筛选得到最优blast比对结果?

热门文章

  1. 前端学习(1351)模板引擎
  2. 前端学习(1151):let经典面试题1
  3. 前端学习(672):if-else
  4. STM32 网络通信Web Server中 SSI与CGI的应用解析
  5. java setdaemon_Java ThreadGroup setDaemon()方法
  6. 基于Colab Pro Google Drive的Kaggle实战
  7. linux gst-launch 播放视频旋转,【视频开发】Gstreamer中一些gst-launch常用命令
  8. Rds基于mysql开发的_开发云数据库RDS MYSQL版讲解
  9. uilabel 自适应
  10. [调试]Asp.Net常见问题