本文以TermQuery,GlobalOrdinalsStringTermsAggregator为例,通过代码,分析es,lucene搜索及聚合流程。1:协调节点收到请求后,将search任务发到相关的各个shard。

相关代码:

TransportSearchAction.executeSearch
TransportSearchAction.searchAsyncAction.start
AbstractSearchAsyncAction.executePhase(SearchQueryThenFetchAsyncAction)
InitialSearchPhase.performPhaseOnShard
SearchQueryThenFetchAsyncAction.executePhaseOnShard

2:数据节点查询及聚合一个shard。

相关代码:

SearchService.executeQueryPhase

2.1:根据request构造SearchContext。

SearchContext
包含Query,Aggregator等重要信息。并将记录查询,聚合结果。Query
根据request创建具体的query,如:
TermQuery:用于keyword,text字段。索引结构为倒排。
PointRangeQuery:用于数字,日期,ip,point等字段。索引结构为k-d tree。Aggregator
此时仅根据request创建AggregatorFactory,用于后续创建Aggregator。

相关代码:

SearchService.createAndPutContext
//数字都是KD-tree.
PointValues

2.2:根据SearchContext构造Aggregator。

根据SearchContext构造具体的Aggregator,如:
GlobalOrdinalsStringTermsAggregator:用于keyword字段,开启global ordinal的term聚合。
StringTermsAggregator:用于keyword字段,关闭global ordinal的term聚合。
LongTermsAggregator:用于long字段的term聚合。
TopScoreDocCollector:用于为doc 评分并取topN。

相关代码:

AggregationPhase.preProcess

2.3:创建GlobalOrdinalsStringTermsAggregator,如果cache中没有GlobalOrdinals,将创建GlobalOrdinals,并cache。当shard下数据发生变化时,应当清空cache。

GlobalOrdinals
将所有segment ,指定field的所有term排序,合并成一个GlobalOrdinals,并创建OrdinalMap。collect时,使用doc的segment ord获取global ord。OrdinalMap
为每一个segmentValueCount小于globalValueCount的segment,保存了一份segment ord到global ord的mapping(LongValues)。对于segment valueCount等于globalValueCount的segment,原本的segment ord就是global ord,后续获取ord时,直接从SortedSetDV(dvd)中读取。value count
指的是不同term数量(term集合的大小)。使用globalValueCount 用来在collect时,确定结果集的大小。

举例
segment 1:{sorted terms: [aa, bb, cc],ord:[0, 1, 2]}。
segment 2:{sorted terms: [bb, cc, dd],ord:[0, 1, 2]}。
segment 3:{sorted terms: [aa, bb, cc, dd],ord:[0, 1, 2, 3]}。
GlobalOrdinals:{sorted terms: [aa, bb, cc, dd],ord:[0, 1, 2, 3]}。
ordinalMap:segment1:[0, 1, 2]->[0, 1, 2],segment2:[0, 1, 2]->[1, 2, 3]。segment3则使用原始的segment ord。

docCounts
int[globalValueCount],用来记录ord对应的count。
注:经查询条件过滤后,有些ord可能没有对应doc。

bucketOrds
稀疏(value count多,但doc少)时使用,缩减docCounts size。
LongHash:globalOrd与 id (size)映射。collect时在id处++,build agg时取出id对应的count。
当父聚合是BucketAggregator聚合时,子聚合只对父的某个term聚合,所以doc会减少,使用bucketOrds。注:按照此逻辑,如果query本身有term过滤条件,也应该启用bucketOrds(global_ordinals_hash)。

cache结构
fieldName -> shardId -> globalOrd.

相关代码:

TermsAggregatorFactory.doCreateInternal。
//获取globalValueCount决定是否global_ordinals_low_cardinality, global_ordinals_low_cardinality中又因不是ValuesSource.Bytes.FieldData,创建global_ordinals。
ValuesSource$WithOrdinals.globalMaxOrd。
//通过获取一个segment的globalOrdinals,触发如果cache中没有一个shardId+field对应的globalOrdinals,load 所有segment ord,建立global ords。
ValuesSource$FieldData.globalOrdinalsValues。
SortedSetDVOrdinalsIndexFieldData.loadGlobal。
IndicesFieldDataCache$IndexFieldCache.load
SortedSetDVOrdinalsIndexFieldData.localGlobalDirect。
GlobalOrdinalsBuilder.build。
//globalOrdinals主要类
GlobalOrdinalsIndexFieldData。
MultiDocValues$OrdinalMap

2.3.1:从docValues中读取单个segment,指定field的ordinals,term等。

