前言

运维故障排障速度往往与监控系统体系颗粒度成正比,监控到位才能快速排障

在部署这套系统之前,平台所有系统日志都由Graylog+Zabbix,针对日志出现的错误关键字进行告警,这种做法在运维工作开展过程中暴露出多个不足点,不详述;在考虑多方面原因后,最终对日志告警系统进行更换,选用的方案是:ELK + Kafka+ Filebeat + Elastalert

本文主要以两个需求为主轴做介绍

  • 非工作时间服务器异常登录告警
  • 系统日志出现错误关键字告警

架构

服务选型

name version info
Amazon Elasticsearch Service v6.2 AWK官网部署教程
Logstash v6.2.3 选用与ES相同版本
Filebeat v6.2.3 选用与ES相同版本
Confluent(Kafka) v4.0 这里推荐 Confluent 的版本,Confluent 是 kafka 作者 Neha Narkhede 从 Linkedin 出来之后联合 LinkedIn 前员工创建的大数据公司,专注于 kafka 的企业应用。
Elastalert v0.1.29 原先考虑采用X-Pack但由于AWS目前还不支持

部署

本文采用的操作系统 :CentOS release 6.6

Filebeat

# 下载源
$ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-x86_64.rpm# 安装
$ sudo rpm -vi filebeat-6.2.3-x86_64.rpm

Logstash

# 导入Yum源
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ cat <<EOF > /etc/yum.repos.d/logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF# 安装
yum install logstash -y

Elastalert

# pip直接安装
$ pip install elastalert# 如果出现依赖包报错,以下为常用开发所需依赖包
$ yum install -y zlib openssl openssl-devel gcc gcc-c++ Xvfb libXfont Xorg libffi libffi-devel python-cffi python-devel libxslt-devel libxml2-devel zlib-devel bzip2-devel xz-libs wget

配置

Filebeat
/etc/filebeat/filebeat.yml

