年前聊了Eureka和Zookeeper的区别,然后微服务架构系列就鸽了三个多月,一直沉迷逛B站,无法自拔。最近公司复工,工作状态慢慢恢复(又是元气满满地划水)。本文从以下3个方面进行分析(参考了翟永超[程序猿DD])的《Spring Cloud微服务实战》

  1. LoadBalancerInterceptor拦截器对RestTemplate的请求拦截;
  2. RibbonLoadBalancerClient实际接口实现;
  3. 负载均衡策略

1、LoadBalancerInterceptor源码

public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {private LoadBalancerClient loadBalancer;private LoadBalancerRequestFactory requestFactory;public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,LoadBalancerRequestFactory requestFactory) {this.loadBalancer = loadBalancer;this.requestFactory = requestFactory;}public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {// for backwards compatibilitythis(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));}@Overridepublic ClientHttpResponse intercept(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) throws IOException {final URI originalUri = request.getURI();String serviceName = originalUri.getHost();Assert.state(serviceName != null,"Request URI does not contain a valid hostname: " + originalUri);return this.loadBalancer.execute(serviceName,this.requestFactory.createRequest(request, body, execution));}}

可以看出,该拦截器注入了LoadBalancerClient实例,当一个被@LoadBalanced修饰的RestTemplate对象发起Http请求,会被LoadBalancerInterceptor中的intercept函数拦截。该函数会通过getHost()获取Http请求的服务名,恰巧我们使用的RestTemplate对象采用服务名作为Host,接着loadBalancer查找到对应服务名的服务,调用execute函数对该服务发起请求。

2、RibbonLoadBalancerClient实现

/*** New: Execute a request by selecting server using a 'key'. The hint will have to be* the last parameter to not mess with the `execute(serviceId, ServiceInstance,* request)` method. This somewhat breaks the fluent coding style when using a lambda* to define the LoadBalancerRequest.* @param <T> returned request execution result type* @param serviceId id of the service to execute the request to* @param request to be executed* @param hint used to choose appropriate {@link Server} instance* @return request execution result* @throws IOException executing the request may result in an {@link IOException}*/public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)throws IOException {ILoadBalancer loadBalancer = getLoadBalancer(serviceId);Server server = getServer(loadBalancer, hint);if (server == null) {throw new IllegalStateException("No instances available for " + serviceId);}RibbonServer ribbonServer = new RibbonServer(serviceId, server,isSecure(server, serviceId),serverIntrospector(serviceId).getMetadata(server));return execute(serviceId, ribbonServer, request);}

经过LoadBalancerInterceptor拦截器后,调用LoadBalancerClient的execute函数去发起对应服务的请求。(LoadBalancerClient只是个抽象的负载均衡接口,RibbonLoadBalancerClient则是该接口的具体实现)
execute函数的作用,如官方所说:通过‘key’找到对应的服务并执行请求。
从源码中可以看出,execute函数具体实现首先是定义一个传入serviceId的loadBalancer对象,再getServer获取对应的具体服务,最后通过ribbonServer整合一系列服务信息发起请求。
其中getServer()是关键操作,来看看对应的源码:

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {if (loadBalancer == null) {return null;}// Use 'default' on a null hint, or just pass it on?return loadBalancer.chooseServer(hint != null ? hint : "default");}

显然,需要再深入看下loadBalancer 。

public interface ILoadBalancer {/*** Initial list of servers.* This API also serves to add additional ones at a later time* The same logical server (host:port) could essentially be added multiple times* (helpful in cases where you want to give more "weightage" perhaps ..)* * @param newServers new servers to add*/public void addServers(List<Server> newServers);/*** Choose a server from load balancer.* * @param key An object that the load balancer may use to determine which server to return. null if *         the load balancer does not use this parameter.* @return server chosen*/public Server chooseServer(Object key);/*** To be called by the clients of the load balancer to notify that a Server is down* else, the LB will think its still Alive until the next Ping cycle - potentially* (assuming that the LB Impl does a ping)* * @param server Server to mark as down*/public void markServerDown(Server server);/*** @deprecated 2016-01-20 This method is deprecated in favor of the* cleaner {@link #getReachableServers} (equivalent to availableOnly=true)* and {@link #getAllServers} API (equivalent to availableOnly=false).** Get the current list of servers.** @param availableOnly if true, only live and available servers should be returned*/@Deprecatedpublic List<Server> getServerList(boolean availableOnly);/*** @return Only the servers that are up and reachable.*/public List<Server> getReachableServers();/*** @return All known servers, both reachable and unreachable.*/public List<Server> getAllServers();
}

ILoadBalancer定义了客户端负载均衡器的一系列抽象操作接口,从官方说明看出:

  • addServers:向负载均衡器的实例列表中添加新的服务实例
  • chooseServer:通过某种策略,挑选出一个具体的服务实例
  • markServerDown:通知并标识负载均衡器中某个具体服务实例已停止服务,不然的话,负载均衡器在下一次获取具体服务实例的时候,还会以为该服务正常
  • getReachableServers:获取可正常使用的服务实例列表
  • getAllServers:获取所有服务实例列表,包括正常和停止的

来看看具体实现BaseLoadBalancer,

public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,IPing ping, IPingStrategy pingStrategy) {logger.debug("LoadBalancer [{}]:  initialized", name);this.name = name;this.ping = ping;this.pingStrategy = pingStrategy;setRule(rule);setupPingTask();lbStats = stats;init();}

