接上一篇:【Elasticsearch源码】CCR源码分析(一)。

sendShardChangesRequest方法最终进入到ShardChangesAction.TransportAction#shardOperation,跟据上面的read request,从Translog中获取该shard的seq_no范围内的所有Operation,返回最新的shard需要的Operation。

        protected Response shardOperation(Request request, ShardId shardId) throws IOException {.......// 获取Operationfinal Translog.Operation[] operations = getOperations(indexShard,seqNoStats.getGlobalCheckpoint(),request.getFromSeqNo(),request.getMaxOperationCount(),request.getExpectedHistoryUUID(),request.getMaxBatchSize());// 在快照操作完成之后,确保maxSeqNoOfUpdatesOrDeletes,索引元数据,mapping和setting是最新的final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();final IndexMetaData indexMetaData = indexService.getMetaData();final long mappingVersion = indexMetaData.getMappingVersion();final long settingsVersion = indexMetaData.getSettingsVersion();return getResponse(......);}

获取Operation的操作如下:先进行参数检验,然后创建Translog快照,遍历快照里面的Operation并添加直至超过最大的批次限制。

    static Translog.Operation[] getOperations(....) throws IOException {.....// 参数检验int seenBytes = 0;long toSeqNo = Math.min(globalCheckpoint, (fromSeqNo + maxOperationCount) - 1);final List<Translog.Operation> operations = new ArrayList<>();// 创建Translog快照,根据translog快照读取Operationtry (Translog.Snapshot snapshot = indexShard.newChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) {Translog.Operation op;while ((op = snapshot.next()) != null) {operations.add(op);seenBytes += op.estimateSize();if (seenBytes > maxBatchSize.getBytes()) {break;}}} catch (MissingHistoryOperationsException e) {......}return operations.toArray(EMPTY_OPERATIONS_ARRAY);}

sendShardChangesRequest方法中通过handleReadResponse方法监听处理Response。

void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {// 处理read responseRunnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response);// 更新follow index mappingRunnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(),handleResponseTask);// 更新follow index settingsmaybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);}

调用innerHandleReadResponse方法对read response进行处理,如果response没有Operation,会重新发送sendShardChangesRequest请求,否则将response里面的所有Operation添加到buffer里面,然后进入write流程。

    synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) {.......if (response.getOperations().length == 0) { newFromSeqNo = from;} else {List<Translog.Operation> operations = Arrays.asList(response.getOperations());long operationsSize = operations.stream().mapToLong(Translog.Operation::estimateSize).sum();buffer.addAll(operations);bufferSizeInBytes += operationsSize;final long maxSeqNo = response.getOperations()[response.getOperations().length - 1].seqNo();newFromSeqNo = maxSeqNo + 1;lastRequestedSeqNo = Math.max(lastRequestedSeqNo, maxSeqNo);coordinateWrites();//进入write}if (newFromSeqNo <= maxRequiredSeqNo && isStopped() == false) {int newSize = Math.toIntExact(maxRequiredSeqNo - newFromSeqNo + 1);sendShardChangesRequest(newFromSeqNo, newSize, maxRequiredSeqNo); //重新发送请求} else {numOutstandingReads--;coordinateReads(); //重新进入read}}

write流程同样会先判断write容量是否满了,然后从buffer队列里面遍历所有的Operation添加到ops的ArrayList里面,并通过sendBulkShardOperationsRequest发生请求。

    private synchronized void coordinateWrites() {......while (hasWriteBudget() && buffer.isEmpty() == false) {long sumEstimatedSize = 0L;int length = Math.min(params.getMaxWriteRequestOperationCount(), buffer.size());List<Translog.Operation> ops = new ArrayList<>(length);for (int i = 0; i < length; i++) {Translog.Operation op = buffer.remove();ops.add(op);sumEstimatedSize += op.estimateSize();if (sumEstimatedSize > params.getMaxWriteRequestSize().getBytes()) {break;}}bufferSizeInBytes -= sumEstimatedSize;numOutstandingWrites++;// 发送bulk写请求sendBulkShardOperationsRequest(ops, leaderMaxSeqNoOfUpdatesOrDeletes, new AtomicInteger(0));}}

进入到TransportBulkShardOperationsAction类里面,开始写入主分片和副本分片,TransportBulkShardOperationsAction继承于TransportWriteAction类。

其写入流程和正常的写入bulk一致,只不过重写了shardOperationOnPrimary和shardOperationOnReplica方法。通过重放translog文件进行primary的写入,写入成功之后更新同步translog location并构建replicaRequest。副本写入过程,根据上面构建好的replicaRequest直接写入。

    public static CcrWritePrimaryResult shardOperationOnPrimary(....) throws IOException {......final List<Translog.Operation> appliedOperations = new ArrayList<>(sourceOperations.size());Translog.Location location = null;for (Translog.Operation sourceOp : sourceOperations) {final Translog.Operation targetOp = rewriteOperationWithPrimaryTerm(sourceOp, primary.getOperationPrimaryTerm()); //包含操作类型,和相关的信息以及sourcefinal Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY);  // 通过重放translog文件,最终进入到了写primary的逻辑if (result.getResultType() == Engine.Result.Type.SUCCESS) {appliedOperations.add(targetOp);location = locationToSync(location, result.getTranslogLocation());  // 写入成功的话更新同步translog location} else {......}}// 写入主分片成功之后,构建replicaRequestfinal BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes);return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);  //更新Checkpoint,SeqNo}public static WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(.....) throws IOException {Translog.Location location = null;for (final Translog.Operation operation : request.getOperations()) {final Engine.Result result = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);  //进入到写数据流程if (result.getResultType() != Engine.Result.Type.SUCCESS) {.....}location = locationToSync(location, result.getTranslogLocation());// 写入成功的话更新同步translog location}return new WriteReplicaResult<>(request, location, null, replica, logger);}

handleWriteResponse方法监听并处理sendBulkShardOperationsRequest的结果,每次处理成功numOutstandingWrites减1,直到numOutstandingWrites等于0,如果缓冲区有预量,则继续进行read。

private synchronized void handleWriteResponse(final BulkShardOperationsResponse response) {this.followerGlobalCheckpoint = Math.max(this.followerGlobalCheckpoint, response.getGlobalCheckpoint());this.followerMaxSeqNo = Math.max(this.followerMaxSeqNo, response.getMaxSeqNo());numOutstandingWrites--;assert numOutstandingWrites >= 0;coordinateWrites();// 缓冲区有预量时开始读取coordinateReads();}

总的来说,follower shard发送read request,在seq_no范围之内:如果leader shard有可用的新Operation,则按配置的参数来限制响应,然后写入数据;如果leader shard没有可用的新Operation,则在超时时间内等待;如果超时时间内发生了新的Operation,则立即对新的Operation进行响应,否则,如果超时,将会回复follower shard没有新的Operation。

read和write的过程中通过一个buffer缓存区来进行缓存:buffer是一个按seqNo进行排序的优先队列。

private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));

4 小结

  1. CCR以插件的方式加载和使用,不侵入式修改内核;
  2. 采用了快照恢复的方式进行全量的复制;
  3. 增量复制的过程采用了从远程集群Translog事务日志里面获取所有的Operation并将数据写入本地集群;
  4. 复制是在shard级别的,所以每个shard有自身的Follower Shard Task;
  5. 集群间数据的一致性通过seq_no和GlobalCheckpoint来校验;
  6. ES的段文件在merge过程中可能会删除或更新部分doc的关联操作,会导致seq_no的变化,所以必须要使用soft_deletes,默认保留12小时。

【Elasticsearch源码】CCR源码分析(二)相关推荐

  1. 【Elasticsearch源码】CCR源码分析(一)

    1 CCR的基本概念 什么是CCR? CCR( cross-cluster replication):跨集群复制是ES 6.5发布的一个新的特性:可以将两个集群中的数据进行远程复制. 集群复制类似于数 ...

  2. Elasticsearch CCR源码分析(补充)

    接上篇TODO Elasticsearch CCR源码分析 上篇TODO: http请求(ccr/follow)接收到后,follow集群节点开始全量同步,是以snapshot的模式去拉leader集 ...

  3. Elasticsearch CCR源码分析

    本文基于Elasticsearch6.8.5版本 ES使用的是Guice框架,依赖注入和暴露接口的方式和Spring差距较大,可先查看guice框架 节点启动过程: org/elasticsearch ...

  4. 【Android 事件分发】ItemTouchHelper 源码分析 ( OnItemTouchListener 事件监听器源码分析 二 )

    Android 事件分发 系列文章目录 [Android 事件分发]事件分发源码分析 ( 驱动层通过中断传递事件 | WindowManagerService 向 View 层传递事件 ) [Andr ...

  5. SpringBoot源码分析(二)之自动装配demo

    SpringBoot源码分析(二)之自动装配demo 文章目录 SpringBoot源码分析(二)之自动装配demo 前言 一.创建RedissonTemplate的Maven服务 二.创建测试服务 ...

  6. gSOAP 源码分析(二)

    gSOAP 源码分析(二) 2012-5-24 flyfish 一 gSOAP XML介绍 Xml的全称是EXtensible Markup Language.可扩展标记语言.仅仅是一个纯文本.适合用 ...

  7. 开源中国源码学习UI篇(二)之NavigationDrawer+Fragment的使用分析

    前文链接:开源中国源码学习UI篇(一)之FragmentTabHost的使用分析 开源中国2.2版,完整源码地址为:http://git.oschina.net/oschina/android-app ...

  8. Android Q 10.1 KeyMaster源码分析(二) - 各家方案的实现

    写在之前 这两篇文章是我2021年3月初看KeyMaster的笔记,本来打算等分析完KeyMaster和KeyStore以后再一起做成一系列贴出来,后来KeyStore的分析中断了,这一系列的文章就变 ...

  9. 【投屏】Scrcpy源码分析二(Client篇-连接阶段)

    Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...

最新文章

  1. Java基础知识强化68:基本类型包装类之Character概述和Character常见方法
  2. 十六进制数用int吗_你真的精通C语言吗?来解这十道C语言迷题试试吧!
  3. php接收post原始数据
  4. powerful number求积性函数前缀和
  5. 树莓派4B监控CPU占用率、内存使用率、磁盘使用量以及CPU温度
  6. 中国已消失的 9 所世界级大学
  7. Learning through Auxiliary Tasks——辅助任务学习or自监督学习中的pretext
  8. 7-2 求奇数和 (5 分)
  9. android利用多线程加载图片【不使用第三方库】
  10. uniapp 抖音授权登录、发布、分享 Ba-Aweme
  11. 《AngularJS深度剖析与最佳实践》一2.10 承诺
  12. Qt4 到Qt5 最小化后 点击任务栏不显示问题
  13. LeetCode二叉树系列——515.最每个树行中找最大值
  14. 青藤 #44 比例简化
  15. 用python来做一个五子棋游戏,源码分享~
  16. 如何理解充分条件和必要条件
  17. OpenGL ES:绘制函数glDrawArrays 和 glDrawElements 的区别
  18. 记录一个本地安装sonarqube的问题
  19. 使用Tuimgs在线工具对图片无损压缩教程
  20. 计算机硬件闫宏印答案,计算机硬件技术基础

热门文章

  1. 2023年华数杯国际赛数学建模
  2. 代码随想录训练营day9
  3. error C2448: 'Unknown' : function-style initializer appears to be a function definition
  4. 灰色预测模型--两秒直接上手
  5. ai前世识别_AI人脸识别前世今生app-AI人脸识别前世今生软件下载v2.0-西西软件下载...
  6. 用Python做市场调查:餐饮商铺的用户满意度分析
  7. 全球行政区划数据库 地理数据库
  8. HTTPSQS 1.7 版本更新内容
  9. 抖音测试的软件,抖音app测试版
  10. Apache Flink之架构概述和环境(章节一)