Nacos 的AP

Nacos的AP模式,采用server之间互相的数据同步来实现数据在集群中的同步、复制操作

触发数据广播

public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {@Overridepublic void put(String key, Record value) throws NacosException {onPut(key, value);taskDispatcher.addTask(key);}
}

当调用 ConsistencyService中定义的put、remove方法时,涉及到了server端数据的变更 此时会创建一个任务,将数据的key传入 taskDispatcher.addTask 方法中,用于后面数据变更时数据查找操作。

public class TaskDispatcher {public void addTask(String key) {taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);}
}

这里有一个方法需要注意——shakeUp,查看官方代码注解可知这是将keykey可以看作是一次数据变更事件)这里应该是将任务均匀的路由到不同的TaskScheduler对象,确保每个TaskScheduler所承担的任务都差不多。

public class TaskScheduler implements Runnable {....private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);....public void addTask(String key) {queue.offer(key);}@Overridepublic void run() {List<String> keys = new ArrayList<>();while (true) {try {String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {Loggers.DISTRO.debug("got key: {}", key);}if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {continue;}if (StringUtils.isBlank(key)) {continue;}if (dataSize == 0) {keys = new ArrayList<>();}keys.add(key);dataSize++;if (dataSize == partitionConfig.getBatchSyncKeyCount()|| (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {// bookmark 核心代码模块 将数据在nacos server 中进行广播操作for (Member member : dataSyncer.getServers()) {// bookmark 是自己的话 不需要广播操作if (NetUtils.localServer().equals(member.getAddress())) {continue;}// bookmark 创建SyncTaskSyncTask syncTask = new SyncTask();// bookmark 设置事件集合(keys 集合)syncTask.setKeys(keys);// bookmark 将目标server信息设置到SyncTask中syncTask.setTargetServer(member.getAddress());if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {Loggers.DISTRO.debug("add sync task: {}", JacksonUtils.toJson(syncTask));}// bookmark 将数据广播任务提交到DataSync中dataSyncer.submit(syncTask, 0);}lastDispatchTime = System.currentTimeMillis();dataSize = 0;}} catch (Exception e) {Loggers.DISTRO.error("dispatch sync task failed.", e);}}}
}

核心代码就是for (Server member : dataSyncer.getServers()) {..}循环体内的代码,此处就是将数据在Nacos Server中进行广播操作;具体步骤如下:

  • 创建SyncTask,并设置事件集合(就是key集合)
  • 将目标Server信息设置到SyncTask中——syncTask.setTargetServer(member.getKey())
  • 将数据广播任务提交到DataSyncer

执行数据广播 DataSyncer

public class DataSyncer {public void submit(SyncTask task, long delay) {// If it's a new task:// bookmark 是否是新任务if (task.getRetryCount() == 0) {// bookmark 遍历所有的keysIterator<String> iterator = task.getKeys().iterator();while (iterator.hasNext()) {String key = iterator.next();// bookmark 添加进taskMapif (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {// associated key already exist:if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("sync already in process, key: {}", key);}iterator.remove();}}}if (task.getKeys().isEmpty()) {// all keys are removed:return;}// bookmark 提交一个数据广播任务GlobalExecutor.submitDataSync(() -> {// 1. check the serverif (getServers() == null || getServers().isEmpty()) {Loggers.SRV_LOG.warn("try to sync data but server list is empty.");return;}List<String> keys = task.getKeys();if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);}// bookmark 通过SyncTask中的keys 去DataStore中去查询key所对一个的数据集合// 2. get the datums by keys and check the datum is empty or notMap<String, Datum> datumMap = dataStore.batchGet(keys);if (datumMap == null || datumMap.isEmpty()) {// clear all flags of this task:for (String key : keys) {taskMap.remove(buildKey(key, task.getTargetServer()));}return;}// bookmark 对数据进行序列化操作,转化为 byte[]数组byte[] data = serializer.serialize(datumMap);long timestamp = System.currentTimeMillis();// bookmark 内部会发起http请求 进行数据广播boolean success = NamingProxy.syncData(data, task.getTargetServer());if (!success) {// bookmark 如果数据广播失败,// bookmark 将任务重新打包再次压入GlobalExecutor中SyncTask syncTask = new SyncTask();syncTask.setKeys(task.getKeys());syncTask.setRetryCount(task.getRetryCount() + 1);syncTask.setLastExecuteTime(timestamp);syncTask.setTargetServer(task.getTargetServer());retrySync(syncTask);} else {// clear all flags of this task:for (String key : task.getKeys()) {taskMap.remove(buildKey(key, task.getTargetServer()));}}}, delay);}
}

GlobalExecutor.submitDataSync(Runnable runnable)提交一个数据广播任务;首先通过SyncTask中的key集合去DataStore中去查询key所对应的数据集合,然后对数据进行序列化操作,转为byte[]数组后,执行Http请求操作——NamingProxy.syncData(data, task.getTargetServer());如果数据广播失败,则将任务重新打包再次压入GlobalExecutor中。

NamingProxy.syncData 方法:

public class NamingProxy { public static boolean syncData(byte[] data, String curServer) {Map<String, String> headers = new HashMap<>(128);headers.put(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);headers.put(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);headers.put("Accept-Encoding", "gzip,deflate,sdch");headers.put("Connection", "Keep-Alive");headers.put("Content-Encoding", "gzip");try {/** bookmark PUT http://ip:port/nacos/v1/ns//distro/datum 该url的处理器为{@link com.alibaba.nacos.naming.controllers.DistroController#onSyncDatum(Map)}*/HttpClient.HttpResult result = HttpClient.httpPutLarge("http://" + curServer + ApplicationUtils.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT+ DATA_ON_SYNC_URL, headers, data);if (HttpURLConnection.HTTP_OK == result.code) {return true;}if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {return true;}throw new IOException("failed to req API:" + "http://" + curServer + ApplicationUtils.getContextPath()+ UtilsAndCommons.NACOS_NAMING_CONTEXT + DATA_ON_SYNC_URL + ". code:" + result.code + " msg: "+ result.content);} catch (Exception e) {Loggers.SRV_LOG.warn("NamingProxy", e);}return false;}
}

这里将数据提交到了URL为PUT http://ip:port/nacos/v1/ns//distro/datum中,而该URL对应的处理位置是com.alibaba.nacos.naming.controllers.DistroController#onSyncDatum(Map)

public class DistroController {@PutMapping("/datum")public ResponseEntity onSyncDatum(@RequestBody Map<String, Datum<Instances>> dataMap) throws Exception {if (dataMap.isEmpty()) {Loggers.DISTRO.error("[onSync] receive empty entity!");throw new NacosException(NacosException.INVALID_PARAM, "receive empty entity!");}for (Map.Entry<String, Datum<Instances>> entry : dataMap.entrySet()) {if (KeyBuilder.matchEphemeralInstanceListKey(entry.getKey())) {String namespaceId = KeyBuilder.getNamespace(entry.getKey());String serviceName = KeyBuilder.getServiceName(entry.getKey());if (!serviceManager.containService(namespaceId, serviceName) && switchDomain.isDefaultInstanceEphemeral()) {serviceManager.createEmptyService(namespaceId, serviceName, true);}// bookmark 进行数据的更新操作,onPut方法不会涉及 taskDispatcher.addTask操作,而是将数据更新压入了Notifier的Task列表中consistencyService.onPut(entry.getKey(), entry.getValue().value);}}return ResponseEntity.ok("ok");}
}
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {public void onPut(String key, Record value) {    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {Datum<Instances> datum = new Datum<>();datum.value = (Instances) value;datum.key = key;datum.timestamp.incrementAndGet();dataStore.put(key, datum);}if (!listeners.containsKey(key)) {return;}notifier.addTask(key, ApplyAction.CHANGE);}
}

数据同步操作

Nacos源码分析-注册中心-Distro相关推荐

  1. 《跟二师兄学Nacos吧》第1篇 Nacos客户端服务注册源码分析

    开篇构想 在此之前,已经写了十多篇Nacos的文章,感觉Nacos还值得更深入的学习一下.于是萌生了写一个Nacos源码系列专栏的文章. 写作的目标呢,有两个:第一,能够系统的学习Nacos知识:第二 ...

  2. Dubbo服务注册源码分析

    本代码版本基于Dubbo2.7.8版本进行源码分析 注册概览 扫描所有@DubboService注解, 加载配置文件, 装载注解中的所有属性, 把每个服务都封装成一个ServiceBean, 注入到S ...

  3. Android—EventBus使用与源码分析

    EventBus 安卓事件发布/订阅框架 事件传递既可用于Android四大组件间通讯 EventBus的优点是代码简洁,使用简单,并将事件发布和订阅充分解耦 在onStart进行注册,onStop进 ...

  4. SpringCloudAlibaba注册中心与配置中心之利器Nacos实战与源码分析(下)

    源码资料 文档资料 <<Nacos架构与原理>>书籍于2021.12.21发布,并在Nacos官方网站非常Nice的提供其电子书的下载.我们学习Nacos源码更多是要吸取其优秀 ...

  5. Nacos注册中心CP架构Raft源码分析

    @toc[] 一.CAP介绍 二.Nacos如何设置CP.AP模式 我们使用nacos的时候,有一个关于节点类型的配置: cloud:nacos:discovery:server-addr: 192. ...

  6. 阿里面试这样问:Nacos配置中心交互模型是 push 还是 pull ?(原理+源码分析)...

    本文来源:公众号「 程序员内点事」 对于Nacos大家应该都不太陌生,出身阿里名声在外,能做动态服务发现.配置管理,非常好用的一个工具.然而这样的技术用的人越多面试被问的概率也就越大,如果只停留在使用 ...

  7. eureka 之前的服务如何关闭_干货分享 | 服务注册中心Spring Cloud Eureka部分源码分析...

    友情提示:全文13000多文字,预计阅读时间10-15分钟 Spring Cloud Eureka作为常用的服务注册中心,我们有必要去了解其内在实现机制,这样出现问题的时候我们可以快速去定位问题.当我 ...

  8. apollo源码分析 感知_Kitty中的动态线程池支持Nacos,Apollo多配置中心了

    目录 回顾昨日 nacos 集成 Spring Cloud Alibaba 方式 Nacos Spring Boot 方式 Apollo 集成 自研配置中心对接 无配置中心对接 实现源码分析 兼容 A ...

  9. 源码分析Dubbo前置篇-寻找注册中心、服务提供者、服务消费者功能入口

    本节主要阐述如下两个问题:  1.Dubbo自定义标签实现.  2.dubbo通过Spring加载配置文件后,是如何触发注册中心.服务提供者.服务消费者按照Dubbo的设计执行相关的功能.  所谓的执 ...

  10. Nacos 服务端健康检查及客户端服务订阅机制源码分析(三)

    Nacos 服务端健康检查 长连接 概念:长连接,指在一个连接上可以连续发送多个数据包,在连接保持期间,如果没有数据包发送,需要双方发送链路检测包 注册中心客户端 2.0 以后使用 gRPC 代替 h ...

最新文章

  1. 各种光学仪器成像技术(上)
  2. 双非山东科技胜过吉大,湖南大学超哈工大,US News2022世界大学排行榜引热议...
  3. 如何在Keras中训练大型数据集
  4. C++容器适配器之priority_queue
  5. php 计算字符串相邻最大重复数_php查找字符串出现次数的方法
  6. JavaScript:零星知识
  7. mysql5.0 java连接_Java连接mysql5.0
  8. Easy.Ajax 部分源代码, 支持文件上传功能, 兼容所有主流浏览器
  9. C语言中信号函数(signal)的使用
  10. Java编程作业体会_Java作业的几点总结感想
  11. debugfs查看文件块号,dd命令读指定块号的内容
  12. 文档大小超出上传限制怎么办_一键PDF转Word、PPT、图片等文档,这才是办公族必备的效率神器!...
  13. 面试题:怎么以最快速度计算8*4:
  14. python能用于机械设计吗_为什么人工智能首选Python?因为有很多适用于ML和DL的Python库!...
  15. 【Kubernetes 015】pod调度之Affinity亲和性
  16. 大数据培训:Hadoop生态系统圈
  17. 《MYSQL是怎样运行的》笔记|配置文件|系统变量|字符集|InnoDB存储结构|数据页结构|索引结构与使用|数据目录|表空间|连表原理|查询优化|BufferPool|事务|redo与undo|锁
  18. 专利 | 基于微表情与脑波分析算法的心理健康状况分析算法
  19. 机器学习编译入门课程学习笔记第二讲 张量程序抽象
  20. arduino灯带随音乐_【创客玩音乐】用灯带让音乐可视化

热门文章

  1. Linux命令详解之 cat
  2. 太阳能发电与蓄电池研究(Matlab代码实现)
  3. 董明珠接连直播背后:格力的线上焦虑
  4. 迅雷 应版权方要求,文件无法下载 解决方法
  5. InnoDB行记录格式
  6. 大前端技术发展趋势刨析
  7. 第三方SSD问题引起电脑频繁重启问题IONVMeController.cpp:5499
  8. mysql 向上取整_mysql的取整函数
  9. java代码对图片缩放
  10. 开发步骤_社交app开发步骤,送给不懂app开发的你