生产者集群启动时,才会使用负载均衡,否则跳过

    <!--use dubbo protocol to export service on port 20880修改端口,即可启动集群,此时负载均衡才会使用--><dubbo:protocol name="dubbo" port="20881"/>

LoadBalance

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {/*** select one invoker in list.** @param invokers   invokers.* @param url        refer url* @param invocation invocation.* @return selected invoker.*/@Adaptive("loadbalance")<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}

AbstractLoadBalance

public abstract class AbstractLoadBalance implements LoadBalance {static int calculateWarmupWeight(int uptime, int warmup, int weight) {int ww = (int) ((float) uptime / ((float) warmup / (float) weight));return ww < 1 ? 1 : (ww > weight ? weight : ww);}public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {if (invokers == null || invokers.isEmpty())return null;if (invokers.size() == 1)return invokers.get(0);return doSelect(invokers, url, invocation);}protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);protected int getWeight(Invoker<?> invoker, Invocation invocation) {int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);if (weight > 0) {long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);if (timestamp > 0L) {int uptime = (int) (System.currentTimeMillis() - timestamp);int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);if (uptime > 0 && uptime < warmup) {weight = calculateWarmupWeight(uptime, warmup, weight);}}}return weight;}}

四种负载均衡策略

RandomLoadBalance随机加权,默认

public class RandomLoadBalance extends AbstractLoadBalance {public static final String NAME = "random";private final Random random = new Random();protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size(); // Number of invokersint totalWeight = 0; // The sum of weightsboolean sameWeight = true; // Every invoker has the same weight?for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);totalWeight += weight; // Sumif (sameWeight && i > 0&& weight != getWeight(invokers.get(i - 1), invocation)) {sameWeight = false;}}if (totalWeight > 0 && !sameWeight) {// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.int offset = random.nextInt(totalWeight);// Return a invoker based on the random value.for (int i = 0; i < length; i++) {offset -= getWeight(invokers.get(i), invocation);if (offset < 0) {return invokers.get(i);}}}// If all invokers have the same weight value or totalWeight=0, return evenly.return invokers.get(random.nextInt(length));}

生产者改变权重

<dubbo:provider delay="-1" timeout="1000000" retries="0" weight="80"/>

默认权重值时100,

我们现在假设集群有四个节点分别对应的权重为{A:1,B:2,C:3,D:4},分别将权重套入到代码中进行分析,该随机算法按总权重进行加权随机,A节点负载请求的概率为1/(1+2+3+4),依次类推,B,C,D负载的请求概率分别是20%,30%,40%。在这种方式下,用户可以根据机器的实际性能动态调整权重比率,如果发现机器D负载过大,请求堆积过多,通过调整权重可以缓解机器D处理请求的压力。

轮询加权

修改负载均衡策略

