接上篇TODO

Elasticsearch CCR源码分析

上篇TODO:

http请求(ccr/follow)接收到后,follow集群节点开始全量同步,是以snapshot的模式去拉leader集群数据的,那么是在什么时候将leader集群伪装成snapshot的repository的?理论上应该是在Node初始化的时候...还未验证,后续再补充该逻辑...

如何加载ccr repositoryService

1.node启动:

进入构造方法:

org.elasticsearch.node.Node#Node(org.elasticsearch.env.Environment, java.util.Collection<java.lang.Class<? extends org.elasticsearch.plugins.Plugin>>, boolean)

2.CcrRepositoryManager在节点启动的时候通过调用为每一个remoteCluster注册了名为 “_ccr_” + remoteCluster 的ccrRepository

Collection<Object> pluginComponents = pluginsService.filterPlugins(Plugin.class).stream().flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,namedWriteableRegistry).stream()).collect(Collectors.toList());

在createComponents方法(org.elasticsearch.xpack.ccr.Ccr#createComponents)中,调用了

new CcrRepositoryManager(settings, clusterService, (NodeClient) client)

继续加载其他...

开始调用org.elasticsearch.xpack.ccr.CcrRepositoryManager.RemoteSettingsUpdateListener#updateRemoteCluster

@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy, boolean compressionEnabled,TimeValue pingSchedule) {String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;if (addresses.isEmpty()) {deleteRepository(repositoryName);} else {putRepository(repositoryName);}}

经历一系列方法:

最终--->org.elasticsearch.xpack.ccr.CcrRepositoryManager#putRepository

3.加载snapshot/restore的module

modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), transportService,clusterService, threadPool, xContentRegistry));

4.进入RepositoriesModule构造方法:

Map<String, Repository.Factory> newRepoTypes = repoPlugin.getInternalRepositories(env, namedXContentRegistry);

5.进入getInternalRepositories实现类:

执行重写的 getInternalRepositories方法

org.elasticsearch.xpack.ccr.Ccr#getInternalRepositories

@Override
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {Repository.Factory repositoryFactory =(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get());return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);}

6.org.elasticsearch.repositories.RepositoriesModule#RepositoriesModule构造了repositoriesService

 

7.开始注入初始化的一些bean或者service(相当于Spring的单例池)

injector = modules.createInjector();

8.node初始化的最后,暴露rest API

if (NetworkModule.HTTP_ENABLED.get(settings)) {logger.debug("initializing HTTP handlers ...");actionModule.initRestHandlers(() -> clusterService.state().nodes());}

9.小结

Node初始化的时候会将很多handler、service初始化完成(包括CCR相关service,repository),并且直接为每一个remoteCluster注册了名为 “_ccr_” + remoteCluster的ccrRepository

等到触发全量同步的时候(API触发或者auto API自动触发,检测是否是开启了Ccr且是Ccr的Restore流程,则会走到CcrRepository,否则走正常的repository),会走到org.elasticsearch.xpack.ccr.repository.CcrRepository的逻辑开始restore(即将leader集群视为一个仓库,leader数据视为名为“_latest_”的一个snapshot,获取元数据 & 数据)

void restoreFiles() throws IOException {ArrayList<FileInfo> fileInfos = new ArrayList<>();for (StoreFileMetaData fileMetaData : sourceMetaData) {ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));}SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos);// 获取数据的核心方法restore(snapshotFiles);}

TODO

到此CCR的逻辑已经完全通了,CCR借助snapshot的逻辑进行restore的细节仍然不明朗,例如,把leader集群视为一个repository,将leader集群的数据作为snapshot,是如何进行恢复的?直接拷贝文件到follow集群本地,follow集群的shard逐一开始恢复?增量复制的时候直接读取leader集群的operations(translog),是直接读取在内存中直接回放(bulk写入follow集群shard)?还是其他逻辑?

