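2. How Nacos service registration works
Capabilities that Nacos service registration must provide
- A service provider registers its protocol address with Nacos Server.
- A service consumer looks up provider addresses on Nacos Server by service name.
- Nacos Server must detect when providers go online or offline.
- A service consumer must be notified dynamically when the address list on Nacos Server changes.
Nacos API
SDK (built on the Open API under the hood) / Open API (REST interface)
Official website
Service registration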
void registerInstance(String serviceName, String ip, int port) throws NacosException;
void registerInstance(String serviceName, String ip, int port, String clusterName) throws NacosException;
void registerInstance(String serviceName, Instance instance) throws NacosException;
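Request parameters
Name | Type | Description |
---|---|---|
serviceName | String | Service name |
ip | String | Service instance IP |
port | int | Service instance port |
clusterName | String | Cluster name |
instance | See code comments | Instance properties |
Request example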
NamingService naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");

Instance instance = new Instance();
instance.setIp("55.55.55.55");
instance.setPort(9999);
instance.setHealthy(false);
instance.setWeight(2.0);
Map<String, String> instanceMeta = new HashMap<>();
instanceMeta.put("site", "et2");
instance.setMetadata(instanceMeta);

Service service = new Service("nacos.test.4");
service.setApp("nacos-naming");
service.sethealthCheckMode("server");
service.setEnableHealthCheck(true);
service.setProtectThreshold(0.8F);
service.setGroup("CNCF");
Map<String, String> serviceMeta = new HashMap<>();
serviceMeta.put("symmetricCall", "true");
service.setMetadata(serviceMeta);
instance.setService(service);

Cluster cluster = new Cluster();
cluster.setName("TEST5");
AbstractHealthChecker.Http healthChecker = new AbstractHealthChecker.Http();
healthChecker.setExpectedResponseCode(400);
healthChecker.setCurlHost("USer-Agent|Nacos");
healthChecker.setCurlPath("/xxx.html");
cluster.setHealthChecker(healthChecker);
Map<String, String> clusterMeta = new HashMap<>();
clusterMeta.put("xxx", "yyyy");
cluster.setMetadata(clusterMeta);

instance.setCluster(cluster);

naming.registerInstance("nacos.test.4", instance);
Service query
List<Instance> getAllInstances(String serviceName) throws NacosException;
List<Instance> getAllInstances(String serviceName, List<String> clusters) throws NacosException;
Request parameters
Name | Type | Description |
---|---|---|
serviceName | String | Service name |
clusters | List | Cluster list |
Request example
NamingService naming = NamingFactory.createNamingService(System.getProperty("serveAddr"));
System.out.println(naming.getAllInstances("nacos.test.3"));
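Heartbeat mechanism
- Heartbeat send interval
- Heartbeat timeout
- Set a heartbeat timeout threshold
- Record the time of the last update for each service instance
- An instance has timed out when: current time - time of the instance's last update > heartbeat timeout threshold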
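The timeout rule in the list above can be sketched in a few lines. This is a minimal illustration, not Nacos code: the class name HeartbeatTracker, the "ip:port" key format, and the 15-second threshold are assumptions chosen for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker for illustration; not part of Nacos. */
class HeartbeatTracker {
    private final long timeoutMillis;
    // instance key (assumed format "ip:port") -> time of the last heartbeat
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    HeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Records the time of the latest heartbeat for an instance. */
    void beat(String instanceKey, long nowMillis) {
        lastBeat.put(instanceKey, nowMillis);
    }

    /** True when (now - last update) > threshold, or when no heartbeat was ever recorded. */
    boolean isExpired(String instanceKey, long nowMillis) {
        Long last = lastBeat.get(instanceKey);
        return last == null || nowMillis - last > timeoutMillis;
    }

    public static void main(String[] args) {
        HeartbeatTracker tracker = new HeartbeatTracker(15_000); // assumed threshold
        tracker.beat("11.11.11.11:8888", 0);
        System.out.println(tracker.isExpired("11.11.11.11:8888", 10_000));
        System.out.println(tracker.isExpired("11.11.11.11:8888", 20_000));
    }
}
```

Passing the clock value in as a parameter keeps the check deterministic; a real server would run it from a periodic task.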
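Nacos implementation diagram
Nacos source code analysis
Nacos client registration flow
There are two registration paths for a Dubbo service. One follows the same path as in the earlier Eureka source analysis (see the Eureka source analysis).
The other relies on the auto-configuration that Dubbo itself provides. When a Dubbo-based service is published, registration is driven by event listening: the class DubboServiceRegistrationNonWebApplicationAutoConfiguration listens for ApplicationStartedEvent, an event added in Spring Boot 2.0 that is published once the Spring Boot application has finished starting. Receiving that event triggers the registration.
Spring Cloud Nacos integration with Dubbo
DubboServiceRegistrationNonWebApplicationAutoConfiguration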
// Listen for ApplicationStartedEvent and register the service
@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() {
    setServerPort();
    register();
}

private void register() {
    if (registered) {
        return;
    }
    serviceRegistry.register(registration);
    registered = true;
}
this.serviceRegistry is an injected instance: NacosServiceRegistry
NacosServiceRegistry.register
@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
    // serviceId corresponds to the current application's spring.application.name
    String serviceId = registration.getServiceId();
    // group is the group configured on Nacos
    String group = nacosDiscoveryProperties.getGroup();
    // instance holds the service instance information
    Instance instance = getNacosInstanceFromRegistration(registration);
    try {
        namingService.registerInstance(serviceId, group, instance);
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        log.error("nacos registry, {} register failed...{},", serviceId,
                registration.toString(), e);
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}
NacosNamingService.registerInstance
Registration starts here and does two things:
- If the instance being registered is ephemeral, build the heartbeat information and hand it to BeatReactor, which schedules the heartbeat task.
- Call registerService to send the registration request.
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // For an ephemeral instance, build the heartbeat information
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        // Hand the heartbeat information to beatReactor for processing
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
NamingProxy.registerService
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
            namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
NamingProxy.reqAPI
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());

    if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
        throw new NacosException(NacosException.INVALID_PARAM, "no server available");
    }

    NacosException exception = new NacosException();
    // A non-empty server list corresponds to the Nacos servers configured via
    // spring.cloud.nacos.discovery.server-addr in application.properties
    if (servers != null && !servers.isEmpty()) {
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());
        // Start from a randomly chosen server
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                // Call the chosen server
                return callServer(api, params, body, server, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", server, e);
                }
            }
            // On failure, move on to the next server (round-robin)
            index = (index + 1) % servers.size();
        }
    }

    if (StringUtils.isNotBlank(nacosDomain)) {
        for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
            try {
                return callServer(api, params, body, nacosDomain, method);
            } catch (NacosException e) {
                exception = e;
                if (NAMING_LOGGER.isDebugEnabled()) {
                    NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                }
            }
        }
    }

    NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
            api, servers, exception.getErrCode(), exception.getErrMsg());

    throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
            + exception.getMessage());
}
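The server-selection logic in reqAPI (pick a starting index, then try each server in turn until one succeeds) can be isolated as follows. This is a sketch under stated assumptions: FailoverCaller and callWithFailover are hypothetical names, and the start index is passed in rather than drawn from Random so the behavior is easy to follow.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper for illustration; not part of the Nacos client. */
class FailoverCaller {

    /** Tries every server at most once, starting at startIndex and wrapping around. */
    static <T> T callWithFailover(List<String> servers, int startIndex, Function<String, T> call) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no server available");
        }
        RuntimeException last = null;
        int index = Math.floorMod(startIndex, servers.size());
        for (int i = 0; i < servers.size(); i++) {
            try {
                return call.apply(servers.get(index));
            } catch (RuntimeException e) {
                last = e; // remember the failure and fall through to the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed", last);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a:8848", "b:8848", "c:8848");
        String answeredBy = callWithFailover(servers, 1, server -> {
            if (server.startsWith("b")) {
                throw new RuntimeException(server + " is down");
            }
            return server;
        });
        System.out.println(answeredBy);
    }
}
```

Starting at a random index spreads load across the cluster, while the wrap-around loop guarantees that every server gets one attempt before the call gives up.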
NamingProxy.callServer
public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
        throws NacosException {
    long start = System.currentTimeMillis();
    long end = 0;
    // Add signature information
    injectSecurityInfo(params);
    // Add header information
    List<String> headers = builderHeaders();

    String url;
    if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
        url = curServer + api;
    } else {
        if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
            curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
        }
        url = HttpClient.getPrefix() + curServer + api;
    }
    // Send the request through HttpClient
    HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
    end = System.currentTimeMillis();
    // Report metrics for monitoring
    MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
            .observe(end - start);

    if (HttpURLConnection.HTTP_OK == result.code) {
        return result.content;
    }

    if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
        return StringUtils.EMPTY;
    }

    throw new NacosException(result.code, result.content);
}
HttpClient.request
public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String body, String encoding, String method) {
    HttpURLConnection conn = null;
    try {
        String encodedContent = encodingParams(paramValues, encoding);
        url += (StringUtils.isEmpty(encodedContent)) ? "" : ("?" + encodedContent);

        conn = (HttpURLConnection) new URL(url).openConnection();

        setHeaders(conn, headers, encoding);
        conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
        conn.setReadTimeout(TIME_OUT_MILLIS);
        conn.setRequestMethod(method);
        conn.setDoOutput(true);
        if (StringUtils.isNotBlank(body)) {
            byte[] b = body.getBytes();
            conn.setRequestProperty("Content-Length", String.valueOf(b.length));
            conn.getOutputStream().write(b, 0, b.length);
            conn.getOutputStream().flush();
            conn.getOutputStream().close();
        }
        conn.connect();
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("Request from server: " + url);
        }
        return getResult(conn);
    } catch (Exception e) {
        try {
            if (conn != null) {
                NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from "
                        + InetAddress.getByName(conn.getURL().getHost()).getHostAddress());
            }
        } catch (Exception e1) {
            NAMING_LOGGER.error("[NA] failed to request ", e1);
            //ignore
        }

        NAMING_LOGGER.error("[NA] failed to request ", e);

        return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
    } finally {
        IoUtils.closeQuietly(conn);
    }
}
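Before opening the connection, request appends the encoded parameters to the URL as a query string. The sketch below shows one plausible way to do that encoding with java.net.URLEncoder. QueryStringSketch is a hypothetical helper, and skipping empty values is an assumption of this sketch rather than a statement about what encodingParams does.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not the Nacos HttpClient. */
class QueryStringSketch {

    /** Joins non-empty parameters as key=value pairs separated by '&', URL-encoding each value. */
    static String encode(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            String value = entry.getValue();
            if (value == null || value.isEmpty()) {
                continue; // assumption: empty values are left out of the query string
            }
            if (sb.length() > 0) {
                sb.append('&');
            }
            try {
                sb.append(entry.getKey()).append('=').append(URLEncoder.encode(value, "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException("UTF-8 is always supported", e);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "DEFAULT_GROUP@@demo");
        params.put("ip", "11.11.11.11");
        params.put("port", "8888");
        // The path is the registration endpoint named in this article; host and port are placeholders.
        System.out.println("http://127.0.0.1:8848/nacos/v1/ns/instance?" + encode(params));
    }
}
```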
Registration flow on the Nacos server
The server provides an InstanceController class that exposes the service registration APIs. When a client initiates registration, the endpoint it calls is: [POST] /nacos/v1/ns/instance
InstanceController.register
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // serviceName is the client's service name
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    // namespaceId is the Nacos namespace
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Parse the Instance from the request
    final Instance instance = parseInstance(request);

    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
ServiceManager.registerInstance
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create an empty service (the entry shown in the service list of the Nacos console).
    // This effectively initializes the entry in serviceMap, which is a ConcurrentHashMap.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Look up the Service in serviceMap by namespaceId and serviceName
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Call addInstance to add the service instance
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
ServiceManager.createServiceIfAbsent
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    // namespaceId plus serviceName (group plus service name) identifies a service
    Service service = getService(namespaceId, serviceName);
    // A null service means this is the first registration: build a new Service
    if (service == null) {

        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // Store it in the map and initialize it
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
ServiceManager.getService
Storage structure
key: namespaceId, value: Map<String, Service>
key: group::serviceName (the grouped service name, for example DEFAULT_GROUP@@fireweed-manager), value: Service
Service
List instances;
/**
 * Map(namespace, Map(group::serviceName, Service)).
 */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    return chooseServiceMap(namespaceId).get(serviceName);
}
ServiceManager.putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException {
    // Add the registered service to the map
    putService(service);
    // Initialize the heartbeat check for the instances under this Service
    service.init();
    // Register listeners with the consistency service.
    // Keys built with ephemeral=true are handled by the Distro protocol, ephemeral=false by Raft.
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
ServiceManager.putService
Adds the newly registered service to the map that belongs to its namespaceId.
public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}
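The two-level map used by getService and putService can be reproduced in isolation. In this sketch all names are assumptions: a plain String stands in for the Service object, and computeIfAbsent replaces the explicit double-checked lock, which on a ConcurrentHashMap gives the same create-once behavior for the inner map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical store for illustration; a String stands in for the Service object. */
class ServiceStoreSketch {
    // namespaceId -> (grouped service name -> service)
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    /** Creates the per-namespace map on first use, then stores the service in it. */
    void put(String namespaceId, String groupedName, String service) {
        serviceMap.computeIfAbsent(namespaceId, key -> new ConcurrentHashMap<>())
                .put(groupedName, service);
    }

    /** Returns null when either the namespace or the service is unknown. */
    String get(String namespaceId, String groupedName) {
        Map<String, String> byName = serviceMap.get(namespaceId);
        return byName == null ? null : byName.get(groupedName);
    }

    public static void main(String[] args) {
        ServiceStoreSketch store = new ServiceStoreSketch();
        store.put("public", "DEFAULT_GROUP@@fireweed-manager", "service-record");
        System.out.println(store.get("public", "DEFAULT_GROUP@@fireweed-manager"));
        System.out.println(store.get("dev", "DEFAULT_GROUP@@fireweed-manager"));
    }
}
```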
ServiceManager.addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // Service{name='DEFAULT_GROUP@@fireweed-manager', protectThreshold=0.0, appName='null', groupName='DEFAULT_GROUP', metadata={}}
    Service service = getService(namespaceId, serviceName);

    synchronized (service) {
        // Add the instances to the list, then synchronize the data through the consistency protocol
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        consistencyService.put(key, instances);
    }
}
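Service lookup by the consumer
At startup the consumer mainly does two things:
- Reads the instance list for the given service name from Nacos Server and caches it in local memory
- Starts a scheduled task that polls the service list every 10s
Once a provider has registered, a consumer can obtain the provider's address from Nacos Server and call the service. On the consumer side, the core class NacosDiscoveryClient is responsible for talking to Nacos and fetching provider addresses. The steps leading up to it are not repeated here; the subscription process was already covered in the earlier Dubbo source analysis. NacosDiscoveryClient provides a getInstances method that returns a provider's addresses given the provider's service name.
1. The client fetches the service list at startup (reads the instance list for the given service name from Nacos Server and caches it in local memory)
NacosDiscoveryClient.getInstances
public List<ServiceInstance> getInstances(String serviceId) {
    try {
        List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, true);
        return hostToServiceInstanceList(instances, serviceId);
    } catch (Exception var3) {
        throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3);
    }
}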
This calls NamingService to obtain the instance list by serviceId and group, then converts each Instance into a ServiceInstance object.
NacosNamingService.selectInstances
selectInstances first obtains a ServiceInfo through hostReactor, then removes from serviceInfo.getHosts() every instance that is not healthy, not enabled, or has a weight less than or equal to 0, and returns the rest. If subscribe is true, the ServiceInfo comes from hostReactor.getServiceInfo; otherwise it comes from hostReactor.getServiceInfoDirectlyFromServer.
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

    ServiceInfo serviceInfo;
    // Whether to subscribe. The client has to notice server-side changes, so it subscribes here.
    if (subscribe) {
        // hostReactor sets up the subscription, which has two parts:
        // (1) push from the server, (2) short polling by the client every 10s
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    return selectInstances(serviceInfo, healthy);
}
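The filtering rule described for selectInstances (keep only instances that match the requested health state, are enabled, and have a positive weight) can be sketched as below. InstanceFilterSketch and its nested Inst class are hypothetical stand-ins for the Nacos types, reduced to the fields the rule needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the filtering rule; Inst stands in for the Nacos Instance class. */
class InstanceFilterSketch {

    static final class Inst {
        final String addr;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Inst(String addr, boolean healthy, boolean enabled, double weight) {
            this.addr = addr;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    /** Keeps instances whose health matches the request, that are enabled, and whose weight is above 0. */
    static List<Inst> select(List<Inst> hosts, boolean healthy) {
        List<Inst> selected = new ArrayList<>();
        for (Inst inst : hosts) {
            if (inst.healthy == healthy && inst.enabled && inst.weight > 0) {
                selected.add(inst);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<Inst> hosts = Arrays.asList(
                new Inst("10.0.0.1:8080", true, true, 1.0),
                new Inst("10.0.0.2:8080", false, true, 1.0),
                new Inst("10.0.0.3:8080", true, true, 0.0));
        for (Inst inst : select(hosts, true)) {
            System.out.println(inst.addr);
        }
    }
}
```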
HostReactor.getServiceInfo
There are two pieces of logic:
- updateServiceNow: load the service addresses from Nacos Server immediately
- scheduleUpdateIfAbsent: schedule a task that refreshes the service addresses (first run after 1s by default, then polling every 10s)
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    // Build the key from service name + cluster names (empty by default)
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // Try the local cache first, by serviceName and clusters
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    // If nothing is cached: create a new ServiceInfo, put it into serviceInfoMap and updatingMap,
    // run updateServiceNow, then remove it from updatingMap
    if (null == serviceObj) {
        // Build a ServiceInfo
        serviceObj = new ServiceInfo(serviceName, clusters);
        // Store it in the serviceInfoMap local cache
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);

        updatingMap.put(serviceName, new Object());
        // Fetch the instance information from the Nacos server
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);

    } else if (updatingMap.containsKey(serviceName)) {
        // The cached serviceObj is currently being updated: wait up to UPDATE_HOLD_INTERVAL
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // Start the scheduled update task if none exists yet, then return the ServiceInfo from serviceInfoMap
    scheduleUpdateIfAbsent(serviceName, clusters);

    return serviceInfoMap.get(serviceObj.getKey());
}
HostReactor.getServiceInfo0
private Map<String, ServiceInfo> serviceInfoMap;

private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    // Build the key
    String key = ServiceInfo.getKey(serviceName, clusters);
    // Look it up in the local cache
    return serviceInfoMap.get(key);
}
HostReactor.updateServiceNow
private NamingProxy serverProxy;

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // Fetch the address list remotely through NamingProxy. pushReceiver.getUDPPort() passes the
        // client's UDP port along with the query; when the address list changes, the server pushes
        // the change to this port, and the client, listening on it, updates its local cache.
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);

        if (StringUtils.isNotEmpty(result)) {
            // Parse the result and store it in the local cache
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
HostReactor.processServiceJSON
public ServiceInfo processServiceJSON(String json) {
    // Parse the JSON
    ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
    // Look up the cached ServiceInfo by key
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
        //empty or error push, just ignore
        return oldService;
    }

    boolean changed = false;
    // oldService is not null: update the local cache
    if (oldService != null) {

        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                    + ", new-t: " + serviceInfo.getLastRefTime());
        }

        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }

        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }

        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();

        List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                    oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }

            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }

        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }

            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }
        }

        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
        }

        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
        }

        if (modHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                    + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
        }

        serviceInfo.setJsonFromServer(json);

        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            eventDispatcher.serviceChanged(serviceInfo);
            DiskCache.write(serviceInfo, cacheDir);
        }

    } else {
        // Nothing cached yet: cache the new data directly
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }

    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +
                " -> " + JSON.toJSONString(serviceInfo.getHosts()));
    }

    return serviceInfo;
}
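The core of processServiceJSON is a three-way diff between the cached hosts and the newly received hosts, keyed by address. The sketch below reproduces only that comparison. HostDiffSketch is a hypothetical class, and each instance is represented by a plain String (standing in for Instance.toString()) so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the old-versus-new host comparison; Strings stand in for Instance objects. */
class HostDiffSketch {
    final Set<String> added = new TreeSet<>();
    final Set<String> removed = new TreeSet<>();
    final Set<String> modified = new TreeSet<>();

    /** True when at least one host was added, removed, or modified. */
    boolean changed() {
        return !(added.isEmpty() && removed.isEmpty() && modified.isEmpty());
    }

    /** Both maps are keyed by address ("ip:port"); values are the string form of the instance. */
    static HostDiffSketch diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        HostDiffSketch result = new HostDiffSketch();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String key = entry.getKey();
            if (!oldHosts.containsKey(key)) {
                result.added.add(key);
            } else if (!entry.getValue().equals(oldHosts.get(key))) {
                result.modified.add(key);
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                result.removed.add(key);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> oldHosts = new HashMap<>();
        oldHosts.put("10.0.0.1:8080", "weight=1.0");
        oldHosts.put("10.0.0.2:8080", "weight=1.0");
        Map<String, String> newHosts = new HashMap<>();
        newHosts.put("10.0.0.1:8080", "weight=2.0");
        newHosts.put("10.0.0.3:8080", "weight=1.0");
        HostDiffSketch d = diff(oldHosts, newHosts);
        System.out.println("added=" + d.added + " removed=" + d.removed + " modified=" + d.modified);
    }
}
```

Only when the diff is non-empty does the original method notify listeners and write the disk cache, which avoids firing change events for pushes that carry no new information.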
NamingProxy.queryList
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {

    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));

    return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
}
How the server handles the list query
InstanceController.list
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    // Read namespaceId
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Read serviceName
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);

    String agent = WebUtils.getUserAgent(request);
    // Read the cluster names
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    // Read udpPort
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    // Whether the check flag is set (compare Eureka's self-preservation mechanism)
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);

    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    // Whether only healthy instances should be returned
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}
InstanceController.doSrvIpxt
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Look up the Service by namespaceId and serviceName
    Service service = serviceManager.getService(namespaceId, serviceName);
    // If it does not exist, return a result with an empty host list
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }

    checkIfDisabled(service);

    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // Check that udpPort is greater than 0 and that push can be enabled for this client
    // now try to enable the push
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // Register the client with PushService, which pushes address-list changes to it over UDP
            pushService
                    .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                            pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG
                .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }

    List<Instance> srvedIPs;

    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));

    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }

    if (CollectionUtils.isEmpty(srvedIPs)) {

        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }

        if (clientInfo.type == ClientInfo.ClientType.JAVA
                && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }

        result.put("hosts", JacksonUtils.createEmptyArrayNode());
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.put("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }

    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    // TRUE: healthy instances, FALSE: unhealthy instances
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    // Sort each instance into ipMap by its health status
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    // Whether the check flag is set
    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }

    double threshold = service.getProtectThreshold();
    // Protection threshold: triggered when healthy instances / all instances <= threshold
    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {

        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName);
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }
        // Return healthy and unhealthy instances together by moving them all under the TRUE key
        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }

    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);

        return JacksonUtils.createEmptyJsonNode();
    }

    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();

    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();

        if (healthyOnly && !entry.getKey()) {
            continue;
        }

        for (Instance instance : ips) {

            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }

            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();

            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.put("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }

            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);

        }
    }
    // Assemble the response
    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
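The protection-threshold branch in doSrvIpxt is easy to miss in the full method, so here it is on its own. ProtectThresholdSketch is a hypothetical class that uses Strings in place of Instance objects; the comparison mirrors the one in the listing (healthy count divided by total count, compared with the threshold using <=).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of the protection-threshold rule; Strings stand in for instances. */
class ProtectThresholdSketch {

    /**
     * Returns the instances to serve: normally only the healthy ones, but all of them
     * once the healthy ratio drops to the threshold or below.
     */
    static List<String> serve(List<String> healthy, List<String> unhealthy, double threshold) {
        List<String> served = new ArrayList<>(healthy);
        int total = healthy.size() + unhealthy.size();
        if (total > 0 && (float) healthy.size() / total <= threshold) {
            served.addAll(unhealthy); // threshold reached: serve unhealthy instances as well
        }
        return served;
    }

    public static void main(String[] args) {
        List<String> healthy = Collections.singletonList("10.0.0.1:8080");
        List<String> unhealthy = Arrays.asList("10.0.0.2:8080", "10.0.0.3:8080", "10.0.0.4:8080");
        System.out.println(serve(healthy, unhealthy, 0.5)); // ratio 0.25 is at or below 0.5
        System.out.println(serve(healthy, unhealthy, 0.2)); // ratio 0.25 is above 0.2
    }
}
```

Serving unhealthy instances once the ratio drops keeps the few remaining healthy ones from absorbing all the traffic. Because the comparison uses <=, a threshold of 0 still triggers the branch when no instance is healthy.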
Service subscription: keeping the service list up to date dynamically
2. Start a scheduled task that polls the service list every 10s
NacosNamingService.init
public NacosNamingService(Properties properties) {
    init(properties);
}

private void init(Properties properties) {
    namespace = InitUtils.initNamespaceForNaming(properties);
    initServerAddr(properties);
    InitUtils.initWebRootContext();
    initCacheDir();
    initLogName(properties);

    eventDispatcher = new EventDispatcher();
    serverProxy = new NamingProxy(namespace, endpoint, serverList, properties);
    // BeatReactor: heartbeats
    beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
    // HostReactor: periodic polling of the service list
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties),
            initPollingThreadCount(properties));
}
HostReactor constructor
public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
        boolean loadCacheAtStart, int pollingThreadCount) {
    // Create the scheduled thread pool used by the update tasks
    executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });

    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }

    this.updatingMap = new ConcurrentHashMap<String, Object>();
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    // Create the PushReceiver, which listens for pushes from the server
    this.pushReceiver = new PushReceiver(this);
}
PushReceiver构造方法
public PushReceiver(HostReactor hostReactor) {try {this.hostReactor = hostReactor;udpSocket = new DatagramSocket();//开启一个线程池,不断接受服务端传递过来的数据executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.push.receiver");return thread;}});//调用run方法executorService.execute(this);} catch (Exception e) {NAMING_LOGGER.error("[NA] init udp socket failed", e);}
}
@Override
public void run() {
    // Loop forever, listening for data sent by the Nacos Server; this is the client side of the push mechanism
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Block until the Nacos Server sends a packet
            udpSocket.receive(packet);
            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                hostReactor.processServiceJSON(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                    + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\""
                    + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }
            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
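The receive-then-acknowledge exchange above can be tried in isolation with plain JDK sockets. The following is a minimal sketch, not Nacos code: the class name UdpPushAckSketch, the "ack:" prefix, and the payload are made up for illustration, and both "server" and "client" live in one process on the loopback interface.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class: one push from a fake server, one ack from the client.
class UdpPushAckSketch {

    /** Pushes the payload to a client socket and returns the ack the server receives back. */
    static String pushAndAwaitAck(String payload) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket client = new DatagramSocket(0, loopback);
             DatagramSocket server = new DatagramSocket(0, loopback)) {
            client.setSoTimeout(2000);
            server.setSoTimeout(2000);

            // Server side: push to the UDP port the client is listening on.
            byte[] out = payload.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(out, out.length, loopback, client.getLocalPort()));

            // Client side: receive the push, then reply to the sender's address.
            byte[] buffer = new byte[1024];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            client.receive(packet);
            String received = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
            byte[] ack = ("ack:" + received).getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));

            // Server side: read the acknowledgement.
            DatagramPacket ackPacket = new DatagramPacket(new byte[1024], 1024);
            server.receive(ackPacket);
            return new String(ackPacket.getData(), 0, ackPacket.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(pushAndAwaitAck("service-changed"));
    }
}
```

As in PushReceiver.run, the client answers to packet.getSocketAddress(), so it never needs to know the server's address in advance.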
HostReactor.updateServiceNow
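When the client queries the service list for the first time after startup, it passes its own IP and UDP port to the Nacos Server. When the service address list on the Nacos Server changes, the server pushes the change to that port, which is how the udpSocket.receive(packet) call above receives the updated address list.
Call chain: NacosNamingService.getAllInstances -> HostReactor.getServiceInfo -> HostReactor.updateServiceNow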
public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result);
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}
HostReactor.getServiceInfo
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {
                    NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                }
            }
        }
    }
    // Schedule a polling task: the client polls every 10s, and by default the task
    // first runs after 1s. The task itself is implemented by UpdateTask.
    scheduleUpdateIfAbsent(serviceName, clusters);
    return serviceInfoMap.get(serviceObj.getKey());
}
HostReactor.scheduleUpdateIfAbsent
private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        // Add an UpdateTask for this service
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}
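The "schedule only once per service" idea can be sketched outside Nacos as follows. This is a simplified illustration under stated assumptions: the class ScheduleIfAbsentSketch and its methods are hypothetical, the 1-second delay is arbitrary, and, unlike the method above, the lookup is done entirely inside the synchronized block instead of using an unsynchronized first check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: at most one scheduled task per key.
class ScheduleIfAbsentSketch {
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    /** Schedules the task unless one is already registered for the key; returns true if it was scheduled. */
    boolean scheduleIfAbsent(String key, Runnable task) {
        synchronized (futureMap) {
            if (futureMap.containsKey(key)) {
                return false; // a task for this key already exists
            }
            futureMap.put(key, executor.schedule(task, 1, TimeUnit.SECONDS));
            return true;
        }
    }

    /** Counts how many of the given keys actually resulted in a new scheduled task. */
    static int countScheduled(String... keys) {
        ScheduleIfAbsentSketch sketch = new ScheduleIfAbsentSketch();
        int scheduled = 0;
        try {
            for (String key : keys) {
                if (sketch.scheduleIfAbsent(key, () -> { })) {
                    scheduled++;
                }
            }
        } finally {
            sketch.executor.shutdownNow(); // the demo tasks never need to run
        }
        return scheduled;
    }

    public static void main(String[] args) {
        System.out.println(countScheduled("svc-a", "svc-a", "svc-b")); // prints 2
    }
}
```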
UpdateTask
public class UpdateTask implements Runnable {
    long lastRefTime = Long.MAX_VALUE;
    private String clusters;
    private String serviceName;

    public UpdateTask(String serviceName, String clusters) {
        this.serviceName = serviceName;
        this.clusters = clusters;
    }

    @Override
    public void run() {
        try {
            // First look up the service info in the local cache
            ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            // null means there is no local copy, so it must be fetched from the server
            if (serviceObj == null) {
                // Call updateServiceNow to fetch the address list from the server
                updateServiceNow(serviceName, clusters);
                // Reschedule this task so that polling repeats
                executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
                return;
            }
            // Check whether the cached data is stale: the service's last refresh time
            // is <= the last refresh time recorded by this task
            if (serviceObj.getLastRefTime() <= lastRefTime) {
                // Call updateServiceNow to fetch the address list from the server and update the local list
                updateServiceNow(serviceName, clusters);
                serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
            } else {
                // if serviceName already updated by push, we should not override it
                // since the push data may be different from pull through force push
                // (so only send a refresh request here, without overwriting the local data)
                refreshOnly(serviceName, clusters);
            }
            // Record the latest refresh time
            lastRefTime = serviceObj.getLastRefTime();
            // If the service is not subscribed and futureMap no longer contains it, stop the update task
            if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
                !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                // abort the update task:
                NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                return;
            }
            // Run again after cacheMillis (10s here) to repeat the polling
            executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
        } catch (Throwable e) {
            NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
        }
    }
}
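UpdateTask does not use a fixed-rate schedule. Each run decides whether to submit itself again. A minimal sketch of that self-rescheduling pattern is shown below. The class SelfReschedulingPollSketch, the run counter, and the "stop after N runs" condition are assumptions made for the demo; in UpdateTask the stop condition is the subscription/futureMap check.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a task that re-submits itself until a stop condition holds.
class SelfReschedulingPollSketch {

    /** Runs a polling task `times` times, `delayMillis` apart, and returns how often it ran. */
    static int pollTimes(int times, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        Runnable task = new Runnable() {
            @Override
            public void run() {
                // Stand-in for "pull the latest address list from the server".
                if (runs.incrementAndGet() >= times) {
                    done.countDown(); // stop condition reached: do not reschedule
                    return;
                }
                // Submit this same task again after the delay.
                executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            }
        };

        executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(pollTimes(3, 10)); // prints 3
    }
}
```

Because the next delay is chosen on every run, the interval can change between runs, which is what lets UpdateTask follow the cacheMillis value of the latest service info.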
3. Push: the server pushes data to the client
When the server handles a registration request, the call chain is: InstanceController.registerInstance -> ServiceManager.registerInstance -> ServiceManager.createEmptyService -> ServiceManager.createServiceIfAbsent -> ServiceManager.putServiceAndInit
ServiceManager.putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException {
    putService(service);
    // Start a scheduled health-check task that monitors the health status of each instance
    service.init();
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
Service.init
The init method sets up heartbeat checking for the instances of this service's providers; the check runs every 5s.
private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

public void init() {
    // Schedule the health-check task
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
HealthCheckReactor.scheduleCheck
The first check runs 5s after startup, and every subsequent check runs after a further 5s delay.
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.putIfAbsent(task.taskKey(),
        GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
ClientBeatCheckTask.run
public class ClientBeatCheckTask implements Runnable {
    private Service service;

    public ClientBeatCheckTask(Service service) {
        this.service = service;
    }

    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }

    @JsonIgnore
    public DistroMapper getDistroMapper() {
        return ApplicationUtils.getBean(DistroMapper.class);
    }

    public GlobalConfig getGlobalConfig() {
        return ApplicationUtils.getBean(GlobalConfig.class);
    }

    public SwitchDomain getSwitchDomain() {
        return ApplicationUtils.getBean(SwitchDomain.class);
    }

    public String taskKey() {
        return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName());
    }

    @Override
    public void run() {
        try {
            if (!getDistroMapper().responsible(service.getName())) {
                return;
            }
            if (!getSwitchDomain().isHealthCheckEnabled()) {
                return;
            }
            List<Instance> instances = service.allIPs(true);
            // first set health status of instances:
            // iterate over the service's instances and check their heartbeats
            for (Instance instance : instances) {
                // If the time since the instance's last heartbeat exceeds the configured timeout,
                // the instance is considered offline.
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    if (!instance.isMarked()) {
                        if (instance.isHealthy()) {
                            // Mark the instance as unhealthy
                            instance.setHealthy(false);
                            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                            // Trigger a push of the instance change (publishes a service-changed event)
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            if (!getGlobalConfig().isExpireInstance()) {
                return;
            }
            // then remove obsolete instances:
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    // delete instance
                    Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}",
                        service.getName(), JacksonUtils.toJson(instance));
                    // Delete the expired service instance
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }

    private void deleteIp(Instance instance) {
        try {
            NamingProxy.Request request = NamingProxy.Request.newRequest();
            request.appendParam("ip", instance.getIp())
                .appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true")
                .appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName())
                .appendParam("namespaceId", service.getNamespaceId());
            String url = "http://127.0.0.1:" + ApplicationUtils.getPort() + ApplicationUtils.getContextPath()
                + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
            // delete instance asynchronously:
            HttpClient.asyncHttpDelete(url, null, null, new AsyncCompletionHandler() {
                @Override
                public Object onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                            instance.toJson(), response.getResponseBody(), response.getStatusCode());
                    }
                    return null;
                }
            });
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
        }
    }
}
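The two timeouts used by ClientBeatCheckTask (one for marking an instance unhealthy, a longer one for deleting it) can be reduced to a small pure function. This is only a sketch: the class BeatCheckSketch, the Action enum, and the 15000/30000 ms thresholds in the demo are illustrative assumptions, and the real task performs both steps in one run rather than returning a single action.

```java
// Hypothetical demo class: classify an instance by how long it has been silent.
class BeatCheckSketch {
    enum Action { NONE, MARK_UNHEALTHY, DELETE }

    /**
     * Returns the strongest action that applies given the time since the last heartbeat.
     * heartBeatTimeoutMillis plays the role of getInstanceHeartBeatTimeOut(),
     * deleteTimeoutMillis the role of getIpDeleteTimeout().
     */
    static Action check(long nowMillis, long lastBeatMillis,
                        long heartBeatTimeoutMillis, long deleteTimeoutMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > deleteTimeoutMillis) {
            return Action.DELETE;
        }
        if (silence > heartBeatTimeoutMillis) {
            return Action.MARK_UNHEALTHY;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Assumed thresholds for the demo: unhealthy after 15s, deleted after 30s of silence.
        System.out.println(check(10_000, 0, 15_000, 30_000)); // prints NONE
        System.out.println(check(20_000, 0, 15_000, 30_000)); // prints MARK_UNHEALTHY
        System.out.println(check(31_000, 0, 15_000, 30_000)); // prints DELETE
    }
}
```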
PushService.serviceChanged
public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    // Publish a service-changed event
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}
PushService.onApplicationEvent (the event listener)
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap<String, PushClient> clients =
                    clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }
                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }
                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();
                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }
                    if (compressData != null) {
                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }
                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));
                    // Push the data packet
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }
        }, 1000, TimeUnit.MILLISECONDS);
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    }
}
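serviceChanged and onApplicationEvent together merge bursts of change events: while a push for a service is pending in futureMap, further changes are ignored, and the entry is removed once the delayed push has run. A minimal sketch of that debounce idea follows. The class PushDebounceSketch and the service key are hypothetical, and the sketch swaps the containsKey/put pair for ConcurrentHashMap.computeIfAbsent so that check and registration happen atomically.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: many change events within the delay window cause one push.
class PushDebounceSketch {
    private final ConcurrentMap<String, Future<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger pushes = new AtomicInteger();
    private final long delayMillis;

    PushDebounceSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Schedules a delayed push unless one is already pending for this service. */
    void serviceChanged(String serviceKey) {
        futureMap.computeIfAbsent(serviceKey, key -> executor.schedule(() -> {
            try {
                pushes.incrementAndGet(); // stand-in for the UDP push to all clients
            } finally {
                futureMap.remove(key);    // allow the next change to schedule a new push
            }
        }, delayMillis, TimeUnit.MILLISECONDS));
    }

    /** Fires a burst of change events and returns how many pushes were actually executed. */
    static int pushesAfterBurst(int changes, long delayMillis) {
        PushDebounceSketch sketch = new PushDebounceSketch(delayMillis);
        for (int i = 0; i < changes; i++) {
            sketch.serviceChanged("public##demo-service");
        }
        // shutdown() lets the already scheduled delayed task run (the JDK default policy).
        sketch.executor.shutdown();
        try {
            sketch.executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.pushes.get();
    }

    public static void main(String[] args) {
        System.out.println(pushesAfterBurst(5, 300)); // prints 1
    }
}
```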
PushService.udpPush
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        // Send the push through the UDP socket
        udpSocket.send(ackEntry.origin);
        ackEntry.increaseRetryTime();
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
            TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        return ackEntry;
    } catch (Exception e) {
        Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
            ackEntry.origin.getAddress().getHostAddress(), e);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return null;
    }
}
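The retry logic in udpPush (check the retry counter, send, increment, schedule a retransmission in case no ack arrives) can be modelled without sockets or timers. The sketch below is a simplified simulation under assumptions: the class RetransmitSketch is hypothetical, the ack is modelled as a predicate on the attempt number instead of a real timeout, and the retry limit is passed in rather than taken from MAX_RETRY_TIMES.

```java
import java.util.function.IntPredicate;

// Hypothetical demo class: resend until acknowledged or the retry limit is exceeded.
class RetransmitSketch {

    /**
     * Returns the number of sends it took to get an ack, or -1 if the push is given up.
     * ackedAfterAttempt.test(n) answers "did an ack arrive after the n-th send?".
     */
    static int pushWithRetry(int maxRetryTimes, IntPredicate ackedAfterAttempt) {
        int retryTimes = 0;
        while (true) {
            if (retryTimes > maxRetryTimes) {
                return -1; // limit exceeded: counted as a failed push
            }
            retryTimes++; // "send" the packet, then increase the retry counter
            if (ackedAfterAttempt.test(retryTimes)) {
                return retryTimes; // ack received, nothing left to retransmit
            }
            // No ack within the timeout: the loop plays the role of the retransmitter.
        }
    }

    public static void main(String[] args) {
        System.out.println(pushWithRetry(1, attempt -> attempt == 1)); // prints 1
        System.out.println(pushWithRetry(1, attempt -> attempt == 2)); // prints 2
        System.out.println(pushWithRetry(1, attempt -> false));        // prints -1
    }
}
```

With a limit of 1, the packet is sent at most twice, because the counter is compared before each send and incremented after it, the same order as in udpPush.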
The client listens for this push
PushReceiver.run
@Override
public void run() {
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            udpSocket.receive(packet);
            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // Parse the pushed data and update the local cache
                hostReactor.processServiceJSON(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                    + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\""
                    + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                    + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                    + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                    + "\", \"data\":" + "\"\"}";
            }
            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
HostReactor.processServiceJSON
public ServiceInfo processServiceJSON(String json) {
    ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);
    ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
    if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
        //empty or error push, just ignore
        return oldService;
    }
    boolean changed = false;
    if (oldService != null) {
        if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
            NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
                + ", new-t: " + serviceInfo.getLastRefTime());
        }
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
        for (Instance host : oldService.getHosts()) {
            oldHostMap.put(host.toInetAddr(), host);
        }
        Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
        for (Instance host : serviceInfo.getHosts()) {
            newHostMap.put(host.toInetAddr(), host);
        }
        Set<Instance> modHosts = new HashSet<Instance>();
        Set<Instance> newHosts = new HashSet<Instance>();
        Set<Instance> remvHosts = new HashSet<Instance>();
        List<Map.Entry<String, Instance>> newServiceHosts =
            new ArrayList<Map.Entry<String, Instance>>(newHostMap.entrySet());
        for (Map.Entry<String, Instance> entry : newServiceHosts) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
                oldHostMap.get(key).toString())) {
                modHosts.add(host);
                continue;
            }
            if (!oldHostMap.containsKey(key)) {
                newHosts.add(host);
            }
        }
        for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
            Instance host = entry.getValue();
            String key = entry.getKey();
            if (newHostMap.containsKey(key)) {
                continue;
            }
            if (!newHostMap.containsKey(key)) {
                remvHosts.add(host);
            }
        }
        if (newHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
        }
        if (remvHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
        }
        if (modHosts.size() > 0) {
            changed = true;
            NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
                + serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
        }
        serviceInfo.setJsonFromServer(json);
        if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
            eventDispatcher.serviceChanged(serviceInfo);
            DiskCache.write(serviceInfo, cacheDir);
        }
    } else {
        changed = true;
        NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey()
            + " -> " + JSON.toJSONString(serviceInfo.getHosts()));
        serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
        eventDispatcher.serviceChanged(serviceInfo);
        serviceInfo.setJsonFromServer(json);
        DiskCache.write(serviceInfo, cacheDir);
    }
    MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
    if (changed) {
        NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey()
            + " -> " + JSON.toJSONString(serviceInfo.getHosts()));
    }
    return serviceInfo;
}
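The core of processServiceJSON is a diff between the old and the new host list, keyed by address, that yields new, removed, and modified hosts. A minimal sketch of that comparison is given below. The class HostDiffSketch is hypothetical, instances are represented as plain strings (address mapped to a textual description) instead of Instance objects, and sorted sets are used only to make the printed result deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical demo class: diff two host maps keyed by "ip:port".
class HostDiffSketch {

    /** Returns the keys that are new, removed, or modified when going from oldHosts to newHosts. */
    static Map<String, Set<String>> diff(Map<String, String> oldHosts, Map<String, String> newHosts) {
        Set<String> added = new TreeSet<>();
        Set<String> removed = new TreeSet<>();
        Set<String> modified = new TreeSet<>();
        for (Map.Entry<String, String> entry : newHosts.entrySet()) {
            String old = oldHosts.get(entry.getKey());
            if (old == null) {
                added.add(entry.getKey());          // not known before
            } else if (!old.equals(entry.getValue())) {
                modified.add(entry.getKey());       // same address, different attributes
            }
        }
        for (String key : oldHosts.keySet()) {
            if (!newHosts.containsKey(key)) {
                removed.add(key);                   // no longer reported by the server
            }
        }
        Map<String, Set<String>> result = new HashMap<>();
        result.put("new", added);
        result.put("removed", removed);
        result.put("modified", modified);
        return result;
    }

    /** Builds a small example and renders the diff as one line of text. */
    static String demo() {
        Map<String, String> oldHosts = new HashMap<>();
        oldHosts.put("10.0.0.1:8080", "weight=1.0");
        oldHosts.put("10.0.0.2:8080", "weight=1.0");
        Map<String, String> newHosts = new HashMap<>();
        newHosts.put("10.0.0.2:8080", "weight=2.0");
        newHosts.put("10.0.0.3:8080", "weight=1.0");
        Map<String, Set<String>> d = diff(oldHosts, newHosts);
        return "new=" + d.get("new") + ", removed=" + d.get("removed") + ", modified=" + d.get("modified");
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // prints new=[10.0.0.3:8080], removed=[10.0.0.1:8080], modified=[10.0.0.2:8080]
    }
}
```

Only when at least one of the three sets is non-empty does the client notify subscribers and rewrite the disk cache, which keeps unchanged pushes cheap.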