1. 概述

在本文中,我们将向您介绍Spring Cloud Stream,这是一个用于构建消息驱动的微服务应用程序的框架,这些应用程序由一个常见的消息传递代理(如RabbitMQApache Kafka等)连接。

Spring Cloud Stream构建在现有Spring框架(如Spring MessagingSpring Integration)之上。尽管这些框架经过了实战测试,工作得非常好,但是实现与使用的message broker紧密耦合。此外,有时对某些用例进行扩展是困难的。

Spring Cloud Stream背后的想法是一个非常典型的Spring Boot概念——抽象地讲,让Spring根据配置和依赖关系管理在运行时找出实现自动注入。这意味着您可以通过更改依赖项和配置文件来更改message broker。可以在这里找到目前已经支持的各种消息代理。

本文将使用RabbitMQ作为message broker。在此之前,让我们了解一下broker(代理)的一些基本概念,以及为什么要在面向微服务的体系架构中需要它。

2. 微服务中的消息

在微服务体系架构中,我们有许多相互通信以完成请求的小型应用程序—它们的主要优点之一是改进了的可伸缩性。一个请求从多个下游微服务传递到完成是很常见的。例如,假设我们有一个Service-A内部调用Service-BService-C来完成一个请求:
[外链图片转存失败(img-jzvHHRXw-1562549429195)(https://user-gold-cdn.xitu.io/2019/7/7/16bccd47c4051b28?w=511&h=347&f=png&s=11713)]

是的,还会有其他组件,比如Spring Cloud EurekaSpring Cloud Zuul等等,但我们还是专注关心这类架构的特有问题。

假设由于某种原因Service-B需要更多的时间来响应。也许它正在执行I/O操作或长时间的DB事务,或者进一步调用其它导致Service-B变得更慢的服务,这些都使其无法更具效率。

现在,我们可以启动更多的Service-B实例来解决这个问题,这样很好,但是Service-A实际上是响应很快的,它需要等待Service-B的响应来进一步处理。这将导致Service-A无法接收更多的请求,这意味着我们还必须启动Service-A的多个实例。

另一种方法解决类似情况的是使用事件驱动的微服务体系架构。这基本上意味着Service-A不直接通过HTTP调用Service-BService-C,而是将请求或事件发布给message broker(消息代理)。Service-BService-C将成为message broker(消息代理)上此事件的订阅者。

与依赖HTTP调用的传统微服务体系架构相比,这有许多优点:

  • 提高可伸缩性和可靠性——现在我们知道哪些服务是整个应用程序中的真正瓶颈。
  • 鼓励松散耦合——Service-A不需要了解Service-BService-C。它只需要连接到message broker并发布事件。事件如何进一步编排取决于代理设置。通过这种方式,Service-A可以独立地运行,这是微服务的核心概念之一。
  • 与遗留系统交互——通常我们不能将所有东西都移动到一个新的技术堆栈中。我们仍然必须使用遗留系统,虽然速度很慢,但是很可靠。

3. RabbitMQ

高级消息队列协议(AMQP)RabbitMQ用于消息传递的协议。虽然RabbitMQ支持其他一些协议,但是AMQP由于兼容性和它提供的大量特性而更受欢迎。

3.1 RabbitMQ架构设计

因此发布者将消息发布到RabbitMQ中称为Exchange(交换器)。Exchange(交换器)接收消息并将其路由到一个或多个Queues(队列)。路由算法依赖于Exchange(交换器)类型和routing(路由)key/header(与消息一起传递)。将Exchange(交换器)连接到Queues(队列)的这些规则称为bindings(绑定)。

绑定可以有4种类型:

  • Direct: 它根据routing key(路由键)将Exchange(交换器)类型直接路由到特定的Queues(队列)。
  • Fanout:它将消息路由到绑定Exchange(交换器)中的所有Queues(队列)。
  • Topic:它根据完全匹配或部分据routing key(路由键)匹配将消息路由到(0、1或更多)的Queues(队列)。
  • Headers:它类似于Topic(主题)交换类型,但是它是基routing header(路由头)而不是routing key(路由键)来路由的。

来源: https://www.cloudamqp.com/

通过Exchange(交换器)和Queues(队列)发布和消费消息的整个过程是通过一个Channel(通道)完成的。

有关路由的详细信息,请访问此链接。

3.2 RabbitMQ 设置

3.2.1 安装

我们可以从这里下载并安装基于我们的操作系统的二进制文件。

然而,在本文中,我们将使用cloudamqp.com提供的免费云安装。只需注册服务并登录即可。

在主仪表板上单击创建新实例:

然后给你的实例起个名字,然后进入下一步:

然后选择一个可用区:

最后,查看实例信息,点击右下角的创建实例:

就是这样。现在在云中运行了一个RabbitMQ实例。有关实例的更多信息,请转到您的仪表板并单击新创建的实例:

我们可以看到我们可以访问RabbitMQ实例的主机,比如从我们的项目连接所需的用户名和密码:

我们将在Spring应用程序中使用AMQP URL连接到这个实例,所以请在某个地方记下它。

您还可以通过单击左上角的RabbitMQ manager来查看管理器控制台。这将采用它来管理的您的RabbitMQ实例。

Project 配置

现在我们的设置已经准备好了,让我们创建我们的服务:

  • cloud-stream-producer-rabbitmq: 作为一个发布者,将消息推送到RabbitMQ
  • cloud-stream-consumer-rabbitmq: 消费者消费消息

使用Spring Initializr创建一个脚手架项目。这将是我们的producer项目,我们将使用REST端点发布消息。

选择您喜欢的Spring Boot版本,添加WebCloud Stream依赖项,生成Maven项目:

注意:

请注意cloud-stream依赖项。这也需要像RabbitMQKafka等绑定器依赖项才能工作。

由于我们将使用RabbitMQ,添加以下Maven依赖项:

<dependency>  <groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

或者,我们也可以将两者结合起来使用spring-cloud-starter-stream-rabbit:

<dependency>  <groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

使用同样的方法,创建消费者项目,但仅使用spring-cloud-starter-stream-rabbit依赖项。

4. 创建生产者

如前所述,将消息从发布者传递到队列的整个过程是通过通道完成的。因此,让我们创建一个HelloBinding接口,其中包含我们的消息机制greetingChannel:

interface HelloBinding {@Output("greetingChannel")MessageChannel greeting();
}

因为这将发布消息,所以我们使用@Output注解。方法名可以是我们想要的任意名称,当然,我们可以在一个接口中有多个Channel(通道)。

现在,让我们创建一个REST,它将消息推送到这个Channel(通道)

@RestController
public class ProducerController {private MessageChannel greet;public ProducerController(HelloBinding binding) {greet = binding.greeting();}@GetMapping("/greet/{name}")public void publish(@PathVariable String name) {String greeting = "Hello, " + name + "!";Message<String> msg = MessageBuilder.withPayload(greeting).build();this.greet.send(msg);}
}

上面,我们创建了一个ProducerController类,它有一个MessageChannel类型的属性 greet。这是通过我们前面声明的方法在构造函数中初始化的。

注意: 我们可以用简洁的方式做同样的事情,但是我们使用不同的名称来让您更清楚地了解事物是如何连接的。

然后,我们有一个简单的REST接口,它接收PathVariablename,并使用MessageBuilder创建一个String类型的消息。最后,我们使用MessageChannel上的.send()方法来发布消息。

现在,我们将在的主类中添加@EnableBinding注解,传入HelloBinding告诉Spring加载。

@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

最后,我们必须告诉Spring如何连接到RabbitMQ(通过前面的AMQP URL),并将greetingChannel连接到一可用的消费者。

这两个都是在application.properties中定义的:

spring.rabbitmq.addresses=<amqp url>spring.cloud.stream.bindings.greetingChannel.destination = greetingsserver.port=8080

5. 创建消费者

现在,我们需要监听之前创建的通道greetingChannel。让我们为它创建一个绑定:

public interface HelloBinding {String GREETING = "greetingChannel";@Input(GREETING)SubscribableChannel greeting();
}

与生产者绑定的两个非常明显区别。因为我们正在消费消息,所以我们使用SubscribableChannel@Input注解连接到greetingChannel,消息数据将被推送这里。

现在,让我们创建处理数据的方法:

@EnableBinding(HelloBinding.class)
public class HelloListener {@StreamListener(target = HelloBinding.GREETING)public void processHelloChannelGreeting(String msg) {System.out.println(msg);}
}

在这里,我们创建了一个HelloListener类,在processHelloChannelGreeting方法上添加@StreamListener注解。这个方法需要一个字符串作为参数,我们刚刚在控制台打印了这个参数。我们还在类添加@EnableBinding启用了HelloBinding

同样,我们在这里使用@EnableBinding,而不是主类,以便告诉我们如何使用。

看看我们的主类,我们没有任何修改:

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}
}

