消息驱动式微服务:Spring Cloud Stream RabbitMQ
1. 概述
在本文中,我们将向您介绍Spring Cloud Stream
,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如RabbitMQ
、Apache Kafka
等)连接。
Spring Cloud Stream
构建在现有Spring框架(如Spring Messaging
和Spring Integration
)之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的message broker
紧密耦合。此外,有时对某些用例进行扩展是困难的。
Spring Cloud Stream
背后的想法是一个非常典型的Spring Boot
概念——抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入
。这意味着您可以通过更改依赖项和配置文件来更改message broker
。可以在这里找到目前已经支持的各种消息代理。
本文将使用RabbitMQ
作为message broker
。在此之前,让我们了解一下broker
(代理)的一些基本概念,以及为什么要在面向微服务的体系架构中需要它。
2. 微服务中的消息
在微服务体系架构中,我们有许多相互通信以完成请求的小型应用程序—它们的主要优点之一是改进了的可伸缩性。一个请求从多个下游微服务传递到完成是很常见的。例如,假设我们有一个Service-A
内部调用Service-B
和Service-C
来完成一个请求:
[外链图片转存失败(img-jzvHHRXw-1562549429195)(https://user-gold-cdn.xitu.io/2019/7/7/16bccd47c4051b28?w=511&h=347&f=png&s=11713)]
是的,还会有其他组件,比如Spring Cloud Eureka
、Spring Cloud Zuul
等等,但我们还是专注关心这类架构的特有问题。
假设由于某种原因Service-B
需要更多的时间来响应。也许它正在执行I/O操作
或长时间的DB事务
,或者进一步调用其它导致Service-B
变得更慢的服务,这些都使其无法更具效率。
现在,我们可以启动更多的Service-B
实例来解决这个问题,这样很好,但是Service-A
实际上是响应很快的,它需要等待Service-B
的响应来进一步处理。这将导致Service-A
无法接收更多的请求,这意味着我们还必须启动Service-A
的多个实例。
另一种方法解决类似情况的是使用事件驱动的微服务体系架构。这基本上意味着Service-A
不直接通过HTTP
调用Service-B
或Service-C
,而是将请求或事件发布给message broker
(消息代理)。Service-B
和Service-C
将成为message broker
(消息代理)上此事件的订阅者。
与依赖HTTP调用的传统微服务体系架构相比,这有许多优点:
- 提高可伸缩性和可靠性——现在我们知道哪些服务是整个应用程序中的真正瓶颈。
- 鼓励松散耦合——
Service-A
不需要了解Service-B
和Service-C
。它只需要连接到message broker
并发布事件。事件如何进一步编排取决于代理设置。通过这种方式,Service-A
可以独立地运行,这是微服务的核心概念之一。 - 与遗留系统交互——通常我们不能将所有东西都移动到一个新的技术堆栈中。我们仍然必须使用遗留系统,虽然速度很慢,但是很可靠。
3. RabbitMQ
高级消息队列协议(AMQP)
是RabbitMQ
用于消息传递的协议。虽然RabbitMQ
支持其他一些协议,但是AMQP
由于兼容性和它提供的大量特性而更受欢迎。
3.1 RabbitMQ架构设计
因此发布者将消息发布到RabbitMQ
中称为Exchange
(交换器)。Exchange
(交换器)接收消息并将其路由到一个或多个Queues
(队列)。路由算法依赖于Exchange
(交换器)类型和routing
(路由)key/header(与消息一起传递)。将Exchange
(交换器)连接到Queues
(队列)的这些规则称为bindings
(绑定)。
绑定可以有4种类型:
- Direct: 它根据
routing key
(路由键)将Exchange
(交换器)类型直接路由到特定的Queues
(队列)。 - Fanout:它将消息路由到绑定
Exchange
(交换器)中的所有Queues
(队列)。 - Topic:它根据完全匹配或部分据
routing key
(路由键)匹配将消息路由到(0、1或更多)的Queues
(队列)。 - Headers:它类似于
Topic
(主题)交换类型,但是它是基routing header
(路由头)而不是routing key
(路由键)来路由的。
来源: https://www.cloudamqp.com/
通过Exchange
(交换器)和Queues
(队列)发布和消费消息的整个过程是通过一个Channel
(通道)完成的。
有关路由的详细信息,请访问此链接。
3.2 RabbitMQ 设置
3.2.1 安装
我们可以从这里下载并安装基于我们的操作系统的二进制文件。
然而,在本文中,我们将使用cloudamqp.com
提供的免费云安装。只需注册服务并登录即可。
在主仪表板上单击创建新实例
:
然后给你的实例起个名字,然后进入下一步:
然后选择一个可用区:
最后,查看实例信息,点击右下角的创建实例
:
就是这样。现在在云中运行了一个RabbitMQ
实例。有关实例的更多信息,请转到您的仪表板并单击新创建的实例
:
我们可以看到我们可以访问RabbitMQ实例的主机,比如从我们的项目连接所需的用户名和密码:
我们将在Spring应用程序中使用AMQP URL
连接到这个实例,所以请在某个地方记下它。
您还可以通过单击左上角的RabbitMQ manager
来查看管理器控制台。这将采用它来管理的您的RabbitMQ
实例。
Project 配置
现在我们的设置已经准备好了,让我们创建我们的服务:
- cloud-stream-producer-rabbitmq: 作为一个发布者,将消息推送到
RabbitMQ
- cloud-stream-consumer-rabbitmq: 消费者消费消息
使用Spring Initializr
创建一个脚手架项目。这将是我们的producer
项目,我们将使用REST
端点发布消息。
选择您喜欢的Spring Boot
版本,添加Web
和Cloud Stream
依赖项,生成Maven
项目:
注意:
请注意cloud-stream
依赖项。这也需要像RabbitMQ
、Kafka
等绑定器依赖项才能工作。
由于我们将使用RabbitMQ
,添加以下Maven
依赖项:
<dependency> <groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,我们也可以将两者结合起来使用spring-cloud-starter-stream-rabbit
:
<dependency> <groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
使用同样的方法,创建消费者项目,但仅使用spring-cloud-starter-stream-rabbit
依赖项。
4. 创建生产者
如前所述,将消息从发布者传递到队列的整个过程是通过通道完成的。因此,让我们创建一个HelloBinding
接口,其中包含我们的消息机制greetingChannel
:
interface HelloBinding {@Output("greetingChannel")MessageChannel greeting();
}
因为这将发布消息,所以我们使用@Output
注解。方法名可以是我们想要的任意名称,当然,我们可以在一个接口中有多个Channel
(通道)。
现在,让我们创建一个REST
,它将消息推送到这个Channel
(通道)
@RestController
public class ProducerController {private MessageChannel greet;public ProducerController(HelloBinding binding) {greet = binding.greeting();}@GetMapping("/greet/{name}")public void publish(@PathVariable String name) {String greeting = "Hello, " + name + "!";Message<String> msg = MessageBuilder.withPayload(greeting).build();this.greet.send(msg);}
}
上面,我们创建了一个ProducerController
类,它有一个MessageChannel
类型的属性 greet
。这是通过我们前面声明的方法在构造函数中初始化的。
注意: 我们可以用简洁的方式做同样的事情,但是我们使用不同的名称来让您更清楚地了解事物是如何连接的。
然后,我们有一个简单的REST
接口,它接收PathVariable
的name
,并使用MessageBuilder
创建一个String
类型的消息。最后,我们使用MessageChannel
上的.send()
方法来发布消息。
现在,我们将在的主类中添加@EnableBinding
注解,传入HelloBinding
告诉Spring
加载。
@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
最后,我们必须告诉Spring
如何连接到RabbitMQ
(通过前面的AMQP URL
),并将greetingChannel
连接到一可用的消费者。
这两个都是在application.properties
中定义的:
spring.rabbitmq.addresses=<amqp url>spring.cloud.stream.bindings.greetingChannel.destination = greetingsserver.port=8080
5. 创建消费者
现在,我们需要监听之前创建的通道greetingChannel
。让我们为它创建一个绑定:
public interface HelloBinding {String GREETING = "greetingChannel";@Input(GREETING)SubscribableChannel greeting();
}
与生产者绑定的两个非常明显区别。因为我们正在消费消息,所以我们使用SubscribableChannel
和@Input
注解连接到greetingChannel
,消息数据将被推送这里。
现在,让我们创建处理数据的方法:
@EnableBinding(HelloBinding.class)
public class HelloListener {@StreamListener(target = HelloBinding.GREETING)public void processHelloChannelGreeting(String msg) {System.out.println(msg);}
}
在这里,我们创建了一个HelloListener
类,在processHelloChannelGreeting
方法上添加@StreamListener
注解。这个方法需要一个字符串作为参数,我们刚刚在控制台打印了这个参数。我们还在类添加@EnableBinding
启用了HelloBinding
。
同样,我们在这里使用@EnableBinding
,而不是主类,以便告诉我们如何使用。
看看我们的主类,我们没有任何修改:
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}
在application.properties
配置文件中,我们需要定义与生产者一样的属性,除了修改端口之外
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090
6. 全部测试
让我们同时启动生产者和消费者服务。首先,让我们通过点击端点http://localhost:8080/greet/john
来生产消息。
在消费者日志中看到消息内容:
我们使用以下命令启动另一个消费者服务实例(在另一个端口(9091)上):
$ mvn spring-boot:run -Dserver.port=9091
现在,当我们点击生产者REST
端点生产消息时,我们看到两个消费者都收到了消息:
这可能是我们在一些用例中想要的。但是,如果我们只想让一个消费者消费一条消息呢?为此,我们需要在application.properties
中创建一个消费者组。消费者的配置文件:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
现在,再次在不同的端口上运行消费者的2个实例,并通过生产者生产消息再次查看:
这一切也可以在RabbitMQ
管理器控制台看到:
7. 结论
在本文中,我们解释了消息传递的主要概念、它在微服务中的角色以及如何使用Spring Cloud Stream
实现它。我们使用RabbitMQ
作为消息代理,但是我们也可以使用其他流行的代理,比如Kafka
,只需更改配置和依赖项。
与往常一样,本文使用的示例代码可以在GitHub获得完整的源代码。
原文:https://stackabuse.com/spring-cloud-stream-with-rabbitmq-message-driven-microservices/
作者:Dhananjay Singh
译者:李东
消息驱动式微服务:Spring Cloud Stream RabbitMQ相关推荐
- 消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ
系列文章导航: Spring Cloud Alibaba微服务解决方案 常用MQ产品的选择 目前主流的MQ产品有kafka.RabbitMQ.ActiveMQ.RocketMQ等.在MQ选型时可以参照 ...
- 分布式微服务Spring Cloud
Spring Cloud 微服务概念 版本选择 创建工程 创建父工程 创建子模块 yml配置文件 启动类 创建数据库和表 编写实体类和DAO层 编写业务层和控制层 热部署Devtools 服务间的调用 ...
- java版b2b2c社交电商分布式微服务-Spring Cloud Netflix
该项目通过自动配置为Spring Boot应用程序提供Netflix OSS集成,并绑定到Spring环境和其他Spring编程模型成语.通过几个简单的注释,您可以快速启用和配置应用程序中的常见模式, ...
- Spring Cloud Streams Messaging消息驱动微服务实践
作者:禅与计算机程序设计艺术 1.简介 消息驱动微服务是一个新的分布式架构模式,它基于异步通信和事件驱动的消息传递机制,通过轻量级的消息代理与集成框架实现分布式系统的解耦合.弹性伸缩和可靠性保证.Sp ...
- 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务
←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...
- 介绍Spring Cloud Stream与RabbitMQ集成
一. 首先安装rabbitmq-management 这里用的是rabbitmq的docker镜像,我们可以在Docker Hub中搜索rabbitmq, 找到最新的版本安装 sudo docker ...
- 谷歌gcp 远程计算机_引导性GCP:带有Google Cloud Pub / Sub的Spring Cloud Stream
谷歌gcp 远程计算机 我最近在Sprint Central的工程博客上阅读了Josh Long的Bootiful GCP系列 ,特别喜欢关于使用Google Cloud的Pub / Sub的第四部分 ...
- Spring Cloud Stream 学习小清单
由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...
- Spring Cloud Stream Binder 实现
Spring Cloud Stream Binder 实现 JMS 实现 ActiveMQ 1.增加Maven依赖 <!-- 整合 Sprig Boot Starter ActiveMQ --& ...
- Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务
文章目录 概述 添加依赖 配置文件配置RabbitMQ的地址信息 接口定义 接收方 @EnableBinding @StreamListener 测试 消费组 发送复杂对象 消息回执 代码 概述 官网 ...
最新文章
- [BZOJ2653]middle
- 【Ubuntu】使用过的ubuntu工具记录
- python零基础怎么学-零基础python入门分析,如何做到一个月学会(深思极恐)
- NSIS 的 Modern UI 教程(二)
- 我看TechEd 2012之技术热点
- windows 连接Linux
- Spring AbstractBeanFactory
- 响应式布局(手机端)
- 【C语言进阶深度学习记录】二十九 main函数与命令行参数
- 王者荣耀女性机器人面世;深圳中院受理金立破产案;Firefox 64 发布 | 极客头条...
- 玩转Excel系列-SUMIF函数实例教程
- Python类、模块、包的概念及区别
- IBM Mainframe 基础知识学习 (EBCDIC,dsp3270)
- (转)switch与ifelse的效率问题 .
- android 调出键盘表情_Android--如何优雅的切换表情和键盘(原理)
- 汽车电子中的3225贴片晶振
- STL inserter
- java poi 设置标题_java POI操作word2010简单实现多级标题结构
- wav格式怎么转换成mp3
- Python3操作EXCEL,取汉字首字母,拼接全拼