←←←←←←←←←←←← 快!点关注

让我们展示如何使用Spring Cloud Stream来设计事件驱动的微服务。首先,Spring Cloud Stream首先有什么好处?因为Spring AMPQ提供了访问AMPQ工件所需的一切。如果您不熟悉Spring AMPQ,请查看此repo,其中包含许多有用的示例。那么为什么要使用Spring Cloud Stream ......?

Spring Cloud Stream概念

  • Spring Cloud Stream通过Binder概念将使用过的消息代理与Spring Integration消息通道绑定在一起。支持RabbitMQ和Kafka。
  • Spring Cloud Stream将基础架构配置从代码中分离为属性文件。这意味着即使您更改了底层代理,您的Spring Integration代码也将是相同的!

示例中的Spring Cloud Stream概念(RabbitMQ)

让我们有一个名为streamInput的交换,它有两个队列streamInput.cities和streamInput.persons。现在让我们将这两个队列插入两个消息通道citiesChannel和personsChannel来消费来自它的传入消息。使用Spring AMPQ,您需要创建SimpleMessageListenerContainer并在代码中连接基础结构。但这有很多样板代码。使用Spring Cloud Stream,您可以将AMPQ配置分离到属性文件:

spring.cloud.stream.bindings.citiesChannel.destination=streamInput
spring.cloud.stream.bindings.citiesChannel.group=cities
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=citiesspring.cloud.stream.bindings.personsChannel.destination=streamInput
spring.cloud.stream.bindings.personsChannel.group=persons
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
复制代码

配置详细信息

在类路径上使用RabbitMQ Binder,每个目标都映射到TopicExchange。在示例中,我创建了名为streamInput的TopicExchange, 并将其附加到两个消息通道citiesChannel和personsChannel。

spring.cloud.stream.bindings.citiesChannel.destination = streamInput
spring.cloud.stream.bindings.personsChannel.destination = streamInput
复制代码

现在您需要了解RabbitMQ绑定器的灵感来自Kafka,队列的消费者被分组到消费者组中,其中只有一个消费者将获得消息。这是有道理的,因为您可以轻松扩展消费者。

因此,让我们创建两个队列streamInput.persons和streamInput.cities并将它们附加到streamInput TopicExchange和提到的消息通道

# This will create queue "streamInput.cities" connected to message channel citiesChannel where input messages will land.
spring.cloud.stream.bindings.citiesChannel.group=cities # Durable subscription, of course.
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true # AMPQ binding to exchange (previous spring.cloud.stream.bindings.<channel name>.destination settings).
# Only messages with routingKey = 'cities' will land here.
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities spring.cloud.stream.bindings.personsChannel.group=persons
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons
复制代码

连接属性到Spring Integration

好的,到目前为止我创建了两个队列。StreamInput.cities绑定到citiesChannel。StreamInput.persons绑定到peopleChannel。

.是Spring Cloud Stream约定的队列命名,现在让我们将它连接到Spring Integration:

package com.example.spring.cloud.configuration;import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;/*** Created by tomask79 on 30.03.17.*/
public interface SinkRabbitAPI {String INPUT_CITIES = "citiesChannel";String INPUT_PERSONS = "personsChannel";@Input(SinkRabbitAPI.INPUT_CITIES)SubscribableChannel citiesChannel();@Input(SinkRabbitAPI.INPUT_PERSONS)SubscribableChannel personsChannel();
}
复制代码

Spring Boot启动时加载这个属性

package com.example.spring.cloud;import com.example.spring.cloud.configuration.SinkRabbitAPI;
import com.example.spring.cloud.configuration.SourceRabbitAPI;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
@EnableBinding({SinkRabbitAPI.class})
public class StreamingApplication {public static void main(String[] args) {SpringApplication.run(StreamingApplication.class, args);}
}
复制代码

在此之后,我们可以创建消费者从绑定的消息通道中的队列接收消息:

import com.example.spring.cloud.configuration.SinkRabbitAPI;
import com.example.spring.cloud.configuration.SourceRabbitAPI;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;import javax.annotation.Resource;/*** Created by tomask79 on 30.03.17.*/
@Service
public class ProcessingAMPQEndpoint {@StreamListener(SinkRabbitAPI.INPUT_CITIES)public void processCity(final String city) {System.out.println("Trying to process input city: "+city);}@StreamListener(SinkRabbitAPI.INPUT_PERSONS)public void processPersons(final String person) {System.out.println("Trying to process input person: "+person);}
}
复制代码

RabbitMQ绑定器和代理配置

Spring Cloud Stream如何知道在哪里寻找消息中间件?如果在类路径中找到RabbitMQ绑定器,则使用默认RabbitMQ主机(localhost)和端口(5672)连接到RabbitMQ服务器。如果您的消息中间件配置在不同端口,则需要配置属性:

spring:cloud:stream:bindings:...binders:rabbitbinder:type: rabbitenvironment:spring:rabbitmq:host: rabbitmqport: 5672username: XXXpassword: XXX
复制代码

测试消息消费

  • 安装并运行RabbitMQ代理
  • mvn clean install
  • java -jar target / streaming-0.0.1-SNAPSHOT.jar
  • 现在使用路由键'cities'或'persons'在streamInput Exchange上发布消息...输出应该是:
Started StreamingApplication in 6.513 seconds (JVM running for 6.92)
Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd
Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd
复制代码

使用Spring Cloud Stream重新传递消息

您通常希望在进入DLX交换之前再次尝试接收消息。首先,让我们配置Spring Cloud Stream尝试重新发送失败消息的次数:

spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6
复制代码

这意味着如果从streamInput.persons队列接收的消息出错,那么Spring Cloud Stream将尝试重新发送六次。让我们试试,首先让我们修改接收端点以模拟接收崩溃:

 @StreamListener(SinkRabbitAPI.INPUT_PERSONS)public void processPersons(final String person) {System.out.println("Trying to process input person: "+person);throw new RuntimeException();}
复制代码

如果我现在尝试使用人员路由键将某些内容发布到streamInput交换中,那么这应该是输出:

Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsd
Trying to process input person: sfsdfsdfsdRetry Policy Exhaustedat org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover
(RejectAndDontRequeueRecoverer.java:45) ~[spring-rabbit-1.7.0.RELEASE.jar! /:na]at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc
复制代码

建议将Spring Cloud Stream 用于事件驱动的MicroServices,因为它可以节省时间,而且您不需要为Java中的AMPQ基础架构编写样板代码。

欢迎大家加入粉丝群:963944895,群内免费分享Spring框架、Mybatis框架SpringBoot框架、SpringMVC框架、SpringCloud微服务、Dubbo框架、Redis缓存、RabbitMq消息、JVM调优、Tomcat容器、MySQL数据库教学视频及架构学习思维导图

写在最后:

秃顶程序员的不易,看到这里,点了关注吧! 点关注,不迷路,持续更新!!!

转载于:https://juejin.im/post/5c50131151882525812512aa

