elasticsearch-1.3.0

发送请求
创建

[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer?pretty
{"acknowledged" : true
}

索引

[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer/external/1?pretty '-d { "name":"JOhn Doe"}'
{"_index" : "customer","_type" : "external","_id" : "1","_version" : 1,"created" : true
}
[root@centos ~]# curl -XPUT 172.16.136.159:9200/customer/external/1?pretty '-d { "name":"JOhn Doe"}'
{"_index" : "customer","_type" : "external","_id" : "1","_version" : 2,"created" : false
}

这里先跟踪下索引的流程,netty的bootstrap暂且不管,从HttpRequestHandler的messageReceived说起

/**
 * Handles an HTTP message arriving on the Netty pipeline.
 *
 * <p>Casts the event payload to {@code HttpRequest}, wraps it together with the event's
 * channel in a {@code NettyHttpRequest} and a {@code NettyHttpChannel}, hands both to
 * {@code serverTransport.dispatchRequest}, and finally forwards the event upstream via
 * {@code super.messageReceived}.
 *
 * @param ctx the handler context for this pipeline stage
 * @param e   the message event; its message is expected to be an {@code HttpRequest}
 * @throws Exception propagated from dispatching or from the superclass handler
 */
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    HttpRequest request = (HttpRequest) e.getMessage();
    // the netty HTTP handling always copies over the buffer to its own buffer, either in NioWorker internally
    // when reading, or using a cumulation buffer
    NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());
    serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, e.getChannel(), httpRequest));
    super.messageReceived(ctx, e);
}

这里调用的serverTransport其实就是NettyHttpServerTransport,其dispatchRequest实现如下
NettyHttpServerTransport

/**
 * Passes an incoming HTTP request on to {@code httpServerAdapter}.
 *
 * <p>This method only delegates; it performs no processing of its own.
 *
 * @param request the HTTP request to dispatch
 * @param channel the HTTP channel associated with the request
 */
void dispatchRequest(HttpRequest request, HttpChannel channel) {
    httpServerAdapter.dispatchRequest(
            request,
            channel);
}

Dispatcher(static class Dispatcher implements HttpServerAdapter)中的实现

/**
 * Adapter callback: forwards the request and its channel unchanged to
 * {@code server.internalDispatchRequest}.
 *
 * @param request the HTTP request to dispatch
 * @param channel the HTTP channel associated with the request
 */
public void dispatchRequest(HttpRequest request, HttpChannel channel) {
    server.internalDispatchRequest(
            request,
            channel);
}

HttpServer

/**
 * Routes an HTTP request to the right handler.
 *
 * <p>Requests whose raw path starts with {@code "/_plugin/"} are processed by a filter chain
 * built around {@code pluginSiteFilter}. All other requests go to
 * {@code restController.dispatchRequest}.
 *
 * @param request the HTTP request to route
 * @param channel the HTTP channel associated with the request
 */
public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
    final boolean pluginSiteRequest = request.rawPath().startsWith("/_plugin/");
    if (pluginSiteRequest) {
        // Plugin site paths bypass the regular REST dispatch and run through their own filter chain.
        RestFilterChain pluginChain = restController.filterChain(pluginSiteFilter);
        pluginChain.continueProcessing(request, channel);
    } else {
        restController.dispatchRequest(request, channel);
    }
}

RestController的dispatchRequest()中主要是调用executeHandler()

