我最近参与了一个项目,在该项目中,我不得不有效地处理通过AWS SQS Queue流入的大量消息。 在这篇文章(可能还有一篇)中,我将介绍使用出色的Project Reactor处理消息的方法。

以下是我要进行的设置:


设置本地AWS环境

在我进入代码之前,让我先做一些准备。 首先,如何获得SNS和SQS的本地版本。 最简单的方法之一是使用localstack 。 我使用这里描述的docker-compose版本

我将使用的第二个实用程序是AWS CLI。 该网站包含有关如何在本地安装的详细信息。

一旦这两个实用程序都到位,快速测试应验证设置:

 # Create a queue  aws --endpoint http: //localhost:4576 sqs create-queue --queue-name test-queue  # Send a sample message  aws --endpoint http: //localhost:4576 sqs send-message --queue-url http://localhost:4576/queue/test-queue --message-body "Hello world"  # Receive the message  aws --endpoint http: //localhost:4576 sqs receive-message --queue-url http://localhost:4576/queue/test-queue 

项目反应堆的基础

Project Reactor实现了Reactive Streams规范,并提供了一种跨异步边界处理数据流的方法,该方法尊重背压。 这里有很多词,但本质上是这样想的:

1. SQS产生数据 2.应用程序将使用它并将其作为数据流进行处理 3.应用程序应以可持续的速度使用数据–不应输入太多数据。这正式称为 “背压”

AWS开发工具包2

我将用于消耗AWS SQS数据的库是
AWS开发工具包2 。 该库在幕后使用了非阻塞IO。

该库提供了拨打电话的同步版本以及异步版本。 考虑从SQS队列中获取记录的同步方式:

 import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest  import software.amazon.awssdk.services.sqs.SqsClient  val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build()  val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() 

在这里,“ software.amazon.awssdk.services.sqs.SqsClient”用于查询sqs和同步检索一批结果。 另一方面,异步结果如下所示:

 val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build()  val messages: CompletableFuture<List<Message>> = sqsAsyncClient .receiveMessage(receiveMessageRequest) .thenApply { result -> result.messages() } 

现在,输出为“ CompletableFuture”

无限循环,无背压

我最初创建消息流( Flux )的尝试非常简单–一个无限循环,它轮询AWS sqs并使用“ Flux.create”运算符从中创建Flux ,方法是:

 fun listen(): Flux<Pair<String, () -> Unit>> { return Flux.create { sink: FluxSink<List<Message>> -> while (running) { try { val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() LOGGER.info( "Received: $messages" ) sink.next(messages) } catch (e: InterruptedException) { LOGGER.error(e.message, e) } catch (e: Exception) { LOGGER.error(e.message, e) } } } .flatMapIterable(Function.identity()) .doOnError { t: Throwable -> LOGGER.error(t.message, t) } .retry() .map { snsMessage: Message -> val snsMessageBody: String = snsMessage.body() val snsNotification: SnsNotification = readSnsNotification(snsMessageBody) snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) } }  } 

它的工作方式是存在一个无限循环,该循环使用long-polling检查新消息。 消息可能并非在每次轮询时都可用,在这种情况下,会将空列表添加到流中。

然后,使用“ flatMapIterable”运算符将此列表中的最多5条消息映射到单个消息流,并通过从SNS包装器中提取消息来进一步映射(当消息从SNS转发到SQS时,SNS将包装器添加到消息),并在消息成功处理后删除消息的方法(deleteHandle)作为对返回。

这种方法可以很好地工作……但是,请想象一下有大量消息进入的情况,因为循环并没有真正意识到下游的吞吐量,它将继续将数据泵送到流中。 中间操作员的默认行为是根据最终使用者使用数据的方式来缓冲流入的数据。 由于此缓冲区是无界的,因此系统可能会达到不可持续的状态。

背压感知流

解决方法是使用其他运算符生成数据流–
助焊剂
使用此运算符的代码如下所示:

 fun listen(): Flux<Pair<String, () -> Unit>> { return Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() LOGGER.info( "Received: $messages" ) sink.next(messages) } .flatMapIterable(Function.identity()) .doOnError { t: Throwable -> LOGGER.error(t.message, t) } .retry() .map { snsMessage: Message -> val snsMessageBody: String = snsMessage.body() val snsNotification: SnsNotification = readSnsNotification(snsMessageBody) snsNotification.message to { deleteQueueMessage(snsMessage.receiptHandle(), queueUrl) } }  } 

这种工作方式是重复调用传递给“ Flux.generate”运算符的块–与while循环类似,在每个循环中,期望将一项添加到流中。 在这种情况下,添加到流中的项目恰好是一个列表,该列表像以前一样分解为单独的消息。

背压在这种情况下如何工作–

