为什么80%的码农都做不了架构师?>>>   

本文主要研究一下eureka的delta配置

client端配置

    {"sourceType": "org.springframework.cloud.netflix.eureka.EurekaClientConfigBean","defaultValue": false,"name": "eureka.client.disable-delta","description": "Indicates whether the eureka client should disable fetching of delta and should\n rather resort to getting the full registry information.\n\n Note that the delta fetches can reduce the traffic tremendously, because the rate\n of change with the eureka server is normally much lower than the rate of fetches.\n\n The changes are effective at runtime at the next registry fetch cycle as specified\n by registryFetchIntervalSeconds","type": "java.lang.Boolean"},{"sourceType": "org.springframework.cloud.netflix.eureka.EurekaClientConfigBean","defaultValue": false,"name": "eureka.client.log-delta-diff","description": "Indicates whether to log differences between the eureka server and the eureka\n client in terms of registry information.\n\n Eureka client tries to retrieve only delta changes from eureka server to minimize\n network traffic. After receiving the deltas, eureka client reconciles the\n information from the server to verify it has not missed out some information.\n Reconciliation failures could happen when the client has had network issues\n communicating to server.If the reconciliation fails, eureka client gets the full\n registry information.\n\n While getting the full registry information, the eureka client can log the\n differences between the client and the server and this setting controls that.\n\n The changes are effective at runtime at the next registry fetch cycle as specified\n by registryFetchIntervalSecondsr","type": "java.lang.Boolean"}

DiscoveryClient.fetchRegistry

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java

    /*** Fetches the registry information.** <p>* This method tries to get only deltas after the first fetch unless there* is an issue in reconciling eureka server and client registry information.* </p>** @param forceFullRegistryFetch Forces a full registry fetch.** @return true if the registry was fetched*/private boolean fetchRegistry(boolean forceFullRegistryFetch) {Stopwatch tracer = FETCH_REGISTRY_TIMER.start();try {// If the delta is disabled or if it is the first time, get all// applicationsApplications applications = getApplications();if (clientConfig.shouldDisableDelta()|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications == null)|| (applications.getRegisteredApplications().size() == 0)|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta{logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());logger.info("Force full registry fetch : {}", forceFullRegistryFetch);logger.info("Application is null : {}", (applications == null));logger.info("Registered Applications size is zero : {}",(applications.getRegisteredApplications().size() == 0));logger.info("Application version is -1: {}", (applications.getVersion() == -1));getAndStoreFullRegistry();} else {getAndUpdateDelta(applications);}applications.setAppsHashCode(applications.getReconcileHashCode());logTotalInstances();} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);return false;} finally {if (tracer != null) {tracer.stop();}}// Notify about cache refresh before updating the instance remote statusonCacheRefreshed();// Update remote status based on refreshed data held in the cacheupdateInstanceRemoteStatus();// registry was fetched successfully, so return truereturn true;}

如果禁用了delta,则client端则调用getAndStoreFullRegistry()全量接口;开启的话,则调用getAndUpdateDelta(applications),对本地region的applications进行delta更新。

getAndUpdateDelta

    /*** Get the delta registry information from the eureka server and update it locally.* When applying the delta, the following flow is observed:** if (update generation have not advanced (due to another thread))*   atomically try to: update application with the delta and get reconcileHashCode*   abort entire processing otherwise*   do reconciliation if reconcileHashCode clash* fi** @return the client response* @throws Throwable on error*/private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications delta = null;EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();}if (delta == null) {logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+ "Hence got the full registry.");getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());String reconcileHashCode = "";if (fetchRegistryUpdateLock.tryLock()) {try {updateDelta(delta);reconcileHashCode = getReconcileHashCode(applications);} finally {fetchRegistryUpdateLock.unlock();}} else {logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");}// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall}} else {logger.warn("Not updating application delta as another thread is updating it already");logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());}}

这里可以看到,如果开启了logDeltaDiff的话,则会调用reconcileAndLogDifference方法 另外这里底层请求的是queryClient.getDelta(remoteRegionsRef.get())方法

DiscoveryClient.updateDelta

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java

    /*** Updates the delta information fetches from the eureka server into the* local cache.** @param delta*            the delta information received from eureka server in the last*            poll cycle.*/private void updateDelta(Applications delta) {int deltaCount = 0;for (Application app : delta.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {Applications applications = getApplications();String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {Applications remoteApps = remoteRegionVsApps.get(instanceRegion);if (null == remoteApps) {remoteApps = new Applications();remoteRegionVsApps.put(instanceRegion, remoteApps);}applications = remoteApps;}++deltaCount;if (ActionType.ADDED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} else if (ActionType.MODIFIED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}logger.debug("Modified instance {} to the existing apps ", instance.getId());applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);} else if (ActionType.DELETED.equals(instance.getActionType())) {Application existingApp = applications.getRegisteredApplications(instance.getAppName());if (existingApp == null) {applications.addApplication(app);}logger.debug("Deleted instance {} to the existing apps ", instance.getId());applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);}}}logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);getApplications().setVersion(delta.getVersion());getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());for (Applications applications : remoteRegionVsApps.values()) {applications.setVersion(delta.getVersion());applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());}}

