Elasticsearch Java API 分组、聚合、嵌套相关查询

翼支付监控系统正使用es做后端存储,这边我们是将日志计算处理过后的数据通过kafka储存到es。选择用es作为数据储存端是考虑到es有一套完整的数据处理方式,功能全面性能优越,以及索引滚动可以方便管理es存储的数据。这理我主要是分享下Elasticsearch Java API对ES数据分组、聚合、嵌套以及时间直方图方面相关查询聚合的使用。

elasticsearch 版本为6.1.4

Elasticsearch条件查询

//创建索引查询请求
SearchRequest searchRequest = new SearchRequest(index + “_v*”);
//创建ES查询源构建体
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//创建复合查询对象(将查询条件进行and or 拼接)
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

//条件查询
QueryBuilder testQueryBuilder=QueryBuilders.termQuery(“fieldName”,”value”);
//字段范围查询(gte,lte传Long型数据)
QueryBuilder timeQueryBuilder= = QueryBuilders.rangeQuery(fieldName).gte(startData).lte(endData);

boolQueryBuilder.must(testQueryBuilder).must(timeQueryBuilder );

等价于SQL语句
Select * from index
where fieldName = value and fieldName>=startData and fieldName <=endtime

boolQueryBuilder.must(testQueryBuilder).should(timeQueryBuilder );
等价于SQL语句
Select * from index
Where fieldName = value or(fieldName>=startData and fieldName <=endtime)

如果跟复杂的与或条件关系查询可再创建新的BoolQueryBuilder builder对象以boolQueryBuilder.must(builder)或是boolQueryBuilder.should(builder)构建出你想要的查询条件

Elasticsearch条件查询查询结果处理


将boolQueryBuilder对象装入searchSourceBuilder查询源构建体中
再将searchSourceBuilder放入searchRequest查询请求对象中
try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
SearchHits hits = searchResponse.getHits();

for (SearchHit hit : hits) {Map<String, Object> sourceAsMap = hit.getSourceAsMap();String filedName= ((String) sourceAsMap.get("filedName"));//ES中为数组数据结构List list= (List) sourceAsMap.get("filedList");
}

} catch (IOException e) {
log.error(“数据解析错误:{}”, e);
}

SearchResponse 为查询请求结果对象
SearchHits为数据封装集合,转为Map集合获取每个字段的属性值
其中filedName、filedList为ES中数据的字段名

分组、聚合及嵌套查询

这边可将分组类比为SQL语句中group by查询
聚合可理解为SQL语句中的求和、求最大值、最小值以及求均值的需求
嵌套则可以理解为Elasticsearch存值的某一字段为对像属性的值做处理

Elasticsearch Java API分组与聚合结合

其中对timestamp字段进行分组,分组别名为timestamp取2^31-1组数据
不设置size查询结果会返回默认size大小
AggregationBuilder oneAgg
= AggregationBuilders.terms(“fieldOne”) .field(“field_one”).size(2^31-1);

如果需要多个字段分组则需要再创建
AggregationBuilder twoAgg
=AggregationBuilders.terms(“fieldTwo”) .field(“field_two”).size(2^31-1);

//将错误码和错误信息分组的方式放入查询源的聚合通中
searchSourceBuilder.aggregation(oneAgg.subAggregation(twoAgg));

如果需要将分组结果的其他字段再进行统计的sum、min、max、avg聚合
则只需要
AggregationBuilder threeAgg= AggregationBuilders.sum(“fieldThree”).field(“field_three”);

searchSourceBuilder.aggregation(oneAgg.subAggregation(twoAgg.subAggregation(threeAgg)));
其中fieldThree为聚合别名,field_three为聚合字段
这段代码的解释是讲数据通过field_one、field_two字段进行分组对field_three进行求和聚合

只聚合不分组则
searchSourceBuilder.aggregation(threeAgg);即可

Elasticsearch Java API嵌套

