简介

Eureka是一种基于REST(Representational State Transfer)的服务,主要用于AWS云,用于定位服务,以实现中间层服务器的负载平衡和故障转移。我们将此服务称为Eureka Server。Eureka还附带了一个基于Java的客户端组件Eureka Client,它使与服务的交互变得更加容易。客户端还有一个内置的负载均衡器,可以进行基本的循环负载均衡。在Netflix,一个更复杂的负载均衡器包含Eureka基于流量,资源使用,错误条件等多种因素提供加权负载平衡,以提供卓越的弹性。
先看一张 github 上 Netflix Eureka 的一架构图,如下:

从图可以看出在这个体系中,有2个角色,即Eureka Server和Eureka Client。而Eureka Client又分为Applicaton Service和Application Client,即服务提供者何服务消费者。 每个区域有一个Eureka集群,并且每个区域至少有一个eureka服务器可以处理区域故障,以防服务器瘫痪。

Eureka Client 在 Eureka Server 注册,然后Eureka Client 每30秒向 Eureka Server 发送一次心跳来更新一次租约。如果 Eureka Client 无法续订租约几次,则会在大约90秒内 Eureka Server 将其从服务器注册表中删除。注册信息和续订将复制到群集中的所有 Eureka Server 节点。来自任何区域的客户端都可以查找注册表信息(每30秒发生一次)根据这些注册表信息,Application Client 可以远程调用 Applicaton Service 来消费服务。

源码分析

基于Spring Cloud的 eureka 的 client 端在启动类上加上 @EnableDiscoveryClient 注解,就可以 用 NetFlix 提供的 Eureka client。下面就以 @EnableDiscoveryClient 为入口,进行Eureka Client的源码分析。

@EnableDiscoveryClient,通过源码可以发现这是一个标记注解:

/*** Annotation to enable a DiscoveryClient implementation.* @author Spencer Gibb*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {boolean autoRegister() default true;
}

通过注释可以知道 @EnableDiscoveryClient 注解是用来 启用 DiscoveryClient 的实现,DiscoveryClient接口代码如下:

public interface DiscoveryClient {String description();List<ServiceInstance> getInstances(String serviceId);List<String> getServices();}

接口说明:

  • description():实现描述。
  • getInstances(String serviceId):获取与特定serviceId关联的所有ServiceInstance
  • getServices():返回所有已知的服务ID

DiscoveryClient 接口的实现结构图:

  • EurekaDiscoveryClient:Eureka 的 DiscoveryClient 实现类。
  • CompositeDiscoveryClient:用于排序可用客户端的发现客户端的顺序。
  • NoopDiscoveryClient:什么都不做的服务发现实现类,已经被废弃。
  • SimpleDiscoveryClient:简单的服务发现实现类 SimpleDiscoveryClient,具体的服务实例从 SimpleDiscoveryProperties 配置中获取。

EurekaDiscoveryClient 是 Eureka 对 DiscoveryClient接口的实现,代码如下:

public class EurekaDiscoveryClient implements DiscoveryClient {public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";private final EurekaInstanceConfig config;private final EurekaClient eurekaClient;public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {this.config = config;this.eurekaClient = eurekaClient;}@Overridepublic String description() {return DESCRIPTION;}@Overridepublic List<ServiceInstance> getInstances(String serviceId) {List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,false);List<ServiceInstance> instances = new ArrayList<>();for (InstanceInfo info : infos) {instances.add(new EurekaServiceInstance(info));}return instances;}@Overridepublic List<String> getServices() {Applications applications = this.eurekaClient.getApplications();if (applications == null) {return Collections.emptyList();}List<Application> registered = applications.getRegisteredApplications();List<String> names = new ArrayList<>();for (Application app : registered) {if (app.getInstances().isEmpty()) {continue;}names.add(app.getName().toLowerCase());}return names;}}

从代码可以看出 EurekaDiscoveryClient 实现了 DiscoveryClient 定义的规范接口,真正实现发现服务的是 EurekaClient,下面是 EurekaClient 依赖结构图:

EurekaClient 唯一实现类 DiscoveryClient,DiscoveryClient 的构造方法如下:

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {//省略...try {// default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());  // use direct handoffcacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());  // use direct handoff//省略...initScheduledTasks();try {Monitors.registerObject(this);} catch (Throwable e) {logger.warn("Cannot register timers", e);}//省略...
}

可以看到这个构造方法里面,主要做了下面几件事:

  • 创建了scheduler定时任务的线程池,heartbeatExecutor心跳检查线程池(服务续约),cacheRefreshExecutor(服务获取)
  • 然后initScheduledTasks()开启上面三个线程池,往上面3个线程池分别添加相应任务。然后创建了一个instanceInfoReplicator(Runnable任务),然后调用InstanceInfoReplicator.start方法,把这个任务放进上面scheduler定时任务线程池(服务注册并更新)。

服务注册(Registry)

上面说了,initScheduledTasks()方法中调用了InstanceInfoReplicator.start()方法,InstanceInfoReplicator 的 run()方法代码如下:

public void run() {try {discoveryClient.refreshInstanceInfo();Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);} finally {Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}
}

发现 InstanceInfoReplicator的run方法,run方法中会调用DiscoveryClient的register方法。DiscoveryClient 的 register方法 代码如下:

/*** Register with the eureka service by making the appropriate REST call.*/
boolean register() throws Throwable {logger.info(PREFIX + "{}: registering service...", appPathIdentifier);EurekaHttpResponse<Void> httpResponse;try {httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} catch (Exception e) {logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);throw e;}if (logger.isInfoEnabled()) {logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());}return httpResponse.getStatusCode() == 204;
}

