摘要:
本文是Nacos源码学习的第一篇,基于Nacos v1.1.3版本对Nacos源码进行学习,本片主要从exmaple的App示例入手,切入Nacos客户端NacosNamingService的初始化过程,并分析初始化相关的类。
一.源码下载
1.首先Fork一份Nacos源码到自己的github账号下面。
2.用git clone自己账号下的github项目到本地。
3.本地创建一个用于学习的分支,方便自己对源码进行一些标注,同时不会污染到主干分支的源码。
4.另外还可以创建一个开发的分支,以便进行一些源码的修改。
5.在本地搭建一套Nacos的运行环境,以便进行运行测试和调试。

二.从何入手
1.Nacos的源码采用的是Maven父子项目结构,其源码结构如下图所示,目前我们还不知道每个模块是干什么用的,但是其中一个模块我认为是值得下手研究的,那就是example,我们可以从这里入手开始调试和阅读源码。


2.com.alibaba.nacos.example.App
我们首先来看下App这个程序,如图所示,这里程序主要演示的是进行服务注册和服务获取,首先通过Properties文件将Nacos服务器信息以及命名空间进行设置,然后通过NamingFactory创建NamingService服务,最后通过NamingService服务注册了两个服务,服务名称都是nacos.test.3,但是集群名称不同。最后通过getAllInstnaces获取这个服务命对应的所有服务。

3. com.alibaba.nacos.api.naming.NamingFactory
很显然,我们需要看下NamingFactroy是干什么的,如下图所示,NamingFactory是一个工厂类,用于创建NamingService。

NamingFactory可以通过两种方式获取NamingService,我们先来看看通过Properties来创建,可以看到这里它通过反射的方式创建了NamingService的一个实现类:com.alibaba.nacos.client.naming.NacosNamingService,这个类存在于client模块中,这样客户端就能够通过NacosNamingService来管理和注册服务了。

4.NacosNamingService初始化
接下来看下NacosNamingService的构造方法,如下图所示,主要看init方法,其中进行了一些init方法:
initNamespaceForNaming:用于初始命名空间,在Nacos中命名空间用于租户粗粒度隔离,同时还可以进行环境的区别,如开发环境和测试环境等等。
initServerAddr:初始化服务器地址,其中涉及到的endpoint 等,我们后面进行讨论
initWebRootContext:初始化web上下文,其支持通过阿里云EDAS进行部署
initCacheDir:初始化缓存目录
initLogName:从配置中获取日志文件
EventDispatcher:监听事件分发,当客户端订阅了某个服务信息后,会以Listener的方式注册到EventDispatcher的队列中,当有服务变化的时候,会通知订阅者。
NamingProxy:服务端的代理,用于客户端与服务端的通信
BeatReactor:用于维持与服务器之间的心跳通信,上报客户端注册到服务端的服务信息。
HostReactor:用于客户端服务的订阅,以及从服务端更新服务信息

6.方法具体分析和对象的构建-1
a.initNamespaceForNaming
通过备注方式进行解释:

//初始化获取Namespacepublic static String initNamespaceForNaming(Properties properties) {String tmpNamespace = null;//是否使用阿里云上环境进行解析,默认为true,如果没有进行配置,默认使用DEFAULT_USE_CLOUD_NAMESPACE_PARSINGString isUseCloudNamespaceParsing =properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));if (Boolean.valueOf(isUseCloudNamespaceParsing)) {tmpNamespace = TenantUtil.getUserTenantForAns();//从系统变量获取ans.namespacetmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {@Overridepublic String call() {String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);return namespace;}});//从环境变量获取ALIBABA_ALIWARE_NAMESPACE,这一步和前一步应该是跟云上环境有关tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {@Overridepublic String call() {String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);return namespace;}});}//如果不是上云环境,那么从系统变量获取namespacetmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {@Overridepublic String call() {String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);return namespace;}});//若上面没获取到,从properties中获取namespaceif (StringUtils.isEmpty(tmpNamespace) && properties != null) {tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);}//若还没有渠道,获取系统默认的namespacetmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {@Overridepublic String call() {return UtilAndComs.DEFAULT_NAMESPACE_ID;}});return tmpNamespace;}

