Implementing a Statistical Anomaly Detector in Elasticsearch - Part 3

  欢迎来到Elasticsearch建立统计异常检测器的第三期和最后一期。作为快速回顾,让我们看下迄今为止我们所建立的:

  • 第一部分,我们构造了一个pipeline聚合,它捕获了数百万个数据点,以产生前第90个百分位数的“surprise”值。它通过构造每一个元组的时间序列,计算该元组的surprise,然后为每个metric找到第90个百分位数的surprise来实现。
  • 第二部分,我们使用Timelion用曲线图表示随时间推移的第90个百分位数的surprise。然后,我们使用Timelion灵活的语法来构造动态高于surpirse移动平均值三个标准差的阈值。当surpise通过这个阈值的时候,我们在图表上显示一棒条。

  今天,我们将采用我们在第一部分和第二部分中构建的内容,并完全自动化使用Watcher,Elastic的实时报警和为ElasticSearch的通知插件。

  随着Watcher使用胡子模板(mustache templating)和groovy脚本的能力,这是一个非常强大的报警引擎。我们可以在两个wathces中对整个Atlas系统进行编码。第一个watch将生成所有surprise数据(仅仅像第一部分),然后第二个watch将创建阈值和检查异常(像第二部分的Timelion)。

  让我们开始吧!

Data Collection Watch

  第一个watch的任务是按小时计算每一个metric收集前第90个surprise值,在第一部分我们创建了模拟数据收集的执行。这意味着我们可以利用该部分的大部分工作(举例,pipeline聚合)。

  第一,这是完整的watch(然后我们将把它分解成一片一片):

PUT _watcher/watch/atlas
{"trigger":{"schedule":{"hourly" : { "minute" : 0 }}},"input":{"search":{"request":{"indices":"data","types": "data","body":{"query":{"filtered":{"filter":{"range":{"hour":{"gte":"now-24h"}}}}},"size":0,"aggs":{"metrics":{"terms":{"field":"metric"},"aggs":{"queries":{"terms":{"field":"query"},"aggs":{"series":{"date_histogram":{"field":"hour","interval":"hour"},"aggs":{"avg":{"avg":{"field":"value"}},"movavg":{"moving_avg":{"buckets_path":"avg","window":24,"model":"simple"}},"surprise":{"bucket_script":{"buckets_path":{"avg":"avg","movavg":"movavg"},"script":"(avg - movavg).abs()"}}}},"largest_surprise":{"max_bucket":{"buckets_path":"series.surprise"}}}},"ninetieth_surprise":{"percentiles_bucket":{"buckets_path":"queries>largest_surprise","percents":[90.0]}}}}}}},"extract":["aggregations.metrics.buckets.ninetieth_surprise","aggregations.metrics.buckets.key"]}},"actions":{"index_payload":{"transform":{"script": {"file": "hourly"}},"index" : {"index" : "atlas","doc_type" : "data"}}}
}

  它很长,但不要恐慌!很大一部分是和第一部分重复的代码。让我们开始单个组件的查看:

