“不积跬步,无以至千里。”

Eureka client初始化的时候,就会自动发送个请求到Eureka server一次性抓取全量的注册表,这一篇文章,我们来看看Eureka server端是如何处理抓取全量注册表的请求的,首先我们知道,Eureka client发送的请求是类似:http://localhost:8080/v2/apps/,GET请求。

server这一端,我们来到ApplicationsResource这个类,通过匹配url确定getContainers方法就是处理抓取全量注册表请求的。

@GET
public Response getContainers(@PathParam("version") String version,@HeaderParam(HEADER_ACCEPT) String acceptHeader,@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,@Context UriInfo uriInfo,@Nullable @QueryParam("regions") String regionsStr) {boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();String[] regions = null;if (!isRemoteRegionRequested) {EurekaMonitors.GET_ALL.increment();} else {regions = regionsStr.toLowerCase().split(",");Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();}// Check if the server allows the access to the registry. The server can// restrict access if it is not// ready to serve traffic depending on various reasons.if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {return Response.status(Status.FORBIDDEN).build();}CurrentRequestVersion.set(Version.toEnum(version));KeyType keyType = Key.KeyType.JSON;String returnMediaType = MediaType.APPLICATION_JSON;if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {keyType = Key.KeyType.XML;returnMediaType = MediaType.APPLICATION_XML;}Key cacheKey = new Key(Key.EntityType.Application,ResponseCacheImpl.ALL_APPS,keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);Response response;if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {response = Response.ok(responseCache.getGZIP(cacheKey)).header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE).header(HEADER_CONTENT_TYPE, returnMediaType).build();} else {response = Response.ok(responseCache.get(cacheKey)).build();}CurrentRequestVersion.remove();return response;
}

这里说一个我源码阅读的小技巧:抓大放小,把握住源码的整体架构核心流程复杂机制,不要沉迷于细节。源码里会有很多琐碎的细节,看不懂很正常,没必要强迫自己把所有的细节都扣死,那是跟自己过不去,只会让你更加快速的放弃阅读源码。

但是怎么把握看源码的重点呢,首先你要对这个技术有一定的知识积累,要对其核心原理,底层实现知道是怎么回事,这样你在看源码的时候,看见一些关键字,方法名,上下文语句,才知道哪里是重点;否则你写上几个demo,就大言不惭的要来看看源码,那不是欺骗自己吗,别搞笑!

我们之前说过,eureka server端有一套设计巧妙的多级缓存机制,这个cacheKey很可能就是缓存的key

Key cacheKey = new Key(Key.EntityType.Application,ResponseCacheImpl.ALL_APPS,keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);

ResponseCache,就是eureka server端的缓存机制

private final ResponseCache responseCache;
... ...
response = Response.ok(responseCache.get(cacheKey)).build();

eureka server的多级缓存机制,其实是用了两个map,来做两级缓存,俗称RO缓存(只读缓存map)和RW缓存(读写缓存map),先从只读缓存里去读,如果没有的话,会从读写缓存里去读,如果还是没有就会从eureka server的注册表中去读取。

public String get(final Key key) {return get(key, shouldUseReadOnlyResponseCache);
}
@VisibleForTesting
String get(final Key key, boolean useReadOnlyCache) {Value payload = getValue(key, useReadOnlyCache);if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {return null;} else {return payload.getPayload();}
}

这里useReadOnlyCache默认是true,所有会使用RO缓存,如果RO缓存中有数据,会直接获取到数据(注册表)进行返回。

@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {Value payload = null;try {if (useReadOnlyCache) {final Value currentPayload = readOnlyCacheMap.get(key);if (currentPayload != null) {payload = currentPayload;} else {payload = readWriteCacheMap.get(key);readOnlyCacheMap.put(key, payload);}} else {payload = readWriteCacheMap.get(key);}} catch (Throwable t) {logger.error("Cannot get value for key : {}", key, t);}return payload;
}

可以看到,RO缓存的数据结构是一个ConcurrentHashMap,而RW缓存是一个LoadingCache,为什么是一个Cache,后面增量拉取注册表信息的时候详细讲。

private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
private final LoadingCache<Key, Value> readWriteCacheMap;

如果RO缓存中读取到的为null,就会从RW缓存中读取,并在RO缓存中放一份

payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);

这个RW cache中的数据,会在ResponseCache对象的构造方法中完成初始化,实际上是从注册表中获取Applications,ServerCodecs等等这些组件,将Applications序列化成一个json字符串,放入RW缓存。

