本文来分析下Eureka Server启动源码

文章目录

  • 概述
  • 源码解析
    • 服务同步
    • 服务剔除
  • start包配置
  • 启动源码分析
    • EurekaServerAutoConfiguration
    • EurekaServerInitializerConfiguration
    • EurekaServerBootstrap
    • PeerEurekaNodes
    • PeerEurekaNodes
  • 本文小结

概述

Eureka server启动流程图和源码分析 流程图 源码解析 由流程图看出,Eureka server启动时会做两件事 1.服务同步 因为eureka服务可以搭建集群,所以每个服务节点启动时,会从相邻的eureka节点中同步实例数据

流程图


源码解析

由流程图看出,Eureka server启动时会做两件事


服务同步

因为eureka服务可以搭建集群,所以每个服务节点启动时,会从相邻的eureka节点中同步实例数据,源码如下:

@Override
public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; //当我们配置register-with-eureka: true时候,getRegistrySyncRetries的返回值为5 for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {if (i > 0) { try { //从第二次开始,休眠30秒 Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; }} //eurekaClient是当前节点的最近一个节点,在启动时会初始化 Applications apps = eurekaClient.getApplications(); //循环节点上的实例 for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) {try { if (isRegisterable(instance)) { //将实例同步到本地内存 register(instance, instance.getLeaseInfo().getDurationInSecs(), true);count++;}} catch (Throwable t) { logger.error("During DS init copy", t); } } }
}
return count; }

当然,若我们的eureka服务为单机的话,可以设置以下配置

eureka:client: fetch-registry: false   #是否把eureka服务当成client注册到配置中心 register-with-eureka: false  #eureka服务是否需要读取配置中心的实例

服务剔除

当完成服务同步后,eureka会开启服务剔除的定时任务,源码如下:

protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); //执行java Timer 定时器任务 evictionTimer.schedule(evictionTaskRef.get(), //60秒后执行任务,可配置 serverConfig.getEvictionIntervalTimerInMs(), //每次执行完任务后,间隔60秒再执行一次serverConfig.getEvictionIntervalTimerInMs());
}

EvictionTask实现了Runnable方法,其run方法会调用evict方法,evict方法源码如下

//additionalLeaseMs为当前时间
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); //循环实例 for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {Lease<InstanceInfo> lease = leaseEntry.getValue(); //如果该实例最后一次心跳续约记录日期与当前时间间隔大于90秒,则add到List,后续执行剔除操作 if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); }}}} // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. //设置最大剔除数量 int registrySize = (int) getLocalRegistrySize();int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); //随机剔除实例 Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i);Collections.swap(expiredLeases, i, next);Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName();String id = lease.getHolder().getId();EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id);internalCancel(appName, id, false); }}
}

start包配置

spring-cloud-netflix-eureka-server包结构,在这个包下面META-INF下面有个spring.factories这个配置文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

使用了springboot EnableAutoConfiguration这个注解,在springboot应用启动的时候,会自动加载EurekaServerAutoConfiguration这个bean,后面主要看这个bean的源码


启动源码分析

EurekaServerAutoConfiguration

用于EurekaServer往beanfactory添加相关eureka-server功能bean

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {// 此处省略大部分代码,仅抽取一些关键的代码片段// 加载EurekaController, spring-cloud 提供了一些额外的接口,用来获取eurekaServer的信息@Bean@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)public EurekaController eurekaController() {return new EurekaController(this.applicationInfoManager);}// 配置服务节点信息,这里的作用主要是为了配置Eureka的peer节点,也就是说当有收到有节点注册上来//的时候,需要通知给那些服务节点, (互为一个集群)@Bean@ConditionalOnMissingBeanpublic PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,ServerCodecs serverCodecs) {return new PeerEurekaNodes(registry, this.eurekaServerConfig,this.eurekaClientConfig, serverCodecs, this.applicationInfoManager);}// EurekaServer的上下文@Beanpublic EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,registry, peerEurekaNodes, this.applicationInfoManager);}// 这个类的作用是spring-cloud和原生eureka的胶水代码,通过这个类来启动EurekaSever// 后面这个类会在EurekaServerInitializerConfiguration被调用,进行eureka启动@Beanpublic EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,EurekaServerContext serverContext) {return new EurekaServerBootstrap(this.applicationInfoManager,this.eurekaClientConfig, this.eurekaServerConfig, registry,serverContext);}
// 配置拦截器,ServletContainer里面实现了jersey框架,通过他来实现eurekaServer对外的restFull接口
@Bean
public FilterRegistrationBean jerseyFilterRegistration(javax.ws.rs.core.Application eurekaJerseyApp) {FilterRegistrationBean bean = new FilterRegistrationBean();bean.setFilter(new ServletContainer(eurekaJerseyApp));bean.setOrder(Ordered.LOWEST_PRECEDENCE);bean.setUrlPatterns(Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));return bean;
}// 拦截器实例
@Bean
public javax.ws.rs.core.Application jerseyApplication(Environment environment,ResourceLoader resourceLoader) {ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);// Filter to include only classes that have a particular annotation.//provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));// Find classes in Eureka packages (or subpackages)//Set<Class<?>> classes = new HashSet<Class<?>>();for (String basePackage : EUREKA_PACKAGES) {Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);for (BeanDefinition bd : beans) {Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(),resourceLoader.getClassLoader());classes.add(cls);}}// Construct the Jersey ResourceConfig//Map<String, Object> propsAndFeatures = new HashMap<String, Object>();propsAndFeatures.put(// Skip static content used by the webappServletContainer.PROPERTY_WEB_PAGE_CONTENT_REGEX,EurekaConstants.DEFAULT_PREFIX + "/(fonts|images|css|js)/.*");DefaultResourceConfig rc = new DefaultResourceConfig(classes);rc.setPropertiesAndFeatures(propsAndFeatures);return rc;
}
}

