引入依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- or '*-stream-kafka' --></dependency>
复制代码

消息生产/消费

生产者

首先定义一个接口:

/*** 使用 stream 首先定义一个接口*/
public interface StreamClient {String MSG = "myStreamMsg"; //发送消息的队列名称String RETURN_MSG = "returnMsg"; //消费消息成功返回消息的队列名称@Output(MSG)MessageChannel output();@Input(RETURN_MSG)SubscribableChannel input();// MessageChannel 发送消息类型// SubscribableChannel 订阅消息类型
}
复制代码

生产者:

/*
这里采用的 controoller 的方式来测试发送消息*/
@RestController
@EnableBinding(StreamClient.class)  // 这里必须要绑定接口,否者就会启动报错,查找不到 streamClient
public class StreamSenderController {@Autowiredprivate StreamClient streamClient;// 测试 stream 发送方法/*    @GetMapping("sendMsg")public void send() {streamClient.output().send(MessageBuilder.withPayload(new Date().toString()).build());}@GetMapping("sendDate")public void send1() {streamClient.output().send(MessageBuilder.withPayload(new Date()).build());}*/@GetMapping("sendPerson")public void send2() {Person person = new Person();person.setName("Berg");person.setSex("man");streamClient.output().send(MessageBuilder.withPayload(person).build()); // 发送消息}@StreamListener(value = StreamClient.RETURN_MSG)public void returnMsg(String msg) {System.out.println("reurn message:"+msg);} // 消费消息后的返回
}
复制代码

消费者

首先定义一个接口:

/**java* 使用 stream 首先定义一个接口*/
public interface StreamClient {String MSG = "myStreamMsg";  // 消费消息队列String RETURN_MSG = "returnMsg"; // 返回队列@Input(MSG)SubscribableChannel input();@Output(RETURN_MSG)MessageChannel output();}
复制代码

消费者:

@Component
@EnableBinding(StreamClient.class)  // 这里同样需要绑定接口
public class StreamReceiver {private final Logger logger = LoggerFactory.getLogger(StreamReceiver.class);/*    @StreamListener(value = "myStreamMsg")public void receive(String msg) {logger.info("receive:{}",  msg);}@StreamListener(value = "myStreamMsg")public void receive1(Date date) {SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");logger.info("receive:{}",  dateFormat.format(date));}*/@StreamListener(StreamClient.MSG)  // 监听消费消息队列@SendTo(StreamClient.RETURN_MSG) // 返回的消息队列public String  receive2(Person p) {logger.info("name:{}",p.getName());logger.info("sex:{}",p.getSex());return "received message";}
}
复制代码

问题

重复订阅问题:

通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理(出现上述重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。

解决方案: 在消费者配置文件中添加如下配置

spring:cloud:stream:bindings:myStreamMsg: # 消息队列名称group: one # 如果这里不定义分组,就会出现多个实例重复消费的问题
复制代码

消息类型问题:

在发送消息时有可能时对象,也有可能时文本。因此不同类型之间的转换可能会出现问题。默认为 json 类型 解决方案: 在生产者配置文件中配置消息类型

spring:stream:bindings:myStreamMsg:Content-Type: application/json  // 配置类型
复制代码

代码

github地址

Spring Cloud Stream 简单使用相关推荐

  1. 一文了解Spring Cloud Stream体系

    点击蓝色"程序猿DD"关注我哟 加个"星标",不忘签到哦 来源:阿里巴巴中间件 Spring Cloud Stream 在 Spring Cloud 体系内用于 ...

  2. Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

    应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...

  3. Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)

    应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍Rab ...

  4. Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

    应用场景  前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的 ...

  5. Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑

    应用场景  上一篇<Spring Cloud Stream消费失败后的处理策略(一):自动重试>介绍了默认就会生效的消息重试功能.对于一些因环境原因.网络抖动等不稳定因素引发的问题可以起到 ...

  6. Spring Cloud Stream消费失败后的处理策略(一):自动重试

    之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如: 如何处理消息重复消费? 如何消费自己生产的消息? 下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式. 不 ...

  7. Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)

    应用场景 我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时 ...

  8. Spring Cloud Stream 体系及原理介绍

    https://mp.weixin.qq.com/s/e_pDTFmFcSqHH-uSIzNmMg Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于 ...

  9. 谷歌gcp 远程计算机_引导性GCP:带有Google Cloud Pub / Sub的Spring Cloud Stream

    谷歌gcp 远程计算机 我最近在Sprint Central的工程博客上阅读了Josh Long的Bootiful GCP系列 ,特别喜欢关于使用Google Cloud的Pub / Sub的第四部分 ...

  10. 引导性GCP:带有Google Cloud Pub / Sub的Spring Cloud Stream

    我最近在Sprint Central的工程博客上阅读了Josh Long的Bootiful GCP系列 ,特别喜欢关于使用Google Cloud的Pub / Sub的第四部分 . 我受到该系列的启发 ...

最新文章

  1. 漫谈视频目标跟踪与分割
  2. ISA Server 2006 的内部客户端概念
  3. [Hibernate]在VS2010中应用NHibernate 3.2与MySQL
  4. CentOS中安装WiFi图形管理工具
  5. 用美颜照当广告犯法!要么就标注“照骗”,挪威针对明星网红出手了
  6. SpringBoot - Spring Boot 应用剖析
  7. android中xmlns:tools属性详解
  8. 利用Excel VBA批量计算长时间序列植被物候动态阈值(逐像元)
  9. leetcode 376. 摆动序列 思考分析
  10. [转载]oracle定时器
  11. python访问序列元素的编号用什么括起来_【填空题】序列元素的编号称为 ,它从 开始,访问序列元素时将它用 括起来。...
  12. python变量图片_在Python中向3D图添加第4个变量
  13. 了解计算机网络拓扑结构,认识计算机网络拓扑结构
  14. 迎建国七十周年,Linux厂商巡礼之优麒麟
  15. 用python编写决策树算法_详细介绍python实现决策树C4.5算法
  16. Edge浏览器如何关闭金山毒霸安全主页.
  17. 迅雷下不了php文件怎么打开方式,如何解决迅雷打不开php文件的问题
  18. 求两者较大值的max函数的用法(c++基础)
  19. Linux必备工具————虚拟机
  20. html图片十字形,CSS3 十字架

热门文章

  1. 关于何种情况下使用DataGrid、DataList或Repeater的一些讨论(1) ambushaa [翻译] [转]
  2. vue 中使用axios的总结
  3. Android实现圆角和圆形
  4. SpringBoot中多种Filter配置方式
  5. wxml、wxss、js 引入外部文件的方法
  6. C# 计算程序运行耗时的方法
  7. PHP 删除文件,文件下的目录
  8. git clone 失败_鲜为人知的Git功能——Git Worktree工作树
  9. 蚂蚁如果上市成功,价格崩盘是必然的
  10. 没有安装gcc,导致提示configure cannot guess build type; you must specify one