spring-cloud-stream-binder-kafka发消息指定 partitionKey
组件版本
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>2.1.4.RELEASE</version></dependency>
背景信息
stream-binder 通过绑定 channel 发送消息很方便,
但 MessageChannel中 send 方法只接收 Message 参数无法指定 partitionKey;
default boolean send(Message<?> message) {return send(message, INDEFINITE_TIMEOUT);
}boolean send(Message<?> message, long timeout);
网友推荐
方法 如下
// 指定
partitionKeyExtractorClasspartitionSelectorClasspartitionKeyExpression
需要自己定义Bean并且 实现对应的接口,略显麻烦。
自查源码
可以在构造 Message 时,设置 header信息 来指定 messageKey,后续会用作选取partition。
// partitionKey 为自定义的 业务key (String类型)
setHeader(KafkaHeaders.MESSAGE_KEY, partitionKey.getBytes())
理论依据
在 KafkaProducerMessageHandler 中 有相应的处理逻辑:
// org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(); L348protected Object handleRequestMessage(final Message<?> message) {MessageHeaders messageHeaders = message.getHeaders();String topic = this.topicExpression != null ?this.topicExpression.getValue(this.evaluationContext, message, String.class): messageHeaders.get(KafkaHeaders.TOPIC, String.class);Assert.state(StringUtils.hasText(topic), "The 'topic' can not be empty or null");Integer partitionId = this.partitionIdExpression != null ?this.partitionIdExpression.getValue(this.evaluationContext, message, Integer.class): messageHeaders.get(KafkaHeaders.PARTITION_ID, Integer.class);// 优先级低于 messageKeyExpressionObject messageKey = this.messageKeyExpression != null? this.messageKeyExpression.getValue(this.evaluationContext, message): messageHeaders.get(KafkaHeaders.MESSAGE_KEY);Long timestamp = this.timestampExpression != null? this.timestampExpression.getValue(this.evaluationContext, message, Long.class): messageHeaders.get(KafkaHeaders.TIMESTAMP, Long.class);...
}
DefaultPartitioner 根据 keyBytes 选取 partition
// org.apache.kafka.clients.producer.internals.DefaultPartitioner.partition L69// hash the keyBytes to choose a partitionreturn Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
spring-cloud-stream-binder-kafka发消息指定 partitionKey相关推荐
- 一. spring cloud gateway集成 spring cloud stream binder kafka,实现“动态路由“刷新与加载之采坑记录
一.前言 Spring Cloud Stream是用于构建消息驱动的微服务应用程序的框架. 本文主要介绍如何集成 Spring Cloud Stream,以 Kafka发布订阅模式(topic),实现 ...
- Spring Cloud Stream 与 Kafka 整合
重要注意事项: 1. 尽管 Spring Cloud Stream Binder 中存在 Kafka 的整合,然而Spring Kafka 和 Spring Cloud Stream Kafka 在处 ...
- Spring Cloud Stream Binder 实现
Spring Cloud Stream Binder 实现 JMS 实现 ActiveMQ 1.增加Maven依赖 <!-- 整合 Sprig Boot Starter ActiveMQ --& ...
- Spring Cloud Stream与RabbitMQ 消费者 消息分组
Spring Cloud Stream 其实是发布订阅模型,如果一个topic有多个订阅实例 ,消息就会被这些消息消费者接收到,这样就会带来一个问题,那就是消息的重复消费,这种问题在很多业务场景下是不 ...
- Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑
应用场景 有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的: ...
- Spring Cloud【Finchley】- 21 Spring Cloud Stream 构建消息驱动微服务
文章目录 概述 添加依赖 配置文件配置RabbitMQ的地址信息 接口定义 接收方 @EnableBinding @StreamListener 测试 消费组 发送复杂对象 消息回执 代码 概述 官网 ...
- Spring Cloud Stream消息中间件通过RabbitMQ实现消息推送
一.Spring Cloud Stream是什么 Spring Cloud Stream 是一个构建消息驱动微服务的框架. Spring Cloud Stream解决了开发人员无感知的使用消息中间件的 ...
- SpringCloud学习笔记 - 消息驱动 - Spring Cloud Stream
1. stream为什么被引入 常见MQ(消息中间件): ActiveMQ RabbitMQ RocketMQ Kafka 有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑 ...
- Kafka的Spring Cloud Stream
总览 该示例项目演示了如何使用事件驱动的体系结构 , Spring Boot ,Spring Cloud Stream, Apache Kafka和Lombok构建实时流应用程序. 在本教程结束时,您 ...
最新文章
- 用JavaScript和CSS实现“在页面中水平和垂直居中”的时钟
- 创建试图 失败_导致微服务失败的 11 个原因
- 【Python基础】盘点 Python 10 大常用数据结构(下篇)
- YTU 2412: 帮警长数一数【循环、分支简单综合】
- 鸿蒙思维和小央美,中心路汝南路站附近艺术培训
- windows和linux如何通信,别总是把Windows和Linux混为一谈
- ArcMAP TIN与栅格DEM的坡度坡向对比分析
- mysql spider引擎安装_mysql spider 引擎的使用
- 网络调试助手(NetAssist)java socket通讯代码
- ocr常用数据集介绍
- 用python的matplotlib和numpy库绘制股票K线均线
- 嵌入式行业和职业选择的看法
- Mstar的Monitor方案笔记(七)——EDID基本数据结构
- 完美卸载itunes
- ROS智能车定位导航仿真(已实现)
- 附加题——求n的阶乘和
- python的flask框架实现的小型二手商城
- 【Linux 性能优化系列】Linux 性能优化 -- CPU 性能篇(三) Linux 软中断
- python图片内容识别_TensorFlow从1到2(五)图片内容识别和自然语言语义识别
- 免费PPT模板 | 《超能陆战队》暖心大白通用演示PPT模板
热门文章
- 阿里云oos对象存储 上手快速入门
- 嘉兴 机器人仓库 菜鸟_揭秘菜鸟自动化仓库 机器人物联网+大数据(附视频)...
- 2020软件测试学习自学路线分享,附完整资料,绝对有用哟
- mysql hash索引 btree索引_HASH索引做等值查询会很快,BTree索引做范围查询比较快...
- Windows8.1安装tools提示:安装程序无法验证是否已安装所需的Microsoft更新KB2919355的问题详解
- C++stl翁恺老师笔记
- 南京:探索实施“电子围栏”管理新模式 缓解商圈周边道路交通拥堵
- 马踏棋盘问题的程序c语言,C语言马踏棋盘
- mac 设置mysql登录快捷键_史上最详细的苹果Macbook快捷键使用
- 用c语言编写九九乘法表