b.initServerAddr
这里让人疑惑的是,有个初始化Endpoint,参考官网给出的解释:Endpoint是提供一种能力,让客户端能够感知到Nacos服务端的扩缩容,说直白一点就是配置一个URL,通过URL可以获取Nacos服务器信息,也就是再也不用通过在客户端配置死服务器端地址(这点设计还是很不错),具体看下Endpoint是怎么初始化的

//初始化服务器地址
private void initServerAddr(Properties properties) {//从properties中获取服务器地址serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);//初始化endpoint,如果有endpoint,则废弃serverListendpoint = InitUtils.initEndpoint(properties);if (StringUtils.isNotEmpty(endpoint)) {serverList = "";}
}
public static String initEndpoint(final Properties properties) {if (properties == null) {return "";}// Whether to enable domain name resolution rules//是否使用endpoint解析,默认为true,也就是:USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUEString isUseEndpointRuleParsing =properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,System.getProperty(SystemPropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,String.valueOf(ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE)));boolean isUseEndpointParsingRule = Boolean.valueOf(isUseEndpointRuleParsing);String endpointUrl;//使用endpoint解析功能if (isUseEndpointParsingRule) {// Get the set domain name informationendpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT));if (StringUtils.isBlank(endpointUrl)) {return "";}} else {//不使用的化,直接通过properties文件来获取endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT);}if (StringUtils.isBlank(endpointUrl)) {return "";}//获取endpoint的端口String endpointPort = TemplateUtils.stringEmptyAndThenExecute(System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT), new Callable<String>() {@Overridepublic String call() {return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT);}});endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() {@Overridepublic String call() {return "8080";}});return endpointUrl + ":" + endpointPort;
}

c.initWebRootContext
这里不用多说了,初始化基本的web上下文,同样会涉及到阿里云上云的环境

public static void initWebRootContext() {// support the web context with ali-yun if the app deploy by EDAS//支持阿里云上的webContextfinal String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT);//生成url_base和url_instance拼接TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {@Overridepublic void run() {UtilAndComs.WEB_CONTEXT = webContext.indexOf("/") > -1 ? webContext: "/" + webContext;UtilAndComs.NACOS_URL_BASE = UtilAndComs.WEB_CONTEXT + "/v1/ns";UtilAndComs.NACOS_URL_INSTANCE = UtilAndComs.NACOS_URL_BASE + "/instance";}});
}

d.initCacheDir
初始化缓存目录,用于存放从服务端获取的服务信息,如果客户端与服务端断开了连接,将会使用缓存的信息
//初始化缓存目录
private void initCacheDir() {
cacheDir = System.getProperty(“com.alibaba.nacos.naming.cache.dir”);
if (StringUtils.isEmpty(cacheDir)) {
cacheDir = System.getProperty(“user.home”) + “/nacos/naming/” + namespace;
}
}

e.initLogName
这里也不多说,这是初始化日志存放路径

 //初始化日志目录private void initLogName(Properties properties) {logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);if (StringUtils.isEmpty(logName)) {if (properties != null && StringUtils.isNotEmpty(properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME))) {logName = properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);} else {logName = "naming.log";}}}

f.EventDispatcher
EventDispatcher 是一个事件分发器,其维护了一个发生了变化服务的队列,一个对于某个服务的监听者队列映射,实时的将服务变化信息同步给监听者,这样客户端就可以通过注册监听者实现在服务变化后动态进行操作。

