文章目录

  • 获取Topic的可用分区
  • 不指定分区key
  • 指定分区key

获取Topic的可用分区

发送消息时,有了元数据了,就要把消息路由到分区了。执行doSend方法中的对应方法:

//使用Partitioner组件获取消息对应的分区
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());

把消息、序列化的key、序列化的value、元数据都传参给partition()方法。

首先要获取消息的分区号:

//获取消息的分区
Integer partition = record.partition();

在创建ProducerRecord的时候,有很多重载的构造函数。常用的就是:

    //未指定分区key,只是将某一条消息发送到哪个Topic上。public ProducerRecord(String topic, V value) {this(topic, null, null, null, value);}

或者指定了分区key、甚至指定了具体的partition。一般不指定partition:

//不指定partition,就会走这
return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,cluster);

这个partition方法默认是DefaultPartitioner#partition()方法,它是负责默认的分区路由策略(总共3种:指定partition、指定key、不指定key)

首先要从Metadata缓存中,找到Topic下属的所有分区:

//从Metadata缓存中,找Topic下属的所有分区
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

然后就知道当前Topic总共有多少分区:

//当前Topic总共有多少分区
int numPartitions = partitions.size();

不指定分区key

这里分析未指定分区key的情况,就走if。首先会取1个随机递增的数字:

//默认初始值是一个Integer的数字,并且会递增
int nextValue = counter.getAndIncrement();

然后查看当前Topic名下可用的分区都有哪些

//查看当前Topic名下可用的分区都有哪些
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);

然后就用这个递增的正整数,%可用分区的数量:

//递增的Integer数字 % 分区个数 = 本条消息要去往的分区号
int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();

假设这个递增正整数的初始值是23,Topic的可用分区size=5.那么这条消息的去处就是23%5。下一条消息再来,就是24%5…以此类推。其实就为了让消息落的更散列、均匀,保证在未指定分区key的前提下,所有的消息还能均匀的分发到各分区中去。

指定分区key

如果将订单id作为分区key:

ProducerRecord<String,String> record = new ProducerRecord<>("test",orderId);

对应partition方法,走以下逻辑:

//指定了分区key
return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

Kafka通过自己的工具类–murmur2,将key转换为32位的int类型的hash值。用这个hash值对可用分区数取模

如何利用Partitioner将消息路由到分区?相关推荐

  1. 利用MPLS解决BGP路由黑洞问题

    致歉声明:前版博客由于博主个人对于BGP知识的理解出现偏差,导致路由黑洞产生的原因解释错误,误导大家,抱歉. 配置文件的百度网盘连接及提取码. 链接:https://pan.baidu.com/s/1 ...

  2. RabbitMQ消息路由失败的处理方案(回调与备份交换机AE)

    我们知道,消息在RabbitMQ的整个生命周期是生产者投递消息到Exchange,Exchange根据路由键将消息路由到合适的Queue,Queue再将消息推(或消费者主动拉)给消费者. 在这个过程当 ...

  3. EventBridge消息路由|高效构建消息路由能力

    作者:肯梦 企业数字化转型过程中,天然会遇到消息路由,异地多活,协议适配,消息备份等场景.本篇主要通过 EventBridge 消息路由的应用场景和应用实验介绍,帮助大家了解如何通过 EventBri ...

  4. 没有终结点在侦听可以接受消息的_IoT Hub入门(3)使用消息路由将原始设备数据记录存档...

    本文主要分享一个案例: 10分钟使用消息路由将原始设备数据记录存档 本文主要有如下内容: 1.理解什么是消息路由,为什么要用消息路由 2.消息路由的类型 3.配置一个到Storage的消息路由,将原始 ...

  5. 基于Topic消息路由的M2M设备间通信Node JS SDK 示例

    概述 M2M(即Machine-to-Machine)是一种端对端通信技术.本章节以Node JS SDK为例,使用基于Topic消息路由的M2M设备间通信,主要介绍如何基于物联网平台构建一个M2M设 ...

  6. 开源流媒体云视频平台EasyDarwin中EasyCMS服务是如何进行命令转发和消息路由的

    开源流媒体云视频平台EasyDarwin中EasyCMS服务是如何进行命令转发和消息路由的 EasyCMS介绍 EasyCMS做为EasyDarwin开源流媒体云平台解决方案的一部分,主要进行的是设备 ...

  7. 融云服务器实时消息路由,IM 北极星产品功能介绍

    IM 北极星 概述 IM 北极星主要为开发者提供了终端用户连接及消息发送状态查询功能:通过查看用户的连接日志,判断用户某时间段内是否可以正常使用融云服务,如:用户反馈消息发送不成功,可能是因为该用户在 ...

  8. 2.18 haas506 2.0开发教程 - 阿里云M2M设备间通信 - 规则引擎/Topic消息路由(仅支持2.2以上版本)

    haas506 2.0开发教程 - 阿里云M2M设备间通信 - 规则引擎/Topic消息路由 阿里云M2M设备间通信 通信流程 功能实现 1.创建产品 2.设备端开发 联云测试 3.建立topic之间 ...

  9. 从TrackPopupMenu(创建快捷菜单的函数)看菜单消息路由机制

    先摘段<vc++深入详解>孙鑫的话 以上讲的是点击主菜单时的消息路由过程,但鼠标右键快捷菜单的消息路由有点区别. TrackPopupMenu函数用来创建右键快捷菜单. 例: CMenu ...

最新文章

  1. Android移动开发之【Android实战项目】DAY13-MPChart简单的折线图LineChart
  2. dependencies与devDependencies之间的区别
  3. 来自高维的对抗 - 逆向TinyTool自制
  4. 苹果广告背景音乐大全【转】
  5. Silverlight3 Tools Download link
  6. Activity内嵌Fragment,当Activity recreate时Fragment被添加多次,造成相互遮盖
  7. Win XP必须禁止的服务
  8. STM32工作笔记0061---通用定时器基本原理
  9. avalon做的抽奖效果
  10. phpstorm连接不上MySQL_PHPStorm无法连接到本地MySQL [重复]
  11. Lena与数字图像处理
  12. ACM题库(计蒜客A1001整除问题)
  13. AirServer7电脑桌面版一款便捷式手机电脑投屏软件
  14. People seldom do what they believe in. They do what is convenient, then repent.
  15. python去除含st的股票
  16. js 选择本地图片并显示
  17. 碳减排量和碳配额的区别是什么?
  18. xilinx IP 汇总
  19. pdf转换成word转换器在线哪个最好
  20. 计算机基础三: 二进制减法实现

热门文章

  1. [清华集训2016]石家庄的工人阶级队伍比较坚强——三进制FWT
  2. StringUtils之equals
  3. live555点播服务器流程深入分析(一)
  4. Git的stash操作
  5. 《ActionScript 3.0基础教程》——1.4 对象参数
  6. ios中一个开发者证书如何创建多个app应用
  7. T-Sql(七)用户权限操作(grant)
  8. 服务器安全股v4.0正式版发布 防火墙效能更强
  9. 网页加速系列(五)、 网页加速之进阶上篇
  10. angular模拟web API