应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

  1. @StreamListener(value = TestTopic.INPUT)

  2. public void receiveV1(String payload, @Header("version") String version) {

  3.    if("1.0".equals(version)) {

  4.        // Version 1.0

  5.    }

  6.    if("2.0".equals(version)) {

  7.        // Version 2.0

  8.    }

  9. }

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

动手试试 下面通过编写一个简单的例子来具体体会一下这个属性的用法:

  1. @EnableBinding(TestApplication.TestTopic.class)

  2. @SpringBootApplication

  3. public class TestApplication {

  4.    public static void main(String[] args) {

  5.        SpringApplication.run(TestApplication.class, args);

  6.    }

  7.    @RestController

  8.    static class TestController {

  9.        @Autowired

  10.        private TestTopic testTopic;

  11.        /**

  12.         * 消息生产接口

  13.         *

  14.         * @param message

  15.         * @return

  16.         */

  17.        @GetMapping("/sendMessage")

  18.        public String messageWithMQ(@RequestParam String message) {

  19.            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());

  20.            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());

  21.            return "ok";

  22.        }

  23.    }

  24.    /**

  25.     * 消息消费逻辑

  26.     */

  27.    @Slf4j

  28.    @Component

  29.    static class TestListener {

  30.        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")

  31.        public void receiveV1(String payload, @Header("version") String version) {

  32.            log.info("Received v1 : " + payload + ", " + version);

  33.        }

  34.        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")

  35.        public void receiveV2(String payload, @Header("version") String version) {

  36.            log.info("Received v2 : " + payload + ", " + version);

  37.        }

  38.    }

  39.    interface TestTopic {

  40.        String OUTPUT = "example-topic-output";

  41.        String INPUT = "example-topic-input";

  42.        @Output(OUTPUT)

  43.        MessageChannel output();

  44.        @Input(INPUT)

  45.        SubscribableChannel input();

  46.    }

  47. }

内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。

在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如:

  1. spring.cloud.stream.bindings.example-topic-input.destination=test-topic

  2. spring.cloud.stream.bindings.example-topic-input.group=stream-content-route

  3. spring.cloud.stream.bindings.example-topic-output.destination=test-topic

完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志:

  1. 2018-12-24 15:50:33.361  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v1 : hello, 1.0

  2. 2018-12-24 15:50:33.363  INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener  : Received v2 : hello, 2.0

从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。

·END·

 近期热文:

  • Git 版本控制之 GitFlow

  • 彻底搞懂 Git-Rebase

  • 我说分布式事务之最大努力通知型事务

  • 我说分布式事务之TCC

  • 让你的系统“坚挺不倒”的最后一个大招——「降级」

  • 不可错过的CMS学习笔记

  • 致敬| 她永远地离开了,但我们依然每天收益于您的伟大发明!

  • 在生产中使用Java 11:需要了解的重要事项

  • 如何在到处是“雷”的系统中「明哲保身」?这是第一招

  • 可能是最全面的G1学习笔记

看完,赶紧点个“好看”鸭

点鸭点鸭

↓↓↓↓

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑相关推荐

  1. Spring Cloud 分布式消息—Spring Cloud Stream 自定义通道与分组分区应用

    在Spring Cloud 分布式消息-Spring Cloud Stream 简介与入门一篇我们简单了介绍了Spring Cloud Stream,并且使用Spring Cloud Stream提供 ...

  2. Spring Cloud Stream与RabbitMQ 消费者 消息分组

    Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不 ...

  3. Spring Cloud Stream 学习小清单

    由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...

  4. Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务

    文章目录 概述 添加依赖 配置文件配置RabbitMQ的地址信息 接口定义 接收方 @EnableBinding @StreamListener 测试 消费组 发送复杂对象 消息回执 代码 概述 官网 ...

  5. Spring Cloud Stream消息中间件通过RabbitMQ实现消息推送

    一.Spring Cloud Stream是什么 Spring Cloud Stream 是一个构建消息驱动微服务的框架. Spring Cloud Stream解决了开发人员无感知的使用消息中间件的 ...

  6. SpringCloud学习笔记 - 消息驱动 - Spring Cloud Stream

    1. stream为什么被引入 常见MQ(消息中间件): ActiveMQ RabbitMQ RocketMQ Kafka 有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑 ...

  7. Spring Cloud Stream消息驱动

    一.Spring Cloud Stream消息驱动 1.为什么引入cloud Stream MQ(消息中间件) ActiveMQ RabbitMQ RocketMQ Kafka 问题:可能中间java ...

  8. Spring Cloud Stream中文指导手册

    Spring Cloud Stream中文指导手册 source 文章目录 Spring Cloud Stream中文指导手册 @[toc] Spring Cloud Stream 核心 1.简介 2 ...

  9. Spring Cloud Stream中文翻译

    Ditmars.RELEASE 1.Spring Cloud Stream 介绍 Spring Cloud Stream是一个用于构建消息驱动应用的微服务框架.Spring Cloud Stream基 ...

最新文章

  1. 【转】GLSL资料收集
  2. 计算机辅助设计b实验目的,上海电力学院电路计算机辅助设计1--含有受控源电路辅助分析...
  3. functools.partial()==>预先设置参数,使得之后调用的时候,减少函数的参数
  4. stl Vecotr中遍历方法
  5. java foreach多线程_java关键字(一)
  6. 特定时间循环增加一个时间段值
  7. 主题模型TopicModel:LSA(隐性语义分析)模型和其实现的早期方法SVD
  8. f1c100s uboot调试记录
  9. 微信公众号申请需要哪些材料
  10. 安卓Android手机校园外卖订餐系统毕业设计
  11. hadoop无法自动生成tmp文件
  12. web 前端判断身份证号码是否有效
  13. Windows检测是否存在ms17-010(永恒之蓝)_Server Message Block
  14. 采集学校网站数据的10个经典方法
  15. ArcGIS栅格转面失败 所转面为空 显示原栅格范围有问题
  16. 应用统计432考研复试提问总结精简版【二】
  17. php网络名片系统源码免费电子云名片3.2版
  18. PS笔刷安装与使用方法(简单易懂)
  19. 关于大学生创新创业训练项目
  20. 运维 - 第一阶段 - linux与shell编程

热门文章

  1. python3 strip lstrip rstrip 删除字符串首尾指定字符
  2. Linux C使用bool类型 出现错误expected '=', ',', ';', 'asm' or '__attribute__'
  3. C语言实现单链表的逆置
  4. linux container容器技术框架性理解
  5. linux通信机制总结
  6. oracle在非归档模式下,Oracle在非归档模式下不能更改表空间为备份模式
  7. scrapy 中不同页面的拼接_scrapy官方文档提供的常见使用问题
  8. 长方形纸做容积最大的长方体_儿童手工折纸,童年玩具纸扇子怎么折?一起来回忆下经典折法吧...
  9. 字节跳动面试题:用归并排序判断冒泡排序的轮数
  10. flutter 人脸检测_【转载】opencv实现人脸检测