public class EventDispatcher {private ExecutorService executor = null;//发生了变化的服务队列private BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();//监听者维护映射private ConcurrentMap<String, List<EventListener>> observerMap= new ConcurrentHashMap<String, List<EventListener>>();public EventDispatcher() {executor = Executors.newSingleThreadExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");thread.setDaemon(true);return thread;}});executor.execute(new Notifier());}public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());observers.add(listener);observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);if (observers != null) {observers.add(listener);}serviceChanged(serviceInfo);}public void removeListener(String serviceName, String clusters, EventListener listener) {NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));if (observers != null) {Iterator<EventListener> iter = observers.iterator();while (iter.hasNext()) {EventListener oldListener = iter.next();if (oldListener.equals(listener)) {iter.remove();}}if (observers.isEmpty()) {observerMap.remove(ServiceInfo.getKey(serviceName, clusters));}}}public List<ServiceInfo> getSubscribeServices() {List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();for (String key : observerMap.keySet()) {serviceInfos.add(ServiceInfo.fromKey(key));}return serviceInfos;}public void serviceChanged(ServiceInfo serviceInfo) {if (serviceInfo == null) {return;}changedServices.add(serviceInfo);}//服务变化通知线程private class Notifier implements Runnable {@Overridepublic void run() {while (true) {ServiceInfo serviceInfo = null;try {//从队列取出变化消息serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);} catch (Exception ignore) {}if (serviceInfo == null) {continue;}try {//获取监听者队列List<EventListener> listeners = observerMap.get(serviceInfo.getKey());//遍历监听者队列,调用其onEvent方法if (!CollectionUtils.isEmpty(listeners)) {for (EventListener listener : listeners) {List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));}}} catch (Exception e) {NAMING_LOGGER.error("[NA] notify error for service: "+ serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);}}}}public void setExecutor(ExecutorService executor) {ExecutorService oldExecutor = this.executor;this.executor = executor;oldExecutor.shutdown();}
}

g.NamingProxy
serverProxy是客户端与服务器端的代理,其封装了与服务端的操作,这里代码太多,不具体列出,遇到具体场景再进行分析
h.BeatReactor
beatReactor是负责与服务端建立上报机制的类,对于ephemeral为true的服务,客户端需要通过BeatReactor周期性的进行服务的上报,告诉服务端该服务处于正常状态,若一段时间内未进行该服务的上报,服务端会移除该服务的注册。这里所说的ephemeral服务是指服务信息不会在服务端持久化的服务,对于ephemeral为false的服务,服务信息会持久化到服务端。下面是BeatReactor类的代码.

//对于临时服务ephemeral(在服务器段不进行持久化的服务),需要BeatReactor进行周期性的状态上报
public class BeatReactor {private ScheduledExecutorService executorService;private NamingProxy serverProxy;public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();public BeatReactor(NamingProxy serverProxy) {this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);}public BeatReactor(NamingProxy serverProxy, int threadCount) {this.serverProxy = serverProxy;executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.beat.sender");return thread;}});}//当在注册服务的时候,会添加上报线程任务public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());}//当移除服务的时候,会移除上报线程任务public void removeBeatInfo(String serviceName, String ip, int port) {NAMING_LOGGER.info("[BEAT] removing beat: {}:{}:{} from beat map.", serviceName, ip, port);BeatInfo beatInfo = dom2Beat.remove(buildKey(serviceName, ip, port));if (beatInfo == null) {return;}beatInfo.setStopped(true);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());}private String buildKey(String serviceName, String ip, int port) {return serviceName + Constants.NAMING_INSTANCE_ID_SPLITTER+ ip + Constants.NAMING_INSTANCE_ID_SPLITTER + port;}//对于ephemeral服务,需要按周期上报服务信息class BeatTask implements Runnable {BeatInfo beatInfo;public BeatTask(BeatInfo beatInfo) {this.beatInfo = beatInfo;}@Overridepublic void run() {if (beatInfo.isStopped()) {return;}long result = serverProxy.sendBeat(beatInfo);long nextTime = result > 0 ? result : beatInfo.getPeriod();executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);}}
}