try {executeHandler(request, channel);
} catch (Throwable e) {

executeHandler方法通过getHandler(request)找到对应的handler来处理请求。这里的handler是RestIndexAction,它继承自BaseRestHandler

final RestHandler handler = getHandler(request);
if (handler != null) {handler.handleRequest(request, channel);
}

在BaseRestHandler中

/**
 * Entry point for a REST request.
 *
 * <p>Chooses which client the concrete handler should use, then delegates to the
 * three-argument {@code handleRequest}. When {@code usefulHeaders} is empty, the plain
 * {@code client} is used. Otherwise the client is wrapped in a {@code HeadersCopyClient}
 * built from the request and {@code usefulHeaders}; presumably this wrapper copies those
 * headers onto outgoing requests (TODO confirm).
 *
 * @param request the incoming REST request
 * @param channel the channel to respond on
 * @throws Exception whatever the concrete handler throws
 */
public final void handleRequest(RestRequest request, RestChannel channel) throws Exception {
    final Client effectiveClient;
    if (usefulHeaders.length == 0) {
        effectiveClient = client;
    } else {
        effectiveClient = new HeadersCopyClient(client, request, usefulHeaders);
    }
    handleRequest(request, channel, effectiveClient);
}

/**
 * Handler-specific processing, implemented by each concrete REST action
 * (for example RestIndexAction).
 *
 * @param request the incoming REST request
 * @param channel the channel to respond on
 * @param client  the client selected by the two-argument {@code handleRequest}
 * @throws Exception on any handling failure
 */
protected abstract void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception;

RestIndexAction实现了父类BaseRestHandler中的抽象方法handleRequest,并在其中最终调用NodeClient的index方法:
client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
NodeClient的父类AbstractClient中index的实现

/**
 * Indexes a document by executing {@code IndexAction.INSTANCE} with the given request.
 * The outcome is reported to {@code listener}.
 *
 * @param request  the index request
 * @param listener receives the {@code IndexResponse} or the failure
 */
public void index(final IndexRequest request, final ActionListener<IndexResponse> listener) {
    // Indexing is routed through the same generic execute(...) path as every other action.
    execute(IndexAction.INSTANCE,
            request,
            listener);
}

NodeClient中的execute方法实现

/**
 * Looks up the {@code TransportAction} mapped to {@code action} in {@code actions} and
 * executes the request on it.
 *
 * @param action   the client-side action descriptor
 * @param request  the request to execute
 * @param listener receives the response or failure
 */
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder, Client>>
        void execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener) {
    TransportAction<Request, Response> transportAction = actions.get((ClientAction) action);
    // For IndexAction this resolves to TransportIndexAction,
    // which extends TransportShardReplicationOperationAction.
    transportAction.execute(request, listener);
}

这里的transportAction是TransportIndexAction。
TransportShardReplicationOperationAction是TransportIndexAction的父类,而TransportShardReplicationOperationAction又继承自TransportAction。TransportAction中execute的实现如下

/**
 * Validates and then executes a request, reporting the outcome to {@code listener}.
 *
 * <p>When {@code request.listenerThreaded()} is true, the listener is first wrapped in a
 * {@code ThreadedActionListener} built from {@code threadPool}. Validation errors go straight
 * to {@code onFailure} without calling {@code doExecute}. Any {@code Throwable} thrown
 * synchronously by {@code doExecute} is trace-logged and also passed to {@code onFailure}.
 *
 * @param request  the request to execute
 * @param listener receives the response or failure
 */
public void execute(Request request, ActionListener<Response> listener) {
    ActionListener<Response> target = listener;
    if (request.listenerThreaded()) {
        target = new ThreadedActionListener<>(threadPool, listener, logger);
    }

    ActionRequestValidationException invalid = request.validate();
    if (invalid == null) {
        try {
            doExecute(request, target);
        } catch (Throwable failure) {
            logger.trace("Error during transport action execution.", failure);
            target.onFailure(failure);
        }
    } else {
        target.onFailure(invalid);
    }
}