application.properties配置文件中,我们需要定义与生产者一样的属性,除了修改端口之外

spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090

6. 全部测试

让我们同时启动生产者和消费者服务。首先,让我们通过点击端点http://localhost:8080/greet/john来生产消息。

在消费者日志中看到消息内容:

我们使用以下命令启动另一个消费者服务实例(在另一个端口(9091)上):

$ mvn spring-boot:run -Dserver.port=9091

现在,当我们点击生产者REST端点生产消息时,我们看到两个消费者都收到了消息:

这可能是我们在一些用例中想要的。但是,如果我们只想让一个消费者消费一条消息呢?为此,我们需要在application.properties中创建一个消费者组。消费者的配置文件:

spring.cloud.stream.bindings.greetingChannel.group = greetings-group

现在,再次在不同的端口上运行消费者的2个实例,并通过生产者生产消息再次查看:

这一切也可以在RabbitMQ管理器控制台看到:

7. 结论

在本文中,我们解释了消息传递的主要概念、它在微服务中的角色以及如何使用Spring Cloud Stream实现它。我们使用RabbitMQ作为消息代理,但是我们也可以使用其他流行的代理,比如Kafka,只需更改配置和依赖项。

与往常一样,本文使用的示例代码可以在GitHub获得完整的源代码。