可以看到这里根据获取的delta数据的ActionType来修改本地的数据

queryClient.getDelta

eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/jersey/AbstractJerseyEurekaHttpClient.java

    @Overridepublic EurekaHttpResponse<Applications> getDelta(String... regions) {return getApplicationsInternal("apps/delta", regions);}

可以看到请求的是/apps/delta接口

server端配置

    {"sourceType": "org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean","defaultValue": false,"name": "eureka.server.disable-delta","type": "java.lang.Boolean"},{"sourceType": "org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean","defaultValue": 0,"name": "eureka.server.delta-retention-timer-interval-in-ms","type": "java.lang.Long"},{"sourceType": "org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean","defaultValue": 0,"name": "eureka.server.retention-time-in-m-s-in-delta-queue","type": "java.lang.Long"}

ApplicationsResource.getContainerDifferential

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/ApplicationsResource.java

    /*** Get information about all delta changes in {@link com.netflix.discovery.shared.Applications}.** <p>* The delta changes represent the registry information change for a period* as configured by* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. The* changes that can happen in a registry include* <em>Registrations,Cancels,Status Changes and Expirations</em>. Normally* the changes to the registry are infrequent and hence getting just the* delta will be much more efficient than getting the complete registry.* </p>** <p>* Since the delta information is cached over a period of time, the requests* may return the same data multiple times within the window configured by* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}.The clients* are expected to handle this duplicate information.* <p>** @param version the version of the request.* @param acceptHeader the accept header to indicate whether to serve  JSON or XML data.* @param acceptEncoding the accept header to indicate whether to serve compressed or uncompressed data.* @param eurekaAccept an eureka accept extension, see {@link com.netflix.appinfo.EurekaAccept}* @param uriInfo  the {@link java.net.URI} information of the request made.* @return response containing the delta information of the*         {@link AbstractInstanceRegistry}.*/@Path("delta")@GETpublic Response getContainerDifferential(@PathParam("version") String version,@HeaderParam(HEADER_ACCEPT) String acceptHeader,@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();// If the delta flag is disabled in discovery or if the lease expiration// has been disabled, redirect clients to get all instancesif ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {return Response.status(Status.FORBIDDEN).build();}String[] regions = null;if (!isRemoteRegionRequested) {EurekaMonitors.GET_ALL_DELTA.increment();} else {regions = regionsStr.toLowerCase().split(",");Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();}CurrentRequestVersion.set(Version.toEnum(version));KeyType keyType = Key.KeyType.JSON;String returnMediaType = MediaType.APPLICATION_JSON;if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {keyType = Key.KeyType.XML;returnMediaType = MediaType.APPLICATION_XML;}Key cacheKey = new Key(Key.EntityType.Application,ResponseCacheImpl.ALL_APPS_DELTA,keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions);if (acceptEncoding != null&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {return Response.ok(responseCache.getGZIP(cacheKey)).header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE).header(HEADER_CONTENT_TYPE, returnMediaType).build();} else {return Response.ok(responseCache.get(cacheKey)).build();}}

server端提供/apps/delta接口

ResponseCacheImpl.generatePayload

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/ResponseCacheImpl.java

    /** Generate pay load for the given key.*/private Value generatePayload(Key key) {Stopwatch tracer = null;try {String payload;switch (key.getEntityType()) {case Application:boolean isRemoteRegionRequested = key.hasRegions();if (ALL_APPS.equals(key.getName())) {if (isRemoteRegionRequested) {tracer = serializeAllAppsWithRemoteRegionTimer.start();payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));} else {tracer = serializeAllAppsTimer.start();payload = getPayLoad(key, registry.getApplications());}} else if (ALL_APPS_DELTA.equals(key.getName())) {if (isRemoteRegionRequested) {tracer = serializeDeltaAppsWithRemoteRegionTimer.start();versionDeltaWithRegions.incrementAndGet();versionDeltaWithRegionsLegacy.incrementAndGet();payload = getPayLoad(key,registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));} else {tracer = serializeDeltaAppsTimer.start();versionDelta.incrementAndGet();versionDeltaLegacy.incrementAndGet();payload = getPayLoad(key, registry.getApplicationDeltas());}} else {tracer = serializeOneApptimer.start();payload = getPayLoad(key, registry.getApplication(key.getName()));}break;case VIP:case SVIP:tracer = serializeViptimer.start();payload = getPayLoad(key, getApplicationsForVip(key, registry));break;default:logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());payload = "";break;}return new Value(payload);} finally {if (tracer != null) {tracer.stop();}}}