execute校验请求后直接调用doExecute,即TransportIndexAction中的doExecute

 protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index APIif (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {request.beforeLocalFork(); // we fork on another thread...createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {@Overridepublic void onResponse(CreateIndexResponse result) {innerExecute(request, listener);}@Overridepublic void onFailure(Throwable e) {if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {// we have the index, do ittry {innerExecute(request, listener);} catch (Throwable e1) {listener.onFailure(e1);}} else {listener.onFailure(e);}}});} else {innerExecute(request, listener);}}

这里走 innerExecute(request, listener);

/**
 * Continues the index request on the generic shard-replication path by delegating to
 * the superclass {@code doExecute}.
 *
 * @param request  the index request
 * @param listener receives the {@code IndexResponse} or the failure
 */
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
    super.doExecute(
            request,
            listener);
}

这里的super就是TransportShardReplicationOperationAction了,TransportShardReplicationOperationAction中doExecute的实现

/**
 * Creates an {@code AsyncShardOperationAction} for the request and starts it.
 *
 * @param request  the request to execute
 * @param listener receives the response or failure
 */
protected void doExecute(Request request, ActionListener<Response> listener) {
    AsyncShardOperationAction operation = new AsyncShardOperationAction(request, listener);
    operation.start();
}

这里主要关注两个方法:一个是获取shard,另一个是shardOperationOnPrimary;
其中获取shard的部分后面再说,shardOperationOnPrimary在TransportIndexAction中实现:

protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {final IndexRequest request = shardRequest.request;// validate, if routing is required, that we got routingIndexMetaData indexMetaData = clusterState.metaData().index(request.index());MappingMetaData mappingMd = indexMetaData.mappingOrDefault(request.type());if (mappingMd != null && mappingMd.routing().required()) {if (request.routing() == null) {throw new RoutingMissingException(request.index(), request.type(), request.id());}}IndexService indexService = indicesService.indexServiceSafe(shardRequest.request.index());IndexShard indexShard = indexService.shardSafe(shardRequest.shardId);SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.source()).type(request.type()).id(request.id()).routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());long version;boolean created;Engine.IndexingOperation op;if (request.opType() == IndexRequest.OpType.INDEX) {Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates());if (index.parsedDoc().mappingsModified()) {mappingUpdatedAction.updateMappingOnMaster(request.index(), index.docMapper(), indexService.indexUUID());}indexShard.index(index);version = index.version();op = index;created = index.created();} else {Engine.Create create = indexShard.prepareCreate(sourceToParse,request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY, request.canHaveDuplicates(), request.autoGeneratedId());if (create.parsedDoc().mappingsModified()) {mappingUpdatedAction.updateMappingOnMaster(request.index(), create.docMapper(), indexService.indexUUID());}indexShard.create(create);version = create.version();op = create;created = true;}if (request.refresh()) {try {indexShard.refresh(new 
Engine.Refresh("refresh_flag_index").force(false));} catch (Throwable e) {// ignore}}// update the version on the request, so it will be used for the replicasrequest.version(version);request.versionType(request.versionType().versionTypeForReplicationAndRecovery());assert request.versionType().validateVersionForWrites(request.version());IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version, created);return new PrimaryResponse<>(shardRequest.request, response, op);
}

这里走request.opType() == IndexRequest.OpType.INDEX分支,主要是indexShard.prepareIndex和indexShard.index(index)两步。这里的IndexShard是InternalIndexShard,其index实现如下

/**
 * Indexes one document operation on this shard.
 *
 * <p>Checks that writes are allowed for the operation's origin and runs the
 * {@code indexingService} pre-index hook, whose returned operation is used from then on.
 * It then hands the operation to the engine and records the end time. If the engine throws a
 * {@code RuntimeException}, the failed-index hook runs and the exception is rethrown.
 * Otherwise the post-index hook runs.
 *
 * @param index the engine index operation
 * @return the parsed document of the (possibly replaced) operation
 * @throws ElasticsearchException if the write is rejected or indexing fails
 */
public ParsedDocument index(Engine.Index index) throws ElasticsearchException {
    writeAllowed(index.origin());
    Engine.Index op = indexingService.preIndex(index);
    try {
        if (logger.isTraceEnabled()) {
            logger.trace("index [{}][{}]{}", op.type(), op.id(), op.docs());
        }
        engine.index(op);
        op.endTime(System.nanoTime());
    } catch (RuntimeException failure) {
        indexingService.failedIndex(op);
        throw failure;
    }
    indexingService.postIndex(op);
    return op.parsedDoc();
}