i.HostReactor
当客户端去获取服务端上注册的服务的时候,HostReactor就排上用场了,其维护了serviceInfoMap,当客户端调用NacosNamingServiced的获取服务信息方法的时候,HostReactor就把服务信息维护到serviceInfoMap当中,并通过UpdateTask能够周期性的从服务端获取订阅服务的最新信息,同时HostReactor还持有pushReceiver对象,用于通过UDP协议从服务器获取推送的信息,并更新到serviceInfoMap当中。HostReactor还持有failoverReactor对象,当服务端不可用的时候,切换到本地缓存模式,从缓存中获取服务信息。

public class HostReactor {private static final long DEFAULT_DELAY = 1000L;private static final long UPDATE_HOLD_INTERVAL = 5000L;private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();private Map<String, ServiceInfo> serviceInfoMap;private Map<String, Object> updatingMap;private PushReceiver pushReceiver;private EventDispatcher eventDispatcher;private NamingProxy serverProxy;private FailoverReactor failoverReactor;private String cacheDir;private ScheduledExecutorService executor;public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir) {this(eventDispatcher, serverProxy, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);}//构造方法public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,boolean loadCacheAtStart, int pollingThreadCount) {//设置updater线程executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.client.naming.updater");return thread;}});this.eventDispatcher = eventDispatcher;this.serverProxy = serverProxy;this.cacheDir = cacheDir;//如果loadCacheAtStart就从本地缓存文件加载服务if (loadCacheAtStart) {this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));} else {this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);}//初始化更新映射this.updatingMap = new ConcurrentHashMap<String, Object>();//初始化容错服务this.failoverReactor = new FailoverReactor(this, cacheDir);//初始化pushReceiver用于接收服务端推送的UDP数据this.pushReceiver = new PushReceiver(this);}public Map<String, ServiceInfo> getServiceInfoMap() {return serviceInfoMap;}public synchronized ScheduledFuture<?> addTask(UpdateTask task) {return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);}//处理从服务端接收到的数据public ServiceInfo processServiceJSON(String json) {ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {//empty or error push, just ignorereturn oldService;}boolean changed = false;if (oldService != null) {//如果本地旧服务的获取时间比服务器端获取的时间新,则保留本地旧服务的时间if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()+ ", new-t: " + serviceInfo.getLastRefTime());}//用新服务信息替换serviceInfoMapserviceInfoMap.put(serviceInfo.getKey(), serviceInfo);Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());for (Instance host : oldService.getHosts()) {oldHostMap.put(host.toInetAddr(), host);}Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());for (Instance host : serviceInfo.getHosts()) {newHostMap.put(host.toInetAddr(), host);}Set<Instance> modHosts = new HashSet<Instance>();Set<Instance> newHosts = new HashSet<Instance>();Set<Instance> remvHosts = new HashSet<Instance>();List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(newHostMap.entrySet());for (Map.Entry<String, Instance> entry : newServiceHosts) {Instance host = entry.getValue();String key = entry.getKey();if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),oldHostMap.get(key).toString())) {modHosts.add(host);continue;}if (!oldHostMap.containsKey(key)) {newHosts.add(host);}}for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {Instance host = entry.getValue();String key = entry.getKey();if (newHostMap.containsKey(key)) {continue;}if (!newHostMap.containsKey(key)) {remvHosts.add(host);}}if (newHosts.size() > 0) {changed = true;NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "+ serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));}if (remvHosts.size() > 0) {changed = true;NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "+ serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));}if (modHosts.size() > 0) {changed = true;NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "+ serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));}serviceInfo.setJsonFromServer(json);if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {eventDispatcher.serviceChanged(serviceInfo);DiskCache.write(serviceInfo, cacheDir);}} else {changed = true;NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> " + JSON.toJSONString(serviceInfo.getHosts()));serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);eventDispatcher.serviceChanged(serviceInfo);serviceInfo.setJsonFromServer(json);DiskCache.write(serviceInfo, cacheDir);}MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());if (changed) {NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +" -> " + JSON.toJSONString(serviceInfo.getHosts()));}return serviceInfo;}//通过key获取服务对象private ServiceInfo getServiceInfo0(String serviceName, String clusters) {//得到ServiceInfo的keyString key = ServiceInfo.getKey(serviceName, clusters);//从serviceInfoMap中获取servicereturn serviceInfoMap.get(key);}//直接从服务器端获取Service信息,并解析为ServiceInfo对象public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException {String result = serverProxy.queryList(serviceName, clusters, 0, false);if (StringUtils.isNotEmpty(result)) {return JSON.parseObject(result, ServiceInfo.class);}return null;}public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());//serviceName是分组@@服务名称,再和clusters合并,得到keyString key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);//从serviceInfoMap获取serviceObj,如果没有serviceObj,则新生成一个if (null == serviceObj) {serviceObj = new ServiceInfo(serviceName, clusters);serviceInfoMap.put(serviceObj.getKey(), serviceObj);updatingMap.put(serviceName, new Object());updateServiceNow(serviceName, clusters);updatingMap.remove(serviceName);//如果更新列表中包含服务,则等待更新结束} else if (updatingMap.containsKey(serviceName)) {if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finishsynchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}//加入到更新调度当中scheduleUpdateIfAbsent(serviceName, clusters);return serviceInfoMap.get(serviceObj.getKey());}public void scheduleUpdateIfAbsent(String serviceName, String clusters) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}//添加更新调度任务synchronized (futureMap) {if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {return;}ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);}}//直接从服务端更新服务public void updateServiceNow(String serviceName, String clusters) {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);if (StringUtils.isNotEmpty(result)) {processServiceJSON(result);}} catch (Exception e) {NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}}public void refreshOnly(String serviceName, String clusters) {try {serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);} catch (Exception e) {NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);}}//更新服务public class UpdateTask implements Runnable {long lastRefTime = Long.MAX_VALUE;private String clusters;private String serviceName;public UpdateTask(String serviceName, String clusters) {this.serviceName = serviceName;this.clusters = clusters;}@Overridepublic void run() {try {ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));if (serviceObj == null) {updateServiceNow(serviceName, clusters);executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);return;}if (serviceObj.getLastRefTime() <= lastRefTime) {updateServiceNow(serviceName, clusters);serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));} else {// if serviceName already updated by push, we should not override it// since the push data may be different from pull through force pushrefreshOnly(serviceName, clusters);}executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);lastRefTime = serviceObj.getLastRefTime();} catch (Throwable e) {NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);}}}
}

