ELK 经典用法—企业自定义日志收集切割和mysql模块

一、收集切割公司自定义的日志

很多公司的日志并不是和服务默认的日志格式一致,因此,就需要我们来进行切割了。

1、需切割的日志示例

2018-02-24 11:19:23,532 [143] DEBUG performanceTrace 1145 http://api.114995.com:8082/api/Carpool/QueryMatchRoutes 183.205.134.240 null 972533 310000 TITTL00 HUAWEI 860485038452951 3.1.146 HUAWEI 5.1 113.552344 33.332737 发送响应完成 Exception:(null)

2、切割的配置

在logstash 上,使用fifter 的grok 插件进行切割

input {beats {port => "5044"}
}filter {grok {match => {"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{NUMBER:thread:int}\] %{DATA:level} (?<logger>[a-zA-Z]+) %{NUMBER:executeTime:int} %{URI:url} %{IP:clientip} %{USERNAME:UserName} %{NUMBER:userid:int} %{NUMBER:AreaCode:int} (?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) (?<Brand>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) %{NUMBER:DeviceId:int} (?<TerminalSourceVersion>[0-9a-z\.]+) %{NUMBER:Sdk:float} %{NUMBER:Lng:float} %{NUMBER:Lat:float} (?<Exception>.*)"}remove_field => "message"}date {match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]remove_field => "timestamp"}geoip {source => "clientip"target => "geoip"database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"}
}output {elasticsearch {hosts => ["http://192.168.10.101:9200/"]index => "logstash-%{+YYYY.MM.dd}"document_type => "apache_logs"}
}

3、切割解析后效果

4、最终kibana 展示效果

① top10 clientip

② top5 url

③ 根据ip 显示地理位置

⑤ top10 executeTime

⑥ 其他字段都可进行设置,多种图案,也可将多个图形放在一起展示

二、grok 用法详解

1、简介

  Grok是迄今为止使蹩脚的、无结构的日志结构化和可查询的最好方式。Grok在解析 syslog logs、apache and other webserver logs、mysql logs等任意格式的文件上表现完美。

  Grok内置了120多种的正则表达式库,地址:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns。

2、入门例子

① 示例

55.3.244.1 GET /index.html 15824 0.043

② 分析

  这条日志可切分为5个部分,IP(55.3.244.1)、方法(GET)、请求文件路径(/index.html)、字节数(15824)、访问时长(0.043),对这条日志的解析模式(正则表达式匹配)如下:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

③ 写到filter中

filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } }

④ 解析后效果

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

3、解析任意格式日志

(1)解析任意格式日志的步骤:

① 先确定日志的切分原则,也就是一条日志切分成几个部分。

② 对每一块进行分析,如果Grok中正则满足需求,直接拿来用。如果Grok中没用现成的,采用自定义模式。

③ 学会在Grok Debugger中调试。

(2)grok 的分类

  • 满足自带的grok 正则 grok_pattern

① 可以查询

# less /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns/grok-patterns

② 使用格式

grok_pattern 由零个或多个 %{SYNTAX:SEMANTIC}组成

例: %{IP:clientip}

  其中SYNTAX 是表达式的名字,是由grok提供的:例如数字表达式的名字是NUMBER,IP地址表达式的名字是IP

  SEMANTIC 表示解析出来的这个字符的名字,由自己定义,例如IP字段的名字可以是 client

  • 自定义SYNTAX

使用格式:(?<field_name>the pattern here)

例:(?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+)

(3)正则解析容易出错,强烈建议使用Grok Debugger调试,姿势如下(我打开这个网页不能用)

三、使用mysql 模块,收集mysql 日志

1、官方文档使用介绍

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-module-mysql.html

2、配置filebeat ,使用mysql 模块收集mysql 的慢查询

# vim filebeat.yml

#=========================== Filebeat prospectors =============================
filebeat.modules:
- module: mysqlerror:enabled: truevar.paths: ["/var/log/mariadb/mariadb.log"]slowlog:enabled: truevar.paths: ["/var/log/mariadb/mysql-slow.log"]
#----------------------------- Redis output --------------------------------
output.redis:hosts: ["192.168.10.102"]password: "ilinux.io"key: "httpdlogs"datatype: "list"db: 0timeout: 5