PUT _watcher/watch/atlas
{"trigger":{"schedule":{"hourly" : { "minute" : 0 }}},

  在我们请求中第一件事情是HTTP的命令。Watches被存储在你的集群中,所以我们对_watcher端点执行了一个PUT命令并且增加一个叫着“atlas”的新watch。其次,我们安排watch去执行一个"trigger"。触发器是允许watch在时间表上运行,就像一个cronjob。我们将使用小时触发器,每个小时触发一次。

  在触发器之后,我们定义了watch的输入:

  "input":{"search":{"request":{"indices":"data","types": "data","body":{...},"extract":["aggregations.metrics.buckets.ninetieth_surprise","aggregations.metrics.buckets.key"]}},

  输入提供了watch用于做决定的数据。有各种各样的输入可用,但我们将使用一个search作为输入。该输入执行任意的Elasticsearch查询,并允许watch使用响应以供后续处理。该“request”参数定义了关于请求的详情:查询的目录/类型和请求的实体(那就是我们在第一部分创建的pipeline聚合)。和触发器相结合,我们的watch将每小时再次执行大量的pipeline聚合原始数据。

  “extract”参数让我们提取我们感兴趣的细节,以简化watch的进一步加工。它在概念上和filter_path相似,只是一种减少响应冗长的过滤机制。在这里,我们使用它去提取五个top-90th 百分位数的surpise和他们的keys。

  最后我们定义了一个“action”:

  "actions":{"index_payload":{"transform":{"script": {"file": "hourly"}},"index" : {"index" : "atlas","doc_type" : "data"}}}
}

  查询运行后执行该操作,并定义了watch的“output”。Actions能发送邮件,发送消息到Slack,发送到自定义的webhooks,等等。为了我们的目的,我们实际上希望把数据放回到Elasticsearch中。我们需要索引pipeline聚合的结果,以使我们在它上面进行预警。为此,我们设置了一个index_payload操作,它将为我们把文档索引回Elasticsearch。

  在我们能索引任何事情之前,我们应该把JSON聚合响应转化成一组可索引的文档。这是通过位于我们节点上的转换脚本hourly.groovy完成的(在config/scripts/ 目录)。看起来像这样:

def docs = [];
for(item in ctx.payload.aggregations.metrics.buckets) {def doc = [metric : item.key,value : item.ninetieth_surprise.values["90.0"],execution_time: ctx.execution_time];docs << doc;
}
return [ _doc : docs ];

  它的功能很简单,迭代第90个百分位的buckets,并且创建一个包含健,数值和执行时间的数组。然后将其添加到大容量数组并且在完成对buckets迭代后返回。

  返回的数组是Bulk API语法,Watcher将插入到“数据”类型的“Atlas”索引中。一当该watch添加到集群中,Elasticsearch将开始收集小时surprise指标,就像我们在模拟器中做的一样。完美!让我们现在开始编写发现异常的watch。

Anomaly Detection Watch

  该watch的目标是复制我们在第二部分通过Timelion做的事情。也就是,它需要构造一个在每个指标的第90个surprise移动平均线上三个标准方差的阈值。那么如果这个阈值被打破,它需要提供警告。

  这个watch遵循和最后一个相似的布局,但它拥有一个更自定义的逻辑。整个watch看起来像这样:

PUT _watcher/watch/atlas_analytics
{"trigger": {"schedule": {"hourly" : { "minute" : 5 }}},"input": {"search": {"request": {"indices": "atlas","types": "data","body": {"query": {"filtered": {"filter": {"range": {"execution_time": {"gte": "now-6h"}}}}},"size": 0,"aggs": {"metrics": {"terms": {"field": "metric"},"aggs": {"series": {"date_histogram": {"field": "execution_time","interval": "hour"},"aggs": {"avg": {"avg": {"field": "value"}}}},"series_stats": {"extended_stats": {"field": "value","sigma": 3}}}}}}},"extract": ["aggregations.metrics.buckets"]}},"condition": {"script": {"file": "analytics_condition"}},"transform": {"script": {"file": "analytics_transform"}},"actions": {"index_payload": {"logging": {"text": "{{ctx.alerts}}"}},"email_alert" : {"email": {"to": "'John Doe <john.doe@example.com>'","subject": "Atlas Alerts Triggered!","body": "Metrics that appear anomalous: {{ctx.alerts}}"}}}
}

  我们将一步一步的进行下去。类似于第一个watch,我们把watch置入具有特定名称“atlas_analytics”的集群中,并设置一个运行时间表。但是,这次的时间表偏移了5分钟,以允许完成第一个监视时间。

  我们也使用search输入:

 "input": {"search": {"request": {"indices": "atlas","types": "data","body": {"query": {"filtered": {"filter": {"range": {"execution_time": {"gte": "now-6h"}}}}},"size": 0,"aggs": {"metrics": {"terms": {"field": "metric"},"aggs": {"series": {"date_histogram": {"field": "execution_time","interval": "hour"},"aggs": {"avg": {"avg": {"field": "value"}}}},"series_stats": {"extended_stats": {"field": "value","sigma": 3}}}}}}},"extract": ["aggregations.metrics.buckets"]}},

  这个搜索有一点不同。第一,它是查询/atlas/data以替代/data/data;该watch聚合上一个watch的结果,而不是采用原始数据。该查询也过滤到最近的6个小时,这允许我们将时间范围定位到特定的窗口。

  一个聚合被用来构造每一个指标的data_histogram(举例,每个指标的时间序列)。在每个序列内,我们计算平均线和标准方差(确保通过sigma参数来询问统计聚合的三个标准偏差)。最后,我们只提取出buckets,因为我们不关心其余的响应。

  你将注意到,在第二部分,我们使用了一个移动平均线和标准偏差去计算这些数据,在这里它是一个普通的平均值/标准差。为什么会这样?因为watch每个小时执行,时间窗口将自然的滚动数据。而不像Timelion的实现--它在一张图表中显示了时间内的所有点--我们仅仅关系一个小时内生成的数据点,所以一个简单的平均线就可以了。

  所以在这点上,我们的watch拥有所有必要的信息去标记一个异常,但是我们需要运行一些自定义的逻辑去把它们绑定在一起。那就是在接下来condition句子中将发生的事情。

 "condition": {"script": {"file": "analytics_condition"}},

 条件是行动的守门员:假如条件评估为true,则行动执行。我们的条件使用另一种groovy脚本,analytics_condition.groovy:

def docs = [];
def status = false;
for(item in ctx.payload.aggregations.metrics.buckets) {def std_upper = Double.valueOf(item.series_stats.std_deviation_bounds.upper);def avg = Double.valueOf(item.series.buckets.last().avg.value);if (std_upper == Double.NaN || avg == Double.NaN) {continue;}if (avg > std_upper) {status = true;break;}
}
return status;

  该脚本非常简单:提取标准差的上限(其由本地聚合提供)和平均线,然后查看平均线是否大于上限。假如平均线确实比较大,设置一个标记并返回true。

  在该点上,假如返回false的条件返回为空,该watch结束:没有异常。假如它返回true,我们继续向transform子句:

   "transform": {"script": {"file": "analytics_transform"}},

   转换可以用于修改,丰富和操作数据。我们将使用转换去整理数据,如此,一系列的警告可以被简单的嵌入到电子邮件中。再者,我们使用groovy 脚本去实现转换,这一个叫着analytics_transform.groovy:

def alerts = [];
for(item in ctx.payload.aggregations.metrics.buckets) {def std_upper = Double.valueOf(item.series_stats.std_deviation_bounds.upper);def avg = Double.valueOf(item.series.buckets.last().avg.value);if (Double.isNaN(std_upper) || Double.isNaN(avg)) {continue;}if (avg > std_upper) {alerts << item.id;}
}
return [alerts: alerts];

  看起来很相似?这与在condition子句中使用的analytics_condition.groovy脚本基本一样。唯一的不同是把任意异常指标添加到数组中替换为改变一个标记。然后返回该数组,我们可以在最终的电子邮件操作中使用该数组。

 "actions": {"index_payload": {"logging": {"text": "{{ctx.alerts}}"}},"email_alert" : {"email": {"to": "'John Doe <john.doe@example.com>'","subject": "Atlas Alerts Triggered!","body": "Metrics that appear anomalous: {{ctx.alerts}}"}}}
}

  在watch的最后部分,我们执行了行动。第一,我们记录异常(为了调试目的)。我们还定义了email_alert,它将发送一个email。该email的正文可以使用mustache(胡子)进行模板化,这是我们如何可以嵌入报警列表(通过{{ctx.alerts}},我们在转换步骤中构建的数组)。

