1 CCR的基本概念

什么是CCR?

CCR( cross-cluster replication):跨集群复制是ES 6.5发布的一个新的特性:可以将两个集群中的数据进行远程复制。

集群复制类似于数据订阅的方式,一个集群的数据可以被多个集群订阅,也就是可以被复制到多个集群上面去。

CCR 有两个角色,一个是 Leader,表示数据的源头,另外一个Follower,表示数据的订阅方,得到的是数据副本。
CCR 工作在索引层面,使用 Pull 的模式,Follower 索引主动的去 Pull Leader 的数据。

为什么需要CCR?

CCR可以解决下面的几个场景的问题:

  1. 集群高可用以及灾难恢复,在不同的地域部署多套ES集群,通过CCR实时同步数据。
  2. 实现数据的就近访问(地理),数据的就近访问,提升访问速度。
  3. 集中式的报告集群,通过多个备集群的处理,主集群可以进行分析处理。

2 CCR的使用

CCR是怎么使用的呢?

1. 本地集群连接远程集群

通过远程集群的节点进行连接,使用transport端口,命令如下:

PUT /_cluster/settings
{"persistent" : {"cluster" : {"remote" : {"leader" : {"seeds" : ["127.0.0.1:9300" ]}}}}
}

2. 在远程集群创建leader索引

远程集群创建leader索引,和正常创建索引操作一样,需要开启soft_deletes,不过在7.x版本之后,已经默认开启了。

3. 在本地集群创建follower索引

创建follower索引有两种方式:手动创建和自动跟随方式。

手动创建:需要指定远程集群和复制的远程索引

PUT /<follower_index>/_ccr/follow?wait_for_active_shards=1
{"remote_cluster" : "<remote_cluster>","leader_index" : "<leader_index>"
}

自动跟随创建:通过auto_follow API建立自动跟随的模板

PUT /_ccr/auto_follow/<auto_follow_pattern_name>
{"remote_cluster" : "<remote_cluster>","leader_index_patterns" :["<leader_index_pattern>"],"follow_index_pattern" : "<follow_index_pattern>"
}

这里不详细介绍CCR的使用,更多API请参考官网链接:Cross-cluster replication APIs。

3 CCR的源码分析

CCR的底层是怎么实现的呢?如何将远程集群的数据实时同步到本地集群呢?

主要涉及两部分:全量复制和增量复制

CCR是以plugins插件的形式:入口类在Ccr,继承了ActionPlugin(注册action),PersistentTaskPlugin(注册持久性任务),EnginePlugin(engine引擎接口),RepositoryPlugin(自定义快照接口)。