下方ES模板中的"error_detail"字段就是就是一个嵌套式数据结果
上面的分组和聚合功能对error_detail对象下的属性不适用,则需要我们再做一层处理
{
“test_index”: {
“order”: 0,
“index_patterns”: [“test_index_v*”],
“settings”: {
“index”: {
“number_of_shards”: “3”,
“number_of_replicas”: “1”
}
},
“mappings”: {
“doc”: {
“dynamic”: “strict”,
“properties”: {
“id”: {
“type”: “long”
},
“timestamp”: {
“type”: “long”
},
“ip”: {
“type”: “keyword”
},
“path”: {
“type”: “keyword”,
“index”: false
},
“count_value”: {
“type”: “keyword”
},
“detail”: {
“type”: “nested”,
“properties”: {
“field_one”: {
“type”: “keyword”
},
“field_two”: {
“type”: “keyword”
},
“field_ip”: {
“type”: “keyword”
},
“field_three”: {
“type”: “integer”,
“index”: false
}
}
},
"version ": {
“type”: “keyword”
}
}
}
},
“aliases”: {}
}
}
/对于嵌套的聚合我们需要新建个NestedAggregationBuilder 对象”nestedAgg”为别名
"error_detail"为嵌套路径,之后的分组和聚合如上述一样
/
NestedAggregationBuilder nestedAggregationBuilder =
AggregationBuilders.nested(“nestedAgg”, “detail”);

AggregationBuilder oneAgg=
AggregationBuilders.terms(“fieldOne”).field(“detail.field_one”).size(2 ^ 31 - 1);

AggregationBuilder ipAgg=
AggregationBuilders.terms(“fieldIp”).field(“detail.field_ip”).size(2 ^ 31 - 1);

AggregationBuilder twoAgg=
AggregationBuilders.terms(“fieldTwo”).field(“detail.field_two”).size(2 ^ 31 - 1);

再将上面写的一个个聚合体放入nestedAggregationBuilder需要将上面的oneAgg、ipAgg、twoAgg聚合相互之间有关联需要一层一层关联如下
nestedAggregationBuilder.subAggregation(
oneAgg.subAggregation(twoAgg.subAggregation(ipAgg.subAggregation(
AggregationBuilders.sum(“fieldThree”).field(“detail.field_three”)))));

searchSourceBuilder.query(boolQueryBuilder).size(0);
searchSourceBuilder.aggregation(nestedAggregationBuilder);
searchRequest.source(searchSourceBuilder);

注意searchSourceBuilder.aggregation(nestedAggregationBuilder);这个段代码这是定义聚合方式的。
searchSourceBuilder.aggregation(oneAgg).aggregation(fieldIp)
.aggregation(twoAgg).aggregation(threeAgg); 和上方聚合方式完全不一样的只是单一将数据分组聚合相互之间没有关联,有兴趣的同学可以造数据尝试看数据结果

分组查询返回结果处理

try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
Nested nested = searchResponse.getAggregations().get(“nestedAgg”);

Aggregations nestedAggregations = nested.getAggregations();
Aggregation nestedAggregation = nestedAggregations.asList().get(0);
//List<ErrorCodeDTO> errorCodeDTOS = new ArrayList<>();List<? extends Terms.Bucket> oneBuckets = ((ParsedStringTerms) nestedAggregation).getBuckets();
for (Terms.Bucket oneBucket : oneBuckets ) {String fieldOne= oneBucket .getKey().toString();List<Aggregation> twoAggregations = oneBucket .getAggregations().asList();List<? extends Terms.Bucket> twoBuckets = ((ParsedStringTerms) twoAggregations .get(0)).getBuckets();for (Terms.Bucket twoBucket : twoBuckets ) {String filedtwo= twoBucket .getKey().toString();List<Aggregation> ipAggregations = errorMsgBucket.getAggregations().asList();List<? extends Terms.Bucket> ipBuckets = ((ParsedStringTerms)                                 ipAggregations .get(0)).getBuckets();for (Terms.Bucket ipBucket : ipBuckets ) {String ip= ipBucket .getKey().toString();Aggregation threeAggregation = ipBucket .getAggregations().asList().get(0);Integer count = (int) ((ParsedSum) threeAggregation ).getValue();System.out.print(fieldOne);System.out.print(filedtwo);System.out.print(ip);System.out.print(count );}}}
} catch (IOException e) {log.error("数据解析错误:{}", e);
}

