通过上一篇《消息驱动的微服务(消费组)》的学习,我们已经能够在多实例环境下,保证同一消息只被一个消费者实例进行接收和处理。但是,对于一些特殊场景,除了要保证单一实例消费之外,还希望那些具备相同特征的消息都能够被同一个实例进行消费。这时候我们就需要对消息进行分区处理。

使用消息分区

在Spring Cloud Stream中实现消息分区非常简单,我们可以根据消费组示例做一些配置修改就能实现,具体如下:

  • 在消费者应用 SinkReceiver中,我们对配置文件做一些修改,具体如下:

  1. spring.cloud.stream.bindings.input.group=Service-A

  2. spring.cloud.stream.bindings.input.destination=greetings

  3. spring.cloud.stream.bindings.input.consumer.partitioned=true

  4. spring.cloud.stream.instanceCount=2

  5. spring.cloud.stream.instanceIndex=0

从上面的配置中,我们可以看到增加了这三个参数:

  1. spring.cloud.stream.bindings.input.consumer.partitioned:通过该参数开启消费者分区功能;

  2. spring.cloud.stream.instanceCount:该参数指定了当前消费者的总实例数量;

  3. spring.cloud.stream.instanceIndex:该参数设置当前实例的索引号,从0开始,最大值为 spring.cloud.stream.instanceCount参数 - 1。我们试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。

  • 在生产者应用 SinkSender中,我们对配置文件也做一些修改,具体如下:

  1. spring.cloud.stream.bindings.output.destination=greetings

  2. spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload

  3. spring.cloud.stream.bindings.output.producer.partitionCount=2

从上面的配置中,我们可以看到增加了这两个参数:

  1. spring.cloud.stream.bindings.output.producer.partitionKeyExpression:通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;

  2. spring.cloud.stream.bindings.output.producer.partitionCount:该参数指定了消息分区的数量。

到这里消息分区配置就完成了,我们可以再次启动这两个应用,同时消费者启动多个,但需要注意的是要为消费者指定不同的实例索引号,这样当同一个消息被发给消费组时,我们可以发现只有一个消费实例在接收和处理这些相同的消息。

博客原文:http://blog.didispace.com/spring-cloud-starter-dalston-7-4/

本文内容部分节选自我的《Spring Cloud微服务实战》

但对依赖的Spring Boot和Spring Cloud版本做了升级。

更多Spring Cloud干货戳 ==> 这里

推荐阅读

微服务2017年度报告出炉:4大客户画像,15%传统企业已领跑

Netflix 的上线工具 Spinnaker

Dubbo将积极适配Spring Cloud生态

Spring Cloud微服务架构汇总

浅谈微服务基建的逻辑

Service Mesh:下一代微服务

微服务(Microservices)【翻译】

那些没说出口的研发之痛,做与不做微服务的几大理由

长按指纹

一键关注



点击 “阅读原文” 看看本号其他精彩内容

Spring Cloud构建微服务架构:消息驱动的微服务(消费分区)【Dalston版】相关推荐

  1. Spring Cloud实战小贴士:Zuul统一异常处理(三)【Dalston版】

    本篇作为<Spring Cloud微服务实战>一书关于Spring Cloud Zuul网关在Dalston版本对异常处理的补充.没有看过本书的读书也不要紧,可以先阅读我之前的两篇博文:& ...

  2. Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

    应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...

  3. Spring Cloud构建微服务架构:服务网关(基础)【Dalston版】

    通过之前几篇Spring Cloud中几个核心组件的介绍,我们已经可以构建一个简略的(不够完善)微服务架构了.比如下图所示: 我们使用Spring Cloud Netflix中的Eureka实现了服务 ...

  4. Spring Cloud构建微服务架构:服务网关(路由配置)【Dalston版】

    在上一篇<Spring Cloud构建微服务架构:服务网关(基础)>一文中,我们通过使用Spring Cloud Zuul构建了一个基础的API网关服务,同时也演示了Spring Clou ...

  5. Spring Cloud构建微服务架构:服务网关(过滤器)【Dalston版】

    在前两篇文章:服务网关(基础).服务网关(路由配置)中,我们了解了Spring Cloud Zuul作为网关所具备的最基本功能:路由.本文我们将具体介绍一下Spring Cloud Zuul的另一项核 ...

  6. Spring Cloud构建微服务架构:消息驱动的微服务(核心概念)【Dalston版】

    通过<Spring Cloud构建微服务架构:消息驱动的微服务(入门)>一文,相信大家对Spring Cloud Stream的工作模式已经有了一些基础概念,比如:输入.输出通道的绑定,通 ...

  7. Spring Cloud构建微服务架构:消息驱动的微服务(消费组)【Dalston版】

    通过之前的<消息驱动的微服务(入门)>一文,相信很多朋友已经对Spring Cloud Stream有了一个初步的认识.但是,对于<消息驱动的微服务(核心概念)>一文中提到的一 ...

  8. Spring Cloud构建微服务架构:消息驱动的微服务(入门)【Dalston版】

    之前在写Spring Boot基础教程的时候写过一篇<Spring Boot中使用RabbitMQ>.在该文中,我们通过简单的配置和注解就能实现向RabbitMQ中生产和消费消息.实际上我 ...

  9. Spring Cloud构建微服务架构(七)消息总线(续:Kafka)

    Spring Cloud Bus除了支持RabbitMQ的自动化配置之外,还支持现在被广泛应用的Kafka.在本文中,我们将搭建一个Kafka的本地环境,并通过它来尝试使用Spring Cloud B ...

最新文章

  1. SQLServer数据库试题及答案
  2. OGNL表达式语言中的#和$的区别
  3. HDU2504 又见GCD
  4. python 全栈开发,Day51(常用内置对象,函数,伪数组 arguments,关于DOM的事件操作,DOM介绍)...
  5. CF1260C-Infinite Fence【结论题】
  6. 面对SDN/NFV部署挑战 网络厂商能做什么?
  7. java中为什么设计包装类,Java 中为什么要设计包装类
  8. HDZ城市行深圳站|AIoT时代,如何抓住智联生活的战略机会点?
  9. java猜拳游戏代码_猜拳游戏 - java代码库 - 云代码
  10. Android系统(237)---OTA升级基本信息介绍
  11. 我为什么做程序猿訪谈录
  12. Jzoj1307 Jail
  13. IDEA社区版配置Spring Boot开发
  14. 聚合路由器的原理和应用
  15. 本地编码修改和编码详解
  16. 小游戏-在评论留下你的运行结果吧
  17. 国际版抖音正确打开方式
  18. 对数损失函数与最大似然损失函数
  19. searchsploit 漏洞搜索
  20. 简单的基于Tacotron2的中英文混语言合成, 包括code-switch和voice clone. 以及深入结构设计的探讨.

热门文章

  1. linux c 报错 multiple definition of ‘xxx’ 解决方法
  2. docker 容器环境 检测方法
  3. python uvloop异步框架简介
  4. OPKG 软件包管理
  5. java操作xml文件--修改节点
  6. linux下查看和添加PATH环境变量
  7. TCP/IP详解--第八章
  8. Android老版本项目导入到新版SDK提示错误
  9. C Operator | and can also operate bool operands
  10. java string s_Java字符串:“String s=新字符串(”愚蠢“);