RocketMQ 提供了一致性hash 算法来做Consumer 和 MessageQueue的负载均衡。 源码中一致性hash 环的实现是很优秀的,我们一步一步分析。

一个Hash环包含多个节点, 我们用 MyNode 去封装节点, 方法 getKey() 封装获取节点的key。我们可以实现MyNode 去描述一个物理节点或虚拟节点。MyVirtualNode 实现 MyNode, 表示一个虚拟节点。这里注意:一个虚拟节点是依赖于一个物理节点,所以MyVirtualNode 中封装了 一个 泛型 T physicalNode。物理节点MyClientNode也是实现了这个MyNode接口,很好的设计。代码加注释如下:

 /*** 表示hash环的一个节点*/public interface MyNode {/*** @return 节点的key*/String getKey();}/*** 虚拟节点*/
public class MyVirtualNode<T extends MyNode> implements MyNode {final T physicalNode;  // 主节点final int replicaIndex;  // 虚节点下标public MyVirtualNode(T physicalNode, int replicaIndex) {this.physicalNode = physicalNode;this.replicaIndex = replicaIndex;}@Overridepublic String getKey() {return physicalNode.getKey() + "-" + replicaIndex;}/*** thisMyVirtualNode 是否是pNode 的 虚节点*/public boolean isVirtualNodeOf(T pNode) {return physicalNode.getKey().equals(pNode.getKey());}public T getPhysicalNode() {return physicalNode;}}private static class MyClientNode implements MyNode {private final String clientID;public MyClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;}
}

上面实现了节点, 一致性hash 下一个问题是怎么封装hash算法呢?RocketMQ 使用 MyHashFunction 接口定义hash算法。使用MD5 + bit 位hash的方式实现hash算法。我们完全可以自己实现hash算法,具体见我的“常见的一些hash函数”文章。MyMD5Hash 算法代码的如下:

 // MD5 hash 算法, 这里hash算法可以用常用的 hash 算法替换。private static class MyMD5Hash implements MyHashFunction {MessageDigest instance;public MyMD5Hash() {try {instance = MessageDigest.getInstance("MD5");} catch (NoSuchAlgorithmException e) {}}@Overridepublic long hash(String key) {instance.reset();instance.update(key.getBytes());byte[] digest = instance.digest();long h = 0;for (int i = 0; i < 4; i++) {h <<= 8;h |= ((int)digest[i]) & 0xFF;}return h;}}

现在,hash环的节点有了, hash算法也有了,最重要的是描述一个一致性hash 环。 想一想,这个环可以由N 个物理节点, 每个物理节点对应m个虚拟节点,节点位置用hash算法值描述。每个物理节点就是每个Consumer, 每个Consumer 的 id 就是 物理节点的key。 每个MessageQueue 的toString() 值 hash 后,用来找环上对应的最近的下一个物理节点。源码如下,这里展示主要的代码,其中最巧妙地是routeNode 方法, addNode 方法 注意我的注释:

