Spring Cloud Stream是一个构建消息驱动的微服务应用框架,它使用Binder和消息中间件建立联系,我们在使用的时候不需要关心我们到底是使用的是RabbitMQ还是Kafka,因此我们可以在消息中间件中随意切换。

1、依赖

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>3.1.3</version>
</dependency>

2、生产者

2.1    yml配置

server:port: 9001servlet:context-path: /spring:application:name: cloud-stream-provider
#  rabbitmq:   # rabbitmq配置 和 下面的 两种 都行
#    host: localhost # 服务器IP
#    port: 5672  # 服务器端口
#    username: guest # 用户名
#    password: guest # 密码
#    virtual-host: / # 虚拟主机地址cloud:stream:binders:defaultRabbit:type: rabbitenvironment:spring:rabbitmq:addresses: localhost # 服务器IPport: 5672  # 服务器端口username: guest # 用户名password: guest # 密码bindings: #服务的整合处理send-out-0: #生产者通道的名称 需和生产者 代码里面 通道名称一致destination: studyExchange #studyExchange  #这个名字是一个通道的名称content-type: application/json #设置消息类型,本次为json,文本则设置"text/plain"eureka:client:#表示是否将自己注册进EurekaServer 默认trueregister-with-eureka: true#是否从EurekaServer抓取已有的注册信息,默认为true。单节点无所谓。集群必须设置true才能配合ribbon使用负载均衡fetch-registry: trueservice-url:#设置与Eureka Server交互的地址查询服务和注册服务都需依赖这个地址defaultZone: http://localhost:8001/eureka
#      defaultZone: http://IP:8001/eureka,http://IP:8002/eureka   #集群版instance:instance-id: stream9001 #在消息列表显示主机的名称prefer-ip-address: true #访问的路径变为IP地址lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认30秒)lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认90秒)

2.2 生产者实现

//3.x之前版本使用,现在已过时,官方不推荐使用
//@EnableBinding(Source.class)
@Component
public class MsgProviderImpl implements MsgProvider {//3.x之前版本使用,现在已过时,官方不推荐使用
//    @Autowired
//    private MessageChannel output;
//    @Override
//    public String send() {
//        String serial = IdUtil.simpleUUID();
//        System.out.println("******发送了msg:" + serial);
//        output.send(MessageBuilder.withPayload(serial).build());
//        return serial;
//    }//这里直接装配一个桥 用来连接rabbit或者kafka@AutowiredStreamBridge streamBridge;@Overridepublic String send() {String serial = IdUtil.simpleUUID();//这里说明一下这个 streamBridge.send 方法的参数 第一个参数是exchange或者topic 就是主题名称//默认的主题名称是通过//输入:    <方法名> + -in- + <index>//输出:    <方法名> + -out- + <index>//这里我们接收的时候就要用send方法 参数是consumer<String>接收  详情看9002的controller//consumer的参数类型是这里message的类型// 生产者的通道名称须与yml中配置的属性一致String SMS_OUTPUT = "send-out-0";streamBridge.send(SMS_OUTPUT, serial);System.out.println("************发送了msg:"+serial);return serial;}
}

3、消费者

3.1 yml配置

server:port: 9002servlet:context-path: /spring:application:name: cloud-stream-consumerrabbitmq:host: localhost # 服务器IPport: 5672  # 服务器端口username: guest # 用户名password: guest # 密码virtual-host: / # 虚拟主机地址cloud:stream:bindings: #服务的整合处理send-in-0: #消费者通道的名称destination: studyExchange #studyExchange  自定义一个通道的名称content-type: application/json #设置消息类型,本次为json,文本则设置"text/plain"group: ijustecA  # 自定义 分组名称eureka:client:#表示是否将自己注册进EurekaServer 默认trueregister-with-eureka: true#是否从EurekaServer抓取已有的注册信息,默认为true。单节点无所谓。集群必须设置true才能配合ribbon使用负载均衡fetch-registry: trueservice-url:#设置与Eureka Server交互的地址查询服务和注册服务都需依赖这个地址defaultZone: http://localhost:8001/eureka
#      defaultZone: http://IP:8001/eureka,http://IP:8002/eureka   #集群版instance:instance-id: stream9002 #在消息列表显示主机的名称prefer-ip-address: true #访问的路径变为IP地址lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认30秒)lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认90秒)

