Spring Cloud Stream 简单使用
引入依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- or '*-stream-kafka' --></dependency>
复制代码
消息生产/消费
生产者
首先定义一个接口:
/*** 使用 stream 首先定义一个接口*/
public interface StreamClient {String MSG = "myStreamMsg"; //发送消息的队列名称String RETURN_MSG = "returnMsg"; //消费消息成功返回消息的队列名称@Output(MSG)MessageChannel output();@Input(RETURN_MSG)SubscribableChannel input();// MessageChannel 发送消息类型// SubscribableChannel 订阅消息类型
}
复制代码
生产者:
/*
这里采用的 controoller 的方式来测试发送消息*/
@RestController
@EnableBinding(StreamClient.class) // 这里必须要绑定接口,否者就会启动报错,查找不到 streamClient
public class StreamSenderController {@Autowiredprivate StreamClient streamClient;// 测试 stream 发送方法/* @GetMapping("sendMsg")public void send() {streamClient.output().send(MessageBuilder.withPayload(new Date().toString()).build());}@GetMapping("sendDate")public void send1() {streamClient.output().send(MessageBuilder.withPayload(new Date()).build());}*/@GetMapping("sendPerson")public void send2() {Person person = new Person();person.setName("Berg");person.setSex("man");streamClient.output().send(MessageBuilder.withPayload(person).build()); // 发送消息}@StreamListener(value = StreamClient.RETURN_MSG)public void returnMsg(String msg) {System.out.println("reurn message:"+msg);} // 消费消息后的返回
}
复制代码
消费者
首先定义一个接口:
/**java* 使用 stream 首先定义一个接口*/
public interface StreamClient {String MSG = "myStreamMsg"; // 消费消息队列String RETURN_MSG = "returnMsg"; // 返回队列@Input(MSG)SubscribableChannel input();@Output(RETURN_MSG)MessageChannel output();}
复制代码
消费者:
@Component
@EnableBinding(StreamClient.class) // 这里同样需要绑定接口
public class StreamReceiver {private final Logger logger = LoggerFactory.getLogger(StreamReceiver.class);/* @StreamListener(value = "myStreamMsg")public void receive(String msg) {logger.info("receive:{}", msg);}@StreamListener(value = "myStreamMsg")public void receive1(Date date) {SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");logger.info("receive:{}", dateFormat.format(date));}*/@StreamListener(StreamClient.MSG) // 监听消费消息队列@SendTo(StreamClient.RETURN_MSG) // 返回的消息队列public String receive2(Person p) {logger.info("name:{}",p.getName());logger.info("sex:{}",p.getSex());return "received message";}
}
复制代码
问题
重复订阅问题:
通常在生产环境,我们的每个服务都不会以单节点的方式运行在生产环境,当同一个服务启动多个实例的时候,这些实例都会绑定到同一个消息通道的目标主题(Topic)上。默认情况下,当生产者发出一条消息到绑定通道上,这条消息会产生多个副本被每个消费者实例接收和处理(出现上述重复消费问题)。但是有些业务场景之下,我们希望生产者产生的消息只被其中一个实例消费,这个时候我们需要为这些消费者设置消费组来实现这样的功能。
解决方案: 在消费者配置文件中添加如下配置
spring:cloud:stream:bindings:myStreamMsg: # 消息队列名称group: one # 如果这里不定义分组,就会出现多个实例重复消费的问题
复制代码
消息类型问题:
在发送消息时有可能时对象,也有可能时文本。因此不同类型之间的转换可能会出现问题。默认为 json 类型 解决方案: 在生产者配置文件中配置消息类型
spring:stream:bindings:myStreamMsg:Content-Type: application/json // 配置类型
复制代码
代码
github地址
Spring Cloud Stream 简单使用相关推荐
- 一文了解Spring Cloud Stream体系
点击蓝色"程序猿DD"关注我哟 加个"星标",不忘签到哦 来源:阿里巴巴中间件 Spring Cloud Stream 在 Spring Cloud 体系内用于 ...
- Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑
应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...
- Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)
应用场景 之前我们已经通过<Spring Cloud Stream消费失败后的处理策略(一):自动重试>一文介绍了Spring Cloud Stream默认的消息重试功能.本文将介绍Rab ...
- Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)
应用场景 前两天我们已经介绍了两种Spring Cloud Stream对消息失败的处理策略: 自动重试:对于一些因环境原因(如:网络抖动等不稳定因素)引发的问题可以起到比较好的作用,提高消息处理的 ...
- Spring Cloud Stream消费失败后的处理策略(二):自定义错误处理逻辑
应用场景 上一篇<Spring Cloud Stream消费失败后的处理策略(一):自动重试>介绍了默认就会生效的消息重试功能.对于一些因环境原因.网络抖动等不稳定因素引发的问题可以起到 ...
- Spring Cloud Stream消费失败后的处理策略(一):自动重试
之前写了几篇关于Spring Cloud Stream使用中的常见问题,比如: 如何处理消息重复消费? 如何消费自己生产的消息? 下面几天就集中来详细聊聊,当消息消费失败之后该如何处理的几种方式. 不 ...
- Spring Cloud Stream 使用延迟消息实现定时任务(RabbitMQ)
应用场景 我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时 ...
- Spring Cloud Stream 体系及原理介绍
https://mp.weixin.qq.com/s/e_pDTFmFcSqHH-uSIzNmMg Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于 ...
- 谷歌gcp 远程计算机_引导性GCP:带有Google Cloud Pub / Sub的Spring Cloud Stream
谷歌gcp 远程计算机 我最近在Sprint Central的工程博客上阅读了Josh Long的Bootiful GCP系列 ,特别喜欢关于使用Google Cloud的Pub / Sub的第四部分 ...
- 引导性GCP:带有Google Cloud Pub / Sub的Spring Cloud Stream
我最近在Sprint Central的工程博客上阅读了Josh Long的Bootiful GCP系列 ,特别喜欢关于使用Google Cloud的Pub / Sub的第四部分 . 我受到该系列的启发 ...
最新文章
- 漫谈视频目标跟踪与分割
- ISA Server 2006 的内部客户端概念
- [Hibernate]在VS2010中应用NHibernate 3.2与MySQL
- CentOS中安装WiFi图形管理工具
- 用美颜照当广告犯法!要么就标注“照骗”,挪威针对明星网红出手了
- SpringBoot - Spring Boot 应用剖析
- android中xmlns:tools属性详解
- 利用Excel VBA批量计算长时间序列植被物候动态阈值(逐像元)
- leetcode 376. 摆动序列 思考分析
- [转载]oracle定时器
- python访问序列元素的编号用什么括起来_【填空题】序列元素的编号称为 ,它从 开始,访问序列元素时将它用 括起来。...
- python变量图片_在Python中向3D图添加第4个变量
- 了解计算机网络拓扑结构,认识计算机网络拓扑结构
- 迎建国七十周年,Linux厂商巡礼之优麒麟
- 用python编写决策树算法_详细介绍python实现决策树C4.5算法
- Edge浏览器如何关闭金山毒霸安全主页.
- 迅雷下不了php文件怎么打开方式,如何解决迅雷打不开php文件的问题
- 求两者较大值的max函数的用法(c++基础)
- Linux必备工具————虚拟机
- html图片十字形,CSS3 十字架
热门文章
- 关于何种情况下使用DataGrid、DataList或Repeater的一些讨论(1) ambushaa [翻译] [转]
- vue 中使用axios的总结
- Android实现圆角和圆形
- SpringBoot中多种Filter配置方式
- wxml、wxss、js 引入外部文件的方法
- C# 计算程序运行耗时的方法
- PHP 删除文件,文件下的目录
- git clone 失败_鲜为人知的Git功能——Git Worktree工作树
- 蚂蚁如果上市成功,价格崩盘是必然的
- 没有安装gcc,导致提示configure cannot guess build type; you must specify one