Action这一部分主要是数据(索引)的操作和部分集群信息操作。 所有的请求通过client转发到对应的action上然后再由对应的TransportAction来执行相关请求。如果请求能在本机上执行则在本机上执行,否则使用Transport进行转发到对应的节点。action support部分是对action的抽象,所有的具体action都继承了support action中的某个类。这里将对这些抽象类进行分析。

这一部分总共分为broadcast(广播),master,nodes,replication及single几个部分。broadcast主要针对一些无具体目标主机的操作,如查询index是否存在,所有继承这个类的action都具有这种类似的性质;nodes主要是对节点的操作,如热点线程查询(hotThread)查询节点上的繁忙线程;replication的子类主要是需要或可以在副本上进行的操作,如索引操作,数据不仅要发送到主shard还要发送到各个副本。single则主要是目标明确的单shard操作,如get操作,根据doc的id取doc,doc 的id能够确定它在哪个shard上,因此操作也在此shard上执行。

这些support action的实现可以分为两类,第一类就是实现一个内部类作为异步操作器,子类执行doExecute时,初始化该操作器并启动。另外一种就是直接实现一个方法,子类doExecute方法调用该方法进行。TransportBroadcastOperationAction就属于前者,它实现了内部操作器AsyncBroadcastAction。TransportCountAction继承于它,它doExecute方法如下所示:

 @Overrideprotected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {request.nowInMillis = System.currentTimeMillis();super.doExecute(request, listener);}

调用父类的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的实现如下所示:

 @Overrideprotected void doExecute(Request request, ActionListener<Response> listener) {new AsyncBroadcastAction(request, listener).start();}

可以看到它初始化了AsyncBroadcastAction并启动。AsyncBroadcastAction只是确定了操作的流程,及操作完成如何返回response,并未涉及到具体的操作逻辑。因为这些逻辑都在每个子action中实现,不同的action需要进行不同的操作。如count需要count每个shard并且返回最后的总数值,而IndexExistAction则需要对比所有索引查看查询的索引是否存在。start方法的代码如下所示:

public void start() {      //没有shardsif (shardsIts.size() == 0) {// no shardstry {listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));} catch (Throwable e) {listener.onFailure(e);}return;}request.beforeStart();// count the local operations, and perform the non local onesint shardIndex = -1;       //遍历对每个shards进行操作for (final ShardIterator shardIt : shardsIts) {shardIndex++;final ShardRouting shard = shardIt.nextOrNull();if (shard != null) {performOperation(shardIt, shard, shardIndex);} else {// really, no shards active in this grouponOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));}}}

start方法就是遍历所有shards,如果shard存在则执行performOperation方法,在这个方法中会区分该请求能否在本机上进行,能执行则调用shardOperation方法得到结果。这个方法在这是抽象的,每个子类都有实现。否则发送到对应的主机上。,如果shard为null则进行onOperation操作,遍历该shard的其它副本看能否找到可以操作的shard。performOperation代码如下所示:

 protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {if (shard == null) {//shard 为null抛出异常// no more active shards... (we should not really get here, just safety)onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));} else {try {final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地执行shardOperation方法,并通过onOperation方法封装结果threadPool.executor(executor).execute(new Runnable() {@Overridepublic void run() {try {onOperation(shard, shardIndex, shardOperation(shardRequest));} catch (Throwable e) {onOperation(shard, shardIt, shardIndex, e);}}});} else {//不是本地shard,发送到对应节点。DiscoveryNode node = nodes.get(shard.currentNodeId());if (node == null) {// no node connected, act as failureonOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));} else {transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {@Overridepublic ShardResponse newInstance() {return newShardResponse();}@Overridepublic String executor() {return ThreadPool.Names.SAME;}@Overridepublic void handleResponse(ShardResponse response) {onOperation(shard, shardIndex, response);}@Overridepublic void handleException(TransportException e) {onOperation(shard, shardIt, shardIndex, e);}});}}} catch (Throwable e) {onOperation(shard, shardIt, shardIndex, e);}}}

方法shardOperation在countTransportAction的实现如下所示:

    @Overrideprotected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException {IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//IndexShard indexShard = indexService.shardSafe(request.shardId().id());//构造查询contextSearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());SearchContext context = new DefaultSearchContext(0,new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());SearchContext.setCurrent(context);try {// TODO: min score should move to be "null" as a value that is not initialized...if (request.minScore() != -1) {context.minimumScore(request.minScore());}BytesReference source = request.querySource();if (source != null && source.length() > 0) {try {QueryParseContext.setTypes(request.types());context.parsedQuery(indexService.queryParserService().parseQuery(source));} finally {QueryParseContext.removeTypes();}}final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;boolean terminatedEarly = false;context.preProcess();try {long count;if (hasTerminateAfterCount) {//调用lucene的封装接口执行查询并返回结果final Lucene.EarlyTerminatingCollector countCollector =Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);count = countCollector.count();} else {count = Lucene.count(context.searcher(), context.query());}return new ShardCountResponse(request.shardId(), count, terminatedEarly);} catch (Exception e) {throw new QueryPhaseExecutionException(context, "failed to execute count", e);}} finally {// this will also release the index searcher
            context.close();SearchContext.removeCurrent();}}

可以看到这里是每个action真正的逻辑实现。因为这里涉及到index部分的内容,这里就不详细分析。后面关于index的分析会有涉及。这就是support action中的第一种实现。

第二种就master的相关操作,因此没有实现对应的操作类,而只是实现了一个方法。该方法的作用跟操作器作用相同,唯一的不同是它没有操作器这么多的变量, 而且它不是异步的。master的操作需要实时进行,执行过程中需要阻塞某些操作,保证集群状态一致性。这里就不再说明,请参考TransportMasterNodeOperationAction原码。

总结:本篇概括说了support action,并以countTransportAction为例说明了support Action中的异步操作器实现,最后简单的分析了master的同步操作。因为这里涉及到很多action不可能一一分析,有兴趣可以参考对应的代码。而且这里有以下index部分的内容,所以没有更深入的分析。在后面分析完index的相关功能后,会挑出几个重要的action做详细分析。

转载于:https://www.cnblogs.com/zziawanblog/p/6671286.html

action support分析相关推荐

  1. elasticsearch index 之 put mapping

    elasticsearch index 之 put mapping mapping机制使得elasticsearch索引数据变的更加灵活,近乎于no schema.mapping可以在建立索引时设置, ...

  2. OVS datapath之action分析(十九)

    OVS dp支持的action都在do_execute_actions函数中定义,支持的action包括: OVS_ACTION_ATTR_OUTPUT OVS_ACTION_ATTR_USERSPA ...

  3. Elasticsearch CCR源码分析

    本文基于Elasticsearch6.8.5版本 ES使用的是Guice框架,依赖注入和暴露接口的方式和Spring差距较大,可先查看guice框架 节点启动过程: org/elasticsearch ...

  4. VOC分析的实际流程

    VOC分析的实际流程 实际进行VOC分析的时候,应该做什么.怎么做呢?其大致流程如下图所示. 接下来,将分别进行具体说明. 明确目的 首先应该做的是明确"为了什么进行VOC分析"的 ...

  5. SpringBoot 操作 ElasticSearch 详解(万字长文)

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作者:超级小豆丁 http://www.mydlq.club/ar ...

  6. ELK(Logstash+Elasticsearch+Kibana)的原理和详细搭建

    一. Elastic Stack Elastic Stack是ELK的官方称呼,网址:https://www.elastic.co/cn/products ,其作用是"构建在开源基础之上, ...

  7. 万字长文:详解 Spring Boot 中操作 ElasticSearch

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 作者 | 超级小豆丁 来源 | http://www.m ...

  8. elasticsearch 客户端工具_万字长文:详解 Spring Boot 中操作 ElasticSearch

    点击上方"小强的进阶之路",选择"星标"公众号 优质文章,及时送达 预计阅读时间: 15分钟 一.ElasticSearch 简介 1.简介 ElasticSe ...

  9. elasticSearch入门到java操作api一套搞定

    目录 写在前面 一.下载地址 二.solr与es比较 三.安装elasticsearch 四.安装可视化界面(hand插件) 使用 五.安装kibana 六.学习es核心概念 七.IK分词器插件 八. ...

最新文章

  1. Yarn 组件的指挥部 – 调度器Scheduler
  2. 使用LaunchScreen.storyboard黑屏
  3. Ubuntu利用Samba映射磁盘到Windows(转)
  4. LINUX CP 命令强制覆盖功能开启/关闭
  5. osg::PositionAttitudeTransform旋转物体
  6. FFMPEG 源码分析
  7. 出发a标签_以用户标签为例,复盘B端产品的需求挖掘方法论
  8. 虚拟机中模拟uboot启动
  9. 【开源】STM32H7-UART+DMA配置测试【含源码】
  10. Python pandas.DataFrame.combine_first函数方法的使用
  11. SAP PLM CVAPI_DOC_VIEW 获取DMS文档原件URL地址示例(Originals File)
  12. python之捕鱼问题
  13. mysql 查询和修改组合_别崩溃,来看这个MySQL全面瓦解:子查询和组合查询
  14. 初识文件管理、文件的逻辑结构
  15. matlab调整视频播放速度,会声会影如果调整视频播放速度
  16. java 虚拟机 Java内存结构 JVM垃圾回收机制算法
  17. 脚本计算机术语,谁能给我简单明了的解释一下脚本是什么意思?
  18. SOCKET编程的详细电子文档
  19. python读取输入数据的第二行_Python读取键盘输入的2种方法
  20. 浅谈估值模型:PB指标与剩余收益估值

热门文章

  1. 方舟编译器需要安装吗,还是系统升级后自带的?
  2. 古代的酒到底多少度,为何古人动不动喝好几坛都不会醉呢?
  3. 如何顺势用优势找到机会赚钱?
  4. 个人事业实现突破的关键点
  5. 现在生意不是能不能干的问题
  6. 终于把Redis场景设计搞清楚了,需要掌握的都在这了
  7. SQL Server中的动态SQL
  8. azure 导入 bak_如何使用BULK INSERT在本地和Azure中导入数据
  9. aws cli_学习AWS CLI:AWS CLI概述(AWS命令行界面)
  10. sql server 内存_SQL Server内存性能指标–第6部分–其他内存指标