3.2 消费者实现

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.util.function.Consumer;/*** @author liukj* @date 2022/4/8 11:24*/
@Component
//3.x之前版本使用,现在已过时,官方不推荐使用
//@EnableBinding(Sink.class)
public class ReceiveMessageController {@Value("${server.port}")private String serverPort;//3.x之前版本使用,现在已过时,官方不推荐使用
//    @StreamListener(Sink.INPUT)
//    public void input(Message<String> message){
//        System.out.println("我是消费者--------->: "+message.getPayload() + "\t port: " + serverPort);
//    }/*** //这里接收rabbitmq的条件是参数为Consumer 并且 方法名和supplier方法名相同* //这里的返回值是一个匿名函数 返回类型是consumer 类型和提供者的类型一致* //supplier发送的exchange是 send-in-0 这里只需要用send方法名即可* //1、在stream3.1中 我们不需要像以前一样用@Binding @StreamListener来监听了 这样少写了很多代码和配置 我们可以使用StreamBrige来进行发送* //2、StreamBrige.send() 方法的参数拼写规则:* //可以直接看官方文档[命名规则][https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_binding_and_binding_names]* //输入:    <方法名> + -in- + <index>* //输出:    <方法名> + -out- + <index>* //3、接收的时候直接用前面的方法名即可* @return*/@BeanConsumer<String> send() {return str -> {System.out.println("我是消费者--------->: "+str + "\t port: " + serverPort);};}
}

Spring Cloud Stream Rabbit 3.1.3 入门实践相关推荐

  1. 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

    ←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...

  2. Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

    应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍Rab ...

  3. Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

    应用场景  前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的 ...

  4. Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)

    应用场景 我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时 ...

  5. RabbitMQ 整合 Spring Cloud Stream

    https://git.coding.net/dgutllx/RabbitmqStudy.git Spring Cloud Stream 整体架构核心概念图: Middleware 消息中间件 Spr ...

  6. Spring Cloud Stream与RabbitMQ整合时Producer与Consumer的相关配置

    生产者属性 下面的属性都必须添加前缀: spring.cloud.stream.<rabbitName>.bindings.<channelName>.producer. 如果 ...

  7. 【进阶技术】一篇文章搞掂:Spring Cloud Stream

    本文总结自官方文档http://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.0.RC3/single/spring-clou ...

  8. Spring Cloud Stream如何消费自己生产的消息?

    在上一篇<Spring Cloud Stream如何处理消息重复消费?>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的 ...

  9. Spring Cloud Stream如何消费自己生产的消息

    在上一篇<Spring Cloud Stream如何处理消息重复消费>中,我们通过消费组的配置解决了多实例部署情况下消息重复消费这一入门时的常见问题.本文将继续说说在另外一个被经常问到的问 ...

最新文章

  1. sklearn计算两个向量之间的距离
  2. Mysql优化之开山篇
  3. java guice_java – Guice:如何为一个类型获得多个@Provides?
  4. ACM主要赛考察内容
  5. JDK的下载与安装eclipse的下载与安装
  6. 前端基础-HTML标记语言
  7. 区间覆盖全部类型及部分精选习题汇总详解(贪心策略)
  8. php友情链接大于3换行,细节见真情 PHPWind v5.3友情链接功能改进
  9. AVC与H264 区别
  10. Linux 下压缩与解压.zip和.rar
  11. Trie树讲解(例题:ACWING 835,ACWING 143)
  12. android+4.0访问网络,Android 中从4.0以后无法在主线程访问网络的解决办法。
  13. Google发布了Google Sketchup,完全免费
  14. 融云 SDK 5.0.0 功能迭代
  15. Tkinter教程之Button篇
  16. layui怎么设置select默认选中,修改回显
  17. Win10家庭版使用gpedit.msc方法
  18. 高德地图API定位失败 浏览器定位 IP定位
  19. 不同参数对分类模型性能影响记录
  20. 大数据与人工智能论文作业

热门文章

  1. docker -v :rw :ro
  2. 【macOS Catalina 10.15.X(19xx)原版镜像合集】
  3. 一个未知的项目被声明为你的MXML文件的根。切换到源代码模式加以纠正。
  4. linux克隆tf卡中的内容,TF/SD内存卡数据克隆怎样做图文详细教程
  5. K-means与DBSCAN聚类算法
  6. QT 中改变文字颜色 字体 形状
  7. 长生诀linux架设教程,手游【长生诀】VM一键即玩服务端+GM工具+图文教程
  8. 倩女幽魂手游服务器维护时间,倩女幽魂手游12月29日在线维护公告
  9. 第45期:动态规划-背包问题
  10. 王垠:对博士学位说永别