filebeat.config:prospectors:path: /etc/filebeat/conf/*.ymlreload.enabled: truereload.period: 10soutput.kafka:# kafkaNode为Kafaka服务所在服务器 hosts: ["kafkaNode:9092"]# 索引取fields.out_topictopic: "%{[fields][out_topic]}"partition.round_robin:reachable_only: false

/etc/filebeat/conf/base.yml


# 收集系统日志
- type: logpaths: - /var/log/messages- /var/log/syslog*exclude_files: [".gz$"]exclude_lines: ["ssh_host_dsa_key"]tags: ["system_log"]scan_frequency: 1sfields:# 新增字段用于辨别来源客户端server_name: client01# 索引out_topic: "system_log"multiline:pattern: "^\\s"match: after# 收集登录日志
- type: logpaths:- /var/log/secure*- /var/log/auth.log*tags: ["system_secure"]exclude_files: [".gz$"]scan_frequency: 1sfields:server_name: client01out_topic: "system_secure"multiline:pattern: "^\\s"match: after

Logstash

/etc/logstash/conf.d/system_log.conf

input {kafka {bootstrap_servers => "kafkaNode:9092"consumer_threads => 3topics => ["system_log"]auto_offset_reset => "latest"codec => "json"}
}filter {# 排除logstash日志if [source] == "/var/log/logstash-stdout.log" {drop {}}if [fields][out_topic] == "system_log" {date {match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]}grok {match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }remove_field => "message"}}
}output {elasticsearch {hosts => ["<亚马逊ES地址>"]index => "%{[fields][out_topic]}_%{+YYYYMMdd}"document_type => "%{[@metadata][type]}"}
}

/etc/logstash/conf.d/secure_log.conf

input {kafka {bootstrap_servers => "kafkaNode:9092"consumer_threads => 3topics => ["system_secure"]auto_offset_reset => "latest"codec => "json"}
}filter {if [fields][out_topic] == "system_secure" {grok {match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?","%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}","%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}","%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}","%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}","%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][user][add][name]}, UID=%{NUMBER:[system][auth][user][add][uid]}, GID=%{NUMBER:[system][auth][user][add][gid]}, home=%{DATA:[system][auth][user][add][home]}, shell=%{DATA:[system][auth][user][add][shell]}$","%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }pattern_definitions => {"GREEDYMULTILINE"=> "(.|\n)*"}remove_field => "message"}}
}output {elasticsearch {hosts => ["<亚马逊ES地址>"]index => "%{[fields][out_topic]}_%{+YYYYMMdd}"document_type => "%{[@metadata][type]}"}
}

Kafka


# 导入
rpm --import https://packages.confluent.io/rpm/4.0/archive.keycat <<EOF > /etc/yum.repos.d/confluent.repo
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.0/6
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.0/archive.key
enabled=1
EOFyum install confluent-platform-oss-2.11

Elastalert

Elastalert可以部署到任何一台能够读取到ES的服务器上;配置文件中modules.eagle_post.EagleAlerter blacklist_v2经过修改,后面会介绍到

rules/system_log.yaml

es_host: <亚马逊ES地址>
es_port: 80
name: system log rule
type: blacklist_v2
index: system_log*
timeframe:minutes: 1# 监控key
compare_key: system.syslog.message# 出现下面任意关键字将告警,按需添加
blacklist_v2:- "ERROR"- "error"alert: "modules.eagle_post.EagleAlerter"
eagle_post_url: "<eagle>"
eagle_post_all_values: False
eagle_post_payload:server: "fields.server_name"info: "system.syslog.message"source: "source"

rules/system_log.yaml

es_host: <亚马逊ES地址>
es_port: 80
name: system secure rule
type: frequency
index: system_secure*
num_events: 1
timeframe:minutes: 1
filter:
- query:wildcard:system.auth.user : "*"
alert: "modules.eagle_post.EagleAlerter"
eagle_post_url: "<eagle>"
eagle_post_all_values: False# 非工作时间
eagle_time_start: "09:00"
eagle_time_end: "18:00"
eagle_post_payload:user: "system.auth.user"server: "fields.server_name"ip: "system.auth.ssh.ip"event: "system.auth.ssh.event"

Elastalert

自定义typealert

为了能够将告警接入到Eagle(自研统一接口平台)在尝试使用http_post做告警类型过程中,发现无法传入ES结果作为POST参数,所以对其进行简单修改,新增类型,实现能够无缝接入Eagle

Alert

moudules/eagle_post.py

将文件夹保存到site-packages/elastalert

import json
import requests
import dateutil.parser
import datetime
from elastalert.alerts import Alerter
from elastalert.util import EAException
from elastalert.util import elastalert_logger
from elastalert.util import lookup_es_keyclass EagleAlerter(Alerter):def __init__(self, rule):super(EagleAlerter, self).__init__(rule)# 设定时间有效范围self.post_time_start = self.rule.get('eagle_time_start','00:00')self.post_time_end = self.rule.get('eagle_time_end','00:00')# post链接self.post_url = self.rule.get('eagle_post_url','')self.post_payload = self.rule.get('eagle_post_payload', {})self.post_static_payload = self.rule.get('eagle_post_static_payload', {})self.post_all_values = self.rule.get('eagle_post_all_values', False)self.post_lock = Falsedef alert(self, matches):if not self.post_url:elastalert_logger.info('Please input eagle url!')return Falsefor match in matches:# 获取所有payloadpayload = match if self.post_all_values else {}# 构建字典for post_key, es_key in self.post_payload.items():payload[post_key] = lookup_es_key(match, es_key)# 获取当前时间login_time = datetime.datetime.now().time()# 获取时间限制time_start = dateutil.parser.parse(self.post_time_start).time()time_end =  dateutil.parser.parse(self.post_time_end).time()# 如果在时间范围内,将不做告警self.post_lock = False if login_time > time_start and \login_time < time_end else True# 合并两种类型payloaddata = self.post_static_payloaddata.update(payload)# 发送告警if self.post_lock:myRequests = requests.Session()myRequests.post(url=self.post_url,data=data,verify=False)elastalert_logger.info("[-] eagle alert sent.")else:elastalert_logger.info("[*] nothing to do.")def get_info(self):return {'type': 'http_post'}

type

在使用blaklist过程发现改类型是全匹配,为了方便编写配置文件,所以对其做了简单修改

elastalert/ruletypes.py

# 新增class BlacklistV2Rule(CompareRule):required_options = frozenset(['compare_key', 'blacklist_v2'])def __init__(self, rules, args=None):super(BlacklistV2Rule, self).__init__(rules, args=None)self.expand_entries('blacklist_v2')def compare(self, event):term = lookup_es_key(event, self.rules['compare_key'])# 循环配置文件, 这种做法对性能有一定的损耗,在没找到更合适的解决方案前,就采取这种方式for i in self.rules['blacklist_v2']:if i in term:return True return False

elastalert/config.py

# 新增
rules_mapping = {'frequency': ruletypes.FrequencyRule,'any': ruletypes.AnyRule,'spike': ruletypes.SpikeRule,'blacklist': ruletypes.BlacklistRule,'blacklist_v2': ruletypes.BlacklistV2Rule,'whitelist': ruletypes.WhitelistRule,'change': ruletypes.ChangeRule,'flatline': ruletypes.FlatlineRule,'new_term': ruletypes.NewTermsRule,'cardinality': ruletypes.CardinalityRule,'metric_aggregation': ruletypes.MetricAggregationRule,'percentage_match': ruletypes.PercentageMatchRule,
}

elastalert/schema.yaml

# 新增- title: BlacklistV2required: [blacklist_v2, compare_key]properties:type: {enum: [blacklist_v2]}compare_key: {'items': {'type': 'string'},'type': ['string', 'array']}blacklist: {type: array, items: {type: string}}

打进Docker

做了个简单DockerFile做参考

FROM python:2.7-alpine
ENV SITE_PACKAGES /usr/local/lib/python2.7/site-packages/elastalert
WORKDIR /opt/elastalertRUN apk update &&     apk add gcc ca-certificates openssl-dev openssl libffi-dev  gcc musl-dev tzdata openntpd && \ pip install elastalert && cp -rf /usr/share/zoneinfo/Asia/Taipei /etc/localtime
COPY ./ /opt/elastalert
CMD ["/opt/elastalert/start.sh"]

start.sh

#!/bin/sh
SITE_PATH=/usr/local/lib/python2.7/site-packages/elastalert
CONFIG=/opt/elastalert/config/config.yaml
MODULES=/opt/elastalert/modulesif [ -n "${MODULES}" ]
then\cp -rf ${MODULES}  ${SITE_PATH}echo "[-] Copy ${MODULES} to ${SITE_PATH}"
fi\cp -rf elastalert/* ${SITE_PATH}/
echo "[-] Copy elastalert/* to ${SITE_PATH}"
python -m elastalert.elastalert --verbose  --config ${CONFIG}

基础工作准备就绪,加入Bee容器管理平台完成自动构建。

实现效果

碰到的坑

Zookeeper

问题描述

老版Kafaka依赖Zookeeper,默认安装时注册地址为:localhost,导致问题的现象:

filebeat错误日志

2018-04-25T09:14:55.590+0800    INFO    kafka/log.go:36    client/metadata fetching metadata for [[[system_log] kafkaNode:9092]] from broker %!s(MISSING)2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/broker/[[0]] starting up2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/broker/[[0 %!d(string=system_log) 0]] state change to [open] on %!s(MISSING)/%!d(MISSING)2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/leader/[[system_log %!s(int32=0) %!s(int32=0)]]/%!d(MISSING) selected broker %!d(MISSING)2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/leader/[[system_secure %!s(int32=0) %!s(int=3)]]/%!d(MISSING) state change to [retrying-%!d(MISSING)]2018-04-25T09:14:55.591+0800    INFO    kafka/log.go:36    producer/leader/[[system_secure %!s(int32=0) %!s(int32=0)]]/%!d(MISSING) abandoning broker %!d(MISSING)2018-04-25T09:14:55.592+0800    INFO    kafka/log.go:36    producer/broker/[[0]] shut down2018-04-25T09:14:55.592+0800    INFO    kafka/log.go:36    Failed to connect to broker [[localhost:9092 dial tcp [::1]:9092: getsockopt: connection refused]]: %!s(MISSING)

日志出现两个地址,一个是kafka地址,另外出现一个localhost地址。
这是因为filebeat已经跟kafaka建立了连接,但是从kafakazookeeper这一段找不到

解决方法

# get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],"jmx_port":-1,"host":"localhost","timestamp":"1523429158364","port":9092,"version":4}
cZxid = 0x1d
ctime = Wed Apr 11 14:45:58 CST 2018
mZxid = 0x1d
mtime = Wed Apr 11 14:45:58 CST 2018
pZxid = 0x1d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x162b374170d0000
dataLength = 188
numChildren = 0# 发现注册地址是localhost,修改之
set /brokers/ids/0  {"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafkaNode:9092"],"jmx_port":9999,"host":"kafkaNode","timestamp":"1523429158364","port":9092,"version":4}

修改完重启,问题解决。

博文链接:http://a-cat.cn/2018/04/29/aws-elk/

转载于:https://blog.51cto.com/maoyao/2109147

基于AWS-ELK部署系统日志告警系统相关推荐

  1. 分布式实时日志分析解决方案 ELK 部署架构

    一.前言 ELK 已经成为目前最流行的集中式日志解决方案,它主要是由Beats.Logstash.Elasticsearch.Kibana等组件组成,来共同完成实时日志的收集,存储,展示等一站式的解决 ...

  2. 在AWS中部署OpenShift平台

    OpenShift是RedHat出品的PAAS平台.OpenShift做为PAAS平台最大的特点是它是完全容器化的PAAS平台,底层封装了Docker和Kubernetes,上层暴露了对开发者友好的接 ...

  3. anasys hpc集群_这可能是最简单的并行方案,如何基于 AWS ParallelCluster 运行 ANSYS Fluent...

    使用 HPC(高性能计算)应对计算流体动力学 (CFD) 的挑战已成为惯例.随着近一二十年间,HPC 工作站向超级计算机的发展变缓,计算集群正不断地取代单独的大型 SMP(共享内存处理)超级计算机的地 ...

  4. 云+X案例展 | 民生类:基于AWS PaaS构建基础集团企业级中台

    本案例由浪潮投递并参与评选,CSDN云计算独家全网首发:更多关于[云+X 案例征集]的相关信息,点击了解详情丨挖掘展现更多优秀案例,为不同行业领域带来启迪,进而推动整个"云+行业" ...

  5. 基于AWS 平台跳板机配置

    很多用户通过跳板机对部署在AWS平台的应用系统进行日常维护,为管理私有网络的服务器提供便利,最小化了应用系统的安全风险,从而有利于提升整体架构的安全. 为达到更好的安全性,需要进行恰当的规划,通常可以 ...

  6. 在AWS上部署一个网站

    在AWS上部署一个网站 使用Elastic Beanstalk,用户就不必操心自己的操作系统或虚拟服务器,因为它在它们之上加了一个抽象层. 文章目录 在AWS上部署一个网站 前言 一.Elastic ...

  7. 基于AWS的云架构设计最佳实践——万字长文:云架构设计原则|附PDF下载

    译者序 AWS用户广泛,产品线复杂,AWS发布的白皮书<Architecting for the Cloud-AWS Best Practices>介绍了常见场景下云架构的最佳实践,不仅对 ...

  8. Terraform 学习总结(7)—— 基于 AWS 云平台上的 Terraform 实战

    一.AWS 上基础环境介绍 相信大家已经知道 Terraform 的基本使用方式了.在我们以后的场景中,主要讨论一些典型架构的实现.在实际使用中,Terraform 在多云基础架构创建中,是非常方便和 ...

  9. ELK部署+Elastalert

    前言 本文是ELK部署调试文档,ELK是由kibana.elasticsearch.logstash三个软件组成的,三个人软件的作用分别是,数据展示.日志存储以及检索.日志采集. ELK的部署方式有多 ...

最新文章

  1. HTML转WORD WORD转PDF--来源网络
  2. 互联网公装企业“inDeco领筑智造”完成A+B轮近1.1亿元融资
  3. vue 修改div宽度_vue 拖动调整左右两侧div的宽度
  4. makefile小技巧
  5. WePY框架开发的小程序如何在微信web开发者工具中运行起来
  6. ks检验正态分布结果_数据分析基础(2)——正态分布检验
  7. NFS应用场景及环境搭建
  8. 2018.1.18纪事
  9. Odoo (OpenERP/TinyERP)-10.0 (Debian 8)
  10. 谋学网计算机维修,19秋学期西交《计算机组成原理》在线作业1(标准答案).doc...
  11. 实用工具软件远古大神Nir Sofer,数百款短小精悍便携工具,从Win2000到Win10通吃
  12. mysql语句高逼格_求一些逼格高的语句?
  13. Bzoj3653 谈笑风生
  14. centos tcpdump
  15. 你了解净水器滤芯知识多少?
  16. 大数据安全运营内容包括哪些?
  17. Mybatis-Plus入门案例以及使用方法
  18. 磁盘阵列raid LVM创建
  19. mysql zfs快照_ZFS for MySQL
  20. 彼得•德鲁克 (Peter F.Drucker)管理思想

热门文章

  1. 吴恩达老师深度学习视频课笔记:逻辑回归公式推导及C++实现
  2. 图像处理和图像识别中常用的OpenCV函数
  3. 【c语言】C语言配置文件解析库——iniparser
  4. matlab 弹出提示,谁能告诉我为什么一打开matlab2014b就弹出一个框就自动退出
  5. mysql课程id数据类型_数据库学习之六:mysql数据类型
  6. matlab怎么根据波宽度去波,使用Matlab图像处理(三)——图像滤波原理
  7. python snmp 交换机 配置文件_编译安装SNMP,snmpd.conf配置文件说明
  8. php 自定义表格并统计,PHP 使用Echarts生成数据统计报表的实现
  9. 魔兽怀旧服怎么找不到服务器,魔兽世界怀旧服无法连接服务器怎么解决 服务器连接不上解决方法...
  10. matlab ros 手势识别,使用MATLAB读取分析ros记录的.bag文件