Elasticsearch:rollup - 索引管理
汇总作业 (rollup jobs)是一项定期任务,它将来自索引模式指定的索引中的数据进行汇总,然后将其汇总到新的索引中。 汇总索引是紧凑存储数月或数年历史数据以供可视化和报告使用的好方法。用到 rollup 的情况是我们有很多的历史数据,而且通常会比较大。通过使用 rollup 功能,我们可以把很多针对大量数据的统计变为针对经过 rollup 后的索引操作,从而使得数据的统计更加有效。在实际的应用中:
- 在很多的情况下,保留历史数据不是一种最佳的选择。这是因为时序数据随着时间的推移,它们的价值没有那么高,需要删除这些数据以节省成本
- 但是我们还是希望能够保留它们的统计数据用于分析
Elasticsearch 自6.3 版本推出 rollup 功能,它可以帮我们:
- 保留旧数据,并以一种紧凑和聚合的方式来存储
- 仅仅保留我们感兴趣的数据
在下面,我们来用一个具体的例子来展示如何使用 rollup 的。
准备数据
我们首先需要找到一个数据比较大一点的索引。我们可以参考我之前的文章 “Logstash 入门教程 (二)” 。在那篇文章中,我们直接跳到文章的最后一节把整个文件都导入到 Elasticticsearch 中。
我们可以看到我们的 cars 索引有达到 201M 的大小,而且它的总文档数达到 30 万。在接下来我们将使用 rollup 的功能对这个索引进行处理。在数据导入后,我们必须创建一个 index pattern,并在 Discover 中进行显示:
从上面的图中,我们可以看出来,整个索引的数据是从 2019.5.9 到 2019.6.07 进行采集的。
创建 rollup job
我们打开 Kibana 界面:
点击 Create rollup job 按钮:
从上面我们可以看出来这个 rollup job 针对一个时间系列的索引。把屏幕向下滚动。
从上面我们可以看出来:rollup 是一个在后台不断运行的一个任务。它会周期性地定时做这项工作,以使得最新已有的数据得到处理。在上面,我选择在每个时钟过15分钟时进行一次 rollup,比如在1:15分做一次,2:15分做一次,3:15分做一次,依次类推。这个依赖于你自己索引的大小及项目的性质决定的。
点击上面的 Next 按钮:
这个是针对 Date histogram 的配置。点击 Next 按钮:
这个是针对 terms 的 选择。点击 Add terms fields 按钮:
我们选择上面的 geoip.country_code2:
按照同样的方法,我们添加 agent.hostname:
点击 Next 按钮:
这个页面时可选项。如果我们感兴趣的话,那么我们点击 Add histogram fields:
我们选择 Bytes 字段:
点击 Next 按钮:
这个是对 Metrics 的配置。我们点击 Add metrics fields 按钮:
我们选择 bytes。由于 Metrics 只是针对数值类型的字段,在上面我们可以看到所有的字段都是数值类型的。
我们接着勾上我们喜欢的 metrics 项。再点击 Next 按钮:
在这个页面我们可以看到这个 rollup 的 概览。如果你觉得不太满意,你可以点击 Back 按钮然后再进行重新配置。如果满意的话,我们点击 Save 按钮。如果你还想马上就开始这个 job 的话,勾上 Start job now。我们点击 Save,并勾上 Start job now:
上面的状态显示这个 job 已经开始工作了。我们可以点击 Manage 按钮来对这个 job 进行管理:
它可以让我们停止这个 job 或者克隆这个 job。目前,我们既不想停止,也不想克隆。我们在这个界面还可以点击上面的几个 tab 来查看这个 job 的详细信息。
特别有意思的是我们甚至可以看到这个 job 的 JSON 表达方式。
退出这个 Dialog:
从上面,我们可以看出来我们的 Job 正在运行中。
在 Kibana 中进行统计
我在Kibana中,通过如下的命令来查看所有的索引:
GET _cat/indices
结果显示:
我们可以看到一个新的索引:apache_rollup。它的文件显示的非常之小。只有 21.8M。是不是觉得不可思议啊。它相比之前的那个apache_elastic_example 来说,小了非常多。
我们通过对这个索引来对我们所关心的数据进行统计分析。当我们对这个 rollup 的索引进行分析时,参照链接Rollup search | Elasticsearch Guide [master] | Elastic,我们可以对它进行如下的方式的搜索:
我们经过 rollup 的处理后,那么对于我们的数据的统计来说有没有什么影响呢?
我们先来做一些检测:
找出最大值
GET apache_rollup/_rollup_search
{"size": 0,"aggs": {"my_max": {"max": {"field": "bytes"}}}
}
返回结果:
{"took" : 9,"timed_out" : false,"terminated_early" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 0,"relation" : "eq"},"max_score" : 0.0,"hits" : [ ]},"aggregations" : {"my_max" : {"value" : 8.2090432E7}}
}
上面是根据 rollup 的索引得到的结果。我们来使用最原始的索引:
GET apache_elastic_example/_search
{"size": 0,"aggs": {"my_max": {"max": {"field": "bytes"}}}
}
{"took" : 0,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 10000,"relation" : "gte"},"max_score" : null,"hits" : [ ]},"aggregations" : {"my_max" : {"value" : 8.209043E7}}
}
显然返回的结果是一样的。
找出最小值
GET apache_rollup/_rollup_search
{"size": 0,"aggs": {"my_min": {"min": {"field": "bytes"}}}
}
返回的结果:
{"took" : 7,"timed_out" : false,"terminated_early" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 0,"relation" : "eq"},"max_score" : 0.0,"hits" : [ ]},"aggregations" : {"my_min" : {"value" : 1.0}}
}
使用原始的数据:
GET apache_elastic_example/_search
{"size": 0,"aggs": {"my_min": {"min": {"field": "bytes"}}}
}
{"took" : 0,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 10000,"relation" : "gte"},"max_score" : null,"hits" : [ ]},"aggregations" : {"my_min" : {"value" : 1.0}}
}
返回的结果是一样的。
找出平均值
GET apache_rollup/_rollup_search
{"size": 0,"aggs": {"my_avg": {"avg": {"field": "bytes"}}}
}
{"took" : 13,"timed_out" : false,"terminated_early" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 0,"relation" : "eq"},"max_score" : 0.0,"hits" : [ ]},"aggregations" : {"my_avg" : {"value" : 2372996.9378802367}}
}
使用原始的数据:
GET apache_elastic_example/_search
{"size": 0,"aggs": {"my_avg": {"avg": {"field": "bytes"}}}
}
返回的结果是:
{"took" : 0,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 10000,"relation" : "gte"},"max_score" : null,"hits" : [ ]},"aggregations" : {"my_avg" : {"value" : 2372996.957136324}}
}
结果的差异是非常之小的。
找出文档最多的前5个国家的名称
GET apache_rollup/_rollup_search
{"size":0,"aggs" : {"countries": {"terms": {"field": "geoip.country_code2.keyword","size": 5}}}
}
"aggregations" : {"countries" : {"doc_count_error_upper_bound" : 0,"sum_other_doc_count" : 66600,"buckets" : [{"key" : "US","doc_count" : 122800},{"key" : "FR","doc_count" : 19226},{"key" : "DE","doc_count" : 17415},{"key" : "NL","doc_count" : 14720},{"key" : "CN","doc_count" : 14581}]}}
使用最原始的数据:
GET apache_elastic_example/_search
{"size":0,"aggs" : {"countries": {"terms": {"field": "geoip.country_code2.keyword","size": 5}}}
}
"aggregations" : {"countries" : {"doc_count_error_upper_bound" : 0,"sum_other_doc_count" : 109662,"buckets" : [{"key" : "US","doc_count" : 122800},{"key" : "FR","doc_count" : 19226},{"key" : "DE","doc_count" : 17415},{"key" : "NL","doc_count" : 14720},{"key" : "CN","doc_count" : 14581}]}}
这两个的统计结果是一样的。
更为复杂的统计
我们可以尝试一下的统计:
GET apache_rollup/_rollup_search
{"size": 0,"query": {"term": {"agent.hostname.keyword" : {"value": "liuxg"}}},"aggs" :{"daily" : {"date_histogram" :{"field": "@timestamp","calendar_interval": "1d","time_zone": "UTC"},"aggs": {"avg_byete" : {"avg": {"field": "bytes"}}}}}
}
上面是对 rollup 索引进行的统计:
"aggregations" : {"daily" : {"meta" : { },"buckets" : [{"key_as_string" : "2019-05-08T00:00:00.000Z","key" : 1557273600000,"doc_count" : 688,"avg_byete" : {"value" : 126608.41791044777}},{"key_as_string" : "2019-05-09T00:00:00.000Z","key" : 1557360000000,"doc_count" : 7721,"avg_byete" : {"value" : 317026.4027212793}},{"key_as_string" : "2019-05-10T00:00:00.000Z","key" : 1557446400000,"doc_count" : 9044,"avg_byete" : {"value" : 551951.1379390019}},{"key_as_string" : "2019-05-11T00:00:00.000Z","key" : 1557532800000,"doc_count" : 9412,"avg_byete" : {"value" : 261766.84161836826}},...
}
我们对最原始的索引来进行统计:
GET apache_elastic_example/_search
{"size": 0,"query": {"term": {"agent.hostname.keyword": {"value": "liuxg"}}},"aggs": {"daily": {"date_histogram": {"field": "@timestamp","calendar_interval": "1d","time_zone": "UTC"},"aggs": {"avg_bytes": {"avg": {"field": "bytes"}}}}}
}
显示结果:
"aggregations" : {"daily" : {"buckets" : [{"key_as_string" : "2019-05-08T00:00:00.000Z","key" : 1557273600000,"doc_count" : 688,"avg_bytes" : {"value" : 126608.41940298508}},{"key_as_string" : "2019-05-09T00:00:00.000Z","key" : 1557360000000,"doc_count" : 7721,"avg_bytes" : {"value" : 317026.4042642727}},{"key_as_string" : "2019-05-10T00:00:00.000Z","key" : 1557446400000,"doc_count" : 9044,"avg_bytes" : {"value" : 551951.1417513863}},{"key_as_string" : "2019-05-11T00:00:00.000Z","key" : 1557532800000,"doc_count" : 9412,"avg_bytes" : {"value" : 261766.84351315204}},...}
从上面的结果显示,通过这两种方式进行的统计的结果非常一致。
从某种程度上讲,我们甚至可以通过 Index cycle management 的方法只保留一段时间的数据(比如最近的6个月的数据),而通过 rollup 方法继续可以对之前的数据进行预设的统计。
通过 API 的方法实现
上面的方法通过界面非常直观。但是 Elastic 也提供 API 的方法来设置。比如:
PUT _rollup/job/apache_rollup_job2
{"index_pattern": "apache_elastic_example*","rollup_index": "apache_rollup2","cron": "*/10 * * * * ?","page_size": 1000,"groups": {"date_histogram": {"field": "@timestamp","fixed_interval": "1h","delay": "1d","time_zone": "UTC"},"terms": {"fields": ["geoip.country_code2.keyword","agent.hostname.keyword"]}},"metrics": [{"field": "bytes","metrics": ["min","max","sum","avg"]}]
}
在上面,我们设置了几乎和界面一样的实现,只不过在这里,我们每隔10分钟来进行一次 rollup。在这里,我们把数据存于到 apache_rollup2 这个索引中。
这个 apache_rollup2 的大小更小。但是它和 apache_rollup 是一样的效果。我们可以对这个索引做同样的搜索:
GET apache_rollup2/_rollup_search
{"size": 0,"aggs": {"my_avg": {"avg": {"field": "bytes"}}}
}
"took" : 2,"timed_out" : false,"terminated_early" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 0,"relation" : "eq"},"max_score" : 0.0,"hits" : [ ]},"aggregations" : {"my_avg" : {"value" : 2372996.9558440386}}
}
使用最原始的索引:
GET apache_elastic_example/_search
{"size": 0,"aggs": {"my_avg": {"avg": {"field": "bytes"}}}
}
{"took" : 0,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 10000,"relation" : "gte"},"max_score" : null,"hits" : [ ]},"aggregations" : {"my_avg" : {"value" : 2372996.957136324}}
}
关于其它的比较,我这里就不详述了。你们可以自己来实践。
如果我们想删除一个 rollup 的任务,我们必须先停止,然后,删除它的 rollup 索引,再进行删除的动作。这些有的可以在之前的那个UI里通过 Manage 按钮来实现。通过 API 的方式来删除一个 rollup,请参考如下的步骤:
POST _rollup/job/apache_rollup_job/_stop?wait_for_completion=true&timeout=10s
DELETE apache_rollup
DELETE _rollup/job/apache_rollup_job
参考:
【1】https://www.youtube.com/watch?v=I5-9x_pQ-Y0&t=302s&pbjreload=10
【2】Get rollup jobs API | Elasticsearch Guide [7.16] | Elastic
【3】Rollup search | Elasticsearch Guide [master] | Elastic
Elasticsearch:rollup - 索引管理相关推荐
- elasticsearch 索引_Elasticsearch系列---索引管理
概要 Elasticsearch让索引创建变得非常简单,只要索引一条新的数据,索引会自动创建出来,但随着数据量的增加,我们开始有了索引优化和搜索优化的需求之后,就会发现自动创建的索引在某些方面不能非常 ...
- 深入理解ElasticSearch(八):索引管理
索引管理 1.创建一个索引 到目前为止, 我们已经通过索引一篇文档创建了一个新的索引 .这个索引采用的是默认的配置,新的字段通过动态映射的方式被添加到类型映射.现在我们需要对这个建立索引的过程做更多的 ...
- ElasticSearch最全详细使用教程:入门、索引管理、映射详解、索引别名、分词器、文档管理、路由、搜索详解...
墨墨导读:之前我们分享了ElasticSearch最全详细使用教程:入门.索引管理.映射详解,本文详细介绍ElasticSearch的索引别名.分词器.文档管理.路由.搜索详解. 一.索引别名 1. ...
- ElasticSearch最全详细使用教程:入门、索引管理、映射详解
墨墨导读:本文介绍了ElasticSearch的必备知识:从入门.索引管理到映射详解. 一.快速入门 1. 查看集群的健康状况http://localhost:9200/_cat http://loc ...
- ElasticSearch核心基础之索引管理
一 索引管理 1.1 创建索引 # 建立索引的时候,我们可以设置主分片和备份分片的数量通过setting字段number_of_shards和number_of_replicas字段设置 # 对于ES ...
- elasticsearch删除索引_一文带您了解 Elasticsearch 中,如何进行索引管理(图文教程)
在 Elasticsearch 中,索引是一个非常重要的概念,它是具有相同结构的文档集合.类比关系型数据库,比如 Mysql, 你可以把它对标看成和库同级别的概念. 今天小哈将带着大家了解, 在 El ...
- ElasticSearch权威指南学习(索引管理)
创建索引 当我们需要确保索引被创建在适当数量的分片上,在索引数据之前设置好分析器和类型映射. 手动创建索引,在请求中加入所有设置和类型映射,如下所示: PUT /my_index {"set ...
- Elasticsearch ILM 索引生命周期管理常见坑及避坑指南
之前的博文和视频都讲过 ILM 索引生命周期管理.但从近期的反馈和我自己的实战经验看,依然会有很多坑. 现将我自己和大家遇到的常见坑汇集如下,希望能让后来小伙伴少走弯路. 少啰嗦,直接上干货. 坑1: ...
- 如何使用 Filebeat,ILM 和数据流跨多个索引管理 Elasticsearch 数据
索引是 Elasticsearch 的重要组成部分. 每个索引使你的数据集保持分离和有条理,从而使你可以灵活地以不同方式对待每个数据集,并使其在整个生命周期中都易于管理. 通过提供摄入方法和管理工具来 ...
最新文章
- 2018 AI产业投融资分析:热钱涌向何处,谁的“寒冬”将至?
- Servlet--05--HttpServletRequest; HttpServletResponse
- 【网工必备】网络端口号大全......
- c# 如何设置透明画刷
- matlab给定四点求交点,Matlab有关曲线求交点程序分享
- String、StringBuffer比较
- 湖北高校实用的大数据平台,专业的高校大数据实训平台解决方案,波若高校实训平台...
- OpenCV 中的绘制功能
- 软件之间的数据格式对接往往将_XRD数据格式的转换和TXT格式数据正确导入Jade的办法...
- Zabbix3.0 安装Graphtree
- 对软件工程实践课程的预定目标
- Android下磁盘分区表损坏,电脑硬盘分区表损坏怎么修复?电脑硬盘分区表损坏的修复方法...
- 【算法】数独解题——用python代码
- 前端基础:通过HTML技术布局《李白诗词赏析》
- 身份证扫描件用手机怎么弄?手把手教你生成电子身份证
- mysql 执行存储过程
- 网络基本功(二十四):Wireshark抓包实例分析TCP重传
- 计算机无法自动排列,win10系统文件夹不自动排列的解决方法
- 用Python实现自动化测试
- Materials and Manufacturing Processes期刊投稿经验分享