Learning Source Code with Questions, Part 4: Elasticsearch GET
Code analyzed: https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+

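From the first three parts, Elasticsearch can be roughly summarized as follows:

  • ES runs as a cluster, so every node has to interact with the other nodes; these interactions go through NodeClient.
  • Both RPC and HTTP in ES are built on ES's own wrappers around Netty:
    • NettyTransport provides the RPC (transport) protocol
    • NettyHttpServerTransport provides the HTTP protocol
  • The Transport*Action classes are at the core:
    • Action -> Transport*Action
    • TransportAction -> TransportHandler (even when the target is the local node, the work is forwarded to the TransportHandler by sending a request)
    • The real work is done in doExecute(…) of the Transport*Action class (or one of its parent classes)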
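To make the "everything goes through a TransportHandler" point concrete, here is a minimal sketch under heavy simplification: a hypothetical in-process cluster where each node registers handlers by action name, and a request to any node, including the caller's own, is dispatched through the same handler lookup. MiniCluster and its methods are invented for illustration and are not Elasticsearch's TransportService API; the string indices:data/read/get[s] mirrors ES's naming for the shard-level GET action but is used here only as a map key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model: every node registers handlers by action name, and
// sendRequest() always dispatches through that registry, with no shortcut for the local node.
final class MiniCluster {
    // nodeId -> (action name -> handler)
    private final Map<String, Map<String, Function<String, String>>> handlersByNode = new HashMap<>();
    // number of requests that went through the handler registry
    int dispatched = 0;

    void registerHandler(String nodeId, String action, Function<String, String> handler) {
        handlersByNode.computeIfAbsent(nodeId, k -> new HashMap<>()).put(action, handler);
    }

    // The target node's handler is always looked up and invoked. A real transport would
    // additionally serialize the request and send it over the network for remote nodes.
    String sendRequest(String targetNode, String action, String request) {
        Map<String, Function<String, String>> handlers = handlersByNode.get(targetNode);
        if (handlers == null || !handlers.containsKey(action)) {
            throw new IllegalStateException("no handler for [" + action + "] on [" + targetNode + "]");
        }
        dispatched++;
        return handlers.get(action).apply(request);
    }
}
```

The design choice being illustrated is uniformity: because local and remote calls share one code path, the handler is the single place where shard-level work happens.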

Goal

Before reading the source, let me list the questions I had about the GET flow:

  • Is the target shard found by hashing the document _id?
  • How does a GET by document _id achieve real-time visibility?

Source Code Analysis

This second part walks through the code. If you would rather skip it, jump straight to part three, the summary.

Searching the code for /{index}/_doc/{id} leads to RestGetAction. From RestGetAction, combined with the summary above, we know that the class doing the real work is TransportGetAction.
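The REST layer reaches RestGetAction by matching the request path against the template /{index}/_doc/{id}. As a rough illustration only, here is a hypothetical minimal template matcher that extracts {index} and {id} from a path. Elasticsearch's RestController uses its own path trie for this; PathTemplate is not an ES class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical minimal matcher for REST path templates such as "/{index}/_doc/{id}".
final class PathTemplate {
    private final String[] parts;

    PathTemplate(String template) {
        this.parts = split(template);
    }

    private static String[] split(String path) {
        // drop the leading "/" so "/a/b" becomes ["a", "b"]
        String trimmed = path.startsWith("/") ? path.substring(1) : path;
        return trimmed.split("/", -1);
    }

    // Returns the extracted {placeholder} values if the path matches, otherwise empty.
    Optional<Map<String, String>> match(String path) {
        String[] segments = split(path);
        if (segments.length != parts.length) {
            return Optional.empty();
        }
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            if (p.startsWith("{") && p.endsWith("}")) {
                if (segments[i].isEmpty()) {
                    return Optional.empty(); // placeholders must not be empty
                }
                params.put(p.substring(1, p.length() - 1), segments[i]);
            } else if (!p.equals(segments[i])) {
                return Optional.empty(); // literal segment such as "_doc" must match exactly
            }
        }
        return Optional.of(params);
    }
}
```

For example, new PathTemplate("/{index}/_doc/{id}").match("/twitter/_doc/1") yields index=twitter and id=1, which is what RestGetAction then turns into a GetRequest.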

[Figure: TransportGetAction]

doExecute is found in TransportGetAction's parent class, TransportSingleShardAction:

    @Override
    protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
        new AsyncSingleAction(request, listener).start();
    }

    // In TransportSingleShardAction.AsyncSingleAction
    private AsyncSingleAction(Request request, ActionListener<Response> listener) {
        this.listener = listener;

        ClusterState clusterState = clusterService.state();
        if (logger.isTraceEnabled()) {
            logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
        }
        // list of nodes in the cluster
        nodes = clusterState.nodes();
        ClusterBlockException blockException = checkGlobalBlock(clusterState);
        if (blockException != null) {
            throw blockException;
        }

        String concreteSingleIndex;
        if (resolveIndex(request)) {
            concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName();
        } else {
            concreteSingleIndex = request.index();
        }
        this.internalRequest = new InternalRequest(request, concreteSingleIndex);
        // resolveRequest is implemented in TransportGetAction:
        // it resolves the request and updates the specified routing
        resolveRequest(clusterState, internalRequest);

        blockException = checkRequestBlock(clusterState, internalRequest);
        if (blockException != null) {
            throw blockException;
        }

        // get an iterator over the target shard via the routing algorithm,
        // or pick the target node according to the preference
        this.shardIt = shards(clusterState, internalRequest);
    }

    // In TransportGetAction
    @Override
    protected ShardIterator shards(ClusterState state, InternalRequest request) {
        return clusterService.operationRouting()
                .getShards(clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
                        request.request().preference());
    }
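The constructor only prepares shardIt; what start() does with it is outside this excerpt. Assuming it tries the shard copies in iterator order and moves on to the next copy when a request to one copy fails, the idea can be sketched like this. ShardCopyFailover and executeOnFirstAvailable are hypothetical names, and node ids stand in for ShardRouting entries.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: try each shard copy in preference order, falling back to the next
// copy on failure. Assumption: this mirrors what AsyncSingleAction does after start().
final class ShardCopyFailover {
    static String executeOnFirstAvailable(List<String> copiesInPreferenceOrder,
                                          Function<String, String> sendToNode) {
        Iterator<String> shardIt = copiesInPreferenceOrder.iterator();
        RuntimeException lastFailure = null;
        while (shardIt.hasNext()) {
            String node = shardIt.next();
            try {
                return sendToNode.apply(node); // success: stop here
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try the next copy
            }
        }
        IllegalStateException noCopy = new IllegalStateException("no shard copy could serve the request");
        if (lastFailure != null) {
            noCopy.addSuppressed(lastFailure);
        }
        throw noCopy;
    }
}
```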

Next, let's look at getShards(…) in OperationRouting to see how the concrete shardId is obtained:

    public ShardIterator getShards(ClusterState clusterState, String index, String id, @Nullable String routing,
                                   @Nullable String preference) {
        return preferenceActiveShardIterator(shards(clusterState, index, id, routing), clusterState.nodes().getLocalNodeId(),
                clusterState.nodes(), preference, null, null);
    }

    protected IndexShardRoutingTable shards(ClusterState clusterState, String index, String id, String routing) {
        int shardId = generateShardId(indexMetadata(clusterState, index), id, routing);
        return clusterState.getRoutingTable().shardRoutingTable(index, shardId);
    }

    public static int generateShardId(IndexMetadata indexMetadata, @Nullable String id, @Nullable String routing) {
        final String effectiveRouting;
        final int partitionOffset;

        // for how the routing parameter is interpreted, see the official docs:
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
        if (routing == null) {
            assert(indexMetadata.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
            effectiveRouting = id;
        } else {
            effectiveRouting = routing;
        }

        if (indexMetadata.isRoutingPartitionedIndex()) {
            partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetadata.getRoutingPartitionSize());
        } else {
            // we would have still got 0 above but this check just saves us an unnecessary hash calculation
            partitionOffset = 0;
        }

        return calculateScaledShardId(indexMetadata, effectiveRouting, partitionOffset);
    }

    private static int calculateScaledShardId(IndexMetadata indexMetadata, String effectiveRouting, int partitionOffset) {
        final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

        // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
        // of original index to hash documents
        return Math.floorMod(hash, indexMetadata.getRoutingNumShards()) / indexMetadata.getRoutingFactor();
    }
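To see the routing arithmetic end to end, here is a standalone re-implementation sketch of generateShardId plus calculateScaledShardId. Assumptions, labeled: the hash mirrors Murmur3HashFunction.hash(String), i.e. each UTF-16 char is fed as two little-endian bytes into murmur3_x86_32 with seed 0; routingNumShards, numberOfShards and routingPartitionSize are passed in directly instead of being read from IndexMetadata; and routingFactor is taken to be routingNumShards / numberOfShards. ShardIdSketch is a hypothetical name.

```java
// Standalone sketch of the shard-id calculation shown above (not Elasticsearch code).
final class ShardIdSketch {

    // Assumed equivalent of Murmur3HashFunction.hash(String): chars as little-endian byte pairs.
    static int murmur3(String routing) {
        byte[] data = new byte[routing.length() * 2];
        for (int i = 0; i < routing.length(); i++) {
            char c = routing.charAt(i);
            data[i * 2] = (byte) c;             // low byte
            data[i * 2 + 1] = (byte) (c >>> 8); // high byte
        }
        return murmur3x86_32(data, 0);
    }

    // Standard MurmurHash3 x86 32-bit.
    static int murmur3x86_32(byte[] data, int seed) {
        final int c1 = 0xcc9e2d51;
        final int c2 = 0x1b873593;
        int h1 = seed;
        int roundedEnd = data.length & 0xfffffffc; // process whole 4-byte blocks first
        for (int i = 0; i < roundedEnd; i += 4) {
            int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
                    | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24);
            k1 *= c1;
            k1 = Integer.rotateLeft(k1, 15);
            k1 *= c2;
            h1 ^= k1;
            h1 = Integer.rotateLeft(h1, 13);
            h1 = h1 * 5 + 0xe6546b64;
        }
        int k1 = 0;
        switch (data.length & 0x03) { // tail bytes, intentional fall-through
            case 3:
                k1 = (data[roundedEnd + 2] & 0xff) << 16;
            case 2:
                k1 |= (data[roundedEnd + 1] & 0xff) << 8;
            case 1:
                k1 |= data[roundedEnd] & 0xff;
                k1 *= c1;
                k1 = Integer.rotateLeft(k1, 15);
                k1 *= c2;
                h1 ^= k1;
            default:
                break;
        }
        // finalization mix
        h1 ^= data.length;
        h1 ^= h1 >>> 16;
        h1 *= 0x85ebca6b;
        h1 ^= h1 >>> 13;
        h1 *= 0xc2b2ae35;
        h1 ^= h1 >>> 16;
        return h1;
    }

    static int shardId(String id, String routing, int routingNumShards, int numberOfShards,
                       int routingPartitionSize) {
        boolean partitioned = routingPartitionSize != 1;
        if (routing == null && partitioned) {
            // the original code asserts this: partitioned indices need a routing value
            throw new IllegalArgumentException("A routing value is required for gets from a partitioned index");
        }
        String effectiveRouting = routing == null ? id : routing; // _id is the default routing
        int partitionOffset = partitioned ? Math.floorMod(murmur3(id), routingPartitionSize) : 0;
        int routingFactor = routingNumShards / numberOfShards;   // assumed getRoutingFactor()
        int hash = murmur3(effectiveRouting) + partitionOffset;
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }
}
```

Note the consequence of the default: a GET for id "1" without routing lands on the same shard as any document indexed with routing "1", because both hash the same effective routing string.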

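So the answer to the first question is yes: ES finds the shard by hashing (with Murmur3) the routing value, which defaults to the document _id. If a routing parameter is supplied, that value is hashed instead.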
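The comment in calculateScaledShardId about shrunk indices can be checked numerically. With floorMod(hash, routingNumShards) / routingFactor, shrinking from N to N' shards (N' dividing N, routingNumShards unchanged) maps old shard s to new shard s / (N / N'), so consecutive source shards collapse into one target shard. The sketch below brute-forces that property; the concrete numbers (64, 8, 2) are arbitrary examples, not Elasticsearch defaults, and ShrinkRoutingCheck is a hypothetical name.

```java
// Brute-force check of the shrink property of scaled shard ids (illustrative, not ES code).
final class ShrinkRoutingCheck {
    static int scaledShardId(int hash, int routingNumShards, int numberOfShards) {
        int routingFactor = routingNumShards / numberOfShards;
        return Math.floorMod(hash, routingNumShards) / routingFactor;
    }

    // true if every hash in [from, to) satisfies newShard == oldShard / (oldShards / newShards)
    static boolean shrinkIsConsistent(int routingNumShards, int oldShards, int newShards, int from, int to) {
        int k = oldShards / newShards;
        for (int h = from; h < to; h++) {
            int before = scaledShardId(h, routingNumShards, oldShards);
            int after = scaledShardId(h, routingNumShards, newShards);
            if (after != before / k) {
                return false;
            }
        }
        return true;
    }
}
```

By contrast, plain hash % numberOfShards would not have this property: hash 1 sits in shard 1 of 8 (and 1 / 4 = 0) but in shard 1 of 2.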

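Next, let's see how real-time visibility is achieved.

The entry point where a data node receives the request from the coordinating node is TransportSingleShardAction.ShardTransportHandler#messageReceived: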

    @Override
    public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
        if (logger.isTraceEnabled()) {
            logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
        }
        asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request));
    }
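messageReceived does no work itself: it wraps the channel in a ChannelActionListener and hands off to asyncShardOperation, so whatever the shard operation produces, a response or an exception, goes back over the channel the request arrived on. The sketch below shows that adapter shape with hypothetical stand-ins; SketchListener, FakeChannel, ChannelListener and ShardHandlerSketch are not Elasticsearch types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ActionListener.
interface SketchListener<R> {
    void onResponse(R response);

    void onFailure(Exception e);
}

// Hypothetical stand-in for TransportChannel: records what would be sent back.
final class FakeChannel {
    final List<String> sent = new ArrayList<>();

    void sendResponse(String response) {
        sent.add("OK " + response);
    }

    void sendFailure(Exception e) {
        sent.add("ERR " + e.getMessage());
    }
}

// The adapter idea behind ChannelActionListener: relay results to the channel.
final class ChannelListener implements SketchListener<String> {
    private final FakeChannel channel;

    ChannelListener(FakeChannel channel) {
        this.channel = channel;
    }

    @Override
    public void onResponse(String response) {
        channel.sendResponse(response);
    }

    @Override
    public void onFailure(Exception e) {
        channel.sendFailure(e);
    }
}

final class ShardHandlerSketch {
    // same shape as messageReceived: wrap the channel in a listener and hand off
    static void messageReceived(String id, FakeChannel channel) {
        asyncShardOperation(id, new ChannelListener(channel));
    }

    // stand-in for the real shard operation; returns a dummy reply
    static void asyncShardOperation(String id, SketchListener<String> listener) {
        if (id.isEmpty()) {
            listener.onFailure(new IllegalArgumentException("id is missing"));
        } else {
            listener.onResponse("doc " + id);
        }
    }
}
```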

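The actual execution happens in the subclass override TransportGetAction#asyncShardOperation. A realtime GET runs immediately, while a non-realtime GET first waits until the shard is search-active: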

    @Override
    protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());
        // for the realtime parameter, see the official docs:
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
        if (request.realtime()) { // we are not tied to a refresh cycle here anyways
            super.asyncShardOperation(request, shardId, listener);
        } else {
            indexShard.awaitShardSearchActive(b -> {
                try {
                    super.asyncShardOperation(request, shardId, listener);
                } catch (Exception ex) {
                    listener.onFailure(ex);
                }
            });
        }
    }
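The realtime/non-realtime branch can be modeled with a toy shard. This sketch rests on an assumption the code above does not show: that awaitShardSearchActive defers the read only when the shard has writes that were not refreshed while it was search-idle, and otherwise invokes the callback at once. SearchIdleShard and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Elasticsearch code) of the branch in TransportGetAction#asyncShardOperation.
final class SearchIdleShard {
    private boolean pendingRefresh; // assumed: writes arrived while search-idle, not yet refreshed
    private final List<Runnable> waitingForRefresh = new ArrayList<>();

    SearchIdleShard(boolean pendingRefresh) {
        this.pendingRefresh = pendingRefresh;
    }

    void awaitShardSearchActive(Runnable onReady) {
        if (pendingRefresh) {
            waitingForRefresh.add(onReady); // run after the next refresh
        } else {
            onReady.run();
        }
    }

    void refresh() {
        pendingRefresh = false;
        List<Runnable> toRun = new ArrayList<>(waitingForRefresh);
        waitingForRefresh.clear();
        toRun.forEach(Runnable::run);
    }

    void get(boolean realtime, Runnable readDocument) {
        if (realtime) {
            readDocument.run(); // not tied to the refresh cycle
        } else {
            awaitShardSearchActive(readDocument);
        }
    }
}
```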

In both branches, TransportGetAction#asyncShardOperation ends up fetching the document through shardOperation:

    @Override
    protected GetResponse shardOperation(GetRequest request, ShardId shardId) {
        IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
        IndexShard indexShard = indexService.getShard(shardId.id());
        // for the realtime and refresh parameters, see the official docs:
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html
        if (request.refresh() && !request.realtime()) {
            indexShard.refresh("refresh_flag_get");
        }

        GetResult result = indexShard.getService().get(request.id(), request.storedFields(),
                request.realtime(), request.version(), request.versionType(), request.fetchSourceContext());
        return new GetResponse(result);
    }
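The two request flags combine in a way that is easy to misread: an explicit shard refresh happens only for refresh=true with realtime=false, while with realtime=true visibility is left to the engine (InternalEngine#get, shown later). The following restatement, read off the code in this article, uses a hypothetical enum and method purely as a lookup table.

```java
// Hypothetical summary of how the refresh and realtime flags combine (not ES code).
enum GetPlan {
    EXPLICIT_REFRESH_THEN_READ,   // refresh=true, realtime=false: indexShard.refresh("refresh_flag_get")
    REFRESH_IF_NEEDED_THEN_READ,  // realtime=true: the engine refreshes only if the doc is not yet visible
    READ_LAST_REFRESHED_SNAPSHOT  // refresh=false, realtime=false: read what the last refresh exposed
}

final class GetPlanner {
    static GetPlan plan(boolean refresh, boolean realtime) {
        if (refresh && !realtime) {
            return GetPlan.EXPLICIT_REFRESH_THEN_READ;
        }
        return realtime ? GetPlan.REFRESH_IF_NEEDED_THEN_READ : GetPlan.READ_LAST_REFRESHED_SNAPSHOT;
    }
}
```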

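shardOperation first checks whether an explicit refresh is needed (only when refresh=true and realtime=false), then calls indexShard.getService().get() to read the document and wrap it in a GetResult. Reading and filtering happen in ShardGetService#get(), which obtains the result by calling:

    GetResult getResult = innerGet(…);

The GetResult class holds the document data that was actually read. The core reading logic lives in ShardGetService#innerGet(…):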

    private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                               long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
        fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);

        // ask the Engine for the document
        Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id)
                .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
        assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
        if (get.exists() == false) {
            get.close();
        }

        if (get == null || get.exists() == false) {
            return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
        }

        try {
            // build the result
            // break between having loaded it from translog (so we only have _source), and having a document to load
            return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
        } finally {
            get.close();
        }
    }

    // Filters the requested stored fields and _source (_source filtering works at field level only)
    // and stores the outcome in a GetResult object
    private GetResult innerGetLoadFromStoredFields(String id, String[] storedFields, FetchSourceContext fetchSourceContext,
                                                   Engine.GetResult get, MapperService mapperService) {
        assert get.exists() : "method should only be called if document could be retrieved";

        // check first if stored fields to be loaded don't contain an object field
        DocumentMapper docMapper = mapperService.documentMapper();
        if (storedFields != null) {
            for (String field : storedFields) {
                Mapper fieldMapper = docMapper.mappers().getMapper(field);
                if (fieldMapper == null) {
                    if (docMapper.mappers().objectMappers().get(field) != null) {
                        // Only fail if we know it is a object field, missing paths / fields shouldn't fail.
                        throw new IllegalArgumentException("field [" + field + "] isn't a leaf field");
                    }
                }
            }
        }

        Map<String, DocumentField> documentFields = null;
        Map<String, DocumentField> metadataFields = null;
        BytesReference source = null;
        DocIdAndVersion docIdAndVersion = get.docIdAndVersion();
        // force fetching source if we read from translog and need to recreate stored fields
        boolean forceSourceForComputingTranslogStoredFields = get.isFromTranslog() && storedFields != null &&
                Stream.of(storedFields).anyMatch(f -> TranslogLeafReader.ALL_FIELD_NAMES.contains(f) == false);
        FieldsVisitor fieldVisitor = buildFieldsVisitors(storedFields,
                forceSourceForComputingTranslogStoredFields ? FetchSourceContext.FETCH_SOURCE : fetchSourceContext);
        if (fieldVisitor != null) {
            try {
                docIdAndVersion.reader.document(docIdAndVersion.docId, fieldVisitor);
            } catch (IOException e) {
                throw new ElasticsearchException("Failed to get id [" + id + "]", e);
            }
            source = fieldVisitor.source();

            // in case we read from translog, some extra steps are needed to make _source consistent and to load stored fields
            if (get.isFromTranslog()) {
                // Fast path: if only asked for the source or stored fields that have been already provided by TranslogLeafReader,
                // just make source consistent by reapplying source filters from mapping (possibly also nulling the source)
                if (forceSourceForComputingTranslogStoredFields == false) {
                    try {
                        source = indexShard.mapperService().documentMapper().sourceMapper().applyFilters(source, null);
                    } catch (IOException e) {
                        throw new ElasticsearchException("Failed to reapply filters for [" + id + "] after reading from translog", e);
                    }
                } else {
                    // Slow path: recreate stored fields from original source
                    assert source != null : "original source in translog must exist";
                    SourceToParse sourceToParse = new SourceToParse(shardId.getIndexName(), id, source,
                            XContentHelper.xContentType(source), fieldVisitor.routing());
                    ParsedDocument doc = indexShard.mapperService().documentMapper().parse(sourceToParse);
                    assert doc.dynamicMappingsUpdate() == null : "mapping updates should not be required on already-indexed doc";
                    // update special fields
                    doc.updateSeqID(docIdAndVersion.seqNo, docIdAndVersion.primaryTerm);
                    doc.version().setLongValue(docIdAndVersion.version);

                    // retrieve stored fields from parsed doc
                    fieldVisitor = buildFieldsVisitors(storedFields, fetchSourceContext);
                    for (IndexableField indexableField : doc.rootDoc().getFields()) {
                        IndexableFieldType fieldType = indexableField.fieldType();
                        if (fieldType.stored()) {
                            FieldInfo fieldInfo = new FieldInfo(indexableField.name(), 0, false, false, false, IndexOptions.NONE,
                                    DocValuesType.NONE, -1, Collections.emptyMap(), 0, 0, 0, false);
                            StoredFieldVisitor.Status status = fieldVisitor.needsField(fieldInfo);
                            if (status == StoredFieldVisitor.Status.YES) {
                                if (indexableField.numericValue() != null) {
                                    fieldVisitor.objectField(fieldInfo, indexableField.numericValue());
                                } else if (indexableField.binaryValue() != null) {
                                    fieldVisitor.binaryField(fieldInfo, indexableField.binaryValue());
                                } else if (indexableField.stringValue() != null) {
                                    fieldVisitor.objectField(fieldInfo, indexableField.stringValue());
                                }
                            } else if (status == StoredFieldVisitor.Status.STOP) {
                                break;
                            }
                        }
                    }
                    // retrieve source (with possible transformations, e.g. source filters
                    source = fieldVisitor.source();
                }
            }

            // put stored fields into result objects
            if (!fieldVisitor.fields().isEmpty()) {
                fieldVisitor.postProcess(mapperService::fieldType);
                documentFields = new HashMap<>();
                metadataFields = new HashMap<>();
                for (Map.Entry<String, List<Object>> entry : fieldVisitor.fields().entrySet()) {
                    if (mapperService.isMetadataField(entry.getKey())) {
                        metadataFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                    } else {
                        documentFields.put(entry.getKey(), new DocumentField(entry.getKey(), entry.getValue()));
                    }
                }
            }
        }

        if (source != null) {
            // apply request-level source filtering
            if (fetchSourceContext.fetchSource() == false) {
                source = null;
            } else if (fetchSourceContext.includes().length > 0 || fetchSourceContext.excludes().length > 0) {
                Map<String, Object> sourceAsMap;
                // TODO: The source might be parsed and available in the sourceLookup but that one uses unordered maps so different.
                //  Do we care?
                Tuple<XContentType, Map<String, Object>> typeMapTuple = XContentHelper.convertToMap(source, true);
                XContentType sourceContentType = typeMapTuple.v1();
                sourceAsMap = typeMapTuple.v2();
                sourceAsMap = XContentMapValues.filter(sourceAsMap, fetchSourceContext.includes(), fetchSourceContext.excludes());
                try {
                    source = BytesReference.bytes(XContentFactory.contentBuilder(sourceContentType).map(sourceAsMap));
                } catch (IOException e) {
                    throw new ElasticsearchException("Failed to get id [" + id + "] with includes/excludes set", e);
                }
            }
        }

        return new GetResult(shardId.getIndexName(), id, get.docIdAndVersion().seqNo, get.docIdAndVersion().primaryTerm,
                get.version(), get.exists(), source, documentFields, metadataFields);
    }
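The request-level _source filtering at the end of innerGetLoadFromStoredFields can be approximated as follows: _source=false drops the source, no includes/excludes returns it unchanged, otherwise a key survives if it is included (or includes are empty) and not excluded. This simplified sketch matches only exact top-level keys, whereas XContentMapValues.filter also handles wildcard patterns and nested paths; SourceFilterSketch is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified sketch of request-level _source filtering (exact top-level keys only).
final class SourceFilterSketch {
    static Map<String, Object> filter(Map<String, Object> source, boolean fetchSource,
                                      Set<String> includes, Set<String> excludes) {
        if (!fetchSource) {
            return null; // _source=false: drop the source entirely
        }
        if (includes.isEmpty() && excludes.isEmpty()) {
            return source; // nothing to filter, return as-is
        }
        Map<String, Object> filtered = new LinkedHashMap<>(); // keep the original key order
        for (Map.Entry<String, Object> e : source.entrySet()) {
            boolean included = includes.isEmpty() || includes.contains(e.getKey());
            boolean excluded = excludes.contains(e.getKey());
            if (included && !excluded) {
                filtered.put(e.getKey(), e.getValue());
            }
        }
        return filtered;
    }
}
```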

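Next, let's look at how InternalEngine reads the document:

InternalEngine#get first acquires a read lock. It then handles the realtime option: if realtime is true, it looks the document up in the version map and, when the latest write to it has not been refreshed yet, triggers a refresh (refreshIfNeeded) so that the write becomes visible. Finally it reads the document through a Searcher. Engine.Searcher builds on Lucene's IndexSearcher.

Since ES 5.x, a GET no longer reads from the translog; it reads only from Lucene, and realtime is implemented through refresh instead. See the official PR: https://github.com/elastic/elasticsearch/pull/20102. The get.isReadFromTranslog() branch below is the exception and, per its own comment, is used only for updates.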

    @Override
    public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            // handle the realtime option: decide whether a refresh is needed
            if (get.realtime()) {
                final VersionValue versionValue;
                // versionMap entries are added at index time and are never written to disk
                try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                    // we need to lock here to access the version map to do this truly in RT
                    versionValue = getVersionFromMap(get.uid().bytes());
                }
                if (versionValue != null) {
                    if (versionValue.isDelete()) {
                        return GetResult.NOT_EXISTS;
                    }
                    if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                                get.versionType().explainConflictForReads(versionValue.version, get.version()));
                    }
                    if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO && (
                            get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term
                    )) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                                get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
                    }
                    if (get.isReadFromTranslog()) {
                        // this is only used for updates - API _GET calls will always read form a reader for consistency
                        // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                        if (versionValue.getLocation() != null) {
                            try {
                                final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                                if (operation != null) {
                                    return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
                                }
                            } catch (IOException e) {
                                maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                                throw new EngineException(shardId, "failed to read operation from translog", e);
                            }
                        } else {
                            trackTranslogLocation.set(true);
                        }
                    }
                    assert versionValue.seqNo >= 0 : versionValue;
                    // refresh if this seqNo is not visible yet, so the searcher below can see the document
                    refreshIfNeeded("realtime_get", versionValue.seqNo);
                }
                // read the document through a Searcher
                return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
            } else {
                // we expose what has been externally expose in a point in time snapshot via an explicit refresh
                return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
            }
        }
    }
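To tie the pieces together, here is a toy engine, not Elasticsearch code, that answers the second question the same way InternalEngine#get does: a realtime GET never reads an unrefreshed write directly; it checks a version map and, if the latest write for that id is newer than the last refresh, refreshes first and then reads from the refreshed view. Simplifying assumptions: single-threaded (the engine read lock and per-uid version-map locks are omitted), no deletes or version checks, and the version map is simply cleared on refresh. ToyEngine and its fields are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of realtime GET via refresh (not Elasticsearch code, not thread-safe).
final class ToyEngine {
    private final Map<String, String> indexed = new HashMap<>();      // every write (think: indexing buffer)
    private final Map<String, String> searcherView = new HashMap<>(); // what a refreshed reader can see
    private final Map<String, Long> versionMap = new HashMap<>();     // id -> seqNo of a possibly unrefreshed write
    private long nextSeqNo = 0;
    private long lastRefreshedSeqNo = -1;
    int refreshCount = 0;

    void index(String id, String source) {
        long seqNo = nextSeqNo++;
        indexed.put(id, source);
        versionMap.put(id, seqNo);
    }

    void refresh() {
        searcherView.clear();
        searcherView.putAll(indexed);
        lastRefreshedSeqNo = nextSeqNo - 1;
        versionMap.clear(); // refreshed writes no longer need tracking
        refreshCount++;
    }

    // counterpart of refreshIfNeeded(source, seqNo): refresh only if seqNo is not yet visible
    private void refreshIfNeeded(long seqNo) {
        if (lastRefreshedSeqNo < seqNo) {
            refresh();
        }
    }

    String get(String id, boolean realtime) {
        if (realtime) {
            Long seqNo = versionMap.get(id);
            if (seqNo != null) {
                refreshIfNeeded(seqNo); // make the pending write visible first
            }
        }
        // both paths read from the searcher view, never from the write buffer
        return searcherView.get(id);
    }
}
```

In this model a non-realtime GET right after indexing returns null, while a realtime GET triggers exactly one refresh and returns the new document; later realtime GETs need no further refresh.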

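Summary

  • GET locates the shard by hashing the routing value, which is the document _id unless a custom routing is supplied.
  • Real-time visibility of a GET by _id relies on refresh: with realtime enabled (the default), if the document is in the version map and has not been refreshed yet, the engine refreshes before reading from Lucene.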

References:
《Elasticsearch源码解析与优化实战》 (Elasticsearch Source Code Analysis and Optimization in Practice)
