转: 基于elk 实现nginx日志收集与数据分析
原文链接:https://www.cnblogs.com/wenchengxiaopenyou/p/9034213.html
一。背景
前端web服务器为nginx,采用filebeat + logstash + elasticsearch + granfa 进行数据采集与展示,对客户端ip进行地域统计,监控服务器响应时间等。
二。业务整体架构:
nginx日志落地——》filebear——》logstash——》elasticsearch——》grafna(展示)
三。先上个效果图,慢慢去一步步实现
如上只是简单的几个实用的例子,实际上有多维度的数据之后还可以定制很多需要的内容,如终端ip访问数,国家、地区占比,访问前十的省份,请求方法占比,referer统计,user_agent统计,慢响应时间统计,更有世界地图坐标展示等等,只要有数据就能多维度进行展示。这里提供模板搜索位置大家可以查找参考:https://grafana.com/dashboards
四,准备条件
需要具备如下条件:
1.nginx日志落地,需要主要落地格式,以及各个字段对应的含义。
2.安装filebeat。 filebeat轻量,功能相比较logstash而言比较单一。
3.安装logstash 作为中继服务器。这里需要说明一下的是,起初设计阶段并没有计划使用filebeat,而是直接使用logstash发往elasticsearch,但是当前端机数量增加之后logstash数量也随之增加,同时发往elasticsearch的数量增大,logstash则会抛出由于elasticsearch 限制导致的错误,大家遇到后搜索相应错误的代码即可。为此只采用logstash作为中继。
4.elasticsearch 集群。坑点是index templates的创建会影响新手操作 geoip模块。后文会有。
5.grafana安装,取代传统的kibana,grafana有更友好、美观的展示界面。
五。实现过程
1.nginx日志落地配置
nginx日志格式、字段的内容和顺序都是高度可定制化的,将需要收集的字段内容排列好。定义一个log_format
定义的形势实际上直接决定了logstash配置中对于字段抽取的模式,这里有两种可用,一种是直接在nginx日志中拼接成json的格式,在logstash中用codec => "json"来转换,
一种是常规的甚至是默认的分隔符的格式,在logstash中需要用到grok来进行匹配,这个会是相对麻烦些。两种方法各有优点。直接拼接成json的操作比较简单,但是在转码过程中
会遇到诸如 \x 无法解析的情况。 这里我也遇到过,如有必要后续会详谈。采用grok来匹配的方法相对难操作,但是准确性有提升。我们这里采用的是第一种方法,下面logstash部分
也会给出采用grok的例子。 nginx日志中日志格式定义如下:
1
2
3
4
5
6
7
8
9
10
|
log_format access_json '{"timestamp":"$time_iso8601",'
'"hostname":"$hostname",'
'"ip":"$remote_addrx",'
'"request_method":"$request_method",'
'"domain":"XXXX",'
'"size":$body_bytes_sent,'
'"status": $status,'
'"responsetime":$request_time,'
'"sum":"1"'
'}' ;
|
2.filebeat配置文件
关于filebeat更多内容请参考https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html
配置文件内容:filebeat.yml 这里应该不会遇到坑。
1
2
3
4
5
6
7
|
filebeat.prospectors:
- input_type: log
paths:
- /data0/logs/log_json/*.log #nginx日志路径
output.logstash:
hosts: [ "xxxx.xxxx.xxxx.xxx:12121" ] #logstash 服务器地址
|
3.logstahs配置文件内容:
这里是针对json已经拼接号,直接进行json转码的情况:
需要注意如下:
1)date模块必须有,否则会造成数据无法回填导致最终的图像出现锯齿状影响稳定性(原因是排列时间并不是日志产生的时间,而是进入logstash的时间)。这里后面的 yyyy-MM-dd'T'HH:mm:ssZZ 需要根据你日志中的日期格式进行匹配匹配规则见:https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html
2)需要说明的是我下面去掉了好多的字段(remove_field),原因是我们数据量大,es服务器有限。 可以根据需要随时调整收集的字段。
3) geoip 仅需要指定源ip字段名称即可,fields并不是必须的,我加入的原因还是由于资源有限导致的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
input {
beats {
port => 12121
host => "10.13.0.80"
codec => "json"
}
}
filter {
date {
match => [ "timestamp" , "yyyy-MM-dd'T'HH:mm:ssZZ" ]
#timezone => "Asia/Shanghai"
#timezone => "+00:00"
}
mutate {
convert => [ "status" , "integer" ]
convert => [ "sum" , "integer" ]
convert => [ "size" , "integer" ]
remove_field => "message"
remove_field => "source"
remove_field => "tags"
remove_field => "beat"
remove_field => "offset"
remove_field => "type"
remove_field => "@source"
remove_field => "input_type"
remove_field => "@version"
remove_field => "host"
remove_field => "client"
#remove_field => "request_method"
remove_field => "size"
remove_field => "timestamp"
#remove_field => "domain"
#remove_field => "ip"
}
geoip {
source => "ip"
fields => [ "country_name" , "city_name" , "timezone" , "region_name" , "location" ]
}
}
output {
elasticsearch {
hosts => [ "xxx:19200" , "xxx:19200" , "xxx:19200" ]
user => "xxx"
password => "xxx"
index => "logstash-suda-alllog-%{+YYYY.MM.dd}"
flush_size => 10000
idle_flush_time => 35
}
}
|
下面给出一个采用grok的例子:
其中match内的内容不用参考我的,需要根据你的字段个数,以及格式来定制。 这里其实是正则表达式。自带了一部分几乎就可以满足所有的需求了无需自己写了,可以参考:https://github.com/elastic/logstash/blob/v1.4.0/patterns/grok-patterns 直接用即可。
input {file {type => "access"path => ["/usr/local/nginx/logs/main/*.log"]} } filter {if [type] == "access" {if [message] =~ "^#" {drop {}}grok {match => ["message", "\[%{HTTPDATE:log_timestamp}\] %{HOSTNAME:server_name} \"%{WORD:request_method} %{NOTSPACE:query_string} HTTP/%{NUMBER:httpversion}\" \"%{GREEDYDATA:http_user_agent}\" %{NUMBER:status} %{IPORHOST:server_addr} \"%{IPORHOST:remote_addr}\" \"%{NOTSPACE:http_referer}\" %{NUMBER:body_bytes_sent} %{NUMBER:time_taken} %{GREEDYDATA:clf_body_bytes_sent} %{NOTSPACE:uri} %{NUMBER:m_request_time}"]}date {match => [ "log_timestamp", "dd/MMM/yyyy:mm:ss:SS Z" ]timezone => "Etc/UTC"}mutate {convert => [ "status","integer" ]convert => [ "body_bytes_sent","integer" ]convert => [ "m_request_time","float" ]}
在提供一个高级的用法:
ruby:强大的模块, 可以进行诸如时间转换,单位计算等,多个可以用分号隔开。
ruby {code => "event.set('logdateunix',event.get('@timestamp').to_i);event.set('request_time', event.get('m_request_time') / 1000000 )" }mutate {add_field => {"http_host" => "%{server_name}""request_uri" => "%{uri}"}
在windowns上使用lgostsh需要注意的是:(win上收集iis的日志,我想正常环境是不会用到的,但是我确实用到了。。。。。。)
path路径一定要采用 linux中的分割符来拼接路径,用win的格式则正则不能实现,大家可以测试下。其他配置则无区别。
input {file {type => "access"path => ["C:/WINDOWS/system32/LogFiles/W3SVC614874788/*.log"]} }
4.elasticsearch配置,集群的安装以及启动,调优这里不多说(说不完),需要注意的一个是,geoip location的格式,我这里采用的是index templates来实现的如下:
最重要的是 "location" 定义 (否则geoip_location字段格式有问题,无法拼接成坐标),其他可以根据情况自定:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
{
"order" : 0,
"version" : 50001,
"index_patterns" : [
"suda-*"
],
"settings" : {
"index" : {
"number_of_shards" : "5" ,
"number_of_replicas" : "1" ,
"refresh_interval" : "200s"
}
},
"mappings" : {
"_default_" : {
"dynamic_templates" : [
{
"message_field" : {
"path_match" : "message" ,
"mapping" : {
"norms" : false ,
"type" : "text"
},
"match_mapping_type" : "string"
}
},
{
"string_fields" : {
"mapping" : {
"norms" : false ,
"type" : "text" ,
"fields" : {
"keyword" : {
"ignore_above" : 256,
"type" : "keyword"
}
}
},
"match_mapping_type" : "string" ,
"match" : "*"
}
}
],
"properties" : {
"@timestamp" : {
"type" : "date"
},
"geoip" : {
"dynamic" : true ,
"properties" : {
"ip" : {
"type" : "ip"
},
"latitude" : {
"type" : "half_float"
},
"location" : {
"type" : "geo_point"
},
"longitude" : {
"type" : "half_float"
}
}
},
"@version" : {
"type" : "keyword"
}
}
}
},
"aliases" : {}
}
|
5 grafana 安装以及模板创建,这个比较简单,安装完直接写语句即可附一个例子如下:
这里的变量需要自定义:
通过上面应该能够完成一个完整的收集和展示情况,这里实际上提供了一种可行的方法,并么有太大的具体操作。
希望多多交流
推荐内容:kibana中文指南(有ibook版本,看着挺方便) 三斗室著
https://www.elastic.co/cn/
https://grafana.com/dashboards
https://grafana.com/
转载于:https://www.cnblogs.com/wangdaijun/p/9110556.html
转: 基于elk 实现nginx日志收集与数据分析相关推荐
- ELK+Kafka 企业日志收集平台(二)这是原版
上篇博文主要总结了一下elk.基于kafka的zookeeper集群搭建,以及系统日志通过zookeeper集群达到我们集群的整个过程.下面我们接着下面这个未完成的几个主题 4.Kibana部署; 5 ...
- ELK+Kafka 企业日志收集平台(二)
上篇博文主要总结了一下elk.基于kafka的zookeeper集群搭建,以及系统日志通过zookeeper集群达到我们集群的整个过程.下面我们接着下面这个未完成的几个主题 4.Kibana部署; 5 ...
- ELK+Kafka 企业日志收集平台(一)
背景: 最近线上上了ELK,但是只用了一台Redis在中间作为消息队列,以减轻前端es集群的压力,Redis的集群解决方案暂时没有接触过,并且Redis作为消息队列并不是它的强项:所以最近将Redis ...
- CentOS 7.2下ELK分析Nginx日志生产实战(高清多图)
注:本文系原创投稿 本文以api.mingongge.com.cn域名为测试对象进行统计,日志为crm.mingongge.com.cn和risk.mingongge.com.cn请求之和(此二者域名 ...
- 基于Flume的美团日志收集系统(二)改进和优化
在<基于Flume的美团日志收集系统(一)架构和设计>中,我们详述了基于Flume的美团日志收集系统的架构设计,以及为什么做这样的设计.在本节中,我们将会讲述在实际部署和使用过程中遇到的问 ...
- 基于Flume的美团日志收集系统(一)架构和设计
背景 美团的日志收集系统负责美团的所有业务日志的收集,并分别给Hadoop平台提供离线数据和Storm平台提供实时数据流.美团的日志收集系统基于Flume设计和搭建而成. <基于Flume的美团 ...
- 基于Flume的美团日志收集系统-----架构和设计
问题导读: 1.Flume-NG与Scribe对比,Flume-NG的优势在什么地方? 2.架构设计考虑需要考虑什么问题? 3.Agent死机该如何解决? 4.Collector死机是否会有影响? 5 ...
- 基于python的Nginx日志管理分析系统
温馨提示:文末有 CSDN 平台官方提供的学长 Wechat / QQ 名片 :) 1. 项目简介 本项目利用 pandas + sklearn 对 Nginx 的日志数据进行统计分析,并利用 fla ...
- 基于awk的nginx日志分析
基于awk的nginx日志分析 基于awk的nginx日志分析 定义 nginx日志 awk分析示例 基于awk的nginx日志分析 在系统调优的时候,经常要去分析nginx的请求日志,统计.分析各个 ...
最新文章
- iphone html邮件发送,html – iPhone邮件:由于锚标记,表格不会延伸到100%?
- Nmap扫描教程之网络基础服务DHCP服务类
- Monitor Asynchronous Apex
- 20141203图片Base64编码与解码
- Doctype的作用
- linux 关机命令_小猿圈Linux基础命令汇总
- QQ自动登陆脚本生成器 v1.0
- <a>标签下载excel文件
- Forth 语言学习
- 【Qt】Qt6系列教程汇总
- word模板中添加图片
- 产品分析报告—哔哩哔哩
- Go string类型及其使用
- Unity 应用的消息推送《一》本地推送
- 算法交易策略的成功回测
- 【数字化】数字化转型是什么、为什么、怎么办;2018年数字化供应链行业及案例分析报告
- 赛码网--求数列的和
- C++ 并发指南< future >(3)std::future std::shared_future
- python之安装第三方库失败的解决方法
- VLC更改设置,播放本地H264和H265流媒体文件和播放帧率