springcloud的stream消息组件的使用@StreamListener
常见问题(使用rabbitmq)
消息分组防止多实例重复消费
在一个服务多实例场景下使用默认使用@StreamListener
监听消息消费,yml中没有特殊配置的话是会导致消息重复消费的,原因是此时每个实例都是匿名在rabbitmq上注册的队列,需要给消费者指定一个消费组,让消息在组里只被消费一次;
spring.cloud.stream.bindings.xxx(消费者队列名).group=xxx(组名
)
在springboot下在同一个服务(项目中)使用@input
和@outPut
时指定的队列名是不可以重复的.会在启动编译的时候报bean定义重复。需要在yml给生产者和消费者指定同一个交换机。
spring:rabbitmq:host: xxx.xxx.xxx.xxport: 35672username: xxxpassword: xxxvirtual-host: /xxxcloud:stream:bindings:in:#若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topicdestination: test#在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列#并且默认生成的交换机是topic类型的,会导致重复消费group: myInout:destination: test
先上依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.fchan</groupId><artifactId>springcloudstream</artifactId><version>0.0.1-SNAPSHOT</version><name>springcloudstream</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<!-- <version>2.0.1.RELEASE</version>--></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-dependencies</artifactId><version>Ditmars.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
再上yml配置
spring:rabbitmq:host: xxx.xxx.xxx.xxport: 35672username: xxxpassword: xxxvirtual-host: /xxxcloud:stream:bindings:in:#若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topicdestination: test#在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列#并且默认生成的交换机是topic类型的,会导致重复消费group: myInout:destination: test
消息生产者
package com.fchan.springcloudstream.service;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyMessageChannel {String out = "out";String in = "in";@Output(out)MessageChannel out();@Input(in)SubscribableChannel in();}
发送消息
package com.fchan.springcloudstream.controller;import com.fchan.springcloudstream.service.MyMessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;@RestController
public class MessageController {@Resourceprivate MyMessageChannel myMessageChannel;@RequestMapping("test")public String testMessage(){Map<String,Object> map = new HashMap<>();map.put("shopId", "123");myMessageChannel.out().send(MessageBuilder.withPayload(map).build());return "success";}}
消息消费者
package com.fchan.springcloudstream.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@EnableBinding({MyMessageChannel.class})
public class MyConsumer {Logger log = LoggerFactory.getLogger(MyConsumer.class);@StreamListener(MyMessageChannel.in)public void input(Message<Map<String,Object>> message){log.info("收到消息:{}", message.getPayload());}}
springcloud的stream消息组件的使用@StreamListener相关推荐
- SpringCloud (十一) --------- Stream 消息驱动框架
目录 一.Stream 概述 二.Stream 重要概念 三.Stream 应用 四.Stream 自定义消息通道 五.Stream 分组与持久化 六.Stream 设置路由键 一.Stream 概述 ...
- SpringCloud微服务架构,Config 分布式配置中心,Bus 消息总线, Stream 消息驱动,Sleuth+Zipkin 链路追踪
Config分布式配置中心 Config 概述 概述 • Spring Cloud Config 解决了在分布式场景下多环境配置文件的管理和维护. • 好处: • 集中管理配置文件 • 不同环境不同配 ...
- SpringCloud Stream消息驱动
为啥有这个技术??? 1. 这个stream是操作消息队列的,简化,学习消息队列的成本降低. 2. 可操作rabbitMQ兔子message queue,kafaka,可理解为jdbc可操作oracl ...
- SpringCloud2020学习笔记13——SpringCloud Stream消息驱动
目录 一.消息驱动概述 1.简介 2.官网 2.设计思想 ① 标准MQ ② 为什么用Cloud Stream ③ Stream中的消息通信方式遵循了发布-订阅模式 3.Spring Cloud Str ...
- 手把手教你搭建SpringCloud项目(十六)集成Stream消息驱动
Spring Cloud全集文章目录: 零.什么是微服务?一看就会系列! 一.手把手教你搭建SpringCloud项目(一)图文详解,傻瓜式操作 二.手把手教你搭建SpringCloud项目(二)生产 ...
- SpringCloud学习笔记 - 消息驱动 - Spring Cloud Stream
1. stream为什么被引入 常见MQ(消息中间件): ActiveMQ RabbitMQ RocketMQ Kafka 有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑 ...
- SpringCloud(十一)Bus消息总线、Stream消息驱动
一.Bus消息总线 需求:分布式自动刷新配置功能: 解决:SpringCloud Bus配合Spring cloud Config使用可以实现配置的动态刷新. 1.概述 定义:Spring Cloud ...
- SpringCloud第九章(消息驱动Strem和RabbitMQ)
目录 1:什么是Strem 2:Strem架构 2.1:传统的消息对列架构 2.2:strem的消息对列架构 3:案例介绍 3.1:8001生产者 3.2:8002.8003消费者配置一致 4:总结消 ...
- Spring Cloud Stream消息驱动
一.Spring Cloud Stream消息驱动 1.为什么引入cloud Stream MQ(消息中间件) ActiveMQ RabbitMQ RocketMQ Kafka 问题:可能中间java ...
最新文章
- 1_Linux基础知识及命令————登陆登出及图形化界面的初始化
- Apache POI和EasyExcel 第二集:Apache POI的基本Excel写入(分为03版的xls和07版的xlsx)
- 【译】如何停止使用console.log()转而使用浏览器debugger
- mybatis传递多个参数_Mybatis传递多个参数的4种方式(干货)
- 经典FOXMAIL报错 winsock error 11004
- 为什么需要实现幂等性?
- Android系统init进程启动及init.rc全解析
- dedeCMS 会员:个人空间模板style修改
- win10浏览器html设置字体大小设置,edge字体大小设置?win10 edge浏览器字体设置技巧...
- sakai mysql_sakai 开发环境搭建(一)
- 【Apollo 6.0项目实战】Canbus模块
- word文件太大如何压缩到最小?
- MathType如何编辑大三角形符号
- 数据挖掘 任务一:预测贷款是否逾期
- Oracle中用户查询表一定要加上表空间名的问题
- 2017普实软件迎新年会报道
- 数据结构实验报告-实验四-实现桶式排序和基于桶式排序的基数排序
- EasyUI项目之门户(添加查询购物车与清空购物车)
- 蓝芽技术的原理和应用(2)(转)
- linux 内存清理 释放命令,linux 内存清理释放命令(示例代码)