1. 概述

本文主要分享 EndPoint 与 解析器

  • EndPoint ,服务端点。例如,Eureka-Server 的访问地址。
  • EndPoint 解析器,将配置的 Eureka-Server 的访问地址解析成 EndPoint 。

目前有多种 Eureka-Server 访问地址的配置方式,本文只分享 Eureka 1.x 的配置,不包含 Eureka 1.x 对 Eureka 2.x 的兼容配置:

  • 第一种,直接配置实际访问地址。例如,eureka.serviceUrl.defaultZone=http://127.0.0.1:8080/v2 。
  • 第二种,基于 DNS 解析出访问地址。例如,eureka.shouldUseDns=true 并且 eureka.eurekaServer.domainName=eureka.iocoder.cn 。

本文涉及类在 com.netflix.discovery.shared.resolver 包下,涉及到主体类的类图如下( 打开大图 ):

  • 红色部分 —— EndPoint
  • 黄色部分 —— EndPoint 解析器

推荐 Spring Cloud 书籍

  • 请支持正版。下载盗版,等于主动编写低级 BUG 。
  • 程序猿DD —— 《Spring Cloud微服务实战》
  • 周立 —— 《Spring Cloud与Docker微服务架构实战》
  • 两书齐买,京东包邮。

2. EndPoint

2.1 EurekaEndpoint

com.netflix.discovery.shared.resolver.EurekaEndpoint ,Eureka 服务端点接口,实现代码如下:

public interface EurekaEndpoint extends Comparable<Object> {/*** @return 完整的服务 URL*/String getServiceUrl();/*** @deprecated use {@link #getNetworkAddress()}*/@DeprecatedString getHostName();/*** @return 网络地址*/String getNetworkAddress();/*** @return 端口*/int getPort();/*** @return 是否安全( https )*/boolean isSecure();/*** @return 相对路径*/String getRelativeUri();}

2.2 DefaultEndpoint

com.netflix.discovery.shared.resolver.DefaultEndpoint ,默认 Eureka 服务端点实现类。实现代码如下:

public class DefaultEndpoint implements EurekaEndpoint {/*** 网络地址*/protected final String networkAddress;/*** 端口*/protected final int port;/*** 是否安全( https )*/protected final boolean isSecure;/*** 相对地址*/protected final String relativeUri;/*** 完整的服务 URL*/protected final String serviceUrl;public DefaultEndpoint(String serviceUrl) {this.serviceUrl = serviceUrl;// 将 serviceUrl 分解成 几个属性try {URL url = new URL(serviceUrl);this.networkAddress = url.getHost();this.port = url.getPort();this.isSecure = "https".equals(url.getProtocol());this.relativeUri = url.getPath();} catch (Exception e) {throw new IllegalArgumentException("Malformed serviceUrl: " + serviceUrl);}}public DefaultEndpoint(String networkAddress, int port, boolean isSecure, String relativeUri) {this.networkAddress = networkAddress;this.port = port;this.isSecure = isSecure;this.relativeUri = relativeUri;// 几个属性 拼接成 serviceUrlStringBuilder sb = new StringBuilder().append(isSecure ? "https" : "http").append("://").append(networkAddress);if (port >= 0) {sb.append(':').append(port);}if (relativeUri != null) {if (!relativeUri.startsWith("/")) {sb.append('/');}sb.append(relativeUri);}this.serviceUrl = sb.toString();}
}
  • 重写了 #equals(...) 和 #hashCode(...) 方法,标准实现方式,这里就不贴代码了。
  • 重写了 #compareTo(...) 方法,基于 serviceUrl 属性做比较。

2.3 AwsEndpoint

com.netflix.discovery.shared.resolver.aws.AwsEndpoint ,基于 regionzone 的 Eureka 服务端点实现类 ( 请不要在意 AWS 开头 )。实现代码如下:

public class AwsEndpoint extends DefaultEndpoint {/*** 区域*/protected final String region;/*** 可用区*/protected final String zone;
}
  • 重写了 #equals(...) 和 #hashCode(...) 方法,标准实现方式,这里就不贴代码了。

3. 解析器

EndPoint 解析器使用委托设计模式实现。所以,上文图片中我们看到好多个解析器,实际代码非常非常非常清晰

FROM 《委托模式》
委托模式是软件设计模式中的一项基本技巧。在委托模式中,有两个对象参与处理同一个请求,接受请求的对象将请求委托给另一个对象来处理。委托模式是一项基本技巧,许多其他的模式,如状态模式、策略模式、访问者模式本质上是在更特殊的场合采用了委托模式。委托模式使得我们可以用聚合来替代继承,它还使我们可以模拟mixin。

我们在上图的基础上,增加委托的关系,如下图:

3.1 ClusterResolver

com.netflix.discovery.shared.resolver.ClusterResolver ,集群解析器接口。接口代码如下:

public interface ClusterResolver<T extends EurekaEndpoint> {/*** @return 地区*/String getRegion();/*** @return EndPoint 集群( 数组 )*/List<T> getClusterEndpoints();}

3.2 ClosableResolver

com.netflix.discovery.shared.resolver.ClosableResolver ,可关闭的解析器接口,继承自 ClusterResolver 接口。接口代码如下:

public interface ClosableResolver<T extends EurekaEndpoint> extends ClusterResolver<T> {/*** 关闭*/void shutdown();
}

3.3 DnsTxtRecordClusterResolver

com.netflix.discovery.shared.resolver.aws.DnsTxtRecordClusterResolver ,基于 DNS TXT 记录类型的集群解析器。类属性代码如下:

public class DnsTxtRecordClusterResolver implements ClusterResolver<AwsEndpoint> {/*** 地区*/private final String region;/*** 集群根地址,例如 txt.default.eureka.iocoder.cn*/private final String rootClusterDNS;/*** 是否解析可用区( zone )*/private final boolean extractZoneFromDNS;/*** 端口*/private final int port;/*** 是否安全*/private final boolean isSecure;/*** 相对地址*/private final String relativeUri;
}
  • DnsTxtRecordClusterResolver 通过集群根地址( rootClusterDNS ) 解析出 EndPoint 集群。需要在 DNS 配置两层解析记录:

    • 第一层 :

      • 主机记录 :格式为 TXT.${REGION}.${自定义二级域名} 。
      • 记录类型 :TXT 记录类型
      • 记录值 :第二层的主机记录。如有多个第二层级,使用空格分隔。
    • 第二层:
      • 主机记录 :格式为 TXT.${ZONE}.${自定义二级域名} 或者 ${ZONE}.${自定义二级域名}
      • 记录类型 :TXT 记录类型
      • 记录值 :EndPoint 的网络地址。如有多个 EndPoint,使用空格分隔。
    • 举个例子:
  • rootClusterDNS ,集群根地址。例如:txt.default.eureka.iocoder.cn,其· txt.default.eureka 为 DNS 解析记录的第一层的主机记录

  • region :地区。需要和 rootClusterDNS 的 ${REGION} 一致。

  • extractZoneFromDNS :是否解析 DNS 解析记录的第二层级的主机记录的 ${ZONE} 可用区。


#getClusterEndpoints(...) 方法,实现代码如下:

1: @Override2: public List<AwsEndpoint> getClusterEndpoints() {3:     List<AwsEndpoint> eurekaEndpoints = resolve(region, rootClusterDNS, extractZoneFromDNS, port, isSecure, relativeUri);4:     if (logger.isDebugEnabled()) {5:         logger.debug("Resolved {} to {}", rootClusterDNS, eurekaEndpoints);6:     }7:     return eurekaEndpoints;8: }9:
10: private static List<AwsEndpoint> resolve(String region, String rootClusterDNS, boolean extractZone, int port, boolean isSecure, String relativeUri) {
11:     try {
12:         // 解析 第一层 DNS 记录
13:         Set<String> zoneDomainNames = resolve(rootClusterDNS);
14:         if (zoneDomainNames.isEmpty()) {
15:             throw new ClusterResolverException("Cannot resolve Eureka cluster addresses; there are no data in TXT record for DN " + rootClusterDNS);
16:         }
17:         // 记录 第二层 DNS 记录
18:         List<AwsEndpoint> endpoints = new ArrayList<>();
19:         for (String zoneDomain : zoneDomainNames) {
20:             String zone = extractZone ? ResolverUtils.extractZoneFromHostName(zoneDomain) : null; //
21:             Set<String> zoneAddresses = resolve(zoneDomain);
22:             for (String address : zoneAddresses) {
23:                 endpoints.add(new AwsEndpoint(address, port, isSecure, relativeUri, region, zone));
24:             }
25:         }
26:         return endpoints;
27:     } catch (NamingException e) {
28:         throw new ClusterResolverException("Cannot resolve Eureka cluster addresses for root: " + rootClusterDNS, e);
29:     }
30: }
  • 第 12 至 16 行 :调用 #resolve(rootClusterDNS) 解析第一层 DNS 记录。实现代码如下:
1: private static Set<String> resolve(String rootClusterDNS) throws NamingException {2:     Set<String> result;3:     try {4:         result = DnsResolver.getCNamesFromTxtRecord(rootClusterDNS);5:         // TODO 芋艿:这块是bug,不需要这一段6:         if (!rootClusterDNS.startsWith("txt.")) {7:             result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS);8:         }9:     } catch (NamingException e) {
10:         if (!rootClusterDNS.startsWith("txt.")) {
11:             result = DnsResolver.getCNamesFromTxtRecord("txt." + rootClusterDNS);
12:         } else {
13:             throw e;
14:         }
15:     }
16:     return result;
17: }
    • 第 4 行 : 调用 DnsResolver#getCNamesFromTxtRecord(...) 方法,解析 TXT 主机记录。点击链接查看带中文注释的 DnsResolver 的代码,比较解析,笔者就不啰嗦了。
    • 第 5 至 8 行 :当传递参数 rootClusterDNS 不以 txt. 开头时,即使第 4 行解析成功,也会报错,此时是个 Eureka 的 BUG 。因此,配置 DNS 解析记录时,主机记录暂时必须以 txt. 开头。
  • 第 17 至 25 行 :循环第一层 DNS 记录的解析结果,进一步解析第二层 DNS 记录。