因此,请再次考虑下游使用者处理速度比生成端慢的情况。 在这种情况下,Flux本身将以调用generate运算符的速率减慢速度,因此要考虑下游系统的吞吐量。

结论

这应该建立一个良好的管道来处理来自SQS的消息,对此有更多细微差别,可以稍后在流中并行处理消息,我将在以后的文章中介绍。

这个例子的代码库可以在我的github仓库中找到
在这里 – https://github.com/bijukunjummen/boot-with-sns-sqs。 该代码具有完整的管道,其中包括处理消息并在处理后将其删除。

翻译自: https://www.javacodegeeks.com/2020/03/processing-sqs-messages-using-spring-boot-and-project-reactor.html

使用Spring Boot和Project Reactor处理SQS消息相关推荐

  1. 使用Spring Boot和Project Reactor处理SQS消息-第2部分

    这是我关于使用Spring Boot和Project Reactor有效处理SQS消息的博客文章的后续文章 我在第一部分中列出了一些方法上的差距. 1.处理SQS客户端调用中的失败 2.该方法一次只能 ...

  2. kafka创建topic_Kafka系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?...

    Guide哥答应大家的 Kafka系列的第3篇原创文章,写的非常详细,没有接触过 Kafka 的朋友应该都可以看懂,觉得不错的话一定要点亮你们的在看!在看就是对Guide 哥最大的鼓励! 为了保证内容 ...

  3. spring boot 整合钉钉机器人发送消息通知

    钉钉消息通知 主要用于系统预警.资源预警.重要消息通知,随时随地可以掌握重要信息 一.通知效果 1.文本通知 2.带链接的通知 3.makrdown格式 通知 4.ActionCard 通知 5.Fe ...

  4. 【Spring Boot】Spring Boot之整合RabbitMQ并实现消息的发送和接收

    一.项目配置 1)引入maven坐标 <!--amqp--><dependency><groupId>org.springframework.boot</gr ...

  5. spring boot 自学笔记(八) Rabbitmq 延迟消息(插件)

    在前面文章有通过Rabbit的死信方式来实现延迟队列机制, 但是这种方式有极大的弊端, 机试不考虑死信队列性能问题,另外发送的消息并不能保证时间延迟的可靠性,. 举例如下: 同时发送两条延迟消息,分别 ...

  6. 2021 最新版 Spring Boot 速记教程

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 本文来源:http://r6d.cn/X6FP 结束了前面 ...

  7. 如何使用MySQL和JPA使用Spring Boot构建Rest API

    Hi Everyone! For the past year, I have been learning JavaScript for full-stack web development. For ...

  8. Spring Boot Initilizr - 使用ThirdParty工具

    Spring Boot Initilizr - 使用ThirdParty工具 这是我之前的两篇文章的延续.在阅读本文之前,请先阅读我之前在" Spring Boot Initilizr We ...

  9. spring boot示例_Spring Boot REST示例

    spring boot示例 Spring Boot is an awesome module from Spring Framework. Once you are used to it, then ...

最新文章

  1. 《Pro ASP.NET MVC 3 Framework》学习笔记之二十七【视图1】
  2. 闭包--闭包作用之保护(一)
  3. linux安装rz命令_Linux 安装dep安装包命令
  4. 坦克游戏服务器未响应,《坦克世界》退出战斗 退出战斗卡死解决办法
  5. typedef让p去除了普通变量的C++身份
  6. 对过去css+div的总结
  7. Oracle-day03 上
  8. Exchange2003反病毒
  9. linux技术属于什么系,什么云计算技术?想学好这个必须了解的!
  10. c语言中的三角函数公式,高中三角函数公式大全-必背基础知识点.doc
  11. Balanced Lineup
  12. 今日学习——冒泡排序
  13. python中不等于号_python的不等于号是什么
  14. 依存句法分析与语义依存分析的区别
  15. Win10 自定义右键新建菜单
  16. 分享到QQ、QZone方法,无需登录
  17. 人事面试100问(3)
  18. python最简单的语言_Python语言的简单实用小工具
  19. android Lottie详细使用
  20. 永远的优客李林——Just for you

热门文章

  1. P3514-[POI2011]LIZ-Lollipop【思路题】
  2. 2021“MINIEYE杯”中国大学生算法设计超级联赛(1)zoto(二维数颜色)
  3. 【结论】Number(jzoj(gz) 1781)
  4. Nacos(五)之Spring集成
  5. 图解 5 种 Join 连接及实战案例!(inner/ left/ right/ full/ cross)
  6. 使用jdbc连接mysql数据库代码示例
  7. 块元素与行内元素转化(display属性)
  8. React向对象数组进行赋值
  9. php滚动公告源码,好用的滚动公告HTML代码
  10. ubuntu ifconfig_Ubuntu 设置固定 IP 最简单的方法!