2021年大数据ELK(二十二):采集Apache Web服务器日志
全网最详细的大数据ELK文章系列,强烈建议收藏加关注!
新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点。
目录
采集Apache Web服务器日志
一、需求
二、准备日志数据
三、使用FileBeats将日志发送到Logstash
四、配置Logstash接收FileBeat数据并打印
五、Logstash输出数据到Elasticsearch
1、重新拷贝一份配置文件
2、将output修改为Elasticsearch
3、重新启动Logstash
4、追加一条日志到监控的文件中,并查看Elasticsearch中的索引、文档
六、Logstash过滤器
1、查看Logstash已经安装的插件
2、Grok插件
3、Grok语法
4、用法
七、匹配日志中的IP、日期并打印
1、配置Logstash
2、启动Logstash
八、解析所有字段
1、修改Logstash配置文件
2、测试并启动Logstash
九、将数据输出到Elasticsearch
1、过滤出来需要的字段
2、转换日期格式
3、输出到Elasticsearch指定索引
采集Apache Web服务器日志
一、需求
Apache的Web Server会产生大量日志,当我们想要对这些日志检索分析。就需要先把这些日志导入到Elasticsearch中。此处,我们就可以使用Logstash来实现日志的采集
打开这个文件,如下图所示。我们发现,是一个纯文本格式的日志。如下图所示:
这个日志其实由一个个的字段拼接而成,参考以下表格
字段名 |
说明 |
client IP |
浏览器端IP |
timestamp |
请求的时间戳 |
method |
请求方式(GET/POST) |
uri |
请求的链接地址 |
status |
服务器端响应状态 |
length |
响应的数据长度 |
reference |
从哪个URL跳转而来 |
browser |
浏览器 |
因为最终我们需要将这些日志数据存储在Elasticsearch中,而Elasticsearch是有模式(schema)的,而不是一个大文本存储所有的消息,而是需要将字段一个个的保存在Elasticsearch中。所以,我们需要在Logstash中,提前将数据解析好,将日志文本行解析成一个个的字段,然后再将字段保存到Elasticsearch中
二、准备日志数据
将Apache服务器日志上传到 /export/server/es/data/apache/ 目录
mkdir -p /export/server/es/data/apache/
三、使用FileBeats将日志发送到Logstash
在使用Logstash进行数据解析之前,我们需要使用FileBeat将采集到的数据发送到Logstash。之前,我们使用的FileBeat是通过FileBeat的Harvester组件监控日志文件,然后将日志以一定的格式保存到Elasticsearch中,而现在我们需要配置FileBeats将数据发送到Logstash。FileBeat这一端配置以下即可:
#----------------------------- Logstash output ---------------------------------
#output.logstash:# Boolean flag to enable or disable the output module.#enabled: true# The Logstash hosts#hosts: ["localhost:5044"]
hosts配置的是Logstash监听的IP地址/机器名以及端口号。
准备FileBeat配置文件
cd /export/server/es/filebeat-7.6.1-linux-x86_64vim filebeat-logstash.yml
因为Apache的web log日志都是以IP地址开头的,所以我们需要修改下匹配字段
filebeat.inputs:
- type: logenabled: truepaths:- /var/apache/log/access.*multiline.pattern: '^\d+\.\d+\.\d+\.\d+ 'multiline.negate: truemultiline.match: afteroutput.logstash:enabled: truehosts: ["node1:5044"]
启动FileBeat,并指定使用新的配置文件
./filebeat -e -c filebeat-logstash.yml
FileBeat将尝试建立与Logstash监听的IP和端口号进行连接。但此时,我们并没有开启并配置Logstash,所以FileBeat是无法连接到Logstash的。
2021-12-05T11:28:47.585+0800 ERROR pipeline/output.go:100 Failed to connect to backoff(async(tcp://node1.itcast.cn:5044)): dial tcp 192.168.88.100:5044: connect: connection refused
四、配置Logstash接收FileBeat数据并打印
Logstash的配置文件和FileBeat类似,它也需要有一个input、和output。基本格式如下:
# #号表示添加注释
# input表示要接收的数据
input {
}# file表示对接收到的数据进行过滤处理
filter {}# output表示将数据输出到其他位置
output {
}
配置从FileBeat接收数据
cd /export/server/es/logstash-7.6.1/config
vim filebeat-print.conf
input {beats {port => 5044}
}output {stdout {codec => rubydebug}
}
测试logstash配置是否正确
bin/logstash -f config/filebeat-print.conf --config.test_and_exit
[2021-12-05T11:46:33,940][INFO ][logstash.runner ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash
启动logstash
bin/logstash -f config/filebeat-print.conf --config.reload.automatic
reload.automatic:修改配置文件时自动重新加载
测试
创建一个access.log.1文件,使用cat test >> access.log.1往日志文件中追加内容。
test文件中只保存一条日志:
[root@node1 log]# cat test
235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249
当我们启动Logstash之后,就可以发现Logstash会打印出来从FileBeat接收到的数据:
{"log" => {"file" => {"path" => "/var/apache/log/access.log.1"},"offset" => 825},"input" => {"type" => "log"},"agent" => {"ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05","version" => "7.6.1","type" => "filebeat","id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","hostname" => "node1.itcast.cn"},"@timestamp" => 2021-12-05T09:07:55.236Z,"ecs" => {"version" => "1.4.0"},"host" => {"name" => "node1"},"tags" => [[0] "beats_input_codec_plain_applied"],"message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249","@version" => "1"
}
五、Logstash输出数据到Elasticsearch
通过控制台,我们发现Logstash input接收到的数据没有经过任何处理就发送给了output组件。而其实我们需要将数据输出到Elasticsearch。所以,我们修改Logstash的output配置。配置输出Elasticsearch只需要配置以下就可以了:
output {elasticsearch {hosts => [ "localhost:9200" ]}}
操作步骤:
1、重新拷贝一份配置文件
cp filebeat-print.conf filebeat-es.conf
2、将output修改为Elasticsearch
input {beats {port => 5044}
}output {elasticsearch {hosts => [ "node1:9200","node2:9200","node3:9200"]}
}
3、重新启动Logstash
bin/logstash -f config/filebeat-es.conf --config.reload.automatic
4、追加一条日志到监控的文件中,并查看Elasticsearch中的索引、文档
cat test >> access.log.1
// 查看索引数据
GET /_cat/indices?v
我们在Elasticsearch中发现一个以logstash开头的索引
{"health": "green","status": "open","index": "logstash-2021.12.05-000001","uuid": "147Uwl1LRb-HMFERUyNEBw","pri": "1","rep": "1","docs.count": "2","docs.deleted": "0","store.size": "44.8kb","pri.store.size": "22.4kb"}
// 查看索引库的数据
GET /logstash-2021.12.05-000001/_search?format=txt
{"from": 0,"size": 1}
我们可以获取到以下数据:
"@timestamp": "2021-12-05T09:38:00.402Z","tags": ["beats_input_codec_plain_applied"],"host": {"name": "node1"},"@version": "1","log": {"file": {"path": "/var/apache/log/access.log.1"},"offset": 1343},"agent": {"version": "7.6.1","ephemeral_id": "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05","id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","hostname": "node1","type": "filebeat"},"input": {"type": "log"},"message": "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249","ecs": {"version": "1.4.0"}
从输出返回结果,我们可以看到,日志确实已经保存到了Elasticsearch中,而且我们看到消息数据是封装在名为message中的,其他的数据也封装在一个个的字段中。我们其实更想要把消息解析成一个个的字段。例如:IP字段、时间、请求方式、请求URL、响应结果,这样
六、Logstash过滤器
在Logstash中可以配置过滤器Filter对采集到的数据进行中间处理,在Logstash中,有大量的插件供我们使用。参考官网:
Filter plugins | Logstash Reference [7.6] | Elastic
此处,我们重点来讲解Grok插件。
1、查看Logstash已经安装的插件
bin/logstash-plugin list
2、Grok插件
Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式。
Grok官网:Grok filter plugin | Logstash Reference [7.6] | Elastic
3、Grok语法
Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认,Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。
官网:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns
grok模式的语法是:%{SYNTAX:SEMANTIC}
SYNTAX指的是Grok模式名称,SEMANTIC是给模式匹配到的文本字段名。例如:
%{NUMBER:duration} %{IP:client}
duration表示:匹配一个数字,client表示匹配一个IP地址
默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%{NUMBER:duration:int} %{IP:client}
以下是常用的Grok模式:
NUMBER |
匹配数字(包含:小数) |
INT |
匹配整形数字 |
POSINT |
匹配正整数 |
WORD |
匹配单词 |
DATA |
匹配所有字符 |
IP |
匹配IP地址 |
PATH |
匹配路径 |
4、用法
filter {grok {match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }}}
七、匹配日志中的IP、日期并打印
235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249
我们使用IP就可以把前面的IP字段匹配出来,使用HTTPDATE可以将后面的日期匹配出来
配置Grok过滤插件
1、配置Logstash
input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\]" }}
}output {stdout {codec => rubydebug}
}
2、启动Logstash
bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
{"log" => {"offset" => 1861,"file" => {"path" => "/var/apache/log/access.log.1"}},"input" => {"type" => "log"},"tags" => [[0] "beats_input_codec_plain_applied"],"date" => "15/Apr/2015:00:27:19 +0849","ecs" => {"version" => "1.4.0"},"@timestamp" => 2021-12-05T11:02:05.809Z,"message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249","host" => {"name" => "node1"},"ip" => "235.9.200.242","agent" => {"hostname" => "node1","version" => "7.6.1","ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05","id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","type" => "filebeat"},"@version" => "1"
}
我们看到,经过Grok过滤器插件处理之后,我们已经获取到了ip和date两个字段。接下来,我们就可以继续解析其他的字段
八、解析所有字段
将日志解析成以下字段:
字段名 |
说明 |
client IP |
浏览器端IP |
timestamp |
请求的时间戳 |
method |
请求方式(GET/POST) |
uri |
请求的链接地址 |
status |
服务器端响应状态 |
length |
响应的数据长度 |
reference |
从哪个URL跳转而来 |
browser |
浏览器 |
1、修改Logstash配置文件
input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status} %{INT:length} \"%{DATA:reference}\" \"%{DATA:browser}\"" }}
}output {stdout {codec => rubydebug}
}
2、测试并启动Logstash
我们可以看到,8个字段都已经成功解析。
{"reference" => "www.baidu.com","@version" => "1","ecs" => {"version" => "1.4.0"},"@timestamp" => 2021-12-05T03:30:10.048Z,"ip" => "235.9.200.241","method" => "POST","uri" => "/it.cn/bigdata.html","agent" => {"id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","ephemeral_id" => "734ae9d8-bcdc-4be6-8f97-34387fcde972","version" => "7.6.1","hostname" => "node1","type" => "filebeat"},"length" => "45","status" => "200","log" => {"file" => {"path" => "/var/apache/log/access.log"},"offset" => 1},"input" => {"type" => "log"},"host" => {"name" => "node1"},"tags" => [[0] "beats_input_codec_plain_applied"],"browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900","date" => "15/Apr/2015:00:27:19 +0849","message" => "235.9.200.241 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html HTTP/1.1\" 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900\""
}
九、将数据输出到Elasticsearch
到目前为止,我们已经通过了Grok Filter可以将日志消息解析成一个一个的字段,那现在我们需要将这些字段保存到Elasticsearch中。我们看到了Logstash的输出中,有大量的字段,但如果我们只需要保存我们需要的8个,该如何处理呢?而且,如果我们需要将日期的格式进行转换,我们又该如何处理呢?
1、过滤出来需要的字段
要过滤出来我们需要的字段。我们需要使用mutate插件。mutate插件主要是作用在字段上,例如:它可以对字段进行重命名、删除、替换或者修改结构。
官方文档:Mutate filter plugin | Logstash Reference [7.6] | Elastic
例如,mutate插件可以支持以下常用操作
配置:
注意:此处为了方便进行类型的处理,将status、length指定为int类型
input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" }}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]}
}output {stdout {codec => rubydebug}
}
2、转换日期格式
要将日期格式进行转换,我们可以使用Date插件来实现。该插件专门用来解析字段中的日期,官方说明文档:Date filter plugin | Logstash Reference [7.6] | Elastic
用法如下:
将date字段转换为「年月日 时分秒」格式。默认字段经过date插件处理后,会输出到@timestamp字段,所以,我们可以通过修改target属性来重新定义输出字段。
Logstash配置修改为如下:
input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" }}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]}date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]target => "date"}
}output {stdout {codec => rubydebug}
}
启动Logstash:
bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
{"status" => "200","reference" => "www.baidu.com","method" => "POST","browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900","ip" => "235.9.200.241","length" => "45","uri" => "/it.cn/bigdata.html","date" => 2015-04-14T15:38:19.000Z
}
3、输出到Elasticsearch指定索引
我们可以通过
elasticsearch {hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]index => "xxx"}
index来指定索引名称,默认输出的index名称为:logstash-%{+yyyy.MM.dd}。但注意,要在index中使用时间格式化,filter的输出必须包含 @timestamp字段,否则将无法解析日期。
input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" }}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]}date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]target => "date"}
}output {stdout {codec => rubydebug}elasticsearch {hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]index => "apache_web_log_%{+YYYY-MM}"}
}
启动Logstash
bin/logstash -f config/filebeat-apache-weblog.conf --config.test_and_exitbin/logstash -f config/filebeat-apache-weblog.conf --config.reload.automatic
注意:
- index名称中,不能出现大写字符
-
2021年大数据ELK(二十二):采集Apache Web服务器日志相关推荐
- 2021年大数据ELK(十二):Elasticsearch编程(环境准备)
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch编程 一.环境准备 1.准备IDEA项目结构 2.准 ...
- 2021年大数据Kafka(十二):❤️Kafka配额限速机制❤️
全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka配额限速机制 限制producer端的速率 限制c ...
- 2021年大数据HBase(十二):Apache Phoenix 二级索引
全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix 二级索引 一.索引分类 ...
- 2021年大数据Hive(十二):Hive综合案例!!!
全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive综合案例 一.需求描述 二.项目表的字段 三.进 ...
- 2021年大数据HBase(十):Apache Phoenix的基本入门操作
全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix的基本入门操作 一.Pho ...
- 2021年大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 使用FileBeat采集Kafka日志到Elasticsearch 一.需求分 ...
- 2021年大数据ELK(十八):Beats 简单介绍和FileBeat工作原理
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Beats 简单介绍和FileBeat工作原理 一.Beats 二.FileB ...
- 2021年大数据ELK(十六):Elasticsearch SQL(职位查询案例)
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 职位查询案例 一.查询职位索引库中的一条数据 二.将SQL转换为DSL 三.职 ...
- 2021年大数据ELK(十五):Elasticsearch SQL简单介绍
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch SQL简单介绍 一.SQL与Elasticsear ...
最新文章
- python 通过双栈实现队列
- Kotlin如何避免“!!”(非空断言)
- 函数 —— 分析命令行参数 getopt() getopt_long() getopt_long_only()
- 【Python】青少年蓝桥杯_每日一题_3.05_排列组合
- python----面对对象三大特征2
- python pygame模块怎么写游戏_使用 Python 和 Pygame 模块构建一个游戏框架
- ASP.NET MVC 音乐商店 - 6. 使用 DataAnnotations 进行模型验证
- 力扣题目系列:面试题57 - II. 和为s的连续正数序列
- Python下各种GUI(图形用户界面)简介、使用优缺点对比
- VB.NET数据库编程基础教程
- Spring注解原理详解
- 收集五款常用的HTML编辑软件
- 86版五笔字根表(JPG版)
- [CF1528F]AmShZ Farm
- 哭了,谁还会心疼?累了,谁让我依靠?
- Linux下用火焰图进行性能分析
- 哪里有免费大文件传输平台?通过这4个网站免费来进行大文件传输
- E3ZG_D62传感器 STM32C8T6
- vscode设置 pylint把违反了编码风格标准的提示信息忽略掉,就是Variable name “**“ doesn‘t conform to snake_case naming
- Win\Linux 双系统,如何删除linux的grub引导
热门文章
- 关于使用sklearn进行数据预处理 —— 归一化/标准化/正则化
- LeetCode简单题之爬楼梯
- 车载网络处理器带来多功能能力
- 现代传感器的接口:中断驱动的ADC驱动程序
- 分布式深度学习DDL解析
- C++ 对象的声明与引用
- HarmonyOS开发工具DevEcoStudio 的下载以及运行(包含下载开发工具,sdk,模拟机,以及运行第一个应用你好,世界)
- Python 匹配字符串开头内容与结尾内容(startswith与endswith)
- Plugin Error
- Java开发工具简介
- 2021年大数据ELK(十二):Elasticsearch编程(环境准备)