【本人秃顶程序员】使用Spring Cloud Stream和RabbitMQ实现事件驱动的微服务相关推荐

  1. 【本人秃顶程序员】中小型互联网公司微服务实践-经验和教训

    ←←←←←←←←←←←← 我都秃顶了,还不点关注! 在开始之前我们先介绍一下几个概念,什么是微服务,它的特点是什么? Spring Boot/Cloud都做了那些事情?他们三者之间又有什么联系? 技术 ...

  2. java程序员秃顶,【本人秃顶程序员】使用Azure Function + Cognitive Services 实现图片自动化审核...

    ←←←←←←←←←←←← 快!点关注 假定我们正在运行某个应用程序,此应用程序需要用户在应用程序中提交大量图片文件,那么对于系统管理员来说手动审核这些图片是很消耗时间的,并且对于图片的审核也许并不是即 ...

  3. 【本人秃顶程序员】美女程序员观点:程序员最重要的非编程技巧

    ←←←←←←←←←←←← 快!点关注 这是来自一位美女程序员Ali Spittel的观点,至少可以看看美女喜欢和怎样的男程序猿打交道: 当我想与我希望的程序员合作时,我更多地考虑非技术技能,而不是技术 ...

  4. 【本人秃顶程序员】图解分布式架构的演进过程

    ←←←←←←←←←←←← 快!点关注!!! 一.什么是分布式架构 分布式系统(distributed system) 是建立在网络之上的软件系统. 内聚性: 是指每一个数据库分布节点高度自治,有本地的 ...

  5. 【本人秃顶程序员】Java面试题集(意思意思)

    ←←←←←←←←←←←← 我都秃顶了,还不点关注! 一.什么是Spring MVC ?简单介绍下你对springMVC的理解? Spring MVC是一个基于MVC架构的 用来简化web应用程序开发的 ...

  6. 【本人秃顶程序员】分享一些数据结构与算法常用的算法技巧总结

    今天和大家讲讲,在做算法题时常用的一些技巧.对于平时没用过这些技巧的人,或许你可以考虑试着去看看在实践中能否用的上这些技巧来优化问题的解. 一.巧用数组下标 数组的下标是一个隐含的很有用的数组,特别是 ...

  7. 【本人秃顶程序员】高级 Java 必须突破的 10 个知识点!

    ←←←←←←←←←←←← 快!点关注!!! 工作多少年了,还在传统公司写if / for 等简单的代码?那你就真的要被社会淘汰了,工作多年其实你与初级工程师又有多少区别呢?那么作为一个高级Java攻城 ...

  8. 【本人秃顶程序员】Redis 这么火,它都解决了哪些问题?

    ←←←←←←←←←←←← 快!点关注 先看一下Redis是一个什么东西.官方简介解释到: Redis是一个基于BSD开源的项目,是一个把结构化的数据放在内存中的一个存储系统,你可以把它作为数据库,缓存 ...

  9. 【本人秃顶程序员】求求你别再写 bug 了,秃顶程序员告诉你避免空指针的 5 个案例!

    ←←←←←←←←←←←← 快!点关注 空指针是我们 Java 开发人员经常遇到的一个基本异常,这是一个极其普遍但似乎又无法根治的问题. 本文,栈长将带你了解什么是空指针,还有如何有效的避免空指针. 什 ...

最新文章

  1. MyBatis实现与插件开发
  2. 树莓派发布全新计算模块CM3,性能提升10倍
  3. 定义python的色条_Python:定义颜色曲线部分
  4. 7、MySQL默认值(DEFAULT)
  5. 使用WampServer搭建本地PHP环境,绑定域名,配置伪静态
  6. reportviewer控件mysql_reportviewer控件下载
  7. JSP和HTML中实现字符串换行
  8. Bugtags 2016-06-16 更新内容
  9. Scrapy网络爬虫框架实战[以腾讯新闻网为例]
  10. python检查字典中是否已存在给定键
  11. C语言:替换字符串中某一段子字符串
  12. 软件是怎样控制硬件的?
  13. spring注解记录
  14. 博士申请 | 美国匹兹堡大学高伟教授招收Mobile AI方向全奖博士生
  15. 开源网络情报系统释放数据黄金价值
  16. 鞍点(saddle point)
  17. 关于电池和充电与大家谈
  18. Obsidian+SyncTrayzor打造个人文档云同步平台
  19. JAVA学习6-集合工具类、流
  20. Python基础三、2、list列表练习题 引用随机数

热门文章

  1. 深度强化学习在智能城市领域应用介绍
  2. 机器学习2021 | 机器学习算法如何商业落地?
  3. 重磅!“全脑介观神经联接图谱”大科学计划中国工作组成立!
  4. 国产首款脑机编解码集成芯片发布
  5. 美国重夺超算“头把交椅”,专家建议中国加快E级超算研制
  6. 中国学者用人工光感受器助失明小鼠复明
  7. 华为正式发布5G商用芯片、5G终端!
  8. 微软为什么要公开AI系统测试数据集和度量指标?
  9. 10 年 IT 老兵给新人程序员的几点建议
  10. 面试官钟爱的 8 个问题,这样答才能拿高薪 Offer!