三.小结
本文从exmaple中的App入手,粗略分析了下NacosNamingService的初始化过程,初始化包含的类,同时分析下每个类的作用,从而了解了Nacos客户端中关于服务注册中相关的机制。接下来将进一步继续分析其它exmaple中的程序。

菜鸟学源码之Nacos v1.1.3源码学习-Client模块(1):NacosNamingService初始化相关推荐

  1. 菜鸟学源码之Nacos v1.1.3源码学习-Client模块(2):NacosConfigService

    上一篇博客我们基于Nacos源码中的example模块里的app类学习了NacosNamingService相关的内容: https://blog.csdn.net/crystonesc/articl ...

  2. (Nacos源码解析五)Nacos服务事件变动源码解析

    Nacos源码解析系列目录 Nacos 源码编译运行 (Nacos源码解析一)Nacos 注册实例源码解析 (Nacos源码解析二)Nacos 服务发现源码解析 (Nacos源码解析三)Nacos 心 ...

  3. 菜鸟学WEB开发 ASP.NET 5.0 1.0

    菜鸟学WEB开发 ASP.NET 5.0 1.0 在学习之初我要强调一点"微软要向跨平台开发"大举进军了,不管他能走多远,这是微软的必经之路. 一.学习流程: 创建ASP.NET ...

  4. 菜鸟学Linux 第030篇笔记 yum使用,源码编译安装

    菜鸟学Linux 第030篇笔记 yum使用,源码编译安装 yum yellow updatet modified c/s: client, server yum repository 文件服务定义 ...

  5. 菜鸟学Android源码——Setting(1)

    菜鸟学Android源码--Setting(1) 在上一篇中,我简单介绍了Android源码的下载和编译,还没有下载编译源码的小伙伴请看这里:Android源码分析之--下载并编译源码 关于系统设置A ...

  6. 《一步一步看源码:Nacos》框架源码系列之一(其1,配置服务源码)

    Nacos源码 ​ 因为最近项目在做容器化处理,容器化后涉及到不同进程对同一个文件的读写,考虑到可能会存在同一文件的配置文件,可能会把彼此覆盖掉,所以这里学习一下Nacos源码. 整体结构图 ​ 这边 ...

  7. Spring Cloud Alibaba源码 - 16 Nacos 注册中心源码解析

    文章目录 Nacos & Ribbon & Feign 核心微服务架构图 Nacos核心功能 源码下载 & 启动 [standalone模式] [cluster模式] Naco ...

  8. 红盟云卡v1.6.2源码

    简介: 红盟云卡v1.6.2源码 网盘下载地址: http://www.zijiepan2.xyz/27C8qzDAw7u0 图片:

  9. PHP未来码支付V1.3网站源码开源版

    介绍: 一款PHP未来码V1.3网站源码开源版,喜欢就下载吧. 网盘下载地址: http://kekewl.net/cjMLteFF2UN0 图片:

最新文章

  1. 家居建材企业信息化管理路在何方?
  2. Why to do,What to do,Where to do 与 Lambda表达式!
  3. Ultrahaptics公司为Holodeck型触觉关闭了2300万美元的资金回合
  4. 【设计模式】代理模式 ( 动态代理使用流程 | 创建目标对象 | 创建被代理对象 | 创建调用处理程序 | 动态创建代理对象 | 动态代理调用 )
  5. 【Linux入门连载二】Linux系统有哪些基本目录?
  6. go uintptr unsafe Pointer offset() 的使用
  7. deactivate_sending在创建新的table entry时的作用
  8. php mysql 统计_PHP和MySQL实现优化统计每天数据
  9. 带你阅读linux内核源码:linux内核源代码编程规范
  10. 安装deb软件包时出现Unknown media type in type **/** 的解决办法
  11. html5 励志名言,励志名言五十条
  12. 尚德机构季报图解:净利达1.5亿 抗周期和盈利能力稳健
  13. python云图_python聚合云图
  14. 决定SCI论文价值的因素是什么?
  15. 2019年秋冬季读书笔记
  16. matlab zigzag算法,MATLAB 实现zigzag扫描(z字形扫描)
  17. 自己重构一个非常简单的网页
  18. 十次方项目开发系列【8】:对评论点赞功能开发 Redis的配置和使用
  19. 2023 iApp 微信余额装逼源码
  20. Office365 自定义模板(恢复)

热门文章

  1. 通过http请求 zabbix api 获取监控的流量数据
  2. 以太网封装及vlan封装类型
  3. 拓展客户有哪些诀窍?
  4. Google图算法引擎Pregel介绍
  5. java heap space解决方法
  6. waimai-cps:美团红包饿了么红包CPS小程序+ H5 +推出外卖红包应用,带有后台代码,安装超级简单-源码
  7. Clustering and Projected Clustering with Adaptive Neighbors(自适应邻域聚类CAN和自适应邻域投影聚类PCAN)
  8. PAR(Projected AR)投影增强现实系统使用Vuforia
  9. swagger 接口参数顺序_Swagger常用参数用法 - mao2080 - 博客园
  10. ubuntu20.04上openvino安装及环境配置