转载来源 :ELK基于ElastAlert实现日志的微信报警 : https://www.jianshu.com/p/f31c0d6020fe

一、ElastAlert介绍

在日志管理上我们使用Elasticsearch,Logstash和Kibana技术栈来管理不断增长的数据和日志,但是对于错误日志的监控ELK架构并没有提供,所以我们需要使用到第三方工具ElastAlert,来帮助我们及时发现业务中存在的问题。

ElastAlert通过定期查询Elasticsearch,并将数据传递到规则类型,该规则类型确定何时找到匹配项。发生匹配时,将为该警报提供一个或多个警报,这些警报将根据匹配采取行动。

这是由一组规则配置的,每个规则定义一个查询,一个规则类型和一组警报。

ElastAlert支持以下方式报警

Command (可调用短信接口)
Email
JIRA
OpsGenie
SNS
HipChat
Slack
Telegram
Debug
Stomp

除了这种基本用法外,还有许多其他功能使警报更加有用:

警报链接到Kibana仪表板
任意字段的合计计数
将警报合并为定期报告
通过使用唯一键字段来分隔警报
拦截并增强比赛数据

二、部署ElastAlert

1. 部署所需环境

ELK 环境部署

EFK6.3+kafka+logstash日志分析平台集群

安装依赖包

$ yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel

部署python3.6

$ mkdir -p /usr/local/python3
$ cd /usr/local/python3
$ wget https://www.python.org/ftp/python/3.6.9/Python-3.6.9.tgz
$ tar xf Python-3.6.9.tgz
$ cd Python-3.6.9
$ ./configure --prefix=/usr/local/python3
$ make && make install

配置环境变量

# 将python2.7 软链删除,换成python3.6
$ rm /usr/bin/python
$ ln -s /usr/local/python3/bin/python3.6 /usr/bin/python
$ rm /usr/bin/pip
$ ln -s /usr/local/python3/bin/pip3 /usr/bin/pip

验证版本

$ python
Python 3.6.9 (default, Jun  2 2020, 12:12:43)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-18)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
$ pip -V
pip 18.1 from /usr/local/python3/lib/python3.6/site-packages/pip (python 3.6)

2. 部署ElastAlert

$ cd /app
$ git clone https://github.com/Yelp/elastalert.git

安装模块:

$ pip install "setuptools>=11.3"
$ python setup.py install

根据Elasticsearch的版本,您可能需要手动安装正确版本的elasticsearch-py。

Elasticsearch 5.0+:

$ pip install "elasticsearch>=5.0.0"

Elasticsearch 2.X:

$ pip install "elasticsearch<3.0.0"

3. 配置ElastAlert

配置config.yaml 文件

$ cp config.yaml.example  config.yaml
$ cat config.yaml
rules_folder: example_rules
run_every:seconds: 10
buffer_time:minutes: 15
es_host: 10.1.144.208
es_port: 9201
#es_username: elastic
#es_password: 123456
writeback_index: elastalert_status
alert_time_limit:days: 2

rules_folder:ElastAlert从中加载规则配置文件的位置。它将尝试加载文件夹中的每个.yaml文件。没有任何有效规则,ElastAlert将无法启动。
run_every: ElastAlert多久查询一次Elasticsearch的时间。
buffer_time:查询窗口的大小,从运行每个查询的时间开始向后延伸。对于其中use_count_query或use_terms_query设置为true的规则,将忽略此值。
es_host:是Elasticsearch群集的地址,ElastAlert将在其中存储有关其状态,查询运行,警报和错误的数据。
es_port:es对应的端口。
es_username: 可选的; 用于连接的basic-auth用户名es_host。
es_password: 可选的; 用于连接的basic-auth密码es_host。
es_send_get_body_as: 可选的; 方法查询Elasticsearch - GET,POST或source。默认是GET
writeback_index:ElastAlert将在其中存储数据的索引的名称。我们稍后将创建此索引。
alert_time_limit: 失败警报的重试窗口。

创建elastalert-create-index索引

$ elastalert-create-index
New index name (Default elastalert_status)
Name of existing index to copy (Default None)
New index elastalert_status created
Done!

三、使用微信报警

由于ElastAlert没有内置企业微信的报警方式,我们还需要使用一个开源插件elastalert-wechat-plugin来实现微信的报警,Github项目地址

1. 下载项目文件