public class MyConsistentHashRouter<T extends MyNode> {private final SortedMap<Long, MyVirtualNode<T>> ring = new TreeMap<>(); // key是虚节点key的哈希值, value 是虚节点
private final MyHashFunction myHashFunction;
/*** @param pNodes 物理节点集合* @param vNodeCount 每个物理节点对应的虚节点数量* @param hashFunction hash 函数 用于 hash 各个节点*/
public MyConsistentHashRouter(Collection<T> pNodes, int vNodeCount, MyHashFunction hashFunction) {if (hashFunction == null) {throw new NullPointerException("Hash Function is null");}this.myHashFunction = hashFunction;if (pNodes != null) {for (T pNode : pNodes) {this.addNode(pNode, vNodeCount);}}
}
/*** 添加物理节点和它的虚节点到hash环。* @param pNode 物理节点* @param vNodeCount 虚节点数量。*/
public void addNode(T pNode, int vNodeCount) {if (vNodeCount < 0) {throw new IllegalArgumentException("ill virtual node counts :" + vNodeCount);}int existingReplicas = this.getExistingReplicas(pNode);for (int i = 0; i < vNodeCount; i++) {MyVirtualNode<T> vNode = new MyVirtualNode<T>(pNode, i + existingReplicas); // 创建一个新的虚节点,位置是 i+existingReplicasring.put(this.myHashFunction.hash(vNode.getKey()), vNode); // 将新的虚节点放到hash环中}
}
/*** 根据一个给定的key 在 hash环中 找到离这个key最近的下一个物理节点* @param key 一个key, 用于找这个key 在环上最近的节点*/
public T routeNode(String key) {if (ring.isEmpty()) {return null;}Long hashVal = this.myHashFunction.hash(key);SortedMap<Long, MyVirtualNode<T>> tailMap = ring.tailMap(hashVal);Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();return ring.get(nodeHashVal).getPhysicalNode();
}/*** @param pNode 物理节点* @return 当前这个物理节点对应的虚节点的个数*/
public int getExistingReplicas(T pNode) {int replicas = 0;for (MyVirtualNode<T> vNode : ring.values()) {if (vNode.isVirtualNodeOf(pNode)) {replicas++;}}return replicas;
}

现在一致性hash 环有了, 剩下的就是 和rocketmq 的 consumer, mq 构成负载均衡策略了。比较简单, 代码如下:

             /*** 基于一致性性hash环的Consumer负载均衡.*/     public class MyAllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {// 每个物理节点对应的虚节点的个数private final int virtualNodeCnt;private final MyHashFunction customHashFunction;public MyAllocateMessageQueueConsistentHash() {this(10);   // 默认10个虚拟节点}public MyAllocateMessageQueueConsistentHash(int virtualNodeCnt) {this(virtualNodeCnt, null);}public MyAllocateMessageQueueConsistentHash(int virtualNodeCnt, MyHashFunction customHashFunction) {if (virtualNodeCnt < 0) {throw new IllegalArgumentException("illegal virtualNodeCnt : " + virtualNodeCnt);}this.virtualNodeCnt = virtualNodeCnt;this.customHashFunction = customHashFunction;}@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {// 省去一系列非空校验Collection<MyClientNode> cidNodes = new ArrayList<>();for (String cid : cidAll) {cidNodes.add(new MyClientNode(cid));}final MyConsistentHashRouter<MyClientNode> router;if (this.customHashFunction != null) {router = new MyConsistentHashRouter<MyClientNode>(cidNodes, virtualNodeCnt, customHashFunction);}else {router = new MyConsistentHashRouter<MyClientNode>(cidNodes, virtualNodeCnt);}List<MessageQueue> results = new ArrayList<MessageQueue>();  // 当前 currentCID 对应的 mq// 将每个mq 根据一致性hash 算法找到对应的物理节点(Consumer)for (MessageQueue mq : mqAll) {MyClientNode clientNode = router.routeNode(mq.toString());   // 根据 mq toString() 方法做hash 和环上节点比较if (clientNode != null && currentCID.equals(clientNode.getKey())) {results.add(mq);}}return results;}@Overridepublic String getName() {return "CONSISTENT_HASH";}private static class MyClientNode implements MyNode {private final String clientID;public MyClientNode(String clientID) {this.clientID = clientID;}@Overridepublic String getKey() {return clientID;}}}

————————————————
版权声明:本文为CSDN博主「昊haohao」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/ZHANGYONGHAO604/article/details/82426373

RocketMQ Consumer 负载均衡算法源码学习 -- AllocateMessageQueueConsistentHash相关推荐

  1. 【详解】Ribbon 负载均衡服务调用原理及默认轮询负载均衡算法源码解析、手写

    Ribbon 负载均衡服务调用 一.什么是 Ribbon 二.LB负载均衡(Load Balancer)是什么 1.Ribbon 本地负载均衡客户端 VS Nginx 服务端负载均衡的区别 2.LB负 ...

  2. Robin六种常用负载均衡算法源码解析

    文章目录 1 经典轮询算法 2 随机算法 3 以响应时间为权重的轮询策略(重中之重) 4 重试策略 5 断言策略 6 最佳可用性策略 1 经典轮询算法 //Robin的负载均衡原理为 请求服务=请求次 ...

  3. Dubbo负载均衡的源码流程(2022.5.30)

    Dubbo负载均衡的源码流程 1.默认负载均衡策略:RandomLoadBalance(随机策略) 2.负载均衡策略存在以下五种: 2.1 RandomLoadBalance(随机) 2.2 Roun ...

  4. java 一致性hash算法 均衡分发_Dubbo一致性哈希负载均衡的源码和Bug,了解一下?...

    本文是对于Dubbo负载均衡策略之一的一致性哈希负载均衡的详细分析.对源码逐行解读.根据实际运行结果,配以丰富的图片,可能是东半球讲一致性哈希算法在Dubbo中的实现最详细的文章了. 文中所示源码,没 ...

  5. @cacheable 服务器 不一致_Dubbo一致性哈希负载均衡的源码和Bug,了解一下?

    持续输出原创文章,点击蓝字关注我吧 本文是对于Dubbo负载均衡策略之一的一致性哈希负载均衡的详细分析.对源码逐行解读.根据实际运行结果,配以丰富的图片,可能是东半球讲一致性哈希算法在Dubbo中的实 ...

  6. 基于RYU应用开发之负载均衡(源码开放)

    为什么80%的码农都做不了架构师?>>>    编者按:本文介绍的是如何在RYU上通过使用select group 来实现multipath,从而实现流量的调度,完成简单的负载均衡D ...

  7. Ribbon源码3-负载均衡算法源码分析

    0. 环境 nacos版本:1.4.1 Spring Cloud : Hoxton.SR9 Spring Boot :2.4.4 Spring Cloud alibaba: 2.2.5.RELEASE ...

  8. grpc负载均衡RoundRobin源码解读

    grpc client端创建连接时可以用WithBalancer来指定负载均衡组件,这里研究下grpc自带的RoundRobin(轮询调度)的实现.源码在google.golang.org/grpc/ ...

  9. RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

    RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...

最新文章

  1. mysql数据库建仓范式_存mysql个数
  2. nagios 监控shell脚本
  3. [Spark][Python]PageRank 程序
  4. jqgrid定义多选操作
  5. 十二届 - CSU 1803 :2016(同余定理)
  6. vxworks 调式
  7. 关于阈值化函数cvThreshold()
  8. 十年一诺,亚马逊中国今欲先发制人
  9. curl: (52) Empty reply from server
  10. 三星S7edge刷极光ROM的总结
  11. fclk if总线_技嘉B550手把手超频指南,光威血影为例
  12. 中国农业银行计算机专业笔试题,中国农业银行笔试题库
  13. 用Python爬取网页数据,手把手教会你!
  14. 梁宁《产品思维》之18用户体验
  15. MFC 中 如何屏蔽Esc和Ente
  16. 层次分析法 你真的懂了吗?(完更)
  17. 设备管理器,其他设备,PCI数据捕获和信号处理控制器出现感很多未知设备感叹号,通用解决方法,以华为matebook为例
  18. 北京35岁程序员失业,感叹:编程估计没戏了,想去卖点煎饼果子养家~
  19. 计算机主板外频,笨鸟先飞 主板超频BIOS选项接触(图解)
  20. View onMeasure 方法

热门文章

  1. 《分布式操作系统》知识点(22~28)四
  2. Dynamics CRM 同一实体多个Form显示不同的Ribbon按钮
  3. 保存/恢复cxGrid布局
  4. transform css3 的使用及理解
  5. CSS三栏自适应布局,左中右,上中下
  6. 使用ab(apachebench)进行压力测试
  7. 关键词多样性的重要意义
  8. OpenCV学习之Mat::at()理解
  9. 【Linux多线程】三个经典同步问题
  10. indows 平台下 Go 语言的安装和环境变量设置