常见问题(使用rabbitmq)

消息分组防止多实例重复消费
在一个服务多实例场景下使用默认使用@StreamListener监听消息消费,yml中没有特殊配置的话是会导致消息重复消费的,原因是此时每个实例都是匿名在rabbitmq上注册的队列,需要给消费者指定一个消费组,让消息在组里只被消费一次;

spring.cloud.stream.bindings.xxx(消费者队列名).group=xxx(组名)

在springboot下在同一个服务(项目中)使用@input@outPut时指定的队列名是不可以重复的.会在启动编译的时候报bean定义重复。需要在yml给生产者和消费者指定同一个交换机。

spring:rabbitmq:host: xxx.xxx.xxx.xxport: 35672username: xxxpassword: xxxvirtual-host: /xxxcloud:stream:bindings:in:#若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topicdestination: test#在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列#并且默认生成的交换机是topic类型的,会导致重复消费group: myInout:destination: test

先上依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.5.8.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.fchan</groupId><artifactId>springcloudstream</artifactId><version>0.0.1-SNAPSHOT</version><name>springcloudstream</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<!--            <version>2.0.1.RELEASE</version>--></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-dependencies</artifactId><version>Ditmars.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

再上yml配置

spring:rabbitmq:host: xxx.xxx.xxx.xxport: 35672username: xxxpassword: xxxvirtual-host: /xxxcloud:stream:bindings:in:#若消息系统是RabbitMQ,目的地(destination)就是指exchange,消息系统是Kafka,那么就是指topicdestination: test#在多实例的时候需要制定一个消息分组,不然每个实例都是匿名方式把队列注册到rabbitmq上去,导致一个交换机下有多个队列#并且默认生成的交换机是topic类型的,会导致重复消费group: myInout:destination: test

消息生产者

package com.fchan.springcloudstream.service;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;public interface MyMessageChannel {String out = "out";String in = "in";@Output(out)MessageChannel out();@Input(in)SubscribableChannel in();}

发送消息

package com.fchan.springcloudstream.controller;import com.fchan.springcloudstream.service.MyMessageChannel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;@RestController
public class MessageController {@Resourceprivate MyMessageChannel myMessageChannel;@RequestMapping("test")public String testMessage(){Map<String,Object> map = new HashMap<>();map.put("shopId", "123");myMessageChannel.out().send(MessageBuilder.withPayload(map).build());return "success";}}

消息消费者

package com.fchan.springcloudstream.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;import java.util.Map;@Component
@EnableBinding({MyMessageChannel.class})
public class MyConsumer {Logger log = LoggerFactory.getLogger(MyConsumer.class);@StreamListener(MyMessageChannel.in)public void input(Message<Map<String,Object>> message){log.info("收到消息:{}", message.getPayload());}}

springcloud的stream消息组件的使用@StreamListener相关推荐

  1. SpringCloud (十一) --------- Stream 消息驱动框架

    目录 一.Stream 概述 二.Stream 重要概念 三.Stream 应用 四.Stream 自定义消息通道 五.Stream 分组与持久化 六.Stream 设置路由键 一.Stream 概述 ...

  2. SpringCloud微服务架构,Config 分布式配置中心,Bus 消息总线, Stream 消息驱动,Sleuth+Zipkin 链路追踪

    Config分布式配置中心 Config 概述 概述 • Spring Cloud Config 解决了在分布式场景下多环境配置文件的管理和维护. • 好处: • 集中管理配置文件 • 不同环境不同配 ...

  3. SpringCloud Stream消息驱动

    为啥有这个技术??? 1. 这个stream是操作消息队列的,简化,学习消息队列的成本降低. 2. 可操作rabbitMQ兔子message queue,kafaka,可理解为jdbc可操作oracl ...

  4. SpringCloud2020学习笔记13——SpringCloud Stream消息驱动

    目录 一.消息驱动概述 1.简介 2.官网 2.设计思想 ① 标准MQ ② 为什么用Cloud Stream ③ Stream中的消息通信方式遵循了发布-订阅模式 3.Spring Cloud Str ...

  5. 手把手教你搭建SpringCloud项目(十六)集成Stream消息驱动

    Spring Cloud全集文章目录: 零.什么是微服务?一看就会系列! 一.手把手教你搭建SpringCloud项目(一)图文详解,傻瓜式操作 二.手把手教你搭建SpringCloud项目(二)生产 ...

  6. SpringCloud学习笔记 - 消息驱动 - Spring Cloud Stream

    1. stream为什么被引入 常见MQ(消息中间件): ActiveMQ RabbitMQ RocketMQ Kafka 有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑 ...

  7. SpringCloud(十一)Bus消息总线、Stream消息驱动

    一.Bus消息总线 需求:分布式自动刷新配置功能: 解决:SpringCloud Bus配合Spring cloud Config使用可以实现配置的动态刷新. 1.概述 定义:Spring Cloud ...

  8. SpringCloud第九章(消息驱动Strem和RabbitMQ)

    目录 1:什么是Strem 2:Strem架构 2.1:传统的消息对列架构 2.2:strem的消息对列架构 3:案例介绍 3.1:8001生产者 3.2:8002.8003消费者配置一致 4:总结消 ...

  9. Spring Cloud Stream消息驱动

    一.Spring Cloud Stream消息驱动 1.为什么引入cloud Stream MQ(消息中间件) ActiveMQ RabbitMQ RocketMQ Kafka 问题:可能中间java ...

最新文章

  1. 1_Linux基础知识及命令————登陆登出及图形化界面的初始化
  2. Apache POI和EasyExcel 第二集:Apache POI的基本Excel写入(分为03版的xls和07版的xlsx)
  3. 【译】如何停止使用console.log()转而使用浏览器debugger
  4. mybatis传递多个参数_Mybatis传递多个参数的4种方式(干货)
  5. 经典FOXMAIL报错 winsock error 11004
  6. 为什么需要实现幂等性?
  7. Android系统init进程启动及init.rc全解析
  8. dedeCMS 会员:个人空间模板style修改
  9. win10浏览器html设置字体大小设置,edge字体大小设置?win10 edge浏览器字体设置技巧...
  10. sakai mysql_sakai 开发环境搭建(一)
  11. 【Apollo 6.0项目实战】Canbus模块
  12. word文件太大如何压缩到最小?
  13. MathType如何编辑大三角形符号
  14. 数据挖掘 任务一:预测贷款是否逾期
  15. Oracle中用户查询表一定要加上表空间名的问题
  16. 2017普实软件迎新年会报道
  17. 数据结构实验报告-实验四-实现桶式排序和基于桶式排序的基数排序
  18. EasyUI项目之门户(添加查询购物车与清空购物车)
  19. 蓝芽技术的原理和应用(2)(转)
  20. linux 内存清理 释放命令,linux 内存清理释放命令(示例代码)

热门文章

  1. CollectionUtils取交集、并集和差集
  2. oracle profile default,ORACLE用户PROFILE详解
  3. 面试中五个咸鱼翻身的机会,好好把握!
  4. 【转】数据库基本知识:(五)视图应用
  5. 基于ARM的嵌入式Linux移植真实体验(4)――设备驱动
  6. const 关键字详解
  7. vue——devtools安装(实时监测vue的data数据变动)
  8. Vue 自定义编辑器 contenteditable
  9. 实施ITIL项目的十个参考步骤
  10. 对象的 游离态 持久态 临时态