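Capabilities required for Nacos service registration

  • A service provider registers its protocol address with the Nacos Server
  • A service consumer queries the Nacos Server for the provider's address (by service name)
  • The Nacos Server must detect when service providers come online or go offline
  • A service consumer must be notified dynamically when the service addresses on the Nacos Server change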

Nacos API

SDK (which itself calls the Open API under the hood) / Open API (REST interface)

Official website

Service registration

void registerInstance(String serviceName, String ip, int port) throws NacosException;
void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;
void registerInstance(String serviceName, Instance instance) throws NacosException;

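Request parameters

Name         Type               Description
serviceName  String             Service name
ip           String             Service instance IP
port         int                Service instance port
clusterName  String             Cluster name
instance     See code comments  Instance properties

Request example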

NamingService naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");

Instance instance = new Instance();
instance.setIp("55.55.55.55");
instance.setPort(9999);
instance.setHealthy(false);
instance.setWeight(2.0);
Map<String, String> instanceMeta = new HashMap<>();
instanceMeta.put("site", "et2");
instance.setMetadata(instanceMeta);

Service service = new Service("nacos.test.4");
service.setApp("nacos-naming");
service.sethealthCheckMode("server");
service.setEnableHealthCheck(true);
service.setProtectThreshold(0.8F);
service.setGroup("CNCF");
Map<String, String> serviceMeta = new HashMap<>();
serviceMeta.put("symmetricCall", "true");
service.setMetadata(serviceMeta);
instance.setService(service);

Cluster cluster = new Cluster();
cluster.setName("TEST5");
AbstractHealthChecker.Http healthChecker = new AbstractHealthChecker.Http();
healthChecker.setExpectedResponseCode(400);
healthChecker.setCurlHost("USer-Agent|Nacos");
healthChecker.setCurlPath("/xxx.html");
cluster.setHealthChecker(healthChecker);
Map<String, String> clusterMeta = new HashMap<>();
clusterMeta.put("xxx", "yyyy");
cluster.setMetadata(clusterMeta);

instance.setCluster(cluster);

naming.registerInstance("nacos.test.4", instance);

Service query

List<Instance> getAllInstances(String serviceName) throws NacosException;
List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException;

Request parameters

Name         Type    Description
serviceName  String  Service name
clusters     List    Cluster list

Request example

NamingService naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
System.out.println(naming.getAllInstances("nacos.test.3"));

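Heartbeat mechanism

  • The interval at which heartbeats are sent
  • The heartbeat timeout
    • Set a heartbeat timeout threshold
    • Record the time of the last update for each service instance
    • If (current time - last update time of the instance) > heartbeat timeout threshold, the instance is treated as timed out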
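
The timeout rule in the list above can be sketched in a few lines. This is a minimal illustration only, not the Nacos server code: the class name HeartbeatTimeoutSketch, the "ip:port" instance key and the 15000 ms threshold used in the usage note are all assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks the last heartbeat time per instance and applies the timeout rule. */
class HeartbeatTimeoutSketch {
    // Hypothetical threshold in milliseconds, chosen by the caller
    private final long timeoutMillis;
    // instance key (for example "ip:port") -> time of the last heartbeat
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    HeartbeatTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records a heartbeat for the given instance. */
    void onBeat(String instanceKey, long nowMillis) {
        lastBeat.put(instanceKey, nowMillis);
    }

    /** Returns true when now - lastUpdate > threshold. */
    boolean isTimedOut(String instanceKey, long nowMillis) {
        Long last = lastBeat.get(instanceKey);
        // In this sketch an instance that never sent a beat counts as timed out
        return last == null || nowMillis - last > timeoutMillis;
    }
}
```

With new HeartbeatTimeoutSketch(15000L) and a beat recorded at time 1000, the instance is still alive at time 16000 and timed out at 16001, because the comparison is strictly greater than the threshold.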

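Nacos implementation diagram

Nacos source code analysis

Nacos client registration flow

A Dubbo service can be registered through two paths. The first follows the same path as in the earlier Eureka source analysis (see the Eureka source code analysis).

The second is based on the auto-configuration mechanism that Dubbo itself provides. When a service is published through Dubbo, registration is driven by an event listener: the class DubboServiceRegistrationNonWebApplicationAutoConfiguration listens for ApplicationStartedEvent, an event added in Spring Boot 2.0 that is published once the Spring Boot application has finished starting. When the listener receives this event, it triggers the registration.

Spring Cloud Nacos integration with Dubbo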

DubboServiceRegistrationNonWebApplicationAutoConfiguration

// Listen for ApplicationStartedEvent and register the service
@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() {
    setServerPort();
    register();
}

private void register() {
    if (registered) {
        return;
    }
    serviceRegistry.register(registration);
    registered = true;
}

this.serviceRegistry is an injected instance: NacosServiceRegistry

NacosServiceRegistry.register

@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    // serviceId corresponds to the application.name of the current application
    String serviceId = registration.getServiceId();
    // group is the group configured in Nacos
    String group = nacosDiscoveryProperties.getGroup();
    // instance holds the service instance information
    Instance instance = getNacosInstanceFromRegistration(registration);
    try {
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}

NacosNamingService.registerInstance

Registering the instance involves two actions:

  • If the instance being registered is ephemeral, build the heartbeat information and let BeatReactor create the heartbeat task
  • Call registerService to send the registration request

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // If this is an ephemeral instance, build its heartbeat information
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        // Hand the heartbeat information to beatReactor for processing
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
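
To make the heartbeat task more concrete, here is a rough sketch of sending a beat at a fixed period with a scheduled executor. It is not the BeatReactor implementation: the class BeatSchedulerSketch, its methods and the thread name are hypothetical, and the beat itself is just a Runnable supplied by the caller.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Periodically runs a "send heartbeat" action on a daemon thread. */
class BeatSchedulerSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "beat-sender-sketch");
                t.setDaemon(true);
                return t;
            });

    /** Schedules sendBeat to run once per period until shutdown() is called. */
    void addBeat(Runnable sendBeat, long periodMillis) {
        executor.scheduleWithFixedDelay(() -> {
            try {
                sendBeat.run();
            } catch (RuntimeException e) {
                // Swallow the failure so that later beats are still sent
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    /** Demo helper: returns true if `count` beats arrive within five seconds. */
    static boolean beatsArrive(int count, long periodMillis) {
        CountDownLatch latch = new CountDownLatch(count);
        BeatSchedulerSketch scheduler = new BeatSchedulerSketch();
        scheduler.addBeat(latch::countDown, periodMillis);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdown();
        }
    }
}
```

The try/catch around the beat matters in this sketch: a periodic task submitted to a ScheduledExecutorService is cancelled after its first uncaught exception, which would silently stop the heartbeats.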

NamingProxy.registerService

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}

NamingProxy.reqAPI

public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();

    // A non-empty server list corresponds to the Nacos registry addresses configured
    // through spring.cloud.nacos.discovery.server-addr in application.properties
    if (servers != null && !servers.isEmpty()) {
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());

        // Start from a randomly chosen server
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                // Call the selected server
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            // On failure, move on to the next server in the list
            index = (index + 1) % servers.size();
        }
    }

    if (StringUtils.isNotBlank(nacosDomain)) {
        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
            api, servers, exception.getErrCode(), exception.getErrMsg());

    throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage());
}
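
The server selection in reqAPI (pick a starting server, then walk the list once and wrap around on failure) can be isolated into a small stand-alone sketch. The class ServerFailoverSketch and its method are hypothetical, and the remote call is represented by a plain Function so the example runs without a Nacos server.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;

/** Tries every server at most once, starting at a given index and wrapping around. */
class ServerFailoverSketch {

    /** Returns the first successful response; fails only after all servers were tried. */
    static String callWithFailover(List<String> servers, int startIndex,
                                   Function<String, String> call) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = startIndex;
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }

    /** Same as above, but starts at a random position as reqAPI does. */
    static String callWithFailover(List<String> servers, Function<String, String> call) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        return callWithFailover(servers, new Random().nextInt(servers.size()), call);
    }
}
```

Starting at a random index spreads requests across the cluster, while the wrap-around loop guarantees that one unavailable node does not fail the request as long as another node answers.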

NamingProxy.callServer

public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
        throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    // Add the signature (security) information
    injectSecurityInfo(params);
    // Add the request headers
    List<String> headers = builderHeaders();

    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
            curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
        }
        url = HttpClient.getPrefix() + curServer + api;
    }
    // Send the registration request through HttpClient
    HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
    end = System.currentTimeMillis();
    // Report metrics for monitoring
    MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
            .observe(end - start);

    if (HttpURLConnection.HTTP_OK == result.code) {
        return result.content;
    }

    if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
        return StringUtils.EMPTY;
    }

    throw new NacosException(result.code, result.content);
}

HttpClient.request

public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String body, String encoding, String method) {
    HttpURLConnection conn = null;
    try {
        String encodedContent = encodingParams(paramValues, encoding);
        url += (StringUtils.isEmpty(encodedContent)) ? "" : ("?" + encodedContent);

        conn = (HttpURLConnection) new URL(url).openConnection();

        setHeaders(conn, headers, encoding);
        conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
        conn.setReadTimeout(TIME_OUT_MILLIS);
        conn.setRequestMethod(method);
        conn.setDoOutput(true);
        if (StringUtils.isNotBlank(body)) {
            byte[] b = body.getBytes();
            conn.setRequestProperty("Content-Length", String.valueOf(b.length));
            conn.getOutputStream().write(b, 0, b.length);
            conn.getOutputStream().flush();
            conn.getOutputStream().close();
        }
        conn.connect();
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("Request from server: " + url);
        }
        return getResult(conn);
    } catch (Exception e) {
        try {
            if (conn != null) {
                NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from "
                        + InetAddress.getByName(conn.getURL().getHost()).getHostAddress());
            }
        } catch (Exception e1) {
            NAMING_LOGGER.error("[NA] failed to request ", e1);
            //ignore
        }

        NAMING_LOGGER.error("[NA] failed to request ", e);

        return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
    } finally {
        IoUtils.closeQuietly(conn);
    }
}
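
HttpClient.request appends the parameter map to the URL as a query string. The following sketch shows one plausible way to do that encoding with the JDK only. The class QueryStringSketch is hypothetical, skipping empty values is a choice made for this sketch, and the address 127.0.0.1:8848 in the usage note is an assumed local server, not something taken from the code above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Map;

/** Builds "key=value&key=value" with URL-encoded values and appends it to a base URL. */
class QueryStringSketch {

    static String encodeParams(Map<String, String> params, String encoding) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            String value = entry.getValue();
            if (value == null || value.isEmpty()) {
                continue; // this sketch simply skips empty values
            }
            if (sb.length() > 0) {
                sb.append('&');
            }
            try {
                sb.append(entry.getKey()).append('=').append(URLEncoder.encode(value, encoding));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalArgumentException("unsupported encoding: " + encoding, e);
            }
        }
        return sb.toString();
    }

    static String buildUrl(String base, Map<String, String> params) {
        String query = encodeParams(params, "UTF-8");
        return query.isEmpty() ? base : base + "?" + query;
    }
}
```

For example, calling buildUrl with the base http://127.0.0.1:8848/nacos/v1/ns/instance and the parameters serviceName=DEFAULT_GROUP@@demo, ip=10.0.0.1, port=8080 (inserted in that order into a LinkedHashMap) produces a URL whose query string is serviceName=DEFAULT_GROUP%40%40demo&ip=10.0.0.1&port=8080.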

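Registration flow on the Nacos server

The server provides an InstanceController class that exposes the APIs related to service registration. When the client initiates a registration, the endpoint it calls is: [POST] /nacos/v1/ns/instance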

InstanceController.register

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // serviceName is the name of the client service
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // namespaceId is the Nacos namespace
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Parse the Instance from the request
    final Instance instance = parseInstance(request);

    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

ServiceManager.registerInstance

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create an empty service (the service entry shown in the Nacos console service list).
    // In practice this initializes serviceMap, which is a ConcurrentHashMap
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Look up the Service object in serviceMap by namespaceId and serviceName
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Call addInstance to create the service instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

ServiceManager.createServiceIfAbsent

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // Identify the service by namespaceId and serviceName (group:serviceName)
    Service service = getService(namespaceId, serviceName);
    // A null service means this is the first registration, so build a new Service
    if (service == null) {

        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // Store it in the map
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

ServiceManager.getService

Storage structure

key: namespaceId, value: Map<String, Service>

  key: group::serviceName, value: Service

    Service

      List instances;

/**
 * Map(namespace, Map(group::serviceName, Service)).
 */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    return chooseServiceMap(namespaceId).get(serviceName);
}

ServiceManager.putServiceAndInit

private void putServiceAndInit(Service service) throws NacosException {
    // Add the registered service to the map
    putService(service);
    // Initialize the heartbeat check for the instances under this service
    service.init();
    // Register consistency listeners for both instance-list keys:
    // ephemeral=true is handled by the Distro protocol, ephemeral=false by Raft
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

ServiceManager.putService

Adds the newly registered service to the map kept for its namespaceId

public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
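
The two-level map used by getService and putService can be tried out in isolation. The sketch below is a simplified stand-in, not the ServiceManager code: the class ServiceRegistrySketch is hypothetical, the Service object is replaced by a plain String, and computeIfAbsent is used in place of the explicit double-checked lock as an alternative way to create the inner map exactly once.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** namespaceId -> (grouped service name -> service), with a String standing in for Service. */
class ServiceRegistrySketch {
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    void putService(String namespaceId, String groupedName, String service) {
        // computeIfAbsent atomically creates the inner map on first use
        serviceMap.computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>(16))
                .put(groupedName, service);
    }

    /** Returns null when either the namespace or the service is unknown. */
    String getService(String namespaceId, String groupedName) {
        Map<String, String> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(groupedName);
    }
}
```

Keeping one inner map per namespace means that two services with the same grouped name in different namespaces never collide.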

ServiceManager.addInstance

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {

    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // Service{name='DEFAULT_GROUP@@fireweed-manager', protectThreshold=0.0, appName='null', groupName='DEFAULT_GROUP', metadata={}}
    Service service = getService(namespaceId, serviceName);

    synchronized (service) {
        // Add the service instances to the list, then synchronize the data through the consistency protocol
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }
}

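Service lookup on the consumer side

At startup the consumer mainly does two things:

  1. Reads the instance list for the given service name from the Nacos Server and caches it in local memory

  2. Starts a scheduled task that polls the service list every 10s

Once registration has succeeded, the consumer can obtain the provider's address from the Nacos Server and call the service. On the consumer side the core class NacosDiscoveryClient is responsible for talking to Nacos and fetching provider addresses. The steps leading up to this point are not repeated here, because the subscription process was already covered in the earlier Dubbo source analysis. NacosDiscoveryClient provides a getInstances method that returns the provider addresses for a given provider service name.

1. Fetching the service list at client startup (read the instance list for the given service name from the Nacos Server and cache it in local memory)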

NacosDiscoveryClient.getInstances

public List<ServiceInstance> getInstances(String serviceId) {
    try {
        List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, true);
        return hostToServiceInstanceList(instances, serviceId);
    } catch (Exception var3) {
        throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3);
    }
}

This calls NamingService to obtain the list of service instances by serviceId and group, then converts each Instance into a ServiceInstance object.

NacosNamingService.selectInstances

selectInstances first obtains a ServiceInfo from hostReactor, then removes from serviceInfo.getHosts() every instance that is not healthy, not enabled, or has a weight less than or equal to 0, and returns the rest. If subscribe is true it obtains the ServiceInfo through hostReactor.getServiceInfo; otherwise it uses hostReactor.getServiceInfoDirectlyFromServer.

@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // Whether to subscribe. Since we need to detect server-side changes, subscribing is required
    if (subscribe) {
        // hostReactor sets up the subscription, which consists of two things:
        // (1) the push mechanism, (2) short polling from the client every 10s
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}
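
The filtering step described above (drop instances whose health does not match the requested flag, that are disabled, or whose weight is not positive) can be sketched as follows. InstanceFilterSketch and its nested Host class are hypothetical stand-ins for the Nacos Instance type, written only to show the rule.

```java
import java.util.ArrayList;
import java.util.List;

/** Keeps only hosts that match the requested health flag, are enabled and have weight > 0. */
class InstanceFilterSketch {

    /** Minimal stand-in for an instance: address plus the three fields the filter looks at. */
    static class Host {
        final String addr;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Host(String addr, boolean healthy, boolean enabled, double weight) {
            this.addr = addr;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    static List<Host> select(List<Host> hosts, boolean healthy) {
        List<Host> result = new ArrayList<>();
        for (Host host : hosts) {
            if (host.healthy == healthy && host.enabled && host.weight > 0) {
                result.add(host);
            }
        }
        return result;
    }
}
```

The sketch builds a new list instead of removing entries in place, so the cached host list is left untouched for other callers.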

HostReactor.getServiceInfo

There are two pieces of logic here:

  • updateServiceNow: immediately loads the service address information from the Nacos Server
  • scheduleUpdateIfAbsent: starts the scheduled update task, which first runs after a 1s delay and then keeps polling the service addresses periodically

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // Build the key from the service name and the cluster names (empty by default)
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // Try the local cache first, by serviceName and clusters
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // If it is null, create a new one and put it into serviceInfoMap, mark it in updatingMap,
    // run updateServiceNow, then remove it from updatingMap
    if (null == serviceObj) {
        // Build a ServiceInfo
        serviceObj = new ServiceInfo(serviceName, clusters);
        // Store it in the serviceInfoMap local cache
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        // Ask the Nacos server for the instance information
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {
        // If the serviceObj found in serviceInfoMap is currently in updatingMap, wait for UPDATE_HOLD_INTERVAL
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // Make sure the scheduled update task exists (scheduleUpdateIfAbsent), then return the ServiceInfo from serviceInfoMap
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}

HostReactor.getServiceInfo0

private Map<String, ServiceInfo> serviceInfoMap;

private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    // Build the key
    String key = ServiceInfo.getKey(serviceName, clusters);
    // Look it up in the local cache by key
    return serviceInfoMap.get(key);
}

HostReactor.updateServiceNow

private NamingProxy serverProxy;

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // Fetch the address list remotely through NamingProxy. pushReceiver.getUDPPort() passes the
        // client's UDP port along with the query. When the address list changes, the server pushes
        // the update to this UDP port, and the client, which listens on it, refreshes its local cache
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            // Parse the result and store it in the local cache
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

HostReactor.processServiceJSON

public ServiceInfo processServiceJSON(String json) {
    // Parse the JSON
    ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
    // Look up the existing ServiceInfo in the local cache by key
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
        //empty or error push, just ignore
        return oldService;
    }

    boolean changed = false;
    // oldService is not null: update the local cache
    if (oldService != null) {

        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                    + ", new-t: " + serviceInfo.getLastRefTime());
        }

        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }

        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }

        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();

        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                    oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }

            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }

        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }

            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }
        }

        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
        }

        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
        }

        if (modHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
        }

        serviceInfo.setJsonFromServer(json);

        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            eventDispatcher.serviceChanged(serviceInfo);
            DiskCache.write(serviceInfo, cacheDir);
        }

    } else {
        // No previous entry: cache it directly
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                + JSON.toJSONString(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }

    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +
                " -> " + JSON.toJSONString(serviceInfo.getHosts()));
    }

    return serviceInfo;
}
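
The core of processServiceJSON is a three-way comparison between the cached hosts and the freshly received hosts. The following sketch reproduces just that comparison on plain maps. HostDiffSketch is a hypothetical class: keys stand for the "ip:port" address and values for the string form of an instance, which is an assumption made to keep the example free of Nacos types.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Splits the difference between two host maps into added, removed and modified keys. */
class HostDiffSketch {
    final Set<String> added = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();
    final Set<String> modified = new TreeSet<>();

    /** True when at least one host was added, removed or modified. */
    boolean changed() {
        return !added.isEmpty() || !removed.isEmpty() || !modified.isEmpty();
    }

    static HostDiffSketch diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        HostDiffSketch result = new HostDiffSketch();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String previous = oldHosts.get(entry.getKey());
            if (previous == null) {
                result.added.add(entry.getKey());      // only in the new list
            } else if (!previous.equals(entry.getValue())) {
                result.modified.add(entry.getKey());   // same address, different content
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                result.removed.add(key);               // only in the old list
            }
        }
        return result;
    }
}
```

Only when changed() is true does it make sense to notify listeners and rewrite the disk cache, which mirrors the condition guarding eventDispatcher.serviceChanged in the method above.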

NamingProxy.queryList

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));

    return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}

Server-side handling of the list query

InstanceController.list

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    // Read namespaceId
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Read serviceName
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);

    String agent = WebUtils.getUserAgent(request);
    // Read the cluster names
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    // Read udpPort
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    // Whether the check is enabled (comparable to Eureka's self-preservation mechanism)
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    // Whether to return only healthy instances
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}

InstanceController.doSrvIpxt

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Look up the Service by namespaceId and serviceName
    Service service = serviceManager.getService(namespaceId, serviceName);
    // If it does not exist, return a result with an empty host list
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }

    checkIfDisabled(service);

    long cacheMillis = switchDomain.getDefaultCacheMillis();
    // Check that udpPort is greater than 0 and that push can be enabled for this client
    // now try to enable the push
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // Register the client with PushService, which pushes updates to the client over UDP
            // when the server-side address list changes
            pushService
                    .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                            pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG
                .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }

    List<Instance> srvedIPs;

    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }

    if (CollectionUtils.isEmpty(srvedIPs)) {

        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }

        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }

        result.put("hosts", JacksonUtils.createEmptyArrayNode());
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.put("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    // TRUE: healthy instances, FALSE: unhealthy instances
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    // Group the instances in ipMap by their health status
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    // Whether the check is enabled
    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }

    double threshold = service.getProtectThreshold();
    // Decide whether self-protection applies: healthy instances / all instances <= threshold
    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }
        // Return healthy and unhealthy instances together by moving the unhealthy ones into the healthy bucket
        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }

    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);

        return JacksonUtils.createEmptyJsonNode();
    }

    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();

        if (healthyOnly && !entry.getKey()) {
            continue;
        }

        for (Instance instance : ips) {

            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }

            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.put("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }

            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);

        }
    }
    // Assemble the response
    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
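
The protect-threshold branch in doSrvIpxt is easy to miss in the long method, so here it is as a stand-alone sketch. ProtectThresholdSketch is a hypothetical class and hosts are represented by plain strings. The rule is the one in the code above: when healthy / total <= threshold, unhealthy hosts are returned along with the healthy ones.

```java
import java.util.ArrayList;
import java.util.List;

/** Decides which hosts to hand out once the protect threshold is taken into account. */
class ProtectThresholdSketch {

    static List<String> hostsToServe(List<String> healthy, List<String> unhealthy, double threshold) {
        int total = healthy.size() + unhealthy.size();
        List<String> result = new ArrayList<>(healthy);
        // Too few healthy hosts: also return the unhealthy ones so that the
        // remaining healthy hosts are not overwhelmed by all of the traffic
        if (total > 0 && (float) healthy.size() / total <= threshold) {
            result.addAll(unhealthy);
        }
        return result;
    }
}
```

With one healthy and three unhealthy hosts and a threshold of 0.5, the ratio 0.25 is below the threshold and all four hosts are returned. With three healthy hosts and one unhealthy host the ratio is 0.75, so only the three healthy hosts are returned.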

Service subscription: keeping the service list up to date dynamically

2. Start a scheduled task that polls the service list every 10s

NacosNamingService.init

public NacosNamingService(Properties properties) {
    init(properties);
}

private void init(Properties properties) {
    namespace = InitUtils.initNamespaceForNaming(properties);
    initServerAddr(properties);
    InitUtils.initWebRootContext();
    initCacheDir();
    initLogName(properties);

    eventDispatcher = new EventDispatcher();
    serverProxy = new NamingProxy(namespace, endpoint, serverList, properties);
    // Heartbeat handling: BeatReactor
    beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
    // Scheduled polling: HostReactor
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties),
            initPollingThreadCount(properties));
}

HostReactor constructor

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
        boolean loadCacheAtStart, int pollingThreadCount) {
    // Create the thread pool used for polling
    executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });

    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }

    this.updatingMap = new ConcurrentHashMap<String, Object>();
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    // Initialize PushReceiver, which listens for service updates pushed by the server
    this.pushReceiver = new PushReceiver(this);
}

PushReceiver constructor

public PushReceiver(HostReactor hostReactor) {
    try {
        this.hostReactor = hostReactor;
        udpSocket = new DatagramSocket();
        // Start a thread pool that keeps receiving data sent by the server
        executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });
        // Submit this receiver so that run() is executed
        executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

@Override
public void run() {
    // Loop forever, listening for data that the Nacos server sends to this client (the push mechanism)
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Block until the Nacos server sends a datagram
            udpSocket.receive(packet);

            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                hostReactor.processServiceJSON(pushPacket.data);

                // send ack to server
                ack = "{\"type\": \"push-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                    + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\""
                    + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }

            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
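The receive-then-acknowledge exchange in PushReceiver.run can be tried in isolation with two sockets on the loopback interface. The sketch below is only an illustration of the pattern, not Nacos code: the class UdpPushAckDemo and its methods are hypothetical names, the payload is not compressed, and the ack string simply copies the "push-ack" shape shown above.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

/** Loopback demo of a UDP push followed by an acknowledgement (hypothetical helper, not part of Nacos). */
class UdpPushAckDemo {

    /** Builds an ack in the same shape as the "push-ack" string in PushReceiver.run. */
    static String buildAck(long lastRefTime) {
        return "{\"type\": \"push-ack\", \"lastRefTime\":\"" + lastRefTime + "\", \"data\":\"\"}";
    }

    /** Sends one push from a "server" socket to a "client" socket and returns the ack the server reads back. */
    static String roundTrip(String payload, long lastRefTime) {
        try (DatagramSocket client = new DatagramSocket(0, InetAddress.getLoopbackAddress());
             DatagramSocket server = new DatagramSocket(0, InetAddress.getLoopbackAddress())) {
            // Timeouts keep the demo from blocking forever if a datagram is lost.
            client.setSoTimeout(2000);
            server.setSoTimeout(2000);

            // Server side: push the payload to the UDP port the client is bound to.
            byte[] out = payload.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(out, out.length, client.getLocalSocketAddress()));

            // Client side: receive the push, then reply to the sender's address.
            byte[] buffer = new byte[64 * 1024];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            client.receive(packet);
            byte[] ack = buildAck(lastRefTime).getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));

            // Server side: read the acknowledgement.
            DatagramPacket ackPacket = new DatagramPacket(new byte[1024], 1024);
            server.receive(ackPacket);
            return new String(ackPacket.getData(), 0, ackPacket.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new IllegalStateException("UDP round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"type\":\"service\"}", 42L));
    }
}
```

The client replies to packet.getSocketAddress(), so it never needs to know the server's address in advance; this mirrors the last statement of the loop above.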

HostReactor.updateServiceNow

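When the client queries the service list for the first time after startup, it sends its own IP and UDP port to the Nacos server. When the address list on the Nacos server changes, the server pushes the change to that port, which is how the udpSocket.receive(packet) call above receives address-list updates.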

NacosNamingService.getAllInstances ----> HostReactor.getServiceInfo ----> HostReactor.updateServiceNow

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // The client's UDP port is sent along with the query so the server knows where to push changes
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            // Wake up any thread waiting in getServiceInfo for this update to finish
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

HostReactor.getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }

    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }

    // Schedule a polling task: the client polls every 10s, and the first run starts
    // after 1s by default. The task itself is an UpdateTask.
    scheduleUpdateIfAbsent(serviceName, clusters);
    return serviceInfoMap.get(serviceObj.getKey());
}

HostReactor.scheduleUpdateIfAbsent

private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        // Check again under the lock so that only one task is created per service
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        // Add an UpdateTask
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}
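The goal of scheduleUpdateIfAbsent is that each service key gets exactly one polling task, no matter how many callers ask for it. The sketch below shows the same guarantee using ConcurrentHashMap.computeIfAbsent in place of the double-checked lock on a HashMap; that substitution is a design choice of this example, not what the Nacos client does, and UpdateScheduler and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Schedules at most one task per key (hypothetical helper, not part of Nacos). */
class UpdateScheduler {
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread thread = new Thread(r, "demo-updater");
        thread.setDaemon(true);
        return thread;
    });

    /** Returns true if this call created the task, false if one already existed for the key. */
    boolean scheduleUpdateIfAbsent(String key, Runnable task, long delayMillis) {
        boolean[] created = {false};
        // computeIfAbsent runs the lambda at most once per absent key, atomically.
        futureMap.computeIfAbsent(key, k -> {
            created[0] = true;
            return executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        });
        return created[0];
    }

    int scheduledCount() {
        return futureMap.size();
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        UpdateScheduler scheduler = new UpdateScheduler();
        Runnable poll = () -> System.out.println("polling demo-service");
        System.out.println(scheduler.scheduleUpdateIfAbsent("demo-service", poll, 100));
        System.out.println(scheduler.scheduleUpdateIfAbsent("demo-service", poll, 100));
        scheduler.shutdown();
    }
}
```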

UpdateTask

public class UpdateTask implements Runnable {
    long lastRefTime = Long.MAX_VALUE;
    private String clusters;
    private String serviceName;

    public UpdateTask(String serviceName, String clusters) {
        this.serviceName = serviceName;
        this.clusters = clusters;
    }

    @Override
    public void run() {
        try {
            // First look up the service in the local cache
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

            // null means nothing is cached locally, so fetch it from the server
            if (serviceObj == null) {
                // Call updateServiceNow to fetch the address list from the server
                updateServiceNow(serviceName, clusters);
                // Reschedule this task so that polling continues
                executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                return;
            }

            // Check whether the cached entry is stale: its last refresh time is
            // <= the last refresh time recorded by this task
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // Call updateServiceNow to pull the address list and update the local service list
                updateServiceNow(serviceName, clusters);
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
                // if serviceName already updated by push, we should not override it
                // since the push data may be different from pull through force push
                // The service has already been updated by a push, so the local copy is not
                // overwritten; the request below only refreshes the service.
                refreshOnly(serviceName, clusters);
            }

            // Record the latest refresh time
            lastRefTime = serviceObj.getLastRefTime();

            // If the service is not subscribed and futureMap has no entry for it, stop the update task
            if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
                !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                // abort the update task:
                NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                return;
            }

            // Run again after getCacheMillis() (10s) to keep polling
            executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
        } catch (Throwable e) {
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
        }
    }
}
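The pull-or-refresh branch in UpdateTask.run comes down to one timestamp comparison. The sketch below isolates it as a pure function so the three typical situations can be walked through; PullDecision and its Action enum are hypothetical names introduced for illustration, and the timestamps are made-up values.

```java
/** Isolates the lastRefTime comparison from UpdateTask.run (hypothetical helper, not part of Nacos). */
class PullDecision {
    enum Action { PULL_FROM_SERVER, REFRESH_ONLY }

    /**
     * cachedLastRefTime: refresh time of the entry currently in the local cache.
     * taskLastRefTime:   refresh time the polling task saw on its previous run.
     */
    static Action decide(long cachedLastRefTime, long taskLastRefTime) {
        // The cache has not moved since the task last looked, so pull a fresh list.
        // Otherwise a push got there first and the pushed data must not be overwritten.
        return cachedLastRefTime <= taskLastRefTime ? Action.PULL_FROM_SERVER : Action.REFRESH_ONLY;
    }

    public static void main(String[] args) {
        // First run: the task starts at Long.MAX_VALUE, so it always pulls.
        System.out.println(decide(100L, Long.MAX_VALUE));
        // No push arrived between two runs: the timestamps are equal, so pull again.
        System.out.println(decide(100L, 100L));
        // A push updated the cache to t=150 after the task recorded t=100: refresh only.
        System.out.println(decide(150L, 100L));
    }
}
```

Starting the task's lastRefTime at Long.MAX_VALUE is what makes the very first run pull unconditionally.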

3. Push: the server pushes data to the client

When the server handles a registration request, the call chain is InstanceController.registerInstance ----> ServiceManager.registerInstance

----> ServiceManager.createEmptyService ----> ServiceManager.createServiceIfAbsent ----> ServiceManager.putServiceAndInit

ServiceManager.putServiceAndInit

private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    // Start the scheduled health-check task that monitors the health of the service's instances
    service.init();
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

Service.init

The init method sets up heartbeat checking for the service's providers; the check runs every 5s.

private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

public void init() {
    // Schedule the health-check task
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);

    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

HealthCheckReactor.scheduleCheck

The first run happens 5s after startup, and every later run is delayed by another 5s.

public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

ClientBeatCheckTask.run

public class ClientBeatCheckTask implements Runnable {

    private Service service;

    public ClientBeatCheckTask(Service service) {
        this.service = service;
    }

    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }

    @JsonIgnore
    public DistroMapper getDistroMapper() {
        return ApplicationUtils.getBean(DistroMapper.class);
    }

    public GlobalConfig getGlobalConfig() {
        return ApplicationUtils.getBean(GlobalConfig.class);
    }

    public SwitchDomain getSwitchDomain() {
        return ApplicationUtils.getBean(SwitchDomain.class);
    }

    public String taskKey() {
        return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName());
    }

    @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }

            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }

            List<Instance> instances = service.allIPs(true);

            // first set health status of instances:
            // Iterate over the instances and check their heartbeats
            for (Instance instance : instances) {
                // If the time since the last heartbeat exceeds the configured timeout,
                // treat the instance as offline.
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            // Mark the instance as unhealthy
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            // Trigger a push of the service-change event
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }

            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }

            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }

                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                    // Delete the expired instance
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }

    private void deleteIp(Instance instance) {
        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp())
                .appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true")
                .appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName())
                .appendParam("namespaceId", service.getNamespaceId());

            String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();

            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
                @Override
                public Object onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                            instance.toJson(), response.getResponseBody(), response.getStatusCode());
                    }
                    return null;
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
        }
    }
}
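ClientBeatCheckTask.run applies two thresholds to the time since an instance's last heartbeat: one after which the instance is marked unhealthy, and a longer one after which it is deleted. The sketch below collapses the two loops into a single classification function to make the thresholds explicit. BeatCheck and State are hypothetical names, and the 15s and 30s values used in main are illustrative assumptions; the listing above does not state the configured timeouts.

```java
/** Classifies an instance by how long it has been silent (hypothetical helper, not part of Nacos). */
class BeatCheck {
    enum State { HEALTHY, UNHEALTHY, EXPIRED }

    static State classify(long nowMillis, long lastBeatMillis,
                          long heartBeatTimeoutMillis, long ipDeleteTimeoutMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > ipDeleteTimeoutMillis) {
            // Corresponds to the second loop: the instance would be deleted.
            return State.EXPIRED;
        }
        if (silence > heartBeatTimeoutMillis) {
            // Corresponds to the first loop: setHealthy(false) and a change push.
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        long heartBeatTimeout = 15_000L; // assumed value, for illustration only
        long ipDeleteTimeout = 30_000L;  // assumed value, for illustration only
        long lastBeat = 0L;
        for (long now : new long[] {10_000L, 20_000L, 40_000L}) {
            System.out.println(now + " ms -> " + classify(now, lastBeat, heartBeatTimeout, ipDeleteTimeout));
        }
    }
}
```

Unlike this sketch, the real task skips instances for which isMarked() is true and only deletes when isExpireInstance() is enabled.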

PushService.serviceChanged

public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    // Publish a service-change event
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

PushService.onApplicationEvent (listens for the event)

public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }

                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();

                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }

                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));

                    // Send the data packet
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
            } finally {
                // Allow the next change event for this service to schedule a new push
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }
        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    }
}
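Taken together, serviceChanged and onApplicationEvent coalesce bursts of change events: while a push for a service is pending in futureMap, further changes to the same service are dropped, and the entry is cleared once the delayed push has run. The sketch below reproduces that idea in a self-contained form. PushCoalescer is a hypothetical class, the 200 ms delay is an arbitrary stand-in for the 1000 ms used above, and computeIfAbsent is used here so that the check and the insert happen atomically, which differs from the listing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Merges bursts of change events into one delayed push per service (hypothetical helper, not part of Nacos). */
class PushCoalescer {
    private final Map<String, Future<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread thread = new Thread(r, "demo-udp-sender");
        thread.setDaemon(true);
        return thread;
    });
    private final long delayMillis;
    private final Runnable push;

    PushCoalescer(long delayMillis, Runnable push) {
        this.delayMillis = delayMillis;
        this.push = push;
    }

    /** Returns true if a push was scheduled, false if one was already pending for the service. */
    boolean serviceChanged(String serviceKey) {
        boolean[] scheduled = {false};
        futureMap.computeIfAbsent(serviceKey, key -> {
            scheduled[0] = true;
            return executor.schedule(() -> {
                try {
                    push.run();
                } finally {
                    // Clearing the entry lets the next change schedule a new push.
                    futureMap.remove(key);
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
        });
        return scheduled[0];
    }

    void shutdown() {
        executor.shutdownNow();
    }

    /** Fires `events` changes back to back and returns how many pushes had run once the first one finished. */
    static int pushesForBurst(int events) {
        AtomicInteger pushes = new AtomicInteger();
        CountDownLatch firstPush = new CountDownLatch(1);
        PushCoalescer coalescer = new PushCoalescer(200, () -> {
            pushes.incrementAndGet();
            firstPush.countDown();
        });
        try {
            for (int i = 0; i < events; i++) {
                coalescer.serviceChanged("demo-service");
            }
            firstPush.await(5, TimeUnit.SECONDS);
            return pushes.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            coalescer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pushes for a burst of 5 changes: " + pushesForBurst(5));
    }
}
```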

PushService.udpPush

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }

    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }

    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        // Send the push over the UDP socket
        udpSocket.send(ackEntry.origin);

        ackEntry.increaseRetryTime();

        // Schedule a retransmission in case no ack arrives within ACK_TIMEOUT_NANOS
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
            TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
            ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;

        return null;
    }
}

The client listens for this push

PushReceiver.run

@Override
public void run() {
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

            udpSocket.receive(packet);

            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // Parse the payload and update the local cache
                hostReactor.processServiceJSON(pushPacket.data);

                // send ack to server
                ack = "{\"type\": \"push-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                    + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\""
                    + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }

            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

HostReactor.processServiceJSON

public ServiceInfo processServiceJSON(String json) {
    ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
        //empty or error push, just ignore
        return oldService;
    }

    boolean changed = false;

    if (oldService != null) {
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                + ", new-t: " + serviceInfo.getLastRefTime());
        }

        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }

        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }

        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();

        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
            newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }

            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }

        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }

            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }
        }

        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
        }

        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
        }

        if (modHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
        }

        serviceInfo.setJsonFromServer(json);

        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            eventDispatcher.serviceChanged(serviceInfo);
            DiskCache.write(serviceInfo, cacheDir);
        }
    } else {
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
            + JSON.toJSONString(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }

    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey()
            + " -> " + JSON.toJSONString(serviceInfo.getHosts()));
    }

    return serviceInfo;
}
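The core of processServiceJSON is a diff between the previously cached host list and the newly received one, keyed by address. The sketch below shows that diff on plain strings: each map goes from an "ip:port" key to a textual description of the instance, standing in for toInetAddr() and toString(). HostDiff and its field names are hypothetical, introduced only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Computes added, removed and modified hosts between two snapshots (hypothetical helper, not part of Nacos). */
class HostDiff {
    final Set<String> added = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();
    final Set<String> modified = new TreeSet<>();

    /** Both maps go from an "ip:port" key to a string describing the instance. */
    static HostDiff diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        HostDiff result = new HostDiff();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String key = entry.getKey();
            if (!oldHosts.containsKey(key)) {
                result.added.add(key);
            } else if (!entry.getValue().equals(oldHosts.get(key))) {
                // Same address, different attributes (for example weight or health).
                result.modified.add(key);
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                result.removed.add(key);
            }
        }
        return result;
    }

    /** True when listeners should be notified and the disk cache rewritten. */
    boolean changed() {
        return !(added.isEmpty() && removed.isEmpty() && modified.isEmpty());
    }

    public static void main(String[] args) {
        Map<String, String> oldHosts = new HashMap<>();
        oldHosts.put("10.0.0.1:8080", "healthy=true");
        oldHosts.put("10.0.0.2:8080", "healthy=true");

        Map<String, String> newHosts = new HashMap<>();
        newHosts.put("10.0.0.1:8080", "healthy=false");
        newHosts.put("10.0.0.3:8080", "healthy=true");

        HostDiff d = diff(oldHosts, newHosts);
        System.out.println("added=" + d.added + " removed=" + d.removed + " modified=" + d.modified);
    }
}
```

When changed() is false, the method above still replaces the cached ServiceInfo but skips the event dispatch and the disk write.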