$ cd elastalert
$ wget -P ~/elastalert/elastalert_modules/ wget https://raw.githubusercontent.com/anjia0532/elastalert-wechat-plugin/master/elastalert_modules/wechat_qiye_alert.py
$ touch ~/elastalert/elastalert_modules/__init__.py

2. 修改插件源码

由于这个插件是基于python2.x版本开发的,而ElastAlert的最新版本使用的是python3.6版本开发,所以需要改一些代码,以便正常运行,另外还添添加了转中文字符功能。
wechat_qiye_alert.py修改后如下:

#! /usr/bin/env python
# -*- coding: utf-8 -*-import json
import datetime
from elastalert.alerts import Alerter, BasicMatchString
from requests.exceptions import RequestException
from elastalert.util import elastalert_logger,EAException #[感谢minminmsn分享](https://github.com/anjia0532/elastalert-wechat-plugin/issues/2#issuecomment-311014492)
import requests
from elastalert_modules.MyEncoder import MyEncoder'''
#################################################################
# 微信企业号推送消息                                              #
#                                                               #
# 作者: AnJia <anjia0532@gmail.com>                              #
# 作者博客: https://anjia.ml/                                    #
# Github: https://github.com/anjia0532/elastalert-wechat-plugin #
#                                                               #
#################################################################
'''
class WeChatAlerter(Alerter):#企业号id,secret,应用id必填required_options = frozenset(['corp_id','secret','agent_id'])def __init__(self, *args):super(WeChatAlerter, self).__init__(*args)self.corp_id = self.rule.get('corp_id', '')     #企业号idself.secret = self.rule.get('secret', '')       #secretself.agent_id = self.rule.get('agent_id', '')   #应用idself.party_id = self.rule.get('party_id')       #部门idself.user_id = self.rule.get('user_id', '')     #用户id,多人用 | 分割,全部用 @allself.tag_id = self.rule.get('tag_id', '')       #标签idself.access_token = ''                          #微信身份令牌self.expires_in=datetime.datetime.now() - datetime.timedelta(seconds=60)def create_default_title(self, matches):subject = 'ElastAlert: %s' % (self.rule['name'])return subjectdef alert(self, matches):if not self.party_id and not self.user_id and not self.tag_id:elastalert_logger.warn("All touser & toparty & totag invalid")# 参考elastalert的写法# https://github.com/Yelp/elastalert/blob/master/elastalert/alerts.py#L236-L243body = self.create_alert_body(matches)#matches 是json格式#self.create_alert_body(matches)是String格式,详见 [create_alert_body 函数](https://github.com/Yelp/elastalert/blob/master/elastalert/alerts.py)# 微信企业号获取Token文档# http://qydev.weixin.qq.com/wiki/index.php?title=AccessTokenself.get_token()self.senddata(body)elastalert_logger.info("send message to %s" % (self.corp_id))def get_token(self):#获取token是有次数限制的,本想本地缓存过期时间和token,但是elastalert每次调用都是一次性的,不能全局缓存if self.expires_in >= datetime.datetime.now() and self.access_token:return self.access_token#构建获取token的urlget_token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' %(self.corp_id,self.secret)try:response = requests.get(get_token_url)response.raise_for_status()except RequestException as e:raise EAException("get access_token failed , stacktrace:%s" % e)#sys.exit("get access_token failed, system exit")token_json = response.json()if 'access_token' not in token_json :raise EAException("get access_token failed , , the response is :%s" % response.text())#sys.exit("get access_token failed, system exit")#获取access_token和expires_inself.access_token = token_json['access_token']self.expires_in = datetime.datetime.now() + datetime.timedelta(seconds=token_json['expires_in'])return self.access_tokendef senddata(self, content):#如果需要原始json,需要传入matches# http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8F# 微信企业号有字符长度限制(2048),超长自动截断# 参考 http://blog.csdn.net/handsomekang/article/details/9397025#len utf8 3字节,gbk2 字节,ascii 1字节if len(content) > 512 :content = content[:512] + "..."# 微信发送消息文档# http://qydev.weixin.qq.com/wiki/index.php?title=%E6%B6%88%E6%81%AF%E7%B1%BB%E5%9E%8B%E5%8F%8A%E6%95%B0%E6%8D%AE%E6%A0%BC%E5%BC%8Fsend_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s' %( self.access_token)headers = {'content-type': 'application/json'}# 替换消息标题为中文,下面的字段为logstash切分的日志字段title_dict = {
#            "At least": "报警规则:At least","@timestamp": "报警时间","_index": "索引名称","_type": "索引类型","ServerIP": "报警主机","hostname": "报警机器","message": "报警内容","class": "报错类","lineNum": "报错行""num_hits": "文档命中数","num_matches": "文档匹配数"}#print(f"type:{type(content)}")for k, v in title_dict.items():content = content.replace(k, v, 1 )# 最新微信企业号调整校验规则,tagid必须是string类型,如果是数字类型会报错,故而使用str()函数进行转换payload = {"touser": self.user_id and str(self.user_id) or '', #用户账户,建议使用tag"toparty": self.party_id and str(self.party_id) or '', #部门id,建议使用tag"totag": self.tag_id and str(self.tag_id) or '', #tag可以很灵活的控制发送群体细粒度。比较理想的推送应该是,在heartbeat或者其他elastic工具自定义字段,添加标签id。这边根据自定义的标签id,进行推送'msgtype': "text","agentid": self.agent_id,"text":{"content": content.encode('UTF-8').decode("latin1") #避免中文字符发送失败},"safe":"0"}# set https proxy, if it was provided# 如果需要设置代理,可修改此参数并传入requests# proxies = {'https': self.pagerduty_proxy} if self.pagerduty_proxy else Nonetry:#response = requests.post(send_url, data=json.dumps(payload, ensure_ascii=False), headers=headers)response = requests.post(send_url, data=json.dumps(payload, cls=MyEncoder, indent=4, ensure_ascii=False), headers=headers)response.raise_for_status()except RequestException as e:raise EAException("send message has error: %s" % e)elastalert_logger.info("send msg and response: %s" % response.text)def get_info(self):return {'type': 'WeChatAlerter'}