最终又经过一系列调用,最终会调用到AbstractJerseyEurekaHttpClient的register方法,代码如下:

public EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = "apps/" + info.getAppName();ClientResponse response = null;try {Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();addExtraHeaders(resourceBuilder);response = resourceBuilder.header("Accept-Encoding", "gzip").type(MediaType.APPLICATION_JSON_TYPE).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, info);return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),response == null ? "N/A" : response.getStatus());}if (response != null) {response.close();}}
}

可以看到最终通过http rest请求eureka server端,把应用自身的InstanceInfo实例注册给server端,我们再来完整梳理一下服务注册流程:

Renew服务续约

服务续约和服务注册非常类似,HeartbeatThread 代码如下:

private class HeartbeatThread implements Runnable {public void run() {if (renew()) {//更新最后一次心跳的时间lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}
}
// 续约的主方法
boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}
}

发送心跳 ,请求eureka server 端 ,如果接口返回值为404,就是说服务不存在,那么重新走注册流程。

如果接口返回值为404,就是说不存在,从来没有注册过,那么重新走注册流程。

服务续约流程如下图:

服务下线cancel

在服务shutdown的时候,需要及时通知服务端把自己剔除,以避免客户端调用已经下线的服务,shutdown()方法代码如下:

public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());}// 关闭各种定时任务// 关闭刷新实例信息/注册的定时任务// 关闭续约(心跳)的定时任务// 关闭获取注册信息的定时任务cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null&& clientConfig.shouldRegisterWithEureka()&& clientConfig.shouldUnregisterOnShutdown()) {// 更改实例状态,使实例不再接收流量applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);//向EurekaServer端发送下线请求unregister();}if (eurekaTransport != null) {eurekaTransport.shutdown();}heartbeatStalenessMonitor.shutdown();registryStalenessMonitor.shutdown();logger.info("Completed shut down of DiscoveryClient");}
}private void cancelScheduledTasks() {if (instanceInfoReplicator != null) {instanceInfoReplicator.stop();}if (heartbeatExecutor != null) {heartbeatExecutor.shutdownNow();}if (cacheRefreshExecutor != null) {cacheRefreshExecutor.shutdownNow();}if (scheduler != null) {scheduler.shutdownNow();}
}void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);}}
}

先关闭各种定时任务,然后向eureka server 发送服务下线通知。服务下线流程如下图:

参考

https://github.com/Netflix/eureka/wiki
http://yeming.me/2016/12/01/eureka1/
http://blog.didispace.com/springcloud-sourcecode-eureka/
https://www.jianshu.com/p/71a8bdbf03f4

