关键词:Eureka    服务发现    服务注册


Spring Cloud是基于Spring Boot做为开发框架的,在Spring Cloud的很多功能实现中,大量使用了Spring Boot的自动注入功能(AutoConfigure)。Spring Cloud中,服务与服务之间的通信使用的是http协议。

Eureka具有服务注册与服务发现,并且可以作为注册中心。Eureka分为两种角色,服务端(Eureka Server)和客户端(Eureka Client)。Eureka服务端可以看做是一个注册中心,只是这个注册中心需要依赖于Spring Boot项目,我们在使用Eureka服务端功能的时候,需要创建一个Spring Boot项目然后引入相关依赖,修改配置,使用注解,然后运行这个Spring Boot项目即可。关于我们项目自己的服务提供者和服务消费者,对Eureka来说,都是Eureka客户端。在我们自己的项目中引入相关依赖,然后使用注册,Eureka客户端功能会自动的将我们的服务注册到Eureka服务端中,并完成我们的服务消费者服务发现的功能。

Eureka服务端对自己来说,也是Eureka客户端。所以在spring-cloud-starter-netflix-eureka-server依赖中还包含了spring-cloud-netflix-eureka-client的内容,并且在Eureka Server自动注入的时候,还会依赖于client端自动注入的bean。

Eureka客户端通过调用http接口与Eureka服务端进行通信,完成服务注册与获取等功能。默认使用Jersey进行http通信。服务提供者使用HTTP POST请求(/eureka/apps/{appName})注册服务。每隔30秒,它必须使用HTTP PUT请求(/eureka/apps/{appName}/{id})刷新其注册,与Eureka服务端保持心跳联系。通过使用HTTP DELETE请求(/eureka/apps/{appName}/{id})或实例注册超时来删除注册。服务消费者可以通过使用HTTP GET请求(/eureka/apps或/eureka/apps/{appName})检索已注册的服务实例。

Eureke客户端调用请求接口:EurekaHttpClientAbstractJerseyEurekaHttpClient及其实现类

Eureke服务端接口资源定义:ApplicationsResourceApplicationResourceInstanceResource及其同一个包下的其他Resource


█ Eureka服务端

角色:注册中心

使用:

(1)创建maven项目,修改pom.xml文件,引入相应的依赖

<properties><java.version>1.8</java.version><!-- Spring Cloud版本 --><spring-cloud.version>Hoxton.RELEASE</spring-cloud.version>
</properties>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
<dependencies><!-- 添加eureka-server依赖,引入jar包 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency>
</dependencies>

spring-cloud-starter-netflix-eureka-server依赖中已经包含了Spring Boot的依赖,为了避免版本冲突带来的问题,建议无需再在项目中单独引入Spring Boot依赖。spring-cloud-starter-netflix-eureka-server中包含了如下的依赖:

(2)编写application.yml配置

因为是Spring Boot项目,所以使用yaml文件作为配置文件。

# Eureka作为Spring Boot项目对外提供服务,需要设置端口号,默认为8080
# 因为在Spring Cloud中服务之间通信使用http协议,所以每个项目需要是web项目
server:port: 8899eureka:client:// 是否进行注册服务,当前服务不作为eureka 客户端,不进行服务注册  register-with-eureka: false// 是否获取注册服务列表fetch-registry: false

(3)编写Spring Boot启动类

package myeureka;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
// Spring Boot注解
@SpringBootApplication
// 开启Eureka server功能
@EnableEurekaServer
public class MyApplication {public static void main(String[] args) {SpringApplication.run(MyApplication.class, args);}}

启动的时候,如果出现下图所示的错误,此时,删除本地maven仓库中对应的jar文件即可:

(4)启动成功之后,在浏览器访问:localhost:8899,会看见如下图所示页面。此页面为Eureka服务端的仪表盘,在里面可以看见已经注册的服务提供者的信息,以及Eureka服务集群的情况:

经过上面的几个简单的步骤,就能搭建一个Eureka服务作为注册中心了。

补充:将当前Eureka服务端也作为客户端服务进行注册,修改yml配置:

eureka:client:// 注册服务,默认为true  register-with-eureka: truefetch-registry: falseservice-url:// eureka 服务端的服务地址,请求路径为/eureka/。因为此时的服务端就是本服务自己,所以这里的服务地址就是当前服务  defaultZone: http://localhost:${server.port}/eureka/

重启服务之后,访问:localhost:8899/,此时在服务列表会出现当前服务:

原理:

(1)从@EnableEurekaServer注解开始,通过@Import,在Spring启动的时候会被加载:

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import({EurekaServerMarkerConfiguration.class})
public @interface EnableEurekaServer {
}

(2)EurekaServerMarkerConfiguration就做了一件事情,创建Marker对象,Spring会扫描@Bean注解,将Marker对象注册成Spring bean,加入Spring容器中:

public class EurekaServerMarkerConfiguration {public EurekaServerMarkerConfiguration() {}@Beanpublic EurekaServerMarkerConfiguration.Marker eurekaServerMarkerBean() {return new EurekaServerMarkerConfiguration.Marker();}class Marker {Marker() {}}
}

(3)此时,如果你不熟悉Spring Boot的自动配置功能,可能会一头雾水,就这?熟悉Spring Boot自动配置的同学,应该了解在项目的META-INF文件夹下,如果存在一个文件为spring.factories,里面的内容是键值对形式,键为org.springframework.boot.autoconfigure.EnableAutoConfiguration,这样Spring Booot在启动的时候,会根据value的值,自动完成配置,创建Bean等等。找到spring-cloud-starter-netflix-eureka-server的jar包,可以看见:

spring.factories的内容为下面所示:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

(4)EurekaServerAutoConfiguration

EurekaServerAutoConfiguration类的内容比较多,主要完成的就是多个bean的创建。我们一段段来看:

  • 类上注解:
// Spring会加载EurekaServerInitializerConfiguration
@Import({EurekaServerInitializerConfiguration.class})
// 当Spring 容器中存在Marker类型的bean时,Spring才会加载EurekaServerAutoConfiguration
// 因为在启动类上使用了@EnableEurekaServer注解,所以Spring容器中会存在Merker类型的bean
@ConditionalOnBean({Marker.class})
// 配置的加载,将yml中的配置,设置到EurekaDashboardProperties、InstanceRegistryProperties两个bean对应的属性上。
@EnableConfigurationProperties({EurekaDashboardProperties.class, InstanceRegistryProperties.class})
// 当前类可以使用类路径下/eureka/server.properties路径下的配置信息
@PropertySource({"classpath:/eureka/server.properties"})
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {......
}
  • 通过@Autowired自动注入相应类型的bean