EurekaServerAutoConfiguration

  1. @Configuration 表明这是一个配置类
  2. @Import(EurekaServerInitializerConfiguration.class) 导入启动EurekaServer的bean
  3. @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) 这个是表示只有在spring容器里面含有Market这个实例的时候,才会加载当前这个Bean(EurekaServerAutoConfiguration ),这个就是控制是否开启EurekaServer的关键,在@EableEurekaServer这个注解里面,就是创建了一个Market兑现,用来告诉他,我开启了Eureka服务
  4. @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class })
  5. @PropertySource(“classpath:/eureka/server.properties”) 加载配置文件。

EurekaServerInitializerConfiguration

从上面的代码分析上可以看到,在EurekaServerAutoConfiguration加载完成之后就会执行

EurekaServerInitializerConfiguration这个类的start方法, 这个类实现了spring的SmartLifecyl,后续会开个单章介绍这个类

/*** @author Dave Syer*/
@Configuration
@CommonsLog
public class EurekaServerInitializerConfigurationimplements ServletContextAware, SmartLifecycle, Ordered {@Autowiredprivate EurekaServerConfig eurekaServerConfig;private ServletContext servletContext;@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate EurekaServerBootstrap eurekaServerBootstrap;private boolean running;private int order = 1;@Overridepublic void setServletContext(ServletContext servletContext) {this.servletContext = servletContext;}@Overridepublic void start() {// 启动一个线程new Thread(new Runnable() {@Overridepublic void run() {try {//初始化EurekaServer,同时启动Eureka Server ,后面着重讲这里eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);log.info("Started Eureka Server");// 发布EurekaServer的注册事件publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));// 设置启动的状态为trueEurekaServerInitializerConfiguration.this.running = true;// 发送Eureka Start 事件 , 其他还有各种事件,我们可以监听这种时间,然后做一些特定的业务需求,后面会讲到。publish(new EurekaServerStartedEvent(getEurekaServerConfig()));}catch (Exception ex) {// Help!log.error("Could not initialize Eureka servlet context", ex);}}}).start();}private EurekaServerConfig getEurekaServerConfig() {return this.eurekaServerConfig;}private void publish(ApplicationEvent event) {this.applicationContext.publishEvent(event);}@Overridepublic void stop() {this.running = false;eurekaServerBootstrap.contextDestroyed(this.servletContext);}@Overridepublic boolean isRunning() {return this.running;}@Overridepublic int getPhase() {return 0;}@Overridepublic boolean isAutoStartup() {return true;}@Overridepublic void stop(Runnable callback) {callback.run();}@Overridepublic int getOrder() {return this.order;}
}

EurekaServerBootstrap

EurekaServerBootstrap

public void contextInitialized(ServletContext context) {try {// 初始化Eureka的环境变量initEurekaEnvironment();// 初始化Eureka的上下文initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}
}protected void initEurekaEnvironment() throws Exception {log.info("Setting the eureka configuration..");String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);if (dataCenter == null) {log.info("Eureka data center value eureka.datacenter is not set, defaulting to default");ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);}else {ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);}String environment = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT);if (environment == null) {ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);log.info("Eureka environment value eureka.environment is not set, defaulting to test");}else {ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);}
}protected void initEurekaServerContext() throws Exception {// For backward compatibilityJsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH);XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH);if (isAws(this.applicationInfoManager.getInfo())) {this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,this.eurekaClientConfig, this.registry, this.applicationInfoManager);this.awsBinder.start();}//初始化eureka server上下文EurekaServerContextHolder.initialize(this.serverContext);log.info("Initialized server context");// Copy registry from neighboring eureka node// 从相邻的eureka节点复制注册表 int registryCount = this.registry.syncUp();// 默认每30秒发送心跳,1分钟就是2次// 修改eureka状态为up // 同时,这里面会开启一个定时任务,用于清理 60秒没有心跳的客户端。自动下线this.registry.openForTraffic(this.applicationInfoManager, registryCount);// Register all monitoring statistics.EurekaMonitors.registerAllStats();
}
public void contextDestroyed(ServletContext context) {try {log.info("Shutting down Eureka Server..");context.removeAttribute(EurekaServerContext.class.getName());destroyEurekaServerContext();destroyEurekaEnvironment();}catch (Throwable e) {log.error("Error shutting down eureka", e);}log.info("Eureka Service is now shutdown...");
}
PeerAwareInstanceRegistryImpl@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {// Renewals happen every 30 seconds and for a minute it should be a factor of 2.// 计算每分钟最大续约数this.expectedNumberOfRenewsPerMin = count * 2;// 每分钟最小续约数this.numberOfRenewsPerMinThreshold =(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());logger.info("Got " + count + " instances from neighboring DS node");logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold);this.startupTime = System.currentTimeMillis();if (count > 0) {this.peerInstancesTransferEmptyOnStartup = false;}DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();boolean isAws = Name.Amazon == selfName;if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {logger.info("Priming AWS connections for all replicas..");primeAwsReplicas(applicationInfoManager);}logger.info("Changing status to UP");// 设置实例的状态为UPapplicationInfoManager.setInstanceStatus(InstanceStatus.UP);// 开启定时任务,默认60秒执行一次,用于清理60秒之内没有续约的实例super.postInit();
}