默认构造函数ping设为null,rule策略默认设为轮询(RoundRobin)。该构造函数除了基本的赋值之外,主要是setRule(设置负载均衡策略)和setupPingTask(启动ping心跳任务)。

void setupPingTask() {if (canSkipPing()) {return;}if (lbTimer != null) {lbTimer.cancel();}lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,true);lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);forceQuickPing();}

setupPingTask逻辑主要是定义ShutdownEnabledTimer实例来执行一个10秒间隔的schedule。timer定时器还定义了个PingTask任务

class PingTask extends TimerTask {public void run() {try {new Pinger(pingStrategy).runPinger();} catch (Exception e) {logger.error("LoadBalancer [{}]: Error pinging", name, e);}}}

官方注释中,TimerTask会在自定义的时间间隔内检查服务实例列表中每个服务实例的运行状态。
再看看PingTask 任务里runPinger方法的关键逻辑:

                results = pingerStrategy.pingServers(ping, allServers);final List<Server> newUpList = new ArrayList<Server>();final List<Server> changedServers = new ArrayList<Server>();for (int i = 0; i < numCandidates; i++) {boolean isAlive = results[i];Server svr = allServers[i];boolean oldIsAlive = svr.isAlive();svr.setAlive(isAlive);if (oldIsAlive != isAlive) {changedServers.add(svr);logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));}if (isAlive) {newUpList.add(svr);}}upLock = upServerLock.writeLock();upLock.lock();upServerList = newUpList;upLock.unlock();notifyServerStatusChangeListener(changedServers);

从源码可以看出,PingTask运行runPinger方法,根据pingerStrategy.pingServers(ping, allServers)来获取服务的可用性,然后对比前后服务的状态,如果状态一致,则不去EurekaClient(一般用Eureka作为注册中心,可换成其他注册中心)获取注册列表;否则,则调用notifyServerStatusChangeListener通知EurekaClient更新或重新拉取。

简单总结下完整的过程:
RibbonLoadBalancerClient(负载均衡客户端)初始化(调用execute),通过ILoadBalance从Eureka注册中心获取服务注册列表,同时以10s为间隔往EurekaClient发送ping,来保证服务的可用性,如果服务前后发生改变,则ILoadBalance重新从Eureka注册中心获取。RibbonLoadBalancerClient拿到服务注册列表之后,再根据IRule具体的策略,去获取对应的服务实例。

3、负载均衡策略

前面讲到RibbonLoadBalancerClient获取具体服务实例的过程,这里就需要了解下负载均衡策略。众所周知,使用负载均衡的好处主要有:当一台或多台机器宕机之后,剩余的机器可以保证服务正常运行;分担机器运行的压力,防止某一高峰机器CPU负载过高。
常见的策略有:随机(Random)、轮询(RoundRobin)、一致性哈希(ConsistentHash)、哈希(Hash)、加权(Weighted)

  • 轮询(RoundRobin)
public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {log.warn("no load balancer");return null;}Server server = null;int count = 0;while (server == null && count++ < 10) {List<Server> reachableServers = lb.getReachableServers();List<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();if ((upCount == 0) || (serverCount == 0)) {log.warn("No up servers available from load balancer: " + lb);return null;}int nextServerIndex = incrementAndGetModulo(serverCount);server = allServers.get(nextServerIndex);if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive() && (server.isReadyToServe())) {return (server);}// Next.server = null;}if (count >= 10) {log.warn("No available alive servers after 10 tries from load balancer: "+ lb);}return server;}
private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextServerCyclicCounter.get();int next = (current + 1) % modulo;if (nextServerCyclicCounter.compareAndSet(current, next))return next;}}

轮询算法其实就一句(current + 1) % modulo,每次都取下一台服务器。

  • 随机(Random)
    choose方法其实都差不多,主要看下算法
protected int chooseRandomInt(int serverCount) {return ThreadLocalRandom.current().nextInt(serverCount);}

ThreadLocalRandom获取随机数即可

  • 一致性哈希(ConsistentHash)、哈希(Hash)
    这两个是很常见的算法,本文就不讨论了
  • 加权(Weighted)、BestAvailableRule、WeightedResponseTimeRule、ZoneAvoidanceRule

    负载均衡策略方法

    这个研究起来就又要长篇大论了,下次再写篇来介绍吧(下次一定)