这里提供了缓存不存在时触发的方法,具体重点看ALL_APPS_DELTA的方法,重点调用了registry.getApplicationDeltas()及registry.getApplicationDeltasFromMultipleRegions方法

registry.getApplicationDeltas()

    /*** Get the registry information about the delta changes. The deltas are* cached for a window specified by* {@link EurekaServerConfig#getRetentionTimeInMSInDeltaQueue()}. Subsequent* requests for delta information may return the same information and client* must make sure this does not adversely affect them.** @return all application deltas.* @deprecated use {@link #getApplicationDeltasFromMultipleRegions(String[])} instead. This method has a* flawed behavior of transparently falling back to a remote region if no instances for an app is available locally.* The new behavior is to explicitly specify if you need a remote region.*/@Deprecatedpublic Applications getApplicationDeltas() {GET_ALL_CACHE_MISS_DELTA.increment();Applications apps = new Applications();apps.setVersion(responseCache.getVersionDelta().get());Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();try {write.lock();Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();logger.debug("The number of elements in the delta queue is : {}",this.recentlyChangedQueue.size());while (iter.hasNext()) {Lease<InstanceInfo> lease = iter.next().getLeaseInfo();InstanceInfo instanceInfo = lease.getHolder();logger.debug("The instance id {} is found with status {} and actiontype {}",instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());Application app = applicationInstancesMap.get(instanceInfo.getAppName());if (app == null) {app = new Application(instanceInfo.getAppName());applicationInstancesMap.put(instanceInfo.getAppName(), app);apps.addApplication(app);}app.addInstance(decorateInstanceInfo(lease));}boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();if (!disableTransparentFallback) {Applications allAppsInLocalRegion = getApplications(false);for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {Applications applications = remoteRegistry.getApplicationDeltas();for (Application application : applications.getRegisteredApplications()) {Application appInLocalRegistry =allAppsInLocalRegion.getRegisteredApplications(application.getName());if (appInLocalRegistry == null) {apps.addApplication(application);}}}}Applications allApps = getApplications(!disableTransparentFallback);apps.setAppsHashCode(allApps.getReconcileHashCode());return apps;} finally {write.unlock();}}

可以看到这里主要是从recentlyChangedQueue取数据

recentlyChangedQueue

    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();

这里存放的是RecentlyChangedItem AbstractInstanceRegistry在register、cancel、statusUpdate、deleteStatus等操作里头都会往该队列添加RecentlyChangedItem

deltaRetentionTimer

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/AbstractInstanceRegistry.java

    protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {this.serverConfig = serverConfig;this.clientConfig = clientConfig;this.serverCodecs = serverCodecs;this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),serverConfig.getDeltaRetentionTimerIntervalInMs(),serverConfig.getDeltaRetentionTimerIntervalInMs());}

这里初始化了DeltaRetentionTask,调度间隔是serverConfig.getDeltaRetentionTimerIntervalInMs(),默认是30 * 1000

DeltaRetentionTask

eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/ResponseCacheImpl.java

    private TimerTask getDeltaRetentionTask() {return new TimerTask() {@Overridepublic void run() {Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();while (it.hasNext()) {if (it.next().getLastUpdateTime() <System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {it.remove();} else {break;}}}};}

该任务主要是距离上次更新时间超过serverConfig.getRetentionTimeInMSInDeltaQueue()的值给remove掉 serverConfig.getRetentionTimeInMSInDeltaQueue()的值默认为3 * MINUTES

小结

eureka提供了delta参数,在client端及server端都有。client端主要是控制刷新registry的时候,是否使用调用/apps/delta接口,然后根据返回数据的ActionType来作用于本地数据。而server端则提供/apps/delta接口,它的主要逻辑是在registry的修改操作都会放recentlyChangedQueue存放RecentlyChangedItem事件,然后有个定时任务去剔除距离上次更新时间超过指定阈值的item;而查询接口则是从recentlyChangedQueue获取数据然后返回。

client端主要是eureka.client.disable-delta、eureka.client.log-delta-diff两个参数;server端主要是eureka.server.disable-delta、eureka.server.delta-retention-timer-interval-in-ms、eureka.server.retention-time-in-m-s-in-delta-queue三个参数。

doc

  • Eureka REST operations

转载于:https://my.oschina.net/go4it/blog/1819067

聊聊eureka的delta配置相关推荐

  1. JavaEE进阶知识学习-----SpringCloud(四)Eureka集群配置

    Eureka集群配置 microservicecloud-eureka-7001使EurekaServer服务注册中心,一旦这个出现问题,那么微服务就不能正常的工作,为防止这种情况,所以出现了集群,就 ...

  2. SpringCloud(三 Eureka集群配置)

    1.Eureka集群配置原理 互相注册!! 互相守望 ! 2.集群搭建 2.1 修改映射文件 到C:\Windows\System32\drivers\etc下的host文件中修改! 2.2 建mod ...

  3. SpringCloud Eureka Client和Server侧配置及Eureka高可用配置

    一.Eureka注册中心和客户端配置Demo. 1.Server端 a.使用Idea创建Spring项目,如下所示: b.相关配置 application.yaml配置文件如下: # eureka本身 ...

  4. 【Spring框架家族】Spring Cloud Eureka 之常用配置解析(转载)

    1. 配置项解析 1.1 通用配置 # 应用名称,将会显示在Eureka界面的应用名称列 spring.application.name=config-service# 应用端口,Eureka服务端默 ...

  5. Apollo注册到自己的Eureka注册中心+配置中心集群

    ##重要提示:在任何步骤开始之前,谨记下面的东西## 在对apollo-master这个文件进行处理的时候,要找到文件夹scripts下的文件build.bat,苹果用户请找到build.sh,并且在 ...

  6. 聊聊Eureka Server的REST API

    序 本文主要研究下Eureka Server的REST API ApplicationsResource eureka-core-1.8.8-sources.jar!/com/netflix/eure ...

  7. Eureka注册中心配置登录验证

    一.创建一个父工程,并且在父工程中指定SpringCloud的版本,并且将packaing修改为pom <packaging>pom</packaging> ​ <dep ...

  8. Eureka注册中心配置

    前言.案例需求: 修改order-service中的根据id查询订单业务,要求在查询订单的同时,根据订单中包含的userId查询出用户信息,一起返回. 因此,我们需要在order-service中 向 ...

  9. eureka自我保护机制配置关闭

    问题分析 eureka的自我保护机制指的是,当网络发生问题导致服务访问失败时,eureka注册中心不会马上把应用剔除,继续保留一段时间.但这样对我们实际开发来说,需要频繁重启应用的时候,不方便测试,需 ...

最新文章

  1. Go基础(复杂类型):映射
  2. 海思AI芯片(Hi3519A/3559A)方案学习(十四)JPEG图片转换成bgr文件
  3. 熟悉的亲切-老外婆教做的豌豆蔬菜汤
  4. 0基础学习ios开发笔记第二天
  5. 新写的c++日志库:log4K
  6. win10 详细配置JAVA环境变量(最详细),操作步骤如下:
  7. Visual Studio2012打开时弹出“遇到异常:这可能是由某个扩展导致的”错误的解决办法...
  8. 1644E. Expand the Path
  9. maven报错Non-resolvable parent POM for com.wpbxin:springboot2-first-example:0.0.1-SNAPSHOT: Could not
  10. 架构设计 - 自动化运维之架构设计六要点
  11. 简化业务代码开发:看Lambda表达式如何将代码封装为数据
  12. centos 6.3最小化安装,无法上网解决方法
  13. canvas绘图粒子扩散效果【原创】
  14. python找到文件夹下指定文件类型_python 读取指定文件夹中的指定文件类型的文件名...
  15. 21. Magento 创建新闻模块(2)
  16. AspectJWeaver
  17. 泛型编程 与 STL
  18. HTML5七夕情人节表白网页制作【爱情树-Html5实现唯美表白动画代码】HTML+CSS+JavaScript浪漫告白 求婚必备
  19. NNT 抢任务 神器 V1.0.5
  20. 笔记本java怎么启动独立显卡_笔记本双显卡怎么切换,告诉你笔记本双显卡怎么切换到独立显卡...

热门文章

  1. vue 渲染函数处理slot_详解Vue的slot新用法
  2. 业绩爆雷预测 六大异常财务指标效果实测
  3. 2019.01.11【BZOJ3308】【ProjectEuler335】 九月的咖啡店/Maximal coprime subset(费用流)
  4. php ckeditor 上传图片,CKEditor图片上传的PHP实现
  5. linux卸载informatica,【Informatica从零开始】第一弹之Informatica在linux下安装搭建
  6. kido机器人用流量吗_当4G遇到儿童手表 乐视Kido Watch评测
  7. 一个小白的BAT 文件编写之路
  8. 优雅的绕过校园网认证实现免费上网
  9. 用Python+可视化工具制作漂亮地图
  10. 如此惊艳的财务报表,领导想不重用你都难!