引言

本文接着撸Distro协议,上文中分析了寻址模式。有了地址就要建立连接,有了连接就能通信了。集群之间都交互啥数据?本文就扒一扒全量同步和节点之间数据校验。

一、内容提要

节点间建立RCP连接

  • 订阅了MembersChangeEvent事件,集群节点有变更能够收到回调通知

  • 与集群中其他节点建立grpc连接并缓存到Map其中key格式为「Cluster-IP:Port」

节点间校验数据通信

  • 节点之间发送校验数据是在全量同步后进行的

  • 发送校验的频率默认为5秒钟一次

  • 校验数据包括clientId和version,其中version为保留字段当前为0

  • 接受到校验数据后如果缓存中存在该client表示校验成功,同时更新保鲜时间,否则校验失败

全量数据同步

  • 在节点启动时会从集群中其他节点中的一个节点同步快照数据并缓存在Map中

  • 缓存的数据类型分类两类分别为HTTP和gRPC

  • 具体数据即客户端注册节点信息含命名空间、分组名称、服务名称、节点Instance信息等

  • 集群中每个节点都拥有所有的快照数据

二、节点间建立RPC连接

节点之间要通信,需要建立连接。Nacos集群节点之间也不例外,下面看下Nacos是如何和集群之间建立连接的,以gRPC为例。

Nacos中ClusterRpcClientProxy封装了集群中节点之间的通道。