    • 第 20 行 :解析可用区( zone )。
    • 第 21 行 :调用 #resolve(rootClusterDNS) 解析第二层 DNS 记录。

3.4 ConfigClusterResolver

com.netflix.discovery.shared.resolver.aws.ConfigClusterResolver ,基于配置文件的集群解析器。类属性代码如下:

public class ConfigClusterResolver implements ClusterResolver<AwsEndpoint> {private final EurekaClientConfig clientConfig;private final InstanceInfo myInstanceInfo;public ConfigClusterResolver(EurekaClientConfig clientConfig, InstanceInfo myInstanceInfo) {this.clientConfig = clientConfig;this.myInstanceInfo = myInstanceInfo;}
}

#getClusterEndpoints(...) 方法,实现代码如下:

 1: @Override2: public List<AwsEndpoint> getClusterEndpoints() {3:     // 使用 DNS 获取 EndPoint4:     if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {5:         if (logger.isInfoEnabled()) {6:             logger.info("Resolving eureka endpoints via DNS: {}", getDNSName());7:         }8:         return getClusterEndpointsFromDns();9:     } else {
10:     // 直接配置实际访问地址
11:         logger.info("Resolving eureka endpoints via configuration");
12:         return getClusterEndpointsFromConfig();
13:     }
14: }
  • 第 3 至 8 行 :基于 DNS 获取 EndPoint 集群,调用 #getClusterEndpointsFromDns() 方法,实现代码如下:
private List<AwsEndpoint> getClusterEndpointsFromDns() {String discoveryDnsName = getDNSName(); // 获取 集群根地址int port = Integer.parseInt(clientConfig.getEurekaServerPort()); // 端口// cheap enough so just re-useDnsTxtRecordClusterResolver dnsResolver = new DnsTxtRecordClusterResolver(getRegion(),discoveryDnsName,true, // 解析 zoneport,false,clientConfig.getEurekaServerURLContext());// 调用 DnsTxtRecordClusterResolver 解析 EndPointList<AwsEndpoint> endpoints = dnsResolver.getClusterEndpoints();if (endpoints.isEmpty()) {logger.error("Cannot resolve to any endpoints for the given dnsName: {}", discoveryDnsName);}return endpoints;
}private String getDNSName() {return "txt." + getRegion() + '.' + clientConfig.getEurekaServerDNSName();
}
    • 必须配置 eureka.shouldUseDns=true ,开启基于 DNS 获取 EndPoint 集群。
    • 必须配置 eureka.eurekaServer.domainName=${xxxxx} ,配置集群根地址。
    • 选填配 eureka.eurekaServer.port ,eureka.eurekaServer.context 。
    • 从代码中我们可以看出,使用 DnsTxtRecordClusterResolver 解析出 EndPoint 集群。
  • 第 9 至 13 行 :直接配置文件填写实际 EndPoint 集群,调用 #getClusterEndpointsFromConfig() 方法,实现代码如下:

1: private List<AwsEndpoint> getClusterEndpointsFromConfig() {2:     // 获得 可用区3:     String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());4:     // 获取 应用实例自己 的 可用区5:     String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);6:     // 获得 可用区与 serviceUrls 的映射7:     Map<String, List<String>> serviceUrls = EndpointUtils.getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka());8:     // 拼装 EndPoint 集群结果9:     List<AwsEndpoint> endpoints = new ArrayList<>();
10:     for (String zone : serviceUrls.keySet()) {
11:         for (String url : serviceUrls.get(zone)) {
12:             try {
13:                 endpoints.add(new AwsEndpoint(url, getRegion(), zone));
14:             } catch (Exception ignore) {
15:                 logger.warn("Invalid eureka server URI: {}; removing from the server pool", url);
16:             }
17:         }
18:     }
19:
20:     // 打印日志,EndPoint 集群
21:     if (logger.isDebugEnabled()) {
22:         logger.debug("Config resolved to {}", endpoints);
23:     }
24:     // 打印日志,解析结果为空
25:     if (endpoints.isEmpty()) {
26:         logger.error("Cannot resolve to any endpoints from provided configuration: {}", serviceUrls);
27:     }
28:
29:     return endpoints;
30: }
  • 第 3 行 :获得可用区数组。通过 eureka.${REGION}.availabilityZones 配置。

  • 第 5 行 :调用 InstanceInfo#getZone(...) 方法,获得应用实例自己所在的可用区zone )。非亚马逊 AWS 环境下,可用区数组的第一个元素就是应用实例自己所在的可用区

  • 第 7 行 :调用 EndpointUtils#getServiceUrlsMapFromConfig(...) 方法,获得可用区与 serviceUrls 的映射。实现代码如下:

// EndpointUtils.java1: public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {2:     Map<String, List<String>> orderedUrls = new LinkedHashMap<>(); // key:zone;value:serviceUrls3:     // 获得 应用实例的 地区( region )4:     String region = getRegion(clientConfig);5:     // 获得 应用实例的 可用区6:     String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());7:     if (availZones == null || availZones.length == 0) {8:         availZones = new String[1];9:         availZones[0] = DEFAULT_ZONE;10:     }11:     logger.debug("The availability zone for the given region {} are {}", region, Arrays.toString(availZones));12:     // 获得 开始位置13:     int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);14:     // 将 开始位置 的 serviceUrls 添加到结果15:     String zone = availZones[myZoneOffset];16:     List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);17:     if (serviceUrls != null) {18:         orderedUrls.put(zone, serviceUrls);19:     }20:     // 从开始位置顺序遍历剩余的 serviceUrls 添加到结果21:     int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);22:     while (currentOffset != myZoneOffset) {23:         zone = availZones[currentOffset];24:         serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);25:         if (serviceUrls != null) {26:             orderedUrls.put(zone, serviceUrls);27:         }28:         if (currentOffset == (availZones.length - 1)) {29:             currentOffset = 0;30:         } else {31:             currentOffset++;32:         }33:     }34: 35:     // 为空,报错36:     if (orderedUrls.size() < 1) {37:         throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");38:     }39:     return orderedUrls;40: }
  • 第 13 行 :获得开始位置。实现代码如下:
private static int getZoneOffset(String myZone, boolean preferSameZone, String[] availZones) {for (int i = 0; i < availZones.length; i++) {if (myZone != null && (availZones[i].equalsIgnoreCase(myZone.trim()) == preferSameZone)) {return i;}}logger.warn("DISCOVERY: Could not pick a zone based on preferred zone settings. My zone - {}," +" preferSameZone- {}. Defaulting to " + availZones[0], myZone, preferSameZone);return 0;
}
      • 当方法参数 preferSameZone=true ,即 eureka.preferSameZone=true( 默认值 :true ) 时,开始位置为可用区数组( availZones )的第一个和应用实例所在的可用区( myZone )【相等】元素的位置。
      • 当方法参数 preferSameZone=false ,即 eureka.preferSameZone=false( 默认值 :true ) 时,开始位置为可用区数组( availZones )的第一个和应用实例所在的可用区( myZone )【不相等】元素的位置。
    • 第 20 至 33 行 :从开始位置顺序将剩余的可用区的 serviceUrls 添加到结果。顺序理解如下图:

  • 第 9 至 18 行 :拼装 EndPoint 集群结果。

3.5 ZoneAffinityClusterResolver

com.netflix.discovery.shared.resolver.aws.ZoneAffinityClusterResolver ,使用可用区亲和的集群解析器。类属性代码如下:

public class ZoneAffinityClusterResolver implements ClusterResolver<AwsEndpoint> {private static final Logger logger = LoggerFactory.getLogger(ZoneAffinityClusterResolver.class);/*** 委托的解析器* 目前代码里为 {@link ConfigClusterResolver}*/private final ClusterResolver<AwsEndpoint> delegate;/*** 应用实例的可用区*/private final String myZone;/*** 是否可用区亲和*/private final boolean zoneAffinity;public ZoneAffinityClusterResolver(ClusterResolver<AwsEndpoint> delegate, String myZone, boolean zoneAffinity) {this.delegate = delegate;this.myZone = myZone;this.zoneAffinity = zoneAffinity;}
}
  • 属性 delegate ,委托的解析器。目前代码里使用的是 ConfigClusterResolver 。
  • 属性 zoneAffinity ,是否可用区亲和。
    • true :EndPoint 可用区为本地的优先被放在前面。
    • false :EndPoint 可用区非本地的优先被放在前面。

#getClusterEndpoints(...) 方法,实现代码如下:

1: @Override2: public List<AwsEndpoint> getClusterEndpoints() {3:     // 拆分成 本地的可用区和非本地的可用区的 EndPoint 集群4:     List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);5:     List<AwsEndpoint> myZoneEndpoints = parts[0];6:     List<AwsEndpoint> remainingEndpoints = parts[1];7:     // 随机打乱 EndPoint 集群并进行合并8:     List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);9:     // 非可用区亲和,将非本地的可用区的 EndPoint 集群放在前面
10:     if (!zoneAffinity) {
11:         Collections.reverse(randomizedList);
12:     }
13:
14:     if (logger.isDebugEnabled()) {
15:         logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList);
16:     }
17:
18:     return randomizedList;
19: }
  • 第 2 行 :调用 ClusterResolver#getClusterEndpoints() 方法,获得 EndPoint 集群。再调用 ResolverUtils#splitByZone(...) 方法,拆分成本地非本地的可用区的 EndPoint 集群,点击链接查看实现。

  • 第 8 行 :调用 #randomizeAndMerge(...) 方法,分别随机打乱每个 EndPoint 集群,并进行合并数组,实现代码如下:

// ZoneAffinityClusterResolver.java
private static List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) {if (myZoneEndpoints.isEmpty()) {return ResolverUtils.randomize(remainingEndpoints); // 打乱}if (remainingEndpoints.isEmpty()) {return ResolverUtils.randomize(myZoneEndpoints); // 打乱}List<AwsEndpoint> mergedList = ResolverUtils.randomize(myZoneEndpoints); // 打乱mergedList.addAll(ResolverUtils.randomize(remainingEndpoints)); // 打乱return mergedList;
}// ResolverUtils.java
public static <T extends EurekaEndpoint> List<T> randomize(List<T> list) {// 数组大小为 0 或者 1 ,不进行打乱List<T> randomList = new ArrayList<>(list);if (randomList.size() < 2) {return randomList;}// 以本地IP为随机种子,有如下好处:// 多个主机,实现对同一个 EndPoint 集群负载均衡的效果。// 单个主机,同一个 EndPoint 集群按照固定顺序访问。Eureka-Server 不是强一致性的注册中心,Eureka-Client 对同一个 Eureka-Server 拉取注册信息,保证两者之间增量同步的一致性。Random random = new Random(LOCAL_IPV4_ADDRESS.hashCode());int last = randomList.size() - 1;for (int i = 0; i < last; i++) {int pos = random.nextInt(randomList.size() - i);if (pos != i) {Collections.swap(randomList, i, pos);}}return randomList;
}
    • 注意,ResolverUtils#randomize(...) 使用以本机IP为随机种子,有如下好处:

      • 多个主机,实现对同一个 EndPoint 集群负载均衡的效果。
      • 单个主机,同一个 EndPoint 集群按照固定顺序访问。Eureka-Server 不是强一致性的注册中心,Eureka-Client 对同一个 Eureka-Server 拉取注册信息,保证两者之间增量同步的一致性。
  • 第 10 至 12 行 :非可用区亲和,将非本地的可用区的 EndPoint 集群放在前面。

3.6 AsyncResolver

com.netflix.discovery.shared.resolver.AsyncResolver ,异步执行解析的集群解析器。AsyncResolver 属性较多,而且复杂的多,我们拆分到具体方法里分享。

3.6.1 定时任务

AsyncResolver 内置定时任务,定时刷新 EndPoint 集群解析结果。

为什么要刷新?例如,Eureka-Server 的 serviceUrls 基于 DNS 配置。

定时任务代码如下

/**
* 是否已经调度定时任务 {@link #updateTask}
*/
private final AtomicBoolean scheduled = new AtomicBoolean(false);
/**
* 委托的解析器
* 目前代码为 {@link com.netflix.discovery.shared.resolver.aws.ZoneAffinityClusterResolver}
*/
private final ClusterResolver<T> delegate;
/**
* 定时服务
*/
private final ScheduledExecutorService executorService;
/**
* 线程池执行器
*/
private final ThreadPoolExecutor threadPoolExecutor;
/**
* 后台任务
* 定时解析 EndPoint 集群
*/
private final TimedSupervisorTask backgroundTask;
/**
* 解析 EndPoint 集群结果
*/
private final AtomicReference<List<T>> resultsRef;/**
* 定时解析 EndPoint 集群的频率
*/
private final int refreshIntervalMs;
/**
* 预热超时时间,单位:毫秒
*/
private final int warmUpTimeoutMs;// Metric timestamp, tracking last time when data were effectively changed.
private volatile long lastLoadTimestamp = -1;AsyncResolver(String name,ClusterResolver<T> delegate,List<T> initialValue,int executorThreadPoolSize,int refreshIntervalMs,int warmUpTimeoutMs) {this.name = name;this.delegate = delegate;this.refreshIntervalMs = refreshIntervalMs;this.warmUpTimeoutMs = warmUpTimeoutMs;// 初始化 定时服务this.executorService = Executors.newScheduledThreadPool(1, // 线程大小=1new ThreadFactoryBuilder().setNameFormat("AsyncResolver-" + name + "-%d").setDaemon(true).build());// 初始化 线程池执行器this.threadPoolExecutor = new ThreadPoolExecutor(1, // 线程大小=1executorThreadPoolSize, 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),  // use direct handoffnew ThreadFactoryBuilder().setNameFormat("AsyncResolver-" + name + "-executor-%d").setDaemon(true).build());// 初始化 后台任务this.backgroundTask = new TimedSupervisorTask(this.getClass().getSimpleName(),executorService,threadPoolExecutor,refreshIntervalMs,TimeUnit.MILLISECONDS,5,updateTask);this.resultsRef = new AtomicReference<>(initialValue);Monitors.registerObject(name, this);
}

backgroundTask ,后台任务,定时解析 EndPoint 集群。

  • TimedSupervisorTask ,在 《Eureka 源码解析 —— 应用实例注册发现(二)之续租》「2.3 TimedSupervisorTask」 有详细解析。

  • updateTask 实现代码如下:

private final Runnable updateTask = new Runnable() {@Overridepublic void run() {try {List<T> newList = delegate.getClusterEndpoints(); // 调用 委托的解析器 解析 EndPoint 集群if (newList != null) {resultsRef.getAndSet(newList);lastLoadTimestamp = System.currentTimeMillis();} else {logger.warn("Delegate returned null list of cluster endpoints");}logger.debug("Resolved to {}", newList);} catch (Exception e) {logger.warn("Failed to retrieve cluster endpoints from the delegate", e);}}
};
  • delegate ,委托的解析器,目前代码为 ZoneAffinityClusterResolver。
  • 后台任务的发起在 #getClusterEndpoints() 方法,在 「3.6.2 解析 EndPoint 集群」 详细解析。

3.6.2 解析 EndPoint 集群

调用 #getClusterEndpoints() 方法,解析 EndPoint 集群,实现代码如下:

1: @Override2: public List<T> getClusterEndpoints() {3:     long delay = refreshIntervalMs;4:     // 若未预热解析 EndPoint 集群结果,进行预热5:     if (warmedUp.compareAndSet(false, true)) {6:         if (!doWarmUp()) {7:             delay = 0; // 预热失败,取消定时任务的第一次延迟8:         }9:     }
10:     // 若未调度定时任务,进行调度
11:     if (scheduled.compareAndSet(false, true)) {
12:         scheduleTask(delay);
13:     }
14:     // 返回 EndPoint 集群
15:     return resultsRef.get();
16: }
  • 第 5 至 9 行 :若未预热解析 EndPoint 集群结果,调用 #doWarmUp() 方法,进行预热。若预热失败,取消定时任务的第一次延迟。#doWarmUp() 方法实现代码如下:
boolean doWarmUp() {Future future = null;try {future = threadPoolExecutor.submit(updateTask);future.get(warmUpTimeoutMs, TimeUnit.MILLISECONDS);  // block until done or timeoutreturn true;} catch (Exception e) {logger.warn("Best effort warm up failed", e);} finally {if (future != null) {future.cancel(true);}}return false;
}
    • 调用 updateTask ,解析 EndPoint 集群。
  • 第 10 至 13 行 : 若未调度定时任务,进行调度,调用 #scheduleTask() 方法,实现代码如下:

void scheduleTask(long delay) {executorService.schedule(backgroundTask, delay, TimeUnit.MILLISECONDS);
}
  • 第 15 行 :返回 EndPoint 集群。当第一次预热失败,会返回空,直到定时任务获得到结果

4. 初始化解析器

Eureka-Client 在初始化时,调用 DiscoveryClient#scheduleServerEndpointTask() 方法,初始化 AsyncResolver 解析器。实现代码如下:

private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,AbstractDiscoveryClientOptionalArgs args) {// ... 省略无关代码// 创建 EndPoint 解析器eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(clientConfig,transportConfig,eurekaTransport.transportClientFactory,applicationInfoManager.getInfo(),applicationsSource);// ... 省略无关代码
}
  • 调用 EurekaHttpClients#newBootstrapResolver(...) 方法,创建 EndPoint 解析器,实现代码如下:
1: public static final String COMPOSITE_BOOTSTRAP_STRATEGY = "composite";2: 3: public static ClosableResolver<AwsEndpoint> newBootstrapResolver(4:         final EurekaClientConfig clientConfig,5:         final EurekaTransportConfig transportConfig,6:         final TransportClientFactory transportClientFactory,7:         final InstanceInfo myInstanceInfo,8:         final ApplicationsResolver.ApplicationsSource applicationsSource)9: {
10:     if (COMPOSITE_BOOTSTRAP_STRATEGY.equals(transportConfig.getBootstrapResolverStrategy())) {
11:         if (clientConfig.shouldFetchRegistry()) {
12:             return compositeBootstrapResolver(
13:                     clientConfig,
14:                     transportConfig,
15:                     transportClientFactory,
16:                     myInstanceInfo,
17:                     applicationsSource
18:             );
19:         } else {
20:             logger.warn("Cannot create a composite bootstrap resolver if registry fetch is disabled." +
21:                     " Falling back to using a default bootstrap resolver.");
22:         }
23:     }
24:
25:     // if all else fails, return the default
26:     return defaultBootstrapResolver(clientConfig, myInstanceInfo);
27: }
28:
29: /**
30:  * @return a bootstrap resolver that resolves eureka server endpoints based on either DNS or static config,
31:  *         depending on configuration for one or the other. This resolver will warm up at the start.
32:  */
33: static ClosableResolver<AwsEndpoint> defaultBootstrapResolver(final EurekaClientConfig clientConfig,
34:                                                               final InstanceInfo myInstanceInfo) {
35:     // 获得 可用区集合
36:     String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
37:     // 获得 应用实例的 可用区
38:     String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);
39:
40:     // 创建 ZoneAffinityClusterResolver
41:     ClusterResolver<AwsEndpoint> delegateResolver = new ZoneAffinityClusterResolver(
42:             new ConfigClusterResolver(clientConfig, myInstanceInfo),
43:             myZone,
44:             true
45:     );
46:
47:     // 第一次 EndPoint 解析
48:     List<AwsEndpoint> initialValue = delegateResolver.getClusterEndpoints();
49:
50:     // 解析不到 Eureka-Server EndPoint ,快速失败
51:     if (initialValue.isEmpty()) {
52:         String msg = "Initial resolution of Eureka server endpoints failed. Check ConfigClusterResolver logs for more info";
53:         logger.error(msg);
54:         failFastOnInitCheck(clientConfig, msg);
55:     }
56:
57:     // 创建 AsyncResolver
58:     return new AsyncResolver<>(
59:             EurekaClientNames.BOOTSTRAP,
60:             delegateResolver,
61:             initialValue,
62:             1,
63:             clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000
64:     );
65: }
* 第 10 至 23 行 :组合解析器,用于 Eureka 1.x 对 Eureka 2.x 的兼容配置,暂时不需要了解。TODO[0028]写入集群和读取集群
* 第 26 行 :调用 `#defaultBootstrapResolver()` 方法,创建默认的解析器 AsyncResolver 。
* 第 40 至 45 行 :创建 ZoneAffinityClusterResolver 。在 ZoneAffinityClusterResolver 构造方法的参数,我们看到创建 ConfigClusterResolver 作为 `delegate` 参数。
* 第 48 行 :调用 `ZoneAffinityClusterResolver#getClusterEndpoints()` 方法,**第一次 Eureka-Server EndPoint 集群解析**。
* 第 51 至 55 行 :解析不到 Eureka-Server EndPoint 集群时,可以通过配置( `eureka.experimental.clientTransportFailFastOnInit=true` ),使 Eureka-Client 初始化失败。`#failFastOnInitCheck(...)` 方法,实现代码如下:

* 第 10 至 23 行 :组合解析器,用于 Eureka 1.x 对 Eureka 2.x 的兼容配置,暂时不需要了解。TODO[0028]写入集群和读取集群
* 第 26 行 :调用 `#defaultBootstrapResolver()` 方法,创建默认的解析器 AsyncResolver 。
* 第 40 至 45 行 :创建 ZoneAffinityClusterResolver 。在 ZoneAffinityClusterResolver 构造方法的参数,我们看到创建 ConfigClusterResolver 作为 `delegate` 参数。
* 第 48 行 :调用 `ZoneAffinityClusterResolver#getClusterEndpoints()` 方法,**第一次 Eureka-Server EndPoint 集群解析**。
* 第 51 至 55 行 :解析不到 Eureka-Server EndPoint 集群时,可以通过配置( `eureka.experimental.clientTransportFailFastOnInit=true` ),使 Eureka-Client 初始化失败。`#failFastOnInitCheck(...)` 方法,实现代码如下:

// potential future feature, guarding with experimental flag for now
private static void failFastOnInitCheck(EurekaClientConfig clientConfig, String msg) {if ("true".equals(clientConfig.getExperimental("clientTransportFailFastOnInit"))) {throw new RuntimeException(msg);}
}
  • 第 58 至 64 行 :创建 AsyncResolver 。从代码上,我们可以看到,AsyncResolver.resultsRef 属性一开始已经用 initialValue 传递给 AsyncResolver 构造方法。实现代码如下:
public AsyncResolver(String name,ClusterResolver<T> delegate,List<T> initialValues,int executorThreadPoolSize,int refreshIntervalMs) {this(name,delegate,initialValues,executorThreadPoolSize,refreshIntervalMs,0);// 设置已经预热warmedUp.set(true);
}

Eureka 源码解析 —— EndPoint 与 解析器相关推荐

  1. 4、Eureka 源码解析 之 Eureka Client 启动原理分析

    在前面的一篇文章 3.Eureka 源码解析 之 Eureka Server 启动原理分析当中我们分析了一下 Eureka Server 的启动.在集群环境下 Eureka Server 相互之前需要 ...

  2. 注册中心 Eureka 源码解析 —— 应用实例注册发现(五)之过期

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-evict/ ...

  3. MyBatis 源码分析 - 映射文件解析过程

    1.简介 在上一篇文章中,我详细分析了 MyBatis 配置文件的解析过程.由于上一篇文章的篇幅比较大,加之映射文件解析过程也比较复杂的原因.所以我将映射文件解析过程的分析内容从上一篇文章中抽取出来, ...

  4. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 JobMasger启动 YarnJobClusterEntrypoint

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [三] 上一章:[flink]Flink 1.12.2 源码浅析 : yarn-per-job模式解析 yar ...

  5. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 从脚本到主类

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [一] 可以去看原文.这里是补充专栏.请看原文 2. 前言 主要针对yarn-per-job模式进行代码分析. ...

  6. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 TaskMasger 启动

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [四] 上一篇: [flink]Flink 1.12.2 源码浅析 : yarn-per-job模式解析 Jo ...

  7. 【flink】Flink 1.12.2 源码浅析 : yarn-per-job模式解析 yarn 提交过程解析

    1.概述 转载:Flink 1.12.2 源码浅析 : yarn-per-job模式解析 [二] 请大家看原文去. 接上文Flink 1.12.2 源码分析 : yarn-per-job模式浅析 [一 ...

  8. Android Fragment 从源码的角度去解析(上)

    ###1.概述 本来想着昨天星期五可以早点休息,今天可以早点起来跑步,可没想到事情那么的多,晚上有人问我主页怎么做到点击才去加载Fragment数据,而不是一进入主页就去加载所有的数据,在这里自己就对 ...

  9. [darknet源码系列-2] darknet源码中的cfg解析

    [darknet源码系列-2] darknet源码中的cfg解析 FesianXu 20201118 at UESTC 前言 笔者在[1]一文中简单介绍了在darknet中常见的数据结构,本文继续上文 ...

最新文章

  1. tf.variable_scope 参数
  2. 头脑风暴是如何毁掉创意的?
  3. linux命令之vmstat
  4. 手机端h5页面字体大小适配
  5. QML-关于Qt.rgba()颜色无法正常显示问题
  6. RTSP鉴权认证之基础认证和摘要认证
  7. 【转载】一百年后,人类怎样编程?
  8. Attension Mechanism模型的详细介绍,原理、分类及应用
  9. API/POSIX/C库的区别与联系
  10. Unity3D基础7:Transform组件
  11. 银行系统开发 经验谈
  12. http请求接口开发的几种方式
  13. 如何在html中插入乘积函数,excel乘法怎么保留两位小数
  14. SiamRPN代码讲解,推理测试讲解
  15. php支付宝 缺少签名参数,【支付宝】退款接口 报 “缺少签名参数”
  16. [乱七八糟]Google搜索秘籍
  17. 关于“Threats to Validity”的介绍
  18. 计算机锁定不能强制选项无法关机,Win10无法关机只能按电源强制关机的解决方法...
  19. 开源SWD脱机烧录器-第一章 软硬件配置及其初始化
  20. Detectron2安装踩坑记录(比较详细版)

热门文章

  1. linux kernel --- dts的相关操作函数
  2. 证件号通用脱敏、名称脱敏
  3. 跟jQuery那样简单方便操作Html文档的Java工具类,今天我必须要告知你!
  4. 我的stm32学习——串口屏的使用
  5. MoonShell2皮肤文件(.skn) 结构解析
  6. 1024程序员节,琪琪酱来送祝福啦!!
  7. JAVA求出长方形的体积
  8. Dijkstra(迪杰斯特拉)算法:求给定顶点到其余各顶点的最短路径
  9. Java 下载解析Excel文件 报错:jxl.read.biff.BiffException: Unable to recognize OLE stream
  10. 【Database System Concept 7th】Chapter 2读书笔记