原文:https://stackabuse.com/spring-cloud-stream-with-rabbitmq-message-driven-microservices/

作者:Dhananjay Singh

译者:李东

消息驱动式微服务:Spring Cloud Stream RabbitMQ相关推荐

  1. 消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ

    系列文章导航: Spring Cloud Alibaba微服务解决方案 常用MQ产品的选择 目前主流的MQ产品有kafka.RabbitMQ.ActiveMQ.RocketMQ等.在MQ选型时可以参照 ...

  2. 分布式微服务Spring Cloud

    Spring Cloud 微服务概念 版本选择 创建工程 创建父工程 创建子模块 yml配置文件 启动类 创建数据库和表 编写实体类和DAO层 编写业务层和控制层 热部署Devtools 服务间的调用 ...

  3. java版b2b2c社交电商分布式微服务-Spring Cloud Netflix

    该项目通过自动配置为Spring Boot应用程序提供Netflix OSS集成,并绑定到Spring环境和其他Spring编程模型成语.通过几个简单的注释,您可以快速启用和配置应用程序中的常见模式, ...

  4. Spring Cloud Streams Messaging消息驱动微服务实践

    作者:禅与计算机程序设计艺术 1.简介 消息驱动微服务是一个新的分布式架构模式,它基于异步通信和事件驱动的消息传递机制,通过轻量级的消息代理与集成框架实现分布式系统的解耦合.弹性伸缩和可靠性保证.Sp ...

  5. 【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务

    ←←←←←←←←←←←← 快!点关注 让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务.首先,Spring Cloud Stream首先有什么好处?因为Spring AM ...

  6. 介绍Spring Cloud Stream与RabbitMQ集成

    一. 首先安装rabbitmq-management 这里用的是rabbitmq的docker镜像,我们可以在Docker Hub中搜索rabbitmq, 找到最新的版本安装 sudo docker ...

  7. 谷歌gcp 远程计算机_引导性GCP:带有Google Cloud Pub / Sub的Spring Cloud Stream

    谷歌gcp 远程计算机 我最近在Sprint Central的工程博客上阅读了Josh Long的Bootiful GCP系列 ,特别喜欢关于使用Google Cloud的Pub / Sub的第四部分 ...

  8. Spring Cloud Stream 学习小清单

    由于最近一直在写Spring Cloud Stream相关的内容,在2018年最后一天,把之前写过的Spring Cloud Stream内容从基础,到入门,到深入,做一些小清单,方便大家查阅. 如果 ...

  9. Spring Cloud Stream Binder 实现

    Spring Cloud Stream Binder 实现 JMS 实现 ActiveMQ 1.增加Maven依赖 <!-- 整合 Sprig Boot Starter ActiveMQ --& ...

  10. Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务

    文章目录 概述 添加依赖 配置文件配置RabbitMQ的地址信息 接口定义 接收方 @EnableBinding @StreamListener 测试 消费组 发送复杂对象 消息回执 代码 概述 官网 ...

最新文章

  1. [BZOJ2653]middle
  2. 【Ubuntu】使用过的ubuntu工具记录
  3. python零基础怎么学-零基础python入门分析,如何做到一个月学会(深思极恐)
  4. NSIS 的 Modern UI 教程(二)
  5. 我看TechEd 2012之技术热点
  6. windows 连接Linux
  7. Spring AbstractBeanFactory
  8. 响应式布局(手机端)
  9. 【C语言进阶深度学习记录】二十九 main函数与命令行参数
  10. 王者荣耀女性机器人面世;深圳中院受理金立破产案;Firefox 64 发布 | 极客头条...
  11. 玩转Excel系列-SUMIF函数实例教程
  12. Python类、模块、包的概念及区别
  13. IBM Mainframe 基础知识学习 (EBCDIC,dsp3270)
  14. (转)switch与ifelse的效率问题 .
  15. android 调出键盘表情_Android--如何优雅的切换表情和键盘(原理)
  16. 汽车电子中的3225贴片晶振
  17. STL inserter
  18. java poi 设置标题_java POI操作word2010简单实现多级标题结构
  19. wav格式怎么转换成mp3
  20. Python3操作EXCEL,取汉字首字母,拼接全拼

热门文章

  1. mysql 8.0.11 Windows安装
  2. wireshark的拆包与合并
  3. tcpcopy,模拟在线MySQL压力测试的好帮手
  4. ASP.NET 表单认证与角色授权
  5. 引言(NParsing框架功能简介、NParsing的由来)
  6. 浅谈URL生成方式的演变
  7. /proc/meminfo 文件
  8. CFS调度主要代码分析一
  9. SimpleFs文件系统初步一(编译并挂载)
  10. mmap 系统调用 的使用