微服务治理之Eureka--源码浅析
Eureka集群项目介绍
图中表示Eureka集群的整个动作流程。集群内有3个节点、3个应用服务,服务列表使用的是双层Map,第一层key—>服务名,第二层key->实例名,value是实例信息(ip、port、url等)
@EnableEurekaServer 自动配置
第一步:@Bean Marker对象来激活EurekaServerMarkerConfiguration配置类
我们进入到@EnableEurekaServer
注解中看看:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {}
关键代码在@Import(EurekaServerMarkerConfiguration.class)
,来看下源码:
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {@Beanpublic Marker eurekaServerMarkerBean() {return new Marker();}class Marker {}
}
此时,spring下下文中就有了Marker的实例
第二步:自动配置EurekaServerConfiguration类
在spring-cloud-netflix-eureka-server/META-INF/spring.factories
中配置了一个自动配置的类EurekaServerAutoConfiguration,而想要激活必须要满足条件@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
,也就是必须要有Marker实例,这一个已在上一步完成
配置代码
spring.factories:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
EurekaServerAutoConfiguration 配置类:
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
...
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
在EurekaServerAutoConfiguration
类中都是注册各种Bean,在类上还有一个关键注解@Import(EurekaServerInitializerConfiguration.class)
,代码如下:
@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered {private static final Log log = LogFactory.getLog(EurekaServerInitializerConfiguration.class);// 服务端配置@Autowiredprivate EurekaServerConfig eurekaServerConfig;private ServletContext servletContext;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate EurekaServerBootstrap eurekaServerBootstrap;private boolean running;private int order = 1;@Overridepublic void setServletContext(ServletContext servletContext) {this.servletContext = servletContext;}@Overridepublic void start() {new Thread(() -> {try {// 上下文初始化eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);log.info("Started Eureka Server");publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));EurekaServerInitializerConfiguration.this.running = true;publish(new EurekaServerStartedEvent(getEurekaServerConfig()));}catch (Exception ex) {// Help!log.error("Could not initialize Eureka servlet context", ex);}}).start();}// EurekaServer配置private EurekaServerConfig getEurekaServerConfig() {return this.eurekaServerConfig;}private void publish(ApplicationEvent event) {this.applicationContext.publishEvent(event);}@Overridepublic void stop() {this.running = false;eurekaServerBootstrap.contextDestroyed(this.servletContext);}@Overridepublic boolean isRunning() {return this.running;}// 获取阶段@Overridepublic int getPhase() {return 0;}// 是否自动启动,这里为true@Overridepublic boolean isAutoStartup() {return true;}@Overridepublic int getOrder() {return this.order;}}
实现的是一个SmartLifecycle
,它又继承Lifecycle
接口,主要方法在start()
,使用一个线程进行启动EurekaServer
第三步:谁来调用EurekaServerInitializerConfiguration.start()方法?
在spring初始化上下文中,主要的方法在AbstractApplicationContext.refresh()
方法来实现各种依赖注入,其中包括生命中周期
public abstract class AbstractApplicationContext extends DefaultResourceLoaderimplements ConfigurableApplicationContext {@Overridepublic void refresh() throws BeansException, IllegalStateException {...finishRefresh();}protected void finishRefresh() {...// 将刷新传播到各个生命周期处理器getLifecycleProcessor().onRefresh();...}
}
LifecycleProcessor
是一个接口,默认的实现是DefaultLifecycleProcessor
类,实现了OnRefresh()
方法
@Overridepublic void onRefresh() {startBeans(true);this.running = true;}private void startBeans(boolean autoStartupOnly) {Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();Map<Integer, LifecycleGroup> phases = new TreeMap<>();lifecycleBeans.forEach((beanName, bean) -> {if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {int phase = getPhase(bean);phases.computeIfAbsent(phase,p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)).add(beanName, bean);}});if (!phases.isEmpty()) {phases.values().forEach(LifecycleGroup::start);}}
startBeans()方法大致过程如下:
- 获取所有实现Lifecycle接口的Bean
- 筛选实例为SmartLifecycle且isAutoStartup()方法返回true
- 根据getPhase()方法进行分组保存到TreeMap中,key为phase,value为:LifecycleGroup类
- 遍历执行map中每个LifecycleGroup对象的start()方法
- 最终会调用
bean.start()
方法,也就是EurekaServerInitializerConfiguration.start()
到这里,@EnableEurekaServer
基本就完成了它的使命,调用了EurekaServer初始化的代码。
EurekaServer 启动过程
第一步:EurekaServerInitializerConfiguration.start() 方法
@Overridepublic void start() {new Thread(() -> {try {// note 上下文初始化eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);log.info("Started Eureka Server");// 发送可用事件publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));EurekaServerInitializerConfiguration.this.running = true;// 发送启动事件publish(new EurekaServerStartedEvent(getEurekaServerConfig()));}catch (Exception ex) {// Help!log.error("Could not initialize Eureka servlet context", ex);}}).start();}
关键代码是contextInitialized()
方法,EurekaServerBootstrap代码如下:
public void contextInitialized(ServletContext context) {try {initEurekaEnvironment();// note 初始化EurekaServer上下文initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}}protected void initEurekaServerContext() throws Exception {// For backward compatibilityJsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);// 针对AWS的处理逻辑if (isAws(this.applicationInfoManager.getInfo())) {this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry,this.applicationInfoManager);this.awsBinder.start();}EurekaServerContextHolder.initialize(this.serverContext);log.info("Initialized server context");// 从邻近的eureka节点复制注册表int registryCount = this.registry.syncUp();// 服务剔除逻辑this.registry.openForTraffic(this.applicationInfoManager, registryCount);// 注册所有监控统计信息EurekaMonitors.registerAllStats();}
主要就是对邻近节点的服务列表进行备份、服务剔除逻辑、注册所有监控信息操作
第二步:this.registry.syncUp() 同步eureka节点服务列表
// 从邻近的DS节点复制整个条目public int syncUp() {int count = 0;for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {if (i > 0) {try {Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());} catch (InterruptedException e) {logger.warn("Interrupted during registry transfer..");break;}}// 获得所有节点的应用列表数据Applications apps = eurekaClient.getApplications();for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {try {// 如果可以注册则执行注册操作if (isRegisterable(instance)) {register(instance, instance.getLeaseInfo().getDurationInSecs(), true);count++;}} catch (Throwable t) {logger.error("During DS init copy", t);}}}}return count;}
第三步:服务剔除逻辑 this.registry.openForTraffic(this.applicationInfoManager, registryCount);
调用的是PeerAwareInstanceRegistry
类:
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// 更新每30秒发生一次this.expectedNumberOfClientsSendingRenews = count;// 期望最小每分钟续租次数updateRenewsPerMinThreshold();logger.info("Got {} instances from neighboring DS node", count);logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();// 针对AWS的处理逻辑boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");// 将EurekaSserver实例状态设置为UPapplicationInfoManager.setInstanceStatus(InstanceStatus.UP);// 开启EvictionTask()的schedule计划任务,默认每60秒执行一次剔除操作super.postInit();}
定时检测服务剔除操作task(EvictionTask):
public abstract class AbstractInstanceRegistry implements InstanceRegistry {...protected void postInit() {renewsLastMin.start();if (evictionTaskRef.get() != null) {evictionTaskRef.get().cancel();}evictionTaskRef.set(new EvictionTask());// task:剔除检测任务// delay:首次执行delay// period:执行周期,默认60秒,可以配置evictionTimer.schedule(evictionTaskRef.get(),serverConfig.getEvictionIntervalTimerInMs(),serverConfig.getEvictionIntervalTimerInMs());}
}
任务剔除操作EvictionTask代码:
class EvictionTask extends TimerTask {private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);@Overridepublic void run() {try {long compensationTimeMs = getCompensationTimeMs();logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);// 执行服务剔除操作evict(compensationTimeMs);} catch (Throwable e) {logger.error("Could not run the evict task", e);}}...}// 服务剔除public void evict(long additionalLeaseMs) {...// 超期的实例List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();// 双层Map,第一层key—>服务名,第二层key->实例名for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();if (leaseMap != null) {for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {Lease<InstanceInfo> lease = leaseEntry.getValue();if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {expiredLeases.add(lease);}}}}int registrySize = (int) getLocalRegistrySize();int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());int evictionLimit = registrySize - registrySizeThreshold;int toEvict = Math.min(expiredLeases.size(), evictionLimit);if (toEvict > 0) {Random random = new Random(System.currentTimeMillis());for (int i = 0; i < toEvict; i++) {// Pick a random item (Knuth shuffle algorithm)int next = i + random.nextInt(expiredLeases.size() - i);Collections.swap(expiredLeases, i, next);Lease<InstanceInfo> lease = expiredLeases.get(i);String appName = lease.getHolder().getAppName();String id = lease.getHolder().getId();EXPIRED.increment();// 服务剔除internalCancel(appName, id, false);}}}protected boolean internalCancel(String appName, String id, boolean isReplication) {read.lock();try {// 获取此服务名下所有的实例Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);Lease<InstanceInfo> leaseToCancel = null;if (gMap != null) {// 移除实例leaseToCancel = gMap.remove(id);}...} finally {read.unlock();}...return true;}
至此,Eureka 服务端源码解析完成。
@EnableEurekaClient 自动配置
在EurekaClient中,@EnableEurekaClient
注解是可选的,因为默认会被spring boot自动配置扫描并加载
spring-cloud-netflix-eureka-client/META-INF/spring.factories
:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfigurationorg.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfigurationorg.springframework.boot.Bootstrapper=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapper
配置比服务端多,关键看EurekaClientAutoConfiguration
,代码如下:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration","org.springframework.cloud.autoconfigure.RefreshAutoConfiguration","org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration","org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {...
}
可以看到,如果要自动配置,必须满足三个condition,分别为:
- @ConditionalOnClass(EurekaClientConfig.class):必须要有EurekaClientConfig类
- @ConditionalOnProperty(value = “eureka.client.enabled”, matchIfMissing = true):就是EnableEurekaClient注解,不配置的话默认为true
- @ConditionalOnDiscoveryEnabled
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @ConditionalOnProperty(value = {"spring.cloud.discovery.enabled"},matchIfMissing = true ) public @interface ConditionalOnDiscoveryEnabled {}
默认为true,可以不用配置
所以默认是所有的Condition都满足,会自动启动EurekaClient服务
总结
对Eureka中涉及到的自动配置原理进行解析,并对EurekaServer端的源码浅析,如果要更加深入可以自行查看源码
微服务治理之Eureka--源码浅析相关推荐
- Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战
Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战 Java生鲜电商平台- 什么是秒杀 通俗一点讲就是网络商家为促销等目的组织的网上限时抢购活动 比如说京东秒杀,就是一种定时定量秒杀,在规定 ...
- SpringCloud Gateway微服务网关实战与源码分析-上
概述 定义 Spring Cloud Gateway 官网地址 https://spring.io/projects/spring-cloud-gateway/ 最新版本3.1.3 Spring Cl ...
- 电商平台 高并发 微服务 方案_Java生鲜电商平台-秒杀系统微服务架构设计与源码解析实战...
Java生鲜电商平台- 什么是秒杀 通俗一点讲就是网络商家为促销等目的组织的网上限时抢购活动 比如说京东秒杀,就是一种定时定量秒杀,在规定的时间内,无论商品是否秒杀完毕,该场次的秒杀活动都会结束.这种 ...
- 配置JPA属性 | MSCode微服务平台框架代码源码
配置JPA属性. MSCode微服务平台框架 mscodecloud.com 代码示例 spring.jpa.hibernate.naming.physical-strategy=com.exampl ...
- 【SpringCloud微服务】第3章 服务治理SpringCloudEureka(五)——Eureka源码分析
2.8 Eureka 源码分析 首先,对于服务注册中心.服务提供者.服务消费者这三个主要元素来说,后两者(也就是Eureka客户端)在整个运行机制中是大部分通信行为的主动发起者,而注册中心主要是处 ...
- 微服务发现与注册之Eureka源码分析
作者:陌北有棵树,Java人,架构师社区合伙人! [一]微服务之服务发现概述 关于微服务,近年来可谓是大火,业界也吹刮着一种实践微服务的风潮.本人有幸在去年参与到一个向微服务过渡的产品,再结合自己所学 ...
- 基于JAVA融呗智慧金融微资讯移动平台服务端计算机毕业设计源码+数据库+lw文档+系统+部署
基于JAVA融呗智慧金融微资讯移动平台服务端计算机毕业设计源码+数据库+lw文档+系统+部署 基于JAVA融呗智慧金融微资讯移动平台服务端计算机毕业设计源码+数据库+lw文档+系统+部署 本源码技术栈 ...
- 微服务治理平台的RPC方案实现
文章作者:用友云平台 原文链接:http://blog.51cto.com/14084875/2326311?utm_source=tuicool&utm_medium=referral 复制 ...
- B站价值60亿跨年晚会背后的微服务治理
B站价值60亿跨年晚会背后的微服务治理 大家都知道微服务有两个痛点,一个是如何拆分微服务,微服务的边界怎么划分制定:二是微服务上了规模之后如何管理,因为只要上了规模,任何小小的问题都可能会被放大,最后 ...
最新文章
- ANCOM:找出微生物群落中的差异物种
- 《大数据系统基础》课程实践项目中期答辩顺利举行,清华持续探索大数据人才教育创新之路
- 蓝牙L2CAP剖析(二)
- QT的QScrollArea类的使用
- GitFlow 工作流和Code Review教程
- 预告:大牛现身说法 TensorFlow在工程项目中的应用 | AI 研习社
- ABAP SAPGUI 里使用 F4 value help 选择时间
- 浅谈html的语义化和一些简单优化,html标签语义化
- 跟我一起学WCF(2)——利用.NET Remoting技术开发分布式应用
- 作者:寇纲(1975-),男,博士,西南财经大学工商管理学院教授、博士生导师、执行院长...
- CSV Converter Pro for Mac(CSV数据转换工具)
- DOM事件学习之兼容中文输入法
- 芝麻二维码,安卓和苹果二维码合并和统计工具
- 网易云信赵加雨:极致匠心的技术团队撑起60万开发者
- char类型和varchar的区别和选用
- 学计算机为什么上岗之前要培训,浙江公务员面试模拟题华图解析
- 更换头像的测试点(站在 app 的角度来分析)
- Python基础一(介绍)
- 如何建立一个真实光栅结构的光导
- 程序员最怕的四个字:通宵发布!| 程序员有话说
热门文章
- java作业不能运行_从Windows运行时,YARN作业失败
- android n 支持机型,Android N无缝更新功能不适于现有机型
- matlab两个图共用一个x轴_Matlab Figures (3) —— 叠加作图与多坐标轴
- python中的pyinstaller库_Python(00):PyInstaller库,打包成exe基本介绍
- uva-10887-枚举
- maven 一个简单项目 —— maven权威指南学习笔记(三)
- 修改同一张表的同一个字段的两个不同的值。
- Gentoo rc-update service ‘net.eth0′ does not exist
- SAP HANA中创建时间相关的数据时候需要Variant Schema
- 关于H5跳转到小程序和android的方法