主要注册了下面几类action。
内部请求action: 内部的核心实现,用于处理数据的同步复制;状态请求action:用于获取follow和ccr的状态;follow actions:开启,暂停以及恢复follow的处理;auto-follow actions:自动跟随follow的处理action。
而且实现了FollowingEngine用于follow shard,实现了CcrRepository依靠远程集群快照用于还原数据。如下所示:

    public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {return Arrays.asList(// internal actionsnew ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),new ActionHandler<>(PutInternalCcrRepositoryAction.INSTANCE,PutInternalCcrRepositoryAction.TransportPutInternalRepositoryAction.class),new ActionHandler<>(DeleteInternalCcrRepositoryAction.INSTANCE,DeleteInternalCcrRepositoryAction.TransportDeleteInternalRepositoryAction.class),new ActionHandler<>(PutCcrRestoreSessionAction.INSTANCE,PutCcrRestoreSessionAction.TransportPutCcrRestoreSessionAction.class),new ActionHandler<>(ClearCcrRestoreSessionAction.INSTANCE,ClearCcrRestoreSessionAction.TransportDeleteCcrRestoreSessionAction.class),new ActionHandler<>(GetCcrRestoreFileChunkAction.INSTANCE,GetCcrRestoreFileChunkAction.TransportGetCcrRestoreFileChunkAction.class),// stats actionnew ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class),new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),new ActionHandler<>(FollowInfoAction.INSTANCE, TransportFollowInfoAction.class),// follow actionsnew ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),new ActionHandler<>(UnfollowAction.INSTANCE, TransportUnfollowAction.class),// auto-follow actionsnew ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class),// forget follower actionnew ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class));}public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {.....return Optional.of(new FollowingEngineFactory());.....}public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {Repository.Factory repositoryFactory =(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get(), threadPool.get());return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);}

虽然整个复制是在索引层面进行管理的,但实际复制是在shard级别进行的。创建完follower索引之后,会自动配置和leader索引相同的mapping以及settings。然后follower索引中的shard发起request从leader索引中拉取数据。

3.1 全量复制

当创建follower索引时,会先构造快照和仓库,然后进行快照恢复。

代码入口:TransportPutFollowAction#masterOperation
因为TransportPutFollowAction继承了TransportMasterNodeAction,所以TransportMasterNodeAction#doExecute->AsyncSingleAction#start->AsyncSingleAction#doStart->TransportPutFollowAction#masterOperation

先连接远程集群,获取leader index,然后创建follower index,代码如下:

    protected void masterOperation(....) {.....// 连接远程集群String remoteCluster = request.getRemoteCluster();client.getRemoteClusterClient(remoteCluster);// 获取远程集群的leaderIndex String leaderIndex = request.getLeaderIndex();// 创建follower indexccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(client,remoteCluster,leaderIndex,listener::onFailure,(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, request, listener)); }

创建follower index的过程如下:

  1. 先检查leaderIndexMetaData 是否为空,再检查是否开启了soft deletes。
  2. 根据内部PutInternalCcrRepositoryRequest创建好的快照仓库,快照仓库创建的逻辑在PutInternalCcrRepositoryAction#doExecute->RepositoriesService#registerInternalRepository->RepositoriesService#createRepository,调用的是snapshot模块自身的实现。
  3. 构建restoreRequest,快照恢复request。
  4. 在线程池中启动snapshot线程,进入restoreSnapshot方法进行快照恢复,调用的是snapshot模块自身的实现。
  5. afterRestoreStarted方法监听并处理整个快照恢复过程,当所有的shard都恢复成功之后,返回成功。

这里不深入分析snapshot模块是如何创建快照仓库和恢复快照的,这部分实现原理后续再更新。

    private void createFollowerIndex(.....) {.....// 参数检查// 创建好的快照仓库final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();// 构建restoreRequest 快照恢复requestfinal RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST).indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$").renameReplacement(request.getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout()).indexSettings(settingsBuilder);final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());// 在线程池中执行snapshot线程请求threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {@Overridepublic void onFailure(Exception e) {listener.onFailure(e);}@Overrideprotected void doRun() throws Exception {// 执行快照恢复restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreService.RestoreCompletionResponse>() {@Overridepublic void onResponse(RestoreService.RestoreCompletionResponse response) {// 开始恢复之后进行监听afterRestoreStarted(clientWithHeaders, request, listener, response);}@Overridepublic void onFailure(Exception e) {listener.onFailure(e);}});}});}

3.2 增量复制

前面讲过复制是在shard级别进行的,每个shard的复制都有对应的shardFollowTask,CCR插件注册了ShardFollowTasksExecutor执行器,follower接收到read request之后,会先将它们放置在写缓存区,ShardFollowTask管理器管理缓存区,并将缓存区的写请求批量提交给followe shard进行数据写入。

代码入口:ShardFollowTasksExecutor#nodeOperation

在每个执行node上执行的操作,进入ShardFollowNodeTask#start,更新follower索引的mapping和settings,确保和leader索引的mapping和settings版本一致。

    updateMapping(0L, leaderMappingVersion -> {......updateSettings(leaderSettingsVersion -> {synchronized (ShardFollowNodeTask.this) {currentSettingsVersion = leaderSettingsVersion;}......coordinateReads();});});

然后执行调用coordinateReads方法,根据seq_no和GlobalCheckpoint判断read request的范围,并通过sendShardChangesRequest方法发送该read request。
hasReadBudget方法主要判断read容量是否已满:1.是否已经超过了并发read的最大数量;2.是否已经超过buffer size的限制;3.是否已经超过了buffer count的限制。

    synchronized void coordinateReads() {final int maxReadRequestOperationCount = params.getMaxReadRequestOperationCount();// 当read容量未满且最新的seq_no小于leader的GlobalCheckpoint(即最新操作),则有数据需要更新while (hasReadBudget() && lastRequestedSeqNo < leaderGlobalCheckpoint) {final long from = lastRequestedSeqNo + 1;final long maxRequiredSeqNo = Math.min(leaderGlobalCheckpoint, from + maxReadRequestOperationCount - 1);final int requestOpCount;if (numOutstandingReads == 0) {requestOpCount = maxReadRequestOperationCount;} else {requestOpCount = Math.toIntExact(maxRequiredSeqNo - from + 1);}numOutstandingReads++;sendShardChangesRequest(from, requestOpCount, maxRequiredSeqNo);lastRequestedSeqNo = maxRequiredSeqNo;}if (numOutstandingReads == 0 && hasReadBudget()) {numOutstandingReads++;long from = lastRequestedSeqNo + 1;sendShardChangesRequest(from, maxReadRequestOperationCount, lastRequestedSeqNo);}}

接下一篇:【Elasticsearch源码】CCR源码分析(二)。

【Elasticsearch源码】CCR源码分析(一)相关推荐

  1. Elasticsearch CCR源码分析(补充)

    接上篇TODO Elasticsearch CCR源码分析 上篇TODO: http请求(ccr/follow)接收到后,follow集群节点开始全量同步,是以snapshot的模式去拉leader集 ...

  2. Elasticsearch CCR源码分析

    本文基于Elasticsearch6.8.5版本 ES使用的是Guice框架,依赖注入和暴露接口的方式和Spring差距较大,可先查看guice框架 节点启动过程: org/elasticsearch ...

  3. 【Elasticsearch源码】CCR源码分析(二)

    接上一篇:[Elasticsearch源码]CCR源码分析(一). sendShardChangesRequest方法最终进入到ShardChangesAction.TransportAction#s ...

  4. Colly源码解析——结合例子分析底层实现

    通过<Colly源码解析--框架>分析,我们可以知道Colly执行的主要流程.本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现.(转载请指明出于break ...

  5. java 头尾 队列_源码|jdk源码之栈、队列及ArrayDeque分析

    栈.队列.双端队列都是非常经典的数据结构.和链表.数组不同,这三种数据结构的抽象层次更高.它只描述了数据结构有哪些行为,而并不关心数据结构内部用何种思路.方式去组织. 本篇博文重点关注这三种数据结构在 ...

  6. 【Elasticsearch】Elasticsearch 7.6 IDEA 源码环境搭建

    1.概述 1.1 软件环境 Intellij idea:2019.3 Gradle gradle-6.3-all.zip JDK 12.0.2 macOS 10.15.4 1.2 注意 elastic ...

  7. 【Mybatis源码】源码分析

    [Mybatis源码]源码分析 (一)Mybatis重要组件 [1]四大核心组件 (1)SqlSessionFactoryBuilder (2)SqlSessionFactory (3)SqlSess ...

  8. yocto源码下载和目录分析

    文章目录 前言 一.搭建环境 电脑硬件要求 安装依赖 repo 什么是repo? 获取repo 设置git 二.获取yocto源码 三.yocto源码目录讲解 总结 前言 本文记录下载yocto的源码 ...

  9. openxr runtime Monado 源码解析 源码分析:CreateInstance流程(设备系统和合成器系统)Compositor comp_main client compositor

    monado系列文章索引汇总: openxr runtime Monado 源码解析 源码分析:源码编译 准备工作说明 hello_xr解读 openxr runtime Monado 源码解析 源码 ...

最新文章

  1. 帝国cms后台用户名密码正确显示您还未登录错误的原因及解决方案
  2. ubuntu14.04 nginx php编译安装,Ubuntu 14.04 编译安装 Nginx
  3. R语言实战应用精讲50篇(十五)-R语言如何实现数据的导入导出操作
  4. 对地图的marker 添加双击事件
  5. Delphi 计算儒略日(Julian day)的代码
  6. (转) Hibernate注解开发
  7. SNMP 简单网络管理协议
  8. 什么是ipo表,ipo图,hipo图
  9. linux怎么下载ftp文件夹,命令行 - 如何在Linux上通过FTP递归下载文件夹
  10. 【数电基础知识】各逻辑运算符号盘点
  11. 设置定时器,持续调用接口
  12. L2-036 网红点打卡攻略
  13. 【Linux】返回上级目录
  14. 为什么我没圣诞帽!?微信一键生成圣诞帽方法
  15. Sun工作站技术文档
  16. 苹果官方mfi认证名单_苹果入驻抖音,完成官方认证
  17. Java录制网页_Java 录制语音的实现代码
  18. 小程序也能是App中的引流神器?
  19. python中的pip命令在cmd下用不了?
  20. 数控车椭圆编程实例带图_数控车床加工椭圆的宏程序实例

热门文章

  1. IDEA设置标签多行展示
  2. 手游《六大门派》 武侠MMORPG手机游戏全套源码下载
  3. 2019浙江计算机二级考试考纲,计算机二级考试大纲(2019全国计算机二级考试大纲)...
  4. SpringBoot项目——————签到与签退管理(源码分享)
  5. Outlook发送后,收到的邮件都是纯文本格式
  6. RESIZE DATAFILE与ORA-03297
  7. 数控铣削图案及编程_数控铣床编程30例带图
  8. java智力题扑克牌_数学智力题:猜扑克牌
  9. 奉主耶稣基督的名,斥责一切魔鬼撒旦黑暗势力对我的捆绑,斥责一切邪灵对我的束缚,仇敌必然逃跑
  10. 信息无障碍研究机构---企业