在Elasticsearch中实现统计异常检测器——第三部分
Implementing a Statistical Anomaly Detector in Elasticsearch - Part 3
欢迎来到Elasticsearch建立统计异常检测器的第三期和最后一期。作为快速回顾,让我们看下迄今为止我们所建立的:
- 第一部分,我们构造了一个pipeline聚合,它捕获了数百万个数据点,以产生前第90个百分位数的“surprise”值。它通过构造每一个元组的时间序列,计算该元组的surprise,然后为每个metric找到第90个百分位数的surprise来实现。
- 第二部分,我们使用Timelion用曲线图表示随时间推移的第90个百分位数的surprise。然后,我们使用Timelion灵活的语法来构造动态高于surpirse移动平均值三个标准差的阈值。当surpise通过这个阈值的时候,我们在图表上显示一棒条。
今天,我们将采用我们在第一部分和第二部分中构建的内容,并完全自动化使用Watcher,Elastic的实时报警和为ElasticSearch的通知插件。
随着Watcher使用胡子模板(mustache templating)和groovy脚本的能力,这是一个非常强大的报警引擎。我们可以在两个wathces中对整个Atlas系统进行编码。第一个watch将生成所有surprise数据(仅仅像第一部分),然后第二个watch将创建阈值和检查异常(像第二部分的Timelion)。
让我们开始吧!
Data Collection Watch
第一个watch的任务是按小时计算每一个metric收集前第90个surprise值,在第一部分我们创建了模拟数据收集的执行。这意味着我们可以利用该部分的大部分工作(举例,pipeline聚合)。
第一,这是完整的watch(然后我们将把它分解成一片一片):
PUT _watcher/watch/atlas
{"trigger":{"schedule":{"hourly" : { "minute" : 0 }}},"input":{"search":{"request":{"indices":"data","types": "data","body":{"query":{"filtered":{"filter":{"range":{"hour":{"gte":"now-24h"}}}}},"size":0,"aggs":{"metrics":{"terms":{"field":"metric"},"aggs":{"queries":{"terms":{"field":"query"},"aggs":{"series":{"date_histogram":{"field":"hour","interval":"hour"},"aggs":{"avg":{"avg":{"field":"value"}},"movavg":{"moving_avg":{"buckets_path":"avg","window":24,"model":"simple"}},"surprise":{"bucket_script":{"buckets_path":{"avg":"avg","movavg":"movavg"},"script":"(avg - movavg).abs()"}}}},"largest_surprise":{"max_bucket":{"buckets_path":"series.surprise"}}}},"ninetieth_surprise":{"percentiles_bucket":{"buckets_path":"queries>largest_surprise","percents":[90.0]}}}}}}},"extract":["aggregations.metrics.buckets.ninetieth_surprise","aggregations.metrics.buckets.key"]}},"actions":{"index_payload":{"transform":{"script": {"file": "hourly"}},"index" : {"index" : "atlas","doc_type" : "data"}}}
}
它很长,但不要恐慌!很大一部分是和第一部分重复的代码。让我们开始单个组件的查看:
PUT _watcher/watch/atlas
{"trigger":{"schedule":{"hourly" : { "minute" : 0 }}},
在我们请求中第一件事情是HTTP的命令。Watches被存储在你的集群中,所以我们对_watcher端点执行了一个PUT命令并且增加一个叫着“atlas”的新watch。其次,我们安排watch去执行一个"trigger"。触发器是允许watch在时间表上运行,就像一个cronjob。我们将使用小时触发器,每个小时触发一次。
在触发器之后,我们定义了watch的输入:
"input":{"search":{"request":{"indices":"data","types": "data","body":{...},"extract":["aggregations.metrics.buckets.ninetieth_surprise","aggregations.metrics.buckets.key"]}},
输入提供了watch用于做决定的数据。有各种各样的输入可用,但我们将使用一个search作为输入。该输入执行任意的Elasticsearch查询,并允许watch使用响应以供后续处理。该“request”参数定义了关于请求的详情:查询的目录/类型和请求的实体(那就是我们在第一部分创建的pipeline聚合)。和触发器相结合,我们的watch将每小时再次执行大量的pipeline聚合原始数据。
“extract”参数让我们提取我们感兴趣的细节,以简化watch的进一步加工。它在概念上和filter_path相似,只是一种减少响应冗长的过滤机制。在这里,我们使用它去提取五个top-90th 百分位数的surpise和他们的keys。
最后我们定义了一个“action”:
"actions":{"index_payload":{"transform":{"script": {"file": "hourly"}},"index" : {"index" : "atlas","doc_type" : "data"}}}
}
查询运行后执行该操作,并定义了watch的“output”。Actions能发送邮件,发送消息到Slack,发送到自定义的webhooks,等等。为了我们的目的,我们实际上希望把数据放回到Elasticsearch中。我们需要索引pipeline聚合的结果,以使我们在它上面进行预警。为此,我们设置了一个index_payload操作,它将为我们把文档索引回Elasticsearch。
在我们能索引任何事情之前,我们应该把JSON聚合响应转化成一组可索引的文档。这是通过位于我们节点上的转换脚本hourly.groovy完成的(在config/scripts/ 目录)。看起来像这样:
def docs = [];
for(item in ctx.payload.aggregations.metrics.buckets) {def doc = [metric : item.key,value : item.ninetieth_surprise.values["90.0"],execution_time: ctx.execution_time];docs << doc;
}
return [ _doc : docs ];
它的功能很简单,迭代第90个百分位的buckets,并且创建一个包含健,数值和执行时间的数组。然后将其添加到大容量数组并且在完成对buckets迭代后返回。
返回的数组是Bulk API语法,Watcher将插入到“数据”类型的“Atlas”索引中。一当该watch添加到集群中,Elasticsearch将开始收集小时surprise指标,就像我们在模拟器中做的一样。完美!让我们现在开始编写发现异常的watch。
Anomaly Detection Watch
该watch的目标是复制我们在第二部分通过Timelion做的事情。也就是,它需要构造一个在每个指标的第90个surprise移动平均线上三个标准方差的阈值。那么如果这个阈值被打破,它需要提供警告。
这个watch遵循和最后一个相似的布局,但它拥有一个更自定义的逻辑。整个watch看起来像这样:
PUT _watcher/watch/atlas_analytics
{"trigger": {"schedule": {"hourly" : { "minute" : 5 }}},"input": {"search": {"request": {"indices": "atlas","types": "data","body": {"query": {"filtered": {"filter": {"range": {"execution_time": {"gte": "now-6h"}}}}},"size": 0,"aggs": {"metrics": {"terms": {"field": "metric"},"aggs": {"series": {"date_histogram": {"field": "execution_time","interval": "hour"},"aggs": {"avg": {"avg": {"field": "value"}}}},"series_stats": {"extended_stats": {"field": "value","sigma": 3}}}}}}},"extract": ["aggregations.metrics.buckets"]}},"condition": {"script": {"file": "analytics_condition"}},"transform": {"script": {"file": "analytics_transform"}},"actions": {"index_payload": {"logging": {"text": "{{ctx.alerts}}"}},"email_alert" : {"email": {"to": "'John Doe <john.doe@example.com>'","subject": "Atlas Alerts Triggered!","body": "Metrics that appear anomalous: {{ctx.alerts}}"}}}
}
我们将一步一步的进行下去。类似于第一个watch,我们把watch置入具有特定名称“atlas_analytics”的集群中,并设置一个运行时间表。但是,这次的时间表偏移了5分钟,以允许完成第一个监视时间。
我们也使用search输入:
"input": {"search": {"request": {"indices": "atlas","types": "data","body": {"query": {"filtered": {"filter": {"range": {"execution_time": {"gte": "now-6h"}}}}},"size": 0,"aggs": {"metrics": {"terms": {"field": "metric"},"aggs": {"series": {"date_histogram": {"field": "execution_time","interval": "hour"},"aggs": {"avg": {"avg": {"field": "value"}}}},"series_stats": {"extended_stats": {"field": "value","sigma": 3}}}}}}},"extract": ["aggregations.metrics.buckets"]}},
这个搜索有一点不同。第一,它是查询/atlas/data以替代/data/data;该watch聚合上一个watch的结果,而不是采用原始数据。该查询也过滤到最近的6个小时,这允许我们将时间范围定位到特定的窗口。
一个聚合被用来构造每一个指标的data_histogram(举例,每个指标的时间序列)。在每个序列内,我们计算平均线和标准方差(确保通过sigma参数来询问统计聚合的三个标准偏差)。最后,我们只提取出buckets,因为我们不关心其余的响应。
你将注意到,在第二部分,我们使用了一个移动平均线和标准偏差去计算这些数据,在这里它是一个普通的平均值/标准差。为什么会这样?因为watch每个小时执行,时间窗口将自然的滚动数据。而不像Timelion的实现--它在一张图表中显示了时间内的所有点--我们仅仅关系一个小时内生成的数据点,所以一个简单的平均线就可以了。
所以在这点上,我们的watch拥有所有必要的信息去标记一个异常,但是我们需要运行一些自定义的逻辑去把它们绑定在一起。那就是在接下来condition句子中将发生的事情。
"condition": {"script": {"file": "analytics_condition"}},
条件是行动的守门员:假如条件评估为true,则行动执行。我们的条件使用另一种groovy脚本,analytics_condition.groovy:
def docs = [];
def status = false;
for(item in ctx.payload.aggregations.metrics.buckets) {def std_upper = Double.valueOf(item.series_stats.std_deviation_bounds.upper);def avg = Double.valueOf(item.series.buckets.last().avg.value);if (std_upper == Double.NaN || avg == Double.NaN) {continue;}if (avg > std_upper) {status = true;break;}
}
return status;
该脚本非常简单:提取标准差的上限(其由本地聚合提供)和平均线,然后查看平均线是否大于上限。假如平均线确实比较大,设置一个标记并返回true。
在该点上,假如返回false的条件返回为空,该watch结束:没有异常。假如它返回true,我们继续向transform子句:
"transform": {"script": {"file": "analytics_transform"}},
转换可以用于修改,丰富和操作数据。我们将使用转换去整理数据,如此,一系列的警告可以被简单的嵌入到电子邮件中。再者,我们使用groovy 脚本去实现转换,这一个叫着analytics_transform.groovy
:
def alerts = [];
for(item in ctx.payload.aggregations.metrics.buckets) {def std_upper = Double.valueOf(item.series_stats.std_deviation_bounds.upper);def avg = Double.valueOf(item.series.buckets.last().avg.value);if (Double.isNaN(std_upper) || Double.isNaN(avg)) {continue;}if (avg > std_upper) {alerts << item.id;}
}
return [alerts: alerts];
看起来很相似?这与在condition子句中使用的analytics_condition.groovy脚本基本一样。唯一的不同是把任意异常指标添加到数组中替换为改变一个标记。然后返回该数组,我们可以在最终的电子邮件操作中使用该数组。
"actions": {"index_payload": {"logging": {"text": "{{ctx.alerts}}"}},"email_alert" : {"email": {"to": "'John Doe <john.doe@example.com>'","subject": "Atlas Alerts Triggered!","body": "Metrics that appear anomalous: {{ctx.alerts}}"}}}
}
在watch的最后部分,我们执行了行动。第一,我们记录异常(为了调试目的)。我们还定义了email_alert,它将发送一个email。该email的正文可以使用mustache(胡子)进行模板化,这是我们如何可以嵌入报警列表(通过{{ctx.alerts}},我们在转换步骤中构建的数组)。
Conclusion
就是这样!监视非常的长,但当你一步一步的跟着监视下来还是相对简单的。所以困难的工作我们在第一部分和第二部分都做了,移动逻辑到监视器是微不足道的。
一旦这些监控被激活,该集群将按小时自动开始监视和报警。它是很好调的,因为监视能在任意时间通过调用API来修改。你可以把间隔变长或变短,扩展每个聚合过程中的数据量,修改任意聚合的设置,改变pipeline聚合中的移动平均线类型,引入全新的指标,等等。一旦运行和生产,这是一个非常简单的系统调整。
我希望你喜欢这三部分系列。这是一个非常有趣的项目,并且真正的帮助我懂得pipeline聚合的能力,Timelion和Watcher提供的好处(特别是组合时)。直到下一次!
原文地址:https://www.elastic.co/blog/implementing-a-statistical-anomaly-detector-part-3
转载于:https://www.cnblogs.com/benjiming/p/7152562.html
在Elasticsearch中实现统计异常检测器——第三部分相关推荐
- 在Elasticsearch中实现统计异常检测器——第二部分
Implementing a statistical anomaly detector in Elasticsearch - Part 2 上一周,我们建立了一个pipeline聚合,将数千个数据点分 ...
- 在Elasticsearch中实现统计异常检测器——第一部分
Implementing a Statistical Anomaly Detector in Elasticsearch - Part 1 该图显示了4500万个数据点的最小/最大/平均值(超过600 ...
- 在Elasticsearch中对 text 类型的字段进行聚合异常Fielddata is disabled,Set fielddata=true
在Elasticsearch中对 text 类型的字段进行聚合异常Fielddata is disabled,Set fielddata=true 参考文章: (1)在Elasticsearch中对 ...
- Elasticsearch中的Zen Discovery选主流程
文章目录 背景 为什么使用主从模式? 选举算法 什么时候触发选主? 选主过程 选举临时Master 投票与得票的实现 确立Master或加入集群 选举完成 elasticsearch中的Discove ...
- elasticsearch(7)聚合统计-分组聚合
原文:https://blog.csdn.net/sz85850597/article/details/82858831 elasticsearch(7)聚合统计-分组聚合 2018年09月26日 2 ...
- 无监督学习:异常检测与剔除(局部异常因子法 SVM异常检测器)
1.前言 前面介绍的都是有监督学习的回归和分类算法.有监督学习是指对输入和输出都有成对出现的训练样本{(xi,yi)}.在这里,主要介绍在没有输出信息时,只利用输入样本{xi}的信息进行无监督学习的方 ...
- 在Elasticsearch中查询Term Vectors词条向量信息
这篇文章有点深度,可能需要一些Lucene或者全文检索的背景.由于我也很久没有看过Lucene了,有些地方理解的不对还请多多指正. 更多内容还请参考整理的ELK教程 关于Term Vectors 额, ...
- nginx日志中$request_time时间异常问题排查
女主宣言 nginx日志分为access_log和error_log,可以用于业务的访问统计.服务排障等.我们可以自定义设置log_format,来记录关注的各项指标.本文主要讲述了一次nginx日志 ...
- 【Elasticsearch】在Elasticsearch中查询Term Vectors词条向量信息
1.概述 转载:https://www.cnblogs.com/xing901022/p/5348737.html 关于Term Vectors 额,对于这个专业词汇,暂且就叫做词条向量吧,因为实在想 ...
最新文章
- Jira接入钉钉机器人
- Log4j 1使用教程
- python3.6打包成exe可执行文件、已解决方案_Python 3.6打包成EXE可执行程序的实现...
- readonly和const的区别
- java登录界面命令_Java命令行界面(第25部分):JCommando
- 线性表的顺序存储的基本操作
- N点虚拟主机管理系统安装图解
- 如何找出光纤微米级别的脏污?女朋友的一个举动给了我灵感
- STM32工作笔记0039---认识电路图中的DS203,MS,L等
- while循环 for循环的理解
- oracle删除重复字段数据库,用Oracle的分析函数删除重复的数据,
- [jQuery]30+ Brand New jQuery Plugins To Change the Look and Feel of Your Website
- You Don't Have To Say Goodbye
- 用强化学习来玩Atari游戏(基于Tensorflow的深度Q学习模型)
- Linux下载安装JDK
- Excel批量根据银行卡号查询银行卡的详细信息
- android 360开机启动,手机360设置开机启动项
- 巴西柔术第一课:骑乘式上位技术
- Kalibr标定时卡在Extracting calibration target corners的问题
- 推荐系统9---AFM与DIN模型(推荐系统遇上注意力机制)