this.readWriteCacheMap =CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()).expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS).removalListener(new RemovalListener<Key, Value>() {@Overridepublic void onRemoval(RemovalNotification<Key, Value> notification) {Key removedKey = notification.getKey();if (removedKey.hasRegions()) {Key cloneWithNoRegions = removedKey.cloneWithoutRegions();regionSpecificKeys.remove(cloneWithNoRegions, removedKey);}}}).build(new CacheLoader<Key, Value>() {@Overridepublic Value load(Key key) throws Exception {if (key.hasRegions()) {Key cloneWithNoRegions = key.cloneWithoutRegions();regionSpecificKeys.put(cloneWithNoRegions, key);}//实际就是从注册表registry中读取数据Value value = generatePayload(key);return value;}});

那么这个ResponseCache是在哪里创建并完成初始化的呢?
答案是:eureka server初始化。
找了一下EurekaBootStrap的初始化流程那里,发现调用流程如下:EurekaBootStrap#initEurekaServerContext() --> serverContext.initialize() --> registry.init(peerEurekaNodes)

serverContext = new DefaultEurekaServerContext(eurekaServerConfig,serverCodecs,registry,peerEurekaNodes,applicationInfoManager
);
EurekaServerContextHolder.initialize(serverContext);
serverContext.initialize();
public void initialize() {logger.info("Initializing ...");peerEurekaNodes.start();try {registry.init(peerEurekaNodes);} catch (Exception e) {throw new RuntimeException(e);}logger.info("Initialized");
}

多级缓存的Cache就是在这个initializedResponseCache()方法里初始化的

@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {this.numberOfReplicationsLastMin.start();this.peerEurekaNodes = peerEurekaNodes;//初始化Eureka server的多级缓存initializedResponseCache();scheduleRenewalThresholdUpdateTask();initRemoteRegionRegistry();try {Monitors.registerObject(this);} catch (Throwable e) {logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);}
}
@Override
public synchronized void initializedResponseCache() {if (responseCache == null) {responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);}}

registry.getApplications(),这里就是从注册表中获取所有的服务实例信息,然后通过getPayLoad方法中的编解码组件serverCodecs完成序列化,并返回。

private Value generatePayload(Key key) {Stopwatch tracer = null;try {String payload;switch (key.getEntityType()) {case Application:boolean isRemoteRegionRequested = key.hasRegions();//第一次全量拉取if (ALL_APPS.equals(key.getName())) {if (isRemoteRegionRequested) {tracer = serializeAllAppsWithRemoteRegionTimer.start();payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));} else {tracer = serializeAllAppsTimer.start();payload = getPayLoad(key, registry.getApplications());}} else if (/**后续增量拉取*/ALL_APPS_DELTA.equals(key.getName())) {if (isRemoteRegionRequested) {tracer = serializeDeltaAppsWithRemoteRegionTimer.start();versionDeltaWithRegions.incrementAndGet();versionDeltaWithRegionsLegacy.incrementAndGet();payload = getPayLoad(key,registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));} else {tracer = serializeDeltaAppsTimer.start();versionDelta.incrementAndGet();versionDeltaLegacy.incrementAndGet();payload = getPayLoad(key, registry.getApplicationDeltas());}} else {tracer = serializeOneApptimer.start();payload = getPayLoad(key, registry.getApplication(key.getName()));}break;... ...
}
private String getPayLoad(Key key, Applications apps) {EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());String result;try {result = encoderWrapper.encode(apps);} catch (Exception e) {logger.error("Failed to encode the payload for all apps", e);return "";}if(logger.isDebugEnabled()) {logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());}return result;
}

看到这里,我猜你一定想看registry.getApplications()方法的实现细节,其实用屁股想想也知道,它一定是在这里遍历注册表那个ConcurrentHashmap,然后封装成Application,添加到Applications,并把Applications返回的。

public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",includeRemoteRegion, remoteRegions);if (includeRemoteRegion) {GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();} else {GET_ALL_CACHE_MISS.increment();}Applications apps = new Applications();apps.setVersion(1L);for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {Application app = null;if (entry.getValue() != null) {for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {Lease<InstanceInfo> lease = stringLeaseEntry.getValue();if (app == null) {/将LeaseInfo封装成Applicationapp = new Application(lease.getHolder().getAppName());}//将封装好的Application添加到Applications里面app.addInstance(decorateInstanceInfo(lease));}}if (app != null) {apps.addApplication(app);}}... ...apps.setAppsHashCode(apps.getReconcileHashCode());//将Applications返回return apps;
}

这里面有很多代码的细节,没有贴出来,主要是因为如果把所有的点写出来,一来,篇幅体量较大,不利于阅读核心架构原理;二来,细节太多,篇幅大,很快就会把人看晕,最后抓不住重点,难以理解源码核心设计理念,切记,抓大放小,聚焦核心,细节在需要用的时候细看即可。

Eureka源码深度刨析-(5)EurekaServer处理服务发现相关推荐

  1. Metis异常检测模型训练源码深入刨析

    Metis异常检测模型训练源码深入刨析 模型训练 数据集说明 process_train 方法(detect_service.py) __generate_model方法(detect_service ...

  2. Eureka源码深度解析(上)

    前言: Eureka作为一个服务注册中心,主要功能就是服务注册与服务发现. 微服务框架中,服务注册与发现既是基础功能也是核心功能.     Eureka分为服务端和客户端. 服务端也称为服务注册中心, ...

  3. 注册中心 Eureka 源码解析 —— 应用实例注册发现(五)之过期

    2019独角兽企业重金招聘Python工程师标准>>> 摘要: 原创出处 http://www.iocoder.cn/Eureka/instance-registry-evict/ ...

  4. 微服务发现与注册之Eureka源码分析

    作者:陌北有棵树,Java人,架构师社区合伙人! [一]微服务之服务发现概述 关于微服务,近年来可谓是大火,业界也吹刮着一种实践微服务的风潮.本人有幸在去年参与到一个向微服务过渡的产品,再结合自己所学 ...

  5. Eureka源码分析

    Eureka源码分析 Eureka server 入口: Spring.factories PS: 意味着如果加载EurekaServerAutoConfiguration成功,需要 @Conditi ...

  6. Spring Cloud Eureka 源码分析(一) 服务端启动过程

    2019独角兽企业重金招聘Python工程师标准>>> 一. 前言 我们在使用Spring Cloud Eureka服务发现功能的时候,简单的引入maven依赖,且在项目入口类根据服 ...

  7. 4、Eureka 源码解析 之 Eureka Client 启动原理分析

    在前面的一篇文章 3.Eureka 源码解析 之 Eureka Server 启动原理分析当中我们分析了一下 Eureka Server 的启动.在集群环境下 Eureka Server 相互之前需要 ...

  8. 【SpringCloud微服务】第3章 服务治理SpringCloudEureka(五)——Eureka源码分析

    2.8 Eureka 源码分析   首先,对于服务注册中心.服务提供者.服务消费者这三个主要元素来说,后两者(也就是Eureka客户端)在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处 ...

  9. Eureka 源码解析 —— EndPoint 与 解析器

    1. 概述 本文主要分享 EndPoint 与 解析器. EndPoint ,服务端点.例如,Eureka-Server 的访问地址. EndPoint 解析器,将配置的 Eureka-Server ...

  10. libevent源码深度剖析

    原文地址:http://blog.csdn.net/sparkliang/article/details/4957667 libevent源码深度剖析一 --序幕 张亮 1 前言 Libevent是一 ...

最新文章

  1. 坚果3“凉了”,罗永浩只提了一次人工智能
  2. layer-list简单使用以及shape的定义
  3. Mozilla Labs Apps Developer Preview发布了
  4. cxgrid列高度行宽度定义
  5. Android RotateAnimation详解
  6. Streams全库复制
  7. Ubuntu20.04 更新后黑屏无法加载驱动
  8. 一种结合颜色特征和区域生长的疾病斑图像分割方法(复杂环境下分割效果好)
  9. 易语言5.4一键破解工具
  10. 【第36期】游戏策划:新手入行的切入点在哪?
  11. Swagger注解传参
  12. Android 添加水印View
  13. CSS 基础3(内边距、外边距、边距模型)
  14. ValueError: only one element tensors can be converted to Python scalars
  15. 计算机网络语音传输杂音回音,Win10系统中QQ语音有回音噪音该如何解决?
  16. Arthas线上问题定位神器
  17. Python 自动化教程(5) : 自动生成Word文件
  18. 在Ubuntu上安装VM WarePlayer 编译不过
  19. Jquery实现表格动态增加一行,删除一行(最简洁的代码实现)
  20. openstack官方安装文档的解析--环境配置篇(1)

热门文章

  1. PMP第五章:项目范围管理
  2. VS Code安装,配置keil安装,Proteus8.6
  3. itest听力答案2020_itest听力题库答案
  4. 原材料涨价引发全LED显示屏全产业链价格上浮!
  5. opencv火焰检测小项目
  6. 云服务器 ECS Linux 配置 vsftpd即FTP的搭建和使用
  7. 初学爬虫-qq空间模拟登录
  8. 开发工具总结(8)之图文并茂全面总结上百个AS好用的插件(下)
  9. 计量广义差分操作过程_一分钟看完计量经济学
  10. windows消息钩子研究资料整理