Eureka(一): Eureka Client
简介
Eureka是一种基于REST(Representational State Transfer)的服务,主要用于AWS云,用于定位服务,以实现中间层服务器的负载平衡和故障转移。我们将此服务称为Eureka Server。Eureka还附带了一个基于Java的客户端组件Eureka Client,它使与服务的交互变得更加容易。客户端还有一个内置的负载均衡器,可以进行基本的循环负载均衡。在Netflix,一个更复杂的负载均衡器包含Eureka基于流量,资源使用,错误条件等多种因素提供加权负载平衡,以提供卓越的弹性。
先看一张 github 上 Netflix Eureka 的一架构图,如下:
从图可以看出在这个体系中,有2个角色,即Eureka Server和Eureka Client。而Eureka Client又分为Applicaton Service和Application Client,即服务提供者何服务消费者。 每个区域有一个Eureka集群,并且每个区域至少有一个eureka服务器可以处理区域故障,以防服务器瘫痪。
Eureka Client 在 Eureka Server 注册,然后Eureka Client 每30秒向 Eureka Server 发送一次心跳来更新一次租约。如果 Eureka Client 无法续订租约几次,则会在大约90秒内 Eureka Server 将其从服务器注册表中删除。注册信息和续订将复制到群集中的所有 Eureka Server 节点。来自任何区域的客户端都可以查找注册表信息(每30秒发生一次)根据这些注册表信息,Application Client 可以远程调用 Applicaton Service 来消费服务。
源码分析
基于Spring Cloud的 eureka 的 client 端在启动类上加上 @EnableDiscoveryClient 注解,就可以 用 NetFlix 提供的 Eureka client。下面就以 @EnableDiscoveryClient 为入口,进行Eureka Client的源码分析。
@EnableDiscoveryClient,通过源码可以发现这是一个标记注解:
/*** Annotation to enable a DiscoveryClient implementation.* @author Spencer Gibb*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {boolean autoRegister() default true;
}
通过注释可以知道 @EnableDiscoveryClient 注解是用来 启用 DiscoveryClient 的实现,DiscoveryClient接口代码如下:
public interface DiscoveryClient {String description();List<ServiceInstance> getInstances(String serviceId);List<String> getServices();}
接口说明:
- description():实现描述。
- getInstances(String serviceId):获取与特定serviceId关联的所有ServiceInstance
- getServices():返回所有已知的服务ID
DiscoveryClient 接口的实现结构图:
- EurekaDiscoveryClient:Eureka 的 DiscoveryClient 实现类。
- CompositeDiscoveryClient:用于排序可用客户端的发现客户端的顺序。
- NoopDiscoveryClient:什么都不做的服务发现实现类,已经被废弃。
- SimpleDiscoveryClient:简单的服务发现实现类 SimpleDiscoveryClient,具体的服务实例从 SimpleDiscoveryProperties 配置中获取。
EurekaDiscoveryClient 是 Eureka 对 DiscoveryClient接口的实现,代码如下:
public class EurekaDiscoveryClient implements DiscoveryClient {public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";private final EurekaInstanceConfig config;private final EurekaClient eurekaClient;public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) {this.config = config;this.eurekaClient = eurekaClient;}@Overridepublic String description() {return DESCRIPTION;}@Overridepublic List<ServiceInstance> getInstances(String serviceId) {List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,false);List<ServiceInstance> instances = new ArrayList<>();for (InstanceInfo info : infos) {instances.add(new EurekaServiceInstance(info));}return instances;}@Overridepublic List<String> getServices() {Applications applications = this.eurekaClient.getApplications();if (applications == null) {return Collections.emptyList();}List<Application> registered = applications.getRegisteredApplications();List<String> names = new ArrayList<>();for (Application app : registered) {if (app.getInstances().isEmpty()) {continue;}names.add(app.getName().toLowerCase());}return names;}}
从代码可以看出 EurekaDiscoveryClient 实现了 DiscoveryClient 定义的规范接口,真正实现发现服务的是 EurekaClient,下面是 EurekaClient 依赖结构图:
EurekaClient 唯一实现类 DiscoveryClient,DiscoveryClient 的构造方法如下:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider) {//省略...try {// default size of 2 - 1 each for heartbeat and cacheRefreshscheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()); // use direct handoffcacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()); // use direct handoff//省略...initScheduledTasks();try {Monitors.registerObject(this);} catch (Throwable e) {logger.warn("Cannot register timers", e);}//省略...
}
可以看到这个构造方法里面,主要做了下面几件事:
- 创建了scheduler定时任务的线程池,heartbeatExecutor心跳检查线程池(服务续约),cacheRefreshExecutor(服务获取)
- 然后initScheduledTasks()开启上面三个线程池,往上面3个线程池分别添加相应任务。然后创建了一个instanceInfoReplicator(Runnable任务),然后调用InstanceInfoReplicator.start方法,把这个任务放进上面scheduler定时任务线程池(服务注册并更新)。
服务注册(Registry)
上面说了,initScheduledTasks()方法中调用了InstanceInfoReplicator.start()方法,InstanceInfoReplicator 的 run()方法代码如下:
public void run() {try {discoveryClient.refreshInstanceInfo();Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);} finally {Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}
}
发现 InstanceInfoReplicator的run方法,run方法中会调用DiscoveryClient的register方法。DiscoveryClient 的 register方法 代码如下:
/*** Register with the eureka service by making the appropriate REST call.*/
boolean register() throws Throwable {logger.info(PREFIX + "{}: registering service...", appPathIdentifier);EurekaHttpResponse<Void> httpResponse;try {httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} catch (Exception e) {logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);throw e;}if (logger.isInfoEnabled()) {logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());}return httpResponse.getStatusCode() == 204;
}
最终又经过一系列调用,最终会调用到AbstractJerseyEurekaHttpClient的register方法,代码如下:
public EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = "apps/" + info.getAppName();ClientResponse response = null;try {Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();addExtraHeaders(resourceBuilder);response = resourceBuilder.header("Accept-Encoding", "gzip").type(MediaType.APPLICATION_JSON_TYPE).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, info);return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),response == null ? "N/A" : response.getStatus());}if (response != null) {response.close();}}
}
可以看到最终通过http rest请求eureka server端,把应用自身的InstanceInfo实例注册给server端,我们再来完整梳理一下服务注册流程:
Renew服务续约
服务续约和服务注册非常类似,HeartbeatThread 代码如下:
private class HeartbeatThread implements Runnable {public void run() {if (renew()) {//更新最后一次心跳的时间lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}
}
// 续约的主方法
boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());long timestamp = instanceInfo.setIsDirtyWithTime();boolean success = register();if (success) {instanceInfo.unsetIsDirty(timestamp);}return success;}return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);return false;}
}
发送心跳 ,请求eureka server 端 ,如果接口返回值为404,就是说服务不存在,那么重新走注册流程。
如果接口返回值为404,就是说不存在,从来没有注册过,那么重新走注册流程。
服务续约流程如下图:
服务下线cancel
在服务shutdown的时候,需要及时通知服务端把自己剔除,以避免客户端调用已经下线的服务,shutdown()方法代码如下:
public synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());}// 关闭各种定时任务// 关闭刷新实例信息/注册的定时任务// 关闭续约(心跳)的定时任务// 关闭获取注册信息的定时任务cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null&& clientConfig.shouldRegisterWithEureka()&& clientConfig.shouldUnregisterOnShutdown()) {// 更改实例状态,使实例不再接收流量applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);//向EurekaServer端发送下线请求unregister();}if (eurekaTransport != null) {eurekaTransport.shutdown();}heartbeatStalenessMonitor.shutdown();registryStalenessMonitor.shutdown();logger.info("Completed shut down of DiscoveryClient");}
}private void cancelScheduledTasks() {if (instanceInfoReplicator != null) {instanceInfoReplicator.stop();}if (heartbeatExecutor != null) {heartbeatExecutor.shutdownNow();}if (cacheRefreshExecutor != null) {cacheRefreshExecutor.shutdownNow();}if (scheduler != null) {scheduler.shutdownNow();}
}void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);}}
}
先关闭各种定时任务,然后向eureka server 发送服务下线通知。服务下线流程如下图:
参考
https://github.com/Netflix/eureka/wiki
http://yeming.me/2016/12/01/eureka1/
http://blog.didispace.com/springcloud-sourcecode-eureka/
https://www.jianshu.com/p/71a8bdbf03f4
Eureka(一): Eureka Client相关推荐
- 【spring-cloud】Eureka server和client之间的心跳通信
为什么80%的码农都做不了架构师?>>> 启动两个Eureka Client,过了一会,停了其中一个,访问注册中心时,界面上显示了红色粗体警告信息: EMERGENCY! EU ...
- Eureka工作原理(Eureka简介Eureka ServerEureka Client自我保护机制分布式系统中的CAP理论Eureka 工作流程)
一.Eureka简介 Eureka Server(注册中心,相当于zookeeper) Eureka Client: Provider Consumer 多个Eureka就叫集群.集群之间会定时通过r ...
- 什么是Eureka?Eureka能干什么?Eureka怎么用?
目录 一.概念 1.1.什么是服务治理 1.2. 什么是Eureka 1.3. Eureka包含两个组件 1.4. 什么场景使用Eureka 1.5. Eureka停更 1.6.代码要实现的内容 二. ...
- 创建多模块springcloud应用eureka server和client和消费端demo
使用环境是 STS + maven 1 创建父级 项目,springcloud-demo1 new -> maven project -> 按照要求进行配置即可.然后删除 src目录,因为 ...
- 重新定义SpringCloud-SpringCloud Eureka笔记- Eureka的核心类(二)
Eureka的核心类. InstanceInfo <instance> <instanceId>CHEND-PC.sen5.sz:sw-user:8006</instan ...
- Spring Cloud Eureka 2 (Eureka Server搭建服务注册中心)
工具:IntelliJ IDEA 2017.1.2 x64.maven3.3.9 打开IDE file===>new===>project next next 选择相应的依赖 next ...
- 【Eureka】eureka的搭建注册(新手搭建,大佬见谅)
1 创建一个父类 2创建一个maven子项目 SpringCloud-eureka-server 1.1 porm.xml 中加入相关依赖 <dependency> & ...
- Eureka(eureka)服务集群搭建搭建
编写Eureka Server集群: eureka-server总结 编写Eureka Server集群: 1.在POM文件中引入依赖:<dependency><groupId> ...
- SpringCloud Eureka Client和Server侧配置及Eureka高可用配置
一.Eureka注册中心和客户端配置Demo. 1.Server端 a.使用Idea创建Spring项目,如下所示: b.相关配置 application.yaml配置文件如下: # eureka本身 ...
- Eureka Client注册到Eureka Server的秘密
前言 我们知道Eureka分为两部分,Eureka Server和Eureka Client.Eureka Server充当注册中心的角色,Eureka Client相对于Eureka Server来 ...
最新文章
- 连接一切:自媒体的未来是什么??
- vue中搜索关键词,使文本标红
- [react] 写例子说明React如何在JSX中实现for循环
- python课程水平测试成绩查询_学业水平考试成绩查询系统入口
- jQuery操作示例
- Python输出当前时间
- android开发实现静默安装(fota升级)
- torch -index_select()、Pytorch 之修改Tensor部分值、pytorch中Tensor的数据类型
- 【IDE工具】win10电脑设置保护眼睛色
- SVN提交时文件上出现的问号图标是什么
- 第一节 细胞是生命活动的基本单位
- 从发不起工资到融资1650万,逸创创始人叶翔如何熬过来的?
- DICOM世界观●开篇
- Deus Ex:人类革命 - 图形研究
- STM32F103ZET6+TJA1050 HAL CAN通讯笔记
- 二元非洲秃鹫优化算法附Matlab代码
- 2016.1云南之旅
- 手机录制连续点赞并周期执行(免代码)
- Contest3410 - 2022大中小学生联合训练第五场
- 【javaweb简单教程】1.搭建Web环境、初识JSP
热门文章
- HTML5之10 __使用 Canvas API创建 热点图
- 一、python快速入门(每个知识点后包含练习)
- 问卷调查网站制作-前后端开发
- 使用 javaScript 编写倒计时小程序,到时提交表单
- java计算机毕业设计乒乓球俱乐部管理源码+系统+数据库+lw文档+mybatis+运行部署
- shell 脚本生成的文件名出现? 或者^M解决办法
- Linux环境准备五---VMWare打开CentOS虚拟机报错VT(长模式不兼容)等错误的解决方案
- WPF真入门教程02--新建WPF工程
- css之透明度和外发光
- 对[我所认识的BIOS]系列 -- CPU的第一条指令 一文扩充(III):从源代码到 FFS 文件