Nacos: The Distro Consistency Protocol

1. Theory

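Consistency is a topic no distributed system can avoid. Under the CAP theorem, a system chooses either CP (strong consistency at the cost of availability) or AP (availability, with eventual consistency). Several consistency algorithms are in common use, such as Paxos, Raft, and ZooKeeper's ZAB. Nacos supports both AP and CP: for ephemeral (non-persistent) instances it implements the AP-oriented Distro protocol, while persistent instances go through a Raft-based CP path. Let us walk through how the Distro protocol works.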

2. Debugging environment

Because we need to trace the source code with the server running in cluster mode, the debugging environment is set up as follows:

  1. Clone the Nacos source code from GitHub: https://github.com/alibaba/nacos. This article uses version 1.4.2.

  2. On the local database, run nacos-mysql.sql from the distribution/conf directory to create the nacos database.

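  3. In the console module, edit application.properties: read the port from an environment variable (so that one copy of the code can start several instances) and change the database settings: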

    server.port=${port}
    spring.datasource.platform=mysql
    db.num=1
    db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
    db.user.0=root
    db.password.0=root
    
  4. Start several instances from IDEA. The main class is Nacos.java in the console module; give each instance an environment variable and JVM parameters:

    Environment variable:
    port=8848
    JVM parameters:
    -Dnacos.home=D:/nacos-home/nacos-8848 -Dnacos.standalone=false -DembeddedStorage=true
    

    Note: the nacos home directory must also be different for each port (for example nacos-8848, nacos-8849, nacos-8850).

  5. Under the nacos home directory, create a conf directory containing a cluster.conf file. Do the same in the home directory of every other node.

    # addresses of the instances in the cluster
    <host-ip>:8848
    <host-ip>:8849
    <host-ip>:8850
    

3. Source code analysis

Full synchronization from other nodes at node startup

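When a server node starts, it initializes the DistroProtocol class, whose constructor starts the task that synchronizes data from the other server nodes.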

DistroProtocol

public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
        DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
    this.memberManager = memberManager;
    this.distroComponentHolder = distroComponentHolder;
    this.distroTaskEngineHolder = distroTaskEngineHolder;
    this.distroConfig = distroConfig;
    // When this node starts, pull the full data set from the other nodes
    startDistroTask();
}

private void startDistroTask() {
    if (EnvUtil.getStandaloneMode()) {
        isInitialized = true;
        return;
    }
    // Start the periodic checksum verification between nodes
    startVerifyTask();
    // Start loading data from the other nodes
    startLoadTask();
}

/**
 * Load data from the other nodes.
 */
private void startLoadTask() {
    // Callback invoked when loading finishes
    DistroCallback loadCallback = new DistroCallback() {
        @Override
        public void onSuccess() {
            isInitialized = true;
        }

        @Override
        public void onFailed(Throwable throwable) {
            isInitialized = false;
        }
    };
    // Submit the load task to the thread pool
    GlobalExecutor.submitLoadDataTask(
            new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}

The task class that loads data from the other nodes, DistroLoadDataTask.run():

@Override
public void run() {
    try {
        // Load data from the other nodes
        load();
        // If loading is not complete, resubmit this task after a delay and keep pulling until it succeeds
        if (!checkCompleted()) {
            GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());
        } else {
            loadCallback.onSuccess();
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");
        }
    } catch (Exception e) {
        loadCallback.onFailed(e);
        Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);
    }
}

private void load() throws Exception {
    // No node other than itself yet: sleep one second and wait for the others to start
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    // Wait until the data storage types have been registered
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    // Load every data type that has not been loaded yet
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}

/**
 * Pull the full data set from a remote node.
 */
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    // Used to fetch data from remote nodes
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    // Used to process the fetched data
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                resourceType, transportAgent, dataProcessor);
        return false;
    }
    // Try every other node in turn
    for (Member each : memberManager.allMembersWithoutSelf()) {
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            // Call the remote node's GET /distro/datums API to fetch the data
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            // Process the data
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType,
                    each.getAddress(), result);
            // Stop at the first node whose snapshot is processed successfully
            if (result) {
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}
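The loading strategy above boils down to two rules: for each data type, try the peers one at a time and stop at the first one whose snapshot is processed successfully; and remember which types have completed, so that a retry only reloads the unfinished ones. Below is a minimal plain-Java sketch of those two rules. `SnapshotFetcher` and `SnapshotLoadSketch` are hypothetical names standing in for `DistroTransportAgent` plus `DistroDataProcessor`; they are not Nacos APIs, and the retry delay and logging are left out.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for "fetch a snapshot from one peer and process it".
interface SnapshotFetcher {
    boolean fetchAndProcess(String resourceType, String peer) throws Exception;
}

class SnapshotLoadSketch {
    // Like loadCompletedMap: data type -> loaded successfully?
    private final Map<String, Boolean> loadCompleted = new HashMap<>();
    private final SnapshotFetcher fetcher;

    SnapshotLoadSketch(SnapshotFetcher fetcher) {
        this.fetcher = fetcher;
    }

    // Like loadAllDataSnapshotFromRemote(): the first peer that succeeds wins.
    boolean loadFromAnyPeer(String type, List<String> peers) {
        for (String peer : peers) {
            try {
                if (fetcher.fetchAndProcess(type, peer)) {
                    return true;
                }
            } catch (Exception e) {
                // A failing peer is skipped and the next one is tried.
            }
        }
        return false;
    }

    // Like load(): only types that have not completed yet are (re)loaded.
    void loadOnce(List<String> types, List<String> peers) {
        for (String type : types) {
            if (!loadCompleted.getOrDefault(type, false)) {
                loadCompleted.put(type, loadFromAnyPeer(type, peers));
            }
        }
    }

    // Like checkCompleted(): every type must have loaded successfully.
    boolean checkCompleted(List<String> types) {
        for (String type : types) {
            if (!loadCompleted.getOrDefault(type, false)) {
                return false;
            }
        }
        return true;
    }
}
```

In `DistroLoadDataTask.run()` the equivalent of `loadOnce()` is followed by `checkCompleted()`, and the task resubmits itself until that check passes.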

Processing the data: DistroConsistencyServiceImpl.processSnapshot()

@Override
public boolean processSnapshot(DistroData distroData) {
    try {
        return processData(distroData.getContent());
    } catch (Exception e) {
        return false;
    }
}

/**
 * Process the data returned by another node.
 */
private boolean processData(byte[] data) throws Exception {
    if (data.length > 0) {
        // Deserialize the payload into objects
        Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            // Store the datum in DataStore.dataMap
            dataStore.put(entry.getKey(), entry.getValue());
            if (!listeners.containsKey(entry.getKey())) {
                // pretty sure the service not exist:
                if (switchDomain.isDefaultInstanceEphemeral()) {
                    // Create an empty Service
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    Service service = new Service();
                    String serviceName = KeyBuilder.getServiceName(entry.getKey());
                    String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespaceId);
                    service.setGroupName(Constants.DEFAULT_GROUP);
                    // now validate the service. if failed, exception will be thrown
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();
                    // The listener registered under key com.alibaba.nacos.naming.domains.meta. must not be null
                    RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
                    if (Objects.isNull(listener)) {
                        return false;
                    }
                    // ServiceManager.onChange()
                    listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                }
            }
        }
        for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
            if (!listeners.containsKey(entry.getKey())) {
                // Should not happen:
                Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                continue;
            }
            try {
                for (RecordListener listener : listeners.get(entry.getKey())) {
                    // Calls the matching Service.onChange()
                    listener.onChange(entry.getKey(), entry.getValue().value);
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                continue;
            }
            // The Service has been modified, so update DataStore
            dataStore.put(entry.getKey(), entry.getValue());
        }
    }
    return true;
}

Processing steps:

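  1. Store the datum in DataStore.dataMap
  2. Call ServiceManager.onChange() (only when no listener exists for the key yet, that is, the Service does not exist locally)
  3. Call Service.onChange()
  4. Update DataStore.dataMap again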

ServiceManager.onChange()

Purpose:

  1. Add the Service to the ServiceManager.serviceMap field
  2. Register the Service as a RecordListener in DistroConsistencyServiceImpl.listeners (used to notify subscribers when instances are added or removed)

@Override
public void onChange(String key, Service service) throws Exception {
    try {
        if (service == null) {
            Loggers.SRV_LOG.warn("received empty push from raft, key: {}", key);
            return;
        }
        if (StringUtils.isBlank(service.getNamespaceId())) {
            service.setNamespaceId(Constants.DEFAULT_NAMESPACE_ID);
        }
        Loggers.RAFT.info("[RAFT-NOTIFIER] datum is changed, key: {}, value: {}", key, service);
        // Look up the Service in serviceMap
        Service oldDom = getService(service.getNamespaceId(), service.getName());
        // The old Service exists: update it and re-register it in DistroConsistencyServiceImpl.listeners
        if (oldDom != null) {
            oldDom.update(service);
            // re-listen to handle the situation when the underlying listener is removed:
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), oldDom);
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), oldDom);
        } else {
            // No old Service: add it to serviceMap and to DistroConsistencyServiceImpl.listeners
            putServiceAndInit(service);
        }
    } catch (Throwable e) {
        Loggers.SRV_LOG.error("[NACOS-SERVICE] error while processing service update", e);
    }
}

private void putServiceAndInit(Service service) throws NacosException {
    // Add the service to serviceMap
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    // Start the client heartbeat check
    service.init();
    // Register in DistroConsistencyServiceImpl.listeners
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

Service.onChange()

@Override
public void onChange(String key, Instances value) throws Exception {
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    // Normalize instance weights
    for (Instance instance : value.getInstanceList()) {
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    // Update the Service's instances
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    // Recalculate the checksum
    recalculateChecksum();
}

public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    // Validate each instance
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            // If the cluster is not in clusterMap yet, create and initialize a new Cluster
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                        instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            // Instances belonging to this cluster
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            // Add the instance to clusterIPs
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
    // Iterate over the clusters
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        // Instances of this cluster
        List<Instance> entryIPs = entry.getValue();
        // Replace the instance list of the whole cluster
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    setLastModifiedMillis(System.currentTimeMillis());
    // Publish the change through PushService
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }
    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
}

getPushService().serviceChanged(this) invokes PushService, which sends UDP packets to the corresponding subscribers.
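The normalization performed by `Service.onChange()` and `updateIPs()` can be reproduced in isolation: weights above 10000 are capped, positive weights below 0.01 are raised to 0.01 (a weight of 0 is left alone), instances without a cluster name go to the default cluster, and every existing cluster starts with an empty list, so a cluster that no longer appears in the update ends up empty. This is a sketch with hypothetical types (`SimpleInstance`, `InstanceGroupingSketch`); it assumes `UtilsAndCommons.DEFAULT_CLUSTER_NAME` is `"DEFAULT"`, and unlike the real code it clamps and groups in a single pass.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed-down instance type used only for this sketch.
class SimpleInstance {
    final String ip;
    final String clusterName;
    double weight;

    SimpleInstance(String ip, String clusterName, double weight) {
        this.ip = ip;
        this.clusterName = clusterName;
        this.weight = weight;
    }
}

class InstanceGroupingSketch {
    // Assumed value of UtilsAndCommons.DEFAULT_CLUSTER_NAME.
    static final String DEFAULT_CLUSTER_NAME = "DEFAULT";

    // Same bounds as Service.onChange(): cap at 10000, lift tiny positive weights to 0.01.
    static double clampWeight(double weight) {
        if (weight > 10000.0D) {
            return 10000.0D;
        }
        if (weight < 0.01D && weight > 0.0D) {
            return 0.01D;
        }
        return weight;
    }

    // Builds the cluster -> instances map the way updateIPs() builds ipMap.
    static Map<String, List<SimpleInstance>> groupByCluster(List<SimpleInstance> instances,
            Collection<String> existingClusters) {
        Map<String, List<SimpleInstance>> ipMap = new LinkedHashMap<>();
        // Existing clusters start empty, so clusters missing from the update get cleared.
        for (String cluster : existingClusters) {
            ipMap.put(cluster, new ArrayList<>());
        }
        for (SimpleInstance instance : instances) {
            if (instance == null) {
                continue; // updateIPs() logs and skips null entries
            }
            String cluster = (instance.clusterName == null || instance.clusterName.isEmpty())
                    ? DEFAULT_CLUSTER_NAME : instance.clusterName;
            instance.weight = clampWeight(instance.weight);
            ipMap.computeIfAbsent(cluster, k -> new ArrayList<>()).add(instance);
        }
        return ipMap;
    }
}
```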

[Figure: flow of full data synchronization]

Incremental synchronization between nodes triggered by service registration and heartbeat renewal

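As shown in the earlier article "Nacos: Service Registration and Heartbeat Mechanism", the client calls POST /v1/ns/instance to register and PUT /v1/ns/instance/beat to renew its heartbeat. Let us now analyze incremental synchronization between nodes starting from each of these two entry points.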

  1. When a Nacos server handles a write operation on an instance, it forwards the request to the node responsible for that service.

This is implemented by DistroFilter:

@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
        throws IOException, ServletException {
    ReuseHttpRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
    HttpServletResponse resp = (HttpServletResponse) servletResponse;
    String urlString = req.getRequestURI();
    if (StringUtils.isNotBlank(req.getQueryString())) {
        urlString += "?" + req.getQueryString();
    }
    try {
        String path = new URI(req.getRequestURI()).getPath();
        String serviceName = req.getParameter(CommonParams.SERVICE_NAME);
        // For client under 0.8.0:
        if (StringUtils.isBlank(serviceName)) {
            serviceName = req.getParameter("dom");
        }
        if (StringUtils.isNotBlank(serviceName)) {
            serviceName = serviceName.trim();
        }
        // Look up the controller method in a cache to avoid repeated reflection at runtime
        Method method = controllerMethodsCache.getMethod(req);
        if (method == null) {
            throw new NoSuchMethodException(req.getMethod() + " " + path);
        }
        String groupName = req.getParameter(CommonParams.GROUP_NAME);
        if (StringUtils.isBlank(groupName)) {
            groupName = Constants.DEFAULT_GROUP;
        }
        // use groupName@@serviceName as new service name.
        // in naming controller, will use
        // com.alibaba.nacos.api.naming.utils.NamingUtils.checkServiceNameFormat to check it's format.
        String groupedServiceName = serviceName;
        if (StringUtils.isNotBlank(serviceName) && !serviceName.contains(Constants.SERVICE_INFO_SPLITER)) {
            groupedServiceName = groupName + Constants.SERVICE_INFO_SPLITER + serviceName;
        }
        // If the method carries @CanDistro and this node is not responsible for the service,
        // forward the request to the responsible node
        if (method.isAnnotationPresent(CanDistro.class) && !distroMapper.responsible(groupedServiceName)) {
            String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);
            if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
                // This request is sent from peer server, should not be redirected again:
                Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
                resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                        "receive invalid redirect request from peer " + req.getRemoteAddr());
                return;
            }
            // Compute the target server node
            final String targetServer = distroMapper.mapSrv(groupedServiceName);
            List<String> headerList = new ArrayList<>(16);
            Enumeration<String> headers = req.getHeaderNames();
            while (headers.hasMoreElements()) {
                String headerName = headers.nextElement();
                headerList.add(headerName);
                headerList.add(req.getHeader(headerName));
            }
            final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
            final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
            // Send the request to the target server node
            RestResult<String> result = HttpClient.request("http://" + targetServer + req.getRequestURI(), headerList,
                    paramsValue, body, PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
            String data = result.ok() ? result.getData() : result.getMessage();
            try {
                WebUtils.response(resp, data, result.getCode());
            } catch (Exception ignore) {
                Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(groupedServiceName) + urlString);
            }
        } else {
            OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(req);
            requestWrapper.addParameter(CommonParams.SERVICE_NAME, groupedServiceName);
            filterChain.doFilter(requestWrapper, resp);
        }
    } catch (AccessControlException e) {
        resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
    } catch (NoSuchMethodException e) {
        resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
                "no such api:" + req.getMethod() + ":" + req.getRequestURI());
    } catch (Exception e) {
        resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                "Server failed," + ExceptionUtil.getAllExceptionMsg(e));
    }
}
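Stripped of the servlet plumbing, the filter makes two decisions: how to build the grouped service name, and whether to handle, forward, or reject the request. The sketch below captures only those decisions. `DistroForwardSketch` and its `Decision` enum are hypothetical; `fromPeerServer` stands for the User-Agent check against `UtilsAndCommons.NACOS_SERVER_HEADER`, and the constants are assumed to match `Constants.SERVICE_INFO_SPLITER` (`@@`) and `Constants.DEFAULT_GROUP`.

```java
class DistroForwardSketch {
    // Assumed to match Constants.SERVICE_INFO_SPLITER and Constants.DEFAULT_GROUP.
    static final String SERVICE_INFO_SPLITER = "@@";
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";

    enum Decision { HANDLE_LOCALLY, FORWARD, REJECT }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // groupName@@serviceName, unless the name already carries a group.
    static String groupedServiceName(String groupName, String serviceName) {
        String service = isBlank(serviceName) ? serviceName : serviceName.trim();
        String group = isBlank(groupName) ? DEFAULT_GROUP : groupName;
        if (!isBlank(service) && !service.contains(SERVICE_INFO_SPLITER)) {
            return group + SERVICE_INFO_SPLITER + service;
        }
        return service;
    }

    // canDistro: the controller method has @CanDistro; responsible: distroMapper.responsible(...).
    static Decision decide(boolean canDistro, boolean responsible, boolean fromPeerServer) {
        if (!canDistro || responsible) {
            return Decision.HANDLE_LOCALLY;
        }
        if (fromPeerServer) {
            return Decision.REJECT; // answered with HTTP 400 instead of a second redirect
        }
        return Decision.FORWARD;
    }
}
```

The REJECT branch mirrors the comment in the filter: a request that a peer has already forwarded is never forwarded a second time.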

DistroMapper: decides whether the current node handles a service, and computes the target node

/**
 * Whether the current server node is responsible for the given service.
 */
public boolean responsible(String serviceName) {
    final List<String> servers = healthyList;
    // With distro disabled or in standalone mode, only the current server can handle it
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        // means distro config is not ready yet
        return false;
    }
    // First position of the local address in the node list
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    // Last position of the local address, since the list may contain the same address more than once
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    // The local node is not in the list, so it handles the request itself.
    // Rationale: this node is already unhealthy and should not pass the request on to other nodes and make things worse
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    // Hash the service name to get the index of the responsible node
    int target = distroHash(serviceName) % servers.size();
    // If target lies in [index, lastIndex] the current node handles the write; otherwise another node does
    return target >= index && target <= lastIndex;
}

/**
 * Compute which server node handles the given service.
 */
public String mapSrv(String serviceName) {
    final List<String> servers = healthyList;
    // If the node list is empty or distro is disabled, return the current node
    if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) {
        return EnvUtil.getLocalAddress();
    }
    try {
        // Hash the service name to get the index of the target node
        int index = distroHash(serviceName) % servers.size();
        return servers.get(index);
    } catch (Throwable e) {
        Loggers.SRV_LOG.warn("[NACOS-DISTRO] distro mapper failed, return localhost: " + EnvUtil.getLocalAddress(), e);
        return EnvUtil.getLocalAddress();
    }
}
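The ownership rule can be checked on its own: hash the service name, reduce it modulo the size of the healthy list, and the node at that index owns the service. Below is a sketch with a hypothetical `DistroMapperSketch` class; `distroHash()` is assumed to be `Math.abs(serviceName.hashCode() % Integer.MAX_VALUE)` as in Nacos 1.4.x, and the distro-switch and standalone checks are omitted.

```java
import java.util.List;

class DistroMapperSketch {
    private final List<String> healthyList;
    private final String localAddress;

    DistroMapperSketch(List<String> healthyList, String localAddress) {
        this.healthyList = healthyList;
        this.localAddress = localAddress;
    }

    // Assumed to match DistroMapper.distroHash() in Nacos 1.4.x; never negative.
    static int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    boolean responsible(String serviceName) {
        if (healthyList.isEmpty()) {
            return false; // distro view not ready yet
        }
        int index = healthyList.indexOf(localAddress);
        int lastIndex = healthyList.lastIndexOf(localAddress);
        if (index < 0 || lastIndex < 0) {
            return true; // this node is not in the healthy list: keep the request local
        }
        int target = distroHash(serviceName) % healthyList.size();
        return target >= index && target <= lastIndex;
    }

    String mapSrv(String serviceName) {
        if (healthyList.isEmpty()) {
            return localAddress;
        }
        return healthyList.get(distroHash(serviceName) % healthyList.size());
    }
}
```

As long as every node sees the same healthy list in the same order, they all compute the same owner without any coordination, which is what lets DistroFilter forward writes deterministically.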
  2. Service registration (POST /v1/ns/instance)

The key logic of this API is ServiceManager.registerInstance():

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Create an empty Service in serviceMap and register it in DistroConsistencyServiceImpl.listeners
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Fetch the Service just created
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Add the instance; this is where the data is synchronized to the other nodes
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        putServiceAndInit(service);
        // Skipped for ephemeral (non-persistent) instances
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

private void putServiceAndInit(Service service) throws NacosException {
    // Add the service to serviceMap
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    service.init();
    // Register in DistroConsistencyServiceImpl.listeners
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

/**
 * Add instances to the Service.
 */
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        // Build the new instance list: the existing instances plus the added ones
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Store the data and synchronize the change to the other nodes
        consistencyService.put(key, instances);
    }
}

Registration steps:

  1. Create the Service object in serviceMap and register it in DistroConsistencyServiceImpl.listeners
  2. Add the service instance to the Service object and synchronize the data to the other server nodes

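The key to synchronization is the consistencyService.put() call. It uses the delegate pattern: the call first reaches DelegateConsistencyServiceImpl, which delegates to the concrete consistency implementation, because ephemeral and persistent instances are kept consistent by different algorithms. For ephemeral instances, the implementation is DistroConsistencyServiceImpl.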
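Because the delegate chooses an implementation purely from the key, the routing can be illustrated with the instance-list key format. In this sketch the prefixes (`com.alibaba.nacos.naming.iplist.`, `ephemeral.`) and the `##` connector are assumed to mirror `KeyBuilder` in Nacos 1.4.x, and routing on the instance-list prefix alone is a simplification of `DelegateConsistencyServiceImpl`; `KeyRoutingSketch` is a hypothetical class.

```java
class KeyRoutingSketch {
    // Assumed to mirror the KeyBuilder constants in Nacos 1.4.x.
    static final String INSTANCE_LIST_KEY_PREFIX = "com.alibaba.nacos.naming.iplist.";
    static final String EPHEMERAL_KEY_PREFIX = "ephemeral.";
    static final String NAMESPACE_KEY_CONNECTOR = "##";

    enum Target { DISTRO_AP, RAFT_CP }

    // Same shape as KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral).
    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return INSTANCE_LIST_KEY_PREFIX + (ephemeral ? EPHEMERAL_KEY_PREFIX : "")
                + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
    }

    static boolean matchEphemeralInstanceListKey(String key) {
        return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
    }

    // The delegate's only job: pick the consistency implementation from the key.
    static Target route(String key) {
        return matchEphemeralInstanceListKey(key) ? Target.DISTRO_AP : Target.RAFT_CP;
    }
}
```

This also explains why `putServiceAndInit()` calls `listen()` twice: the same Service listens on both the ephemeral and the persistent key, whichever implementation ends up delivering the change.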

DistroConsistencyServiceImpl

@Override
public void put(String key, Record value) throws NacosException {
    // Store the data in DataStore and queue a notifier task, which ends in a UDP push to subscribers
    onPut(key, value);
    // Synchronize the data to the other nodes
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}

public void onPut(String key, Record value) {
    // Ephemeral instances are saved in DataStore
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    // Queue a notifier task; it calls the listeners (Service.onChange()), which trigger the UDP push
    notifier.addTask(key, DataOperation.CHANGE);
}

DistroProtocol.sync()

/**
 * Start synchronizing data to all remote nodes.
 */
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    // Iterate over every node except this one
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        // Put into the tasks map of NacosDelayTaskExecuteEngine; the data is synchronized asynchronously
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
        }
    }
}

For each node except itself, a sync task is put into the tasks field of NacosDelayTaskExecuteEngine. That class runs a scheduled job that takes tasks out of tasks and executes them.

NacosDelayTaskExecuteEngine

/**
 * Constructor.
 */
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    // Run at a fixed interval
    processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval,
            TimeUnit.MILLISECONDS);
}

/**
 * Add a task to tasks.
 */
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    lock.lock();
    try {
        AbstractDelayTask existTask = tasks.get(key);
        if (null != existTask) {
            newTask.merge(existTask);
        }
        tasks.put(key, newTask);
    } finally {
        lock.unlock();
    }
}

private class ProcessRunnable implements Runnable {
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}

/**
 * Execute the tasks (called from the scheduled thread).
 */
protected void processTasks() {
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        // Take the task out
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        // Find the processor for the task
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // Execute the task; if it fails, put it back into tasks
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}
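The two properties that matter in the delay engine are that tasks with the same key are merged rather than queued twice, and that a failed task is put back for a later round. The sketch below models both with hypothetical classes (`SketchDelayTask`, `DelayEngineSketch`). It assumes, as in `AbstractDelayTask`, that a task is only taken out once its interval has elapsed since `lastProcessTime`; the clock is passed in explicitly so the behavior is deterministic, and the scheduled executor is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

// Hypothetical delay task: a set of resource keys plus the timing fields the engine checks.
class SketchDelayTask {
    final Set<String> keys = new LinkedHashSet<>();
    final long intervalMillis;
    long lastProcessTime;

    SketchDelayTask(long intervalMillis, long createdAt, String... keys) {
        this.intervalMillis = intervalMillis;
        this.lastProcessTime = createdAt;
        this.keys.addAll(Arrays.asList(keys));
    }

    // Due once the interval has elapsed since the last (attempted) processing.
    boolean shouldProcess(long now) {
        return now - lastProcessTime >= intervalMillis;
    }

    // Absorb a pending task with the same key; keeping the earlier timestamp is this sketch's choice.
    void merge(SketchDelayTask older) {
        keys.addAll(older.keys);
        lastProcessTime = Math.min(lastProcessTime, older.lastProcessTime);
    }
}

class DelayEngineSketch {
    private final Map<String, SketchDelayTask> tasks = new ConcurrentHashMap<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Predicate<SketchDelayTask> processor;

    DelayEngineSketch(Predicate<SketchDelayTask> processor) {
        this.processor = processor;
    }

    // Like addTask(): a pending task under the same key is merged instead of duplicated.
    void addTask(String key, SketchDelayTask newTask) {
        lock.lock();
        try {
            SketchDelayTask existing = tasks.get(key);
            if (existing != null) {
                newTask.merge(existing);
            }
            tasks.put(key, newTask);
        } finally {
            lock.unlock();
        }
    }

    // Like processTasks(); the real engine calls it from scheduleWithFixedDelay().
    void processTasks(long now) {
        for (String key : new ArrayList<>(tasks.keySet())) {
            SketchDelayTask task = removeDueTask(key, now);
            if (task == null) {
                continue;
            }
            boolean ok;
            try {
                ok = processor.test(task);
            } catch (RuntimeException e) {
                ok = false;
            }
            if (!ok) {
                // Like retryFailedTask(): stamp the attempt time and queue the task again.
                task.lastProcessTime = now;
                addTask(key, task);
            }
        }
    }

    private SketchDelayTask removeDueTask(String key, long now) {
        lock.lock();
        try {
            SketchDelayTask task = tasks.get(key);
            return (task != null && task.shouldProcess(now)) ? tasks.remove(key) : null;
        } finally {
            lock.unlock();
        }
    }

    int pendingCount() {
        return tasks.size();
    }
}
```

Merging is what keeps a burst of registrations for one service from turning into a burst of HTTP calls: for each target node there is at most one pending task per key at any time.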

Here the NacosTaskProcessor implementation is DistroHttpDelayTaskProcessor:

@Override
public boolean process(NacosTask task) {
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    DistroHttpCombinedKeyExecuteTask executeTask = new DistroHttpCombinedKeyExecuteTask(globalConfig,
            distroTaskEngineHolder.getDelayTaskExecuteEngine(), distroKey, distroDelayTask.getAction());
    distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, executeTask);
    return true;
}

NacosExecuteTaskExecuteEngine

@Override
public void addTask(Object tag, AbstractExecuteTask task) {
    NacosTaskProcessor processor = getProcessor(tag);
    if (null != processor) {
        processor.process(task);
        return;
    }
    TaskExecuteWorker worker = getWorker(tag);
    worker.process(task);
}

TaskExecuteWorker

/**
 * Constructor.
 */
public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
    this.name = name + "_" + mod + "%" + total;
    this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
    this.closed = new AtomicBoolean(false);
    this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
    new InnerWorker(name).start();
}

@Override
public boolean process(NacosTask task) {
    if (task instanceof AbstractExecuteTask) {
        putTask((Runnable) task);
    }
    return true;
}

private void putTask(Runnable task) {
    try {
        // Put the task into the queue
        queue.put(task);
    } catch (InterruptedException ire) {
        log.error(ire.toString(), ire);
    }
}

private class InnerWorker extends Thread {

    InnerWorker(String name) {
        setDaemon(false);
        setName(name);
    }

    @Override
    public void run() {
        while (!closed.get()) {
            try {
                // Take a task from the queue
                Runnable task = queue.take();
                long begin = System.currentTimeMillis();
                // Execute the task
                task.run();
                long duration = System.currentTimeMillis() - begin;
                if (duration > 1000L) {
                    log.warn("distro task {} takes {}ms", task, duration);
                }
            } catch (Throwable e) {
                log.error("[DISTRO-FAILED] " + e.toString(), e);
            }
        }
    }
}
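Because a given tag is always dispatched to the same worker, and each worker drains its queue on a single thread, tasks for one key run in submission order while different keys can run in parallel. Below is a minimal sketch with hypothetical classes `WorkerSketch` and `ExecuteEngineSketch`; the selection rule `(tag.hashCode() & Integer.MAX_VALUE) % workers` is an assumption about how `getWorker(tag)` picks a worker, and the queue capacity is arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// One worker: a bounded queue drained by a single thread, like TaskExecuteWorker.
class WorkerSketch {
    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1024);
    private final Thread thread;

    WorkerSketch(String name) {
        thread = new Thread(this::loop, name);
        // Nacos uses non-daemon workers; a daemon thread lets a demo JVM exit.
        thread.setDaemon(true);
        thread.start();
    }

    private void loop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                queue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // leave the loop
            } catch (RuntimeException e) {
                // A failing task must not kill the worker (InnerWorker logs and continues).
            }
        }
    }

    void process(Runnable task) {
        try {
            queue.put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the real putTask() only logs the interruption
        }
    }

    void shutdown() {
        thread.interrupt();
    }
}

class ExecuteEngineSketch {
    private final WorkerSketch[] workers;

    ExecuteEngineSketch(int count) {
        workers = new WorkerSketch[count];
        for (int i = 0; i < count; i++) {
            workers[i] = new WorkerSketch("distro_sketch_" + i + "%" + count);
        }
    }

    // Assumed selection rule: a stable, non-negative hash of the tag picks the worker.
    int workerIndex(Object tag) {
        return (tag.hashCode() & Integer.MAX_VALUE) % workers.length;
    }

    void addTask(Object tag, Runnable task) {
        workers[workerIndex(tag)].process(task);
    }

    void shutdown() {
        for (WorkerSketch worker : workers) {
            worker.shutdown();
        }
    }
}
```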

The worker then calls DistroHttpCombinedKeyExecuteTask.run():

@Override
public void run() {
    try {
        // Wrap the single key in a combined delay task so that keys bound for the same
        // target server can be merged and sent in batches (up to batchSyncKeyCount)
        DistroKey newKey = new DistroKey(DistroHttpCombinedKey.getSequenceKey(),
                DistroHttpCombinedKeyDelayTask.class.getSimpleName(), singleDistroKey.getTargetServer());
        DistroHttpCombinedKeyDelayTask combinedTask = new DistroHttpCombinedKeyDelayTask(newKey, taskAction,
                globalConfig.getTaskDispatchPeriod() / 2, globalConfig.getBatchSyncKeyCount());
        combinedTask.getActualResourceKeys().add(singleDistroKey.getResourceKey());
        distroDelayTaskExecuteEngine.addTask(newKey, combinedTask);
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Combined key for http failed. ", e);
    }
}

This goes back into NacosDelayTaskExecuteEngine, but this time the processor obtained is DistroDelayTaskProcessor:

@Override
public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
        DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
        distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
        return true;
    }
    return false;
}

DistroSyncChangeTask.run(): calls the target node's HTTP API to synchronize the data

@Override
public void run() {
    Loggers.DISTRO.info("[DISTRO-START] {}", toString());
    try {
        String type = getDistroKey().getResourceType();
        // Read the data from DataStore
        DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey());
        distroData.setType(DataOperation.CHANGE);
        // Call PUT /distro/datum over HTTP to synchronize
        boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData,
                getDistroKey().getTargetServer());
        if (!result) {
            handleFailedTask();
        }
        Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result);
    } catch (Exception e) {
        Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e);
        handleFailedTask();
    }
}

[Figure: registration flow]

  3. Heartbeat renewal (PUT /v1/ns/instance/beat)

    The key logic is Service.processClientBeat():

    /**
     * Process a client heartbeat.
     */
    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        // Run the ClientBeatProcessor task asynchronously
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }
    

    ClientBeatProcessor.run()

    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        List<Instance> instances = cluster.allIPs(true);
        for (Instance instance : instances) {
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                // Update the last heartbeat time
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    // If the instance was unhealthy, mark it healthy again
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                        // Push the change to subscribers over UDP
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }
    

Why doesn't heartbeat renewal need to synchronize data to the other nodes? Read on.

Synchronizing data when an instance becomes unhealthy

After a Service is created, Service.init() is called; it schedules a task that checks the health of the Service's instances every 5 seconds.

Service

private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);

public void init() {
    // Schedule the client heartbeat check
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

The heartbeat check task, ClientBeatCheckTask:

@Override
public void run() {
    try {
        // Only the node responsible for this service runs the check
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            // More than 15 seconds (by default) since the last heartbeat?
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(),
                                UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        // Push the change to subscribers over UDP
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // More than 30 seconds (by default) since the last heartbeat?
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                // Call this node's own HTTP API to delete the instance
                deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}

private void deleteIp(Instance instance) {
    try {
        NamingProxy.Request request = NamingProxy.Request.newRequest();
        request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort()))
                .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName())
                .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId());
        // Call this node's own HTTP API to delete the instance
        String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort()
                + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl();
        // delete instance asynchronously:
        HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (!result.ok()) {
                    Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}",
                            instance.toJson(), result.getMessage(), result.getCode());
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}",
                        instance.toJson(), throwable);
            }

            @Override
            public void onCancel() {
            }
        });
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e);
    }
}

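Heartbeat check logic: let t be the current time minus the last heartbeat time. With the default settings, an instance is marked unhealthy when t exceeds 15 seconds and deleted when t exceeds 30 seconds.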
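The per-instance decision made in one round of `ClientBeatCheckTask` can be written as a pure function. This is a sketch with a hypothetical `BeatCheckSketch` class using the default 15 s and 30 s thresholds; it ignores the `isHealthCheckEnabled()` and `isExpireInstance()` switches, and it returns only the most severe action, whereas the real task may mark an instance unhealthy and then delete it in the same round.

```java
class BeatCheckSketch {
    enum Action { NONE, MARK_UNHEALTHY, DELETE }

    // Default thresholds from the text; Nacos lets instance metadata override them.
    static final long HEARTBEAT_TIMEOUT_MS = 15_000L;
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;

    /**
     * Most severe action one check round takes for one instance.
     * responsible: whether this node owns the service (distroMapper.responsible()).
     */
    static Action check(long now, long lastBeat, boolean healthy, boolean marked, boolean responsible) {
        if (!responsible || marked) {
            return Action.NONE; // non-owners skip the round; marked instances are never touched
        }
        long silence = now - lastBeat;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return Action.DELETE;
        }
        if (silence > HEARTBEAT_TIMEOUT_MS && healthy) {
            return Action.MARK_UNHEALTHY;
        }
        return Action.NONE;
    }
}
```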

Deleting an instance calls the node's own HTTP API DELETE /v1/ns/instance, whose key logic is ServiceManager.removeInstance():

private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service,
        Instance... ips) throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // Build the instance list with the given instances removed
    List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);
    Instances instances = new Instances();
    instances.setInstanceList(instanceList);
    // Same path as registration: store locally and synchronize to the other nodes
    consistencyService.put(key, instances);
}

The call to consistencyService.put() shows that deleting an instance is synchronized to the other server nodes.

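A question may arise here: when an instance is marked unhealthy, why isn't that status synchronized to the other nodes? Heartbeat renewal doesn't synchronize anything either. The reason is that a scheduled task compares data between nodes: when something has changed (for example, a status turned unhealthy or metadata was modified), the receiving node pulls the changed data and applies it. This logic is in DistroProtocol.startVerifyTask(), a scheduled task started when the node starts: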

private void startVerifyTask() {
    GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTask(memberManager, distroComponentHolder),
            distroConfig.getVerifyIntervalMillis());
}

DistroVerifyTask.run()

@Override
public void run() {
    try {
        // Get all members except this node
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("server list is: {}", targetServer);
        }
        // Verify each data storage type
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            verifyForDataStorage(each, targetServer);
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
    }
}

private void verifyForDataStorage(String type, List<Member> targetServer) {
    // Get checksums of the data rather than the data itself, to reduce the amount transferred
    DistroData distroData = distroComponentHolder.findDataStorage(type).getVerifyData();
    if (null == distroData) {
        return;
    }
    distroData.setType(DataOperation.VERIFY);
    // Send the checksums to every other member
    for (Member member : targetServer) {
        try {
            // HTTP call: PUT /distro/checksum
            distroComponentHolder.findTransportAgent(type).syncVerifyData(distroData, member.getAddress());
        } catch (Exception e) {
            Loggers.DISTRO.error(String.format("[DISTRO-FAILED] verify data for type %s to %s failed.", type,
                    member.getAddress()), e);
        }
    }
}
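Each node sends only checksums, and each node is responsible for a subset of services (the distroMapper.responsible() check in onReceiveChecksums() implies a sender reports only the keys it owns). The sketch below assumes a simple ownership rule: hash the service name onto a member list that every node sees in the same sorted order. It then shows which checksums a node would send. Treat the rule and the names VerifySketch, owner() and verifyData() as assumptions rather than the exact DistroMapper logic; keys are plain service names here for simplicity.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of checksum-based verification data; the ownership rule is an assumption.
class VerifySketch {
    // Hash the service name onto a member list that all nodes see in the same order,
    // so every node computes the same owner for a given service.
    static String owner(String serviceName, List<String> members) {
        int hash = Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
        return members.get(hash % members.size());
    }

    // Checksums of the keys this node owns; each round this map would be sent
    // to every other member (the loop over targetServer).
    static Map<String, String> verifyData(String self, List<String> members, Map<String, String> localChecksums) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : localChecksums.entrySet()) {
            if (self.equals(owner(entry.getKey(), members))) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }
}
```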

The PUT /distro/checksum endpoint lives in DistroController; its key logic is in DistroConsistencyServiceImpl.onReceiveChecksums():

public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
    if (syncChecksumTasks.containsKey(server)) {
        // Already in process of this server:
        Loggers.DISTRO.warn("sync checksum task already in process with {}", server);
        return;
    }
    syncChecksumTasks.put(server, "1");
    try {
        List<String> toUpdateKeys = new ArrayList<>();
        List<String> toRemoveKeys = new ArrayList<>();
        for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
            // A key this node is responsible for must never arrive from another node
            if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
                // this key should not be sent from remote server:
                Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
                // abort the procedure:
                return;
            }
            // Missing locally or checksum differs: add the key to toUpdateKeys
            if (!dataStore.contains(entry.getKey()) || dataStore.get(entry.getKey()).value == null
                    || !dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
                toUpdateKeys.add(entry.getKey());
            }
        }
        for (String key : dataStore.keys()) {
            // Only keys owned by the sender are checked for deletion
            if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
                continue;
            }
            if (!checksumMap.containsKey(key)) {
                toRemoveKeys.add(key);
            }
        }
        Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
        for (String key : toRemoveKeys) {
            onRemove(key);
        }
        if (toUpdateKeys.isEmpty()) {
            return;
        }
        try {
            DistroHttpCombinedKey distroKey = new DistroHttpCombinedKey(KeyBuilder.INSTANCE_LIST_KEY_PREFIX, server);
            distroKey.getActualResourceTypes().addAll(toUpdateKeys);
            // Pull the changed data from the remote node
            DistroData remoteData = distroProtocol.queryFromRemote(distroKey);
            if (null != remoteData) {
                // Process the data: update the service instances on this node
                processData(remoteData.getContent());
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("get data from " + server + " failed!", e);
        }
    } finally {
        // Remove this 'in process' flag:
        syncChecksumTasks.remove(server);
    }
}

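When a service's checksum differs, its data has changed, so the node pulls that data from the remote node and applies it locally. This is why the heartbeat renewal and mark-unhealthy code paths do not need to call the consistency service themselves.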
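The comparison at the heart of onReceiveChecksums() can be isolated as a pure function. The sketch below uses plain maps of key to checksum and a hypothetical ownedBySender predicate standing in for server.equals(distroMapper.mapSrv(...)); it leaves out the per-sender in-progress flag and the abort on keys this node owns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of the checksum comparison, with simplified types.
class ChecksumDiff {
    final List<String> toUpdate = new ArrayList<>(); // pull these keys from the sender
    final List<String> toRemove = new ArrayList<>(); // delete these keys locally

    // remote: key -> checksum reported by the sender (only keys it owns)
    // local: key -> checksum held by this node
    // ownedBySender: hypothetical ownership test for local keys
    static ChecksumDiff compare(Map<String, String> remote, Map<String, String> local,
                                Predicate<String> ownedBySender) {
        ChecksumDiff diff = new ChecksumDiff();
        for (Map.Entry<String, String> entry : remote.entrySet()) {
            String localChecksum = local.get(entry.getKey());
            // Missing here, or a different checksum: the data changed on the sender.
            if (localChecksum == null || !localChecksum.equals(entry.getValue())) {
                diff.toUpdate.add(entry.getKey());
            }
        }
        for (String key : local.keySet()) {
            // The sender owns the key but no longer reports it, so it was deleted there.
            if (ownedBySender.test(key) && !remote.containsKey(key)) {
                diff.toRemove.add(key);
            }
        }
        return diff;
    }
}
```

Splitting the result this way matches the order in onReceiveChecksums(): removals are applied locally at once, and only the update list triggers a remote query.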

Getting service data through service subscription

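Whether you look at the source for registering an instance, changing an instance's health status, or deleting an instance, getPushService().serviceChanged(this) shows up everywhere. As analyzed above, its logic is to send UDP requests to subscribers. The previous article, "Nacos: Service Subscription", explained that a subscribed client periodically pulls the latest instances from the server; in addition, the server actively pushes the latest instance state to subscribers over UDP, and that is exactly what PushService does.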
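The push side can be pictured as a per-service subscriber list that is walked whenever serviceChanged() is called. In this sketch the UDP send is replaced by a callback, and the class name PushSketch, the subscribe() method and the "ip:port" address strings are hypothetical; the real PushService performs the push asynchronously, so this only shows the fan-out idea.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

// Sketch of the push fan-out; names and the address format are hypothetical.
class PushSketch {
    // service name -> subscriber addresses such as "10.0.0.9:5000"
    private final Map<String, List<String>> subscribers = new ConcurrentHashMap<>();
    // Stand-in for the UDP send: (subscriber address, payload)
    private final BiConsumer<String, String> udpSender;

    PushSketch(BiConsumer<String, String> udpSender) {
        this.udpSender = udpSender;
    }

    // Remember that a client subscribed to a service.
    void subscribe(String serviceName, String subscriberAddress) {
        subscribers.computeIfAbsent(serviceName, name -> new CopyOnWriteArrayList<>()).add(subscriberAddress);
    }

    // Called after a register, health change or delete: push the latest data to every subscriber.
    void serviceChanged(String serviceName, String latestServiceJson) {
        List<String> targets = subscribers.getOrDefault(serviceName, Collections.emptyList());
        for (String address : targets) {
            udpSender.accept(address, latestServiceJson);
        }
    }
}
```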

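What does the client do when it receives one of these UDP requests? In the client's PushReceiver, a loop keeps receiving UDP requests and processes each one: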

PushReceiver

@Override
public void run() {
    // Keep receiving UDP requests until the receiver is closed
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Block until a request arrives; its content is stored in packet
            udpSocket.receive(packet);
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // Handle the pushed service data
                hostReactor.processServiceJson(pushPacket.data);
                // send ack to server
                ack = "{\"type\": \"push-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime + "\",\"data\":"
                        + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\"" + ", \"lastRefTime\": \"" + pushPacket.lastRefTime + "\",\"data\":"
                        + "\"" + StringUtils.escapeJavaScript(JacksonUtils.toJson(hostReactor.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\"" + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }
            udpSocket.send(new DatagramPacket(ack.getBytes(UTF_8), ack.getBytes(UTF_8).length,
                    packet.getSocketAddress()));
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

HostReactor.processServiceJson() handles the pushed data; its main job is to update the serviceInfoMap field.
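The receive-then-ack exchange can be tried out locally with plain JDK DatagramSockets on the loopback interface. The sketch below uses no Nacos classes: a "server" socket pushes one JSON payload, and a "client" socket receives it and replies with a push-ack to the sender's address, as PushReceiver does. The ack format is simplified and the 2-second timeouts are arbitrary choices.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Loopback sketch of the push/ack exchange between a server and a subscriber.
class UdpPushAckDemo {
    private static final int BUFFER_SIZE = 64 * 1024;

    // Client side, like one iteration of the receive loop: receive, then ack to the sender.
    static String receiveOnceAndAck(DatagramSocket clientSocket) throws IOException {
        DatagramPacket packet = new DatagramPacket(new byte[BUFFER_SIZE], BUFFER_SIZE);
        clientSocket.receive(packet);
        // Decode only the bytes actually received (instead of trimming a zero-filled buffer).
        String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8);
        // A real client would update its local service cache from json here.
        byte[] ack = "{\"type\": \"push-ack\", \"data\": \"\"}".getBytes(StandardCharsets.UTF_8);
        clientSocket.send(new DatagramPacket(ack, ack.length, packet.getSocketAddress()));
        return json;
    }

    // Server side: push one payload to the client and return the ack text it sends back.
    static String pushAndWaitForAck(String payload) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (DatagramSocket server = new DatagramSocket(0, loopback);
             DatagramSocket client = new DatagramSocket(0, loopback)) {
            server.setSoTimeout(2000);
            client.setSoTimeout(2000);
            Thread receiver = new Thread(() -> {
                try {
                    receiveOnceAndAck(client);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            receiver.start();
            byte[] data = payload.getBytes(StandardCharsets.UTF_8);
            server.send(new DatagramPacket(data, data.length, client.getLocalSocketAddress()));
            DatagramPacket ack = new DatagramPacket(new byte[BUFFER_SIZE], BUFFER_SIZE);
            server.receive(ack);
            receiver.join();
            return new String(ack.getData(), 0, ack.getLength(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

UDP itself gives no delivery guarantee, which is why the sender waits for an explicit ack; without one it cannot tell whether the push arrived.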

4. Summary

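Getting this far is no small feat, because the Distro implementation relies heavily on asynchronous tasks and an event mechanism. While analyzing heartbeat renewal and unhealthy instances, I could not work out why those changes were never synchronized to the other nodes, until I reread the e-book "Nacos Architecture and Principles" (《Nacos架构与原理》) and found the periodic verification between nodes. The source code is somewhat convoluted, but it is written this way for good reasons, which I will cover later. After reading this article you should understand how Nacos keeps ephemeral instance data consistent, although it has not yet come together into an overall architecture. Stay tuned…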

Thanks for reading. That is all for this part; to be continued…

Like-minded readers are always welcome.

Author's WeChat official account: Tarzan写bug
