本文基于SpringCloud-Dalston.SR5

我们继续逐个分析

  1. 所有Ribbon负载均衡器需要实现的接口IClient
  2. 服务实例列表维护机制实现的接口ServerList
  3. 负载均衡数据记录LoadBalancerStats
  4. 负责选取Server的接口ILoadBalancer
  5. 负载均衡选取规则实现的接口IRule
  6. 检查实例是否存活实现的接口IPing
  7. 服务实例列表更新机制实现的接口ServerListUpdater
  8. 服务实例列表过滤机制ServerListFilter

4. 负责选取Server的接口ILoadBalancer

ILoadBalancer负责存储并更新服务实例列表,并调用IRule(即根据配置的负载均衡规则)来返回Server以供于服务调用

这里,我们只看默认的ZoneAwareLoadBalancer相关的

AbstractLoadBalancer.java

public abstract class AbstractLoadBalancer implements ILoadBalancer {public enum ServerGroup{ALL,STATUS_UP,STATUS_NOT_UP        }public Server chooseServer() {return chooseServer(null);}public abstract List<Server> getServerList(ServerGroup serverGroup);public abstract LoadBalancerStats getLoadBalancerStats();
}

AbstractLoadBalancer在原有ILoadBalancer接口基础上,增加了按照分组获取Server的方法,有ALL,STATUS_UP,STATUS_NOT_UP三种组别。同时还增加了LoadBalancerStats,记录每次请求的Server的负载均衡统计数据ServerStat,以及其他一些的实时记录信息。之后在介绍具体的LoadBalancer实现的时候,会用到

BaseLoadBalancer.java

BaseLoadBalancer是负载均衡的基本实现,包含如下元素:

1)两个列表:所有Server列表,还有所有Up Server的列表:

protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());//更新两个列表的锁
protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();

2)定时PING任务相关的元素,为了定时检查Server是否UP的任务:

private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
protected IPing ping = null;protected Timer lbTimer = null;
protected int pingIntervalSeconds = 10;
protected int maxTotalPingTimeSeconds = 5;
protected Comparator<Server> serverComparator = new ServerComparator();
protected AtomicBoolean pingInProgress = new AtomicBoolean(false);

一般的在构造一个BaseLoadBalancer时候,会调用setupPingTask构造一个定时ping的任务:


void setupPingTask() {//判断是否需要pingif (canSkipPing()) {return;}//关闭之前已经开启的定时ping的任务if (lbTimer != null) {lbTimer.cancel();}//设置PingTask,默认是每10秒一次lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,true);lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);//这个可能有些多余,因为定时任务也是立即执行,就会pingforceQuickPing();
}//如果ping为null,或者为DummyPing,就不用定时Ping了
private boolean canSkipPing() {if (ping == null|| ping.getClass().getName().equals(DummyPing.class.getName())) {// default ping, no need to set up timerreturn true;} else {return false;}
}

Pinger包含了如何去Ping所有的Server的逻辑:

class Pinger {//ping每个server的方式,默认就是普通遍历private final IPingStrategy pingerStrategy;public Pinger(IPingStrategy pingerStrategy) {this.pingerStrategy = pingerStrategy;}public void runPinger() throws Exception {//如果设置失败,则证明,当前Ping的任务正在执行中(执行时间大于定时任务的周期)if (!pingInProgress.compareAndSet(false, true)) { return; // Ping in progress - nothing to do}// we are "in" - we get to PingServer[] allServers = null;boolean[] results = null;Lock allLock = null;Lock upLock = null;try {//读取所有Server的列表,需要获取读锁allLock = allServerLock.readLock();allLock.lock();//先读取之后再遍历,减少锁时间allServers = allServerList.toArray(new Server[allServerList.size()]);allLock.unlock();//ping每个Serverint numCandidates = allServers.length;results = pingerStrategy.pingServers(ping, allServers);final List<Server> newUpList = new ArrayList<Server>();final List<Server> changedServers = new ArrayList<Server>();//更新所有Server的状态for (int i = 0; i < numCandidates; i++) {boolean isAlive = results[i];Server svr = allServers[i];boolean oldIsAlive = svr.isAlive();svr.setAlive(isAlive);if (oldIsAlive != isAlive) {changedServers.add(svr);logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));}if (isAlive) {newUpList.add(svr);}}//更新Up Server列表需要获取写锁upLock = upServerLock.writeLock();upLock.lock();upServerList = newUpList;upLock.unlock();//通知监听Server状态变化的ListenernotifyServerStatusChangeListener(changedServers);} finally {//任务结束,需要设置状态位pingInProgress.set(false);}}
}

IPingStrategy目前只有一种默认实现,就是SerialPingStrategy,依次串行遍历Ping每个Server:

private static class SerialPingStrategy implements IPingStrategy {@Overridepublic boolean[] pingServers(IPing ping, Server[] servers) {int numCandidates = servers.length;boolean[] results = new boolean[numCandidates];logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);for (int i = 0; i < numCandidates; i++) {results[i] = false; /* Default answer is DEAD. */try {//因为在SpringCloud的环境下,默认ping是基于访问本地内存的Eureka缓存的列表,所有串行挨个ping也不会有太大的性能影响if (ping != null) {results[i] = ping.isAlive(servers[i]);}} catch (Exception e) {logger.error("Exception while pinging Server: '{}'", servers[i], e);}}return results;}
}

3)记录每次负载均衡统计数据的LoadBalancerStats
这个暂且不表,在后面具体介绍负载均衡选取规则的时候,会用到这里面的统计数据

protected LoadBalancerStats lbStats;

4)用于预热连接的逻辑类PrimeConnections

private PrimeConnections primeConnections;
private volatile boolean enablePrimingConnections = false;

但是这个配置默认是不开启的。

5)监听Server列表变化和Server状态变化的Listener