3、elk—logstash 切割mysql 的慢查询日志

① 切割配置

# vim mysqllogs.conf

input {redis {host => "192.168.10.102"port => "6379"password => "ilinux.io"data_type => "list"key => "httpdlogs"threads => 2}
}

filter {
  if [fields][type] == "pachongmysql" {
    grok {
      match => {
        "message" => "^#\ Time:\ (?<Time>.*)"
      }
      match => {
        "message" => "^#\ User\@Host:\ (?<User>.*)\[exiuapp\]\ \@\ \ \[%{IP:hostip}\]\ \ Id:\ \ \ \ %{NUMBER:Id:int}"
      }
      match => {
        "message" => "^#\ Query_time:\ %{NUMBER:Query_time:float}\ \ Lock_time:\ %{NUMBER:Lock_time:float}\ Rows_sent:\ %{NUMBER:Rows_sent:int}\ \ Rows_examined:\ %{NUMBER:Rows_examined:int}"
      }
      match => {
        "message" => "^use\ (?<database>.*)"
      }
      match => {
        "message" => "^SET\ timestamp=%{NUMBER:timestamp:int}\;"
      }
      match => {
        "message" => "(?<sql>.*);"
      }
      remove_field => "message"
    }
  }
}

output {

        elasticsearch {hosts => ["http://192.168.10.101:9200/"]index => "logstash-%{+YYYY.MM.dd}"document_type => "mysql_logs"}
} 

② 切割后显示结果

4、kibana 最终显示效果

① 哪几个的数据库最多,例:top2 库

表无法显示,因为有些语句不涉及表,切割不出来

② 哪几个sql语句出现的最多,例:top5 sql语句

③ 哪几个sql语句出现的最多,例:top5 sql语句

④ 哪几台服务器慢查询日志生成的最多,例:top5 服务器

⑤ 哪几个用户慢查询日志生成的最多,例:top2 用户

可以合并显示

5、使用mysql 模块收集mysql 的慢查询

(1)filebeat 配置和上边一样

(2)elk—logstash 切割mysql 的错误日志

# vim mysqllogs.conf

filter {grok {match => { "message" => "(?<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}) %{NUMBER:pid:int} \[%{DATA:level}\] (?<content>.*)" }}date {match => ["timestamp","dd/MMM/YYYY:H:m:s Z"]remove_field => "timestamp"}
}

(3)就不在展示结果了

四、ELK 收集多实例日志

很多情况下,公司资金不足,不会一对一收集日志;因此,一台logstash 使用多实例收集处理多台agent 的日志很有必要。

1、filebeat 的配置

主要是output 的配置,只需不同agent 指向不同的端口即可

① agent 1 配置指向5044 端口

#----------------------------- Logstash output --------------------------------
output.logstash:# The Logstash hostshosts: ["192.168.10.107:5044"]

② agent 2 配置指向5045 端口

#----------------------------- Logstash output --------------------------------
output.logstash:# The Logstash hostshosts: ["192.168.10.107:5045"]

2、logstash 的配置

针对不同的agent ,input 指定对应的端口

① agent 1

input {beats {port => "5044"}
}
output {   #可以在output 加以区分elasticsearch {hosts => ["http://192.168.10.107:9200/"]index => "logstash-apache1-%{+YYYY.MM.dd}"document_type => "apache1_logs"}
}

② agent 1

input {beats {port => "5045"}
}
output {   #可以在output 加以区分elasticsearch {hosts => ["http://192.168.10.107:9200/"]index => "logstash-apache2-%{+YYYY.MM.dd}"document_type => "apache2_logs"}
}

开启对应的服务就ok 了

转载于:https://www.cnblogs.com/dengbingbing/p/10485962.html