Elasticsearch CCR源码分析(补充)相关推荐

  1. Elasticsearch CCR源码分析

    本文基于Elasticsearch6.8.5版本 ES使用的是Guice框架,依赖注入和暴露接口的方式和Spring差距较大,可先查看guice框架 节点启动过程: org/elasticsearch ...

  2. 【Elasticsearch源码】CCR源码分析(一)

    1 CCR的基本概念 什么是CCR? CCR( cross-cluster replication):跨集群复制是ES 6.5发布的一个新的特性:可以将两个集群中的数据进行远程复制. 集群复制类似于数 ...

  3. 【Elasticsearch源码】CCR源码分析(二)

    接上一篇:[Elasticsearch源码]CCR源码分析(一). sendShardChangesRequest方法最终进入到ShardChangesAction.TransportAction#s ...

  4. elasticsearch _field_stats 源码分析

    _field_stats 实现的功能:https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-field-stats.ht ...

  5. CCR源码分析-CCR架构

    CCR,并发与协调运行时(Concurrency and Coordination Runtime).从名字里我们就可以知道,这个东东是用来简化并发程序设计的.为何要并发呢?因为我们有多个任务需要处理 ...

  6. elasticsearch源码分析之search模块(server端)

    elasticsearch源码分析之search模块(server端) 继续接着上一篇的来说啊,当client端将search的请求发送到某一个node之后,剩下的事情就是server端来处理了,具体 ...

  7. Elasticsearch源码分析—线程池(十一) ——就是从队列里处理请求

    Elasticsearch源码分析-线程池(十一) 转自:https://www.felayman.com/articles/2017/11/10/1510291570687.html 线程池 每个节 ...

  8. elasticsearch源码分析之search模块(client端)

    elasticsearch源码分析之search模块(client端) 注意,我这里所说的都是通过rest api来做的搜索,所以对于接收到请求的节点,我姑且将之称之为client端,其主要的功能我们 ...

  9. elasticsearch index、create和update的源码分析

    https://segmentfault.com/a/1190000011272749 社区里面有人问了如下一个问题: 执行 bulk 索引文档的时候,用 index 或者 create 类型并且自定 ...

最新文章

  1. eclipse工作空间配置导出
  2. Ghost后不能启动解决小工具
  3. PHP中面向对象的图片处理类
  4. 最长配对(51Nod-2494)
  5. php特殊符号写入excel_PHP:使用PEAR写入excel文件
  6. Notepad++ 配合TCC直接编译运行C代码
  7. linux内核killler,Linux 的 OOM Killer 机制分析
  8. MyBatis学习笔记01
  9. 模仿QZONE挂图效果
  10. Pycharm同步代码到Github 踩坑
  11. Java之实现简单中文笔画验证码
  12. Oracle数据库之导入导出
  13. (CVPR 2020) PointGroup: Dual-Set Point Grouping for 3D Instance Segmentation
  14. 华为鸿蒙系统操作教程_鸿蒙OS Beta版怎么使用
  15. 从svn拉下的代码无状态图标(绿色对勾)的解决方法
  16. Vivado中FIFO遇到【Common17-55】警告总结
  17. Protobuf数据格式解析
  18. php开发公众号素材管理总结
  19. 呼叫中心中继网关参数选型
  20. javaSE<String和StringBuffer和StringBuider>day11

热门文章

  1. 《Xmind 用好思维导图走上开挂人生》记录
  2. [海森推荐]人类相容性:人工智能与控制问题
  3. 应用包含Involution算子的RedNet实现Classification
  4. mov ah,4ch int 21的作用
  5. pandas 读表格_手把手教你数据分析(1)--Pandas读取Excel信息
  6. 计算机设备维修预算申,维修费用申请报告
  7. 如何轻松快速搭建商城系统?
  8. 文件扫描-TWAIN,WIA,ISIS,SANE
  9. layui实现文件压缩上传_基于SSM框架、Layui的多文件上传、包括图片,压缩包,音频等文件(与数据库挂钩) - 爱秧博客...
  10. c51中的intrins.h库函数