    <dubbo:reference id="userService" check="false" interface="com.study.dubbo.userapi.service.UserService" loadbalance="roundrobin"/>

消费者添加 loadbalance=“roundrobin”,就是类中Name常量

RoundRobinLoadBalance

public class RoundRobinLoadBalance extends AbstractLoadBalance {public static final String NAME = "roundrobin";private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();int length = invokers.size(); // Number of invokersint maxWeight = 0; // The maximum weightint minWeight = Integer.MAX_VALUE; // The minimum weightfinal LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();int weightSum = 0;for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);maxWeight = Math.max(maxWeight, weight); // Choose the maximum weightminWeight = Math.min(minWeight, weight); // Choose the minimum weightif (weight > 0) {invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));weightSum += weight;}}AtomicPositiveInteger sequence = sequences.get(key);if (sequence == null) {sequences.putIfAbsent(key, new AtomicPositiveInteger());sequence = sequences.get(key);}int currentSequence = sequence.getAndIncrement();if (maxWeight > 0 && minWeight < maxWeight) {int mod = currentSequence % weightSum;for (int i = 0; i < maxWeight; i++) {for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {final Invoker<T> k = each.getKey();final IntegerWrapper v = each.getValue();if (mod == 0 && v.getValue() > 0) {return k;}if (v.getValue() > 0) {v.decrement();mod--;}}}}// Round robinreturn invokers.get(currentSequence % length);}private static final class IntegerWrapper {private int value;public IntegerWrapper(int value) {this.value = value;}public int getValue() {return value;}public void setValue(int value) {this.value = value;}public void decrement() {this.value--;}}}

最少调用

LeastActiveLoadBalance

public class LeastActiveLoadBalance extends AbstractLoadBalance {public static final String NAME = "leastactive";private final Random random = new Random();protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size(); // Number of invokersint leastActive = -1; // The least active value of all invokersint leastCount = 0; // The number of invokers having the same least active value (leastActive)int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive)int totalWeight = 0; // The sum of weightsint firstWeight = 0; // Initial value, used for comparisionboolean sameWeight = true; // Every invoker has the same weight value?for (int i = 0; i < length; i++) {Invoker<T> invoker = invokers.get(i);int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active numberint weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weightif (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.leastActive = active; // Record the current least active valueleastCount = 1; // Reset leastCount, count again based on current leastCountleastIndexs[0] = i; // ResettotalWeight = weight; // ResetfirstWeight = weight; // Record the weight the first invokersameWeight = true; // Reset, every invoker has the same weight value?} else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.leastIndexs[leastCount++] = i; // Record index number of this invokertotalWeight += weight; // Add this invoker's weight to totalWeight.// If every invoker has the same weight?if (sameWeight && i > 0&& weight != firstWeight) {sameWeight = false;}}}// assert(leastCount > 0)if (leastCount == 1) {// If we got exactly one invoker having the least active value, return this invoker directly.return invokers.get(leastIndexs[0]);}if (!sameWeight && totalWeight > 0) {// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.int offsetWeight = random.nextInt(totalWeight);// Return a invoker based on the random value.for (int i = 0; i < leastCount; i++) {int leastIndex = leastIndexs[i];offsetWeight -= getWeight(invokers.get(leastIndex), invocation);if (offsetWeight <= 0)return invokers.get(leastIndex);}}// If all invokers have the same weight value or totalWeight=0, return evenly.return invokers.get(leastIndexs[random.nextInt(leastCount)]);}
}

一致hash

使用consistenthash

<dubbo:reference id="userService" check="false" interface="com.study.dubbo.userapi.service.UserService" loadbalance="consistenthash"/>
public class ConsistentHashLoadBalance extends AbstractLoadBalance {private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();@SuppressWarnings("unchecked")@Overrideprotected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();int identityHashCode = System.identityHashCode(invokers);ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);if (selector == null || selector.identityHashCode != identityHashCode) {selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));selector = (ConsistentHashSelector<T>) selectors.get(key);}return selector.select(invocation);}private static final class ConsistentHashSelector<T> {private final TreeMap<Long, Invoker<T>> virtualInvokers;private final int replicaNumber;private final int identityHashCode;private final int[] argumentIndex;ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {this.virtualInvokers = new TreeMap<Long, Invoker<T>>();this.identityHashCode = identityHashCode;URL url = invokers.get(0).getUrl();this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));argumentIndex = new int[index.length];for (int i = 0; i < index.length; i++) {argumentIndex[i] = Integer.parseInt(index[i]);}for (Invoker<T> invoker : invokers) {String address = invoker.getUrl().getAddress();for (int i = 0; i < replicaNumber / 4; i++) {byte[] digest = md5(address + i);for (int h = 0; h < 4; h++) {long m = hash(digest, h);virtualInvokers.put(m, invoker);}}}}public Invoker<T> select(Invocation invocation) {String key = toKey(invocation.getArguments());byte[] digest = md5(key);return selectForKey(hash(digest, 0));}private String toKey(Object[] args) {StringBuilder buf = new StringBuilder();for (int i : argumentIndex) {if (i >= 0 && i < args.length) {buf.append(args[i]);}}return buf.toString();}private Invoker<T> selectForKey(long hash) {Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();if (entry == null) {entry = virtualInvokers.firstEntry();}return entry.getValue();}private long hash(byte[] digest, int number) {return (((long) (digest[3 + number * 4] & 0xFF) << 24)| ((long) (digest[2 + number * 4] & 0xFF) << 16)| ((long) (digest[1 + number * 4] & 0xFF) << 8)| (digest[number * 4] & 0xFF))& 0xFFFFFFFFL;}private byte[] md5(String value) {MessageDigest md5;try {md5 = MessageDigest.getInstance("MD5");} catch (NoSuchAlgorithmException e) {throw new IllegalStateException(e.getMessage(), e);}md5.reset();byte[] bytes;try {bytes = value.getBytes("UTF-8");} catch (UnsupportedEncodingException e) {throw new IllegalStateException(e.getMessage(), e);}md5.update(bytes);return md5.digest();}}}

构造函数中,每个实际的提供者均有160个(默认值,可调整)虚拟节点,每个提供者对应的虚拟节点将平均散列到哈希环上,当有请求时,先计算该请求参数对应的哈希值,然后顺时针寻找最近的虚拟节点,得到实际的提供者节点。

ConsistentHashLoadBalance: 一致性Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