indexingService对应ShardIndexingService,engine是InternalEngine。InternalEngine的index()实现如下

public void index(Index index) throws EngineException {final IndexWriter writer;try (InternalLock _ = readLock.acquire()) {writer = currentIndexWriter();try (Releasable r = throttle.acquireThrottle()) {innerIndex(index, writer);}dirty = true;possibleMergeNeeded = true;flushNeeded = true;} catch (OutOfMemoryError | IllegalStateException | IOException t) {maybeFailEngine(t, "index");throw new IndexFailedEngineException(shardId, index, t);}checkVersionMapRefresh();
}

最终在InternalEngine的innerIndex方法中调用Lucene的IndexWriter:根据文档当前是否已有版本(即currentVersion是否为Versions.NOT_FOUND),分别通过writer.addDocument(s)添加索引,或通过writer.updateDocument(s)更新索引
添加add索引

if (index.docs().size() > 1) {writer.addDocuments(index.docs(), index.analyzer());
} else {writer.addDocument(index.docs().get(0), index.analyzer());
}

更新update索引

if (index.docs().size() > 1) {writer.updateDocuments(index.uid(), index.docs(), index.analyzer());
} else {writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer());
}

最后写入Translog

Translog.Location translogLocation = translog.add(new Translog.Index(index));

具体代码

