Learning the source code with questions in mind, part 3: Elasticsearch update performance
Code analysis based on: https://github.com/jiankunking/elasticsearch
Elasticsearch 7.10.2+

Goal

Before diving into the source, let's state the question about updates up front:
Why is there such a large performance gap between updates and plain writes in Elasticsearch?

Source code analysis

It is recommended to first read 【Elasticsearch源码】 写入分析 (the write-path analysis).

As the write-path analysis shows, when a bulk request is finally executed in TransportShardBulkAction#doRun(), the items are still processed one at a time in a loop; there is nothing magical about it.

Let's look at the code that executes each item, executeBulkItemRequest() (invoked from doRun()):

    /**
     * Executes bulk item requests and handles request execution exceptions.
     * @return {@code true} if request completed on this thread and the listener was invoked, {@code false} if the request triggered
     *                      a mapping update that will finish and invoke the listener on a different thread
     */
    static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper,
                                          LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater,
                                          Consumer<ActionListener<Void>> waitForMappingUpdate,
                                          ActionListener<Void> itemDoneListener) throws Exception {
        final DocWriteRequest.OpType opType = context.getCurrent().opType();

        final UpdateHelper.Result updateResult;
        if (opType == DocWriteRequest.OpType.UPDATE) {
            final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent();
            try {
                updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier);
            } catch (Exception failure) {
                // we may fail translating a update to index or delete operation
                // we use index result to communicate failure while translating update request
                final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version());
                context.setRequestToExecute(updateRequest);
                context.markOperationAsExecuted(result);
                context.markAsCompleted(context.getExecutionResult());
                return true;
            }
            // execute translated update request
            switch (updateResult.getResponseResult()) {
                case CREATED:
                case UPDATED:
                    IndexRequest indexRequest = updateResult.action();
                    IndexMetadata metadata = context.getPrimary().indexSettings().getIndexMetadata();
                    MappingMetadata mappingMd = metadata.mapping();
                    indexRequest.process(metadata.getCreationVersion(), mappingMd, updateRequest.concreteIndex());
                    context.setRequestToExecute(indexRequest);
                    break;
                case DELETED:
                    context.setRequestToExecute(updateResult.action());
                    break;
                case NOOP:
                    context.markOperationAsNoOp(updateResult.action());
                    context.markAsCompleted(context.getExecutionResult());
                    return true;
                default:
                    throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult());
            }
        } else {
            context.setRequestToExecute(context.getCurrent());
            updateResult = null;
        }

        assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state

        final IndexShard primary = context.getPrimary();
        final long version = context.getRequestToExecute().version();
        final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;

        final Engine.Result result;
        if (isDelete) {
            final DeleteRequest request = context.getRequestToExecute();
            result = primary.applyDeleteOperationOnPrimary(version, request.id(), request.versionType(),
                request.ifSeqNo(), request.ifPrimaryTerm());
        } else {
            final IndexRequest request = context.getRequestToExecute();
            result = primary.applyIndexOperationOnPrimary(version, request.versionType(),
                new SourceToParse(request.index(), request.id(), request.source(), request.getContentType(), request.routing()),
                request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
        }

        if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
            try {
                primary.mapperService().merge(MapperService.SINGLE_MAPPING_NAME,
                    new CompressedXContent(result.getRequiredMappingUpdate(), XContentType.JSON, ToXContent.EMPTY_PARAMS),
                    MapperService.MergeReason.MAPPING_UPDATE_PREFLIGHT);
            } catch (Exception e) {
                logger.info(() -> new ParameterizedMessage("{} mapping update rejected by primary", primary.shardId()), e);
                onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
                return true;
            }

            mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
                new ActionListener<>() {
                    @Override
                    public void onResponse(Void v) {
                        context.markAsRequiringMappingUpdate();
                        waitForMappingUpdate.accept(ActionListener.runAfter(new ActionListener<>() {
                            @Override
                            public void onResponse(Void v) {
                                assert context.requiresWaitingForMappingUpdate();
                                context.resetForExecutionForRetry();
                            }

                            @Override
                            public void onFailure(Exception e) {
                                context.failOnMappingUpdate(e);
                            }
                        }, () -> itemDoneListener.onResponse(null)));
                    }

                    @Override
                    public void onFailure(Exception e) {
                        onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
                        // Requesting mapping update failed, so we don't have to wait for a cluster state update
                        assert context.isInitial();
                        itemDoneListener.onResponse(null);
                    }
                });
            return false;
        } else {
            onComplete(result, context, updateResult);
        }
        return true;
    }

    /**
     * Prepares an update request by converting it into an index or delete request or an update response (no action).
     */
    public Result prepare(UpdateRequest request, IndexShard indexShard, LongSupplier nowInMillis) {
        // This is a realtime get.
        // The lookup eventually reaches InternalEngine#get(Get, DocumentMapper, Function<Engine.Searcher, Engine.Searcher>)
        // (its code is shown later in this post).
        final GetResult getResult = indexShard.getService().getForUpdate(request.id(), request.ifSeqNo(), request.ifPrimaryTerm());
        return prepare(indexShard.shardId(), request, getResult, nowInMillis);
    }

    public GetResult getForUpdate(String id, long ifSeqNo, long ifPrimaryTerm) {
        // realtime is true
        return get(id, new String[]{RoutingFieldMapper.NAME}, true,
            Versions.MATCH_ANY, VersionType.INTERNAL, ifSeqNo, ifPrimaryTerm, FetchSourceContext.FETCH_SOURCE);
    }

    private GetResult get(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                          long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
        currentMetric.inc();
        try {
            long now = System.nanoTime();
            GetResult getResult = innerGet(id, gFields, realtime, version, versionType, ifSeqNo, ifPrimaryTerm, fetchSourceContext);
            if (getResult.isExists()) {
                existsMetric.inc(System.nanoTime() - now);
            } else {
                missingMetric.inc(System.nanoTime() - now);
            }
            return getResult;
        } finally {
            currentMetric.dec();
        }
    }

    private GetResult innerGet(String id, String[] gFields, boolean realtime, long version, VersionType versionType,
                               long ifSeqNo, long ifPrimaryTerm, FetchSourceContext fetchSourceContext) {
        fetchSourceContext = normalizeFetchSourceContent(fetchSourceContext, gFields);
        Engine.GetResult get = indexShard.get(new Engine.Get(realtime, realtime, id)
            .version(version).versionType(versionType).setIfSeqNo(ifSeqNo).setIfPrimaryTerm(ifPrimaryTerm));
        assert get.isFromTranslog() == false || realtime : "should only read from translog if realtime enabled";
        if (get.exists() == false) {
            get.close();
        }
        if (get == null || get.exists() == false) {
            return new GetResult(shardId.getIndexName(), id, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, -1, false, null, null, null);
        }
        try {
            // break between having loaded it from translog (so we only have _source), and having a document to load
            return innerGetLoadFromStoredFields(id, gFields, fetchSourceContext, get, mapperService);
        } finally {
            get.close();
        }
    }

    public Engine.GetResult get(Engine.Get get) {
        readAllowed();
        DocumentMapper mapper = mapperService.documentMapper();
        if (mapper == null) {
            return GetResult.NOT_EXISTS;
        }
        return getEngine().get(get, mapper, this::wrapSearcher);
    }

    /**
     * Prepares an update request by converting it into an index or delete request or an update response (no action, in the event of a
     * noop).
     */
    protected Result prepare(ShardId shardId, UpdateRequest request, final GetResult getResult, LongSupplier nowInMillis) {
        if (getResult.isExists() == false) {
            // If the document didn't exist, execute the update request as an upsert
            return prepareUpsert(shardId, request, getResult, nowInMillis);
        } else if (getResult.internalSourceRef() == null) {
            // no source, we can't do anything, throw a failure...
            throw new DocumentSourceMissingException(shardId, request.id());
        } else if (request.script() == null && request.doc() != null) {
            // The request has no script, it is a new doc that should be merged with the old document
            return prepareUpdateIndexRequest(shardId, request, getResult, request.detectNoop());
        } else {
            // The request has a script (or empty script), execute the script and prepare a new index request
            return prepareUpdateScriptRequest(shardId, request, getResult, nowInMillis);
        }
    }

The prepare methods above live in org/elasticsearch/action/update/UpdateHelper.java.

From this code, the update logic consists of two steps:

  • fetch the current data of the document to be updated
  • execute the translated index (or delete) operation
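The two steps can be illustrated with a minimal, self-contained sketch. ToyShard and all names below are invented for illustration (not Elasticsearch classes); the point is only the shape of the flow: a partial-doc update is a realtime get, a merge (as in prepareUpdateIndexRequest), and then a full re-index of the merged document, so one update costs one read plus one write.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory shard mimicking the two-step update flow (illustration only).
class ToyShard {
    private final Map<String, Map<String, Object>> store = new HashMap<>(); // _id -> latest _source
    private final Map<String, Long> versions = new HashMap<>();
    long getCount = 0;   // realtime gets performed
    long indexCount = 0; // index (write) operations performed

    // plain write: one index operation, no prior read
    void index(String id, Map<String, Object> source) {
        store.put(id, source);
        versions.merge(id, 1L, Long::sum);
        indexCount++;
    }

    // update: step 1 = realtime get, step 2 = merge the partial doc and re-index the whole doc
    void update(String id, Map<String, Object> partialDoc) {
        Map<String, Object> existing = store.getOrDefault(id, new HashMap<>()); // step 1: realtime get
        getCount++;
        Map<String, Object> merged = new HashMap<>(existing);
        merged.putAll(partialDoc);  // merge new fields over the old _source
        index(id, merged);          // step 2: the update is translated into an index request
    }

    long version(String id) { return versions.getOrDefault(id, 0L); }
    Map<String, Object> source(String id) { return store.get(id); }
}
```

Even in this toy model, N updates cost N reads plus N writes, while N plain writes cost only N writes.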

Step 1 eventually calls the get method of InternalEngine. The code is as follows:

    @Override
    public GetResult get(Get get, DocumentMapper mapper, Function<Engine.Searcher, Engine.Searcher> searcherWrapper) {
        assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
        try (ReleasableLock ignored = readLock.acquire()) {
            ensureOpen();
            // realtime get?
            if (get.realtime()) {
                final VersionValue versionValue;
                try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
                    // we need to lock here to access the version map to do this truly in RT
                    versionValue = getVersionFromMap(get.uid().bytes());
                }
                if (versionValue != null) {
                    if (versionValue.isDelete()) {
                        return GetResult.NOT_EXISTS;
                    }
                    if (get.versionType().isVersionConflictForReads(versionValue.version, get.version())) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.versionType().explainConflictForReads(versionValue.version, get.version()));
                    }
                    if (get.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
                            && (get.getIfSeqNo() != versionValue.seqNo || get.getIfPrimaryTerm() != versionValue.term)) {
                        throw new VersionConflictEngineException(shardId, get.id(),
                            get.getIfSeqNo(), get.getIfPrimaryTerm(), versionValue.seqNo, versionValue.term);
                    }
                    // read from the translog?
                    if (get.isReadFromTranslog()) {
                        // this is only used for updates - API _GET calls will always read form a reader for consistency
                        // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0
                        if (versionValue.getLocation() != null) {
                            try {
                                final Translog.Operation operation = translog.readOperation(versionValue.getLocation());
                                if (operation != null) {
                                    return getFromTranslog(get, (Translog.Index) operation, mapper, searcherWrapper);
                                }
                            } catch (IOException e) {
                                maybeFailEngine("realtime_get", e); // lets check if the translog has failed with a tragic event
                                throw new EngineException(shardId, "failed to read operation from translog", e);
                            }
                        } else {
                            trackTranslogLocation.set(true);
                        }
                    }
                    assert versionValue.seqNo >= 0 : versionValue;
                    refreshIfNeeded("realtime_get", versionValue.seqNo);
                }
                return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper));
            } else {
                // we expose what has been externally expose in a point in time snapshot via an explicit refresh
                return getFromSearcher(get, acquireSearcher("get", SearcherScope.EXTERNAL, searcherWrapper));
            }
        }
    }
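The branching above can be condensed into a small model. Everything below is an invented simulation (none of these types are Elasticsearch or Lucene APIs); it only preserves the order of checks in the realtime path: version map, then tombstone, then translog location, then refresh plus internal searcher.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the realtime-get decision order in InternalEngine#get (illustration only).
class RealtimeGetSketch {
    static class VersionValue {
        final boolean isDelete;
        final Long translogLocation; // null => must refresh and read from a searcher
        VersionValue(boolean isDelete, Long loc) { this.isDelete = isDelete; this.translogLocation = loc; }
    }

    final Map<String, VersionValue> versionMap = new HashMap<>(); // writes not yet refreshed
    final Map<Long, String> translog = new HashMap<>();           // location -> _source
    final Map<String, String> searcher = new HashMap<>();         // refreshed (searchable) docs
    boolean refreshed = false;

    String get(String id, boolean realtime) {
        if (realtime) {
            VersionValue vv = versionMap.get(id);
            if (vv != null) {
                if (vv.isDelete) return null;                 // tombstone: NOT_EXISTS
                if (vv.translogLocation != null) {
                    return translog.get(vv.translogLocation); // cheap path: read _source from the translog
                }
                refreshed = true;                             // refreshIfNeeded: may produce a new segment
            }
            return searcher.get(id);                          // internal searcher
        }
        return searcher.get(id);                              // non-realtime: point-in-time view only
    }
}
```

The takeaway: a realtime get only forces a refresh when the recent write's translog location is not available; otherwise it is served directly from the translog.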

Summary

An update must first fetch the original document. If it does not exist, the update is executed as an upsert (a new document is indexed); if it does exist, the new document is built from the original one.

Although the update ultimately also goes through InternalEngine's index method, on the Lucene side it calls softUpdateDocuments, which comprises two operations: marking the old document deleted, and adding the new one.
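A toy model of what this means inside a segment. The real call is Lucene's IndexWriter#softUpdateDocuments; the classes below are invented and only model the effect: the old version stays physically in the segment but is masked by a soft-delete marker, and the new version is appended.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of "update = mark delete + add" at the segment level (illustration only).
class SoftUpdateSketch {
    record Doc(String id, String source) {}

    final List<Doc> segment = new ArrayList<>();         // append-only, like a Lucene segment
    final Set<Integer> softDeleted = new HashSet<>();    // indices masked by the soft-delete marker

    void softUpdate(String id, String newSource) {
        // op 1: mark any live old version of the doc as deleted
        for (int i = 0; i < segment.size(); i++) {
            if (segment.get(i).id().equals(id) && !softDeleted.contains(i)) {
                softDeleted.add(i);
            }
        }
        // op 2: append the new version
        segment.add(new Doc(id, newSource));
    }

    long liveDocs() { return segment.size() - softDeleted.size(); }
}
```

After one update, the segment holds two physical copies of the document but only one live copy; the deleted copy is not reclaimed until a segment merge.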

Compared with a plain index operation, an update:

  • performs one extra full get of the document (to guarantee consistency, update issues its GET with the realtime option set to true, and this is not configurable; an update may therefore trigger a refresh that produces new Lucene segments)
  • performs one extra mark-delete

When data volumes are large and writes are frequent, the update operation should therefore be used with caution.