Conclusion

  就是这样!监视非常的长,但当你一步一步的跟着监视下来还是相对简单的。所以困难的工作我们在第一部分和第二部分都做了,移动逻辑到监视器是微不足道的。

  一旦这些监控被激活,该集群将按小时自动开始监视和报警。它是很好调的,因为监视能在任意时间通过调用API来修改。你可以把间隔变长或变短,扩展每个聚合过程中的数据量,修改任意聚合的设置,改变pipeline聚合中的移动平均线类型,引入全新的指标,等等。一旦运行和生产,这是一个非常简单的系统调整。

  我希望你喜欢这三部分系列。这是一个非常有趣的项目,并且真正的帮助我懂得pipeline聚合的能力,Timelion和Watcher提供的好处(特别是组合时)。直到下一次!

原文地址:https://www.elastic.co/blog/implementing-a-statistical-anomaly-detector-part-3

转载于:https://www.cnblogs.com/benjiming/p/7152562.html

在Elasticsearch中实现统计异常检测器——第三部分相关推荐

  1. 在Elasticsearch中实现统计异常检测器——第二部分

    Implementing a statistical anomaly detector in Elasticsearch - Part 2 上一周,我们建立了一个pipeline聚合,将数千个数据点分 ...

  2. 在Elasticsearch中实现统计异常检测器——第一部分

    Implementing a Statistical Anomaly Detector in Elasticsearch - Part 1 该图显示了4500万个数据点的最小/最大/平均值(超过600 ...

  3. 在Elasticsearch中对 text 类型的字段进行聚合异常Fielddata is disabled,Set fielddata=true

    在Elasticsearch中对 text 类型的字段进行聚合异常Fielddata is disabled,Set fielddata=true 参考文章: (1)在Elasticsearch中对 ...

  4. Elasticsearch中的Zen Discovery选主流程

    文章目录 背景 为什么使用主从模式? 选举算法 什么时候触发选主? 选主过程 选举临时Master 投票与得票的实现 确立Master或加入集群 选举完成 elasticsearch中的Discove ...

  5. elasticsearch(7)聚合统计-分组聚合

    原文:https://blog.csdn.net/sz85850597/article/details/82858831 elasticsearch(7)聚合统计-分组聚合 2018年09月26日 2 ...

  6. 无监督学习:异常检测与剔除(局部异常因子法 SVM异常检测器)

    1.前言 前面介绍的都是有监督学习的回归和分类算法.有监督学习是指对输入和输出都有成对出现的训练样本{(xi,yi)}.在这里,主要介绍在没有输出信息时,只利用输入样本{xi}的信息进行无监督学习的方 ...

  7. 在Elasticsearch中查询Term Vectors词条向量信息

    这篇文章有点深度,可能需要一些Lucene或者全文检索的背景.由于我也很久没有看过Lucene了,有些地方理解的不对还请多多指正. 更多内容还请参考整理的ELK教程 关于Term Vectors 额, ...

  8. nginx日志中$request_time时间异常问题排查

    女主宣言 nginx日志分为access_log和error_log,可以用于业务的访问统计.服务排障等.我们可以自定义设置log_format,来记录关注的各项指标.本文主要讲述了一次nginx日志 ...

  9. 【Elasticsearch】在Elasticsearch中查询Term Vectors词条向量信息

    1.概述 转载:https://www.cnblogs.com/xing901022/p/5348737.html 关于Term Vectors 额,对于这个专业词汇,暂且就叫做词条向量吧,因为实在想 ...

最新文章

  1. Jira接入钉钉机器人
  2. Log4j 1使用教程
  3. python3.6打包成exe可执行文件、已解决方案_Python 3.6打包成EXE可执行程序的实现...
  4. readonly和const的区别
  5. java登录界面命令_Java命令行界面(第25部分):JCommando
  6. 线性表的顺序存储的基本操作
  7. N点虚拟主机管理系统安装图解
  8. 如何找出光纤微米级别的脏污?女朋友的一个举动给了我灵感
  9. STM32工作笔记0039---认识电路图中的DS203,MS,L等
  10. while循环 for循环的理解
  11. oracle删除重复字段数据库,用Oracle的分析函数删除重复的数据,
  12. [jQuery]30+ Brand New jQuery Plugins To Change the Look and Feel of Your Website
  13. You Don't Have To Say Goodbye
  14. 用强化学习来玩Atari游戏(基于Tensorflow的深度Q学习模型)
  15. Linux下载安装JDK
  16. Excel批量根据银行卡号查询银行卡的详细信息
  17. android 360开机启动,手机360设置开机启动项
  18. 巴西柔术第一课:骑乘式上位技术
  19. Kalibr标定时卡在Extracting calibration target corners的问题
  20. 推荐系统9---AFM与DIN模型(推荐系统遇上注意力机制)

热门文章

  1. Genymotion安装微信 下载ARM
  2. 原武大校长刘道玉给清华大学的公开信
  3. Windows2008R2 启用TLS 1.2
  4. 推荐模型复现(四):多任务模型ESMM、MMOE
  5. 英文学习20170902
  6. ElasticSearch 动态映射和静态映射,以及四种字段类型
  7. Moonbeam基金会启动首期Accelerator Program孵化计划
  8. 美团获取cookie(使用PC浏览器)
  9. godaddy ssl证书配置
  10. Windows和Linux搭建Web环境(文字精简版)