ELK 经典用法—企业自定义日志收集切割和mysql模块相关推荐

  1. 再见笨重的ELK!这套轻量级日志收集方案要火!

    之前一直使用的日志收集方案是ELK,动辄占用几个G的内存,有些配置不好的服务器有点顶不住!最近发现一套轻量级日志收集方案: Loki+Promtail+Grafana(简称LPG), 几百M内存就够了 ...

  2. elk替代_Golang-logrus简单的日志收集系统(替代ELKB)

    1. 背景 不废话, Golang 日志查看疼点linux查看日志,一般开发者对linux命令不是很熟悉, 搜索日志更加难上加难 JAVA生态 ELKB 日志收集搭建复杂, 需要的是一个快速查看搜索, ...

  3. SpringBoot整合Graylog做日志收集

    日志收集折腾过程 ELK 之前整合过ELK做日志采集,就是Elasticsearch + Logstash + Kibana: Elasticsearch:存储引擎,存放日志内容,利于全文检索 Log ...

  4. 日志收集系统loki+promtail+Grafana 部署

    一.简 介 Loki是受Prometheus启发由Grafana Labs团队开源的水平可扩展,高度可用的多租户日志聚合系统. 开发语言: Google Go.它的设计具有很高的成本效益,并且易于操作 ...

  5. 网络日志管理工具_企业网络日志管理的优质工具有哪些?

    规范的日志管理对企业是否合规的评判具有重要帮助.中国互联网发展到今天,网络合规审计已经到了势在必行的阶段.虽然互联网给我们的生活带来了诸多便利,但各类威胁也潜伏其中.企业网络设备遭受外部攻击所引发的网 ...

  6. 借鉴开源框架自研日志收集系统

    踏浪无痕 岂安科技高级架构师 十余年数据研发经验,擅长数据处理领域工作,如爬虫.搜索引擎.大数据应用高并发等.担任过架构师,研发经理等岗位.曾主导开发过大型爬虫,搜索引擎及大数据广告DMP系统目前负责 ...

  7. ELK+Kafka 企业日志收集平台(二)这是原版

    上篇博文主要总结了一下elk.基于kafka的zookeeper集群搭建,以及系统日志通过zookeeper集群达到我们集群的整个过程.下面我们接着下面这个未完成的几个主题 4.Kibana部署; 5 ...

  8. ELK+Kafka 企业日志收集平台(二)

    上篇博文主要总结了一下elk.基于kafka的zookeeper集群搭建,以及系统日志通过zookeeper集群达到我们集群的整个过程.下面我们接着下面这个未完成的几个主题 4.Kibana部署; 5 ...

  9. ELK+Kafka 企业日志收集平台(一)

    背景: 最近线上上了ELK,但是只用了一台Redis在中间作为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,并且Redis作为消息队列并不是它的强项:所以最近将Redis ...

最新文章

  1. 模仿nginx修改进程名
  2. 重启jboss出现问题:端口被占用
  3. [转载] 你真的会用 Java 中的三目运算符吗
  4. 怎样设置计算机默认字体及语言,电脑win10系统怎么将paint 3D的语言设置为中文...
  5. 2019蓝桥杯A组:数列求值(递推式)
  6. 利用FreeMarker生成java源代码
  7. oracle数据库查询904错误,EXP-00008:遇到ORACLE错误904问题详解
  8. 【二〇二一·立春】读书笔记
  9. Vue实现点击上传图片预览图片功能
  10. 创建student-dissertation数据库
  11. C语言-make概述
  12. 普元的ajax,有人了解普元 primeton EOS 产品的么?可否评价一下?
  13. html5中画线效果标记是,HTML5画布中怎样绘制线?
  14. 虚拟机连接外网(桥接)
  15. 恭喜 SphereEx 联合创始人潘娟成为亚马逊云科技新晋 Data Hero
  16. java 小数乘法_集合复习教案
  17. 计算机专业第五批什么意思,2019下半年高中信息技术学科教师资格证面试试题(精选)第五批...
  18. 3、制定特性迭代计划
  19. java button和jbutton_java – JButton中的组合与继承
  20. stm32f10x 安装包_STM32标准库及的Keil软件包下载

热门文章

  1. 第1-10个xhtml程序
  2. html代码中本地路径里斜杠 / 和反斜杠 \ 的区别
  3. android 向左滑动动画,Android中的滑动动画
  4. STL-bitset源码解析
  5. 非二进制字符串数据:CHAR,VARCHAR,TEXT
  6. Echo Socket例子项目
  7. python文件字符串操作
  8. 深入浅出JSONP--解决ajax跨域问题
  9. 兼容Tomcat和Weblogic的Spring 数据源JNDI配置
  10. 一个简单的DWR入门例子