上诉代码是将嵌套的错误码信息进行分组聚合的返回结果进行解析,我们错误码是按照字段1、IP、和字段2对字段3统计值进行聚合将桶(Bucket)中桶的信息进行逐一解析,这里不做过多赘诉,可以造数据debug步步调试看数据

时间直方图查询

Elasticsearch的时间直方图是在一定时间范围内按时间颗粒度进行分组聚合

/时间直方图聚合查询/
AggregationBuilder aggregation = AggregationBuilders.dateHistogram(“agg”).field(“timestamp”)
.interval(interfaceDTO.getInterval()).minDocCount(0)
//成功请求数求和
.subAggregation(AggregationBuilders.sum(“countValue”).field(“count_value”));
上述代码agg为时间直方图聚合别名,对timestamp字段进行时间直方图聚合,interval中设置的参数为聚合粒度,minDocCount(0)返回空桶意思为在时间分组的某个时间段内没有值返回空值0。
此处的minDocCount(0)返回空桶的功能无效,因为我们这里存储ES中的时间字段是timestamp它是一个Long型的时间戳,不返回空桶这API封装的底层代码不完善造成的,要想可能返回空桶,可将时间字段改成Data型,或加个Data型时间字段。但是Data型的时间字段还会有个问题:比如我们对13:00:00-14:00:00时间段以粒度5分钟进型聚合,当ES中只有13:20:00-13:40:00的数据,那么时间直方图聚合只会返回13:20:00-13:40:00之间的空桶,13:00:00-13:15:00和13:45:00-14:00的空桶不返回。此处请同学注意,对这些数据根据需要需要展示的话,要手动补偿!

searchSourceBuilder.query(boolQueryBuilder).size(0);
searchSourceBuilder.aggregation(aggregation);
searchRequest.source(searchSourceBuilder);

时间直方图返回结果处理