Ribbon的源码分析大概就这样,后面可能会不定期更新,有兴趣的朋友可以继续深入了解下,有啥问题也可以在评论中一起讨论下。
最后有件很重要的事,那就是麻烦点赞关注赞赏,谢谢(๑•̀ㅂ•́)و✧

本文首发于java黑洞网,csdn同步更新

Spring Cloud源码分析——Ribbon客户端负载均衡相关推荐

  1. Spring Cloud源码分析(二)Ribbon(续)

    因文章长度限制,故分为两篇.上一篇:<Spring Cloud源码分析(二)Ribbon> 负载均衡策略 通过上一篇对Ribbon的源码解读,我们已经对Ribbon实现的负载均衡器以及其中 ...

  2. Spring Cloud源码分析(二)Ribbon

    断断续续看Ribbon的源码差不多也有7-8天了,总算告一段落.本文记录了这些天对源码的阅读过程与一些分析理解,如有不对还请指出. 友情提示:本文较长,请选择一个较为舒适的姿势来阅读 在之前介绍使用R ...

  3. Spring Cloud源码分析(一)Eureka

    看过之前文章的朋友们,相信已经对Eureka的运行机制已经有了一定的了解.为了更深入的理解它的运作和配置,下面我们结合源码来分别看看服务端和客户端的通信行为是如何实现的.另外写这篇文章,还有一个目的, ...

  4. Spring Cloud入门教程(二):客户端负载均衡(Ribbon)

    对于大型应用系统负载均衡(LB:Load Balancing)是首要被解决一个问题.在微服务之前LB方案主要是集中式负载均衡方案,在服务消费者和服务提供者之间又一个独立的LB,LB通常是专门的硬件,如 ...

  5. Spring Cloud源码分析(四)Zuul:核心过滤器

    通过之前发布的<Spring Cloud构建微服务架构(五)服务网关>一文,相信大家对于Spring Cloud Zuul已经有了一个基础的认识.通过前文的介绍,我们对于Zuul的第一印象 ...

  6. Spring Cloud源码分析之Eureka篇第三章:EnableDiscoveryClient与EnableEurekaClient的区别(Edgware版本)

    在基于SpringCloud做开发的时候,EnableDiscoveryClient和EnableEurekaClient这两个注解我们并不陌生,今天就来聊聊它们的区别,和网上更早期的类似文章不同的是 ...

  7. Spring AOP 源码分析 - 拦截器链的执行过程

    1.简介 本篇文章是 AOP 源码分析系列文章的最后一篇文章,在前面的两篇文章中,我分别介绍了 Spring AOP 是如何为目标 bean 筛选合适的通知器,以及如何创建代理对象的过程.现在我们的得 ...

  8. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

  9. Spring AOP 源码分析 - 创建代理对象

    1.简介 在上一篇文章中,我分析了 Spring 是如何为目标 bean 筛选合适的通知器的.现在通知器选好了,接下来就要通过代理的方式将通知器(Advisor)所持有的通知(Advice)织入到 b ...

最新文章

  1. 背包思想计算方案的总数(货币系统)
  2. linux/windows下安装scala
  3. ASP.NET Core重写个人博客站点小结
  4. BAPI_SALESORDER_CREATEFROMDAT2 BAPI创建VA01 销售订单
  5. linux的yun命令是访问互联网,如何在Linux终端中知道你的公有IP?
  6. python standardscaler_Python快速实战机器学习之数据预处理
  7. 深度学习笔记(30) Inception网络
  8. iframe操作ie,firefox兼容
  9. ASP.NET 学习日志
  10. 头目一天不来,就公然上班睡觉,主管怎么当得
  11. 宝峰c1对讲机写频软件_宝峰对讲机写频软件(BF-480 编程软件)
  12. MCU低功耗设计(一)理论
  13. 为互联网IT人打造的中文版awesome-go
  14. ie6 html 模板,网页排版应该考虑IE6的兼容性问题_HTML/Xhtml_网页制作
  15. 通俗科普:弦论要求空间必须是25维的解释
  16. EasyPusher进行Android UVC外接摄像头直播推送实现方法
  17. javascript代码前端debug调试方法
  18. 参考文献格式字号字体_实用文档其他之参考文献的正确格式要求参考文献字体格式要求...
  19. 陈一舟:社交网络将彻底颠覆广告业
  20. 正则表达式提取字符串中的手机号码

热门文章

  1. Initialization failed for Block pool
  2. matlab思考,关于Matlab编程的思考(待续)
  3. 2021年NBA季后赛第三轮晋级预测
  4. 微信小程序自带地图_微信小程序之map地图
  5. STM32F103:二.(4)控制SG90舵机
  6. 12002.i2ctools工具
  7. 通过AT指令控制ESP8266
  8. STC51-C51基础知识
  9. Linux C语言操作SQLite数据库
  10. 惠普用的是微软服务器吗,惠普抛弃MediaSmart服务器 微软表示淡定