[Elasticsearch Source Code] GET Analysis
Learning the source code by asking questions, part 4: Elasticsearch GET
Code analyzed: https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+
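After the first three parts, Elasticsearch can be roughly summarized as follows:
- ES is a cluster, so every Node has to interact with the other Nodes. These interactions go through NodeClient.
- In ES, both RPC and HTTP requests are built on ES's own wrappers around Netty:
  - NettyTransport provides the RPC protocol support
  - NettyHttpServerTransport provides the HTTP protocol support
- The Transport*Action classes are the core set of classes:
  - Action -> Transport*Action
  - TransportAction -> TransportHandler (even when the target is the local Node, the work is forwarded to the TransportHandler by sending a request)
  - The real work is done in doExecute(…) of the Transport*Action class (or one of its parent classes)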
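To make the point "even the local node goes through a TransportHandler" concrete, here is a heavily simplified, hypothetical sketch: requests are always dispatched by action name to a registered handler, and the local case merely skips the network hop. None of the names below (LocalDispatchSketch, registerHandler, sendRequest, the "demo:get" action string) are real Elasticsearch APIs, and the remote path is deliberately not modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of "every action is served by a registered handler";
// not Elasticsearch code. Only the local path is modeled.
final class LocalDispatchSketch {
    private final String localNodeId;
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    LocalDispatchSketch(String localNodeId) {
        this.localNodeId = localNodeId;
    }

    // Each action registers its handler under an action name
    void registerHandler(String action, Function<String, String> handler) {
        handlers.put(action, handler);
    }

    // Even for the local node, the request is routed to the handler by action name
    // instead of calling the action's logic directly
    String sendRequest(String targetNodeId, String action, String request) {
        if (!localNodeId.equals(targetNodeId)) {
            // serialization + network send to a remote node is out of scope for this sketch
            throw new UnsupportedOperationException("remote transport not modeled");
        }
        Function<String, String> handler = handlers.get(action);
        if (handler == null) {
            throw new IllegalArgumentException("no handler registered for [" + action + "]");
        }
        return handler.apply(request);
    }
}
```

The design point being illustrated is uniformity: local and remote requests share one entry point, so the handler code does not care where the request came from.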
Goals
Before reading the code, here are the questions I had about the GET flow:
- Is the Shard found by hashing the Document _id?
- How does a GET by Document _id achieve real-time visibility?
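Source code analysis
This section walks through the code. Readers who are not interested in the details can skip straight to the Summary.
Searching for /{index}/_doc/{id} leads to RestGetAction. Given RestGetAction and the summary above, it follows that the class doing the real work is TransportGetAction.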
(Figure: TransportGetAction; image unavailable)
doExecute is found in TransportSingleShardAction, the parent class of TransportGetAction:
```java
// TransportSingleShardAction
@Override
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
    new AsyncSingleAction(request, listener).start();
}

// In TransportSingleShardAction.AsyncSingleAction
private AsyncSingleAction(Request request, ActionListener<Response> listener) {
    this.listener = listener;

    ClusterState clusterState = clusterService.state();
    if (logger.isTraceEnabled()) {
        logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
    }
    // list of nodes in the cluster
    nodes = clusterState.nodes();
    ClusterBlockException blockException = checkGlobalBlock(clusterState);
    if (blockException != null) {
        throw blockException;
    }

    String concreteSingleIndex;
    if (resolveIndex(request)) {
        concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
    } else {
        concreteSingleIndex = request.index();
    }
    this.internalRequest = new InternalRequest(request, concreteSingleIndex);
    // resolveRequest is implemented in TransportGetAction:
    // it resolves the request and sets the effective routing
    resolveRequest(clusterState, internalRequest);

    blockException = checkRequestBlock(clusterState, internalRequest);
    if (blockException != null) {
        throw blockException;
    }

    // Use the routing algorithm to get an iterator over the target shard's copies,
    // ordered according to the preference, which determines the target node(s)
    this.shardIt = shards(clusterState, internalRequest);
}

// In TransportGetAction
@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
    return clusterService.operationRouting()
            .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
                    request.request().preference());
}
```
Next, look at OperationRouting#getShards(…) to see how the concrete shardId is obtained:
```java
public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing,
                               @Nullable String preference) {
    return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(),
            clusterState.nodes(), preference, null, null);
}

protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
    int shardId = generateShardId(indexMetadata(clusterState, index), id, routing);
    return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
}

public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    // For the routing parameter, see the documentation:
    // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
    if (routing == null) {
        assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id;
    } else {
        effectiveRouting = routing;
    }

    if (indexMetadata.isRoutingPartitionedIndex()) {
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
}
```
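So ES locates the shard by hashing the routing value, which defaults to the Document _id when no routing is given: shardId = floorMod(murmur3(routing) + partitionOffset, routingNumShards) / routingFactor.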
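The arithmetic in generateShardId / calculateScaledShardId can be reproduced outside ES. The sketch below is a standalone approximation under stated assumptions: murmur3 is the standard MurmurHash3 x86_32 with seed 0, and hash(String) feeds each UTF-16 char as two bytes (low byte first), which is my understanding of Murmur3HashFunction.hash(String) in 7.x. routingNumShards, routingFactor and routingPartitionSize are plain parameters standing in for the IndexMetadata getters; the class and method names are hypothetical.

```java
// Hypothetical standalone sketch of OperationRouting's shard-id arithmetic (not ES code)
final class ShardRoutingSketch {

    // Standard MurmurHash3 x86_32 over the whole array, seed 0
    static int murmur3(byte[] data) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = 0;
        int len = data.length;
        int roundedEnd = len & 0xfffffffc; // end of the 4-byte blocks
        for (int i = 0; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        // tail: remaining 1-3 bytes (intentional fall-through)
        int k1 = 0;
        switch (len & 0x03) {
            case 3:
                k1 = (data[roundedEnd + 2] & 0xff) << 16;
            case 2:
                k1 |= (data[roundedEnd + 1] & 0xff) << 8;
            case 1:
                k1 |= (data[roundedEnd] & 0xff);
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
                break;
            default:
                break;
        }
        // finalization mix
        h1 ^= len;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    // Assumption: each char is hashed as two bytes, low byte first
    static int hash(String routing) {
        byte[] bytes = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); i++) {
            char c = routing.charAt(i);
            bytes[i * 2] = (byte) c;
            bytes[i * 2 + 1] = (byte) (c >>> 8);
        }
        return murmur3(bytes);
    }

    // routingFactor = routingNumShards / numberOfShards; routingPartitionSize == 1 means not partitioned
    static int shardId(String id, String routing, int routingNumShards, int routingFactor, int routingPartitionSize) {
        String effectiveRouting = (routing == null) ? id : routing;
        int partitionOffset = routingPartitionSize > 1 ? Math.floorMod(hash(id), routingPartitionSize) : 0;
        int h = hash(effectiveRouting) + partitionOffset;
        return Math.floorMod(h, routingNumShards) / routingFactor;
    }
}
```

For an index that was never split or shrunk (routingNumShards equal to the number of shards, routingFactor 1), the formula reduces to floorMod(hash, numberOfShards). For a non-partitioned index, an explicit routing value sends every document with that routing to the same shard regardless of its _id.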
Next, how is real-time visibility achieved?
On the data node, the entry point for requests from the coordinating node is TransportSingleShardAction.ShardTransportHandler#messageReceived:
```java
@Override
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
    if (logger.isTraceEnabled()) {
        logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
    }
    asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
}
```
The actual execution happens in the subclass, in TransportGetAction#asyncShardOperation:
```java
@Override
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());
    // For realtime, see the official documentation:
    // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
    if (request.realtime()) { // we are not tied to a refresh cycle here anyway
        super.asyncShardOperation(request, shardId, listener);
    } else {
        indexShard.awaitShardSearchActive(b -> {
            try {
                super.asyncShardOperation(request, shardId, listener);
            } catch (Exception ex) {
                listener.onFailure(ex);
            }
        });
    }
}
```
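The non-realtime branch waits on awaitShardSearchActive. As I read it, this matters for search-idle shards, whose scheduled refreshes may be skipped: the read is deferred until the pending refresh has happened, while a realtime GET runs immediately because, as the code comment says, it is not tied to the refresh cycle. Below is a simplified, hypothetical model of that gating; SearchIdleGateSketch and all of its methods are illustrative names, not ES APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of deferring non-realtime reads until a pending refresh completes
final class SearchIdleGateSketch {
    private boolean pendingRefresh;  // a scheduled refresh was skipped while the shard was idle
    private final List<Consumer<Boolean>> waiters = new ArrayList<>();

    void skipScheduledRefreshWhileIdle() {
        pendingRefresh = true;
    }

    // Run the listener now if nothing is pending; otherwise defer it until the next refresh
    void awaitSearchActive(Consumer<Boolean> listener) {
        if (pendingRefresh) {
            waiters.add(listener);
        } else {
            listener.accept(false);  // false: no refresh was waited for
        }
    }

    void refresh() {
        pendingRefresh = false;
        List<Consumer<Boolean>> toRun = new ArrayList<>(waiters);
        waiters.clear();
        for (Consumer<Boolean> l : toRun) {
            l.accept(true);  // true: ran after a refresh
        }
    }

    // Mirrors the branch structure of TransportGetAction#asyncShardOperation
    static void dispatchGet(boolean realtime, SearchIdleGateSketch gate, Runnable shardOperation) {
        if (realtime) {
            shardOperation.run();
        } else {
            gate.awaitSearchActive(refreshed -> shardOperation.run());
        }
    }
}
```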
Through super.asyncShardOperation, the document is ultimately fetched by TransportGetAction#shardOperation:
```java
@Override
protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
    IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
    IndexShard indexShard = indexService.getShard(shardId.id());

    // For realtime and refresh, see the official documentation:
    // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
    if (request.refresh() && !request.realtime()) {
        indexShard.refresh("refresh_flag_get");
    }

    GetResult result = indexShard.getService().get(request.id(), request.storedFields(),
            request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
    return new GetResponse(result);
}
```
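The two request flags interact in a way that is easy to miss: refresh is honored only when realtime is false. The hypothetical helper below (GetFlagsSketch is an illustrative name) classifies the combinations based solely on the code shown in this article.

```java
// Hypothetical helper: how realtime/refresh combine in the GET code path shown above
final class GetFlagsSketch {
    enum Mode {
        REALTIME_REFRESH_IF_PENDING,  // InternalEngine#get refreshes only if the doc is not yet refreshed
        EXPLICIT_REFRESH_THEN_READ,   // shardOperation calls indexShard.refresh("refresh_flag_get") first
        READ_LAST_REFRESHED           // reads the point-in-time view from the last refresh
    }

    static Mode mode(boolean realtime, boolean refresh) {
        if (realtime) {
            // request.refresh() is not consulted: the check is refresh() && !realtime()
            return Mode.REALTIME_REFRESH_IF_PENDING;
        }
        return refresh ? Mode.EXPLICIT_REFRESH_THEN_READ : Mode.READ_LAST_REFRESHED;
    }
}
```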
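shardOperation first checks whether a refresh is needed, then calls indexShard.getService().get() to read the data and store it in a GetResult. Reading and filtering happen in ShardGetService#get(), which obtains the result by calling:
```java
GetResult getResult = innerGet(…);
```
GetResult holds the actual data that was read. The core read logic is in ShardGetService#innerGet(…):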
```java
private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                           long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
    fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
    // Ask the Engine for the document
    Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id)
            .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
    assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
    if (get.exists() == false) {
        get.close();
    }

    if (get == null || get.exists() == false) {
        return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
    }

    try {
        // Build the result
        // break between having loaded it from translog (so we only have _source), and having a document to load
        return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
    } finally {
        get.close();
    }
}

// Filters the requested stored fields and _source (source filtering works on fields only)
// and stores the result in a GetResult object
private GetResult innerGetLoadFromStoredFields(String id, String[] storedFields, FetchSourceContext fetchSourceContext,
                                               Engine.GetResult get, MapperService mapperService) {
    assert get.exists() : "method should only be called if document could be retrieved";

    // check first if stored fields to be loaded don't contain an object field
    DocumentMapper docMapper = mapperService.documentMapper();
    if (storedFields != null) {
        for (String field : storedFields) {
            Mapper fieldMapper = docMapper.mappers().getMapper(field);
            if (fieldMapper == null) {
                if (docMapper.mappers().objectMappers().get(field) != null) {
                    // Only fail if we know it is an object field, missing paths / fields shouldn't fail.
                    throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
                }
            }
        }
    }

    Map<String, DocumentField> documentFields = null;
    Map<String, DocumentField> metadataFields = null;
    BytesReference source = null;
    DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
    // force fetching source if we read from translog and need to recreate stored fields
    boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
            Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
    FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
            forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
    if (fieldVisitor != null) {
        try {
            docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
        } catch (IOException e) {
            throw new ElasticsearchException("Failed to get id [" + id + "]", e);
        }
        source = fieldVisitor.source();

        // in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
        if (get.isFromTranslog()) {
            // Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
            // just make source consistent by reapplying source filters from mapping (possibly also nulling the source)
            if (forceSourceForComputingTranslogStoredFields == false) {
                try {
                    source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);
                } catch (IOException e) {
                    throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
                }
            } else {
                // Slow path: recreate stored fields from original source
                assert source != null : "original source in translog must exist";
                SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), id, source,
                        XContentHelper.xContentType(source), fieldVisitor.routing());
                ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
                assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
                // update special fields
                doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
                doc.version().setLongValue(docIdAndVersion.version);

                // retrieve stored fields from parsed doc
                fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);
                for (IndexableField indexableField : doc.rootDoc().getFields()) {
                    IndexableFieldType fieldType = indexableField.fieldType();
                    if (fieldType.stored()) {
                        FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
                                DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
                        StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
                        if (status == StoredFieldVisitor.Status.YES) {
                            if (indexableField.numericValue() != null) {
                                fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
                            } else if (indexableField.binaryValue() != null) {
                                fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
                            } else if (indexableField.stringValue() != null) {
                                fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
                            }
                        } else if (status == StoredFieldVisitor.Status.STOP) {
                            break;
                        }
                    }
                }
                // retrieve source (with possible transformations, e.g. source filters)
                source = fieldVisitor.source();
            }
        }

        // put stored fields into result objects
        if (!fieldVisitor.fields().isEmpty()) {
            fieldVisitor.postProcess(mapperService::fieldType);
            documentFields = new HashMap<>();
            metadataFields = new HashMap<>();
            for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
                if (mapperService.isMetadataField(entry.getKey())) {
                    metadataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                } else {
                    documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                }
            }
        }
    }

    if (source != null) {
        // apply request-level source filtering
        if (fetchSourceContext.fetchSource() == false) {
            source = null;
        } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
            Map<String, Object> sourceAsMap;
            // TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
            //  Do we care?
            Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
            XContentType sourceContentType = typeMapTuple.v1();
            sourceAsMap = typeMapTuple.v2();
            sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
            try {
                source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
            } catch (IOException e) {
                throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
            }
        }
    }

    return new GetResult(shardId.getIndexName(), id, get.docIdAndVersion().seqNo, get.docIdAndVersion().primaryTerm,
            get.version(), get.exists(), source, documentFields, metadataFields);
}
```
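The request-level source filtering at the end of innerGetLoadFromStoredFields parses _source into a map and filters it with XContentMapValues.filter. A simplified sketch of the same idea follows, assuming exact dotted paths only. The real filter also supports wildcard patterns and other cases this sketch ignores, and SourceFilterSketch is a hypothetical name.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified request-level _source filtering (exact dotted paths, no wildcards)
final class SourceFilterSketch {

    // Same shape as the code above: fetchSource == false drops _source entirely
    static Map<String, Object> apply(Map<String, Object> source, boolean fetchSource,
                                     Set<String> includes, Set<String> excludes) {
        if (source == null || !fetchSource) {
            return null;
        }
        if (includes.isEmpty() && excludes.isEmpty()) {
            return source;  // nothing to filter
        }
        return filter(source, "", includes, excludes);
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> filter(Map<String, Object> map, String prefix,
                                              Set<String> includes, Set<String> excludes) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : map.entrySet()) {
            String path = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (coveredBy(path, excludes)) {
                continue;  // the path itself or one of its parents is excluded
            }
            Object value = e.getValue();
            if (includes.isEmpty() || coveredBy(path, includes)) {
                // fully included: keep it, but still apply excludes further down
                if (value instanceof Map) {
                    value = filter((Map<String, Object>) value, path, Collections.emptySet(), excludes);
                }
                out.put(e.getKey(), value);
            } else if (value instanceof Map && leadsTo(path, includes)) {
                // some include points below this object: descend into it
                Map<String, Object> child = filter((Map<String, Object>) value, path, includes, excludes);
                if (!child.isEmpty()) {
                    out.put(e.getKey(), child);
                }
            }
        }
        return out;
    }

    // true if path equals a pattern or lies below it
    private static boolean coveredBy(String path, Set<String> patterns) {
        for (String p : patterns) {
            if (path.equals(p) || path.startsWith(p + ".")) {
                return true;
            }
        }
        return false;
    }

    // true if some pattern lies below path
    private static boolean leadsTo(String path, Set<String> patterns) {
        for (String p : patterns) {
            if (p.startsWith(path + ".")) {
                return true;
            }
        }
        return false;
    }
}
```

With includes = {"user.name"}, a source of {"user": {"name": "kim", "age": 3}, "msg": "hi"} is reduced to {"user": {"name": "kim"}}.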
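Next, the read path in InternalEngine:
InternalEngine#get acquires a read lock and then handles the realtime option. If realtime is true, it looks the document up in the in-memory version map; if the document was written but has not yet been made visible by a refresh, refreshIfNeeded refreshes before the document is read through a Searcher. Searcher is a wrapper around Lucene's IndexSearcher.
Since ES 5.x, the GET API no longer reads documents from the translog; it reads only from Lucene, and realtime is implemented on top of refresh. See https://github.com/elastic/elasticsearch/pull/20102. The translog branch still present in the code below is, according to its own comment, used only for updates.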
```java
@Override
public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
    assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
    try (ReleasableLock ignored = readLock.acquire()) {
        ensureOpen();
        // handle the realtime option: decide whether a refresh is needed
        if (get.realtime()) {
            final VersionValue versionValue;
            // entries in versionMap are added at index time; they live in memory and are not written to disk
            try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                // we need to lock here to access the version map to do this truly in RT
                versionValue = getVersionFromMap(get.uid().bytes());
            }
            if (versionValue != null) {
                if (versionValue.isDelete()) {
                    return GetResult.NOT_EXISTS;
                }
                if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                    throw new VersionConflictEngineException(shardId, get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version()));
                }
                if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
                        get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
                )) {
                    throw new VersionConflictEngineException(shardId, get.id(),
                            get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
                }
                if (get.isReadFromTranslog()) {
                    // this is only used for updates - API _GET calls will always read from a reader for consistency
                    // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                    if (versionValue.getLocation() != null) {
                        try {
                            final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                            if (operation != null) {
                                return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
                            }
                        } catch (IOException e) {
                            maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                            throw new EngineException(shardId, "failed to read operation from translog", e);
                        }
                    } else {
                        trackTranslogLocation.set(true);
                    }
                }
                assert versionValue.seqNo >= 0 : versionValue;
                // refresh if needed, so that the searcher below can see this document
                refreshIfNeeded("realtime_get", versionValue.seqNo);
            }
            // read the document through a Searcher
            return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
        } else {
            // we expose what has been externally expose in a point in time snapshot via an explicit refresh
            return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
        }
    }
}
```
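To see why refresh is enough for real-time visibility, here is a toy model of the realtime branch of InternalEngine#get. It is hypothetical and much simpler than the real engine: a single version map that is cleared wholesale on refresh, no locking, no translog, and no version-conflict checks. What it keeps from the source is the decision logic: a delete found in the version map returns "not found", a pending write triggers refreshIfNeeded(seqNo), and the document is then always read through the searcher.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the realtime GET decision logic; hypothetical, not ES code
final class RealtimeGetSketch {
    // In-memory entry for a doc written since the last refresh (simplified LiveVersionMap)
    private static final class VersionValue {
        final long seqNo;
        final boolean isDelete;

        VersionValue(long seqNo, boolean isDelete) {
            this.seqNo = seqNo;
            this.isDelete = isDelete;
        }
    }

    private long maxSeqNo = -1;
    private long lastRefreshedSeqNo = -1;
    int refreshCount = 0;
    private final Map<String, VersionValue> versionMap = new HashMap<>();
    private final Map<String, String> pending = new HashMap<>();     // written, not yet searchable (null = delete)
    private final Map<String, String> searchable = new HashMap<>();  // what a searcher can see

    void index(String id, String source) {
        long seqNo = ++maxSeqNo;
        versionMap.put(id, new VersionValue(seqNo, false));
        pending.put(id, source);
    }

    void delete(String id) {
        long seqNo = ++maxSeqNo;
        versionMap.put(id, new VersionValue(seqNo, true));
        pending.put(id, null);
    }

    // Make pending writes visible and (simplification) drop the whole version map
    void refresh() {
        for (Map.Entry<String, String> e : pending.entrySet()) {
            if (e.getValue() == null) {
                searchable.remove(e.getKey());
            } else {
                searchable.put(e.getKey(), e.getValue());
            }
        }
        pending.clear();
        versionMap.clear();
        lastRefreshedSeqNo = maxSeqNo;
        refreshCount++;
    }

    // Refresh only if the requested operation is not yet covered by the last refresh
    private void refreshIfNeeded(long requestingSeqNo) {
        if (lastRefreshedSeqNo < requestingSeqNo) {
            refresh();
        }
    }

    String get(String id, boolean realtime) {
        if (realtime) {
            VersionValue v = versionMap.get(id);
            if (v != null) {
                if (v.isDelete) {
                    return null;  // like GetResult.NOT_EXISTS
                }
                refreshIfNeeded(v.seqNo);
            }
        }
        return searchable.get(id);  // always read through the "searcher"
    }
}
```

Note the trade-off this exposes: a realtime GET of a freshly written document pays for a refresh, which is why frequent realtime GETs right after writes can create many small segments.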
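Summary
- GET locates the shard by hashing the routing value, which is the Document _id unless a custom routing is supplied.
- Real-time visibility for a GET by Document _id relies on refresh: in realtime mode, if the document has been written but not yet refreshed, InternalEngine refreshes before reading it from Lucene.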
References:
《Elasticsearch源码解析与优化实战》 (Elasticsearch Source Code Analysis and Optimization in Practice)