在同级目录下创建MyEncoder.py文件

#!/usr/bin/env python3
# -*- coding: utf-8 -*-import jsonclass MyEncoder(json.JSONEncoder):def default(self, obj):if isinstance(obj, bytes):return str(obj, encoding='utf-8')return json.JSONEncoder.default(self, obj)

3. 申请企业微信账号

step 1: 访问网站 注册企业微信账号(不需要企业认证)。
step 2: 访问apps 创建第三方应用,点击创建应用按钮 -> 填写应用信息:
Step3: 创建部门,获取部门ID

4. 配置报警规则

配置规则文件

$ cd  elastalert/example_rules/
$ cp example_frequency.yaml   applog.yaml
$ cat sms-applog.yaml  | grep -v ^#
name: 【日志报警】
use_strftine_index: true
type: frequencyindex: applog-*
num_events: 1
timeframe:minutes: 1
filter:
- query:query_string:query: '"\[ERROR\]" NOT "发送邮件失败"'alert:- "elastalert_modules.wechat_qiye_alert.WeChatAlerter"
corp_id: wxdxxx40b4720f24
secret: xa4pWq63sxxtaZzzEg8X860ZBIoOkToCbh_oNc
agent_id: 1000002
party_id: 2

index:要查询的索引的名称, ES中存在的索引。
num_events:此参数特定于frequency类型,并且是触发警报时的阈值。
filter:用于过滤结果的Elasticsearch过滤器列表,这里的规则定义是除了包含“发送邮件失败”的错误日志,其他所有ERROR的日志都会触发报警。
alert:定义报警方式,我们这里采用企业微信报警。
corp_id: 企业微信的接口认证信息

5. 运行ElastAlert

$ python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml
1 rules loaded
INFO:elastalert:Starting up
INFO:elastalert:Disabled rules are: []
INFO:elastalert:Sleeping for 9.999904 seconds
INFO:elastalert:Queried rule 【日志报警】 from 2020-06-05 17:47 CST to 2020-06-05 17:47 CST: 0 / 0 hits
INFO:elastalert:Ran 【日志报警】 from 2020-06-05 17:47 CST to 2020-06-05 17:47 CST: 0 query hits (0 already seen), 0 matches, 0 alerts sent
后台运行
$ nohup python -m elastalert.elastalert --verbose --config /app/elastalert/config.yaml --rule /app/elastalert/example_rules/sms-applog.yaml > nohup.txt 2>&1

微信端添加企业号报警格式如下:

CentOS下ELK基于ElastAlert实现日志的微信报警相关推荐

  1. CentOS下ELK收集Nginx日志

    1. ELK收集Nginx普通格式的日志 1.1 测试服务器架构 1.2 ab工具使用 yum install httpd-tools -y# -n 总共发送多少条请求,注意,最后"/&qu ...

  2. CentOS下ELK 7.2生产安全部署

    01.架构说明 在需要采集日志的服务器上部署Filebeat服务,它将采集到的日志数据推送到Kafka集群: Logstash服务通过input插件读取Kafka集群对应主题的数据,期间可以使用fil ...

  3. 奥塔在线:CentOS下查看crontab定时任务输出日志

    值班室打电话过来,提醒说监控平台中某台TCU集群服务器告警,磁盘空间不足. 连上服务器,使用df -h查看当前磁盘使用情况,居然已经93%了. 通过查看目录大小命令 du -sh * 发现是Nginx ...

  4. Centos下重要日志文件及查看方式

    Centos下重要日志文件及查看方式 时间:2013-07-28 12:10来源:中国IT实验室 作者:感谢:"匿名"投稿 举报 点击:6525次 id="iframeu ...

  5. Docker下ELK三部曲之三:K8S上的ELK和应用日志上报

    本章是<Docker下ELK三部曲>系列的终篇,前面章节已经详述了ELK环境的搭建以及如何制作自动上报日志的应用镜像,今天我们把ELK和web应用发布到K8S环境下,模拟多个后台serve ...

  6. 基于ELK搭建网站实时日志监控平台

    基于ELK搭建网站实时日志监控平台 1 为什么要用到ELK 早在传统的单体应用时代,查看日志大都通过SSH客户端登服务器去看,使用较多的命令就是 less 或者 tail.如果服务部署了好几台,就要分 ...

  7. 基于ELK进行邮箱访问日志的分析

    公司希望能够搭建自己的日志分析系统.现在基于ELK的技术分析日志的公司越来越多,在此也记录一下我利用ELK搭建的日志分析系统. 系统搭建 系统主要是基于elasticsearch+logstash+f ...

  8. Linux 下 4 种实时监控日志文件的方法,总有一种适合你

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | http://t.cn/AiKIk7c3 在 ...

  9. log4日志内容换行_Linux 下 4 种实时监控日志文件的方法,总有一种适合你

    「 读者福利!2 TB 各类技术资源免费赠送 」 在 Linux 下如何才能实时查看日志内容呢?有很多工具可以帮助我们在文件持续修改的同时输出文件内容,最常用的莫过于 tail 命令 了. 1. ta ...

最新文章

  1. 《Go语言从入门到实战》学习笔记(1)——Go语言学习路线图、简介
  2. docker dockerfile 映射端口范围 批量映射端口
  3. oracle11g 数据库导出报“ EXP-00003: 未找到段 (0,0) 的存储定义”错误的解决方案
  4. SAP 电商云 Spartacus UI 产品搜索结果的设计明细
  5. WCF RIA优缺点
  6. python Django ORM ,用filter方法表示“不等于”的方法
  7. (147)FPGA面试题-Verilog移位相加实现乘法(二)
  8. fastexcel读取excel追加写入sheet页_Python写入和读取excel
  9. chrome如何调试html,如何用firefox或chrome浏览器调试js和jquery程序
  10. mysql 2182_MySql常用命令总结
  11. 软件工程——(2)软件项目管理 思维导图
  12. 匹配滤波器的仿真——线性调频信号
  13. 色谱计算机常用英文,色谱术语的常用中英文对照
  14. springboot医疗管理系统 毕业设计-附源码015221
  15. [Windows]卸载Office 2016密钥
  16. 2022年指数与指数公司行业研究报告
  17. web网页设计实例作业 ——古典中国风工艺美术(9页) html+css+javascript网页设计实例
  18. 五个无刷马达驱动电路分享!
  19. 粗虚线和细虚线_车道划分线上下两侧有粗虚线
  20. IOS app 上线流程

热门文章

  1. Python时间序列数据分析--以示例说明
  2. linux yum仓库制作,yum仓库搭建之RPM包制作
  3. nsq php,NSQ 最佳实践
  4. java实现梁友栋裁剪算法_梁友栋裁剪算法
  5. python和abap的关系_ABAP 一对多关系
  6. 数据结构排序系列详解之六 树形选择排序
  7. Android 数据Parcel序列化过程源码分析
  8. java tree 表格_00030-layui+java 树形表格treeTable
  9. java知识点3(null)
  10. 去除 Css 表单自动填充黄色背景