try {
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
Histogram agg = searchResponse.getAggregations().get(“agg”);
for (Histogram.Bucket entry : agg.getBuckets()) {
ChartInfoDTO chartInfoDTO = new ChartInfoDTO();
//将key的时间戳转为时间
String time = entry.getKey().toString().replace(“Z”, " UTC");
SimpleDateFormat format = new SimpleDateFormat(“yyyy-MM-dd’T’HH:mm:ss.SSS Z”);//注意格式化的表达式
Date date = format.parse(time);
Long timestamp = date.getTime();
Aggregations aggregations = entry.getAggregations();
List list = aggregations.asList();
for (Aggregation bucketAggregation : list) {
if (“count_value”.equals(bucketAggregation.getName())) {
Integer countValue = (int) ((ParsedSum) bucketAggregation).getValue();
}
}
} catch (IOException | ParseException e) {
log.error(“告警大盘聚合错误:{}”, e);
}

数据解析主要分下面三个步骤:
//获取时间直方图别名的返回的Histogram对象
Histogram agg = searchResponse.getAggregations().get(“agg”);
//便利Histogram对象中的Buckets(桶)
for (Histogram.Bucket entry : agg.getBuckets()) {
//获取桶中的聚合对象集合
Aggregations aggregations = entry.getAggregations();
List list = aggregations.asList();
//便利聚合对象取值
for (Aggregation bucketAggregation : list) {
}
}

结语

我们这么涉及的Elasticsearch的功能还比较少,内容也不是很全面,只是针对前段时间对工作上用的技术和问题进行简单的归纳。我们这里只是简单的将Elasticsearch当库使用,它还有分词相关的强大搜索查询功能。有兴趣的同学可以一起学习。

作者:李元 翼支付信息技术部监控技术组高级Java开发

Elasticsearch Java API 分组、聚合、嵌套相关查询相关推荐

  1. Elasticsearch Java API 的使用—多条件查询

    //多条件设置 MatchPhraseQueryBuilder mpq1 = QueryBuilders.matchPhraseQuery("pointid","W3.U ...

  2. Elasticsearch8.0版本中Elasticsearch Java API Client客户端的基本使用方法

    关于Elasticsearch Java API Client客户端如何连接以及如何对索引和文档进行基本的增删改查操作请查看我的上一篇博文:Elasticsearch RestHighLevelCli ...

  3. SpringBoot整合最新Elasticsearch Java API Client 7.16教程

    文章目录 前言 一.Elasticsearch和Kibana 7.16版本安装 二.pom.xml文件引入依赖 三.代码实例 总结 前言 最新在学习SpringBoot整合es的一些知识,浏览了网上的 ...

  4. Es elasticsearch 十七 Java api 实现聚合 几个聚合示例 sql 开启许可 新特效 java 实现es7 sql 功能

    目录 Java api 实现聚合 依赖 简单聚合按照颜色分组获取每个卖出数量 聚合每个颜色卖出数量,及平均价格(每个分桶子聚合) 按照颜色分组 ,获取销售数量,avg min max sum 按照60 ...

  5. Elasticsearch Java API 很全的整理以及架构剖析

    Elasticsearch 的API 分为 REST Client API(http请求形式)以及 transportClient API两种.相比来说transportClient API效率更高, ...

  6. Elasticsearch Java API四种实现方式

    0.题记 之前Elasticsearch的应用比较多,但大多集中在关系型.非关系型数据库与Elasticsearch之间的同步.以上内容完成了Elasticsearch所需要的基础数据量的供给.但想要 ...

  7. Elasticsearch RestHighLevelClient 已标记为被弃用 它的替代方案 Elasticsearch Java API Client 的基础教程及迁移方案

    在Elasticsearch7.15版本之后,Elasticsearch官方将它的高级客户端RestHighLevelClient标记为弃用状态.同时推出了全新的Java API客户端Elastics ...

  8. ElasticSearch Java Api(四) -删除索引

    删除可以是删除整个索引库,也可以根据文档id删除索引库下的文档,还可以通过query查询条件删除所有符合条件的数据. 一.删除整个索引库 下面的例子会删除indexName索引: DeleteInde ...

  9. Elasticsearch Java API 6.2(java client)

    前言 本节描述了Elasticsearch提供的Java API,所有的Elasticsearch操作都使用客户端对象执行,所有操作本质上都是完全异步的(要么接收监听器,要么未来返回). 此外,客户端 ...

最新文章

  1. 再读《数量生态学:R语言应用》
  2. 《我的Python之路V1.3.pdf》可以下载了,这版pdf更精美!
  3. win7(旗舰版)下,OleLoadPicture 加载内存中的图片(MagickGetImageBlob),返回值 0
  4. Python多线程学习教程
  5. 涨知识!外贸中,船公司S/O(订舱单)文件英文解释!
  6. 在并发中练习 Boost.Multiprecision多线程环境相关的测试程序
  7. SQLServer游标简单应用(求分组最小值问题)
  8. Python爬虫爬取美剧网站
  9. 支付宝又要改版了:首页顶栏新增了这个模块
  10. matlab 量化 策略,【策略分享】Matlab量化交易策略源码分享
  11. js获取两个数组不同的元素并返回不同元素组成的数组,并对不同的元素添加一个新的属性
  12. Android内核开发:理解和掌握repo工具
  13. cgblib 代理接口原理_Spring5参考指南-AOP代理
  14. Ansible - 自动化运维工具
  15. iOS 开发获取字体类型
  16. 【转载】MiniGUI输入法词库更新
  17. [C# Fundamantal] 类继承简析
  18. 详解浮点数的精度问题
  19. MATLAB用相干解调DSB信号,AM DSB信号解调的MATLAB实现
  20. oeasy教您玩转 linux 010213 中文 fcitx

热门文章

  1. 100本Python精品书籍(附下载)
  2. 有一个文件上传成功 2:03:58.m4a | MixTalk S01-10
  3. idea mysql快捷键_IntelliJ IDEA 设置代码提示或自动补全的快捷键(Alt+/)
  4. PLC:学习笔记(西门子)4
  5. C#---第十二课:列表操作的高级用法---FindAll()、Find()、Select()、Where()、Sort()、Exists()
  6. ILP——指令级并行2:记分牌(Scoreboard)技术
  7. Linux | ssh服务原理、配置及操作
  8. 脑洞大的日本人,做了一个AI智能观音讲佛经!
  9. 面经手册 · 第11篇《StringBuilder 比 String 快?空嘴白牙的,证据呢!》
  10. 三个骰子点数之和概率