1.概述

转载并且补充:es 写入流程

读取这篇文章的时候先看看

【Elasticsearch】了解Elasticsearch写入磁盘的数据

然后再看看

【ES】ES 写入数据流程

写索引后端变化太大,等以后熟悉了再补充

Es写索引包括协调节点流程和节点写索引的流程

  1. 协调节点主要做索引的预处理、检查、分发任务

  2. 节点执行完后发发送给主分片所在节点,该节点把response发送给协调节点,协调节点发送给用户

2.源码分析

Elasticsearch构建请求有两种方式:

Restful API
Java Cliet API (或其他语言)

下面是两种请方式的实例(效果一样):

Restful API

POST /blog/blog/1
{"title":"我是一篇博客","content":"十九大终于闭幕了"
}

该restful 请求最终会被RestIndexAction处理,源码如下:

public class RestIndexAction extends BaseRestHandler {private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(RestDeleteAction.class));public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in document " +"index requests is deprecated, use the typeless endpoints instead (/{index}/_doc/{id}, /{index}/_doc, " +"or /{index}/_create/{id}).";@Overridepublic List<Route> routes() {return unmodifiableList(asList(new Route(POST, "/{index}/_doc/{id}"),new Route(PUT, "/{index}/_doc/{id}"),new Route(POST, "/{index}/{type}/{id}"),new Route(PUT, "/{index}/{type}/{id}")));}@Overridepublic String getName() {return "document_index_action";}public static final class CreateHandler extends RestIndexAction {@Overridepublic String getName() {return "document_create_action";}@Overridepublic List<Route> routes() {return unmodifiableList(asList(new Route(POST, "/{index}/_create/{id}"),new Route(PUT, "/{index}/_create/{id}"),new Route(POST, "/{index}/{type}/{id}/_create"),new Route(PUT, "/{index}/{type}/{id}/_create")));}@Overridepublic RestChannelConsumer prepareRequest(RestRequest request, final NodeClient client) throws IOException {validateOpType(request.params().get("op_type"));request.params().put("op_type", "create");return super.prepareRequest(request, client);}void validateOpType(String opType) {if (null != opType && false == "create".equals(opType.toLowerCase(Locale.ROOT))) {throw new IllegalArgumentException("opType must be 'create', found: [" + opType + "]");}}}public static final class AutoIdHandler extends RestIndexAction {private final Supplier<DiscoveryNodes> nodesInCluster;public AutoIdHandler(Supplier<DiscoveryNodes> nodesInCluster) {this.nodesInCluster = nodesInCluster;}@Overridepublic String getName() {return "document_create_action_auto_id";}@Overridepublic List<Route> routes() {return unmodifiableList(asList(new Route(POST, "/{index}/_doc"),new Route(POST, "/{index}/{type}")));}@Overridepublic RestChannelConsumer prepareRequest(RestRequest request, final NodeClient client) throws IOException {assert request.params().get("id") == null : "non-null id: " + request.params().get("id");if (request.params().get("op_type") == null && nodesInCluster.get().getMinNodeVersion().onOrAfter(Version.V_7_5_0)) {// default to op_type createrequest.params().put("op_type", "create");}return super.prepareRequest(request, client);}}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {IndexRequest indexRequest;// 获取请求中的的index,type,id,并构建一个IndexRequest,用于创建索引的请求final String type = request.param("type");if (type != null && type.equals(MapperService.SINGLE_MAPPING_NAME) == false) {deprecationLogger.deprecatedAndMaybeLog("index_with_types", TYPES_DEPRECATION_MESSAGE);indexRequest = new IndexRequest(request.param("index"), type, request.param("id"));} else {indexRequest = new IndexRequest(request.param("index"));indexRequest.id(request.param("id"));}// 获取请求中的routing参数indexRequest.routing(request.param("routing"));indexRequest.setPipeline(request.param("pipeline"));indexRequest.source(request.requiredContent(), request.getXContentType());indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));indexRequest.setRefreshPolicy(request.param("refresh"));indexRequest.version(RestActions.parseVersion(request));indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));indexRequest.setIfSeqNo(request.paramAsLong("if_seq_no", indexRequest.ifSeqNo()));indexRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", indexRequest.ifPrimaryTerm()));String sOpType = request.param("op_type");String waitForActiveShards = request.param("wait_for_active_shards");if (waitForActiveShards != null) {indexRequest.waitForActiveShards(ActiveShardCount.parseString(waitForActiveShards));}if (sOpType != null) {indexRequest.opType(sOpType);}return channel ->client.index(indexRequest, new RestStatusToXContentListener<>(channel, r -> r.getLocation(indexRequest.routing())));}}

然后主要看代码