@PostConstruct
public void init() {try {// 注解@1NotifyCenter.registerSubscriber(this);// 注解@2List<Member> members = serverMemberManager.allMembersWithoutSelf(); // 注解@3refresh(members);Loggers.CLUSTER.warn("[ClusterRpcClientProxy] success to refresh cluster rpc client on start up,members ={} ",members);} catch (NacosException e) {Loggers.CLUSTER.warn("[ClusterRpcClientProxy] fail to refresh cluster rpc client,{} ", e.getMessage());}
}

注解@1 注册自己订阅MembersChangeEvent事件

注解@2 获取集群中的节点列表剔除自身节点

注解@3 与各个节点建立rpc通道

private void refresh(List<Member> members) throws NacosException {for (Member member : members) {if (MemberUtil.isSupportedLongCon(member)) {// 注解@3.1createRpcClientAndStart(member, ConnectionType.GRPC);}}Set<Map.Entry<String, RpcClient>> allClientEntrys = RpcClientFactory.getAllClientEntries();Iterator<Map.Entry<String, RpcClient>> iterator = allClientEntrys.iterator();List<String> newMemberKeys = members.stream().filter(a -> MemberUtil.isSupportedLongCon(a)).map(a -> memberClientKey(a)).collect(Collectors.toList());// 注解@3.2while (iterator.hasNext()) {Map.Entry<String, RpcClient> next1 = iterator.next();if (next1.getKey().startsWith("Cluster-") && !newMemberKeys.contains(next1.getKey())) {Loggers.CLUSTER.info("member leave,destroy client of member - > : {}", next1.getKey());RpcClientFactory.getClient(next1.getKey()).shutdown();iterator.remove();}}}

注解@3.1 为集群中每个节点member创建rcp client

注解@3.2 关闭旧的grpc连接

private void createRpcClientAndStart(Member member, ConnectionType type) throws NacosException {Map<String, String> labels = new HashMap<String, String>(2);labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_CLUSTER);// 注解@3.1.1String memberClientKey = memberClientKey(member);RpcClient client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);if (!client.getConnectionType().equals(type)) {Loggers.CLUSTER.info(",connection type changed,destroy client of member - > : {}", member);RpcClientFactory.destroyClient(memberClientKey);// 注解@3.1.2client = RpcClientFactory.createClusterClient(memberClientKey, type, labels);}if (client.isWaitInitiated()) {Loggers.CLUSTER.info("start a new rpc client to member - > : {}", member);// 注解@3.1.3client.serverListFactory(new ServerListFactory() {@Overridepublic String genNextServer() {return member.getAddress(); // 返回连接集群其他节点地址}@Overridepublic String getCurrentServer() {return member.getAddress();}@Overridepublic List<String> getServerList() {return Lists.newArrayList(member.getAddress());}});// 注解@3.1.4client.start(); }
}

注解@3.1.1 memberClientKey由「Cluster-IP:Port」构成,例如:Cluster-1.2.3.4:2008

注解@3.1.2 创建grpc client并缓存在 clientMap,key为memberClientKey 此时client的状态为WAIT_INIT

注解@3.1.3 集群中固定的某一台节点

注解@3.1.4  grpc连接集群中的member节点设置client的状态RUNNING

小结: 在与Nacos集群其他节点建立连接的过程中做了两件事情:@1.订阅了MembersChangeEvent事件 @2.与集群中其他节点建立grpc连接并缓存到Map其中key格式为「Cluster-IP:Port」。

三、节点间校验数据通信

节点之间建立rpc通道必然是为了互相之间能通信,其中一个通信是节点之间发送校验数据。那为什么要发这些校验数据?这些数据都是些什么内容?下面咱就去扒一扒。

在DistroProtocol的构造函数中的最后一个行有一个startDistroTask(),主要分析startVerifyTask的逻辑。

public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {this.memberManager = memberManager;this.distroComponentHolder = distroComponentHolder;this.distroTaskEngineHolder = distroTaskEngineHolder;this.distroConfig = distroConfig;startDistroTask();
}
private void startDistroTask() {// 单机模式直接返回if (EnvUtil.getStandaloneMode()) {isInitialized = true;return;}startVerifyTask();startLoadTask();
}
private void startVerifyTask() {// 注解@4GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,distroTaskEngineHolder.getExecuteWorkersManager()), distroConfig.getVerifyIntervalMillis());
}

注解@4  每隔5秒执行,也就是节点之间发送校验时间的默认频率是5秒。

可以通过配置参数「nacos.core.protocol.distro.data.verify_interval_ms」自定义。

接着看DistroVerifyTimedTask的run方法。

@Override
public void run() {try {// 注解@5List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("server list is: {}", targetServer);}// 注解@6for (String each : distroComponentHolder.getDataStorageTypes()) {verifyForDataStorage(each, targetServer);}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);}
}

注解@5 拿到集群中其他节点

注解@6 在Nacos server启动时初始化时两种类型HTTP和gRPC,本文以gRPC为例进行分析。

private void verifyForDataStorage(String type, List<Member> targetServer) {// 注解@7DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);// 注解@8if (!dataStorage.isFinishInitial()) {  // 未完成全量数据同步退出Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",dataStorage.getClass().getSimpleName());return;}//注解@9List<DistroData> verifyData = dataStorage.getVerifyData();if (null == verifyData || verifyData.isEmpty()) {return;}for (Member member : targetServer) {DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);if (null == agent) {continue;}// 注解@10executeTaskExecuteEngine.addTask(member.getAddress() + type,new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));}
}

注解@7 Nacos启动时缓存在dataStorageMap中两种类型处理器分别用于处理gRPC和HTTP通信方式。

「Nacos:Naming:v2:ClientData->DistroClientDataProcessor」和 「com.alibaba.nacos.naming.iplist.->DistroDataStorageImpl」

注解@8 当从其他节点同步了全部数据后,则完成了初始化finished initial,全量数据同步下小节分析。

注解@9  获取校验的数据,数据为由本节点负责的clientId列表。

@Override
public List<DistroData> getVerifyData() {List<DistroData> result = new LinkedList<>(); // 一组DistroDatafor (String each : clientManager.allClientId()) {Client client = clientManager.getClient(each);if (null == client || !client.isEphemeral()) { // 无效client或者非临时节点continue;}// 注解@9.1if (clientManager.isResponsibleClient(client)) {// 注解@9.2DistroClientVerifyInfo verifyData = new DistroClientVerifyInfo(client.getClientId(), 0);DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);DistroData data = new DistroData(distroKey,ApplicationUtils.getBean(Serializer.class).serialize(verifyData)); // 序列化校验数据data.setType(DataOperation.VERIFY);result.add(data);}}return result;
}

注解@9.1 判断client是否为本几点负责的逻辑为ClientManagerDelegate#isResponsibleClient。即:属于ConnectionBasedClient并且

isNative为true表示该client是直连到该节点的。

@Override
public boolean isResponsibleClient(Client client) {return (client instanceof ConnectionBasedClient) && ((ConnectionBasedClient) client).isNative();
}

注解@9.2 构造Verify Data 主要信息为clientId,还有一个版本信息作为保留字段,目前都是0。

注解@10 向集群其他节点发送校验数据DistroVerifyExecuteTask#run

@Override
public void run() {for (DistroData each : verifyData) {try {if (transportAgent.supportCallbackTransport()) { // grpc支持回调doSyncVerifyDataWithCallback(each);} else { // http不支持回调使用同步doSyncVerifyData(each);}} catch (Exception e) {//...}}
}
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {if (isNoExistTarget(targetServer)) {callback.onSuccess();}DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);Member member = memberManager.find(targetServer);try {DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,verifyData.getDistroKey().getResourceKey(), callback, member);// 注解@11         clusterRpcClientProxy.asyncRequest(member, request, wrapper); } catch (NacosException nacosException) {callback.onFailed(nacosException);}
}

注解@11 向其他节点发送本节点负责的clientId信息

那集群其他节点接收到校验数据做什么处理呢?

翻到DistroDataRequestHandler#handle,此处包含了处理校验数据的逻辑。

@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {try {switch (request.getDataOperation()) {case VERIFY:return handleVerify(request.getDistroData(), meta);case SNAPSHOT:return handleSnapshot();case ADD:case CHANGE:case DELETE:return handleSyncData(request.getDistroData());case QUERY:return handleQueryData(request.getDistroData());default:return new DistroDataResponse();}} catch (Exception e) {// ...}
}
private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {DistroDataResponse result = new DistroDataResponse();// 注解@12if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");}return result;
}

注解@12 数据校验,下面可以看到,如果缓存存在client则校验成功,刷新client保鲜时间,否则校验失败。

@Override
public boolean verifyClient(String clientId) {ConnectionBasedClient client = clients.get(clientId);if (null != client) {client.setLastRenewTime();return true;}return false;
}

小结: 节点之间发送校验数据是在全量同步后进行的;发送校验的频率默认为5秒钟一次;校验数据包括clientId和version,其中version为保留字段当前为0;接受到校验数据后如果缓存中存在该client表示校验成功,同时更新保鲜时间,否则校验失败。

四、全量数据同步

上文中提到在发送校验数据之前需要先完成全量数据同步,先翻回DistroProtocol#startDistroTask()方法的startLoadTask()部分。

private void startLoadTask() {DistroCallback loadCallback = new DistroCallback() {@Overridepublic void onSuccess() {isInitialized = true;}@Overridepublic void onFailed(Throwable throwable) {isInitialized = false;}};GlobalExecutor.submitLoadDataTask(new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback));
}

DistroLoadDataTask#run

@Override
public void run() {try {load(); // 注解@13if (!checkCompleted()) { // 注解@14GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis());} else {loadCallback.onSuccess();Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success");}} catch (Exception e) {loadCallback.onFailed(e);Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e);}
}

注解@13 从集群中其他节点全量加载数据

注解@14 如果没有加载成功延迟30秒钟重新执行一次,可以通过参数「nacos.core.protocol.distro.data.load_retry_delay_ms」指定

private void load() throws Exception {while (memberManager.allMembersWithoutSelf().isEmpty()) {Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");TimeUnit.SECONDS.sleep(1);}while (distroComponentHolder.getDataStorageTypes().isEmpty()) {Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");TimeUnit.SECONDS.sleep(1);}for (String each : distroComponentHolder.getDataStorageTypes()) { // 注解@15if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); // 加载快照}}
}

注解@15 为不同的数据类型缓存快照,此处有gRPC和http两类数据类型。即:Nacos:Naming:v2:ClientData和com.alibaba.nacos.naming.iplist.

private boolean loadAllDataSnapshotFromRemote(String resourceType) {DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == transportAgent || null == dataProcessor) {return false;}for (Member each : memberManager.allMembersWithoutSelf()) { // 注解@16try {DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());boolean result = dataProcessor.processSnapshot(distroData);if (result) {distroComponentHolder.findDataStorage(resourceType).finishInitial(); // 设置为完成初始化return true;}} catch (Exception e) {}}return false;
}

注解@16 获取集群中除了本节点的其他节点,循环重试获取快照,直到有成功节点返回快照,成功后设置状态状态完成初始化「finishInitial」。

@Override
public DistroData getDatumSnapshot(String targetServer) {Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {throw new DistroException(String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));}DistroDataRequest request = new DistroDataRequest();// 设置请求操作为SNAPSHOTrequest.setDataOperation(DataOperation.SNAPSHOT); try {// 发起请求快照数据Response response = clusterRpcClientProxy.sendRequest(member, request);if (checkResponse(response)) {return ((DistroDataResponse) response).getDistroData();} else {throw new DistroException(String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",targetServer, response.getErrorCode(), response.getMessage()));}} catch (NacosException e) {throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);}
}

接下来看看其他节点收到快照请求如何响应的

还是翻到DistroDataRequestHandler#handle,具体由handleSnapshot()方法来处理。

private DistroDataResponse handleSnapshot() {DistroDataResponse result = new DistroDataResponse();DistroData distroData = distroProtocol.onSnapshot(DistroClientDataProcessor.TYPE);result.setDistroData(distroData);return result;
}
@Override
public DistroData getDatumSnapshot() {List<ClientSyncData> datum = new LinkedList<>();// 把本节点的所有client数据全部封装for (String each : clientManager.allClientId()) {Client client = clientManager.getClient(each);if (null == client || !client.isEphemeral()) {continue;}datum.add(client.generateSyncData());}ClientSyncDatumSnapshot snapshot = new ClientSyncDatumSnapshot();snapshot.setClientSyncDataList(datum);byte[] data = ApplicationUtils.getBean(Serializer.class).serialize(snapshot); // 序列化数据return new DistroData(new DistroKey(DataOperation.SNAPSHOT.name(), TYPE), data);
}

下面看下client数据信息,命名空间、分组名称、服务名称、节点Instance信息(IP、端口等等)。

public ClientSyncData generateSyncData() {List<String> namespaces = new LinkedList<>();List<String> groupNames = new LinkedList<>();List<String> serviceNames = new LinkedList<>();List<InstancePublishInfo> instances = new LinkedList<>();for (Map.Entry<Service, InstancePublishInfo> entry : publishers.entrySet()) {namespaces.add(entry.getKey().getNamespace());groupNames.add(entry.getKey().getGroup());serviceNames.add(entry.getKey().getName());instances.add(entry.getValue());}return new ClientSyncData(getClientId(), namespaces, groupNames, serviceNames, instances);
}

小结: 集群中每个节点都拥有所有的快照数据;在节点启动时会从集群中其他节点中的一个节点同步快照数据并缓存在Map中;缓存的数据类型分类两类分别为HTTP和gRPC;具体数据即客户端注册节点信息含命名空间、分组名称、服务名称、节点Instance信息等。

Nacos6# Distro协议全量同步与校验相关推荐

  1. Elasticsearch 的全量同步和增量同步

    (1)全量同步 什么是全量同步:将一个mysql的整个表的所有数据都同步到es中 常用插件是logstash-input-jdbc,logstash通过sql语句分区间对数据进行查询,然后输出到es进 ...

  2. MongoDB 3.4 复制集全量同步改进

    3.2版本复制集同步的过程参考MongoDB 复制集同步原理解析 在 3.4 版本里 MongoDB 对复制集同步的全量同步阶段做了2个改进 在拷贝数据的时候同时建立所有的索引,在之前的版本里,拷贝数 ...

  3. Redis持久化机制 -全量同步与增量同步的区别

    全量同步与增量同步的区别 全量同步:就是每天定时(避开高峰期)或者采用一个周期实现将数据拷贝到一个地方也就是Rdb存储. 增量同步:比如采用对行为的操作实现对数据的同步,也就是AOF. 全量与增量的比 ...

  4. 【图解】redis主从同步流程——全量同步、部分同步、命令传播

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一.全量同步 二.部分同步 三.命令传播 总结 前言 本文主要介绍Redis高可用下的主从同步问题,包括全量同步.部分 ...

  5. 数据同步之全量同步与增量同步

    一.什么是数据同步 业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计. 为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库 ...

  6. MySQL全量同步和增量同步-

    me:为啥你们队答辩时说的话, 我听了没啥感觉, 评委听了直接就深有感触了? 资深:生产不可能只同步一张表和10W数据, 其他队伍用jdbc的方法同步全量数据, 在真实环境决绝报错.生产都是5000万 ...

  7. es 全量同步mysql_什么时候该用MySQL,什么时候该用ES呢?

    作者:张sir 来源:京东技术 京东到家订单中心系统业务中,无论是外部商家的订单生产,或是内部上下游系统的依赖,订单查询的调用量都非常大,造成了订单数据读多写少的情况. 我们把订单数据存储在MySQL ...

  8. DM 源码阅读系列文章(四)dump/load 全量同步的实现

    作者:杨非 本文为 DM 源码阅读系列文章的第四篇,上篇文章 介绍了数据同步处理单元实现的功能,数据同步流程的运行逻辑以及数据同步处理单元的 interface 设计.本篇文章在此基础上展开,详细介绍 ...

  9. es 全量同步mysql_使用canal将mysql同步到es中

    因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧 canal教程 根据 https://github.com/ ...

  10. 大数据-数仓-数据采集-业务数据(二):全量同步采集【MySQL<-->DataX(全量)<-->HDFS】【每日全量:每天都将业务数据库中全部数据同步到数据仓库,是保证两侧数据同步的最简单方式】

    DataX源码地址:GitHub - alibaba/DataX: DataX是阿里云DataWorks数据集成的开源版本. 一.第1章 DataX介绍 1.1 DataX概述 1. 介绍:DataX ...

最新文章

  1. linux内存分配缺陷,Linux系统优化-内存错误分析
  2. 【NLP保姆级教程】手把手带你RNN文本分类(附代码)
  3. Day6 数据清洗(2)
  4. 用iptables实现NAT
  5. mysql之前缀索引
  6. 【开发经验】fiddler实现请求转发
  7. .c与.cpp的区别解析
  8. CF918D: MADMAX 题解
  9. C++编程-买卖股票的最佳时机
  10. vue run dev报错 缺少package.json文件
  11. IQ使命 Marrakech 马拉喀什(六边形图案)攻略
  12. logo设计的色彩颜色搭配-北泓设计
  13. 安全刻不容缓「GitHub 热点速览 v.21.50」
  14. P3793礼物和糖果
  15. PS中的画笔工具和修饰模式(画笔模式)
  16. error C2041: illegal digit ‘9‘ for base ‘8‘ | error C2059: syntax error: ‘bad suffix on number‘
  17. JVM之一:GC垃圾回收原理及算法分析
  18. python:命令行与环境
  19. CSUOJ 1644 超能陆战队
  20. 如果让你设计铁道部购票网站,你怎么做

热门文章

  1. u盘无法格式化不在计算机中,在电脑中,为什么U盘不能格式化?
  2. word无法在公式编辑器中输入字符
  3. 磁盘空间的三种分配方式
  4. 两个分数化简比怎么化_化简比的六种方法
  5. 计算机中振荡器作用,振荡器工作原理介绍
  6. arduino代码_Arduino超声波传感器测距代码完全解析
  7. 客户端禁止 Cookie,Session怎么实现
  8. 微信小程序获取Appsecret报错40125“invalid appsecret”
  9. 小米开发版安装magisk_小米9SE不刷recovery直接安装Magisk面具的详细教程
  10. 计算机拒绝访问移动硬盘,移动硬盘无法访问拒绝访问,教你移动硬盘无法访问拒绝访问怎么办...