Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑
应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:
@StreamListener(value = TestTopic.INPUT)
public void receiveV1(String payload, @Header("version") String version) {
if("1.0".equals(version)) {
// Version 1.0
}
if("2.0".equals(version)) {
// Version 2.0
}
}
那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。
动手试试 下面通过编写一个简单的例子来具体体会一下这个属性的用法:
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());
testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());
return "ok";
}
}
/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {
@StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")
public void receiveV1(String payload, @Header("version") String version) {
log.info("Received v1 : " + payload + ", " + version);
}
@StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")
public void receiveV2(String payload, @Header("version") String version) {
log.info("Received v2 : " + payload + ", " + version);
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
}
内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。
在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-content-route
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:
2018-12-24 15:50:33.361 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v1 : hello, 1.0
2018-12-24 15:50:33.363 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v2 : hello, 2.0
从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。
·END·
近期热文:
Git 版本控制之 GitFlow
彻底搞懂 Git-Rebase
我说分布式事务之最大努力通知型事务
我说分布式事务之TCC
让你的系统“坚挺不倒”的最后一个大招——「降级」
不可错过的CMS学习笔记
致敬| 她永远地离开了,但我们依然每天收益于您的伟大发明!
在生产中使用Java 11:需要了解的重要事项
如何在到处是“雷”的系统中「明哲保身」?这是第一招
可能是最全面的G1学习笔记
看完,赶紧点个“好看”鸭
点鸭点鸭
↓↓↓↓
Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑相关推荐
- Spring Cloud 分布式消息—Spring Cloud Stream 自定义通道与分组分区应用
在Spring Cloud 分布式消息-Spring Cloud Stream 简介与入门一篇我们简单了介绍了Spring Cloud Stream,并且使用Spring Cloud Stream提供 ...
- Spring Cloud Stream与RabbitMQ 消费者 消息分组
Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不 ...
- Spring Cloud Stream 学习小清单
由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...
- Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务
文章目录 概述 添加依赖 配置文件配置RabbitMQ的地址信息 接口定义 接收方 @EnableBinding @StreamListener 测试 消费组 发送复杂对象 消息回执 代码 概述 官网 ...
- Spring Cloud Stream消息中间件通过RabbitMQ实现消息推送
一.Spring Cloud Stream是什么 Spring Cloud Stream 是一个构建消息驱动微服务的框架. Spring Cloud Stream解决了开发人员无感知的使用消息中间件的 ...
- SpringCloud学习笔记 - 消息驱动 - Spring Cloud Stream
1. stream为什么被引入 常见MQ(消息中间件): ActiveMQ RabbitMQ RocketMQ Kafka 有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑 ...
- Spring Cloud Stream消息驱动
一.Spring Cloud Stream消息驱动 1.为什么引入cloud Stream MQ(消息中间件) ActiveMQ RabbitMQ RocketMQ Kafka 问题:可能中间java ...
- Spring Cloud Stream中文指导手册
Spring Cloud Stream中文指导手册 source 文章目录 Spring Cloud Stream中文指导手册 @[toc] Spring Cloud Stream 核心 1.简介 2 ...
- Spring Cloud Stream中文翻译
Ditmars.RELEASE 1.Spring Cloud Stream 介绍 Spring Cloud Stream是一个用于构建消息驱动应用的微服务框架.Spring Cloud Stream基 ...
最新文章
- 【转】GLSL资料收集
- 计算机辅助设计b实验目的,上海电力学院电路计算机辅助设计1--含有受控源电路辅助分析...
- functools.partial()==>预先设置参数,使得之后调用的时候,减少函数的参数
- stl Vecotr中遍历方法
- java foreach多线程_java关键字(一)
- 特定时间循环增加一个时间段值
- 主题模型TopicModel:LSA(隐性语义分析)模型和其实现的早期方法SVD
- f1c100s uboot调试记录
- 微信公众号申请需要哪些材料
- 安卓Android手机校园外卖订餐系统毕业设计
- hadoop无法自动生成tmp文件
- web 前端判断身份证号码是否有效
- Windows检测是否存在ms17-010(永恒之蓝)_Server Message Block
- 采集学校网站数据的10个经典方法
- ArcGIS栅格转面失败 所转面为空 显示原栅格范围有问题
- 应用统计432考研复试提问总结精简版【二】
- php网络名片系统源码免费电子云名片3.2版
- PS笔刷安装与使用方法(简单易懂)
- 关于大学生创新创业训练项目
- 运维 - 第一阶段 - linux与shell编程
热门文章
- python3 strip lstrip rstrip 删除字符串首尾指定字符
- Linux C使用bool类型 出现错误expected '=', ',', ';', 'asm' or '__attribute__'
- C语言实现单链表的逆置
- linux container容器技术框架性理解
- linux通信机制总结
- oracle在非归档模式下,Oracle在非归档模式下不能更改表空间为备份模式
- scrapy 中不同页面的拼接_scrapy官方文档提供的常见使用问题
- 长方形纸做容积最大的长方体_儿童手工折纸,童年玩具纸扇子怎么折?一起来回忆下经典折法吧...
- 字节跳动面试题:用归并排序判断冒泡排序的轮数
- flutter 人脸检测_【转载】opencv实现人脸检测