深入理解SpringCloud之Eureka注册过程分析
eureka是一种去中心化的服务治理应用,其显著特点是既可以作为服务端又可以作为服务向自己配置的地址进行注册。那么这篇文章就来探讨一下eureka的注册流程。
一、Eureka的服务端
eureka的服务端核心类是EurekaBootstrap,该类实现了一个ServletContextListener的监听器。因此我们可以断定eureka是基于servlet容器实现的。关键代码如下:
public class EurekaBootStrap implements ServletContextListener {//...省略相关代码 /*** Initializes Eureka, including syncing up with other Eureka peers and publishing the registry.** @see* javax.servlet.ServletContextListener#contextInitialized(javax.servlet.ServletContextEvent)*/@Overridepublic void contextInitialized(ServletContextEvent event) {try {initEurekaEnvironment();initEurekaServerContext();ServletContext sc = event.getServletContext();sc.setAttribute(EurekaServerContext.class.getName(), serverContext);} catch (Throwable e) {logger.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}} //省略相关代码..... }
我们可以看到在ServletContext初始化完成时,会初始化Eureka环境,然后初始化EurekaServerContext,那么我们在看一看initEurekaServerContext方法:
/*** init hook for server context. Override for custom logic.*/protected void initEurekaServerContext() throws Exception {// .....ApplicationInfoManager applicationInfoManager = null;if (eurekaClient == null) {EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())? new CloudInstanceConfig(): new MyDataCenterInstanceConfig();applicationInfoManager = new ApplicationInfoManager(instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);} else {applicationInfoManager = eurekaClient.getApplicationInfoManager();}PeerAwareInstanceRegistry registry;if (isAws(applicationInfoManager.getInfo())) {registry = new AwsInstanceRegistry(eurekaServerConfig,eurekaClient.getEurekaClientConfig(),serverCodecs,eurekaClient);awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);awsBinder.start();} else {registry = new PeerAwareInstanceRegistryImpl(eurekaServerConfig,eurekaClient.getEurekaClientConfig(),serverCodecs,eurekaClient);}//....省略部分代码}
在这个方法里会创建许多与eureka服务相关的对象,在这里我列举了两个核心对象分别是eurekaClient与PeerAwareInstanceRegistry,关于客户端部分我们等会再说,我们现在来看看PeerAwareInstanceRegistry到底是做什么用的,这里我写贴出关于这个类的类图:
根据类图我们可以清晰的发现PeerAwareInstanceRegistry的最顶层接口为LeaseManager与LookupService,其中LookupService定义了最基本的发现示例的行为而LeaseManager定义了处理客户端注册,续约,注销等操作。那么在这篇文章我们还是重点关注一下LeaseManager的相关接口的实现。回过头来我们在看PeerAwareInstanceRegistry,其实这个类用于多个节点下复制相关信息,比如说一个节点注册续约与下线那么通过这个类将会相关复制(通知)到各个节点。我们来看看它是怎么处理客户端注册的:
/*** Registers the information about the {@link InstanceInfo} and replicates* this information to all peer eureka nodes. If this is replication event* from other replica nodes then it is not replicated.** @param info* the {@link InstanceInfo} to be registered and replicated.* @param isReplication* true if this is a replication event from other replica nodes,* false otherwise.*/@Overridepublic void register(final InstanceInfo info, final boolean isReplication) {int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {leaseDuration = info.getLeaseInfo().getDurationInSecs();}super.register(info, leaseDuration, isReplication);replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);}
我们可以看到它调用了父类的register方法后又通过replicateToPeers复制对应的行为到其他节点,具体如何复制的先不在这里讨论,我们重点来看看注册方法,我们在父类里找到register()方法:
/*** Registers a new instance with a given duration.** @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)*/public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try {read.lock();Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());REGISTER.increment(isReplication);if (gMap == null) {final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);if (gMap == null) {gMap = gNewMap;}}Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());// Retain the last dirty timestamp without overwriting it, if there is already a leaseif (existingLease != null && (existingLease.getHolder() != null)) {Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted// InstanceInfo instead of the server local copy.if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");registrant = existingLease.getHolder();}} else {// The lease does not exist and hence it is a new registrationsynchronized (lock) {if (this.expectedNumberOfRenewsPerMin > 0) {// Since the client wants to cancel it, reduce the threshold// (1// for 30 seconds, 2 for a minute)this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;this.numberOfRenewsPerMinThreshold =(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());}}logger.debug("No previous lease information found; it is new registration");}Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);if (existingLease != null) {lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());}gMap.put(registrant.getId(), lease);//。。。省略部分代码 }
通过源代码,我们来简要梳理一下流程:
1)首先根据appName获取一些列的服务实例对象,如果为Null,则新创建一个map并把当前的注册应用程序信息添加到此Map当中,这里有一个Lease对象,这个类描述了泛型T的时间属性,比如说注册时间,服务启动时间,最后更新时间等,大家可以关注一下它的实现:
/** Copyright 2012 Netflix, Inc.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.netflix.eureka.lease;import com.netflix.eureka.registry.AbstractInstanceRegistry;/*** Describes a time-based availability of a {@link T}. Purpose is to avoid* accumulation of instances in {@link AbstractInstanceRegistry} as result of ungraceful* shutdowns that is not uncommon in AWS environments.** If a lease elapses without renewals, it will eventually expire consequently* marking the associated {@link T} for immediate eviction - this is similar to* an explicit cancellation except that there is no communication between the* {@link T} and {@link LeaseManager}.** @author Karthik Ranganathan, Greg Kim*/ public class Lease<T> {enum Action {Register, Cancel, Renew};public static final int DEFAULT_DURATION_IN_SECS = 90;private T holder;private long evictionTimestamp;private long registrationTimestamp;private long serviceUpTimestamp;// Make it volatile so that the expiration task would see this quickerprivate volatile long lastUpdateTimestamp;private long duration;public Lease(T r, int durationInSecs) {holder = r;registrationTimestamp = System.currentTimeMillis();lastUpdateTimestamp = registrationTimestamp;duration = (durationInSecs * 1000);}/*** Renew the lease, use renewal duration if it was specified by the* associated {@link T} during registration, otherwise default duration is* {@link #DEFAULT_DURATION_IN_SECS}.*/public void renew() {lastUpdateTimestamp = System.currentTimeMillis() + duration;}/*** Cancels the lease by updating the eviction time.*/public void cancel() {if (evictionTimestamp <= 0) {evictionTimestamp = System.currentTimeMillis();}}/*** Mark the service as up. This will only take affect the first time called,* subsequent calls will be ignored.*/public void serviceUp() {if (serviceUpTimestamp == 0) {serviceUpTimestamp = System.currentTimeMillis();}}/*** Set the leases service UP timestamp.*/public void setServiceUpTimestamp(long serviceUpTimestamp) {this.serviceUpTimestamp = serviceUpTimestamp;}/*** Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.*/public boolean isExpired() {return isExpired(0l);}/*** Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not.** Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than* what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect* instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will* not be fixed.** @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms.*/public boolean isExpired(long additionalLeaseMs) {return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));}/*** Gets the milliseconds since epoch when the lease was registered.** @return the milliseconds since epoch when the lease was registered.*/public long getRegistrationTimestamp() {return registrationTimestamp;}/*** Gets the milliseconds since epoch when the lease was last renewed.* Note that the value returned here is actually not the last lease renewal time but the renewal + duration.** @return the milliseconds since epoch when the lease was last renewed.*/public long getLastRenewalTimestamp() {return lastUpdateTimestamp;}/*** Gets the milliseconds since epoch when the lease was evicted.** @return the milliseconds since epoch when the lease was evicted.*/public long getEvictionTimestamp() {return evictionTimestamp;}/*** Gets the milliseconds since epoch when the service for the lease was marked as up.** @return the milliseconds since epoch when the service for the lease was marked as up.*/public long getServiceUpTimestamp() {return serviceUpTimestamp;}/*** Returns the holder of the lease.*/public T getHolder() {return holder;}}
View Code
2)根据当前注册的ID,如果能在map中取到则做以下操作:
2.1)根据当前存在节点的触碰时间和注册节点的触碰时间比较,如果前者的时间晚于后者的时间,那么当前注册的实例就以已存在的实例为准
2.2)否则更新其每分钟期望的续约数量及其阈值
3)将当前的注册节点存到map当中,至此我们的注册过程基本告一段落了
二、eureka客户端
在服务端servletContext初始化完毕时,会创建DiscoveryClient。熟悉eureka的朋友,一定熟悉这两个属性:fetchRegistry与registerWithEureka。在springcloud中集成eureka独立模式运行时,如果这两个值不为false,那么启动会报错,为什么会报错呢?其实答案就在DiscoveryClient的构造函数中:
@InjectDiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {//....省略部分代码if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {logger.info("Client configured to neither register nor query for data.");scheduler = null;heartbeatExecutor = null;cacheRefreshExecutor = null;eurekaTransport = null;instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()// to work with DI'd DiscoveryClientDiscoveryManager.getInstance().setDiscoveryClient(this);DiscoveryManager.getInstance().setEurekaClientConfig(config);initTimestampMs = System.currentTimeMillis();logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",initTimestampMs, this.getApplications().size());return; // no need to setup up an network tasks and we are done }try {// default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()); // use direct handoff eurekaTransport = new EurekaTransport();scheduleServerEndpointTask(eurekaTransport, args);//....省略部分代码initScheduledTasks();//.... }
根据源代码,我们可以得出以下结论:
1)如果shouldRegisterWithEureka与shouldFetchRegistry都为false,那么直接return。
2)创建发送心跳与刷新缓存的线程池
3)初始化创建的定时任务
那么我们在看看initScheduledTasks()方法里有如下代码:
// Heartbeat timer scheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);
此处是触发一个定时执行的线程,以秒为单位,根据renewalIntervalInSecs值定时执行发送心跳,HeartbeatThread线程执行如下:
/*** The heartbeat task that renews the lease in the given intervals.*/private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}}
我们可以看到run方法里很简单执行renew方法,如果成功记录一下时间。renew方法:
/*** Renew with the eureka service by making the appropriate REST call*/boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);return false;}}
在这里发送心跳如果返回的是404,那么会执行注册操作,注意我们根据返回值httpResponse可以断定这一切的操作都是基于http请求的,到底是不是呢?我们继续看一下register方法:
/*** Register with the eureka service by making the appropriate REST call.*/boolean register() throws Throwable {logger.info(PREFIX + appPathIdentifier + ": registering service...");EurekaHttpResponse<Void> httpResponse;try {httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} catch (Exception e) {logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);throw e;}if (logger.isInfoEnabled()) {logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());}return httpResponse.getStatusCode() == 204;}
在这里又调用了eurekaTransport里registrationClient的方法:
private static final class EurekaTransport {private ClosableResolver bootstrapResolver;private TransportClientFactory transportClientFactory;private EurekaHttpClient registrationClient;private EurekaHttpClientFactory registrationClientFactory;private EurekaHttpClient queryClient;private EurekaHttpClientFactory queryClientFactory;void shutdown() {if (registrationClientFactory != null) {registrationClientFactory.shutdown();}if (queryClientFactory != null) {queryClientFactory.shutdown();}if (registrationClient != null) {registrationClient.shutdown();}if (queryClient != null) {queryClient.shutdown();}if (transportClientFactory != null) {transportClientFactory.shutdown();}if (bootstrapResolver != null) {bootstrapResolver.shutdown();}}}
在这里我们可以看到,eureka的客户端是使用http请求进行注册服务的,也就是说当我们创建DiscoveryClient就会向服务端进行实例的注册。
三、服务端提供的rest服务
服务端提供用于处理客户端注册请求的代码我们已经看过了,既然客户端是通过走HTTP协议进行注册的,那服务端总要有处理这个http请求的地址吧,其实eureka服务端是采用jax-rs标准提供rest方式进行暴露服务的,我们可以看一下这个类ApplicationResoure的addInstance方法:
/*** Registers information about a particular instance for an* {@link com.netflix.discovery.shared.Application}.** @param info* {@link InstanceInfo} information of the instance.* @param isReplication* a header parameter containing information whether this is* replicated from other nodes.*/@POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// validate that the instanceinfo contains all the necessary required fieldsif (isBlank(info.getId())) {return Response.status(400).entity("Missing instanceId").build();} else if (isBlank(info.getHostName())) {return Response.status(400).entity("Missing hostname").build();} else if (isBlank(info.getIPAddr())) {return Response.status(400).entity("Missing ip address").build();} else if (isBlank(info.getAppName())) {return Response.status(400).entity("Missing appName").build();} else if (!appName.equals(info.getAppName())) {return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();} else if (info.getDataCenterInfo() == null) {return Response.status(400).entity("Missing dataCenterInfo").build();} else if (info.getDataCenterInfo().getName() == null) {return Response.status(400).entity("Missing dataCenterInfo Name").build();}// handle cases where clients may be registering with bad DataCenterInfo with missing dataDataCenterInfo dataCenterInfo = info.getDataCenterInfo();if (dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();if (isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if (experimental) {String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";return Response.status(400).entity(entity).build();} else if (dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);if (effectiveId == null) {amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());}} else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}registry.register(info, "true".equals(isReplication));return Response.status(204).build(); // 204 to be backwards compatible}
转载于:https://www.cnblogs.com/niechen/p/9092544.html
深入理解SpringCloud之Eureka注册过程分析相关推荐
- SpringCloud(三) Eureka注册中心介绍以及单机版搭建
一.Eureka 介绍 Spring Cloud Eureka 是 Spring Cloud Netfix微服务套件中的一部分,它基于 Netfix Eureka 做了二次封装,主要负责完成微服务架构 ...
- (一)SpringCloud之Eureka注册中心
Eureka注册中心 1.什么是注册中心 打个比方,注册中心就好比手机中的通讯录,所有的联系人的联系方式就在这个通讯录中储存.当需要打电话的时候,只需要查询通讯录就可以获取某个联系人的联系方式. 注册 ...
- SpringCloud之一eureka注册中心(Greenwich版本)
创建服务注册中心 采用Eureka作为服务注册与发现的组件 创建一个maven主工程 首先创建一个主Maven工程,在其pom文件引入依赖,spring Boot版本为2.1.3.RELEASE,Sp ...
- SpringCloud之 Eureka注册中心
文章目录 Eureka注册中心 一.服务注册与发现 1.1 依赖导入
- SpringCloud将Eureka注册中心更改为CSE注册中心
我的开发环境是SpringCloud+Eureka,现在要更改为SpringCloud+CSE 1:环境准备 下载CSE本地注册中心:https://cse-bucket.obs.cn-north-1 ...
- SpringCloud,Eureka,服务注册,微服务之间的项目调用
文章目录 前言 一.Eureka是什么? 二.项目实现步骤 1.项目外壳的搭建 2.Eureka注册中心服务搭建 3.Provider(提供者)服务搭建 4.Consumer(消费者)服务搭建 总结 ...
- 走进Spring Cloud之二 eureka注册中心(Greenwich版本)
走进Spring Cloud之二 eureka注册中心(Greenwich版本) eureka 构建SpringCloud 工程 eureka 注册中心 eureka-server moudle po ...
- springcloud(二):注册中心Eureka
Eureka是Netflix开源的一款提供服务注册和发现的产品,它提供了完整的Service Registry和Service Discovery实现.也是springcloud体系中最重要最核心的组 ...
- springCloud - 第11篇 - Eureka 注册中心集群的实现
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. eureka 作为整个微服务项目的注册中心,到目前为止,在我的系统中一直是单节点的,这样并不能作到高 ...
最新文章
- 多线程并发的解决方案 volatile synchronized notify notifyAll wait关键字分析
- composer报错:received xxx bytes out of the 以及composer手动引入扩展包
- css中float详解,CSS浮动属性Float详解?史上最全Float详解
- npz文件转为npy_numpy的文件存储 .npy .npz 文件
- jquery版相片墙(鼠标控制图片聚合和散开)
- 致openGauss社区用户的一封信
- Javascript的websocket的使用方法
- 合作︱2018CCF青年精英大会首设科技创业竞赛,快来报名吧!
- sudo apt-get install,出现了下面的Unable to locate package错误:
- jdk1.7 1.8新特性
- ros之service通讯
- C11头文件threads.h声明了创建和管理线程,信号,条件变量的函数
- 【Nokov】动作捕捉系统标定与机械臂各坐标系的说明
- java二叉树的遍历,递归与非递归方法
- Visual studio2019 装svn插件
- delphi android 微信支付,Delphi XE10实现移动端微信支付接口(含源码)
- 在线latex的一些操作
- 双线一柱变色MACD指标
- 4.7 合成复用原则
- 网络系统(Java web)开发与设计项目实战——实现用户登录与注册