private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();

选择Server的方法非常简单,就是调用IRule来选取:

public Server chooseServer(Object key) {if (counter == null) {counter = createCounter();}counter.increment();if (rule == null) {return null;} else {try {return rule.choose(key);} catch (Exception e) {logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);return null;}}
}

DynamicServerListLoadBalancer.java

DynamicServerListLoadBalancer在BaseLoadBalancer的基础上,增加了之前提到的:服务实例列表维护机制实现的接口ServerList、服务实例列表更新机制实现的接口ServerListUpdater和服务实例列表过滤机制ServerListFilter。利用这些元素实现服务实例列表的更新

ZoneAwareLoadBalancer.java

ZoneAwareLoadBalancer则是进一步增加了对于Zone的感知。利用一个ConcurrentHashMap来维护不同Zone下的负载均衡数据,并且不同的Zone可以设置不同的Rule:

 private ConcurrentHashMap<String, BaseLoadBalancer> balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();

我们可以通过setRule方法给不同的Zone设置不同的IRule

public void setRule(IRule rule) {super.setRule(rule);if (balancers != null) {for (String zone: balancers.keySet()) {balancers.get(zone).setRule(cloneRule(rule));}}
}

查看负载均衡方法:

public Server chooseServer(Object key) {//如果未启用(默认是启用的,可以通过ZoneAwareNIWSDiscoveryLoadBalancer.enabled修改),或者可用区域不大于1,则调用BaseLoadBalancer的choose方法选取if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {logger.debug("Zone aware logic disabled or there is only one zone");return super.chooseServer(key);}Server server = null;try {LoadBalancerStats lbStats = getLoadBalancerStats();//获取当前负载均衡数据的快照Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);logger.debug("Zone snapshots: {}", zoneSnapshot);//获取最大负载阈值配置if (triggeringLoad == null) {triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d);}//获取熔断实例比例配置if (triggeringBlackoutPercentage == null) {triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty("ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d);}//获取当前有效的可用区域(利用最大负载阈值配置和熔断实例比例配置)Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());logger.debug("Available zones: {}", availableZones);if (availableZones != null &&  availableZones.size() < zoneSnapshot.keySet().size()) {//随机选取zoneString zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);logger.debug("Zone chosen: {}", zone);if (zone != null) {BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);//在选取的zone下选取serverserver = zoneLoadBalancer.chooseServer(key);}}} catch (Exception e) {logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);}if (server != null) {return server;} else {logger.debug("Zone avoidance logic is not invoked.");return super.chooseServer(key);}
}

如何筛选的AvailableZone:

public static Set<String> getAvailableZones(Map<String, ZoneSnapshot> snapshot, double triggeringLoad,double triggeringBlackoutPercentage) {if (snapshot.isEmpty()) {return null;}Set<String> availableZones = new HashSet<String>(snapshot.keySet());if (availableZones.size() == 1) {return availableZones;}Set<String> worstZones = new HashSet<String>();double maxLoadPerServer = 0;boolean limitedZoneAvailability = false;for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {String zone = zoneEntry.getKey();ZoneSnapshot zoneSnapshot = zoneEntry.getValue();int instanceCount = zoneSnapshot.getInstanceCount();//可用区域数量为0,这个可用区需要被移除if (instanceCount == 0) {availableZones.remove(zone);limitedZoneAvailability = true;} else {double loadPerServer = zoneSnapshot.getLoadPerServer();//熔断的实例个数/实例数量大于triggeringBlackoutPercentage,这个可用区需要被移除if (((double) zoneSnapshot.getCircuitTrippedCount())/ instanceCount >= triggeringBlackoutPercentage|| loadPerServer < 0) {availableZones.remove(zone);limitedZoneAvailability = true;} else {//寻找平均负载最高的可用区,将它添加到worstZones集合if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {// they are the same considering double calculation// round errorworstZones.add(zone);} else if (loadPerServer > maxLoadPerServer) {maxLoadPerServer = loadPerServer;worstZones.clear();worstZones.add(zone);}}}}//如果最大负载小于负载阈值,并且没有被移除过可用区,就直接返回当前结果if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {// zone override is not needed herereturn availableZones;}String zoneToAvoid = randomChooseZone(snapshot, worstZones);if (zoneToAvoid != null) {availableZones.remove(zoneToAvoid);}return availableZones;}

Spring Cloud Ribbon 全解 (4) - 基本组件实现源码(2)相关推荐

  1. Spring Cloud Ribbon 全解 (1) - 总览篇

    本文基于SpringCloud-Dalston.SR5 Ribbon是一个客户端负载均衡解决方案,简单来说,就是从Eureka获取可用服务实例列表,然后将请求根据某种策略发到这些实例上面执行 What ...

  2. java版Spring Cloud+Vue 前后端分离+VR全景电子商务源码

    涉及平台:平台管理(包含自营店面).买家平台(PC端.H5/公众号.小程序.APP端(IOS/Android).微服务(了解源码可+求球:1零③8七7④62陆) 核心架构:Spring Cloud.S ...

  3. Spring Cloud Ribbon的使用详解

    目录 一.概述 1.Ribbon是什么 2.Ribbon能干什么 3.Ribbon现状 4.未来替代方案 5.架构说明 二.RestTemplate 用法详解 三.Ribbon核心组件IRule 四. ...

  4. Java微服务组件Spring cloud ribbon源码分析

    微服务组件Spring Cloud Ribbon源码分析_哔哩哔哩_bilibili Ribbon源码分析 | ProcessOn免费在线作图,在线流程图,在线思维导图 | 1.什么是ribbon? ...

  5. Spring Cloud Ribbon负载均衡策略详解

    通过之前的文章可以知道, Ribbon负载均衡器选择服务实例的方式是通过"选择策略"实现的, Ribbon实现了很多种选择策略,UML静态类图如上图. IRule是负载均衡的策略接 ...

  6. 技术-2022-05-《Spring cloud Alibaba全解》阅读笔记

    技术-2022-05-<Spring cloud Alibaba全解> 创建时间: 2022/5/22 15:21 更新时间: 2023/2/17 8:52 作者: HelloXF 第一章 ...

  7. Spring Cloud原理详解

    概述 毫无疑问,Spring Cloud是目前微服务架构领域的翘楚,无数的书籍博客都在讲解这个技术.不过大多数讲解还停留在对Spring Cloud功能使用的层面,其底层的很多原理,很多人可能并不知晓 ...

  8. 3 Spring Cloud Ribbon

    Spring Cloud Ribbon Spring Cloud Ribbon 是一套基于 Netflix Ribbon 实现的客户端负载均衡和服务调用工具. Netflix Ribbon 是 Net ...

  9. Spring Cloud的负载均衡Spring Cloud Ribbon和Spring Cloud Feign

    一.客户端负载均衡:Spring Cloud Ribbon. Spring Cloud Ribbon是基于HTTP和TCP的客户端负载工具,它是基于Netflix Ribbon实现的.通过Spring ...

最新文章

  1. 数据蒋堂 | 大清单报表应当怎么做?
  2. python3精要(6)-string类的format()方法
  3. 微信公众号文章中图片加载时,占位图宽高大小的确定
  4. linux7怎么安装yum,centos7下怎么安装yum
  5. 阿里达摩院拿什么救人?
  6. [图论] 树剖LCA
  7. 新世纪音乐——天籁之音
  8. c# 操作Word总结(四)——书签使用
  9. 微信小程序公农历转换的实现
  10. java myeclipse的好处,关于MyEclipse_MyEclipse用途_MyEclipse优势
  11. RPM构建 - SPEC文件参数解析
  12. 学计算机好还是学数学好,大学专业学计算机好还是学数学好
  13. maya linux 安装教程视频,Maya快捷键插件增强工具ZooTools Pro 2.2.4 for Maya Win/Linux+ Assets pack 2.3+视频教程...
  14. 两步免费开通企业微信,不用提交资料
  15. qemu内存模型(2) 实现说明
  16. 微信缓存dat怎么转图片_PC微信dat如何转图片?方式方法
  17. 切入点表达式的写法详解
  18. html选择按键点击后锁死输入框_js Dom为页面中的元素绑定键盘或鼠标事件
  19. MP4和HR-HDTV压制教程
  20. R语言对推特twitter数据进行文本情感分析

热门文章

  1. 集合--Set集合--HashSet类、LinkedHashSet类、TreeSet类及其自然排序
  2. Vsftpd配置文件解析
  3. (附源码)spring boot毕业论文管理系统 毕业设计 030946
  4. IE11安装需要获取更新-安装失败
  5. RaSa2.5.x行为之四:表单(Forms)
  6. 综述:如何构建交通领域的基于图的深度学习架构
  7. 什么是BGP服务器,那些场景会用到?
  8. 百城千屏落地,8K超高清视频内容供给看何方?
  9. 台式电脑计算机风机,笔记本冷却风扇与台式计算机CPU风扇的质量,类型和区别...
  10. PCB生产时“补偿”“开窗”是什么意思