全网最详细的大数据ELK文章系列,强烈建议收藏加关注!

新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点。

目录

采集Apache Web服务器日志

一、需求

二、准备日志数据

三、使用FileBeats将日志发送到Logstash

四、配置Logstash接收FileBeat数据并打印

五、Logstash输出数据到Elasticsearch

1、重新拷贝一份配置文件

2、将output修改为Elasticsearch

3、重新启动Logstash

4、追加一条日志到监控的文件中,并查看Elasticsearch中的索引、文档

六、Logstash过滤器

1、查看Logstash已经安装的插件

2、Grok插件

3、Grok语法

4、用法

七、匹配日志中的IP、日期并打印

1、配置Logstash

2、启动Logstash

八、解析所有字段

1、修改Logstash配置文件

2、测试并启动Logstash

九、将数据输出到Elasticsearch

1、过滤出来需要的字段

2、转换日期格式

3、输出到Elasticsearch指定索引


采集Apache Web服务器日志

一、需求

Apache的Web Server会产生大量日志,当我们想要对这些日志检索分析。就需要先把这些日志导入到Elasticsearch中。此处,我们就可以使用Logstash来实现日志的采集

打开这个文件,如下图所示。我们发现,是一个纯文本格式的日志。如下图所示:

这个日志其实由一个个的字段拼接而成,参考以下表格

字段名

说明

client IP

浏览器端IP

timestamp

请求的时间戳

method

请求方式(GET/POST)

uri

请求的链接地址

status

服务器端响应状态

length

响应的数据长度

reference

从哪个URL跳转而来

browser

浏览器

因为最终我们需要将这些日志数据存储在Elasticsearch中,而Elasticsearch是有模式(schema)的,而不是一个大文本存储所有的消息,而是需要将字段一个个的保存在Elasticsearch中。所以,我们需要在Logstash中,提前将数据解析好,将日志文本行解析成一个个的字段,然后再将字段保存到Elasticsearch中

二、准备日志数据

将Apache服务器日志上传到 /export/server/es/data/apache/ 目录

mkdir -p /export/server/es/data/apache/

三、使用FileBeats将日志发送到Logstash

在使用Logstash进行数据解析之前,我们需要使用FileBeat将采集到的数据发送到Logstash。之前,我们使用的FileBeat是通过FileBeat的Harvester组件监控日志文件,然后将日志以一定的格式保存到Elasticsearch中,而现在我们需要配置FileBeats将数据发送到Logstash。FileBeat这一端配置以下即可:

#----------------------------- Logstash output ---------------------------------
#output.logstash:# Boolean flag to enable or disable the output module.#enabled: true# The Logstash hosts#hosts: ["localhost:5044"]

hosts配置的是Logstash监听的IP地址/机器名以及端口号。

准备FileBeat配置文件

cd /export/server/es/filebeat-7.6.1-linux-x86_64vim filebeat-logstash.yml

因为Apache的web log日志都是以IP地址开头的,所以我们需要修改下匹配字段

filebeat.inputs:
- type: logenabled: truepaths:- /var/apache/log/access.*multiline.pattern: '^\d+\.\d+\.\d+\.\d+ 'multiline.negate: truemultiline.match: afteroutput.logstash:enabled: truehosts: ["node1:5044"]

启动FileBeat,并指定使用新的配置文件

./filebeat -e -c filebeat-logstash.yml

FileBeat将尝试建立与Logstash监听的IP和端口号进行连接。但此时,我们并没有开启并配置Logstash,所以FileBeat是无法连接到Logstash的。