// ApplicationInfoManager是eureka-client jar包下面的类
// @Singleton
// public class ApplicationInfoManager
@Autowired
private ApplicationInfoManager applicationInfoManager;
@Autowired
private EurekaServerConfig eurekaServerConfig;
@Autowired
private EurekaClientConfig eurekaClientConfig;
@Autowired
private EurekaClient eurekaClient;
@Autowired
private InstanceRegistryProperties instanceRegistryProperties;

其中的ApplicationInfoManagerEurekaClientConfigEurekaClient 是在EurekaClientAutoConfiguration里面完成bean的注册的。EurekaClientAutoConfiguration在spring-cloud-netflix-eureka-client jar包下面。从这里可以看出,Eureka Server依赖于Eureka Client,无法单独使用。

  • 创建仪表盘页面的访问controller,即在使用步骤4中通过浏览器请求看到的页面。可通过eureka.dashboard.enabled=false关闭此功能,默认为true开启:
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard",name = {"enabled"},matchIfMissing = true
)
public EurekaController eurekaController() {return new EurekaController(this.applicationInfoManager);
}

访问路径可以通过eureka.dashboard.path配置,默认为/,所以我们在浏览器访问:localhost:8899可以获取到页面。若修改配置eureka.dashboard.path=myeureka,则访问路径为:localhost:8899/myeureka:

@Controller
@RequestMapping({"${eureka.dashboard.path:/}"})
public class EurekaController {@Value("${eureka.dashboard.path:/}")private String dashboardPath = "";private ApplicationInfoManager applicationInfoManager;public EurekaController(ApplicationInfoManager applicationInfoManager) {this.applicationInfoManager = applicationInfoManager;}// get请求获取页面信息@RequestMapping(method = {RequestMethod.GET})public String status(HttpServletRequest request, Map<String, Object> model) {// 下面内容的主要工作就是获取相关值放进model中,这样在页面可以获取到 this.populateBase(request, model);this.populateApps(model);StatusInfo statusInfo;try {statusInfo = (new StatusResource()).getStatusInfo();} catch (Exception var5) {statusInfo = Builder.newBuilder().isHealthy(false).build();}model.put("statusInfo", statusInfo);this.populateInstanceInfo(model, statusInfo);this.filterReplicas(model, statusInfo);// 这里返回到类路径下eureka文件夹下面的status视图页面// 这一块是Spring MVC的知识了return "eureka/status";}}

在jar包里面可以找到:

  • 创建EurekaServerConfig类型的bean,配置Eureka服务端:
@Configuration(proxyBeanMethods = false
)
protected static class EurekaServerConfigBeanConfiguration {protected EurekaServerConfigBeanConfiguration() {}@Bean@ConditionalOnMissingBeanpublic EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {// @ConfigurationProperties("eureka.server")// public class EurekaServerConfigBean implements EurekaServerConfig// 在bean创建的时候,会通过eureka.server开头的配置对EurekaServerConfigBean中的属性进行设值EurekaServerConfigBean server = new EurekaServerConfigBean();// 通过配置:eureka.client.register-with-eureka设置。默认为trueif (clientConfig.shouldRegisterWithEureka()) {// 注册失败重试5次server.setRegistrySyncRetries(5);}return server;}
}
  • 创建编码解码器,在网络中数据的传输需要编码和解码,用于Eureka服务端与Eureka客户端数据传输时的编码和解码:
@Bean
public ServerCodecs serverCodecs() {return new EurekaServerAutoConfiguration.CloudServerCodecs(this.eurekaServerConfig);
}
  • 创建Eureka服务的副本的过滤器。用于多个Eureka服务时,即Eureka集群:
@Bean
@ConditionalOnMissingBean
public ReplicationClientAdditionalFilters replicationClientAdditionalFilters() {return new ReplicationClientAdditionalFilters(Collections.emptySet());
}
  • 用于服务注册功能。此服务注册包含了集群中副本的注册以及客户端服务的注册:
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {this.eurekaClient.getApplications();return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
  • Eureke节点集合:
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {return new EurekaServerAutoConfiguration.RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager, replicationClientAdditionalFilters);
}
  • Eureka服务的上下文:
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
}
  • 创建EurekaServerBootstrap,可以将其看成Eureka服务的大总管。负责Spring boot启动时初始化服务环境和服务上下文,Spring boot关闭时,销毁服务上下文等工作:
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext);
}
  • 创建Application,扫描EUREKA_PACKAGES(即"com.netflix.discovery"与"com.netflix.eureka")包下面的@Path、@Provider标注的类,可以理解成Spring扫描@Controller注解的功能,注册请求资源。因为Eureka之间的通信是使用的http,自然要对服务注册、发现服务、心跳服务、查看服务等功能提供http访问资源。这里jersey的功能用法:
@Bean
public Application jerseyApplication(Environment environment, ResourceLoader resourceLoader) {ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false, environment);provider.addIncludeFilter(new AnnotationTypeFilter(Path.class));provider.addIncludeFilter(new AnnotationTypeFilter(Provider.class));Set<Class<?>> classes = new HashSet();String[] var5 = EUREKA_PACKAGES;int var6 = var5.length;for(int var7 = 0; var7 < var6; ++var7) {String basePackage = var5[var7];Set<BeanDefinition> beans = provider.findCandidateComponents(basePackage);Iterator var10 = beans.iterator();while(var10.hasNext()) {BeanDefinition bd = (BeanDefinition)var10.next();Class<?> cls = ClassUtils.resolveClassName(bd.getBeanClassName(), resourceLoader.getClassLoader());classes.add(cls);}}Map<String, Object> propsAndFeatures = new HashMap();propsAndFeatures.put("com.sun.jersey.config.property.WebPageContentRegex", "/eureka/(fonts|images|css|js)/.*");DefaultResourceConfig rc = new DefaultResourceConfig(classes);rc.setPropertiesAndFeatures(propsAndFeatures);return rc;
}

会从两个包中扫描到下面的资源,这些资源定义了Eureka服务端的提供的一些接口,比如服务注册、获取已注册服务列表:

// @Provider注解扫描到的类
com.netflix.discovery.provider.DiscoveryJerseyProvider
// @Path注解扫描到的类
// Eureka服务副本相关的资源接口
com.netflix.eureka.resources.PeerReplicationResource// Eureka客户端相关资源接口
com.netflix.eureka.resources.ApplicationsResource
// Eureka客户端相关资源接口
com.netflix.eureka.resources.ApplicationResource
// Eureka客户端相关资源接口
com.netflix.eureka.resources.InstancesResourcecom.netflix.eureka.resources.ServerInfoResource
com.netflix.eureka.resources.ASGResource
com.netflix.eureka.resources.SecureVIPResource
com.netflix.eureka.resources.VIPResource
com.netflix.eureka.resources.StatusResource
  • 将上面jerseyApplication方法创建的Application设置成过滤器,当请求路径为:/eureka/*时,会被过滤器拦截,进入eurekaJerseyApp的逻辑。参数eurekaJerseyApp在会被Spring自动注入。请求Eureka服务端的接口,路径都需要包含/eureka/,比如/eureka/apps。这样能够被过滤器拦截,做一些处理:
// Application eurekaJerseyApp作为参数,Spring会自动注入上面步骤创建的bean
@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(Application eurekaJerseyApp) {FilterRegistrationBean<Filter> bean = new FilterRegistrationBean();bean.setFilter(new ServletContainer(eurekaJerseyApp));bean.setOrder(2147483647);bean.setUrlPatterns(Collections.singletonList("/eureka/*"));return bean;
}
  • http请求的链路追踪过滤器,会记录服务之间的请求路径信息,即客户端发起请求,从服务A请求服务B,服务B又请求服务C:
@Bean
@ConditionalOnBean(name = {"httpTraceFilter"}
)
public FilterRegistrationBean<?> traceFilterRegistration(@Qualifier("httpTraceFilter") Filter filter) {FilterRegistrationBean<Filter> bean = new FilterRegistrationBean();bean.setFilter(filter);bean.setOrder(2147483637);return bean;
}

█ Eureka客户端-服务注册

使用:

(1)创建maven项目,修改pom.xml文件:

<dependencies><!--eureka client--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><!-- spring-cloud-starter-netflix-eureka-client不包含spring-boot-starter-web --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies><properties><java.version>1.8</java.version><!--Spring Cloud版本--><spring-cloud.version>Hoxton.RELEASE</spring-cloud.version><spring-boot.version>2.2.1.RELEASE</spring-boot.version>
</properties><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

spring-cloud-starter-netflix-eureka-server依赖中包含了spring-cloud-starter-netflix-eureka-client,其实此处的依赖可以和Eureka服务端引入的依赖一样。这边使用spring-cloud-starter-netflix-eureka-client用于区分功能。spring-cloud-starter-netflix-eureka-client不包含spring-boot-starter-web依赖,需要单独引入。

(2)修改application.yml配置:

# 配置web服务端口
server:port: 8888# 配置应用名称
spring:application:name: myServereureka:client:# 开启注册功能,默认就是为true  register-with-eureka: truefetch-registry: false# Eureka服务端地址,会向此地址注册服务service-url:defaultZone: http://localhost:8899/eureka/

(3)编写启动类,并启动:

@SpringBootApplication
public class MyProviderApplication {public static void main(String[] args) {SpringApplication.run(MyProviderApplication.class, args);}}

(4)访问服务端仪表盘:localhost:8899:

原理:

(1)在spring-cloud-starter-netflix-eureka-client的jar包中可以找到spring.factories:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfigurationorg.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

(2)EurekaClientAutoConfiguration

此配置类,完成Eureka客户端的一些bean的创建。比如:EurekaClientConfigBean,完成客户端配置的初始化,即eureka.client开头的配置。EurekaInstanceConfigBean,完成Eureka客户端实例的配置初始化,一个服务就是一个Eureka客户端实例。等等:

// 创建Eureka客户端
@Bean(destroyMethod = "shutdown"
)
@ConditionalOnMissingBean(value = {EurekaClient.class},search = SearchStrategy.CURRENT
)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {ApplicationInfoManager appManager;if (AopUtils.isAopProxy(manager)) {appManager = (ApplicationInfoManager)ProxyUtils.getTargetObject(manager);} else {appManager = manager;}// 服务注册从这里开始CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);cloudEurekaClient.registerHealthCheck(healthCheckHandler);return cloudEurekaClient;
}// Eureka客户端注册器,实际干活的完成注册的功能。
// 与Dubbo对比的话,就是ServiceConfig
@Bean
public EurekaServiceRegistry eurekaServiceRegistry() {return new EurekaServiceRegistry();
}// 结合Spring boot开始注册功能。
// 与Dubbo对比的话,就是ServiceBean
@Bean
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
@ConditionalOnProperty(value = {"spring.cloud.service-registry.auto-registration.enabled"},matchIfMissing = true
)
public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) {return new EurekaAutoServiceRegistration(context, registry, registration);
}// 当前服务的信息,此时把当前服务提供者看成一个服务注册者的角色
@Bean
@org.springframework.cloud.context.config.annotation.RefreshScope
@ConditionalOnBean({AutoServiceRegistrationProperties.class})
@ConditionalOnProperty(value = {"spring.cloud.service-registry.auto-registration.enabled"},matchIfMissing = true
)
public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient).with(healthCheckHandler).build();
}
  • CloudEurekaClient

创建EurekaClient类型的bean的时候,通过CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context):

public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {// 进入父类的构造函数super(applicationInfoManager, config, args);this.cacheRefreshedCount = new AtomicLong(0L);this.eurekaHttpClient = new AtomicReference();this.applicationInfoManager = applicationInfoManager;this.publisher = publisher;this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

进入CloudEurekaClient父类的构造函数,即DiscoveryClient

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {// 进入重载构造函数this(applicationInfoManager, config, args, ResolverUtils::randomize);
}

DiscoveryClient重载的构造函数:

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {// 还是重载的构造函数this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {private volatile BackupRegistry backupRegistryInstance;public synchronized BackupRegistry get() {......return this.backupRegistryInstance;}}, randomizer);
}

DiscoveryClient又一个重载的构造函数:

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {......// 根据配置eureka.client.register-with-eureka与eureka.client.fetch-registry// 完成不同的逻辑if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {......} else {// 服务注册的话会进入下面的逻辑   try {// 创建几个线程池,比如心跳等// scheduler,线程任务调度器this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-%d").setDaemon(true).build());// 发送心跳的线程池this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());// 刷新服务消费者本地拉取的已注册服务列表this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());this.eurekaTransport = new DiscoveryClient.EurekaTransport();......// 进入这里,开启线程任务,注册服务以及定时向Eureka服务端发送心跳this.initScheduledTasks();try {Monitors.registerObject(this);} catch (Throwable var8) {logger.warn("Cannot register timers", var8);}DiscoveryManager.getInstance().setDiscoveryClient(this);DiscoveryManager.getInstance().setEurekaClientConfig(config);this.initTimestampMs = System.currentTimeMillis();logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size());}
}

this.initScheduledTasks:

private void initScheduledTasks() {int renewalIntervalInSecs;int expBackOffBound;// 需要拉取已注册服务if (this.clientConfig.shouldFetchRegistry()) {renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);}// 服务注册与心跳if (this.clientConfig.shouldRegisterWithEureka()) {renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);// 开启TimedSupervisorTask线程任务// 注意参数new DiscoveryClient.HeartbeatThread(),作为task// this.scheduler是个定时任务线程池,TimedSupervisorTask具有// 定时向Eureka服务端发送心跳的功能this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);// instanceInfoReplicator是一个线程类this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);// 服务状态监听器this.statusChangeListener = new StatusChangeListener() {public String getId() {return "statusChangeListener";}// 广播服务状态变化的通知public void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);} else {DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);}// 在这里完成服务的注册DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();}};if (this.clientConfig.shouldOnDemandUpdateStatusChange()) {this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener);}} else {logger.info("Not registering with Eureka server per configuration");}
}

this.statusChangeListener会完成服务的注册,但在以上的代码中并没有去调用notify方法,所以此时还不会触发服务注册的动作。上面的代码中将statusChangeListener赋值给了applicationInfoManager,可见以后会在applicationInfoManager中完成notify方法的调用。需要查看notify方法在哪里被调用,需要继续查看自动配置类中的其他bean了。

  • EurekaAutoServiceRegistration
public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered, SmartApplicationListener

EurekaAutoServiceRegistration实现了SmartApplicationListener接口,会通过方法onApplicationEvent监听ApplicationEvent事件。onApplicationEvent方法是继承自ApplicationListener,而SmartApplicationListener继承了ApplicationListener:

public void onApplicationEvent(ApplicationEvent event) {// web服务初始化事件,即服务启动的时候if (event instanceof WebServerInitializedEvent) {// 这是EurekaAutoServiceRegistration提供的方法this.onApplicationEvent((WebServerInitializedEvent)event);} // 服务关闭事件else if (event instanceof ContextClosedEvent) {this.onApplicationEvent((ContextClosedEvent)event);}}

this.onApplicationEvent:

// 服务启动时的事件
public void onApplicationEvent(WebServerInitializedEvent event) {String contextName = event.getApplicationContext().getServerNamespace();if (contextName == null || !contextName.equals("management")) {int localPort = event.getWebServer().getPort();// 设置服务端口号,并调用this.start()方法if (this.port.get() == 0) {log.info("Updating port to " + localPort);this.port.compareAndSet(0, localPort);this.start();}}}

this.start:

// 开始注册当前服务
public void start() {if (this.port.get() != 0) {// registration是在创建EurekaAutoServiceRegistration bean的时候// 通过参数自动注入的,为EurekaRegistration// http协议端口if (this.registration.getNonSecurePort() == 0) {this.registration.setNonSecurePort(this.port.get());}// https协议端口if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {this.registration.setSecurePort(this.port.get());}}// 该服务还没被注册,并且服务的端口号大于0,即有效的端口号if (!this.running.get() && this.registration.getNonSecurePort() > 0) {// serviceRegistry是在创建EurekaAutoServiceRegistration bean的时候,// 通过参数自动注入的,为:EurekaServiceRegistry // 注册器注册服务this.serviceRegistry.register(this.registration);// 发布服务注册时间this.context.publishEvent(new InstanceRegisteredEvent(this, this.registration.getInstanceConfig()));this.running.set(true);}}

this.serviceRegistry.register:

public void register(EurekaRegistration reg) {// 前置检查,ApplicationInfoManager、CloudEurekaClient是否已经设置this.maybeInitializeClient(reg);if (log.isInfoEnabled()) {log.info("Registering application " + reg.getApplicationInfoManager().getInfo().getAppName() + " with eureka with status " + reg.getInstanceConfig().getInitialStatus());}// 设置服务状态,初始化为:InstanceStatus.UP// 通过http://localhost:8899访问Eureka服务端仪表盘时,在已注册的服务// 列表可以查看服务的状态信息。在线:UP,掉线:DOWNreg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());// 设置健康检查处理器,发送心跳给Eureka服务端,说明自己服务还可用reg.getHealthCheckHandler().ifAvailable((healthCheckHandler) -> {reg.getEurekaClient().registerHealthCheck(healthCheckHandler);});
}

reg.getApplicationInfoManager().setInstanceStatus,还记得在上面的代码,在DiscoveryClient中,有将statusChangeListener赋值给applicationInfoManager的操作吗?此时在这里调用了setInstanceStatus:

public synchronized void setInstanceStatus(InstanceStatus status) {InstanceStatus next = this.instanceStatusMapper.map(status);if (next != null) {InstanceStatus prev = this.instanceInfo.setStatus(next);if (prev != null) {Iterator var4 = this.listeners.values().iterator();while(var4.hasNext()) {ApplicationInfoManager.StatusChangeListener listener = (ApplicationInfoManager.StatusChangeListener)var4.next();try {// 原来notify方法是在这里调用的啊listener.notify(new StatusChangeEvent(prev, next));} catch (Exception var7) {logger.warn("failed to notify listener: {}", listener.getId(), var7);}}}}
}

找到了notify方法调用的地方,让我们回去看看StatusChangeListener的notify方法,在DiscoveryClient中statusChangeListener是匿名内部类:

this.statusChangeListener = new StatusChangeListener() {public String getId() {return "statusChangeListener";}// 广播服务状态变化的通知public void notify(StatusChangeEvent statusChangeEvent) {if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) {DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent);} else {DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent);}// 在这里完成服务的注册DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate();}};

this.instanceInfoReplicator.onDemandUpdate。this. instanceInfoReplicator 是通过this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2);创建的。下面进入InstanceInfoReplicator的onDemandUpdate方法:

public boolean onDemandUpdate() {if (this.rateLimiter.acquire(this.burstSize, (long)this.allowedRatePerMinute)) {if (!this.scheduler.isShutdown()) {this.scheduler.submit(new Runnable() {public void run() {InstanceInfoReplicator.logger.debug("Executing on-demand update of local InstanceInfo");Future latestPeriodic = (Future)InstanceInfoReplicator.this.scheduledPeriodicRef.get();if (latestPeriodic != null && !latestPeriodic.isDone()) {InstanceInfoReplicator.logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");latestPeriodic.cancel(false);}// 进入run方法InstanceInfoReplicator.this.run();}});return true;} else {logger.warn("Ignoring onDemand update due to stopped scheduler");return false;}} else {logger.warn("Ignoring onDemand update due to rate limiter");return false;}
}

run:

public void run() {boolean var6 = false;ScheduledFuture next;label53: {try {var6 = true;this.discoveryClient.refreshInstanceInfo();Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {// 客户端注册this.discoveryClient.register();this.instanceInfo.unsetIsDirty(dirtyTimestamp);var6 = false;} else {var6 = false;}break label53;} catch (Throwable var7) {logger.warn("There was a problem with the instance info replicator", var7);var6 = false;} finally {if (var6) {ScheduledFuture next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);this.scheduledPeriodicRef.set(next);}}next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);this.scheduledPeriodicRef.set(next);return;}next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);this.scheduledPeriodicRef.set(next);
}

this.discoveryClient.register():

boolean register() throws Throwable {logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);EurekaHttpResponse httpResponse;try {// 向Eureka服务端发送http请求注册服务// 调用的是/eureka/apps/服务名,比如http://localhost:8899/eureka/apps/MYSERVER,请求体是: instanceInfohttpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);} catch (Exception var3) {logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});throw var3;}if (logger.isInfoEnabled()) {logger.info("DiscoveryClient_{} - registration status: {}", this.appPathIdentifier, httpResponse.getStatusCode());}return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

this.eurekaTransport.registrationClient.register,先到SessionedEurekaHttpClient再到RetryableEurekaHttpClient再到RedirectingEurekaHttpClient再到MetricsCollectingEurekaHttpClient再到JerseyApplicationClient,JerseyApplicationClient继承了AbstractJerseyEurekaHttpClient:

public EurekaHttpResponse<Void> register(InstanceInfo info) {// 拼接参数,info.getAppName获取到的就是spring.application.name配置的值的全大写名// urlPath=http://localhost:8899/eureka/apps/MYSERVERString urlPath = "apps/" + info.getAppName();ClientResponse response = null;EurekaHttpResponse var5;try {Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();this.addExtraHeaders(resourceBuilder);// 添加请求头,调用http://localhost:8899/eureka/apps/MYSERVER post请求,请求体为:InstanceInfo response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(new String[]{"application/json"})).post(ClientResponse.class, info);var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", new Object[]{this.serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()});}if (response != null) {response.close();}}return var5;
}

到这里服务就完成了服务注册功能。

补充:

(1)this.eurekaTransport.registrationClient.register的调用顺序为什么是先到SessionedEurekaHttpClient再到RetryableEurekaHttpClient再到RedirectingEurekaHttpClient再到MetricsCollectingEurekaHttpClient再到JerseyApplicationClient呢?

DiscoveryClient的构造函数中,有:

this.eurekaTransport = new DiscoveryClient.EurekaTransport();
this.scheduleServerEndpointTask(this.eurekaTransport, args);

scheduleServerEndpointTask:

private void scheduleServerEndpointTask(DiscoveryClient.EurekaTransport eurekaTransport, AbstractDiscoveryClientOptionalArgs args) {......TransportClientFactories transportClientFactories = argsTransportClientFactories == null ? new Jersey1TransportClientFactories() : argsTransportClientFactories;......// 调用Jersey1TransportClientFactories的newTransportClientFactory方法// 实际获取到MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory)// MetricsCollectingEurekaHttpClient中持有的clientFactory是jerseyFactory,即JerseyEurekaHttpClientFactoryeurekaTransport.transportClientFactory = providedJerseyClient == null ? ((TransportClientFactories)transportClientFactories).newTransportClientFactory(this.clientConfig, (Collection)additionalFilters, this.applicationInfoManager.getInfo(), sslContext, hostnameVerifier) : ((TransportClientFactories)transportClientFactories).newTransportClientFactory((Collection)additionalFilters, providedJerseyClient);......EurekaHttpClientFactory newQueryClientFactory;EurekaHttpClient newQueryClient;if (this.clientConfig.shouldRegisterWithEureka()) {newQueryClientFactory = null;newQueryClient = null;try {// 创建ClientFactory与Client// eurekaTransport.transportClientFactory就是上面的TransportClientFactories)transportClientFactories).newTransportClientFactory(this.clientConfig, (Collection)additionalFilters, this.applicationInfoManager.getInfo(), sslContext, hostnameVerifier)newQueryClientFactory = EurekaHttpClients.registrationClientFactory(eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, this.transportConfig);newQueryClient = newQueryClientFactory.newClient();} catch (Exception var14) {logger.warn("Transport initialization failure", var14);}// 将ClientFactory与client赋值eurekaTransport.registrationClientFactory = newQueryClientFactory;eurekaTransport.registrationClient = newQueryClient;}
}

EurekaHttpClients.registrationClientFactory:

public static EurekaHttpClientFactory registrationClientFactory(ClusterResolver bootstrapResolver, TransportClientFactory transportClientFactory, EurekaTransportConfig transportConfig) {return canonicalClientFactory("registration", transportConfig, bootstrapResolver, transportClientFactory);
}

canonicalClientFactory:

static EurekaHttpClientFactory canonicalClientFactory(final String name, final EurekaTransportConfig transportConfig, final ClusterResolver<EurekaEndpoint> clusterResolver, final TransportClientFactory transportClientFactory) {return new EurekaHttpClientFactory() {// 返回SessionedEurekaHttpClient,SessionedEurekaHttpClient中的clientFacotry是RetryableEurekaHttpClient// RetryableEurekaHttpClient中的clientFactory是RedirectingEurekaHttpClientpublic EurekaHttpClient newClient() {return new SessionedEurekaHttpClient(name, RetryableEurekaHttpClient.createFactory(name, transportConfig, clusterResolver, RedirectingEurekaHttpClient.createFactory(transportClientFactory), ServerStatusEvaluators.legacyEvaluator()), (long)(transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000));}public void shutdown() {EurekaHttpClients.wrapClosable(clusterResolver).shutdown();}};
},

RetryableEurekaHttpClient.createFactory:

public static EurekaHttpClientFactory createFactory(final String name, final EurekaTransportConfig transportConfig, final ClusterResolver<EurekaEndpoint> clusterResolver, final TransportClientFactory delegateFactory, final ServerStatusEvaluator serverStatusEvaluator) {return new EurekaHttpClientFactory() {// 就是返回RetryableEurekaHttpClient对象public EurekaHttpClient newClient() {return new RetryableEurekaHttpClient(name, transportConfig, clusterResolver, delegateFactory, serverStatusEvaluator, 3);}public void shutdown() {delegateFactory.shutdown();}};
}

RedirectingEurekaHttpClient.createFactory:

public static TransportClientFactory createFactory(final TransportClientFactory delegateFactory) {final DnsServiceImpl dnsService = new DnsServiceImpl();return new TransportClientFactory() {// 返回RedirectingEurekaHttpClient对象public EurekaHttpClient newClient(EurekaEndpoint endpoint) {// RedirectingEurekaHttpClient中持有的clientFactory就是一开始的参数clientFactory,// 即eurekaTransport.transportClientFactory,获取到的是MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory)return new RedirectingEurekaHttpClient(endpoint.getServiceUrl(), delegateFactory, dnsService);}public void shutdown() {delegateFactory.shutdown();}};
}

通过上面的方法可以得到this.eurekaTransport.registrationClient获取到的是SessionedEurekaHttpClient,则调用SessionedEurekaHttpClient的register方法,SessionedEurekaHttpClient继承了EurekaHttpClientDecorator,实际调用的是EurekaHttpClientDecorator中的register方法:

public EurekaHttpResponse<Void> register(final InstanceInfo info) {// this.execute是抽象方法,调用实际的子类对象中的实现方法,则调用SessionedEurekaHttpClientreturn this.execute(new EurekaHttpClientDecorator.RequestExecutor<Void>() {public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) {// 调用EurekaHttpClient的register方法return delegate.register(info);}public EurekaHttpClientDecorator.RequestType getRequestType() {return EurekaHttpClientDecorator.RequestType.Register;}});
}

this.execute是抽象方法,调用实际的子类对象中的实现方法,则调用SessionedEurekaHttpClient:

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {......EurekaHttpClient eurekaHttpClient = (EurekaHttpClient)this.eurekaHttpClientRef.get();if (eurekaHttpClient == null) {// 创建EurekaHttpClient,因为SessionedEurekaHttpClient的clientFactory是RetryableEurekaHttpClientFactory,所以// 通过this.clientFactory.newClient()获取到的是RetryableEurekaHttpClienteurekaHttpClient = TransportUtils.getOrSetAnotherClient(this.eurekaHttpClientRef, this.clientFactory.newClient());}// 这里调用的是在EurekaHttpClientDecorator的register中的匿名内部内方法return requestExecutor.execute(eurekaHttpClient);
}

requestExecutor.execute(eurekaHttpClient),会调用this.execute方法中的匿名内部类中的方法逻辑,即delegate.register(info),delegate就是RetryableEurekaHttpClientFactory,则会调用RetryableEurekaHttpClientFactory的register。RetryableEurekaHttpClientFactory也是继承了EurekaHttpClientDecorator,所以会重复上面的步骤。在RetryableEurekaHttpClientFactory获取到RedirectingEurekaHttpClient,调用RedirectingEurekaHttpClient的register方法。RedirectingEurekaHttpClient也是继承了EurekaHttpClientDecorator。进到RedirectingEurekaHttpClient的execute方法:

protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {......// this.factory是MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory)// this.factory.newClient获取到的是MetricsCollectingEurekaHttpClientAtomicReference currentEurekaClientRef = new AtomicReference(this.factory.newClient(this.serviceEndpoint));......EurekaHttpResponse<R> response = this.executeOnNewServer(requestExecutor, currentEurekaClientRef);TransportUtils.shutdown((EurekaHttpClient)this.delegateRef.getAndSet(currentEurekaClientRef.get()));
}

executeOnNewServer:

private <R> EurekaHttpResponse<R> executeOnNewServer(RequestExecutor<R> requestExecutor, AtomicReference<EurekaHttpClient> currentHttpClientRef) {URI targetUrl = null;for(int followRedirectCount = 0; followRedirectCount < 10; ++followRedirectCount) {// currentHttpClientRef.get()获取到MetricsCollectingEurekaHttpClient// 还是执行匿名函数中的逻辑EurekaHttpResponse<R> httpResponse = requestExecutor.execute((EurekaHttpClient)currentHttpClientRef.get());............}......
}

MetricsCollectingEurekaHttpClient.register,再到MetricsCollectingEurekaHttpClient.execute。MetricsCollectingEurekaHttpClient中持有的clientFactory是JerseyEurekaHttpClientFactory,获取到JerseyApplicationClient,调用JerseyApplicationClient的register方法:

public EurekaHttpResponse<Void> register(InstanceInfo info) {String urlPath = "apps/" + info.getAppName();ClientResponse response = null;EurekaHttpResponse var5;try {Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();this.addExtraHeaders(resourceBuilder);response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(new String[]{"application/json"})).post(ClientResponse.class, info);var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();} finally {if (logger.isDebugEnabled()) {logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", new Object[]{this.serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()});}if (response != null) {response.close();}}return var5;
}

(2)心跳服务:

new TimedSupervisorTask:

public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {// 参数scheduler就是 this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-%d").setDaemon(true).build());this.scheduler = scheduler;// 参数executor就是this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());this.executor = executor;// task参数是new DiscoveryClient.HeartbeatThread()this.task = task;......
}

因为TimedSupervisorTask是一个线程类,线程启动后,运行的是run方法里面的逻辑:

public void run() {Future future = null;try {// 线程池运行this.task任务,即new DiscoveryClient.HeartbeatThread()future = this.executor.submit(this.task);......} ......}

new DiscoveryClient.HeartbeatThread()也是一个线程类,逻辑也要看run方法:

public void run() {// renew方法发送心跳if (DiscoveryClient.this.renew()) {DiscoveryClient.this.lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}

renew:

boolean renew() {try {// 通过httpClient发送请求给Eureka服务端注册服务// 调用的是/eureka/peerreplication/batch接口,post请求,action=HeartbeatEurekaHttpResponse<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus)null);......} catch (Throwable var5) {logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, var5);return false;}
}

█ Eureka客户端-服务发现

使用:

(1)创建maven项目,引入与上面服务注册时服务端一样的依赖。

(2)修改application.yml配置:

# 如果注册中心、服务提供者都在一台电脑上,服务的端口号要不一样
server:port: 8889eureka:client:# 服务消费者需要获取已注册的服务提供者的fetch-registry: true# 服务消费者就不需要注册到Eureka注册中心了register-with-eureka: false# Eureka注册中心地址,客户端需要去此地址获取服务列表service-url:defaultZone: http://localhost:8899/eureka/

(3)编写controller:

package controller;import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Application;
import com.netflix.discovery.shared.Applications;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;@RestController
public class ClientController {// 这里的是com.netflix.discovery.EurekaClient@Autowiredprivate EurekaClient eurekaClient;// 或者使用org.springframework.cloud.client.discovery.DiscoveryClient// @Autowired// private DiscoveryClient discoveryClient;@GetMapping("/get")public String getServer() {List<String> serverList = new ArrayList<>();// 获取服务列表Applications applications = eurekaClient.getApplications();List<Application> registeredApplications = applications.getRegisteredApplications();for (Application registeredApplication : registeredApplications) {serverList.add(registeredApplication.getName());}return "已注册的服务列表:"+serverList;}}

(4)编写Spring boot启动类,并启动:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}}

(5)启动完成后,通过浏览器访问接口地址,http://localhost:8889/get

会在浏览器看见接口返回内容:已注册的服务列表:[MYSERVER]

MYSERVER是服务提供者配置的spring.application.name内容的全大写名。

原理:

(1)在controller中通过@Autowired自动注入的EurekaClient,是在Spring boot启动时,自动配置的。在EurekaClientAutoConfiguration类中:

@Bean(destroyMethod = "shutdown"
)
@ConditionalOnMissingBean(value = {EurekaClient.class},search = SearchStrategy.CURRENT
)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {ApplicationInfoManager appManager;if (AopUtils.isAopProxy(manager)) {appManager = (ApplicationInfoManager)ProxyUtils.getTargetObject(manager);} else {appManager = manager;}CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);cloudEurekaClient.registerHealthCheck(healthCheckHandler);return cloudEurekaClient;
}

org.springframework.cloud.client.discovery.DiscoveryClient是在自动配置类EurekaDiscoveryClientConfiguration完成bean的注册的:

// EurekaClient会被自动注入
@Bean
@ConditionalOnMissingBean
public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {return new EurekaDiscoveryClient(client, clientConfig);
}

EurekaDiscoveryClient中持有EurekaClient,其实也就是调用EurekaClient中的方法。

(2)和服务注册一样,从CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs, this.context);进入构造函数开始走。在DiscoveryClient的构造函数中:

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {// Timer类型的变量,定时去注册中心拉取服务列表this.FETCH_REGISTRY_TIMER = Monitors.newTimer("DiscoveryClient_FetchRegistry");......// 线程池  this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());......// 依然是initScheduledTasksthis.initScheduledTasks();......
}

initScheduledTasks:

private void initScheduledTasks() {......if (this.clientConfig.shouldFetchRegistry()) {// 相关的时间配置renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds();expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();// 线程池添加线程任务this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS);}......
}

new TimedSupervisorTask

public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {this.scheduler = scheduler;// executor为cacheRefreshExecutorthis.executor = executor;// task为new DiscoveryClient.CacheRefreshThread()this.task = task;
}

因为TimedSupervisorTask是线程类,先进入run方法:

public void run() {......// 用线程池cacheRefreshExecutor去执行new DiscoveryClient.CacheRefreshThread()线程任务future = this.executor.submit(this.task);......}

new DiscoveryClient.CacheRefreshThread():

class CacheRefreshThread implements Runnable {CacheRefreshThread() {}// 线程类,执行逻辑进入run方法public void run() {DiscoveryClient.this.refreshRegistry();}
}

DiscoveryClient.this.refreshRegistry:

void refreshRegistry() {......// 参数remoteRegionsModified为boolean类型,表示注册中心的服务列表是否改变boolean success = this.fetchRegistry(remoteRegionsModified);......}

fetchRegistry:

private boolean fetchRegistry(boolean forceFullRegistryFetch){......this.getAndStoreFullRegistry();......
}

getAndStoreFullRegistry:

private void getAndStoreFullRegistry() throws Throwable {long currentUpdateGeneration = this.fetchRegistryGeneration.get();logger.info("Getting all instance registry info from the eureka server");Applications apps = null;// 发送http请求给Eureka注册中心获取服务列表// 此处调用的是接口:http://localhost:8899/eureka/appsEurekaHttpResponse<Applications> httpResponse = this.clientConfig.getRegistryRefreshSingleVipAddress() == null ? this.eurekaTransport.queryClient.getApplications((String[])this.remoteRegionsRef.get()) : this.eurekaTransport.queryClient.getVip(this.clientConfig.getRegistryRefreshSingleVipAddress(), (String[])this.remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = (Applications)httpResponse.getEntity();}logger.info("The response status is {}", httpResponse.getStatusCode());if (apps == null) {logger.error("The application is null for some reason. Not storing this information");} else if (this.fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1L)) {// 新获取的注册服务列表会被放进localRegionAppsthis.localRegionApps.set(this.filterAndShuffle(apps));logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());} else {logger.warn("Not updating applications as another thread is updating it already");}}

Eureka客户端获取服务端所有的注册服务信息,调用的是接口:http://localhost:8899/eureka/apps,在浏览器上访问该地址,会返回如下内容。内容格式是XML。可通过设置请求头:accept:application/json,返回JSON格式的内容:

新获取的注册服务列表会被放进localRegionApps,回过头看看getApplications方法,就是从localRegionApps获取内容的:

public Applications getApplications() {return (Applications)this.localRegionApps.get();
}

█ HTTP接口

(1)EurekaHttpClient定义了方法,每个方法对应一个功能,每个功能对应调用响应的接口去请求Eureka服务端的功能。Eureka客户端可通过调用此接口方法与Eureka服务端进行通信:

public interface EurekaHttpClient {// 注册服务EurekaHttpResponse<Void> register(InstanceInfo var1);EurekaHttpResponse<Void> cancel(String var1, String var2);// 发送心跳EurekaHttpResponse<InstanceInfo> sendHeartBeat(String var1, String var2, InstanceInfo var3, InstanceStatus var4);EurekaHttpResponse<Void> statusUpdate(String var1, String var2, InstanceStatus var3, InstanceInfo var4);EurekaHttpResponse<Void> deleteStatusOverride(String var1, String var2, InstanceInfo var3);// 获取所有已注册服务列表EurekaHttpResponse<Applications> getApplications(String... var1);EurekaHttpResponse<Applications> getDelta(String... var1);EurekaHttpResponse<Applications> getVip(String var1, String... var2);EurekaHttpResponse<Applications> getSecureVip(String var1, String... var2);// 根据服务名获取服务列表EurekaHttpResponse<Application> getApplication(String var1);EurekaHttpResponse<InstanceInfo> getInstance(String var1, String var2);EurekaHttpResponse<InstanceInfo> getInstance(String var1);void shutdown();
}

register:

服务注册,对应ApplicationsResource中的接口/eureka/apps/服务名,转发到ApplicationResource中的post请求。

sendHeartBeat:

心跳服务,对应ApplicationsResource中的接口/eureka/apps/服务名:

对应到ApplicationResource中的/id(实例号):

对应到InstanceResource的put方法:

......其他的方法一样,都有对应的HTTP接口。

(2)抽象类AbstractJerseyEurekaHttpClient实现了EurekaHttpClient,提供了方法的实现:

public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient

(3)AbstractJerseyEurekaHttpClient有两个子类实现,分别是JerseyApplicationClientJerseyReplicationClient。从名字上面可以看出JerseyApplicationClient用于eureka客户端的通信调用,JerseyReplicationClient用于eureka服务端副本的通信调用:

public class JerseyApplicationClient extends AbstractJerseyEurekaHttpClient
public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient

服务注册与发现-Spring Cloud Netflix-Eureka相关推荐

  1. 使用Nacos实现服务注册与发现(spring cloud 组件教程大全四)

    使用Nacos实现服务注册与发现(spring cloud 组件教程大全四) idea 创建maven父子工程(spring cloud 组件教程大全 一) windows下nacos的安装及Mysq ...

  2. SpringCloud 从菜鸟到大牛之二 服务注册与发现 Sping Cloud Eureka

    继承上一篇文章 ,本文 就专门来介绍一下 服务与注册组件 服务注册与发现 Sping Cloud Eureka ,作为各个微服务的注册中心,维持心跳连接 注册中心 : Eureka Server ,E ...

  3. Spring Cloud Netflix Eureka 配置参数说明

    为什么80%的码农都做不了架构师?>>>    Eureka Client 配置项(eureka.client.*) org.springframework.cloud.netfli ...

  4. 【系统架构理论】一篇文章精通:Spring Cloud Netflix Eureka

    是官方文档的总结 http://spring.io/projects/spring-cloud-netflix#overview 讲解基于2.0.2版本官方文档 https://cloud.sprin ...

  5. 白话SpringCloud | 第三章:服务注册与发现-高可用配置(Eureka)-下

    2019独角兽企业重金招聘Python工程师标准>>> 前言 上一章节,讲解了在单机模式下的服务注册与发现的相关知识点及简单示例.而在实际生产或者在这种微服务架构的分布式环境中,需要 ...

  6. Spring Cloud Netflix Eureka client源码分析

    1.client端 EurekaClient提供三个功能: EurekaClient API contracts are: * - provide the ability to get Instanc ...

  7. Spring Cloud -> Spring Cloud Netflix Eureka快速搭建(一)

    前言:不忘初心,寻找最初的编程快感! 1.简介 Eureka是Netflix开发的服务发现框架,本身是一个基于REST的服务,主要用于定位运行在AWS域中的中间层服务,以达到负载均衡和中间层服务故障转 ...

  8. SpringCloud——Eureka服务注册和发现

    一.SpringCloud和Dubbo SpringCloud整合了一套较为完整的微服务解决方案框架,而Dubbo只是解决了微服务的几个方面的问题. content Dubbo SpringCloud ...

  9. Spring Cloud Netflix五大组件简介

    微服务与微服务架构 微服务的优缺点 优点 缺点 Dubbo与Spring Cloud Spring Cloud Netflix Eureka Eureka的自我保护机制 Eureka和ZooKeepe ...

  10. 跟着狂神学SpringCloud(Rest环境搭建+Eureka服务注册与发现+ribbon+Feign负载均衡+Hystrix+服务熔断+Zuul路由网关+SpringCloud config分布)

    跟着狂神学SpringCloud SpringCloud 回顾之前的知识- JavaSE 数据库 前端 Servlet Http Mybatis Spring SpringMVC SpringBoot ...

最新文章

  1. PHP 在 Nginx 下主动断开连接 Connection Close 与 ignore_user_abort 后台运行
  2. cesium 加载bim模型_构建统一CIM数字底盘,实现基于BIM的全流程管控
  3. GRE词汇乱序版-夹生的词汇3
  4. seata-golang 接入指南
  5. linux实时进程优先级rt,Linux实时性- PREEMPT_RT实时抢占实现
  6. PM2.5环境检测系统的设计与分析
  7. python求解分支定界(branch-and-bound)问题使用pybnb基本架构
  8. 【调参】如何为神经网络选择最合适的学习率lr-LRFinder-for-Keras
  9. M3DGIS三维数字沙盘开发教程第51课可视化交互大数据地理信息系统开发教程第51课
  10. cdr圆形渐变填充怎么设置_适用于平面设计的软件cdr!
  11. 【文学杂谈】徐宥 - 我的大学
  12. 会员中心—1—登录与注册
  13. Android 音乐播放器的开发教程(六)service的运用及音乐列表点击播放 ----- 小达
  14. 维护计算机网络教室的常见问题及解决方案
  15. 微信小程序开发需要什么前提条件?
  16. mac OS 下的开源工具 macports
  17. 开关电源模块并联供电系统_水冷风冷高频开关电源直流整流器
  18. 炫酷的android ui,25个Android酷炫开源UI框架
  19. BUUCTF 还原大师
  20. GitLab一次性下载多个项目

热门文章

  1. PS学习笔记-----提示暂存盘满了怎么办???
  2. Android实现省市区三级联动效果
  3. veu2x 消息订阅与发送
  4. 下载书籍的网址【汇总】
  5. 老电脑换Linux系统是否会更快,旧电脑不要装Windows!Bodhi Linux系统,小巧强悍,运行更流畅...
  6. 无线AP,无线中继器与无线路由
  7. 什么是应届生?要不要签三方?看看就知道了
  8. 求两个数的最小公倍数方法汇总
  9. 中文短信PDU包格式生成工具
  10. 村长网推出用韩国论山特产草莓特制的低糖“村长草莓酱”