【源码系列】Eureka源码分析
对于服务注册中心、服务提供者、服务消费者这个三个主要元素来说,服务提供者和服务消费者(即Eureka客户端)在整个运行机制中是大部分通信行为的主动发起者(服务注册、续约、下线等),而注册中心主要是处理请求的接收者。所以,我们从Eureka的客户端为入口分析它是如何完成这些主动通信的。
一般情况下,我们将一个SpringBoot应用注册到 Eureka Server 或者从 Eureka Server 获取服务器列表时,就做了两件事:
- 在应用启动类添加注解 @EnableDiscoveryClient
- 在 application.properties 文件上用 eureka.client.service-url.defaultZone 参数指定注册中心的地址
我们先看看 @EnableDiscoveryClient 这个注解的源码,如下:
/*** Annotation to enable a DiscoveryClient implementation.* @author Spencer Gibb*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {/*** If true, the ServiceRegistry will automatically register the local server.*/boolean autoRegister() default true;
}
通过注释可以知道,该注解可以开启 DiscoveryClient 实例,然后我们搜索 DiscoveryClient 会发现一个类和一个接口,它们的关系如图。
右边的org.springframework.cloud.client.discovery.DiscoveryClient 是SpringCloud的接口,体现了面向接口编程的思想,定义了用来发现服务的常用抽象方法。org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是该接口的实现,是对Eureka发现服务的封装,内部依赖了一个EurekaClient接口,所以真正实现发现服务的是com.netflix.discovery.DiscoveryClient类。
查看类注释的内容:
/*** The class that is instrumental for interactions with <tt>Eureka Server</tt>.** <p>* <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the* instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with* <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from* <tt>Eureka Server</tt> during shutdown* <p>* d) <em>Querying</em> the list of services/instances registered with* <tt>Eureka Server</tt>* <p>** <p>* <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>* {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips* which do not change. All of the functions defined above fail-over to other* {@link java.net.URL}s specified in the list in the case of failure.* </p>** @author Karthik Ranganathan, Greg Kim* @author Spencer Gibb**/
@Singleton
public class DiscoveryClient implements EurekaClient {...
}
这个类用于帮助与 Eureka Server 相互协作
Eureka Client客户端负责以下内容:
- 向Eureka Server 注册服务实例
- 向 Eureka Server 服务续约
- 服务关闭时取消租约
- 查询注册在 Eureka Server 上的服务或实例列表
Eureka Client 还需要配置一个 Eureka Server 的服务列表。
哪里对Eureka Server的URL列表配置?
根据我们配置的属性名eureka.client.serviceUrl.defaultZone,通过serviceUrl可以找到该属性相关的加载属性,就是DiscoveryClient里有个getEurekaServiceUrlsFromConfig()方法但是弃用了,改用EndpointUtils这个工具类,代码如下:
- /**
- * Get the list of all eureka service urls from properties file for the eureka client to talk to.
- *
- * @param clientConfig the clientConfig to use
- * @param instanceZone The zone in which the client resides
- * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
- * @return an (ordered) map of zone -> list of urls mappings, with the preferred zone first in iteration order
- */
- public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
- Map<String, List<String>> orderedUrls = new LinkedHashMap<>();
- String region = getRegion(clientConfig);
- String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
- if (availZones == null || availZones.length == 0) {
- availZones = new String[1];
- availZones[0] = DEFAULT_ZONE;
- }
- logger.debug("The availability zone for the given region {} are {}", region, availZones);
- int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
- String zone = availZones[myZoneOffset];
- List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
- if (serviceUrls != null) {
- orderedUrls.put(zone, serviceUrls);
- }
- int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
- while (currentOffset != myZoneOffset) {
- zone = availZones[currentOffset];
- serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
- if (serviceUrls != null) {
- orderedUrls.put(zone, serviceUrls);
- }
- if (currentOffset == (availZones.length - 1)) {
- currentOffset = 0;
- } else {
- currentOffset++;
- }
- }
- if (orderedUrls.size() < 1) {
- throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
- }
- return orderedUrls;
- }
Region,Zone
getRegion()方法可以看出一个微服务应用只可以属于一个Region,如果没配置则为default,可以通过eureka.client.region属性来定义。
public static String getRegion(EurekaClientConfig clientConfig) {String region = clientConfig.getRegion();if (region == null) {region = DEFAULT_REGION;}region = region.trim().toLowerCase();return region;}
getAvailabilityZones()方法可以看出Region与Zone的关系,一个Region可以有多个Zone,设置时可以用逗号来分隔。默认采用defaultZone。
public String[] getAvailabilityZones(String region) {String value = (String)this.availabilityZones.get(region);if(value == null) {value = "defaultZone";}return value.split(",");}
在获取Region和Zone的信息后,根据传入的参数按一定的算法确定加载位于哪一个Zone的serviceUrls。
getEurekaServerServiceUrls方法是EurekaClientConfigBean的实现类,该方法用来获取一个Zone下配置的所以serviceUrl,通过标注出来的地方可以知道,eureka.client.serviceUrl.defaultZone属性可以配置多个,用逗号来分隔。
注意: Ribbon具有区域亲和特性,Ribbon的默认策略会优先访问同客户端处于同一个Zone中的实例。所以通过Zone属性的定义,配置实际部署的物理结构,我们就可以有效地设计出对区域性故障的容错集群。
服务注册
前面说了多个服务注册中心信息的加载,这里再看看 DiscoveryClient 类是如何实现服务注册的。通过查看该类的构造函数,发现它调用了以下方法。
/*** Initializes all scheduled tasks.*/private void initScheduledTasks() {...if (clientConfig.shouldRegisterWithEureka()) {...// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}
这里先根据配置判断是不是要注册到 Eureka,然后创建心跳检测任务,获取 instanceInfoReplicator。InstanceInfoReplicator类实现 Runnable接口,instanceInfoReplicator实例会执行一个定时任务,这个定时任务的内容可以查看该类的run()方法。
这里定时刷新实例信息,discoveryClient.register()这里触发了服务注册,register()的内容如下:
通过注释也能看出来,这里是通过发送REST请求的方式进行的,com.netflix.appinfo.InstanceInfo就是注册时客户端给服务端的元数据。
服务获取与服务续约
上面说到的 initScheduledTasks() 方法还有两个定时任务,分别是服务获取和服务续约。
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}
clientConfig.shouldFetchRegistry()这里其实是通过eureka.client.fetch-registry参数来判断的,默认为true,它可以定期更新客户端的服务清单,从而客户端能访问到健康的服务实例。
服务续约也是发送REST请求实现的。
boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}}
服务获取的过程省略。
服务下线
服务端根据实例Id和appName执行remove操作。
void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);}}}
注册中心处理
前面的分析都是从客户端出发的,现在看看 Eureka Server是如何处理各种Rest请求的。这种请求的定义都在com.netflix.eureka.resources包下。
以服务注册为例:
调用 ApplicationResource 类下的 addInstance()方法。
@POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);if(this.isBlank(info.getId())) {return Response.status(400).entity("Missing instanceId").build();} else if(this.isBlank(info.getHostName())) {return Response.status(400).entity("Missing hostname").build();} else if(this.isBlank(info.getIPAddr())) {return Response.status(400).entity("Missing ip address").build();} else if(this.isBlank(info.getAppName())) {return Response.status(400).entity("Missing appName").build();} else if(!this.appName.equals(info.getAppName())) {return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();} else if(info.getDataCenterInfo() == null) {return Response.status(400).entity("Missing dataCenterInfo").build();} else if(info.getDataCenterInfo().getName() == null) {return Response.status(400).entity("Missing dataCenterInfo Name").build();} else {DataCenterInfo dataCenterInfo = info.getDataCenterInfo();if(dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();if(this.isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if(experimental) {String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";return Response.status(400).entity(entity).build();}if(dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;String effectiveId = amazonInfo.get(MetaDataKey.instanceId);if(effectiveId == null) {amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());}} else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}this.registry.register(info, "true".equals(isReplication));return Response.status(204).build();}}
在对注册信息进行校验后,会调用org.springframework.cloud.netflix.eureka.server.InstanceRegistry的register(InstanceInfo info, int leaseDuration, boolean isReplication)方法。
首先会把新服务注册事件传播出去,然后调用父类com.netflix.eureka.registry.AbstractInstanceRegistry中的实现。
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {try {this.read.lock();Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());...} finally {this.read.unlock();}}
InstanceInfo的元数据信息保存在一个ConcurrentHashMap中,它是一个双层的Map结构,第一层的key是服务名(即InstanceInfo的appName属性),第二层的key是实例名(即InstanceInfo的InstanceId属性)。
ApplicationResource中的其他方法可以自行研究。
转载于:https://www.cnblogs.com/2YSP/p/11072255.html
【源码系列】Eureka源码分析相关推荐
- Java集合Collection源码系列-ArrayList源码分析
Java集合系列-ArrayList源码分析 文章目录 Java集合系列-ArrayList源码分析 前言 一.为什么想去分析ArrayList源码? 二.源码分析 1.宏观上分析List 2.方法汇 ...
- JUC源码系列-CountDownLatch源码研读
前言 CountDownLatch是一个很有用的工具,latch是门闩的意思,该工具是为了解决某些操作只能在一组操作全部执行完成后才能执行的情景.例如,小组早上开会,只有等所有人到齐了才能开:再如,游 ...
- 【SpringClould】Spring Cloud Eureka源码分析
文章目录 1.概述 1.1 Eureka的一些概念 2.源码分析 2.1 Eureka Server源码 2.1.1 `@EnableEurekaServer`注解 2.1.2 EurekaServe ...
- 微服务发现与注册之Eureka源码分析
作者:陌北有棵树,Java人,架构师社区合伙人! [一]微服务之服务发现概述 关于微服务,近年来可谓是大火,业界也吹刮着一种实践微服务的风潮.本人有幸在去年参与到一个向微服务过渡的产品,再结合自己所学 ...
- Eureka源码分析
Eureka源码分析 Eureka server 入口: Spring.factories PS: 意味着如果加载EurekaServerAutoConfiguration成功,需要 @Conditi ...
- c++ map 获取key列表_好未来Golang源码系列一:Map实现原理分析
分享老师:学而思网校 郭雨田 一.map的结构与设计原理 golang中map是一个kv对集合.底层使用hash table,用链表来解决冲突 ,出现冲突时,不是每一个key都申请一个结构通过链表串起 ...
- 【Linux后台开发系列】Nginx源码从模块开发开始,不再对nginx源码陌生丨源码分析
Nginx源码从模块开发开始,不再对nginx源码发怵,值得学习,认真听完. 1. nginx的conf配置,cmd解析 2. nginx模块的八股文 3. nginx开发的细枝末节 [Linu ...
- 4、Eureka 源码解析 之 Eureka Client 启动原理分析
在前面的一篇文章 3.Eureka 源码解析 之 Eureka Server 启动原理分析当中我们分析了一下 Eureka Server 的启动.在集群环境下 Eureka Server 相互之前需要 ...
- 【SpringCloud微服务】第3章 服务治理SpringCloudEureka(五)——Eureka源码分析
2.8 Eureka 源码分析 首先,对于服务注册中心.服务提供者.服务消费者这三个主要元素来说,后两者(也就是Eureka客户端)在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处 ...
- Nacos源码系列——第一章(Nacos核心源码主线剖析上)
在讲具体的源码之前,我有几点想说明下,很多开发可能觉得源码不重要,甚至觉得互联网 的知识,目前够用就可以,也不需要多么精通.的确,在大多数的公司中,你能用你的知识 解决问题就可以,不一定非要涉及到源码 ...
最新文章
- java和android 语法区别_PET和PTE的区别在哪里?出国留学到底选哪个
- 实训说明书 在线音乐平台项目规格说明书
- 转载:jsonp详解
- arcgis flexviewer中由Application向widget传值
- 【实践】对比学习在快手推荐系统中的的应用探索
- python复杂代码示例_6 个例子教你重构 Python 代码
- 了解编译原理-笔记小结
- Python io – BytesIO,StringIO
- MIMO-OTFS in High-Doppler Fading Channels:Signal Detection and Channel Estimation(4)
- linux备份整个系统
- 【机器学习】DBSCAN聚类算法
- windows10家庭中文版设置共享文件密码访问
- 供应链金融(Supply Chain Finance)
- python读取千万级数据库数据类型_解决python读取几千万行的大表内存问题
- 英语四级和计算机二级考试冲突吗,大学必须过英语四级吗?必须过计算机二级吗?...
- html页面和手机比例一致 一比一自适应 Mixed Content: The page at ‘xxx‘ was loaded over HTTPS, but requested an insec
- C#实现将度分秒化为弧度值
- 威尔逊置信区间 php,应用:推荐系统-威尔逊区间法
- 阿里云服务器如何升级公网带宽
- 为什么计算机32到64位,为你解答win764位和32位有什么区别