private void innerIndex(Index index, IndexWriter writer) throws IOException {synchronized (dirtyLock(index.uid())) {final long currentVersion;VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());if (versionValue == null) {currentVersion = loadCurrentVersionFromIndex(index.uid());} else {if (enableGcDeletes && versionValue.delete() && (threadPool.estimatedTimeInMillis() - versionValue.time()) > gcDeletesInMillis) {currentVersion = Versions.NOT_FOUND; // deleted, and GC} else {currentVersion = versionValue.version();}}long updatedVersion;long expectedVersion = index.version();if (index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion)) {if (index.origin() == Operation.Origin.RECOVERY) {return;} else {throw new VersionConflictEngineException(shardId, index.type(), index.id(), currentVersion, expectedVersion);}}updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);index.updateVersion(updatedVersion);if (currentVersion == Versions.NOT_FOUND) {// document does not exists, we can optimize for createindex.created(true);if (index.docs().size() > 1) {writer.addDocuments(index.docs(), index.analyzer());} else {writer.addDocument(index.docs().get(0), index.analyzer());}} else {if (versionValue != null) {index.created(versionValue.delete()); // we have a delete which is not GC'ed...}if (index.docs().size() > 1) {writer.updateDocuments(index.uid(), index.docs(), index.analyzer());} else {writer.updateDocument(index.uid(), index.docs().get(0), index.analyzer());}}Translog.Location translogLocation = translog.add(new Translog.Index(index));versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, translogLocation));indexingService.postIndexUnderLock(index);}
}

link
分布式搜索Elasticsearch源码分析之二------索引过程源码概要分析

转载于:https://www.cnblogs.com/donganwangshi/p/4318045.html

elasticsearch-1.3.0 之索引代码粗略梳理相关推荐

  1. Elasticsearch 2.3.0 重建索引

    2019独角兽企业重金招聘Python工程师标准>>> 重建索引是2.3.0新增加的接口.这个接口是实验性质的,在未来有可能会改变. 重建索引的最基本的功能是拷贝文件从一个索引到另一 ...

  2. Elasticsearch 7.4.0官方文档操作

    官方文档地址 https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html 1.0.0 设置Elasticsea ...

  3. ElasticSearch Java Api(二) -检索索引库

    一.准备数据 String data1 = JsonUtil.model2Json(new Blog(1, "git简介", "2016-06-19", &qu ...

  4. Elasticsearch:运用 Java 对索引文档进行搜索

    这是这个系列文章中的其中一篇文章: Elasticsearch:运用 Java 创建索引并写入数据 Elasticsearch:运用 Java 更新 Elasticsearch 文档 Elastics ...

  5. Elasticsearch:运用 Java 创建索引并写入数据

    在我之前的文章 "Elasticsearch:Java 运用示例",我讲述了如何在 Java 应用中创建一个索引,并写入数据.在今天的例子中,我来着重讲述如何有目的地创建按照我们需 ...

  6. Elasticsearch 7.8.1 创建索引,IK分词器的使用

    前置知识来自:图解Elasticsearch中的_source._all.store和index属性_1.02^365=1377.41 (Lucene.ES.ELK开发交流群: 370734940, ...

  7. Elasticsearch——分布式搜索引擎01(索引库、文档、RestAPI、RestClient、拼音分词器、IK分词器)

    Elasticsearch--分布式搜索引擎01(索引库.文档.RestAPI.RestClient.拼音分词器.IK分词器) 一.初识 elesticsearch 1.1 简介 1.2 倒排索引(重 ...

  8. 详述 Elasticsearch 通过范围条件查询索引数据的方法

    文章目录 情景 查询方法 通过命令实现范围查询 通过 API 实现范围查询 情景 在使用 Elasticsearch 的时候,我们可能会遇到需要以范围为条件查询索引数据的需求.有两种方法可以实现我们的 ...

  9. ElasticSearch + Kibana + logstash+ik结合springboot代码实现,比较ES和传统Mysql查询效率

    开发环境:Win10 开发环境:STS 概要:此篇文章主要是传统的Mysql查询和ES查询两种方式的效率比较,以及代码实现,另外使用logstash进行mysql数据的同步也可以直接理解为" ...

最新文章

  1. ALD对照CVD淀积技术的优势
  2. 百度要造什么车?汽车依然逃不出 BAT 的手掌心 | 撩车
  3. 利用 TensorFlow 实现上下文的 Chat-bots
  4. fwrite视频写入帧率测试(不用测了。。)
  5. linux怎么知道ping命令,教程方法;通过ping命令查看服务器类型(linux还是windows系列)电脑技巧-琪琪词资源网...
  6. jenkins 远程启动tomcat报错:Neither the JAVA_HOME nor the JRE_HOME environment variable is defined
  7. 信息学奥赛一本通 1890:【15NOIP提高组】跳石头 | 洛谷 P2678 [NOIP2015 提高组] 跳石头
  8. 编译原理 实验四 LR(1)分析法程序
  9. ZYNQ BRAM_ctrl PS PL通信报错
  10. PHP地图规划骑行路径,规划结果 + 骑行路线绘制
  11. 《拆掉思维里的墙》—— 读后总结
  12. SSM+图书馆电子文件资源管理 毕业设计-附源码191614
  13. E280 P0410故障修复
  14. SQLServer·面试题
  15. 有道云笔记的markdown编辑器如何通过mathtype来写公式
  16. 基于深度学习的人脸识别系统:卷积神经网络实现(VIPLFaceNet、VGGNet、Xception、ResNet50、ResNet18)、AM-Softmax损失
  17. Windows XP自动登录 auto login
  18. 更改Tomcat webapps目录
  19. 学生档案复习前面HTML5新增标签/属性/input属性
  20. P1618 三连击(升级版)C语言

热门文章

  1. Mac 环境 下使用Charles 抓包Http/Https请求
  2. 眼科裂隙灯是否伤眼?
  3. Ellex激光器参数与激光消融手术风险的关系
  4. 如何从心理上缓解对浑浊物的恐惧?
  5. ubuntu让/etc/hosts修改后立刻生效
  6. leetcode:图相关算法
  7. oracle xml文件是什么文件,介绍关于Oracle下存取XML格式数据的方式
  8. php 工厂静态类,静态工厂模式(Static Factory)
  9. 忽如一夜冬风来,团队忽然就解散了
  10. linux每日命令(17):which命令