Elasticsearch CCR源码分析
本文基于Elasticsearch6.8.5版本
ES使用的是Guice框架,依赖注入和暴露接口的方式和Spring差距较大,可先查看guice框架
节点启动过程:
org/elasticsearch/bootstrap/Elasticsearch.java(main)---》org/elasticsearch/node/Node.java(构造方法)加载插件&module---其中包括---》org/elasticsearch/xpack/ccr/Ccr.java(注册各种action)
CCR是插件的形式:
Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin
CCR的几个操作:
官网操作API:https://www.elastic.co/guide/en/elasticsearch/reference/7.x/ccr-apis.html
1.本地集群连接远程集群
PUT /_cluster/settings
{"persistent" : {"cluster" : {"remote" : {"leader" : {"seeds" : ["127.0.0.1:9300" ]}}}}
}
2.在远程集群创建leader索引
远程集群创建leader索引,和正常创建索引操作一样,需要开启soft_deletes(7.x版本已经默认开启)
3.在本地集群创建follower索引
手动创建:需要指定远程集群和复制的远程索引PUT /<follower_index>/_ccr/follow?wait_for_active_shards=1
{ "remote_cluster" : "<remote_cluster>", "leader_index" : "<leader_index>"
}
自动跟随创建:通过auto_follow API建立自动跟随的模板PUT /_ccr/auto_follow/<auto_follow_pattern_name>
{"remote_cluster" : "<remote_cluster>","leader_index_patterns" :["<leader_index_pattern>"],"follow_index_pattern" : "<follow_index_pattern>"
}
源码分析:
1.执行 _ccr/follow接口,手动创建一个follow索引,该接口会生成索引并完成全量同步
org.elasticsearch.xpack.ccr.rest.RestPutFollowAction类暴露该接口
/*** 暴露 {index}/_ccr/follow 接口* @param settings* @param controller*/public RestPutFollowAction(Settings settings, RestController controller) {super(settings);controller.registerHandler(RestRequest.Method.PUT, "/{index}/_ccr/follow", this);}
通过import关联到org.elasticsearch.xpack.core.ccr.action.PutFollowAction类
import static org.elasticsearch.xpack.core.ccr.action.PutFollowAction.INSTANCE;
PutFollowAction动作在ccr.java中注册了处理类:TransportPutFollowAction
TransportMasterNodeAction将请求转发给follow集群的master节点
在该方法中org.elasticsearch.action.support.master.TransportMasterNodeAction.AsyncSingleAction#doStart
执行:transportService.sendRequest这一行,实现请求转发(如果当前节点就是master,则不转发)
-->follow集群的master节点执行TransportMasterNodeAction.masterOperation()
-->TransportPutFollowAction.createFollowerIndex()
-->restoreSnapshot() 从"_ccr_$remoteClusterName"的repository的"_latest_"快照恢复
-->afterRestoreStarted()
--->initiateFollowing() 该方法核心主要是构造一个ResumeFollowRequest请求,ResumeFollowRequest请求的handler会对每个主分片构造一个任务,去不断刷新leader的更新数据(由此开始增量同步,上面的流程主要是全量同步)
---->TransportResumeFollowAction.start() 开始恢复数据并且修改index状态
----->persistentTasksService.sendStartRequest( ShardFollowTask.NAME, shardFollowTask) 作用:Notifies the master node to create new persistent task and to assign it to a node(通知follow集群的主节点创建新的持久性任务---增量同步数据,long pull,并将其分配给各个节点)
// 执行sendStartRequest之前先create了一个task
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request.getParameters(),leaderIndexMetadata, followIndexMetadata, filteredHeaders);
// 循环,对每个shard产生task
for (int shardId = 0; shardId < numShards; shardId++) {String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request.getParameters(),leaderIndexMetadata, followIndexMetadata, filteredHeaders);persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, handler.getActionListener(shardId));}
到此,只剩下增量同步的逻辑,全部由ShardFollowTask完成;
PS:入口类Ccr在getPersistentTasksExecutor
方法中,完成了对ShardFollowTasksExecutor
的注册,该类为执行ShardFollowTask
的执行器
如上sendStartRequest方法发送一个Task时,会触发到org.elasticsearch.cluster.service.ClusterApplierService#runTask监听,开始执行ShardFollowTasksExecutor中的方法;
------>org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#start 开启增量复制
------->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#coordinateReads
---->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#sendShardChangesRequest(long, int, long)
----->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#innerSendShardChangesRequest
作用:获取setting、mapping、translog、 leaderGlobalCheckpoint、leaderMaxSeqNo并更新
--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#handleReadResponse
--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#innerHandleReadResponse
--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#coordinateWrites
--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#sendBulkShardOperationsRequest
--->org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#
innerSendBulkShardOperationsRequest
作用:写入数据, 得到followerGlobalCheckpoint、followerMaxSeqNo并更新
2.执行 /auto_follow接口(当有新的集群创建时,自动触发全量同步)
1. 执行该API时,集群状态中会保存该pattern
2. Master节点上的AutoFollowCoordinator会一直监听remoteCluster的集群状态,如果发现新创建了索引,且符合pattern,则调用TransportPutFollowAction,执行与上述 _ccr/follow 接口一样的流程
请求接收细节流程:
在follow集群执行该api时代码执行流程(只关心CCR逻辑时可忽略)
1.接收到请求,经过netty的各个方法
2.org.elasticsearch.rest.RestController#dispatchRequest
(在RestController类中初始化NodeClient client,往后传递)
3.解析&处理该request
4.到达 org.elasticsearch.xpack.ccr.rest.RestPutFollowAction#getName
5.到org.elasticsearch.xpack.core.ccr.action.PutFollowAction#fromXContent 进一步处理request
6.执行client.execute到 org.elasticsearch.client.support.AbstractClient#execute
7.回到AbstractClient的实现类 org.elasticsearch.client.node.NodeClient#doExecute
8.到org.elasticsearch.client.node.NodeClient#executeLocally方法中执行transportAction(action).execute(request, listener); 到此关联到了transportAction
9.到org.elasticsearch.action.support.TransportAction#execute,继续执行到该行代码:requestFilterChain.proceed(task, actionName, request, listener);在该类继续执行到 this.action.filters[i].apply(task, actionName, request, listener, this);
10.到org.elasticsearch.xpack.security.action.filter.SecurityActionFilter#apply
执行到chain.proceed(task, action, request, listener);
11.到org.elasticsearch.action.support.TransportAction.RequestFilterChain#proceed
执行this.action.doExecute(task, request, listener);
12.到TransportMasterNodeAction类
org.elasticsearch.action.support.master.TransportMasterNodeAction#doExecute(org.elasticsearch.tasks.Task, Request, org.elasticsearch.action.ActionListener<Response>)
开启线程:new AsyncSingleAction(task, request, listener).start();
13.接下来是几个线程相关类
org.elasticsearch.common.util.concurrent.EsExecutors.DirectExecutorService#execute
org.elasticsearch.common.util.concurrent.AbstractRunnable#run
14.开始执行masterOperation方法,执行到org.elasticsearch.action.support.master.TransportMasterNodeAction#masterOperation(org.elasticsearch.tasks.Task, Request, org.elasticsearch.cluster.ClusterState, org.elasticsearch.action.ActionListener<Response>)
15.进一步找到masterOperation方法实现类TransportPutFollowAction
org.elasticsearch.xpack.ccr.action.TransportPutFollowAction#masterOperation
到此:关联到TransportPutFollowAction类
通过入口类的action绑定,完成在action中找到对应实现类的过程
重点:
增量同步是如何实现不断刷新的(long pull模式)???
1.coordinateReads方法读取leader集群信息以及operations
2.org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask#start
coordinateReads->sendShardChangesRequest(long, int, long)->sendShardChangesRequest(long, int, long, java.util.concurrent.atomic.AtomicInteger)->handleReadResponse->innerHandleReadResponse(执行了coordinateWrites)->coordinateReads/sendShardChangesRequest
TODO:
http请求(ccr/follow)接收到后,follow集群节点开始全量同步,是以snapshot的模式去拉leader集群数据的,那么是在什么时候将leader集群伪装成snapshot的repository的?理论上应该是在Node初始化的时候...还未验证,后续再补充该逻辑...
PS:
如有缺失或错误,欢迎补充&指正...
Elasticsearch CCR源码分析相关推荐
- Elasticsearch CCR源码分析(补充)
接上篇TODO Elasticsearch CCR源码分析 上篇TODO: http请求(ccr/follow)接收到后,follow集群节点开始全量同步,是以snapshot的模式去拉leader集 ...
- 【Elasticsearch源码】CCR源码分析(一)
1 CCR的基本概念 什么是CCR? CCR( cross-cluster replication):跨集群复制是ES 6.5发布的一个新的特性:可以将两个集群中的数据进行远程复制. 集群复制类似于数 ...
- 【Elasticsearch源码】CCR源码分析(二)
接上一篇:[Elasticsearch源码]CCR源码分析(一). sendShardChangesRequest方法最终进入到ShardChangesAction.TransportAction#s ...
- elasticsearch _field_stats 源码分析
_field_stats 实现的功能:https://www.elastic.co/guide/en/elasticsearch/reference/5.6/search-field-stats.ht ...
- CCR源码分析-CCR架构
CCR,并发与协调运行时(Concurrency and Coordination Runtime).从名字里我们就可以知道,这个东东是用来简化并发程序设计的.为何要并发呢?因为我们有多个任务需要处理 ...
- Elasticsearch源码分析—线程池(十一) ——就是从队列里处理请求
Elasticsearch源码分析-线程池(十一) 转自:https://www.felayman.com/articles/2017/11/10/1510291570687.html 线程池 每个节 ...
- elasticsearch源码分析之search模块(server端)
elasticsearch源码分析之search模块(server端) 继续接着上一篇的来说啊,当client端将search的请求发送到某一个node之后,剩下的事情就是server端来处理了,具体 ...
- elasticsearch源码分析之search模块(client端)
elasticsearch源码分析之search模块(client端) 注意,我这里所说的都是通过rest api来做的搜索,所以对于接收到请求的节点,我姑且将之称之为client端,其主要的功能我们 ...
- elasticsearch index、create和update的源码分析
https://segmentfault.com/a/1190000011272749 社区里面有人问了如下一个问题: 执行 bulk 索引文档的时候,用 index 或者 create 类型并且自定 ...
最新文章
- 【廖雪峰python入门笔记】break和continue
- 5.Scrapy与相关应用
- 干货丨人工智能、大数据领域从菜鸟到高手的晋级指南(经典长文,值得收藏)
- python 全局变量 局部变量
- python之路-网络编程
- Winform 打印PDF顺序混乱,获取打印队列
- Visual Studio Code里一个查看括号匹配的神器 - Bracket Pair Colorizer
- jwt令牌_jwt-cli:用于解码JSON Web令牌(JWT令牌)的Shell库
- 第1章 计算机系统漫游(深入理解计算机系统)
- Linux学习之CentOS(三)----将Cent0S 7的网卡名称eno16777736改为eth0
- mysql连接nacat_【mysql】使用Navicat连接数据库
- svg 贝塞尔曲线图解(记录)
- KMP算法 学习笔记
- linux下python3 安装tkinter库
- 解决 Mac OS 下罗技鼠标滚轮无法使用问题(滚动失效)
- vue音乐播放器demo,主要是模仿慕课音乐播放器教程制作的一个小demo
- 全球及中国汽车零部件用模塑料行业盈利预测及投资前景分析报告2021年版
- PS之立体台球制作步骤
- 华为手机设置5G调试
- 洛谷P1478 陶陶摘苹果(升级版)【水题】
热门文章
- 2021-10-27 孤尽训练营D2
- 通过npm运行管理端界面并解决npm install安装报错
- React报错:Warning: ReactDOM.render is no longer supported in React 18. Use createRoot instead. Until y
- 重装系统 重启计算机失败,电脑重装系统失败,不能开机了,怎么办,急求解决办法啊...
- 基于物理着色:BRDF
- Linux动态链接库编程入门
- windows “你尚未连接代理服务器可能有问题,或地址不正确“ 解决方案
- 从P1到P7——我在淘宝这7年转载
- 高性能零售IT系统的建设03-监控体系化的重要不亚于开发的投入
- html 渲染md文件,markdown的学习和.md文件使用