PeerEurekaNodes

从上面的EurekaServerAutoConfiguration类,我们可以看到有个初始化EurekaServerContext的方法

@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,registry, peerEurekaNodes, this.applicationInfoManager);
}

DefaultEurekaServerContext 这个类里面的的initialize()方法是被@PostConstruct 这个注解修饰的,在应用加载的时候,会执行这个方法

public void initialize() throws Exception {logger.info("Initializing ...");// 启动一个线程,读取其他集群节点的信息,后面后续复制peerEurekaNodes.start();//registry.init(peerEurekaNodes);logger.info("Initialized");
}

peerEurekaNodes.start()主要是启动一个只拥有一个线程的线程池,第一次进去会更新一下集群其他节点信息
然后启动了一个定时线程,每60秒更新一次,也就是说后续可以根据配置动态的修改节点配置。(原生的spring cloud config支持)


PeerEurekaNodes

PeerEurekaNodes

public void start() {taskExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");thread.setDaemon(true);return thread;}});try {// 首次进来,更新集群节点信息updatePeerEurekaNodes(resolvePeerUrls());// 搞个线程Runnable peersUpdateTask = new Runnable() {@Overridepublic void run() {try {updatePeerEurekaNodes(resolvePeerUrls());} catch (Throwable e) {logger.error("Cannot update the replica Nodes", e);}}};taskExecutor.scheduleWithFixedDelay(peersUpdateTask,serverConfig.getPeerEurekaNodesUpdateIntervalMs(),serverConfig.getPeerEurekaNodesUpdateIntervalMs(),TimeUnit.MILLISECONDS);} catch (Exception e) {throw new IllegalStateException(e);}for (PeerEurekaNode node : peerEurekaNodes) {logger.info("Replica node URL:  " + node.getServiceUrl());}
}// 根据URL 构建PeerEurekaNode信息
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);String targetHost = hostFromUrl(peerEurekaNodeUrl);if (targetHost == null) {targetHost = "host";}return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}

registry.init(peerEurekaNodes);这里面使用的是PeerAwareInstanceRegistryImpl , 注册信息管理类里面的init方法

  • 构建一个非空的ResponseCache
  • 启动一个定时器,更新eureka的阈值 ,默认每15分钟执行一次 , 这个阈值主要是用来判断是否开启自我保护机制 , 后面会单独开篇幅来讲解
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {this.numberOfReplicationsLastMin.start();this.peerEurekaNodes = peerEurekaNodes;initializedResponseCache();scheduleRenewalThresholdUpdateTask();initRemoteRegionRegistry();try {Monitors.registerObject(this);} catch (Throwable e) {logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);}
}