channel ->client.index(

这里会调用到

org.elasticsearch.client.support.AbstractClient#index(org.elasticsearch.action.index.IndexRequest, org.elasticsearch.action.ActionListener<org.elasticsearch.action.index.IndexResponse>)@Overridepublic void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {execute(IndexAction.INSTANCE, request, listener);}@Overridepublic final <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action, Request request, ActionListener<Response> listener) {listener = threadedWrapper.wrap(listener);doExecute(action, request, listener);}

参考:Elasticsearch源码分析—索引写入模型以及分析(十二).md

【es】es 写入流程相关推荐

  1. Elasticsearch专栏-7.es底层写入原理

    es底层写入原理 概念说明 es数据落盘过程 mysql数据落盘过程 redis数据落盘过程 概念说明 在第一章节中,已经提到过几个名词:lucence.segment.translog.refres ...

  2. 【Elasticsearch】Elasticsearch 优化写入流程实现NRT近实时(filesystem cache,refresh)

    现有流程的问题,每次都必须等待fsync将segment刷入磁盘,才能将segment打开供search使用,这样的话,从一个document写入,到它可以被搜索,可能会超过1分钟!!!这就不是近实时 ...

  3. flume源码学习8-hdfs sink的具体写入流程

    上一篇说了HDFSEventSink的实现,这里根据hdfs sink的配置和调用分析来看下sink中整个hdfs数据写入的过程: 线上hdfs sink的几个重要设置 1 2 3 4 5 6 7 8 ...

  4. HBase - 数据写入流程解析

    本文由  网易云 发布. 作者:范欣欣 本篇文章仅限内部分享,如需转载,请联系网易获取授权. 众所周知,HBase默认适用于写多读少的应用,正是依赖于它相当出色的写入性能:一个100台RS的集群可以轻 ...

  5. RocksDB 写入流程详解

    摘要: 最初的写入流程,继承自 leveldb,多个 写线程组成一个 group, leader 负责 group 的 WAL 及 memtable 的提交,提交完后唤醒所有的 follwer,向上层 ...

  6. hfds_HFDS的数据写入流程

    1.HFDS的数据写入流程的基本参数 首先了解数据写入过程中,什么是block, packet, chunk 1.block:数据块,当上传的文件太大时, 就需要分块,一个块默认设置时128M, 在客 ...

  7. influxdb 插入数据_Influxdb 数据写入流程

    数据写入流程分析 本篇不涉及存储层的写入,只分析写入请求的处理流程 Influxdb名词介绍 如果想搞清楚Influxdb数据写入流程,Influxdb本身的用法和其一些主要的专用词还是要明白是什么意 ...

  8. ES索引恢复流程解析

    文章目录 背景 主分片恢复流程 INIT阶段 INDEX阶段 VERIFY_INDEX阶段 TRANSLOG阶段 FINALIZE阶段 DONE阶段 副分片恢复流程 流程概述 副分片节点处理过程 IN ...

  9. MySQL数据以全量和增量方式,向ES搜索引擎同步流程

    本文源码:GitHub·点这里 || GitEE·点这里 一.配置详解 场景描述:MySQL数据表以全量和增量的方式向ElasticSearch搜索引擎同步. 1.下载内容 elasticsearch ...

最新文章

  1. vscode 终端 进入node_安装了Node.js 从VScode 使用node -v 和 npm -v等命令却无效
  2. BAT-使用BAT生成快捷方式
  3. Hadoop学习笔记—4.初识MapReduce
  4. 安装cv2(opencv-python)遇到的问题
  5. Docker的容器运行时组件Containerd
  6. python写选择排序_如何快速掌握python选择排序算法?
  7. java中怎么删除多表连接_在Java中从多个列表中合并和删除重复的最佳方式
  8. 英特尔自动驾驶部门秘密申请IPO 有望成今年美股规模最大IPO
  9. 蓝桥杯嵌入式板-解决LCD使LED亮灭混乱的办法
  10. Linux(2) vi和vim编辑器
  11. andorid之帧布局FrameLayout
  12. C++ 使用正则表达式拆分字符串
  13. setitime和相关函数
  14. 机器学习笔记(十三):主成分分析法(PCA)
  15. 图解OAuth 2.0协议族(一):授权码 auth code
  16. 2016最新版App Store应用审核指南完整版
  17. Project2007工具栏没有Pert分析按钮
  18. 第3讲 移动通信技术
  19. Python win32gui.ShowWindow() 窗口没弹出来解决方法
  20. 2021-09-12 Autodesk inventor 技巧整理

热门文章

  1. Uber CEO亲自体验送外卖:三个半小时挣了106美元
  2. 华为P50 Pro外形首曝:新造型,头次见!
  3. 刘慈欣、Netflix联手!《三体》系列将拍摄剧集,但编剧被网友疯狂吐槽
  4. 交钱赎“人”!B站500万粉UP主被黑客勒索,腾讯都表示无解
  5. iPhone 12 Pro最新概念图:五摄相机模组也带下巴
  6. 曾经辉煌无限,如今员工持续大量流失,集团目前仅剩10余人
  7. 买iPhone 11的要不再等等?iPhone 12首曝:全系5G,回归经典造型
  8. iQOO骑士黑版本四月亮相:搭载骁龙855+12G运存
  9. 史上最奢华AirPods登场 售价直接翻四倍却还算良心
  10. 自如被起诉索赔了!因装修致邻居家房屋漏水...