文章目录

  • 引例
  • NacosConfigService的初始化
    • ServerHttpAgent的构造函数解析
    • ServerListManager的构造函数解析
    • ConfigFilterChainManager的构造函数解析
    • ClientWorker的构造函数解析
  • NacosConfigService#getConfig方法浅析
    • LocalConfigInfoProcessor.getFailover 代码分析
    • worker.getServerConfig 代码分析
  • 配置更新
    • 添加监听器
    • addCacheDataIfAbsent

引例

以一段代码开启Nacos配置中心源码的分析

public static void main(String[] args) throws NacosException, IOException {String dataId = "nacos-simple-demo.yaml";String group = "DEFAULT_GROUP";Properties prop=new Properties();//指定nacos服务地址prop.put("serverAddr","127.0.0.1:8848");//指定namespaceprop.put("namespace","ad77ed68-20f8-4b2b-91fd-1105df209258");//⭐️ ① NacosConfigService的初始化 ⭐️ConfigService configService = NacosFactory.createConfigService(prop);//⭐️ ② 获取配置内容 ⭐️String content = configService.getConfig(dataId, group, 5000);//从nacos中取配置内容System.out.println(content);//订阅nacos配置变更事件configService.addListener(dataId, group, new Listener() {@Overridepublic Executor getExecutor() {return null;}@Overridepublic void receiveConfigInfo(String configInfo) {System.out.println("updated content:" + configInfo);}});System.in.read();}

NacosConfigService的初始化

ConfigFactory#createConfigService

public static ConfigService createConfigService(Properties properties) throws NacosException {try {Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");Constructor constructor = driverImplClass.getConstructor(Properties.class);ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);return vendorImpl;} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}}

这段代码主要以反射方式创建NacosConfigService

public NacosConfigService(Properties properties) throws NacosException {//检查初始化参数,主要检查ContextPathValidatorUtils.checkInitParam(properties);//获取编码方式,如无 则默认值是UTF-8String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);if (StringUtils.isBlank(encodeTmp)) {this.encode = Constants.ENCODE;} else {this.encode = encodeTmp.trim();}//初始化NameSpace,若未配置到properties,则nameSpace=""initNamespace(properties);//创建ConfigFilterChainManager 初始化Filter 下面会展开说this.configFilterChainManager = new ConfigFilterChainManager(properties);//装饰类,     MetricsHttpAgent主要做一些统计工作,其内部执行http请求的的仍是ServerHttpAgentthis.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));//启动agent,实际上是启动ServerHttpAgentthis.agent.start();//创建clientWorker,主要以长轮询方式进行配置的拉取、配置变更后的通知等,下面会展开说this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
ServerHttpAgent的构造函数解析
 private static final NacosRestTemplate NACOS_RESTTEMPLATE = ConfigHttpClientManager.getInstance().getNacosRestTemplate();public ServerHttpAgent(Properties properties) throws NacosException {this.serverListMgr = new ServerListManager(properties);this.securityProxy = new SecurityProxy(properties, NACOS_RESTTEMPLATE);this.namespaceId = properties.getProperty(PropertyKeyConst.NAMESPACE);init(properties);this.securityProxy.login(this.serverListMgr.getServerUrls());// init executorServicethis.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.config.security.updater");t.setDaemon(true);return t;}});this.executorService.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {securityProxy.login(serverListMgr.getServerUrls());}}, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);}

该类主要从事Http请求相关事宜,声明了很多与Http请求有关的方法,其中发起Http请求的对象就是上面列出的NACOS_RESTTEMPLATE,它就像是一个HttpClient

构造函数中,为成员属性serverListMgr、securityProxy等赋值,初始化参数,初始化线程池等,这里不再赘述。其中与服务地址相关的内容封装在了ServerListManager中,下面进行展开分析

ServerListManager的构造函数解析
public ServerListManager(Properties properties) throws NacosException {this.isStarted = false;//获取服务地址this.serverAddrsStr = properties.getProperty(PropertyKeyConst.SERVER_ADDR);//获取namspaceString namespace = properties.getProperty(PropertyKeyConst.NAMESPACE);//初始化endpoint、contentPath、serverListName等initParam(properties);//如果服务地址不为空if (StringUtils.isNotEmpty(serverAddrsStr)) {this.isFixed = true;List<String> serverAddrs = new ArrayList<String>();//处理、拼接服务地址String[] serverAddrsArr = this.serverAddrsStr.split(",");for (String serverAddr : serverAddrsArr) {if (serverAddr.startsWith(HTTPS) || serverAddr.startsWith(HTTP)) {serverAddrs.add(serverAddr);} else {String[] serverAddrArr = IPUtil.splitIPPortStr(serverAddr);if (serverAddrArr.length == 1) {serverAddrs.add(HTTP + serverAddrArr[0] + IPUtil.IP_PORT_SPLITER + ParamUtil.getDefaultServerPort());} else {serverAddrs.add(HTTP + serverAddr);}}}//将拼接好的服务地址集合赋值给成员变量,服务地址形如http://127.0.0.1:8848this.serverUrls = serverAddrs;//如果namsspace为空,则为name赋值//⭐️ name的赋值规则是:"fixed_127.0.0.1_8848",⭐️ 这个name在后面有用到if (StringUtils.isBlank(namespace)) {this.name = FIXED_NAME + "-" + getFixedNameSuffix(this.serverUrls.toArray(new String[this.serverUrls.size()]));} else {this.namespace = namespace;this.tenant = namespace;this.name = FIXED_NAME + "-" + getFixedNameSuffix(this.serverUrls.toArray(new String[this.serverUrls.size()])) + "-" + namespace;}} else {//如果逻辑走到这里,endpoint为空 抛出异常if (StringUtils.isBlank(endpoint)) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, "endpoint is blank");}this.isFixed = false;if (StringUtils.isBlank(namespace)) {this.name = endpoint;this.addressServerUrl = String.format("http://%s:%d%s/%s", this.endpoint, this.endpointPort,ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName);} else {this.namespace = namespace;this.tenant = namespace;this.name = this.endpoint + "-" + namespace;this.addressServerUrl = String.format("http://%s:%d%s/%s?namespace=%s", this.endpoint, this.endpointPort,ContextPathUtil.normalizeContextPath(this.contentPath), this.serverListName, namespace);}}}

ServerHttpAgent构造函数的代码通俗易懂,上面主要分析了成员属性name的赋值规则,为后文使用做准备

ConfigFilterChainManager的构造函数解析
 public ConfigFilterChainManager(Properties properties) {ServiceLoader<IConfigFilter> configFilters = ServiceLoader.load(IConfigFilter.class);for (IConfigFilter configFilter : configFilters) {configFilter.init(properties);addFilter(configFilter);}}public synchronized ConfigFilterChainManager addFilter(IConfigFilter filter) {// 根据order大小顺序插入int i = 0;while (i < this.filters.size()) {IConfigFilter currentValue = this.filters.get(i);if (currentValue.getFilterName().equals(filter.getFilterName())) {break;}if (filter.getOrder() >= currentValue.getOrder() && i < this.filters.size()) {i++;} else {this.filters.add(i, filter);break;}}if (i == this.filters.size()) {this.filters.add(i, filter);}return this;}

ConfigFilterChainManager构造方法主要是通过SPI方式加载IConfigFilter实现类,并将其初始化,最终赋给成员变量filters。

默认会去META-INF/services下com.alibaba.nacos.api.config.filter.IConfigFilter加载实现类,这里由于没有额外配置,因而configFilters为空

ClientWorker的构造函数解析
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,final Properties properties) {this.agent = agent;this.configFilterChainManager = configFilterChainManager;//初始化超时参数init(properties);//创建线程池this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker." + agent.getName());t.setDaemon(true);return t;}});//创建线程池,这里主要是执行长轮询,下面会展开this.executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());t.setDaemon(true);return t;}});//每隔一段时间 检查一下配置信息this.executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {checkConfigInfo();} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);}

NacosConfigService#getConfig方法浅析

该方法主要是获取配置信息,有两种途径:从本地缓存或从远程配置中心。下面展开分析

//tenant 租户 其实就是nameSpace,dataId 数据Id group 分组名称 timeoutMs 超时时间
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {//若未配置group,则为其赋默认值DEFAULT_GROUPgroup = blank2defaultGroup(group);//检查dataId/group,这两项是必需的,为空或验证失败时 抛出异常ParamUtils.checkKeyParam(dataId, group);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setTenant(tenant);cr.setGroup(group);// 优先使用本地配置,从本地配置文件中加载配置内容,后面展开说String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);//如果配置内容不为空if (content != null) {// .... 注释了日志打印信息cr.setContent(content);//密文数据密钥(EncryptedDataKey)的本地快照、容灾目录相关内容String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);//对ConfigResponse使用过滤器处理configFilterChainManager.doFilter(null, cr);//获取配置内容 并返回content = cr.getContent();return content;}//如果本地文件中无配置内容或本地文件不存在,则从远程配置中心读取数据,后面展开说try {ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs);cr.setContent(response.getContent());cr.setEncryptedDataKey(response.getEncryptedDataKey());configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;} catch (NacosException ioe) {if (NacosException.NO_RIGHT == ioe.getErrCode()) {throw ioe;}// .... 注释了日志打印信息//将从远程获取的配置内容 写入到本地快照文件中,后面展开说content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);cr.setContent(content);String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);content = cr.getContent();return content;}

总的来说,getCofnigInner获取配置的主要逻辑是:先从本地获取,若本地未缓存配置内容,则从远程获取并缓存至本地,最终返回配置内容。

LocalConfigInfoProcessor.getFailover 代码分析
public static String getFailover(String serverName, String dataId, String group, String tenant) {//获取本地缓存数据的文件File localPath = getFailoverFile(serverName, dataId, group, tenant);//如果文件不存在或者类型不是文件 则返回if (!localPath.exists() || !localPath.isFile()) {return null;}try {//从文件中读取内容return readFile(localPath);} catch (IOException ioe) {LOGGER.error("[" + serverName + "] get failover error, " + localPath, ioe);return null;}}
//这里的serverName即上面ServerHttpAgent#name属性,名称为fixed_127.0.0.1_8848
static File getFailoverFile(String serverName, String dataId, String group, String tenant) {//若未配置JM.LOG.PATH属性值,那么LOCAL_SNAPSHOT_PATH=<user.home>/nacos/config/fixed_127.0.0.1_8848_nacos,其中user.home为用户目录File tmp = new File(LOCAL_SNAPSHOT_PATH, serverName + "_nacos");//tmp文件路径为<user.home>/nacos/config/fixed_127.0.0.1_8848_nacos/datatmp = new File(tmp, "data");//如果tenant为空,那么tmp文件路径为<user.home>/nacos/config/fixed_127.0.0.1_8848_nacos/data/config-dataif (StringUtils.isBlank(tenant)) {tmp = new File(tmp, "config-data");} else {tmp = new File(tmp, "config-data-tenant");tmp = new File(tmp, tenant);}//最终文件为<user.home>/nacos/config/fixed_127.0.0.1_8848_nacos/data/config-data/DEFAULT_GROUP/nacos-simple-demo.yaml(我测试方法中的dataId就叫这个名字)return new File(new File(tmp, group), dataId);}

上面方法描述从本地缓存文件中读取配置内容的过程,主要问题点是本地缓存文件的生成规则

worker.getServerConfig 代码分析
public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout)throws NacosException {ConfigResponse configResponse = new ConfigResponse();if (StringUtils.isBlank(group)) {group = Constants.DEFAULT_GROUP;}HttpRestResult<String> result = null;try {Map<String, String> params = new HashMap<String, String>(3);if (StringUtils.isBlank(tenant)) {params.put("dataId", dataId);params.put("group", group);} else {params.put("dataId", dataId);params.put("group", group);params.put("tenant", tenant);}result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);} catch (Exception ex) {String message = String.format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s",agent.getName(), dataId, group, tenant);LOGGER.error(message, ex);throw new NacosException(NacosException.SERVER_ERROR, ex);}switch (result.getCode()) {//当成功响应时case HttpURLConnection.HTTP_OK://保存配置内容为本地快照文件LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());configResponse.setContent(result.getData());String configType;if (result.getHeader().getValue(CONFIG_TYPE) != null) {configType = result.getHeader().getValue(CONFIG_TYPE);} else {configType = ConfigType.TEXT.getType();}configResponse.setConfigType(configType);//获取秘钥内容,若秘钥不为空 则保存至本地文件中String encryptedDataKey = result.getHeader().getValue(ENCRYPTED_DATA_KEY);LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);configResponse.setEncryptedDataKey(encryptedDataKey);return configResponse;case HttpURLConnection.HTTP_NOT_FOUND:LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, null);return configResponse;case HttpURLConnection.HTTP_CONFLICT: {//...日志输出throw new NacosException(NacosException.CONFLICT,"data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);}case HttpURLConnection.HTTP_FORBIDDEN: {//...日志输出throw new NacosException(result.getCode(), result.getMessage());}default: {//...日志输出throw new NacosException(result.getCode(),"http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);}}}

这个方法主要是以get方式请求http://127.0.0.1:8848/v1/cs/configs获取配置内容。若成功获取配置内容,则将配置内容保存为本地快照文件,并将秘钥保存至本地。若未成功响应 则做异常处理,代码虽长,但是比较容易理解。下面分析保存本地快照文件及秘钥数据的逻辑

保存本地快照文件的逻辑 主要在于创建快照文件,随后就是将内容写入。而快照文件的生成规则与上面本地缓存文件的生成规则类似,不再赘述。最终生成一个形如<user.home>/nacos/config/fixed-127.0.0.1_8848_nacos/snapshot/DEFAULT_GROUP/nacos-simple-demo.yaml的快照文件。

秘钥文件的生成规则与之都类似,只是需要注意的是,若encryptedDataKey是为空,那么会将生成的秘钥文件给删除。

至此,获取配置文件内容的逻辑大致分析完成了。

配置更新

通过使用nacos我们可以发现,它可以实现自动地配置更新,即当远程配置中心文件内容被修改之后,项目中对应的配置也会进行更新,那么它是怎么实现的呢?我们一起来探讨下

这里先将结论给出:长轮询任务定时拉取远程配置内容,通过MD5值进行内容比较,若发现当前配置内容MD5与拉取的远程配置内容的MD5值不同,即认为配置内容发生了变更,将最新的配置内容推送到客户端。

前文demo中,当配置内容变更时,Nacos将最新结果推送到我们定义的监听器中。因此我们以此为入口进行分析

添加监听器
//NacosConfigService#addListener
@Overridepublic void addListener(String dataId, String group, Listener listener) throws NacosException {worker.addTenantListeners(dataId, group, Arrays.asList(listener));}
// ClientWorker#addTenantListeners
// 存储key与缓存数据,其中key是dataId、group、tenant以一定规则生成
private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap<String, CacheData>();public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)throws NacosException {//获取group,若未配置 取默认值DEFAULT_GROUPgroup = blank2defaultGroup(group);//获取租户,若未配置 则为空字符串String tenant = agent.getTenant();//获取缓存数据,这是重点CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);for (Listener listener : listeners) {//添加监听器cache.addListener(listener);}}
addCacheDataIfAbsent
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {//生成key,大致是dataId+group+tenant的形式,若dataId/group/tenant字符串存在+、%则分别进行转义,如我这里生成的key=nacos-simple-demo.yaml+DEFAULT_GROUPString key = GroupKey.getKeyTenant(dataId, group, tenant);//从map中取缓存数据,若取到 则返回CacheData cacheData = cacheMap.get(key);if (cacheData != null) {return cacheData;}//⭐️ CacheData的初始化 稍后展开说 ⭐️cacheData = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);// putIfAbsent 将cacheData设置到map中 并将key原始值返回// ⭐️ 这里cacheMap进行put操作 ⭐️CacheData lastCacheData = cacheMap.putIfAbsent(key, cacheData);if (lastCacheData == null) {// 这里默认是false,即未开启远程同步配置if (enableRemoteSyncConfig) {//从远程获取配置内容ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);cacheData.setContent(response.getContent());}// ParamUtil.getPerTaskConfigSize()=3000,这里taskId=1/3000=0// 这里的taskId有种批次号的概念,当缓存中数据较多时分批进行处理 校验。与下文中ClientWorker#LongPollingRunnable中taskId呼应int taskId = cacheMap.size() / (int) ParamUtil.getPerTaskConfigSize();cacheData.setTaskId(taskId);lastCacheData = cacheData;}// reset so that server not hang this checklastCacheData.setInitializing(true);LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.size());return lastCacheData;}

这里主要将配置内容存储到cacheMap。那配置内容是从哪里获取的呢?这还是先从CacheData的构造函数看起吧。

public CacheData(ConfigFilterChainManager configFilterChainManager, String name, String dataId, String group,String tenant) {if (null == dataId || null == group) {throw new IllegalArgumentException("dataId=" + dataId + ", group=" + group);}this.name = name;this.configFilterChainManager = configFilterChainManager;this.dataId = dataId;this.group = group;this.tenant = tenant;//初始化listeners listeners = new CopyOnWriteArrayList<ManagerListenerWrap>();this.isInitializing = true;//⭐️从本地缓存文件中取配置内容⭐️this.content = loadCacheContentFromDiskLocal(name, dataId, group, tenant);//⭐️ 对内容进行MD5,这里需要注意⭐️this.md5 = getMd5String(content);//⭐️ 从本地文件获取秘钥⭐️this.encryptedDataKey = loadEncryptedDataKeyFromDiskLocal(name, dataId, group, tenant);}

看到了CacheData构造函数 大体都一目了然了,这里就不再赘述了。

至此 配置内容已被加载到cacheMap中

前文 ClientWorker#addTenantListeners,最终listener是在Cache#addListener被添加到集合中的

//Cache#addListener
public void addListener(Listener listener) {if (null == listener) {throw new IllegalArgumentException("listener is null");}ManagerListenerWrap wrap =(listener instanceof AbstractConfigChangeListener) ? new ManagerListenerWrap(listener, md5, content): new ManagerListenerWrap(listener, md5);if (listeners.addIfAbsent(wrap)) {LOGGER.info("[{}] [add-listener] ok, tenant={}, dataId={}, group={}, cnt={}", name, tenant, dataId, group,listeners.size());}}

这里主要注意ManagerListenerWrap的生成。根据listener的类型不同 从而选择ManagerListenerWrap不同的构造。传入ManagerListenerWrap的md5需要注意,它是CacheData的成员属性,代表当前缓存内容的MD5,后文会使用到它

分析了addListener方法的部分实现后,还记得ClientWorkercheckConfigInfo定时任务吗?其实 我们一直在为该方法做准备工作,现在一起看下方法的主要实现逻辑吧

public void checkConfigInfo() {// 接上文,我们此时cacheMap.size=1int listenerSize = cacheMap.size();// Math.ceil取上整,即Math.ceil(1/3000.0)=1int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {//长轮询拉取配置,进行比较等executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}

checkConfigInfo的实现中,我们发现了大boss–>LongPollingRunnable

这个方法主要做两件事:

  • 如果缓存中有数据且为同一个任务号(taskId) 则检查本地配置
  • 如果不存在缓存数据,从远程获取数据 并添加到缓存map中
class LongPollingRunnable implements Runnable {@Overridepublic void run() {List<CacheData> cacheDatas = new ArrayList<CacheData>();List<String> inInitializingCacheList = new ArrayList<String>();try {// check failover configfor (CacheData cacheData : cacheMap.values()) {//如果taskId一致if (cacheData.getTaskId() == taskId) {cacheDatas.add(cacheData);try {//检查本地配置checkLocalConfig(cacheData);//如果使用本地配置,则再次通过检查MD5值 来判断配置内容是否发生变更if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}} catch (Exception e) {LOGGER.error("get local config info error", e);}}}//... 省略暂不讨论的代码}

LongPollingRunnable#checkLocalConfig

private void checkLocalConfig(CacheData cacheData) {final String dataId = cacheData.dataId;final String group = cacheData.group;final String tenant = cacheData.tenant;//获取本地缓存文件,这里的文件格式形如/Users/wojiushiwo/nacos/config/fixed_127.0.0.1_8848_nacos/data/config-data/DEFAULT_GROUP/nacos-simple-demo.yaml File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);//如果不使用本地配置内容 并且缓存文件存在,则将缓存文件读入cacheData 并设置isUseLocalConfigInfo=true 即使用本地配置内容 if (!cacheData.isUseLocalConfigInfo() && path.exists()) {//读取本地缓存文件内容String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);//计算MD5final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);cacheData.setUseLocalConfigInfo(true);//设置本地配置信息版本 即文件的修改时间戳cacheData.setLocalConfigInfoVersion(path.lastModified());// ⭐️ 这里需要注意 设置配置内容至cacheData时 同时修改了MD5值 ⭐️cacheData.setContent(content);//获取密钥内容 String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cacheData.setEncryptedDataKey(encryptedDataKey);//省略日志打印代码return;}// 如果使用本地配置内容 并且 本地缓存文件不存在if (cacheData.isUseLocalConfigInfo() && !path.exists()) {cacheData.setUseLocalConfigInfo(false);//省略日志打印代码return;}// 如果使用本地配置内容 而且本地缓存文件存在 但是两者的修改时间戳不一致,说明配置内容发生过修改,则以本地缓存文件内容为主,将本地缓存文件内容 设置到cacheData中if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) {//读取本地缓存文件内容String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);cacheData.setUseLocalConfigInfo(true);cacheData.setLocalConfigInfoVersion(path.lastModified());cacheData.setContent(content);String encryptedDataKey = LocalEncryptedDataKeyProcessor.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);cacheData.setEncryptedDataKey(encryptedDataKey);//省略日志打印代码}}
//结合上下文isUseLocalConfigInfo=true 有两种可能:1、本地缓存文件存在 2、本地缓存文件存在并且被修改过
// 如果使用本地配置信息 则检查配置内容是否发生过变更  若发生变更 则将变更内容推送到Listener
if (cacheData.isUseLocalConfigInfo()) {cacheData.checkListenerMd5();}void checkListenerMd5() {//ManagerListenerWrap是在NacosConfigService#addListener中构建的,传入的MD5 是根据彼时文本内容计算的for (ManagerListenerWrap wrap : listeners) {//md5是CacheData实例变量 当配置内容变更时 会重新计算//如果配置内容发生了变更,则将变更内容推送到Listener#receiveConfigInfoif (!md5.equals(wrap.lastCallMd5)) {safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);}}}

推送变更内容safeNotifyListener

private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {final Listener listener = listenerWrap.listener;Runnable job = new Runnable() {@Overridepublic void run() {ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader = listener.getClass().getClassLoader();try {//如果listener是AbstractSharedListener类型if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);//省略无关代码}// 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。Thread.currentThread().setContextClassLoader(appClassLoader);//构造ConfigResponse对应ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);cr.setEncryptedDataKey(encryptedDataKey);configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();//⭐️ 将变更内容推送到listener.receiveConfigInfo ⭐️listener.receiveConfigInfo(contentTmp);// 如果listener是AbstractConfigChangeListener类型 则推送配置变更事件if (listener instanceof AbstractConfigChangeListener) {Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);ConfigChangeEvent event = new ConfigChangeEvent(data);((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent = content;}//⭐️ 这里更新了ManagerListenerWrap中lastCallMd5值 ⭐️listenerWrap.lastCallMd5 = md5;//省略无关代码};final long startNotify = System.currentTimeMillis();try {//如果我们实现了Listener接口 并重写了getExecutor方法 则使用getExecutor()得到的线程池执行任务,否则就简单调用Runnbale#run执行if (null != listener.getExecutor()) {listener.getExecutor().execute(job);} else {job.run();}} catch (Throwable t) {//省略无关代码}

下面讨论缓存内容不存在的情况

public void run() {//省略暂不讨论代码//⭐️ 发送HTTP请求 监控配置内容是否变化;若未变化 http响应内容为空;若发生变化则将变更内容的信息返回,后面会展开分析 ⭐️List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);if (!CollectionUtils.isEmpty(changedGroupKeys)) {LOGGER.info("get changedGroupKeys:" + changedGroupKeys);}//如果配置内容发生了变化 从groupKey中解读出dataId、group、tenant// 然后从远程拉取配置内容并放入cacheData,关于cacheMap#key的生成规则以及cacheData.checkListenerMd5()等都已经介绍过了,下面的代码就不再赘述了for (String groupKey : changedGroupKeys) {String[] key = GroupKey.parseKey(groupKey);String dataId = key[0];String group = key[1];String tenant = null;if (key.length == 3) {tenant = key[2];}try {//获取远程配置内容 并设置到cacheData中ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L);                   CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));cache.setContent(response.getContent());cache.setEncryptedDataKey(response.getEncryptedDataKey());if (null != response.getConfigType()) {cache.setType(response.getConfigType());}//省略无关代码} catch (NacosException ioe) {//省略无关代码}for (CacheData cacheData : cacheDatas) {if (!cacheData.isInitializing() || inInitializingCacheList.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {cacheData.checkListenerMd5();cacheData.setInitializing(false);}}inInitializingCacheList.clear();executorService.execute(this);} catch (Throwable e) {//省略无关代码executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);}}}

上面代码主要逻辑是当监控到远程配置内容发生变更后,即拉取远程配置 存储至本地文件及cacheData,下面主要分析checkUpdateDataIds

List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {StringBuilder sb = new StringBuilder();//这里将dataId、group、md5等拼接起来,代码一目了然 不再赘述for (CacheData cacheData : cacheDatas) {if (!cacheData.isUseLocalConfigInfo()) {sb.append(cacheData.dataId).append(WORD_SEPARATOR);sb.append(cacheData.group).append(WORD_SEPARATOR);if (StringUtils.isBlank(cacheData.tenant)) {sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);} else {sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);}//默认是true,当cacheData被初始化数据后 该属性即变为falseif (cacheData.isInitializing()) {// It updates when cacheData occurs in cacheMap by first time.inInitializingCacheList.add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));}}}boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();//主要逻辑在这里 return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);}

checkUpdateConfigStr的主逻辑是构造并发起请求如http://127.0.0.1:8848/nacos/v1/cs/configs/listener,然后解析响应内容。若配置文件内容未变更 则请求响应为空字符串,否则为dataId^2group^1tenant,形如nacos-simple-demo.yaml^2DEFAULT_GROUP^1(我这里未配置tenant,因此tenant位置处为空字符),最终checkUpdateConfigStr返回结果为 形如nacos-simple-demo.yaml+DEFAULT_GROUP的格式

List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {Map<String, String> params = new HashMap<String, String>(2);params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);Map<String, String> headers = new HashMap<String, String>(2);//长轮询超时时间 默认30sheaders.put("Long-Pulling-Timeout", "" + timeout);// cacheData第一次初始化时 设置为true 告诉服务器端不hold客户端连接if (isInitializingCacheList) {headers.put("Long-Pulling-Timeout-No-Hangup", "true");}if (StringUtils.isBlank(probeUpdateString)) {return Collections.emptyList();}try {//这里的时间是30000+30000/2=45000long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);HttpRestResult<String> result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),readTimeoutMs);if (result.ok()) {setHealthServer(true);return parseUpdateDataIdResponse(result.getData());} else {setHealthServer(false);//省略无关代码}} catch (Exception e) {setHealthServer(false);//省略无关代码throw e;}return Collections.emptyList();}

我们来看下Nacos Server端是怎么处理http://127.0.0.1:8848/nacos/v1/cs/configs/listener请求的?

通过URL查找到目标Controller是ConfigController

@PostMapping("/listener")@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void listener(HttpServletRequest request, HttpServletResponse response)throws ServletException, IOException {request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);//获取Nacos Client端传递的参数,如nacos-simple-demo.yaml\u0002DEFAULT_GROUP\u0002aaba0533cac09aeca47d9f804de3bc68\u0001String probeModify = request.getParameter("Listening-Configs");if (StringUtils.isBlank(probeModify)) {throw new IllegalArgumentException("invalid probeModify");}probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);Map<String, String> clientMd5Map;try {//将probeModify转为Map,key=nacos-simple-demo.yaml+DEFAULT_GROUP,value=aaba0533cac09aeca47d9f804de3bc68clientMd5Map = MD5Util.getClientMd5Map(probeModify);} catch (Throwable e) {throw new IllegalArgumentException("invalid probeModify");}// 这里发起长轮询inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}

ConfigServletInner#doPollingConfig

public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {// 长轮询,判断条件是 是否设置了Long-Pulling-Timeout,根据前文分析 肯定是走这一段逻辑if (LongPollingService.isSupportLongPolling(request)) {//这里调用longPollingService来添加长轮询客户端longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);return HttpServletResponse.SC_OK + "";}// 兼容短连接逻辑 代码走不到这里 暂不分析// 省略无用代码}

长轮询的主要逻辑在LongPollingService,我们先来看下它的构造函数

final Queue<ClientLongPolling> allSubs;
public LongPollingService() {allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();//这里有个定时任务 每隔10s执行一次,StatTask主要记录长轮询客户端的数量 ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);// 注册 LocalDataChangeEvent到NotifyCenter.NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);// 注册Subscriber去监听LocalDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if (isFixedPolling()) {// Ignore.} else {if (event instanceof LocalDataChangeEvent) {LocalDataChangeEvent evt = (LocalDataChangeEvent) event;//监听到事件后 即执行DataChangeTask任务 后面展开看ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));}}}@Overridepublic Class<? extends Event> subscribeType() {return LocalDataChangeEvent.class;}});}

接着刚才的地方分析,看下 longPollingService.addLongPollingClient的实现

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,int probeRequestSize) {//获取长轮询超时时间String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);//获取是否hold住的标记 根据前面的分析 目前是trueString noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);String tag = req.getHeader("Vipserver-Tag");//延迟时间0.5sint delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);// 客户端超时时间默认是30s,假如服务器端也处理了30s,那么客户端肯定会超时,因为这个请求的总时间是服务器端处理时间+网络传输时间,因此设置了延迟时间 以保证客户端不超时//通过计算,Math.max(10000,30000-500)=29.5s,也就是服务器端超时时间是29.5slong timeout = Math.max(10000, Long.parseLong(str) - delayTime);//默认为falseif (isFixedPolling()) {timeout = Math.max(10000, getFixedPollingInterval());// Do nothing but set fix polling timeout.} else {long start = System.currentTimeMillis();//这里是比较缓存中MD5与clientMd5Map中md5,若两者不同 则返回clientMd5Map#key值;第一次由于没有缓存CacheItem,因此两者肯定不同,最终返回数据形如nacos-simple-demo.yaml+DEFAULT_GROUPList<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);if (changedGroups.size() > 0) {//如果发生改变 立即响应出去//生成响应结果,结果形如nacos-simple-demo.yaml^2DEFAULT_GROUP^1generateResponse(req, rsp, changedGroups);//省略无用代码return;} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {//省略无用代码return;}}String ip = RequestUtil.getRemoteIp(req);// 基于Servlet3.0 发起异步请求 final AsyncContext asyncContext = req.startAsync();// AsyncContext.setTimeout() is incorrect, Control by oneselfasyncContext.setTimeout(0L);//如果没有立即响应 则执行定时任务 每29.5s处理一次ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}

下面看下ClientLongPolling线程执行情况

class ClientLongPolling implements Runnable {@Overridepublic void run() {//创建一个定时任务延迟29.5s执行asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {@Overridepublic void run() {try {getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());/** 到达超时时间,当前ClientLongPolling不需要再维持订阅关系*  删除订阅关系*/allSubs.remove(ClientLongPolling.this);if (isFixedPolling()) {//省略无关代码//这里仍然是比较是否发生变化List<String> changedGroups = MD5Util.compareMd5((HttpServletRequest) asyncContext.getRequest(),(HttpServletResponse) asyncContext.getResponse(), clientMd5Map);if (changedGroups.size() > 0) {//如果发生了变化,则将当前任务取消,返回结果由Response写出sendResponse(changedGroups);} else {sendResponse(null);}} else {//省略无关代码sendResponse(null);}} catch (Throwable t) {//省略无关代码}}}, timeoutTime, TimeUnit.MILLISECONDS);//添加订阅关系allSubs.add(this);}

关于服务端长轮询逻辑就暂时先分析到这。

至此 长轮询检查配置内容、拉取最新配置内容、推送listener等主逻辑分析即完毕

知其所以然之Nacos配置中心源码浅析相关推荐

  1. nacos配置刷新失败导致的cpu上升和频繁重启,nacos配置中心源码解析

    大家好,我是烤鸭: nacos 版本 1.3.2,先说下结论,频繁重启的原因确实没有找到,跟nacos有关,日志没有保留多少,只能从源码找下头绪(出问题的版本 server用的是 nacos 1.1, ...

  2. Apollo 配置中心源码分析

    Apollo 配置中心源码分析 ​ Apollo是携程开源的一款分布式配置管理中心,能够集中化管理应用不同环境.不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限.流程治理等特性,适用 ...

  3. Spring Cloud Alibaba源码 - 16 Nacos 注册中心源码解析

    文章目录 Nacos & Ribbon & Feign 核心微服务架构图 Nacos核心功能 源码下载 & 启动 [standalone模式] [cluster模式] Naco ...

  4. apollo配置中心源码全解析

    文章目录 前言 源码解析 与springboot集成 远程配置的加载 长轮询监听配置更改 服务端长轮询机制 通过客户端发布配置 总结 前言 紧接前文nacos配置中心,本文继续讲目前比较火热的动态配置 ...

  5. spring cloud config配置中心源码分析之注解@EnableConfigServer

    spring cloud config的主函数是ConfigServerApplication,其定义如下: @Configuration @EnableAutoConfiguration @Enab ...

  6. Apollo配置中心源码解析

    1.Apollo配置中心总体设计 一.主流程-流程图 简要流程说明,后面源码分析 备注:一个 Namespace 对应一个 RemoteConfigRepository .多个 RemoteConfi ...

  7. SpringCloud Alibaba——精读Nacos+CMDB+核心源码阅读(7w字长篇)

    文章目录 Nacos 1.介绍 2.使用场景 2.1.动态配置服务 2.2.服务发现及管理 2.2.1.服务注册 2.2.2.服务心跳 2.2.3.服务同步 2.2.4.服务发现 3.环境搭建 3.1 ...

  8. Dubbo2.6.x—注册中心源码分析 dubbo-registry模块 (api and zookeeper)

    文章有点长,亲,要慢慢看! 1. 概述 1.1 注册中心作用 在Dubbo中,注册中心为核心模块,Dubbo通过注册中心实现各个服务之间的注册与发现等功能,而本次源码的分析为registry模块的ap ...

  9. html会员中心源码,响应式自适应手机端会员中心(两种编码)

    模板名称: 织梦响应式自适应手机端会员中心(两种编码) 模板介绍: 大家又有福利咯,新开发的织梦会员中心,响应式自适应手机端,后台充值功能,可以对接支付宝等其他的第三方支付软件, 高端简单大气,重要是 ...

最新文章

  1. numba.jit警告:failed type inference due to: non-precise type pyobject
  2. KVM总结-KVM性能优化之网络性能优化
  3. Laravel安装步骤
  4. iOS: 让自定义控件适应Autolayout注意的问题
  5. 使用AWS Elastic Beanstalk轻松进行Spring Boot部署
  6. leetcode842. 将数组拆分成斐波那契序列(回溯)
  7. 流畅的Python 2. 数据结构 - 序列构成的数组
  8. oracle树子类遍历父类_不懂数据库索引的底层原理?那是因为你心里没点b树
  9. assertion python_【Python】断言功能Assertion
  10. 20161120-安全测试
  11. jsf tree组件_JSF文本组件–标签,文本字段,文本区域和密码
  12. rpm软件包管理的详细解读
  13. [导入][凤穿牡丹][2008精品年代剧][全38集][李小冉 应采儿]
  14. Open vSwitch的安装与运行
  15. echartsdemo可视化统计(笔记分享)
  16. source insight的查找功能
  17. [C]你的n元一次常系数线性方程组解答小助手
  18. PostgreSQL 多维空间几何对象 相交、包含 高效率检索实践 - cube
  19. 为什么戏说php,戏说PHP——1.1切的开始
  20. Python+Vue计算机毕业设计影评网站系统4i684(源码+程序+LW+部署)

热门文章

  1. 牛逼,Python3竟然内置找茬神器!一起来找茬吧!
  2. java计算机毕业设计-游戏账号交易平台-演示录像-源码+数据库+系统+lw文档+mybatis+运行部署
  3. 【Nginx】【一】Nginx简介
  4. VMware出现配置文件 .vmx 是由VMware产品创建,但该产品与此版 VMware workstation 不兼容,因此无法使用--VMware版本不兼容问题
  5. cnpm 安装文件找不到_周杰伦说目前暂时找不到对手_腾讯新闻
  6. 股票macd计算公式php,股票技术指标分析(MA\KDJ\MACD)
  7. 未来计算机朝着微型化 巨型化,从目前来看,未来计算机将朝着微型化、巨型化、__________和智能化方向发展....
  8. 【生活点滴】-- 吉他、口琴如何自学?
  9. windows下安装php相关
  10. 税后 4W?美女HR?程序员小心了,你可能入了 “东南亚博彩骗局”