dubbo2.6源码-负载均衡相关推荐

  1. Dubbo2.7源码分析-SPI的应用

    SPI简介 SPI是Service Provider Interface的缩写,即服务提供接口(翻译出来好绕口,还是不翻译的好),实质上是接口,作用是对外提供服务. SPI是Java的一种插件机制,可 ...

  2. 【源码】均衡优化器Equilibrium Optimizer(EO)

    EO是受控制体积质量平衡的启发来估计动态和平衡状态的. EO is inspired by control volume mass balance to estimate both dynamic a ...

  3. java warmup,20. dubbo源码-预热warmup过程

    阿飞Javaer,转载请注明原创出处,谢谢! 前言 今天群里小伙伴黄晓峰VIVO咨询一个问题:"dubbo接口怎么做预热呢,每次上线,都会有一小部分超时?",熟悉JVM都知道,JV ...

  4. Spring Cloud Alibaba - 06 RestTemplate 实现自定义负载均衡算法

    文章目录 负载均衡分类 分析 工程 调用 测试 源码 负载均衡分类 服务端负载均衡 ,比如我们常见的ng 客户端负载均衡 ,比如微服务体系中的ribbon spring cloud ribbon是 基 ...

  5. nginx前言 - 负载均衡

    前言 什么叫做负载均衡: 意思是将负载(工作任务,访问请求)进行平衡.分摊到多个操作单元(服务器,组件)上进行执行.是解决高性能,单点故障(高可用),扩展性(水平伸缩)的终极解决方案.我们希望所有服务 ...

  6. 开发直播源码需要了解哪些原理?小白必看内容

    在直播源码开发的过程中会涉及到很多技术细节和原理,只有对这些技术细节和原理有了了解,才能实现更高效率的开发.接下来我们就一起来看一下开发直播源码需要了解哪些原理吧. [一个完整直播源码架构] [一个完 ...

  7. 负载均衡的三种传输模式

    在说之前我们先了解下什么说负载均衡,以下是我在百度文科上找到的解释. 负载均衡建立在现有网络结构之上,它提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽.增加吞吐量.加强网络数据处理能力.提高网 ...

  8. 什么是负载均衡(SLB)

    什么是负载均衡(SLB) SLB(服务器负载均衡):在多个提供相同服务的服务器的情况下,负载均衡设备存在虚拟服务地址,当大量客户端从外部访问虚拟服务IP地址时,负载均衡设备将这些报文请求根据负载均衡算 ...

  9. 什么是负载均衡–SLB

    SLB(服务器负载均衡):在多个提供相同服务的服务器的情况下,负载均衡设备存在虚拟服务地址,当大量客户端从外部访问虚拟服务IP地址时,负载均衡设备将这些报文请求根据负载均衡算法,将流量均衡的分配给后台 ...

最新文章

  1. python投掷骰子实验_Python小程序--模拟掷骰子
  2. RMAN备份与ORA-19625ORA-19502
  3. 深度剖析Zabbix Web scenarios数据表结构
  4. session实现登录
  5. Erlang TCP Socket的接收进程的2种方案
  6. 博鳌“‘AI+时代’来了吗”分论坛,嘉宾们有何重要观点?...
  7. Vue 2.0 入门系列(15)学习 Vue.js 需要掌握的 es6 (2)
  8. matlab 0-100随机数,添加到100的随机数:matlab
  9. JS学习笔记5-JavaScript 变量
  10. Nginx部署静态页面及引用图片有效访问的两种方式
  11. 常州模拟赛d7t2 数组
  12. 计算机视觉 | 面试题:26、LBP算法原理
  13. Java对接ChinaPay提现(公私钥方式)
  14. 视频流媒体服务器智能云终端如何快速获取直播流地址?
  15. 浅层神经网络和深层神经网络介绍
  16. 抢不到回家的票,还真不是12306技术不行
  17. 操作系统——进程调度
  18. 乘法/积运算和符号(点乘/内积/数量积,叉乘/向量积,矩阵乘法,Hadamard, Kronecker积,卷积)一网打尽
  19. FX5 C的编程语言,三菱 FX5UC系列PLC性能规格/参数说明
  20. (div,p)等标签之间“分割线”的两种实现方式

热门文章

  1. 零基础带你学习MySQL—修改表(六)
  2. 电脑ip4和ip6的怎么选择?
  3. 家里安装宽带,另一个房子相距150米怎么连网?
  4. 一定要吃透的四个人性真相
  5. 为什么现在那么多人都想做电商?
  6. 如果“王思聪”们创业就一定能成功
  7. 有没有能把excel表弄成线上多人填报的办法?
  8. 现在装修还有必要铺设网线吗?
  9. Spring_day4
  10. sql azure 语法_Azure Data Studio中SQL代码段