至此,EurekaServer算是启动完毕 , EurekaServerBootstrap是完全复制了原生EurekaBootstrap的代码, 因为原生的Eureka是在servlet应用,但是spring-cloud的应用是运行在内嵌的Tomcat等WEB服务器里面的,这里就是使用EurekaServerBootstrap来做替换,最终是Eureka能够在springboot中使用。


本文小结

本文详细介绍了Eureka Server启动源码。

Eureka Server启动源码分析相关推荐

  1. 【Android 启动过程】Activity 启动源码分析 ( ActivityThread 流程分析 二 )

    文章目录 前言 一.ActivityManagerService.attachApplicationLocked 二.ActivityStackSupervisor.attachApplication ...

  2. 【Android 启动过程】Activity 启动源码分析 ( ActivityThread -> Activity、主线程阶段 一 )

    文章目录 前言 一.ClientTransactionHandler.scheduleTransaction 二.ActivityThread.H 处理 EXECUTE_TRANSACTION 消息 ...

  3. 【Android 启动过程】Activity 启动源码分析 ( AMS -> ActivityThread、AMS 线程阶段 二 )

    文章目录 前言 一.热启动与冷启动选择 二.AMS 进程中执行的相关操作 三.通过 Binder 机制转到 ActivityThread 中执行的操作 总结 前言 上一篇博客 [Android 启动过 ...

  4. 【Android 启动过程】Activity 启动源码分析 ( AMS -> ActivityThread、AMS 线程阶段 )

    文章目录 一.Activity 启动源码分析 ( AMS | ActivityManagerService ) 1.Instrumentation 调用 AMS 方法 2.ActivityStarte ...

  5. Android 9 (P) Zygote进程启动源码分析指南二

         Android 9 Zygote进程启动源码分析指南二 Android 9 (P) 系统启动及进程创建源码分析目录: Android 9 (P)之init进程启动源码分析指南之一 Andro ...

  6. 【Android 启动过程】Activity 启动源码分析 ( ActivityThread -> Activity、主线程阶段 二 )

    文章目录 前言 一.ActivityThread 类 handleLaunchActivity -> performLaunchActivity 方法 二.Instrumentation.new ...

  7. 【Android 启动过程】Activity 启动源码分析 ( Activity -> AMS、主线程阶段 )

    文章目录 一.Activity 启动源码分析 ( Activity -> AMS 阶段 ) 一.Activity 启动源码分析 ( Activity -> AMS 阶段 ) 调用 star ...

  8. Android 9(P)之init进程启动源码分析指南之一

         Android 9 之init进程启动源码分析指南之一 Android 9 (P) 系统启动及进程创建源码分析目录: Android 9 (P)之init进程启动源码分析指南之一 Andro ...

  9. springboot启动源码分析3-环境配置

    applyInitializersSpringBoot启动源码分析3--环境配置 springboot启动源码分析1--初步初始化 springboot启动源码分析2--run方法分析 springb ...

最新文章

  1. java 线程的创建和执行_线程管理(一)线程的创建和运行
  2. docker基础---数据卷volumes
  3. 【下载】RSA1024及RSA2048加密算法漏洞CVE-2017-7526 问题出在GnuPG加密库
  4. 创建简单的xslt transformation
  5. 计算机操作系统思维导图_我在b站学计算机
  6. 浅谈 UNIX、Linux、ios、android 他们之间的关系
  7. EF 实现自定义数据库字符串
  8. winform能连MySQL吗_c# winform中怎么连接mysql
  9. 8-18-Exercise
  10. 如何在Kaggle 首战中进入前 10%
  11. 爬取人力资源社保局咨询问题
  12. 【引】Version Control System - SVN - Developing and Deploying with Branches
  13. 突破”子网隔离”***C段
  14. 自学python-python自学难吗
  15. java基于JSP+Servlet的员工绩效考核系统
  16. QGC使用国内天地图卫星图并添加注记图层
  17. 硬件工程师的面试问题
  18. 用excel表格解线性方程组
  19. NOIP中的数学--第6课 排列与组合
  20. matlab版大学物理学,MATLAB可视化大学物理学(第2版)

热门文章

  1. HDU(1175),连连看,BFS
  2. kali实战-被动信息收集
  3. 我的Python成长之路---第六天---Python基础(20)---2016年2月20日(晴)
  4. 首届CSS开发者大会|七牛助力前端开发
  5. 从页面底部向上弹出dialog,消失时逐渐向下(转)
  6. C++ static、const对象声明与定义 问题来源?
  7. ASP.Net 数据绑定之-----选择合适的数据控件
  8. Wolf QOS 教程
  9. 一些SAP Partners能够通过二次开发实现打通C/4HANA和S/4HANA的方法介绍
  10. 老项目引入masonry后报错unrecognized selector sent to instance