相关代码:

SortedSetDVOrdinalsIndexFieldData.load。
SortedSetDVBytesAtomicFieldData.getOrdinalsValues。
//获取segment指定field的SortedSetDocValues
DocValues.getSortedSet。
//获取segment的docValuesReader
SegmentReader.getDocValuesReader。
//读取field的SortedDocValues
Lucene54DocValuesProducer.getSortedSet。

2.3.2:对多个segment的SortedSetDocValues排序,创建OrdinalMap。

具体为获取每个segment的SortedDocValuesTermsEnum。使用多个SortedDocValuesTermsEnum构建成小顶堆,合并成一个。

相关代码:

MultiDocValues$OrdinalMap.build。
MultiTermsEnum
TermMergeQueue
//获取一个segment的segment ord到global ord的mapping。
MultiDocValues$OrdinalMap.getGlobalOrds

2.4:查询及聚合数据。

相关代码:

QueryPhase.execute。

2.4.1:根据Query创建具体的weight。
weigth将用于query segment,并创建scorer。
scorer将用于评分和collect。
如果需要评分,读取field的fst,查询term,定位postings将提前到这里执行。

相关代码:

IndexSearcher.createNormalizedWeight。
TermQuery.createWeight。

2.4.2:为每个leafReader(segment)创建leafCollector。

创建LeafBucketCollector,获取该segment的globalOrds。globalOrds
如果segment的value count等于global value count,则返回segment ords(从dvd中读取);

如果不等,则从OrdinalMap中获取该segment的GlobalOrdinalMapping,且该segment的value count改为获取global value count。singleValues
并判断该field的docValues是否为singleValues(keyword single ord,text则为多term多ord)。

相关代码:

//串行查询及聚合一个分片下的所有segment。
IndexSearcher.search。
IndexSearcher.search.collector.getLeafCollector。
GlobalOrdinalsStringTermsAggregator.getLeafCollector。
//获取指定segment的globalOrdinals,如果cache中没有该shardId+field对应的globalOrdinals,load 所有segment ord,建立global ords。
ValuesSource$FieldData.globalOrdinalsValues
//获取一个segment的global ords。
GlobalOrdinalsIndexFieldData$Atomic.getOrdinalsValues
//提供获取该segment ord对应的global ord,使用globalOrd获取termBytes等方法。
GlobalOrdinalMapping
//singleValues
SingletonSortedSetDocValues

2.4.3:query该segment, 获取DocIdSetIterator,并构造scorer。

DocIdSetIterator即查询出的docId集合,对于倒排是PostingsEnum,对于数字使用的是BitSetIterator。

相关代码:

IndexSearcher.search.weight.bulkScorer。
Weight.bulkScorer。
//构造bulkScorer。
TermQuery$TermWeight.scorer。
//查询segment,获取TermsEnum,并根据搜索关键字,定位PostingsEnum位置。
TermQuery$TermWeight.getTermsEnum。

query segment流程如下:

1:根据field读取.tip(fst索引结构,term index)文件,获取该field下所有term前缀构造的索引,并缓存。

FST(Finite State Transducer,有限状态传感器)其他用途:阿里对hbase rowkey索引定位block(类似lucene tip索引term),

自然语言处理中一个单词或汉字下一个状态各个状态的概率。

相关代码:

BlockTreeTermsReader.terms。
FieldReader。
//Load a previously saved FST
FST。

注:官方lucene在open IndexReader(es recovery shard)时,就要通过构造SegmentReader,BlockTreeTermsReader,构造FieldReader,读取FST。

相关代码:

DirectoryReader.open

2:从fst中查找term,如果能找到的value(fst正常结束),value记录了该term前缀对应的term dict所在的block(.tim,term dictionary)位置,读取该block,查找具体的term,获取posting所在.doc, .pos(postings)的位置。

相关代码:

TermQuery$TermWeight.getTermsEnum.termsEnum.seekExact。
SegmentTermsEnum.seekExact。
SegmentTermsEnumFrame.scanToTerm。
//根据termsEnum(已经设置term)读取postings。
TermQuery$TermWeight.scorer.termsEnum.postings。
SegmentTermsEnum.postings。
//根据termsEnum中的term,设置postings在.doc中位置。
SegmentTermsEnum.postings.currentFrame.decodeMetaData。

3:从.doc,.pos中读取postings,返回PostingsEnum(BlockDocsEnum)。

相关代码:

Lucene50PostingsReader.postings。

上述流程如下图:

postings
(docID, termFreq, positions), (docID, termFreq, positions),.....termFreq
term在该文档出现的次数。
用于对文档频分。positions
term在该文档中每次的位置。
用于短语查询时,多个term是否连续出现,或者小于指定位置。

2.4.4:遍历PostingsEnum(过滤deleted doc),评分及collect数据。

相关代码:

acceptDocs:getLiveDocs
IndexSearcher.search.scorer.score。
BulkScorer.score。
DefaultBulkScorer.score。
//在查询结果中前进到>=target的docID,并返回docID。
Lucene50PostingsReader$BlockDocsEnum.advance(target)。
//遍历BlockDocsEnum(PostingsEnum)中的查询结果,collect doc。
DefaultBulkScorer.scoreRange。
//collect一个doc。
MultiCollector$MultiLeafCollector.collect。

TopScoreDocCollector对doc评分,并取topN的流程如下:为该doc评分,并基于score构建N节点的小顶堆,用于保留TopN。

相关代码:

TopScoreDocCollector$SimpleTopScoreDocCollector.collect。

1:根据设置的Similarity,使用BM25或TFIDF等算法为doc评分。

BM25,TFIDF都使用freq,norms(NumericDocValues),算法不同,可能使用的NumericDocValues也不同。

相关代码:

TermScorer.score。
BM25Similarity$BM25DocScorer.score。
TFIDFSimilarity$TFIDFSimScorer.score。
IndexWriterConfig.setSimilarity。
IndexSearcher.setSimilarity。
NumericDocValues。

2:根据doc得到的score构建N节点的小顶堆。

相关代码:

TopScoreDocCollector$SimpleTopScoreDocCollector.collect。
PriorityQueue.updateTop/downHeap/insertWithOverflow。

GlobalOrdinalsStringTermsAggregator统计各term doc数的流程如下:1:根据doc是否为singleValues,获取doc的ord或ords。

相关代码:

//singleValues获取ord
singleValues.getOrd(doc)。
//获取ords
//设置doc。
GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.setDocument(doc)
AbstractRandomAccessOrds.setDocument(doc)。
//获取doc对应的term基数。
GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.cardinality()。
GlobalOrdinalMapping.cardinality()。
//遍历doc ords。
GlobalOrdinalsStringTermsAggregator$LeafBucketCollector.collect.globalOrds.ordAt(i)。
GlobalOrdinalMapping.ordAt(i)。

2:docCounts(IntArray)对应的ord count++。如果启用bucketOrds(稀疏处理,见2.3),则将ord映射到bucketOrd,docCounts的bucketOrd位置 count++。

相关代码:

//将ord对应count++。传入doc,用于sub collect。
GlobalOrdinalsStringTermsAggregator.collectGlobalOrd。

2.4.5:取topDocs。TopScoreDocCollector collect时仅保留topN。在此每次取堆顶元素,得到逆序的topN。

相关代码:

TopDocsCollector.topDocs。

2.4.6:根据聚合数据,按docCount取topN,排序。
根据aggregator的数据,按docCount构建小顶堆。
每次取走堆顶元素,逆序放入数组,得到降序的topN。
设置termBytes。

相关代码:

AggregationPhase.execute。
GlobalOrdinalsStringTermsAggregator.buildAggregation。
PriorityQueue.updateTop/downHeap/insertWithOverflow。
//根据globalOrd从所有segment中获取第一个含有该globalOrd的segment,并从该segment中读取term值BytesRef。
GlobalOrdinalMapping.lookupOrd。

3:协调节点reduce 各个shard返回的结果。使用各shard返回的有序结果,构造堆,合并聚合,合并TopDocs。

相关代码:

InitialSearchPhase.onShardResult。
InitialSearchPhase.onShardFailure。
//reduce结果
FetchSearchPhase.innerRun.resultConsumer.reduce。
SearchPhaseController.reducedQueryPhase。
SearchPhaseController.sortDocs。
//mergeTopDocs
SearchPhaseController.mergeTopDocs。
TopDocs.merge。
TopDocs.mergeAux。
PriorityQueue。

4:fetch数据。协调发送fecth请求到相关shard,数据节点从stored field中fetch结果。

相关代码:

FetchSearchPhase.innerRun。

参考:
source code: elasticsearch 5.6.12, lucene 6.6.1。https://www.elastic.co/blog/lucene-points-6.0

PointRangeQuery:abstract class也可以有构造方法。

