Why does Nacos need a consistency protocol?

In short: to keep data consistent and synchronized across nodes when Nacos runs in cluster mode.

What is the Distro protocol?

Distro is an AP-style distributed protocol developed by the Nacos community. It is designed for ephemeral instances, and it guarantees that the ephemeral-instance subsystem keeps working even after some Nacos nodes go down.

Design ideas behind the Distro protocol

  • Every Nacos node is a peer: each one can handle write requests and then synchronizes the new data to the other nodes.

  • Each node is responsible for only a subset of the data, and periodically sends checksums of the data it owns to the other nodes to keep the data consistent.

  • Each node handles read requests independently, responding promptly from its local store. (A minimal sketch of the partitioning idea follows.)
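
To make "each node owns a subset of the data" concrete, here is a minimal sketch of the hash-mod partitioning idea. This is illustrative only; the class and method names are made up, but the `distroHash(tag) % servers.size()` scheme mirrors the DistroMapper code shown later in this article:

import java.util.List;

// Illustrative sketch: how a request tag maps to exactly one responsible node.
public class ResponsibilitySketch {

    // Same idea as DistroMapper.distroHash: a non-negative hash of the tag.
    private static int distroHash(String responsibleTag) {
        return Math.abs(responsibleTag.hashCode() % Integer.MAX_VALUE);
    }

    // A tag (e.g. the "ip:port" of a client) is owned by one healthy node.
    public static String responsibleServer(String responsibleTag, List<String> healthyServers) {
        int target = distroHash(responsibleTag) % healthyServers.size();
        return healthyServers.get(target);
    }

    public static void main(String[] args) {
        List<String> servers = List.of("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        // The same tag always maps to the same node while membership is stable.
        System.out.println(responsibleServer("192.168.1.10:56001", servers));
    }
}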

Source code analysis

Data initialization

A newly joined Distro node performs a full data pull: it polls the other Distro nodes and fetches a full snapshot of their data.

First, the constructor of DistroProtocol starts startDistroTask(), which includes the initial load task startLoadTask():

private void startDistroTask() {
    if (EnvUtil.getStandaloneMode()) {
        isInitialized = true;
        return;
    }
    startVerifyTask();
    startLoadTask();
}

startLoadTask() creates a DistroLoadDataTask and passes in a callback that updates this node's Distro initialization status:

private void startLoadTask() {
    DistroCallback loadCallback = new DistroCallback() {
        @Override
        public void onSuccess() {
            isInitialized = true;
        }

        @Override
        public void onFailed(Throwable throwable) {
            isInitialized = false;
        }
    };
    GlobalExecutor.submitLoadDataTask(
            new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}

When the load task runs, load() calls loadAllDataSnapshotFromRemote to fetch the data to synchronize:

private void load() throws Exception {
    // If there are no members besides this node, sleep 1s: the other nodes may not have started yet.
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    // If no data types are registered, distroComponentHolder's component registry is not initialized yet.
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    // Load each type of data.
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            // Invoke the load method and mark this type as handled.
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}

The data is fetched from the other nodes with a DistroTransportAgent and processed with a DistroDataProcessor:

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    // Get the transport agent.
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    // Get the data processor.
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                resourceType, transportAgent, dataProcessor);
        return false;
    }
    // Request data from each node.
    for (Member each : memberManager.allMembersWithoutSelf()) {
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            // Fetch the data.
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            // Parse the data.
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType,
                    each.getAddress(), result);
            // If parsing succeeded, mark this data type as fully loaded.
            if (result) {
                distroComponentHolder.findDataStorage(resourceType).finishInitial();
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}

Fetching the data with DistroTransportAgent:

public DistroData getDatumSnapshot(String targetServer) {
    // Look up the target node in the member manager.
    Member member = memberManager.find(targetServer);
    // Check whether the target server is healthy.
    if (checkTargetServerStatusUnhealthy(member)) {
        throw new DistroException(
                String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
    }
    // Build the request and set its operation type to DataOperation.SNAPSHOT.
    DistroDataRequest request = new DistroDataRequest();
    request.setDataOperation(DataOperation.SNAPSHOT);
    try {
        // Send the sync RPC request through the cluster RPC client proxy.
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        if (checkResponse(response)) {
            return ((DistroDataResponse) response).getDistroData();
        } else {
            throw new DistroException(
                    String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
                            targetServer, response.getErrorCode(), response.getMessage()));
        }
    } catch (NacosException e) {
        throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
    }
}

DistroDataProcessor then processes the snapshot, delegating each client's data to handlerClientSyncData:

public boolean processSnapshot(DistroData distroData) {
    // Deserialize the DistroData into a ClientSyncDatumSnapshot.
    ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class)
            .deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
    // The snapshot contains every client the remote node is responsible for,
    // together with each client's service and instance information.
    for (ClientSyncData each : snapshot.getClientSyncDataList()) {
        // Handle one client at a time.
        handlerClientSyncData(each);
    }
    return true;
}

The handlerClientSyncData method:

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    // Since this is synced data, create an IpPortBasedClient and cache it.
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // Upgrade this client's service information.
    upgradeClient(client, clientSyncData);
}

The key method is upgradeClient:

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    // The set of services that have been synced.
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        // Build a Service object from the synced data.
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        // Mark this service as handled.
        syncedService.add(singleton);
        // Get the instance for this service.
        InstancePublishInfo instancePublishInfo = instances.get(i);
        // Check whether the client already holds this instance.
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // If not, add it.
            client.addServiceInstance(singleton, instancePublishInfo);
            // Publish a service-registration event on this node.
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    // Any service already published by this client but absent from this sync is stale and must be removed.
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            // Publish a client-deregistration event.
            NotifyCenter.publishEvent(
                    new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

This completes the initial data synchronization: once the full pull finishes, every Nacos node holds all of the currently registered non-persistent (ephemeral) instance data.

Incremental data synchronization

After initialization, whenever a node's data changes, the delta has to be synchronized to the other nodes.

DistroClientDataProcessor extends SmartSubscriber, following the Subscriber/Notify pattern: it is called back whenever an event it subscribes to is published. It subscribes to ClientChangedEvent, ClientDisconnectEvent, and ClientVerifyFailedEvent:

public List<Class<? extends Event>> subscribeTypes() {
    List<Class<? extends Event>> result = new LinkedList<>();
    result.add(ClientEvent.ClientChangedEvent.class);
    result.add(ClientEvent.ClientDisconnectEvent.class);
    result.add(ClientEvent.ClientVerifyFailedEvent.class);
    return result;
}

Here we focus on ClientChangedEvent. When it fires, DefaultPublisher calls back onEvent:

public void onEvent(Event event) {
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        // Entry point for incremental synchronization.
        syncToAllServer((ClientEvent) event);
    }
}

syncToAllServer calls DistroProtocol.sync to synchronize the data:

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        // Client-changed event: this is the incremental sync path.
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

sync iterates over every node except the local one and runs the per-node logic, syncToTarget:

public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

syncToTarget submits a delayed task through distroTaskEngineHolder:

public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
            targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}

DistroDelayTaskProcessor.process() then dispatches the task; a change becomes a DistroSyncChangeTask that sends the data to the specified node:

public boolean process(NacosTask task) {
    if (!(task instanceof DistroDelayTask)) {
        return true;
    }
    DistroDelayTask distroDelayTask = (DistroDelayTask) task;
    DistroKey distroKey = distroDelayTask.getDistroKey();
    switch (distroDelayTask.getAction()) {
        case DELETE:
            DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);
            return true;
        case CHANGE:
        case ADD:
            DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);
            distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
            return true;
        default:
            return false;
    }
}

DistroClientTransportAgent performs the actual send:

public boolean syncData(DistroData data, String targetServer) {
    if (isNoExistTarget(targetServer)) {
        return true;
    }
    DistroDataRequest request = new DistroDataRequest(data, data.getType());
    Member member = memberManager.find(targetServer);
    if (checkTargetServerStatusUnhealthy(member)) {
        Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);
        return false;
    }
    try {
        // Send the sync RPC request through the cluster RPC client proxy.
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        return checkResponse(response);
    } catch (NacosException e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);
    }
    return false;
}

On the receiving side, DistroDataRequestHandler handles the Distro RPC requests; as the switch shows, ADD, CHANGE, and DELETE all go through handleSyncData:

public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {
    try {
        switch (request.getDataOperation()) {
            case VERIFY:
                return handleVerify(request.getDistroData(), meta);
            case SNAPSHOT:
                return handleSnapshot();
            case ADD:
            case CHANGE:
            case DELETE:
                return handleSyncData(request.getDistroData());
            case QUERY:
                return handleQueryData(request.getDistroData());
            default:
                return new DistroDataResponse();
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);
        DistroDataResponse result = new DistroDataResponse();
        result.setErrorCode(ResponseCode.FAIL.getCode());
        result.setMessage("handle distro request with exception");
        return result;
    }
}

That reaches DistroClientDataProcessor.processData, which calls handlerClientSyncData:

public boolean processData(DistroData distroData) {
    switch (distroData.getType()) {
        case ADD:
        case CHANGE:
            ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class)
                    .deserialize(distroData.getContent(), ClientSyncData.class);
            handlerClientSyncData(clientSyncData);
            return true;
        case DELETE:
            String deleteClientId = distroData.getDistroKey().getResourceKey();
            Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);
            clientManager.clientDisconnected(deleteClientId);
            return true;
        default:
            return false;
    }
}

The synced Client is cached, then upgradeClient upgrades its service information, exactly as in the full-synchronization path:

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    upgradeClient(client, clientSyncData);
}

Data verification

Once the Distro cluster is up, the nodes periodically exchange heartbeats. The heartbeat payload is mainly the metadata of each node's data (metadata is used so that the volume of data on the network stays low). Verification rides on these heartbeats: at a fixed interval, every node sends a data-verification request to the other nodes.
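
A brief aside on why metadata keeps traffic low: the two sides only need to exchange and compare a short digest, and a full re-sync is triggered only on mismatch. A minimal hypothetical sketch of that idea (not Nacos source; all names here are invented):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration: verify by digest instead of shipping the full data.
public class VerifyDigestSketch {

    // A short fingerprint of a (possibly large) serialized dataset.
    static byte[] digest(String serializedData) throws NoSuchAlgorithmException {
        return MessageDigest.getInstance("MD5").digest(serializedData.getBytes(StandardCharsets.UTF_8));
    }

    // Receiver-side check: a mismatch means the data must be re-synced in full.
    static boolean verify(byte[] localDigest, byte[] remoteDigest) {
        return MessageDigest.isEqual(localDigest, remoteDigest);
    }
}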

Recall that startDistroTask() in the DistroProtocol constructor also starts the verification task startVerifyTask(), which schedules a DistroVerifyTimedTask with a 5-second initial delay and a 5-second interval:

private void startVerifyTask() {
    GlobalExecutor.schedulePartitionDataTimedSync(
            new DistroVerifyTimedTask(memberManager, distroComponentHolder,
                    distroTaskEngineHolder.getExecuteWorkersManager()),
            DistroConfig.getInstance().getVerifyIntervalMillis());
}

The task initiates verification of every data type against all other nodes via verifyForDataStorage:

public void run() {
    try {
        // Get all nodes except the local one.
        List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("server list is: {}", targetServer);
        }
        // Every data type must be verified.
        for (String each : distroComponentHolder.getDataStorageTypes()) {
            // Verify the data held in this dataStorage.
            verifyForDataStorage(each, targetServer);
        }
    } catch (Exception e) {
        Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);
    }
}

verifyForDataStorage calls getVerifyData to pull the data from the DistroDataStorage and, for each target node, creates a DistroVerifyExecuteTask that performs the actual verification:

private void verifyForDataStorage(String type, List<Member> targetServer) {
    DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);
    // Skip if this storage has not finished its initial synchronization yet.
    if (!dataStorage.isFinishInitial()) {
        Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",
                dataStorage.getClass().getSimpleName());
        return;
    }
    // Fetch the verification data from the DistroDataStorage.
    List<DistroData> verifyData = dataStorage.getVerifyData();
    if (null == verifyData || verifyData.isEmpty()) {
        return;
    }
    // Run an asynchronous task for each target node.
    for (Member member : targetServer) {
        DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);
        if (null == agent) {
            continue;
        }
        executeTaskExecuteEngine.addTask(member.getAddress() + type,
                new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));
    }
}

getVerifyData collects the Client information this node is responsible for:

public List<DistroData> getVerifyData() {
    List<DistroData> result = new LinkedList<>();
    // Iterate over every client cached on this node.
    for (String each : clientManager.allClientId()) {
        Client client = clientManager.getClient(each);
        if (null == client || !client.isEphemeral()) {
            continue;
        }
        // Only include clients this node is responsible for.
        if (clientManager.isResponsibleClient(client)) {
            // TODO add revision for client.
            DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            DistroData data = new DistroData(distroKey,
                    ApplicationUtils.getBean(Serializer.class).serialize(verifyData));
            data.setType(DataOperation.VERIFY);
            result.add(data);
        }
    }
    return result;
}

DistroVerifyExecuteTask first checks whether the transport agent supports callbacks; in 2.0 it does, so doSyncVerifyDataWithCallback is called:

public void run() {
    for (DistroData each : verifyData) {
        try {
            // Check whether the transport agent supports callbacks.
            if (transportAgent.supportCallbackTransport()) {
                doSyncVerifyDataWithCallback(each);
            } else {
                doSyncVerifyData(each);
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType,
                    targetServer, e);
        }
    }
}

doSyncVerifyDataWithCallback calls DistroTransportAgent.syncVerifyData, which wraps the callback in a DistroVerifyCallbackWrapper and sends the verification RPC through the RPC client proxy:

public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {
    if (isNoExistTarget(targetServer)) {
        callback.onSuccess();
        return;
    }
    DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);
    Member member = memberManager.find(targetServer);
    try {
        DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,
                verifyData.getDistroKey().getResourceKey(), callback, member);
        clusterRpcClientProxy.asyncRequest(member, request, wrapper);
    } catch (NacosException nacosException) {
        callback.onFailed(nacosException);
    }
}

The key part of DistroVerifyCallbackWrapper is the response callback: on verification failure it publishes a ClientVerifyFailedEvent:

public void onResponse(Response response) {
    if (checkResponse(response)) {
        NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());
        distroCallback.onSuccess();
    } else {
        Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);
        // On verification failure, publish a ClientVerifyFailedEvent.
        NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));
        NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());
        distroCallback.onFailed(null);
    }
}

Back in DistroDataRequestHandler (its handle method was shown in the incremental-sync section above), the VERIFY branch routes verification RPCs to handleVerify.

handleVerify delegates to DistroProtocol.onVerify:

private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {
    DistroDataResponse result = new DistroDataResponse();
    if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {
        result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");
    }
    return result;
}

onVerify looks up the DistroDataProcessor matching the data type and calls its processVerifyData method:

public boolean onVerify(DistroData distroData, String sourceAddress) {
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),
                distroData.getDistroKey());
    }
    // Find the processor that matches this data type.
    String resourceType = distroData.getDistroKey().getResourceType();
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);
        return false;
    }
    return dataProcessor.processVerifyData(distroData, sourceAddress);
}

The processor verifies the client through EphemeralIpPortClientManager:

public boolean processVerifyData(DistroData distroData, String sourceAddress) {
    DistroClientVerifyInfo verifyData = ApplicationUtils.getBean(Serializer.class)
            .deserialize(distroData.getContent(), DistroClientVerifyInfo.class);
    if (clientManager.verifyClient(verifyData.getClientId())) {
        return true;
    }
    Loggers.DISTRO.info("client {} is invalid, get new client from {}", verifyData.getClientId(), sourceAddress);
    return false;
}

EphemeralIpPortClientManager dispatches a heartbeat-update task, ClientBeatUpdateTask:

public boolean verifyClient(String clientId) {
    IpPortBasedClient client = clients.get(clientId);
    // If the client exists, dispatch a heartbeat-update task.
    if (null != client) {
        NamingExecuteTaskDispatcher.getInstance()
                .dispatchAndExecuteTask(clientId, new ClientBeatUpdateTask(client));
        return true;
    }
    return false;
}

ClientBeatUpdateTask refreshes the client's last-active timestamps:

public void run() {
    // Take the current time and refresh the last heartbeat of the client and of each of its instances.
    long currentTime = System.currentTimeMillis();
    for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
        ((HealthCheckInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
    }
    // Refresh the client's last-updated time.
    client.setLastUpdatedTime();
}

That is the flow when verification succeeds. If verification fails, the callback fires and publishes a ClientVerifyFailedEvent.

DistroClientDataProcessor subscribes to ClientVerifyFailedEvent; when the event occurs, DefaultPublisher calls back onEvent (shown earlier), whose ClientVerifyFailedEvent branch calls syncToVerifyFailedServer to re-synchronize the data after a failed verification.

syncToVerifyFailedServer determines the data type and the target client, then calls DistroProtocol.syncToTarget:

private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {
    // Get the client whose verification failed.
    Client client = clientManager.getClient(event.getClientId());
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    // The resource type here is Nacos:Naming:v2:ClientData.
    DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
    // Verify failed data should be sync directly.
    distroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);
}

From here the flow is as before: syncToTarget (shown earlier) submits a delayed task through distroTaskEngineHolder, DistroDelayTaskProcessor.process() dispatches it, and a DistroSyncChangeTask sends the data to the specified node. The ADD flow used after a verification failure is identical to the CHANGE flow used for incremental synchronization.

Write operations

Consider a Distro cluster that has finished starting up. When a client write request that registers a non-persistent instance hits one of the Nacos servers, the cluster handles it as follows.

The procedure consists of several parts:

  • A front Filter intercepts the request, computes the responsible Distro node from the IP and port carried in the request, and forwards the request to that node.

  • The Controller on the responsible node parses the write request.

  • The Distro protocol periodically runs Sync tasks that synchronize all instance data owned by this node to the other nodes.

Start with DistroFilter, which intercepts requests; its core is the doFilter method. The key calls are distroMapper.responsible(distroTag), which decides whether this node is responsible for the request, and distroMapper.mapSrv(distroTag), which computes the responsible Distro node:

public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
        throws IOException, ServletException {
    ReuseHttpServletRequest req = new ReuseHttpServletRequest((HttpServletRequest) servletRequest);
    HttpServletResponse resp = (HttpServletResponse) servletResponse;

    String urlString = req.getRequestURI();
    if (StringUtils.isNotBlank(req.getQueryString())) {
        urlString += "?" + req.getQueryString();
    }

    try {
        Method method = controllerMethodsCache.getMethod(req);
        String path = new URI(req.getRequestURI()).getPath();
        if (method == null) {
            throw new NoSuchMethodException(req.getMethod() + " " + path);
        }
        if (!method.isAnnotationPresent(CanDistro.class)) {
            filterChain.doFilter(req, resp);
            return;
        }
        // Build the distro tag from the request; its format is IP:PORT.
        String distroTag = distroTagGenerator.getResponsibleTag(req);
        // If this node is responsible, handle the request locally; otherwise forward it below.
        if (distroMapper.responsible(distroTag)) {
            filterChain.doFilter(req, resp);
            return;
        }
        // proxy request to other server if necessary:
        String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);
        if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
            // This request is sent from peer server, should not be redirected again:
            Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
            resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                    "receive invalid redirect request from peer " + req.getRemoteAddr());
            return;
        }
        // Compute the responsible Distro node from the distro tag.
        final String targetServer = distroMapper.mapSrv(distroTag);

        List<String> headerList = new ArrayList<>(16);
        Enumeration<String> headers = req.getHeaderNames();
        while (headers.hasMoreElements()) {
            String headerName = headers.nextElement();
            headerList.add(headerName);
            headerList.add(req.getHeader(headerName));
        }

        final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
        final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
        // Forward the request to the responsible node.
        RestResult<String> result = HttpClient
                .request("http://" + targetServer + req.getRequestURI(), headerList, paramsValue, body,
                        PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
        String data = result.ok() ? result.getData() : result.getMessage();
        try {
            WebUtils.response(resp, data, result.getCode());
        } catch (Exception ignore) {
            Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
        }
    } catch (AccessControlException e) {
        resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
    } catch (NoSuchMethodException e) {
        resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
                "no such api:" + req.getMethod() + ":" + req.getRequestURI());
    } catch (Exception e) {
        resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                "Server failed," + ExceptionUtil.getAllExceptionMsg(e));
    }
}

First, distroMapper.responsible:

public boolean responsible(String responsibleTag) {
    final List<String> servers = healthyList;
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        // means distro config is not ready yet
        return false;
    }
    // Find this node's position in the healthy server list via EnvUtil.
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    // Simple hash-mod algorithm.
    int target = distroHash(responsibleTag) % servers.size();
    // Check whether the target index falls on this node.
    return target >= index && target <= lastIndex;
}

Then distroMapper.mapSrv:

public String mapSrv(String responsibleTag) {
    final List<String> servers = healthyList;
    if (CollectionUtils.isEmpty(servers) || !switchDomain.isDistroEnabled()) {
        return EnvUtil.getLocalAddress();
    }
    try {
        int index = distroHash(responsibleTag) % servers.size();
        return servers.get(index);
    } catch (Throwable e) {
        Loggers.SRV_LOG.warn("[NACOS-DISTRO] distro mapper failed, return localhost: " + EnvUtil.getLocalAddress(), e);
        return EnvUtil.getLocalAddress();
    }
}

As the code shows, Distro picks a service's responsible node with a simple hash-mod algorithm. If a Nacos node goes down, every service is re-mapped across the remaining nodes, which is a large disturbance. With consistent hashing, a single node failure would only move the services owned by the failed node.
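
For comparison, here is a minimal consistent-hashing sketch (purely an illustration of the alternative just mentioned, not anything in Nacos) using a ring with virtual nodes. When a server is removed, only the keys whose clockwise successor was one of its virtual nodes change owner; all other keys keep their mapping:

import java.nio.charset.StandardCharsets;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Hypothetical consistent-hash ring; not part of Nacos.
public class ConsistentHashSketch {

    private static final int VIRTUAL_NODES = 100;
    private final TreeMap<Long, String> ring = new TreeMap<>();

    private static long hash(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Each physical server occupies many ring positions to even out the load.
    public void addServer(String server) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(server + "#" + i), server);
        }
    }

    public void removeServer(String server) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.remove(hash(server + "#" + i));
        }
    }

    // A key is owned by the first virtual node clockwise from its hash.
    public String mapSrv(String responsibleTag) {
        SortedMap<Long, String> tail = ring.tailMap(hash(responsibleTag));
        return tail.isEmpty() ? ring.firstEntry().getValue() : tail.get(tail.firstKey());
    }
}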

That is how the Filter intercepts a request and locates the responsible Distro node. Once the request reaches that node, the corresponding Controller parses the write request.

Look at InstanceController.register, which handles service registration and ends by calling registerInstance:

public String register(HttpServletRequest request) throws Exception {
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();

    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

Skipping some intermediate steps, the key flow is DistroConsistencyServiceImpl.put. Its essentials are the onPut call and distroProtocol.sync; we focus on the former:

public void put(String key, Record value) throws NacosException {
    onPut(key, value);
    // If upgrade to 2.0.X, do not sync for v1.
    if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
        return;
    }
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            DistroConfig.getInstance().getSyncDelayMillis());
}

onPut wraps the instance being registered into a Datum and puts it into the map inside the DataStore, then calls notifier.addTask:

public void onPut(String key, Record value) {
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    notifier.addTask(key, DataOperation.CHANGE);
}

Notifier is an inner class of DistroConsistencyServiceImpl that implements Runnable; the init method of DistroConsistencyServiceImpl submits it to a thread pool:

public void init() {
    GlobalExecutor.submitDistroNotifyTask(notifier);
}

addTask builds a Pair and offers it to the blocking queue inside Notifier:

private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

public void addTask(String datumKey, DataOperation action) {
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        services.put(datumKey, StringUtils.EMPTY);
    }
    tasks.offer(Pair.with(datumKey, action));
}

Notifier.run spins in an endless loop, continuously taking messages from the blocking queue and handing each one to handle(pair):

public void run() {
    Loggers.DISTRO.info("distro notifier started");
    for (; ; ) {
        try {
            Pair<String, DataOperation> pair = tasks.take();
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

The handle logic dispatches to listener.onChange:

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        services.remove(datumKey);

        int count = 0;
        if (!listeners.containsKey(datumKey)) {
            return;
        }
        for (RecordListener listener : listeners.get(datumKey)) {
            count++;
            try {
                // Dispatch by event type; service registration is a CHANGE event.
                if (action == DataOperation.CHANGE) {
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                    datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}

Inside listener.onChange, the part to focus on is updateIPs:

public void onChange(String key, Instances value) throws Exception {
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    for (Instance instance : value.getInstanceList()) {
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    recalculateChecksum();
}

This finally reaches Cluster.updateIps:

public void updateIps(List<Instance> ips, boolean ephemeral) {
    // Take the current instance set (the new set is built aside and swapped in at the end).
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;

    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }

    // Get the instances that appear in both the old and the new list, i.e. the ones being updated.
    List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
    if (updatedIps.size() > 0) {
        for (Instance ip : updatedIps) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());
            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                ip.setHealthy(oldIP.isHealthy());
            }
            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                        (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }
            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP, ip);
            }
        }
    }

    // Instances that are new in this update.
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG.info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}",
                getService().getName(), getName(), newIPs.size(), newIPs);
        for (Instance ip : newIPs) {
            HealthCheckStatus.reset(ip);
        }
    }

    // Instances that are gone in this update.
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG.info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}",
                getService().getName(), getName(), deadIPs.size(), deadIPs);
        for (Instance ip : deadIPs) {
            HealthCheckStatus.remv(ip);
        }
    }

    toUpdateInstances = new HashSet<>(ips);
    // Swap the new instance set back into place.
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}

This is the classic copy-on-write (COW) idea: to write new instances, the current instance list is copied, the copy is modified, and the result is assigned back, which greatly improves Nacos's throughput because readers are never blocked by writers.
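
A minimal sketch of the same pattern, assuming a simple registry with one writer at a time (hypothetical names, not Nacos code). Readers always see a complete, immutable snapshot while a writer prepares and then atomically publishes a new one:

import java.util.HashSet;
import java.util.Set;

// Hypothetical copy-on-write registry; readers never observe a half-updated set.
public class CowRegistrySketch {

    // volatile so readers always see the most recently published snapshot.
    private volatile Set<String> instances = new HashSet<>();

    // Readers use the current snapshot directly, with no locking.
    public Set<String> list() {
        return instances;
    }

    // Writers copy, mutate the copy, then publish it with one reference swap.
    public synchronized void register(String instance) {
        Set<String> copy = new HashSet<>(instances);
        copy.add(instance);
        instances = copy;
    }
}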

Read operations

Because every node holds the full data set, each read is served directly from the node's local store, so responses are fast.

The read path is InstanceController.list, which calls InstanceOperatorServiceImpl.listInstance:

public Object list(HttpServletRequest request) throws Exception {
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);

    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);

    Subscriber subscriber = new Subscriber(clientIP + ":" + udpPort, agent, app, clientIP, namespaceId, serviceName,
            udpPort, clusters);
    return getInstanceOperator().listInstance(namespaceId, serviceName, subscriber, clusters, healthyOnly);
}

listInstance returns the healthy instances (or all instances once the protection threshold is reached):

public ServiceInfo listInstance(String namespaceId, String serviceName, Subscriber subscriber, String cluster,
        boolean healthOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(subscriber.getAgent());
    String clientIP = subscriber.getIp();
    ServiceInfo result = new ServiceInfo(serviceName, cluster);
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();

    // now try to enable the push
    try {
        if (subscriber.getPort() > 0 && pushService.canEnablePush(subscriber.getAgent())) {
            subscriberServiceV1.addClient(namespaceId, serviceName, cluster, subscriber.getAgent(),
                    new InetSocketAddress(clientIP, subscriber.getPort()), pushDataSource, StringUtils.EMPTY,
                    StringUtils.EMPTY);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP,
                subscriber.getPort(), e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }

    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.setCacheMillis(cacheMillis);
        return result;
    }

    checkIfDisabled(service);

    List<com.alibaba.nacos.naming.core.Instance> srvedIps = service
            .srvIPs(Arrays.asList(StringUtils.split(cluster, StringUtils.COMMA)));

    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIps = selectorManager.select(service.getSelector(), clientIP, srvedIps);
    }

    if (CollectionUtils.isEmpty(srvedIps)) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.setCacheMillis(cacheMillis);
        result.setLastRefTime(System.currentTimeMillis());
        result.setChecksum(service.getChecksum());
        return result;
    }

    long total = 0;
    Map<Boolean, List<com.alibaba.nacos.naming.core.Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());

    for (com.alibaba.nacos.naming.core.Instance ip : srvedIps) {
        // remove disabled instance:
        if (!ip.isEnabled()) {
            continue;
        }
        ipMap.get(ip.isHealthy()).add(ip);
        total += 1;
    }

    double threshold = service.getProtectThreshold();
    List<Instance> hosts;
    if ((float) ipMap.get(Boolean.TRUE).size() / total <= threshold) {
        Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", result.getName());
        result.setReachProtectionThreshold(true);
        hosts = Stream.of(Boolean.TRUE, Boolean.FALSE).map(ipMap::get).flatMap(Collection::stream)
                .map(InstanceUtil::deepCopy)
                // set all to `healthy` state to protect.
                .peek(instance -> instance.setHealthy(true))
                .collect(Collectors.toCollection(LinkedList::new));
    } else {
        result.setReachProtectionThreshold(false);
        hosts = new LinkedList<>(ipMap.get(Boolean.TRUE));
        if (!healthOnly) {
            hosts.addAll(ipMap.get(Boolean.FALSE));
        }
    }

    result.setHosts(hosts);
    result.setCacheMillis(cacheMillis);
    result.setLastRefTime(System.currentTimeMillis());
    result.setChecksum(service.getChecksum());
    return result;
}

Summary

Distro is the consistency protocol Nacos built for ephemeral instance data. The data lives in memory, is fully synchronized at startup, and is periodically verified afterwards.

Under the Distro design, every node can accept both read and write requests. Requests then fall into three cases (a minimal routing sketch follows the list):

1. A write request for an instance this node is responsible for: write it directly.

2. A write request for an instance another node is responsible for: route it within the cluster to the responsible node, which completes the write.

3. Any read request: query the local store and return immediately (every instance has been synchronized to every node).
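
A minimal sketch that ties the three cases together (hypothetical names; the real routing lives in the DistroFilter and DistroMapper code shown above):

import java.util.List;
import java.util.function.Supplier;

// Hypothetical summary of Distro request routing.
public class DistroRoutingSketch {

    private final String localAddress;
    private final List<String> healthyServers;

    public DistroRoutingSketch(String localAddress, List<String> healthyServers) {
        this.localAddress = localAddress;
        this.healthyServers = healthyServers;
    }

    private String responsibleServer(String distroTag) {
        int target = Math.abs(distroTag.hashCode() % Integer.MAX_VALUE) % healthyServers.size();
        return healthyServers.get(target);
    }

    // Cases 1 and 2: a write is handled locally or forwarded to its owner.
    public void handleWrite(String distroTag, Runnable localWrite) {
        String owner = responsibleServer(distroTag);
        if (owner.equals(localAddress)) {
            localWrite.run();          // case 1: this node owns the key
        } else {
            forward(owner, distroTag); // case 2: proxy to the responsible node
        }
    }

    // Case 3: a read is always served from the local full replica.
    public Object handleRead(Supplier<Object> localQuery) {
        return localQuery.get();
    }

    private void forward(String owner, String distroTag) {
        // Placeholder for the HTTP/RPC proxy call to the owner node.
    }
}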

As Nacos's built-in consistency protocol for ephemeral instances, Distro ensures that each node's view of service state is propagated to the other nodes in a timely fashion, and it can sustain storage and consistency for service-instance counts in the hundreds of thousands.
