组件版本

<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>2.1.4.RELEASE</version></dependency>

背景信息

stream-binder 通过绑定 channel 发送消息很方便,
但 MessageChannel中 send 方法只接收 Message 参数无法指定 partitionKey;

default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);
}boolean send(Message<?> message, long timeout);

网友推荐

方法 如下

// 指定
partitionKeyExtractorClasspartitionSelectorClasspartitionKeyExpression

需要自己定义Bean并且 实现对应的接口,略显麻烦。

自查源码

可以在构造 Message 时,设置 header信息 来指定 messageKey,后续会用作选取partition。

// partitionKey 为自定义的 业务key (String类型)
setHeader(KafkaHeaders.MESSAGE_KEY, partitionKey.getBytes())

理论依据

在 KafkaProducerMessageHandler 中 有相应的处理逻辑:

//  org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(); L348protected Object handleRequestMessage(final Message<?> message) {MessageHeaders messageHeaders = message.getHeaders();String topic = this.topicExpression != null ?this.topicExpression.getValue(this.evaluationContext, message, String.class): messageHeaders.get(KafkaHeaders.TOPIC, String.class);Assert.state(StringUtils.hasText(topic), "The 'topic' can not be empty or null");Integer partitionId = this.partitionIdExpression != null ?this.partitionIdExpression.getValue(this.evaluationContext, message, Integer.class): messageHeaders.get(KafkaHeaders.PARTITION_ID, Integer.class);// 优先级低于 messageKeyExpressionObject messageKey = this.messageKeyExpression != null? this.messageKeyExpression.getValue(this.evaluationContext, message): messageHeaders.get(KafkaHeaders.MESSAGE_KEY);Long timestamp = this.timestampExpression != null? this.timestampExpression.getValue(this.evaluationContext, message, Long.class): messageHeaders.get(KafkaHeaders.TIMESTAMP, Long.class);...
}

DefaultPartitioner 根据 keyBytes 选取 partition

         // org.apache.kafka.clients.producer.internals.DefaultPartitioner.partition L69// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

spring-cloud-stream-binder-kafka发消息指定 partitionKey相关推荐

  1. 一. spring cloud gateway集成 spring cloud stream binder kafka,实现“动态路由“刷新与加载之采坑记录

    一.前言 Spring Cloud Stream是用于构建消息驱动的微服务应用程序的框架. 本文主要介绍如何集成 Spring Cloud Stream,以 Kafka发布订阅模式(topic),实现 ...

  2. Spring Cloud Stream 与 Kafka 整合

    重要注意事项: 1. 尽管 Spring Cloud Stream Binder 中存在 Kafka 的整合,然而Spring Kafka 和 Spring Cloud Stream Kafka 在处 ...

  3. Spring Cloud Stream Binder 实现

    Spring Cloud Stream Binder 实现 JMS 实现 ActiveMQ 1.增加Maven依赖 <!-- 整合 Sprig Boot Starter ActiveMQ --& ...

  4. Spring Cloud Stream与RabbitMQ 消费者 消息分组

    Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不 ...

  5. Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

    应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...

  6. Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务

    文章目录 概述 添加依赖 配置文件配置RabbitMQ的地址信息 接口定义 接收方 @EnableBinding @StreamListener 测试 消费组 发送复杂对象 消息回执 代码 概述 官网 ...

  7. Spring Cloud Stream消息中间件通过RabbitMQ实现消息推送

    一.Spring Cloud Stream是什么 Spring Cloud Stream 是一个构建消息驱动微服务的框架. Spring Cloud Stream解决了开发人员无感知的使用消息中间件的 ...

  8. SpringCloud学习笔记 - 消息驱动 - Spring Cloud Stream

    1. stream为什么被引入 常见MQ(消息中间件): ActiveMQ RabbitMQ RocketMQ Kafka 有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑 ...

  9. Kafka的Spring Cloud Stream

    总览 该示例项目演示了如何使用事件驱动的体系结构 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok构建实时流应用程序. 在本教程结束时,您 ...

最新文章

  1. 用JavaScript和CSS实现“在页面中水平和垂直居中”的时钟
  2. 创建试图 失败_导致微服务失败的 11 个原因
  3. 【Python基础】盘点 Python 10 大常用数据结构(下篇)
  4. YTU 2412: 帮警长数一数【循环、分支简单综合】
  5. 鸿蒙思维和小央美,中心路汝南路站附近艺术培训
  6. windows和linux如何通信,别总是把Windows和Linux混为一谈
  7. ArcMAP TIN与栅格DEM的坡度坡向对比分析
  8. mysql spider引擎安装_mysql spider 引擎的使用
  9. 网络调试助手(NetAssist)java socket通讯代码
  10. ocr常用数据集介绍
  11. 用python的matplotlib和numpy库绘制股票K线均线
  12. 嵌入式行业和职业选择的看法
  13. Mstar的Monitor方案笔记(七)——EDID基本数据结构
  14. 完美卸载itunes
  15. ROS智能车定位导航仿真(已实现)
  16. 附加题——求n的阶乘和
  17. python的flask框架实现的小型二手商城
  18. 【Linux 性能优化系列】Linux 性能优化 -- CPU 性能篇(三) Linux 软中断
  19. python图片内容识别_TensorFlow从1到2(五)图片内容识别和自然语言语义识别
  20. 免费PPT模板 | 《超能陆战队》暖心大白通用演示PPT模板

热门文章

  1. 阿里云oos对象存储 上手快速入门
  2. 嘉兴 机器人仓库 菜鸟_揭秘菜鸟自动化仓库 机器人物联网+大数据(附视频)...
  3. 2020软件测试学习自学路线分享,附完整资料,绝对有用哟
  4. mysql hash索引 btree索引_HASH索引做等值查询会很快,BTree索引做范围查询比较快...
  5. Windows8.1安装tools提示:安装程序无法验证是否已安装所需的Microsoft更新KB2919355的问题详解
  6. C++stl翁恺老师笔记
  7. 南京:探索实施“电子围栏”管理新模式 缓解商圈周边道路交通拥堵
  8. 马踏棋盘问题的程序c语言,C语言马踏棋盘
  9. mac 设置mysql登录快捷键_史上最详细的苹果Macbook快捷键使用
  10. 用c语言编写九九乘法表