Eureka(一): Eureka Client相关推荐

  1. 【spring-cloud】Eureka server和client之间的心跳通信

    为什么80%的码农都做不了架构师?>>>    启动两个Eureka Client,过了一会,停了其中一个,访问注册中心时,界面上显示了红色粗体警告信息: EMERGENCY! EU ...

  2. Eureka工作原理(Eureka简介Eureka ServerEureka Client自我保护机制分布式系统中的CAP理论Eureka 工作流程)

    一.Eureka简介 Eureka Server(注册中心,相当于zookeeper) Eureka Client: Provider Consumer 多个Eureka就叫集群.集群之间会定时通过r ...

  3. 什么是Eureka?Eureka能干什么?Eureka怎么用?

    目录 一.概念 1.1.什么是服务治理 1.2. 什么是Eureka 1.3. Eureka包含两个组件 1.4. 什么场景使用Eureka 1.5. Eureka停更 1.6.代码要实现的内容 二. ...

  4. 创建多模块springcloud应用eureka server和client和消费端demo

    使用环境是 STS + maven 1 创建父级 项目,springcloud-demo1 new -> maven project -> 按照要求进行配置即可.然后删除 src目录,因为 ...

  5. 重新定义SpringCloud-SpringCloud Eureka笔记- Eureka的核心类(二)

    Eureka的核心类. InstanceInfo <instance> <instanceId>CHEND-PC.sen5.sz:sw-user:8006</instan ...

  6. Spring Cloud Eureka 2 (Eureka Server搭建服务注册中心)

    工具:IntelliJ IDEA 2017.1.2 x64.maven3.3.9 打开IDE  file===>new===>project next next 选择相应的依赖 next ...

  7. 【Eureka】eureka的搭建注册(新手搭建,大佬见谅)

    1 创建一个父类 2创建一个maven子项目 SpringCloud-eureka-server 1.1 porm.xml 中加入相关依赖 <dependency>           & ...

  8. Eureka(eureka)服务集群搭建搭建

    编写Eureka Server集群: eureka-server总结 编写Eureka Server集群: 1.在POM文件中引入依赖:<dependency><groupId> ...

  9. SpringCloud Eureka Client和Server侧配置及Eureka高可用配置

    一.Eureka注册中心和客户端配置Demo. 1.Server端 a.使用Idea创建Spring项目,如下所示: b.相关配置 application.yaml配置文件如下: # eureka本身 ...

  10. Eureka Client注册到Eureka Server的秘密

    前言 我们知道Eureka分为两部分,Eureka Server和Eureka Client.Eureka Server充当注册中心的角色,Eureka Client相对于Eureka Server来 ...

最新文章

  1. 连接一切:自媒体的未来是什么??
  2. vue中搜索关键词,使文本标红
  3. [react] 写例子说明React如何在JSX中实现for循环
  4. python课程水平测试成绩查询_学业水平考试成绩查询系统入口
  5. jQuery操作示例
  6. Python输出当前时间
  7. android开发实现静默安装(fota升级)
  8. torch -index_select()、Pytorch 之修改Tensor部分值、pytorch中Tensor的数据类型
  9. 【IDE工具】win10电脑设置保护眼睛色
  10. SVN提交时文件上出现的问号图标是什么
  11. 第一节 细胞是生命活动的基本单位
  12. 从发不起工资到融资1650万,逸创创始人叶翔如何熬过来的?
  13. DICOM世界观●开篇
  14. Deus Ex:人类革命 - 图形研究
  15. STM32F103ZET6+TJA1050 HAL CAN通讯笔记
  16. 二元非洲秃鹫优化算法附Matlab代码
  17. 2016.1云南之旅
  18. 手机录制连续点赞并周期执行(免代码)
  19. Contest3410 - 2022大中小学生联合训练第五场
  20. 【javaweb简单教程】1.搭建Web环境、初识JSP

热门文章

  1. HTML5之10 __使用 Canvas API创建 热点图
  2. 一、python快速入门(每个知识点后包含练习)
  3. 问卷调查网站制作-前后端开发
  4. 使用 javaScript 编写倒计时小程序,到时提交表单
  5. java计算机毕业设计乒乓球俱乐部管理源码+系统+数据库+lw文档+mybatis+运行部署
  6. shell 脚本生成的文件名出现? 或者^M解决办法
  7. Linux环境准备五---VMWare打开CentOS虚拟机报错VT(长模式不兼容)等错误的解决方案
  8. WPF真入门教程02--新建WPF工程
  9. css之透明度和外发光
  10. 对[我所认识的BIOS]系列 -- CPU的第一条指令 一文扩充(III):从源代码到 FFS 文件