一. 首先安装rabbitmq-management

这里用的是rabbitmq的docker镜像,我们可以在Docker Hub中搜索rabbitmq, 找到最新的版本安装

sudo docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.8-management

安装之后使用

docker ps -a

检查下, rabbitmq的镜像是否启动, 正常启动状态如下:

通过http://192.168.12.12:15672, 访问到rabbitmq的管理端,
默认账户/密码是: guest/guest

二. Spring Cloud Stream与RabbitMQ集成

引入依赖

<!-- Spring Cloud Stream RabbitMQ -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

创建消息接受类

@Component
@Slf4j
@EnableBinding(Processor.class)
public class MyMQReciver {@StreamListener(Processor.INPUT)public void process(String message){log.info("hahahah : "+message);System.out.println("hahahah : "+message);}
}

代码解读两处:
1. @StreamListener(Processor.INPUT)
    这里其实是要声明一个订阅的键值, Processor类是一个org.springframework.cloud.stream.messaging jar包中内置的接口,查看其源码可以看到它继承了Source和Sink两个类

package org.springframework.cloud.stream.messaging;public interface Processor extends Source, Sink {
}
package org.springframework.cloud.stream.messaging;import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}
package org.springframework.cloud.stream.messaging;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;public interface Sink {String INPUT = "input";@Input("input")SubscribableChannel input();
}

结构很简单, 我们也可以仿照去实现自己的Processor
2. @EnableBinding(Processor.class)
   这里绑定的就是Processor(或者我们自己实现的Processor)

创建发送消息测试类

@RestController
public class SendController {@Autowiredprivate Processor pipe;@GetMapping("/send")public void send(@RequestParam String message){pipe.output().send(MessageBuilder.withPayload(message).build());}
}

在application.yml中增加配置

spring:cloud:stream:
      bindings:
        input:
          destination: jeecgCloudExchangebinder: local_rabbitgroup: logMessageConsumersoutput:
          destination: jeecgCloudExchangebinder: local_rabbitbinders:
        local_rabbit:
          type: rabbitenvironment:
            spring:
              rabbitmq:
                host: localhostport: 5672username: guestpassword: guestvirtual-host: /

启动SpringCloud项目

首先浏览器进入rabbitmq管理端查看, 发现我们在application.yml中创建的output destination被自动创建出来了

input destination也被自动创建出来了,并且自动添加了绑定

确认rabbitmq这边没有问题后, 我们通过访问消息接口测试, http://localhost:8001/send?message=hello
发现MyMQReciver已经成功接受到了消息

三. 理论知识点

Spring Cloud Stream核心架构1

两个比较重要的地方:inputs(输入)消息接收端、outputs(输出)消息发送端

一个 Spring Cloud Stream 应用以消息中间件为核心,应用通过Spring Cloud Stream注入的输入/输出通道 channels 与外部进行通信。channels 通过特定的Binder实现与外部消息中间件进行通信。

Spring Cloud Stream核心架构2

黄色:表示RabbitMQ

绿色:插件,消息的输入输出都套了一层插件,插件可以用于各种各样不同的消息,也可以用于消息中间件的替换。

核心概念:

Barista接口:Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

通道接口如何定义:

@Output:输出注解,用于定义发送消息接口

@Input:输入注解,用于定义消息的消费者接口

@StreamListener:用于定义监听方法的注解

使用Spring Cloud Stream 非常简单,只需要使用好这3个注解即可,在实现高性能消息的生产和消费的场景非常合适,但是使用SpringCloudStream框架有一个非常大的问题,就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题。目前SpringCloudStream整合了RabbitMQ与Kafka,我们都知道Kafka是无法进行消息可靠性投递的,这个原因是因为SpringCloudStream框架为了和Kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的!这点就是在当前版本SpringCloudStream的定位。

因此在实际的工作中,可以采用SpringCloudStream,如果需要保证可靠性投递,也可以单独采用RabbitMQ,也是可以的。

介绍Spring Cloud Stream与RabbitMQ集成相关推荐

  1. 使用Spring Cloud Stream与RabbitMQ集成

    在我以前的文章中,我写了两个系统之间非常简单的集成场景-一个生成一个工作单元,另一个处理该工作单元,以及Spring Integration如何使这种集成非常容易. 在这里,我将演示如何使用Sprin ...

  2. Spring Cloud Stream与RabbitMQ 死信队列

    RabbitMQ的TTL全称为Time-To-Live,表示的是消息的有效期.消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成"死信" (Dead Messag ...

  3. Spring Cloud Stream多RabbitMQ实例配置时报错no default binder has been set

    当前Spring Cloud Rabbit的版本为2.1.2 <dependency><groupId>org.springframework.cloud</groupI ...

  4. 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

    ←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...

  5. Spring Cloud Stream与RabbitMQ 生产者和消费者位于同一个应用服务

    第一种模型:交换机类型为topic,路由key为"#",这是简单的使用模型 当前Spring Cloud Rabbit的版本为2.1.2 <dependency>< ...

  6. Spring Cloud Stream与RabbitMQ 消费者 消息分组

    Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不 ...

  7. Spring Cloud Stream与RabbitMQ整合

    Spring Cloud与RabbitMQ整合需要引入下述依赖 <dependency><groupId>org.springframework.cloud</group ...

  8. Spring Cloud Stream与RabbitMQ整合时Producer与Consumer的相关配置

    生产者属性 下面的属性都必须添加前缀: spring.cloud.stream.<rabbitName>.bindings.<channelName>.producer. 如果 ...

  9. Spring Cloud Stream整合RabbitMQ

    生产者: 消费者:

最新文章

  1. 利用curl下载文件(进度条显示) 代码片段
  2. 《Genesis-3D游戏引擎系列教程-进阶篇》6:动画
  3. 你知道怎么在生产环境下部署tomcat吗?,灵魂拷问
  4. 汇编实现地址对应值相加
  5. POP缩小区域扩张导致的延迟差距—Vecloud微云
  6. word 流水号 自动增加_以自动组卷软件为例浅谈Python自动化办公
  7. matlab基数排序,c语言 数据结构 利用随机函数产生N个随机整数,对这些数进行多种方法进行排序...
  8. 日期不能交叉的检测算法
  9. golang开发环境配置及Beego框架安装
  10. python读取tiff文件进行波段计算_python+tifffile之tiff文件读写方式
  11. brew安装php-ffmpeg,mac 系统编译安装ffmpeg
  12. C#项目xxx针对的是.NETFramework,Version=vxxx之解决方案
  13. 前端页面常用代码参考
  14. STM8停产,新唐的N76E003 pin对pin替换STM8S003F3P6
  15. java 语音库_语音控制pc
  16. 操作系统进程同步之吸烟者问题,C语言实现
  17. 车牌识别-基于模板匹配
  18. 微信公众账号查看历史消息
  19. 计算机图形学(四)几何变换_4_二维复合变换_4_二维刚体变换
  20. C/C++ 延时函数 (标准库)

热门文章

  1. mysql+查看connection_如何查看MySQL connection id连接id
  2. C语言实现tolower
  3. linux文件系统 环形结构图,环形缓冲器(转)
  4. 数据结构之外部排序:最佳归并树
  5. Apache ActiveMQ 远程代码执行漏洞记录(CVE-2016-3088,端口:8186)
  6. vue i18n 国际化 使用方法
  7. Educational Codeforces Round 14 - F (codeforces 691F)
  8. pt-online-schema-change 修改主键导致数据删除失败的问题调查
  9. MySqlDataReader在Using中使用
  10. (android 实战总结)android第三方组件实现总结