2021-12-05T11:28:47.585+0800    ERROR   pipeline/output.go:100  Failed to connect to backoff(async(tcp://node1.itcast.cn:5044)): dial tcp 192.168.88.100:5044: connect: connection refused

四、​​​​​​​配置Logstash接收FileBeat数据并打印

Logstash的配置文件和FileBeat类似,它也需要有一个input、和output。基本格式如下:

# #号表示添加注释
# input表示要接收的数据
input {
}# file表示对接收到的数据进行过滤处理
filter {}# output表示将数据输出到其他位置
output {
}

配置从FileBeat接收数据

cd /export/server/es/logstash-7.6.1/config
vim filebeat-print.conf
input {beats {port => 5044}
}output {stdout {codec => rubydebug}
}

测试logstash配置是否正确

bin/logstash -f config/filebeat-print.conf --config.test_and_exit
[2021-12-05T11:46:33,940][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

启动logstash

bin/logstash -f config/filebeat-print.conf --config.reload.automatic

reload.automatic:修改配置文件时自动重新加载

测试

创建一个access.log.1文件,使用cat test >> access.log.1往日志文件中追加内容。

test文件中只保存一条日志:

[root@node1 log]# cat test
235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

当我们启动Logstash之后,就可以发现Logstash会打印出来从FileBeat接收到的数据:

{"log" => {"file" => {"path" => "/var/apache/log/access.log.1"},"offset" => 825},"input" => {"type" => "log"},"agent" => {"ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05","version" => "7.6.1","type" => "filebeat","id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","hostname" => "node1.itcast.cn"},"@timestamp" => 2021-12-05T09:07:55.236Z,"ecs" => {"version" => "1.4.0"},"host" => {"name" => "node1"},"tags" => [[0] "beats_input_codec_plain_applied"],"message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249","@version" => "1"
}

五、Logstash输出数据到Elasticsearch

通过控制台,我们发现Logstash input接收到的数据没有经过任何处理就发送给了output组件。而其实我们需要将数据输出到Elasticsearch。所以,我们修改Logstash的output配置。配置输出Elasticsearch只需要配置以下就可以了:

output {elasticsearch {hosts => [ "localhost:9200" ]}}

操作步骤:

1、重新拷贝一份配置文件

cp filebeat-print.conf filebeat-es.conf

2、将output修改为Elasticsearch

input {beats {port => 5044}
}output {elasticsearch {hosts => [ "node1:9200","node2:9200","node3:9200"]}
}

3、重新启动Logstash

bin/logstash -f config/filebeat-es.conf --config.reload.automatic

4、追加一条日志到监控的文件中,并查看Elasticsearch中的索引、文档

cat test >> access.log.1 

// 查看索引数据

GET /_cat/indices?v

我们在Elasticsearch中发现一个以logstash开头的索引

    {"health": "green","status": "open","index": "logstash-2021.12.05-000001","uuid": "147Uwl1LRb-HMFERUyNEBw","pri": "1","rep": "1","docs.count": "2","docs.deleted": "0","store.size": "44.8kb","pri.store.size": "22.4kb"}

// 查看索引库的数据

GET /logstash-2021.12.05-000001/_search?format=txt

{"from": 0,"size": 1}

我们可以获取到以下数据:

                    "@timestamp": "2021-12-05T09:38:00.402Z","tags": ["beats_input_codec_plain_applied"],"host": {"name": "node1"},"@version": "1","log": {"file": {"path": "/var/apache/log/access.log.1"},"offset": 1343},"agent": {"version": "7.6.1","ephemeral_id": "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05","id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","hostname": "node1","type": "filebeat"},"input": {"type": "log"},"message": "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249","ecs": {"version": "1.4.0"}

从输出返回结果,我们可以看到,日志确实已经保存到了Elasticsearch中,而且我们看到消息数据是封装在名为message中的,其他的数据也封装在一个个的字段中。我们其实更想要把消息解析成一个个的字段。例如:IP字段、时间、请求方式、请求URL、响应结果,这样

六、Logstash过滤器

在Logstash中可以配置过滤器Filter对采集到的数据进行中间处理,在Logstash中,有大量的插件供我们使用。参考官网:

Filter plugins | Logstash Reference [7.6] | Elastic

此处,我们重点来讲解Grok插件。

1、查看Logstash已经安装的插件

bin/logstash-plugin list

2、​​​​​​​Grok插件

Grok是一种将非结构化日志解析为结构化的插件。这个工具非常适合用来解析系统日志、Web服务器日志、MySQL或者是任意其他的日志格式。

Grok官网:Grok filter plugin | Logstash Reference [7.6] | Elastic

3、​​​​​​​Grok语法

Grok是通过模式匹配的方式来识别日志中的数据,可以把Grok插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认,Logstash拥有120个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配。

官网:https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns

grok模式的语法是:%{SYNTAX:SEMANTIC}

SYNTAX指的是Grok模式名称,SEMANTIC是给模式匹配到的文本字段名。例如:

%{NUMBER:duration} %{IP:client}

duration表示:匹配一个数字,client表示匹配一个IP地址

默认在Grok中,所有匹配到的的数据类型都是字符串,如果要转换成int类型(目前只支持int和float),可以这样:%{NUMBER:duration:int} %{IP:client}

以下是常用的Grok模式:

NUMBER

匹配数字(包含:小数)

INT

匹配整形数字

POSINT

匹配正整数

WORD

匹配单词

DATA

匹配所有字符

IP

匹配IP地址

PATH

匹配路径

4、​​​​​​​用法

    filter {grok {match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }}}

​​​​​​​七、匹配日志中的IP、日期并打印

235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /it.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249

我们使用IP就可以把前面的IP字段匹配出来,使用HTTPDATE可以将后面的日期匹配出来

配置Grok过滤插件

1、配置Logstash

input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\]" }}
}output {stdout {codec => rubydebug}
}

2、启动Logstash

bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
{"log" => {"offset" => 1861,"file" => {"path" => "/var/apache/log/access.log.1"}},"input" => {"type" => "log"},"tags" => [[0] "beats_input_codec_plain_applied"],"date" => "15/Apr/2015:00:27:19 +0849","ecs" => {"version" => "1.4.0"},"@timestamp" => 2021-12-05T11:02:05.809Z,"message" => "235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249","host" => {"name" => "node1"},"ip" => "235.9.200.242","agent" => {"hostname" => "node1","version" => "7.6.1","ephemeral_id" => "d4c3b652-4533-4ebf-81f9-a0b78c0d4b05","id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","type" => "filebeat"},"@version" => "1"
}

我们看到,经过Grok过滤器插件处理之后,我们已经获取到了ip和date两个字段。接下来,我们就可以继续解析其他的字段

八、​​​​​​​解析所有字段

将日志解析成以下字段:

字段名

说明

client IP

浏览器端IP

timestamp

请求的时间戳

method

请求方式(GET/POST)

uri

请求的链接地址

status

服务器端响应状态

length

响应的数据长度

reference

从哪个URL跳转而来

browser

浏览器

1、修改Logstash配置文件

input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status} %{INT:length} \"%{DATA:reference}\" \"%{DATA:browser}\"" }}
}output {stdout {codec => rubydebug}
}

2、测试并启动Logstash

我们可以看到,8个字段都已经成功解析。

{"reference" => "www.baidu.com","@version" => "1","ecs" => {"version" => "1.4.0"},"@timestamp" => 2021-12-05T03:30:10.048Z,"ip" => "235.9.200.241","method" => "POST","uri" => "/it.cn/bigdata.html","agent" => {"id" => "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","ephemeral_id" => "734ae9d8-bcdc-4be6-8f97-34387fcde972","version" => "7.6.1","hostname" => "node1","type" => "filebeat"},"length" => "45","status" => "200","log" => {"file" => {"path" => "/var/apache/log/access.log"},"offset" => 1},"input" => {"type" => "log"},"host" => {"name" => "node1"},"tags" => [[0] "beats_input_codec_plain_applied"],"browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900","date" => "15/Apr/2015:00:27:19 +0849","message" => "235.9.200.241 - - [15/Apr/2015:00:27:19 +0849] \"POST /it.cn/bigdata.html HTTP/1.1\" 200 45 \"www.baidu.com\" \"Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900\""
}

九、​​​​​​​将数据输出到Elasticsearch

到目前为止,我们已经通过了Grok Filter可以将日志消息解析成一个一个的字段,那现在我们需要将这些字段保存到Elasticsearch中。我们看到了Logstash的输出中,有大量的字段,但如果我们只需要保存我们需要的8个,该如何处理呢?而且,如果我们需要将日期的格式进行转换,我们又该如何处理呢?

1、​​​​​​​过滤出来需要的字段

要过滤出来我们需要的字段。我们需要使用mutate插件。mutate插件主要是作用在字段上,例如:它可以对字段进行重命名、删除、替换或者修改结构。

官方文档:Mutate filter plugin | Logstash Reference [7.6] | Elastic

例如,mutate插件可以支持以下常用操作

配置:

注意:此处为了方便进行类型的处理,将status、length指定为int类型

input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" }}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]}
}output {stdout {codec => rubydebug}
}

2、​​​​​​​转换日期格式

要将日期格式进行转换,我们可以使用Date插件来实现。该插件专门用来解析字段中的日期,官方说明文档:Date filter plugin | Logstash Reference [7.6] | Elastic

用法如下:

将date字段转换为「年月日 时分秒」格式。默认字段经过date插件处理后,会输出到@timestamp字段,所以,我们可以通过修改target属性来重新定义输出字段。

Logstash配置修改为如下:

input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" }}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "@timestamp", "input", "agent", "host", "ecs", "@version"]}date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]target => "date"}
}output {stdout {codec => rubydebug}
}

启动Logstash:

bin/logstash -f config/filebeat-filter-print.conf --config.reload.automatic
{"status" => "200","reference" => "www.baidu.com","method" => "POST","browser" => "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900","ip" => "235.9.200.241","length" => "45","uri" => "/it.cn/bigdata.html","date" => 2015-04-14T15:38:19.000Z
}

3、​​​​​​​输出到Elasticsearch指定索引

我们可以通过

elasticsearch {hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]index => "xxx"}

index来指定索引名称,默认输出的index名称为:logstash-%{+yyyy.MM.dd}。但注意,要在index中使用时间格式化,filter的输出必须包含 @timestamp字段,否则将无法解析日期。

input {beats {port => 5044}
}filter {grok {match => { "message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\"" }}mutate {enable_metric => "false"remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]}date {match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]target => "date"}
}output {stdout {codec => rubydebug}elasticsearch {hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]index => "apache_web_log_%{+YYYY-MM}"}
}

启动Logstash

bin/logstash -f config/filebeat-apache-weblog.conf --config.test_and_exitbin/logstash -f config/filebeat-apache-weblog.conf --config.reload.automatic

注意:

  • index名称中,不能出现大写字符