es 时间字段聚合_es lucene搜索及聚合流程源码分析相关推荐

  1. es 时间字段聚合_ES之五:ElasticSearch聚合

    1.单值聚合 Sum求和,dsl参考如下: { "size": 0, "aggs": { "return_balance": { " ...

  2. es 时间字段聚合_ES按时间聚合、嵌套聚合

    // POST index name/document name/_search格式 POST xxx-*/doc/_search { "query": { "bool& ...

  3. es 时间字段聚合_Elasticsearch 聚合(aggregation)查询返回所有

    Elasticsearch做查询的时候,很多情况下,有一些查询满足不了我们的要求,这个时候,我们用Elasticsearch的聚合(aggregation)函数,下面讲讲aggregation的使用姿 ...

  4. fastjson反序列化过滤字段属性_Fastjsonlt;=1.2.47反序列化漏洞源码分析及复现

    简介 fastjson 是由阿里开发的一种 json 的解析器和生成器.在 2019 年 6 月 26 日,用户提出issue[1],存在远程代码执行的版本<=1.2.47. 环境准备 jdk ...

  5. es 时间过滤java,Elasticsearch过滤与聚合的先后顺序java实现

    一.Elasticsearch的聚合 ES的聚合相当于关系型数据库里面的group by,例如查找在性别字段男女人数的多少并且按照人数的多少进行排序,在使用MySQL的时候,可以使用如下的句子 sel ...

  6. 找不到具有指定id_JAVA如何整合es指定字段不分词搜索?

    一.问题 在做一个需求的时候,需要按照电话号码查询用户关系,所以我这边先讲相关信息同步到es,但是电话号码是加密的,所以显示的字符串是杂乱的,既有字母,又有斜杠等号等字符,在进行分词查询的时候匹配不到 ...

  7. es创建索引设置字段不分词_java整合es指定字段不分词搜索

    二.问题 在做一个需求的时候,需要按照电话号码查询用户关系,所以我这边先讲相关信息同步到es,但是电话号码是加密的,所以显示的字符串是杂乱的,既有字母,又有斜杠等号等字符,在进行分词查询的时候匹配不到 ...

  8. es对分组后结果进行统计_elasticsearch聚合操作——本质就是针对搜索后的结果使用桶bucket(允许嵌套)进行group by,统计下分组结果,包括min/max/avg...

    分析 最后,我们还有一个需求需要完成:允许管理者在职员目录中进行一些分析. Elasticsearch有一个功能叫做聚合(aggregations),它允许你在数据上生成复杂的分析统计.它很像SQL中 ...

  9. es mysql in操作_es 常用操作

    一:索引相关 1,创建索引 (1) PUT /test_index/ { "settings":{ "index":{ "number_of_shar ...

  10. ElasticSearch实现商品搜索与聚合分析

    ElasticSearch实现商品搜索与聚合分析 Gitee地址:https://gitee.com/yuyuuyuy/micro-mall 文章目录 ElasticSearch实现商品搜索与聚合分析 ...

最新文章

  1. Android 应用程序集成Google 登录及二次封装
  2. XCode真机测试发布时产生The executable was signed with invalid entitlements.解决办法
  3. LetCode: 227. 简单计算器2
  4. 用C#写MSN机器人
  5. 【渝粤题库】国家开放大学2021春2143西方经济学题目
  6. eXosip中Event事件学习笔记
  7. python pyhook_python pyHook安装
  8. Semaphore释疑
  9. 中科院ICTCLAS分词汉语词性标记集
  10. 学习笔记(02):英特尔® OpenVINO™工具套件初级课程-如何加速视频处理进程?
  11. 电子邮件营销是什么?邮件群发软件
  12. 芒果tv官网服务器维护,芒果tv看不了【解决方案】
  13. 11.4王者荣耀服务器维护中,英雄战迹11号更新维护公告 删档最后一更
  14. 使用WGCLOUD来统计用户日活周活月活
  15. 强化学习gym的使用之Pendulum的训练
  16. 制作森林图-简单明了
  17. 基于Java+Swing实现雷电小游戏
  18. 学生请假管理系统(jsp+servlet)
  19. 240. 搜索二维矩阵 II
  20. 2008 网马王网站分析

热门文章

  1. asp.net中如何解决4M以上文件的上传
  2. 网络疯传IT男女标配图
  3. Android项目实战之高仿网易云音乐创建项目和配置
  4. linux定时执行脚本
  5. android ViewFlipper的使用
  6. shell中引号的应用
  7. 在Xen的DomU中安装MySQL
  8. 2022 基于SpringBoot/SSM的数据库查询平台
  9. java实现人民币数字和大写转换
  10. oracle 写递归,请问一个递归sql的写法