This article is based on Elasticsearch 7.x.

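This article converts the examples from the previous post on Elasticsearch aggregations with the REST API into Java client code, using the Java High Level REST Client (RestHighLevelClient).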

Bucket Aggregation

A bucket aggregation sorts documents into buckets. Each bucket holds the documents that meet a particular criterion, much like GROUP BY in SQL.

(1) main method

public static void main(String[] args) throws IOException {
    // RestHighLevelClient is Closeable: try-with-resources closes it even if a call throws
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        bulkIndex(client);
        termAggs(client);
        rangeAggs(client);
        dateRangeAggs(client);
        histogramAggs(client);
        dateHistogramAggs(client);
        filterAggs(client);
    }
}

(2) Adding data

Index a few TV sales records to use in the aggregations that follow.

private static void bulkIndex(RestHighLevelClient client) throws IOException {
    // color: 红色 = red, 绿色 = green, 蓝色 = blue
    // brand: 长虹 = Changhong, 小米 = Xiaomi, 三星 = Samsung
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.add(new IndexRequest("sales").id("1").source(XContentType.JSON, "price", 1000, "color", "红色", "brand", "长虹", "sold_date", "2019-10-28"));
    bulkRequest.add(new IndexRequest("sales").id("2").source(XContentType.JSON, "price", 2000, "color", "红色", "brand", "长虹", "sold_date", "2019-11-05"));
    bulkRequest.add(new IndexRequest("sales").id("3").source(XContentType.JSON, "price", 3000, "color", "绿色", "brand", "小米", "sold_date", "2019-05-18"));
    bulkRequest.add(new IndexRequest("sales").id("4").source(XContentType.JSON, "price", 1500, "color", "蓝色", "brand", "TCL", "sold_date", "2019-07-02"));
    bulkRequest.add(new IndexRequest("sales").id("5").source(XContentType.JSON, "price", 1200, "color", "绿色", "brand", "TCL", "sold_date", "2019-08-19"));
    bulkRequest.add(new IndexRequest("sales").id("6").source(XContentType.JSON, "price", 2000, "color", "红色", "brand", "长虹", "sold_date", "2019-11-05"));
    bulkRequest.add(new IndexRequest("sales").id("7").source(XContentType.JSON, "price", 8000, "color", "红色", "brand", "三星", "sold_date", "2020-01-01"));
    bulkRequest.add(new IndexRequest("sales").id("8").source(XContentType.JSON, "price", 2500, "color", "蓝色", "brand", "小米", "sold_date", "2020-02-12"));
    // Refresh immediately so the searches that follow can already see these documents;
    // otherwise they only become searchable after the next periodic refresh.
    // (WriteRequest is org.elasticsearch.action.support.WriteRequest)
    bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    client.bulk(bulkRequest, RequestOptions.DEFAULT);
}

(3) terms

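Groups documents by the value of a field, e.g. by brand. Because brand is indexed as a text field, the aggregation runs on its brand.keyword sub-field.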

private static void termAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // only the aggregation results are needed, not the hits
    searchSourceBuilder.size(0);
    // aggs: one bucket per brand
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("term_aggs").field("brand.keyword");
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Terms termAggs = searchResponse.getAggregations().get("term_aggs");
    List<? extends Terms.Bucket> buckets = termAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount());
    }
}
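The sketch below lets you check the printed buckets without a running cluster. It is plain Java and does not use the Elasticsearch client. It does the equivalent of SELECT brand, COUNT(*) ... GROUP BY brand on the 8 sample documents. The class name TermsAggSketch and the romanized brand names (长虹 → Changhong, 小米 → Xiaomi, 三星 → Samsung) are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java sketch (no Elasticsearch involved) of what the "term_aggs" terms
// aggregation on brand.keyword counts for the 8 sample documents.
// Brand names are romanized for illustration: 长虹 -> Changhong, 小米 -> Xiaomi, 三星 -> Samsung.
class TermsAggSketch {

    // brand of documents 1..8, in the order bulkIndex() indexes them
    static final List<String> BRANDS = Arrays.asList(
            "Changhong", "Changhong", "Xiaomi", "TCL",
            "TCL", "Changhong", "Samsung", "Xiaomi");

    // Same idea as SELECT brand, COUNT(*) FROM sales GROUP BY brand.
    // Sorted by key here; Elasticsearch orders terms buckets by doc_count (descending) by default.
    static Map<String, Long> countByBrand() {
        return BRANDS.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        countByBrand().forEach((key, count) ->
                System.out.println("key:" + key + "\ndoc_count:" + count));
    }
}
```

For this data the counts are Changhong 3, TCL 2, Xiaomi 2 and Samsung 1. The terms aggregation should report the same doc_count values, possibly in a different order.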

(4) range

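Groups documents by ranges of a field's value, e.g. by TV price range. Each range includes its from value and excludes its to value. range works only on numeric fields.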

private static void rangeAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: price < 1000, 1000 <= price < 3000, price >= 3000
    RangeAggregationBuilder rangeAggregationBuilder = AggregationBuilders.range("price_range_aggs")
            .field("price")
            .addUnboundedTo(1000)
            .addRange(1000, 3000)
            .addUnboundedFrom(3000);
    searchSourceBuilder.aggregation(rangeAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Range rangeAggs = searchResponse.getAggregations().get("price_range_aggs");
    List<? extends Range.Bucket> buckets = rangeAggs.getBuckets();
    for (Range.Bucket bucket : buckets) {
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount());
    }
}
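A range bucket includes its lower bound and excludes its upper bound. The plain-Java sketch below shows what that means for the sample prices. The class name and the bucket labels are illustrative; the labels are not necessarily the key strings Elasticsearch returns.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of the bucketing rule behind price_range_aggs:
// a value v falls into a range when from <= v < to.
class RangeAggSketch {

    // prices of documents 1..8 from bulkIndex()
    static final double[] PRICES = {1000, 2000, 3000, 1500, 1200, 2000, 8000, 2500};

    // Counts values with from <= v < to; infinities stand in for the unbounded ends.
    static long count(double from, double to) {
        long n = 0;
        for (double price : PRICES) {
            if (price >= from && price < to) {
                n++;
            }
        }
        return n;
    }

    static Map<String, Long> buckets() {
        Map<String, Long> result = new LinkedHashMap<>();
        result.put("*-1000", count(Double.NEGATIVE_INFINITY, 1000)); // addUnboundedTo(1000)
        result.put("1000-3000", count(1000, 3000));                  // addRange(1000, 3000)
        result.put("3000-*", count(3000, Double.POSITIVE_INFINITY)); // addUnboundedFrom(3000)
        return result;
    }

    public static void main(String[] args) {
        buckets().forEach((key, count) -> System.out.println("key:" + key + "\ndoc_count:" + count));
    }
}
```

The TV priced at exactly 3000 lands in the 3000-* bucket, not in 1000-3000. That gives counts of 0, 6 and 2.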

(5) date_range

Like range, but for dates, e.g. grouping by sold_date ranges. date_range works only on date fields and accepts date math such as now.

private static void dateRangeAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: "start" = before 2019-10-31, "middle" = [2019-10-31, 2020-01-01), "end" = [2020-01-01, now)
    // addUnboundedTo(key, to) has no lower bound; addUnboundedFrom(key, from) has no upper bound
    DateRangeAggregationBuilder dateRangeAggregationBuilder = AggregationBuilders.dateRange("date_range_aggs")
            .field("sold_date")
            .addUnboundedTo("start", "2019-10-31")
            .addRange("middle", "2019-10-31", "2020-01-01")
            .addRange("end", "2020-01-01", "now");
    searchSourceBuilder.aggregation(dateRangeAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Range rangeAggs = searchResponse.getAggregations().get("date_range_aggs");
    List<? extends Range.Bucket> buckets = rangeAggs.getBuckets();
    for (Range.Bucket bucket : buckets) {
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount());
    }
}

(6) histogram

Builds a histogram of fixed-width buckets, e.g. grouping by price in steps of 2000. histogram works only on numeric fields.

private static void histogramAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: price buckets of width 2000 (0-2000, 2000-4000, ...)
    HistogramAggregationBuilder histogramAggregationBuilder = AggregationBuilders.histogram("histogram_aggs")
            .field("price")
            .interval(2000);
    searchSourceBuilder.aggregation(histogramAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Histogram histogramAggs = searchResponse.getAggregations().get("histogram_aggs");
    List<? extends Histogram.Bucket> buckets = histogramAggs.getBuckets();
    for (Histogram.Bucket bucket : buckets) {
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount());
    }
}
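With offset 0, a histogram bucket key is floor(value / interval) * interval. By default, the empty buckets between the lowest and highest key are also returned (min_doc_count 0). The plain-Java sketch below applies that rule to the sample prices. The class name is illustrative.

```java
import java.util.TreeMap;

// Plain-Java sketch of histogram bucketing for price with interval 2000 (offset 0):
// key = floor(value / interval) * interval. Empty buckets between the first
// and the last key are kept, mirroring min_doc_count = 0.
class HistogramAggSketch {

    // prices of documents 1..8 from bulkIndex()
    static final double[] PRICES = {1000, 2000, 3000, 1500, 1200, 2000, 8000, 2500};

    static TreeMap<Double, Long> buckets(double interval) {
        TreeMap<Double, Long> result = new TreeMap<>();
        for (double price : PRICES) {
            double bucketKey = Math.floor(price / interval) * interval;
            result.merge(bucketKey, 1L, Long::sum);
        }
        // add the empty buckets between the lowest and the highest key
        for (double key = result.firstKey(); key < result.lastKey(); key += interval) {
            result.putIfAbsent(key, 0L);
        }
        return result;
    }

    public static void main(String[] args) {
        buckets(2000).forEach((key, count) ->
                System.out.println("key:" + key + "\ndoc_count:" + count));
    }
}
```

The result has five buckets: 0 (3 TVs), 2000 (4), 4000 (0), 6000 (0) and 8000 (1).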

(7) date_histogram

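Builds a histogram over dates, e.g. one bucket per calendar month of sold_date. date_histogram works only on date fields. Here minDocCount(0) together with extendedBounds also returns the months without sales between 2019-05-01 and 2020-02-01.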

private static void dateHistogramAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: one bucket per calendar month, including empty months within the extended bounds
    // (ExtendedBounds may be named LongBounds in later 7.x clients; adjust to your client version)
    DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram("date_histogram_aggs")
            .field("sold_date")
            .calendarInterval(DateHistogramInterval.MONTH)
            .format("yyyy-MM-dd")
            .minDocCount(0)
            .extendedBounds(new ExtendedBounds("2019-05-01", "2020-02-01"));
    searchSourceBuilder.aggregation(dateHistogramAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Histogram histogramAggs = searchResponse.getAggregations().get("date_histogram_aggs");
    List<? extends Histogram.Bucket> buckets = histogramAggs.getBuckets();
    for (Histogram.Bucket bucket : buckets) {
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount());
    }
}
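The plain-Java sketch below uses java.time to model the monthly buckets requested above. It assumes that extended bounds plus min_doc_count 0 give every month from 2019-05 to 2020-02 a bucket, and it ignores time zones (Elasticsearch buckets in UTC by default). The class and method names are illustrative.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of a monthly date_histogram on sold_date with
// min_doc_count = 0 and extended bounds [min, max] (time zones ignored).
class DateHistogramAggSketch {

    // sold_date of documents 1..8 from bulkIndex()
    static final String[] SOLD_DATES = {
            "2019-10-28", "2019-11-05", "2019-05-18", "2019-07-02",
            "2019-08-19", "2019-11-05", "2020-01-01", "2020-02-12"};

    static Map<String, Long> monthlyCounts(YearMonth min, YearMonth max) {
        Map<String, Long> result = new LinkedHashMap<>();
        // every month within the bounds gets a bucket, even without documents
        for (YearMonth month = min; !month.isAfter(max); month = month.plusMonths(1)) {
            result.put(month.atDay(1).toString(), 0L);
        }
        for (String date : SOLD_DATES) {
            // bucket key = first day of the month, printed as yyyy-MM-dd
            String key = YearMonth.from(LocalDate.parse(date)).atDay(1).toString();
            result.merge(key, 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        monthlyCounts(YearMonth.of(2019, 5), YearMonth.of(2020, 2)).forEach((key, count) ->
                System.out.println("key:" + key + "\ndoc_count:" + count));
    }
}
```

This yields 10 buckets. 2019-11-01 holds 2 documents, and 2019-06-01, 2019-09-01 and 2019-12-01 are empty.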

(8) filter

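Puts all documents that match a query into a single bucket, e.g. to compute the average TV price over the last 6 months. Because the sample data dates from 2019 and 2020, the last 6 months may contain none of the sample documents, depending on when you run it; doc_count is then 0.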

private static void filterAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: a single bucket holding the documents sold in the last 6 months
    FilterAggregationBuilder filterAggregationBuilder = AggregationBuilders.filter("filter_aggs",
            new RangeQueryBuilder("sold_date").gte("now-6M"));
    // sub-aggs: average price inside that bucket
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    filterAggregationBuilder.subAggregation(avgAggregationBuilder);
    searchSourceBuilder.aggregation(filterAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Filter filterAggs = searchResponse.getAggregations().get("filter_aggs");
    Avg avgPriceAggs = filterAggs.getAggregations().get("avg_price");
    System.out.println("doc_count: " + filterAggs.getDocCount() + "\navg_price: " + avgPriceAggs.getValue());
}

Metric Aggregation

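Metric aggregations compute statistics over the values of document fields. They are similar to the aggregate functions (COUNT, MIN, MAX, SUM, AVG) applied after GROUP BY in SQL.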

(1) main method

public static void main(String[] args) throws IOException {
    // RestHighLevelClient is Closeable: try-with-resources closes it even if a call throws
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        baseMetricAggs(client);
        cadinalityAggs(client);
        statAggs(client);
        topHitsAggs(client);
        percentilesAggs(client);
        percentilesRanksAggs(client);
        // the two nesting examples are defined in the Bucket + Metric Aggregation section
        singleNestAggs(client);
        multiNestAggs(client);
    }
}

(2) count/min/max/sum/avg

The bucket aggregation examples above show that every bucket carries a doc_count by default. That doc_count serves as the count aggregation.

min/max/sum/avg example:

private static void baseMetricAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: four independent metric aggregations on price
    MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("min_price").field("price");
    MaxAggregationBuilder maxAggregationBuilder = AggregationBuilders.max("max_price").field("price");
    SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("total_sales").field("price");
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    searchSourceBuilder.aggregation(minAggregationBuilder);
    searchSourceBuilder.aggregation(maxAggregationBuilder);
    searchSourceBuilder.aggregation(sumAggregationBuilder);
    searchSourceBuilder.aggregation(avgAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Max maxPriceAggs = searchResponse.getAggregations().get("max_price");
    Min minPriceAggs = searchResponse.getAggregations().get("min_price");
    Avg avgPriceAggs = searchResponse.getAggregations().get("avg_price");
    Sum totalPriceAggs = searchResponse.getAggregations().get("total_sales");
    System.out.println("max_price: " + maxPriceAggs.getValue());
    System.out.println("min_price: " + minPriceAggs.getValue());
    System.out.println("avg_price: " + avgPriceAggs.getValue());
    System.out.println("total_sales: " + totalPriceAggs.getValue());
}

(3) cardinality

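Similar to COUNT(DISTINCT ...) in SQL, e.g. counting how many TV brands were sold. cardinality is an approximate count (based on HyperLogLog++), but for small numbers of distinct values like these it is expected to be close to exact.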

private static void cadinalityAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: number of distinct brands
    CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders.cardinality("cardinality_aggs").field("brand.keyword");
    searchSourceBuilder.aggregation(cardinalityAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Cardinality cardinalityAggs = searchResponse.getAggregations().get("cardinality_aggs");
    System.out.println("cardinality_aggs: " + cardinalityAggs.getValue());
}

(4) stats

Computes count, min, max, sum and avg in a single aggregation.

private static void statAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: count/min/max/sum/avg of price in one go
    StatsAggregationBuilder statsAggregationBuilder = AggregationBuilders.stats("stat_price_aggs").field("price");
    searchSourceBuilder.aggregation(statsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Stats statPriceAggs = searchResponse.getAggregations().get("stat_price_aggs");
    System.out.println("count: " + statPriceAggs.getCount());
    System.out.println("max_price: " + statPriceAggs.getMax());
    System.out.println("min_price: " + statPriceAggs.getMin());
    System.out.println("avg_price: " + statPriceAggs.getAvg());
    System.out.println("total_sales: " + statPriceAggs.getSum());
}
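The JDK's DoubleSummaryStatistics collects the same five numbers as the stats aggregation in a single pass. The sketch below applies it to the sample prices, which makes the expected output easy to check. The class name is illustrative.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;

// Plain-Java counterpart of the stats aggregation on price:
// count, min, max, sum and average collected in one pass.
class StatsAggSketch {

    // prices of documents 1..8 from bulkIndex()
    static final double[] PRICES = {1000, 2000, 3000, 1500, 1200, 2000, 8000, 2500};

    static DoubleSummaryStatistics priceStats() {
        return Arrays.stream(PRICES).summaryStatistics();
    }

    public static void main(String[] args) {
        DoubleSummaryStatistics stats = priceStats();
        System.out.println("count: " + stats.getCount());
        System.out.println("max_price: " + stats.getMax());
        System.out.println("min_price: " + stats.getMin());
        System.out.println("avg_price: " + stats.getAverage());
        System.out.println("total_sales: " + stats.getSum());
    }
}
```

For the 8 sample documents this gives count 8, min 1000.0, max 8000.0, sum 21200.0 and avg 2650.0. The stats aggregation, and the separate min/max/sum/avg aggregations, should report the same values.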

(5) top_hits

Returns the top-ranked documents and is normally combined with sort, e.g. the three sales with the highest price.

private static void topHitsAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: the 3 documents with the highest price
    TopHitsAggregationBuilder topHitsAggregationBuilder = AggregationBuilders.topHits("top_hits_aggs")
            .size(3)
            .sort("price", SortOrder.DESC);
    searchSourceBuilder.aggregation(topHitsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    TopHits topHitsAggs = searchResponse.getAggregations().get("top_hits_aggs");
    SearchHit[] hits = topHitsAggs.getHits().getHits();
    for (SearchHit hit : hits) {
        System.out.println(hit.getSourceAsString());
    }
}

(6) percentiles

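For each requested percentage, percentiles reports the value below which that percentage of the observed values falls, e.g. the TV price at the 50th, 90th and 99th percentiles. Elasticsearch computes percentiles approximately, using the TDigest algorithm by default.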

private static void percentilesAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: 50th, 90th and 99th percentile of price
    PercentilesAggregationBuilder percentilesAggregationBuilder = AggregationBuilders.percentiles("percentile_aggs")
            .field("price")
            .percentiles(50, 90, 99);
    searchSourceBuilder.aggregation(percentilesAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result: percent -> value
    Percentiles percentileAggs = searchResponse.getAggregations().get("percentile_aggs");
    for (Percentile next : percentileAggs) {
        System.out.println(next.getPercent() + ": " + next.getValue());
    }
}
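Here is the idea of a percentile as an exact calculation. It sorts the values and linearly interpolates between the two closest ranks, a common textbook definition. This is a swapped-in exact method, not what Elasticsearch does: Elasticsearch uses the approximate TDigest algorithm, so its numbers may differ, especially on a tiny data set like this one. The class name is illustrative.

```java
import java.util.Arrays;

// Exact percentile by linear interpolation between the closest ranks.
// NOTE: Elasticsearch estimates percentiles with TDigest, so its output may differ.
class PercentilesSketch {

    // prices of documents 1..8 from bulkIndex()
    static final double[] PRICES = {1000, 2000, 3000, 1500, 1200, 2000, 8000, 2500};

    static double percentile(double[] values, double percent) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        // fractional position of the percentile in the sorted array
        double rank = percent / 100.0 * (sorted.length - 1);
        int lower = (int) Math.floor(rank);
        int upper = (int) Math.ceil(rank);
        return sorted[lower] + (rank - lower) * (sorted[upper] - sorted[lower]);
    }

    public static void main(String[] args) {
        for (double percent : new double[] {50, 90, 99}) {
            System.out.println(percent + ": " + percentile(PRICES, percent));
        }
    }
}
```

With this definition the 50th percentile is 2000. The 90th and 99th are about 4500 and 7650, because they fall between the two highest prices (3000 and 8000).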

(7) percentile_ranks

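percentile_ranks works the other way round. For each given value, it reports the percentage of observed values at or below it, e.g. the share of TVs priced up to 2000, up to 5000 and up to 10000.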

private static void percentilesRanksAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: percentage of prices up to 2000, 5000 and 10000
    double[] values = {2000, 5000, 10000};
    PercentileRanksAggregationBuilder percentileRanksAggregationBuilder = AggregationBuilders.percentileRanks("percentile_ranks", values)
            .field("price");
    searchSourceBuilder.aggregation(percentileRanksAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result: value -> percent
    PercentileRanks percentileRanksAggs = searchResponse.getAggregations().get("percentile_ranks");
    for (Percentile next : percentileRanksAggs) {
        System.out.println(next.getValue() + ": " + next.getPercent());
    }
}
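The exact version of a percentile rank is simply the share of values at or below a threshold. The plain-Java sketch below computes that for the three thresholds used above. Elasticsearch's TDigest-based estimate interpolates, so it may report a different number when values sit exactly on a threshold, as the two TVs priced 2000 do here. The class name is illustrative.

```java
// Exact percentile rank: percentage of values <= threshold.
// NOTE: Elasticsearch estimates this with TDigest, so its output may differ.
class PercentileRanksSketch {

    // prices of documents 1..8 from bulkIndex()
    static final double[] PRICES = {1000, 2000, 3000, 1500, 1200, 2000, 8000, 2500};

    static double percentileRank(double[] values, double threshold) {
        long count = 0;
        for (double value : values) {
            if (value <= threshold) {
                count++;
            }
        }
        return 100.0 * count / values.length;
    }

    public static void main(String[] args) {
        for (double threshold : new double[] {2000, 5000, 10000}) {
            System.out.println(threshold + ": " + percentileRank(PRICES, threshold));
        }
    }
}
```

By this exact definition, 62.5% of the TVs cost at most 2000, 87.5% at most 5000 and 100% at most 10000.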

Bucket + Metric Aggregation

A bucket aggregation can be analyzed further by adding sub-aggregations. A sub-aggregation can itself be either a bucket or a metric aggregation.

(1) One level of nesting, e.g. group by brand and compute price statistics for each brand

private static void singleNestAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: group by brand
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    // sub-aggs: price statistics per brand
    StatsAggregationBuilder statsAggregationBuilder = AggregationBuilders.stats("stats_price").field("price");
    termsAggregationBuilder.subAggregation(statsAggregationBuilder);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Terms termsAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termsAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        Stats statsPriceAggs = bucket.getAggregations().get("stats_price");
        System.out.println("brand: " + bucket.getKeyAsString());
        System.out.println("count: " + statsPriceAggs.getCount());
        System.out.println("max: " + statsPriceAggs.getMax());
        System.out.println("min: " + statsPriceAggs.getMin());
        System.out.println("avg: " + statsPriceAggs.getAvg());
        System.out.println("total: " + statsPriceAggs.getSum());
    }
}

(2) Multiple levels of nesting, e.g. group by brand, then by color, and compute price statistics

private static void multiNestAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: group by brand
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    // sub-aggs level 1: group by color within each brand
    TermsAggregationBuilder termsAggregationBuilder2 = AggregationBuilders.terms("group_by_color").field("color.keyword");
    // sub-aggs level 2: price statistics per brand and color
    StatsAggregationBuilder statsAggregationBuilder = AggregationBuilders.stats("stats_price").field("price");
    termsAggregationBuilder2.subAggregation(statsAggregationBuilder);
    termsAggregationBuilder.subAggregation(termsAggregationBuilder2);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Terms termsAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termsAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        Terms groupByColorAggs = bucket.getAggregations().get("group_by_color");
        List<? extends Terms.Bucket> colorAggsBuckets = groupByColorAggs.getBuckets();
        for (Terms.Bucket colorAggsBucket : colorAggsBuckets) {
            Stats statsPriceAggs = colorAggsBucket.getAggregations().get("stats_price");
            System.out.println("brand: " + bucket.getKeyAsString() + ", color: " + colorAggsBucket.getKeyAsString());
            System.out.println("count: " + statsPriceAggs.getCount());
            System.out.println("max: " + statsPriceAggs.getMax());
            System.out.println("min: " + statsPriceAggs.getMin());
            System.out.println("avg: " + statsPriceAggs.getAvg());
            System.out.println("total: " + statsPriceAggs.getSum());
        }
    }
}
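The two-level nesting can be pictured as a map of maps: brand → color → statistics. In the plain-Java sketch below, each nested aggregation becomes one grouping level. The Sale class, the class names and the romanized values (长虹 Changhong, 小米 Xiaomi, 三星 Samsung; 红色 red, 绿色 green, 蓝色 blue) are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of terms(brand) > terms(color) > stats(price) on the sample data.
class NestedAggSketch {

    // hypothetical holder for the fields used here
    static final class Sale {
        final String brand;
        final String color;
        final double price;

        Sale(String brand, String color, double price) {
            this.brand = brand;
            this.color = color;
            this.price = price;
        }
    }

    // documents 1..8 from bulkIndex(), with romanized brand/color values
    static final List<Sale> SALES = Arrays.asList(
            new Sale("Changhong", "red", 1000), new Sale("Changhong", "red", 2000),
            new Sale("Xiaomi", "green", 3000), new Sale("TCL", "blue", 1500),
            new Sale("TCL", "green", 1200), new Sale("Changhong", "red", 2000),
            new Sale("Samsung", "red", 8000), new Sale("Xiaomi", "blue", 2500));

    // brand -> color -> price statistics; each map level is one bucket aggregation
    static Map<String, Map<String, DoubleSummaryStatistics>> statsByBrandAndColor() {
        Map<String, Map<String, DoubleSummaryStatistics>> result = new TreeMap<>();
        for (Sale sale : SALES) {
            result.computeIfAbsent(sale.brand, brand -> new TreeMap<>())
                    .computeIfAbsent(sale.color, color -> new DoubleSummaryStatistics())
                    .accept(sale.price);
        }
        return result;
    }

    public static void main(String[] args) {
        statsByBrandAndColor().forEach((brand, byColor) -> byColor.forEach((color, stats) ->
                System.out.println(brand + " / " + color + " -> count: " + stats.getCount()
                        + ", min: " + stats.getMin() + ", max: " + stats.getMax()
                        + ", avg: " + stats.getAverage() + ", total: " + stats.getSum())));
    }
}
```

All three Changhong TVs are red (total 5000). Xiaomi and TCL each split into a green and a blue bucket, and Samsung has a single red TV.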

Pipeline Aggregation

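A pipeline aggregation runs a further aggregation over the output of other aggregations, rather than over documents. Sibling pipelines (min_bucket, stats_bucket, percentiles_bucket) sit next to the aggregation they read from. Parent pipelines (derivative, cumulative_sum, moving_fn) are placed inside a histogram or date_histogram and add a value to each of its buckets.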

(1) main method

public static void main(String[] args) throws IOException {
    // RestHighLevelClient is Closeable: try-with-resources closes it even if a call throws
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        minBucketAggs(client);
        statBucketAggs(client);
        percentilesBucketAggs(client);
        derivativeAggs(client);
        cumulativeSumAggs(client);
        moveFnAggs(client);
    }
}

(2) min_bucket

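The brand with the lowest average price. The buckets_path "group_by_brand>avg_price" points at the avg_price sub-aggregation inside each group_by_brand bucket.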

private static void minBucketAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: group by brand
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    // sub-aggs: average price per brand
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    termsAggregationBuilder.subAggregation(avgAggregationBuilder);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    // sibling pipeline aggs: the bucket with the smallest avg_price
    MinBucketPipelineAggregationBuilder minBucketPipelineAggregationBuilder =
            PipelineAggregatorBuilders.minBucket("min_price_by_brand", "group_by_brand>avg_price");
    searchSourceBuilder.aggregation(minBucketPipelineAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    // result
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Terms termAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        Avg avgPriceAggs = bucket.getAggregations().get("avg_price");
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount() + "\navg_price:" + avgPriceAggs.getValue());
    }
    System.out.println();
    BucketMetricValue bucketMetricValue = searchResponse.getAggregations().get("min_price_by_brand");
    System.out.println("value: " + bucketMetricValue.getValueAsString() + "\nkeys:" + Arrays.toString(bucketMetricValue.keys()));
}
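min_bucket works in two steps. First, the terms + avg aggregations produce one average per brand. Then the sibling pipeline picks the bucket(s) with the smallest value; keys() can list several brands if they tie. The plain-Java sketch below follows the same two steps. Its class name and romanized brands are illustrative.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of min_bucket over "group_by_brand>avg_price".
class MinBucketSketch {

    // brand and price of documents 1..8 (brands romanized)
    static final String[] BRANDS = {"Changhong", "Changhong", "Xiaomi", "TCL", "TCL", "Changhong", "Samsung", "Xiaomi"};
    static final double[] PRICES = {1000, 2000, 3000, 1500, 1200, 2000, 8000, 2500};

    // step 1: terms(brand) + avg(price)
    static Map<String, Double> avgPriceByBrand() {
        Map<String, double[]> sumAndCount = new TreeMap<>();
        for (int i = 0; i < BRANDS.length; i++) {
            double[] acc = sumAndCount.computeIfAbsent(BRANDS[i], brand -> new double[2]);
            acc[0] += PRICES[i];
            acc[1] += 1;
        }
        Map<String, Double> averages = new TreeMap<>();
        sumAndCount.forEach((brand, acc) -> averages.put(brand, acc[0] / acc[1]));
        return averages;
    }

    // step 2: keys of all buckets whose value equals the minimum (ties included)
    static List<String> minBucketKeys(Map<String, Double> values) {
        double min = Collections.min(values.values());
        List<String> keys = new ArrayList<>();
        values.forEach((key, value) -> {
            if (value == min) {
                keys.add(key);
            }
        });
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Double> averages = avgPriceByBrand();
        System.out.println(averages);
        System.out.println("keys: " + minBucketKeys(averages));
    }
}
```

The per-brand averages are Changhong ≈1666.67, Samsung 8000, TCL 1350 and Xiaomi 2750, so min_bucket should point at TCL with value 1350.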

(3) stats_bucket

Statistics (count, min, max, avg, sum) over the average prices of all brands.

private static void statBucketAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: group by brand
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    // sub-aggs: average price per brand
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    termsAggregationBuilder.subAggregation(avgAggregationBuilder);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    // sibling pipeline aggs: stats over all avg_price values
    StatsBucketPipelineAggregationBuilder statsBucketPipelineAggregationBuilder =
            PipelineAggregatorBuilders.statsBucket("stats_price_by_brand", "group_by_brand>avg_price");
    searchSourceBuilder.aggregation(statsBucketPipelineAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    // result
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Terms termAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        Avg avgPriceAggs = bucket.getAggregations().get("avg_price");
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount() + "\navg_price:" + avgPriceAggs.getValue());
    }
    System.out.println();
    StatsBucket statsBucket = searchResponse.getAggregations().get("stats_price_by_brand");
    System.out.println("count: " + statsBucket.getCount());
    System.out.println("max: " + statsBucket.getMax());
    System.out.println("min: " + statsBucket.getMin());
    System.out.println("avg: " + statsBucket.getAvg());
    System.out.println("total: " + statsBucket.getSum());
}

(4) percentiles_bucket

Percentiles of the average prices of all brands.

private static void percentilesBucketAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: group by brand
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    // sub-aggs: average price per brand
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    termsAggregationBuilder.subAggregation(avgAggregationBuilder);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    // sibling pipeline aggs: 50th/90th/99th percentile of the avg_price values
    double[] percents = {50, 90, 99};
    PercentilesBucketPipelineAggregationBuilder percentilesBucketPipelineAggregationBuilder =
            PipelineAggregatorBuilders.percentilesBucket("percentiles_price_by_brand", "group_by_brand>avg_price")
                    .setPercents(percents);
    searchSourceBuilder.aggregation(percentilesBucketPipelineAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    // result
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Terms termAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        Avg avgPriceAggs = bucket.getAggregations().get("avg_price");
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount() + "\navg_price:" + avgPriceAggs.getValue());
    }
    System.out.println();
    ParsedPercentilesBucket percentilesBucket = searchResponse.getAggregations().get("percentiles_price_by_brand");
    for (Percentile percentile : percentilesBucket) {
        System.out.println(percentile.getPercent() + ": " + percentile.getValue());
    }
}

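(5) derivative

Derivative of the average price per sold_date month, i.e. how much the monthly average changes from one bucket to the next. The first bucket has no previous bucket and therefore no derivative, which is why the code checks for null.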

private static void derivativeAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: one bucket per month of sold_date
    DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram("sold_date_aggs")
            .field("sold_date")
            .calendarInterval(DateHistogramInterval.MONTH)
            .format("yyyy-MM-dd");
    // sub-aggs: average price per month
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price_aggs").field("price");
    dateHistogramAggregationBuilder.subAggregation(avgAggregationBuilder);
    // parent pipeline aggs: derivative of avg_price_aggs across the month buckets
    DerivativePipelineAggregationBuilder derivativePipelineAggregationBuilder =
            PipelineAggregatorBuilders.derivative("derivative_avg_price_aggs", "avg_price_aggs");
    dateHistogramAggregationBuilder.subAggregation(derivativePipelineAggregationBuilder);
    searchSourceBuilder.aggregation(dateHistogramAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    // result
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Histogram histogramAggs = searchResponse.getAggregations().get("sold_date_aggs");
    List<? extends Histogram.Bucket> buckets = histogramAggs.getBuckets();
    for (Histogram.Bucket bucket : buckets) {
        Avg avgPriceAggs = bucket.getAggregations().get("avg_price_aggs");
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount() + "\navg_price:" + avgPriceAggs.getValue());
        Derivative derivativeAvgPriceAggs = bucket.getAggregations().get("derivative_avg_price_aggs");
        // value() is the raw difference to the previous bucket;
        // normalizedValue() is only meaningful when a unit is set on the derivative
        if (derivativeAvgPriceAggs != null) {
            System.out.println("derivative_avg_price_aggs: " + derivativeAvgPriceAggs.value());
        }
    }
}
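A derivative is the difference between a bucket's value and the previous bucket's value. The plain-Java sketch below applies that to the monthly average prices, which I derived by hand from the 8 sample documents. It is a simplified model. It assumes that a month without sales (null here) and its following month simply get no derivative. Elasticsearch's actual handling of empty buckets depends on gap_policy, so its output for those months may differ. The class name is illustrative.

```java
import java.util.Arrays;

// Simplified model of a derivative pipeline: value[i] - value[i - 1].
// Assumption: a missing value (null = month without sales) yields no derivative.
class DerivativeSketch {

    // monthly average price 2019-05 .. 2020-02, derived from the 8 sample documents;
    // null marks 2019-06, 2019-09 and 2019-12, which have no sales
    static final Double[] MONTHLY_AVG = {3000.0, null, 1500.0, 1200.0, null, 1000.0, 2000.0, null, 8000.0, 2500.0};

    static Double[] derivative(Double[] series) {
        Double[] result = new Double[series.length]; // result[0] stays null: no previous bucket
        for (int i = 1; i < series.length; i++) {
            if (series[i] != null && series[i - 1] != null) {
                result[i] = series[i] - series[i - 1];
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(derivative(MONTHLY_AVG)));
    }
}
```

Under these assumptions, only 2019-08 (-300), 2019-11 (+1000) and 2020-02 (-5500) get a derivative.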

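(6) cumulative_sum

Cumulative sum of the average price per sold_date month: each bucket shows the running total up to and including that month.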

private static void cumulativeSumAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: one bucket per month of sold_date
    DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram("sold_date_aggs")
            .field("sold_date")
            .calendarInterval(DateHistogramInterval.MONTH)
            .format("yyyy-MM-dd");
    // sub-aggs: average price per month
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price_aggs").field("price");
    dateHistogramAggregationBuilder.subAggregation(avgAggregationBuilder);
    // parent pipeline aggs: running total of avg_price_aggs
    CumulativeSumPipelineAggregationBuilder cumulativeSumPipelineAggregationBuilder =
            PipelineAggregatorBuilders.cumulativeSum("cumulative_sum_avg_price_aggs", "avg_price_aggs");
    dateHistogramAggregationBuilder.subAggregation(cumulativeSumPipelineAggregationBuilder);
    searchSourceBuilder.aggregation(dateHistogramAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    // result
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Histogram histogramAggs = searchResponse.getAggregations().get("sold_date_aggs");
    List<? extends Histogram.Bucket> buckets = histogramAggs.getBuckets();
    for (Histogram.Bucket bucket : buckets) {
        Avg avgPriceAggs = bucket.getAggregations().get("avg_price_aggs");
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount() + "\navg_price:" + avgPriceAggs.getValue());
        ParsedSimpleValue parsedSimpleValue = bucket.getAggregations().get("cumulative_sum_avg_price_aggs");
        if (parsedSimpleValue != null) {
            System.out.println("cumulative_sum_avg_price_aggs: " + parsedSimpleValue.getValueAsString());
        }
    }
}
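A cumulative sum keeps a running total across the buckets in order. The plain-Java sketch below applies it to the same monthly averages, derived by hand from the sample documents. It assumes that a month without sales adds nothing to the total, so the running total carries over unchanged. Treat it as a model of the idea rather than a guaranteed reproduction of Elasticsearch's output for empty buckets. The class name is illustrative.

```java
// Running total over a bucket series.
// Assumption: a missing value (null = month without sales) adds nothing.
class CumulativeSumSketch {

    // monthly average price 2019-05 .. 2020-02, derived from the 8 sample documents
    static final Double[] MONTHLY_AVG = {3000.0, null, 1500.0, 1200.0, null, 1000.0, 2000.0, null, 8000.0, 2500.0};

    static double[] cumulativeSum(Double[] series) {
        double[] result = new double[series.length];
        double sum = 0;
        for (int i = 0; i < series.length; i++) {
            if (series[i] != null) {
                sum += series[i];
            }
            result[i] = sum; // total up to and including bucket i
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(cumulativeSum(MONTHLY_AVG)));
    }
}
```

The running total grows from 3000 in 2019-05 to 19200 in 2020-02.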

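(7) moving_fn

For the average price per sold_date month, compute the minimum over a sliding window of 10 buckets with the MovingFunctions.min script function. With the default shift of 0, the window for each bucket covers the preceding buckets and does not include the current one.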

private static void moveFnAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: one bucket per month of sold_date
    DateHistogramAggregationBuilder dateHistogramAggregationBuilder = AggregationBuilders.dateHistogram("sold_date_aggs")
            .field("sold_date")
            .calendarInterval(DateHistogramInterval.MONTH)
            .format("yyyy-MM-dd");
    // sub-aggs: average price per month
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price_aggs").field("price");
    dateHistogramAggregationBuilder.subAggregation(avgAggregationBuilder);
    // parent pipeline aggs: minimum of avg_price_aggs over a window of 10 buckets
    MovFnPipelineAggregationBuilder movFnPipelineAggregationBuilder = PipelineAggregatorBuilders.movingFunction(
            "moving_fn_avg_price_aggs", new Script("MovingFunctions.min(values)"), "avg_price_aggs", 10);
    dateHistogramAggregationBuilder.subAggregation(movFnPipelineAggregationBuilder);
    searchSourceBuilder.aggregation(dateHistogramAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    // result
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    Histogram histogramAggs = searchResponse.getAggregations().get("sold_date_aggs");
    List<? extends Histogram.Bucket> buckets = histogramAggs.getBuckets();
    for (Histogram.Bucket bucket : buckets) {
        Avg avgPriceAggs = bucket.getAggregations().get("avg_price_aggs");
        System.out.println("key:" + bucket.getKeyAsString() + "\ndoc_count:" + bucket.getDocCount() + "\navg_price:" + avgPriceAggs.getValue());
        ParsedSimpleValue parsedSimpleValue = bucket.getAggregations().get("moving_fn_avg_price_aggs");
        if (parsedSimpleValue != null) {
            System.out.println("moving_fn_avg_price_aggs: " + parsedSimpleValue.getValueAsString());
        }
    }
}
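The window semantics described above (window = 10, shift = 0) can be modelled in plain Java. For bucket i, the minimum is taken over buckets i-10 to i-1, so the current bucket is excluded and an empty window yields NaN. The sketch feeds it the monthly averages of the months that have sales, derived by hand from the sample data. It assumes that months without sales are skipped rather than counted as values. The class name is illustrative.

```java
// Sketch of moving_fn with MovingFunctions.min(values), window = 10, shift = 0:
// the window for bucket i holds buckets max(0, i - window) .. i - 1.
class MovingMinSketch {

    // monthly averages of the months with sales (2019-05 .. 2020-02), gaps skipped (assumption)
    static final double[] VALUES = {3000, 1500, 1200, 1000, 2000, 8000, 2500};

    static double[] movingMin(double[] values, int window) {
        double[] result = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            double min = Double.NaN; // an empty window gives NaN
            for (int j = Math.max(0, i - window); j < i; j++) {
                if (Double.isNaN(min) || values[j] < min) {
                    min = values[j];
                }
            }
            result[i] = min;
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(movingMin(VALUES, 10)));
    }
}
```

The first bucket has nothing before it (NaN). After that the running minimum falls from 3000 to 1000 and stays there.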

Matrix Aggregation

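matrix_stats computes statistics over one or more numeric fields and returns them as a matrix. Each field gets count, mean, variance, skewness and kurtosis, and every pair of fields gets covariance and correlation. The example below uses only the price field.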

private static void matrixStatsAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: matrix statistics over the listed fields (only price here)
    List<String> fields = new ArrayList<>();
    fields.add("price");
    MatrixStatsAggregationBuilder matrixStatsAggregationBuilder = MatrixStatsAggregationBuilders.matrixStats("statistics").fields(fields);
    searchSourceBuilder.aggregation(matrixStatsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    // result
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    ParsedMatrixStats parsedMatrixStats = searchResponse.getAggregations().get("statistics");
    System.out.println("count: " + parsedMatrixStats.getFieldCount("price"));
    System.out.println("mean: " + parsedMatrixStats.getMean("price"));
    System.out.println("variance: " + parsedMatrixStats.getVariance("price"));
    System.out.println("skewness: " + parsedMatrixStats.getSkewness("price"));
    System.out.println("kurtosis: " + parsedMatrixStats.getKurtosis("price"));
    System.out.println("covariance: " + parsedMatrixStats.getCovariance("price", "price"));
    System.out.println("correlation: " + parsedMatrixStats.getCorrelation("price", "price"));
}
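To get a feel for the magnitudes matrix_stats reports for price, the sketch below computes textbook moments in plain Java. It uses the mean, the sample variance (divided by n - 1), skewness = m3 / m2^1.5 and kurtosis = m4 / m2^2, where mk is the k-th central moment divided by n. These formulas are my assumption; Elasticsearch's exact normalization may differ, so use this as a rough cross-check rather than exact expected values. The class name is illustrative.

```java
// Textbook moments for the price field (assumed definitions, see comments).
class MatrixStatsSketch {

    // prices of documents 1..8 from bulkIndex()
    static final double[] PRICES = {1000, 2000, 3000, 1500, 1200, 2000, 8000, 2500};

    static double mean(double[] values) {
        double sum = 0;
        for (double value : values) {
            sum += value;
        }
        return sum / values.length;
    }

    // k-th central moment, divided by n
    static double centralMoment(double[] values, int k) {
        double mean = mean(values);
        double sum = 0;
        for (double value : values) {
            sum += Math.pow(value - mean, k);
        }
        return sum / values.length;
    }

    // sample variance: sum of squared deviations divided by n - 1
    static double sampleVariance(double[] values) {
        return centralMoment(values, 2) * values.length / (values.length - 1);
    }

    static double skewness(double[] values) {
        return centralMoment(values, 3) / Math.pow(centralMoment(values, 2), 1.5);
    }

    static double kurtosis(double[] values) {
        double m2 = centralMoment(values, 2);
        return centralMoment(values, 4) / (m2 * m2);
    }

    public static void main(String[] args) {
        System.out.println("mean: " + mean(PRICES));
        System.out.println("variance: " + sampleVariance(PRICES));
        System.out.println("skewness: " + skewness(PRICES));
        System.out.println("kurtosis: " + kurtosis(PRICES));
    }
}
```

The single 8000 TV makes the distribution strongly right-skewed: skewness is about 1.90 and kurtosis about 5.21 under these definitions.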

Aggregation scope and ordering

By default, an aggregation runs over the documents matched by the query. Elasticsearch also offers the following ways to change that scope:

  • filter
  • post_filter
  • global

(1) main method

public static void main(String[] args) throws IOException {
    // RestHighLevelClient is Closeable: try-with-resources closes it even if a call throws
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
        query(client);
        filter(client);
        postFilter(client);
        global(client);
        orderAggs(client);
        orderSubAggs(client);
    }
}

(2) query

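Filter the documents with a query first; the aggregation then runs only over the matching documents (here, TVs with price >= 2000).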

private static void query(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // query: only TVs with price >= 2000
    RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("price").gte(2000);
    searchSourceBuilder.query(rangeQueryBuilder);
    // aggs: group the matching documents by brand
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Terms termsAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termsAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        System.out.println("key: " + bucket.getKeyAsString() + "\ndoc_count: " + bucket.getDocCount());
    }
}

(3) filter

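A filter aggregation restricts only its own sub-aggregations. Sibling aggregations still see every document matched by the query. In the example, group_by_brand (under price_filter) counts only TVs priced at 2000 or more, while group_by_all_brand counts all of them.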

private static void filter(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs 1: filter bucket with price >= 2000 ...
    RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("price").gte(2000);
    FilterAggregationBuilder filterAggregationBuilder = AggregationBuilders.filter("price_filter", rangeQueryBuilder);
    // ... whose sub-aggs group only the filtered documents by brand
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    filterAggregationBuilder.subAggregation(termsAggregationBuilder);
    searchSourceBuilder.aggregation(filterAggregationBuilder);
    // aggs 2: sibling aggregation, not affected by the filter above
    TermsAggregationBuilder termsAggregationBuilder2 = AggregationBuilders.terms("group_by_all_brand").field("brand.keyword");
    searchSourceBuilder.aggregation(termsAggregationBuilder2);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result: all brands
    Terms termsAggs = searchResponse.getAggregations().get("group_by_all_brand");
    List<? extends Terms.Bucket> buckets = termsAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        System.out.println("key: " + bucket.getKeyAsString() + "\ndoc_count: " + bucket.getDocCount());
    }
    // result: brands of TVs with price >= 2000
    Filter filterAggs = searchResponse.getAggregations().get("price_filter");
    Terms termsAggs2 = filterAggs.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets2 = termsAggs2.getBuckets();
    for (Terms.Bucket bucket : buckets2) {
        System.out.println("key: " + bucket.getKeyAsString() + "\ndoc_count: " + bucket.getDocCount());
    }
}
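The difference between the filtered sub-aggregation and its sibling can be sketched in memory as follows. This is a hypothetical simulation, not client code: `FilterAggSketch` and its methods are illustrative names, and the data is again the eight `bulkIndex` documents reduced to price and brand.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical in-memory sketch, not the Elasticsearch API.
class FilterAggSketch {
    static final class Sale {
        final int price;
        final String brand;
        Sale(int price, String brand) { this.price = price; this.brand = brand; }
    }

    static final List<Sale> SALES = Arrays.asList(
            new Sale(1000, "长虹"), new Sale(2000, "长虹"), new Sale(3000, "小米"),
            new Sale(1500, "TCL"), new Sale(1200, "TCL"), new Sale(2000, "长虹"),
            new Sale(8000, "三星"), new Sale(2500, "小米"));

    // Stands in for a terms aggregation on brand.keyword (TreeMap only for stable printing).
    static Map<String, Long> termsByBrand(Stream<Sale> docs) {
        return docs.collect(Collectors.groupingBy((Sale s) -> s.brand, TreeMap::new, Collectors.counting()));
    }

    // "group_by_all_brand": top-level, no query, so it sees every document.
    static Map<String, Long> allBrands() {
        return termsByBrand(SALES.stream());
    }

    // "price_filter": its doc_count is the number of documents passing the filter.
    static long filterDocCount(int minPrice) {
        return SALES.stream().filter(s -> s.price >= minPrice).count();
    }

    // "group_by_brand" under "price_filter": only the filtered documents are grouped.
    static Map<String, Long> filteredBrands(int minPrice) {
        return termsByBrand(SALES.stream().filter(s -> s.price >= minPrice));
    }

    public static void main(String[] args) {
        System.out.println("group_by_all_brand: " + allBrands());
        System.out.println("price_filter doc_count: " + filterDocCount(2000));
        System.out.println("price_filter > group_by_brand: " + filteredBrands(2000));
    }
}
```

The sibling aggregation should report 长虹=3 and TCL=2, while the filter bucket holds 5 documents and its sub-aggregation contains no TCL bucket.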

(4) post_filter

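post_filter filters the search hits after the aggregations have been computed, so it narrows the returned documents without changing the aggregation results. Do not set size to 0 here, otherwise no hits are returned for it to filter.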

private static void postFilter(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // size is left at its default so that hits are returned
    // aggs: computed on every document matched by the query (match_all here)
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    // post_filter: applied to the hits only, after the aggregations are computed
    TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("brand.keyword", "小米");
    searchSourceBuilder.postFilter(termQueryBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result: aggregation buckets (all brands)
    Terms termsAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termsAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        System.out.println("key: " + bucket.getKeyAsString() + "\ndoc_count: " + bucket.getDocCount());
    }
    // result: hits (only 小米 documents)
    SearchHit[] hits = searchResponse.getHits().getHits();
    for (SearchHit hit : hits) {
        System.out.println(hit.getSourceAsString());
    }
}
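The ordering "aggregate first, then post-filter the hits" can be sketched in memory like this. It is a hypothetical simulation rather than client code (`PostFilterSketch` and its methods are illustrative names); the documents are the eight from `bulkIndex`, with id, price and brand kept. Real hits are ordered by score, so the sketch simply keeps index order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical in-memory sketch, not the Elasticsearch API.
class PostFilterSketch {
    static final class Sale {
        final String id;
        final int price;
        final String brand;
        Sale(String id, int price, String brand) { this.id = id; this.price = price; this.brand = brand; }
    }

    static final List<Sale> SALES = Arrays.asList(
            new Sale("1", 1000, "长虹"), new Sale("2", 2000, "长虹"), new Sale("3", 3000, "小米"),
            new Sale("4", 1500, "TCL"), new Sale("5", 1200, "TCL"), new Sale("6", 2000, "长虹"),
            new Sale("7", 8000, "三星"), new Sale("8", 2500, "小米"));

    // Aggregations see the full query result (no query, so all eight documents);
    // post_filter has not been applied at this point.
    static Map<String, Long> groupByBrand() {
        return SALES.stream()
                .collect(Collectors.groupingBy((Sale s) -> s.brand, TreeMap::new, Collectors.counting()));
    }

    // post_filter runs afterwards and only trims the hits that are returned.
    static List<String> hitIds(String brand) {
        return SALES.stream()
                .filter(s -> s.brand.equals(brand))
                .map(s -> s.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("group_by_brand: " + groupByBrand());
        System.out.println("hits: " + hitIds("小米"));
    }
}
```

The buckets still cover all four brands, while only documents 3 and 8 come back as hits.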

(5) global

The global aggregation ignores the query and aggregates over every document in the index being searched.

private static void global(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // query: price >= 2000
    RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("price").gte(2000);
    searchSourceBuilder.query(rangeQueryBuilder);
    // aggs: respects the query
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword");
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    // subAggs: the average price inside a global bucket, which ignores the query
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    GlobalAggregationBuilder globalAggregationBuilder = AggregationBuilders.global("all_price_aggs").subAggregation(avgAggregationBuilder);
    searchSourceBuilder.aggregation(globalAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result: brands of the documents matching the query
    Terms termsAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termsAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        System.out.println("key: " + bucket.getKeyAsString() + "\ndoc_count: " + bucket.getDocCount());
    }
    // result: document count and average price over all documents
    Global allPriceAggs = searchResponse.getAggregations().get("all_price_aggs");
    Avg avgPriceAggs = allPriceAggs.getAggregations().get("avg_price");
    System.out.println("\ndoc_count: " + allPriceAggs.getDocCount() + "\navg_price: " + avgPriceAggs.getValue());
}
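The effect of the global bucket can be checked with a small in-memory sketch. It does not call Elasticsearch; `GlobalAggSketch` and its methods are hypothetical names, and the data is the eight `bulkIndex` documents reduced to price and brand. `queryScopedAvgPrice` is added only for comparison and has no counterpart in the request above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory sketch, not the Elasticsearch API.
class GlobalAggSketch {
    static final class Sale {
        final int price;
        final String brand;
        Sale(int price, String brand) { this.price = price; this.brand = brand; }
    }

    static final List<Sale> SALES = Arrays.asList(
            new Sale(1000, "长虹"), new Sale(2000, "长虹"), new Sale(3000, "小米"),
            new Sale(1500, "TCL"), new Sale(1200, "TCL"), new Sale(2000, "长虹"),
            new Sale(8000, "三星"), new Sale(2500, "小米"));

    // "all_price_aggs" (global): doc_count covers every document, regardless of the query.
    static long globalDocCount() {
        return SALES.size();
    }

    // "avg_price" under the global bucket: average over all documents.
    static double globalAvgPrice() {
        return SALES.stream().mapToInt(s -> s.price).average().orElse(Double.NaN);
    }

    // For comparison only: the average if it respected the price >= minPrice query.
    static double queryScopedAvgPrice(int minPrice) {
        return SALES.stream().filter(s -> s.price >= minPrice)
                .mapToInt(s -> s.price).average().orElse(Double.NaN);
    }

    public static void main(String[] args) {
        System.out.println("global doc_count: " + globalDocCount());
        System.out.println("global avg_price: " + globalAvgPrice());
        System.out.println("query-scoped avg_price: " + queryScopedAvgPrice(2000));
    }
}
```

With this data the global average is 21200 / 8 = 2650.0, while an average restricted to the query would be 17500 / 5 = 3500.0.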

Sorting

Bucket order is set with the order parameter; by default, buckets are sorted by doc_count in descending order.

(1) Sort by doc_count ascending, then by key descending

private static void orderAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: doc_count ascending first, key descending to break ties
    List<BucketOrder> orders = new ArrayList<>();
    orders.add(BucketOrder.count(true));
    orders.add(BucketOrder.key(false));
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword").order(orders);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Terms termsAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termsAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        System.out.println("key: " + bucket.getKeyAsString() + "\ndoc_count: " + bucket.getDocCount());
    }
}
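The compound order (count ascending, then key descending) can be reproduced in memory to predict the bucket sequence. This is a hypothetical sketch with illustrative names (`OrderByCountThenKeySketch`, `orderedBrands`), using the eight `bulkIndex` documents. Java's `String.compareTo` stands in for Elasticsearch's key comparison; for these brand names, which are all in the Basic Multilingual Plane, the two orders should agree.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory sketch, not the Elasticsearch API.
class OrderByCountThenKeySketch {
    static final class Sale {
        final int price;
        final String brand;
        Sale(int price, String brand) { this.price = price; this.brand = brand; }
    }

    static final List<Sale> SALES = Arrays.asList(
            new Sale(1000, "长虹"), new Sale(2000, "长虹"), new Sale(3000, "小米"),
            new Sale(1500, "TCL"), new Sale(1200, "TCL"), new Sale(2000, "长虹"),
            new Sale(8000, "三星"), new Sale(2500, "小米"));

    static List<String> orderedBrands() {
        Map<String, Long> counts = SALES.stream()
                .collect(Collectors.groupingBy((Sale s) -> s.brand, Collectors.counting()));
        // Mirrors BucketOrder.count(true): doc_count ascending.
        Comparator<Map.Entry<String, Long>> byCountAsc = Map.Entry.comparingByValue();
        // Mirrors BucketOrder.key(false): key descending, used only for ties.
        Comparator<Map.Entry<String, Long>> byKeyDesc = Map.Entry.<String, Long>comparingByKey().reversed();
        return counts.entrySet().stream()
                .sorted(byCountAsc.thenComparing(byKeyDesc))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(orderedBrands());
    }
}
```

三星 (1 sale) comes first and 长虹 (3 sales) last; 小米 and TCL both have 2 sales, so the descending key order puts 小米 ahead of TCL.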

(2) Sort by the value of a sub-aggregation

private static void orderSubAggs(RestHighLevelClient client) throws IOException {
    SearchRequest searchRequest = new SearchRequest("sales");
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.size(0);
    // aggs: order the brand buckets by their avg_price sub-aggregation, descending
    BucketOrder bucketOrder = BucketOrder.aggregation("avg_price", false);
    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("group_by_brand").field("brand.keyword").order(bucketOrder);
    AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("avg_price").field("price");
    termsAggregationBuilder.subAggregation(avgAggregationBuilder);
    searchSourceBuilder.aggregation(termsAggregationBuilder);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
    // result
    Terms termsAggs = searchResponse.getAggregations().get("group_by_brand");
    List<? extends Terms.Bucket> buckets = termsAggs.getBuckets();
    for (Terms.Bucket bucket : buckets) {
        Avg avgPriceAggs = bucket.getAggregations().get("avg_price");
        System.out.println("key: " + bucket.getKeyAsString() + "\ndoc_count: " + bucket.getDocCount() + "\navg_price: " + avgPriceAggs.getValue());
    }
}
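Ordering by a sub-aggregation means computing the metric per bucket first and then sorting the buckets by it. The in-memory sketch below illustrates that with the eight `bulkIndex` documents; `OrderBySubAggSketch` and its methods are hypothetical names, and nothing here calls the Elasticsearch client.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory sketch, not the Elasticsearch API.
class OrderBySubAggSketch {
    static final class Sale {
        final int price;
        final String brand;
        Sale(int price, String brand) { this.price = price; this.brand = brand; }
    }

    static final List<Sale> SALES = Arrays.asList(
            new Sale(1000, "长虹"), new Sale(2000, "长虹"), new Sale(3000, "小米"),
            new Sale(1500, "TCL"), new Sale(1200, "TCL"), new Sale(2000, "长虹"),
            new Sale(8000, "三星"), new Sale(2500, "小米"));

    // The "avg_price" sub-aggregation evaluated inside each brand bucket.
    static Map<String, Double> avgPriceByBrand() {
        return SALES.stream()
                .collect(Collectors.groupingBy((Sale s) -> s.brand, Collectors.averagingInt((Sale s) -> s.price)));
    }

    // Mirrors BucketOrder.aggregation("avg_price", false): buckets by avg_price, descending.
    static List<String> brandsByAvgPriceDesc() {
        return avgPriceByBrand().entrySet().stream()
                .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(brandsByAvgPriceDesc());
    }
}
```

The per-brand averages are 三星 8000.0, 小米 2750.0, 长虹 about 1666.67 and TCL 1350.0, so the expected bucket order is 三星, 小米, 长虹, TCL.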
