在本示例教程中,你将在索引之前使用 ingest pipeline 以通用日志格式解析服务器日志。 在开始之前,请检查摄取管道的先决条件。

你要解析的日志类似于以下内容:

127.0.0.1 user-identifier frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326

这些日志包含时间戳、IP 地址和用户代理。 你希望在 Elasticsearch 中为这三个项目提供自己的字段,以实现更快的搜索和可视化。 你还想知道请求来自哪里。通用日志格式的每一行具有如下的语法:

host ident authuser date request status bytes

一个 “-” 符号代表缺失的数据

  • 127.0.0.1 是向服务器发出请求的客户端(远程主机)的 IP 地址。
  • user-identifier 是客户端的 RFC 1413 身份。 通常 ”-”。
  • frank 是请求文档的人的用户 ID。 通常为“-”,除非 .htaccess 已请求身份验证。
  • [10/Oct/2000:13:55:36 -0700] 是收到请求的日期、时间和时区,默认为 strftime 格式 %d/%b/%Y:%H:%M:%S %z。
  • "GET /apache_pb.gif HTTP/1.0” 是来自客户端的请求行。 方法 GET、/apache_pb.gif 请求的资源和 HTTP/1.0 HTTP 协议。
  • 200 是返回给客户端的 HTTP 状态码。 2xx 是成功响应,3xx 是重定向,4xx 是客户端错误,5xx 是服务器错误。
  • 2326 是返回给客户端的对象的大小,以字节为单位。

在下面的示例中,日志的格式和上面的还说有所不同。

设计 grok pattern

上面的日志信息显然是一个非结构化的日志信息。如果我们直接写入到 Elasticsearch,那肯定是没有多大用处的,除了我们可以做一些简单的全文查询之外。为了能够让上面的日志信息更加易于分析统计,我们可以在写入之前对这个日志信息进行结构化。结构化的途径有很多。 我在文章 “Elasticsearch:创建 Ingest pipeline” 里有介绍几种方案。Ingest pipeline 无疑是一种比较容易部署且有效的方法。我们可以依托 Elasticsearch 的可拓展性,使用专门的 ingest 节点来处理 pipeline。

为了达到结构化的目的,我们可以采用 grok processor 来处理上面的常用日志格式。我们启动 Kibana:

我们首先在 Sample data 项输入如下的句子:

212.87.37.154 - - [05/May/2099:16:21:15 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36"

在 Grok pattern 下输入:

%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \[%{HTTPDATE:@timestamp}\] "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}

我们点击 Simulate 后就可以看到上面截图下面的输出。 如果你能看到上面的结构化的输出,则说明我们的 grok pattern 是正确的。

创建 ingest pipeline

我们有两种方法来创建 ingest pipeline。

使用 Kibana 提供的 UI

我们使用如下的方法来进行操作:

从上面的界面中,我们可以看出来我们可以甚至直接从 CSV 来创建一个 pipeline。点击上面的 New pipeline:

我们的 pipeline 名称定义为 common_log_format。接下来,我们来创建一系列的处理器来处理我们的日志:

在上面,我们在 Patterns 里输入:

%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \\[%{HTTPDATE:@timestamp}\\] \"%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}\" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}

请注意,由于字符 \ 是特殊字符,它需要 escape 的字符,所以,我们需要使用两个 "\" 来代替。针对 " 字符,它也是一样的,需要 escape。在它的前面需要添加 \ 符号。点击上面的 Add 按钮。这样我们就创建了第一个 processor:

为了验证 Grok processor 的正确性,点击上面的 Add documents 来进行测试:

我们按照上面的提示填入相应的测试信息。针对我们的情况:

[{"_index": "index","_id": "id","_source": {"message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""}}
]

同样地,我们需要 escape 引号这个特殊的字符。

点击上面的 Run the pipeline 按钮:

我们看到了我们想要的结果。它说明我们的 grok processor 是工作正常的。在实际的使用中,在解析的过程中,有可能会出现错误。它可能是不符合 grok pattern 的文档而造成的。我们可以参考之前的文章 “Elasticsearch:如何处理 ingest pipeline 中的异常” 来处理这些异常。

我们查看一下当前的时间戳格式:

我们发现它不是我们想要的格式,比如 2099-05-05T16:21:15.000Z。我们可以使用 date processor 来进行转换:

在上面,我们可以把格式 format 定义为:

dd/MMM/yyyy:HH:mm:ss Z

点击 Add 按钮:

上面显示我们已经成功地创建了 Grok 及 Date processors。它们是按照顺序依次执行的。点击 View output 按钮:

从上面的输出中,我们可以看出来显示的时间格式已经发生改变。它是按照我们默认的 locale 的时间格式来进行显示的。当然,我们甚至可以在 Date processor 里定义输出显示的时间格式,比如:

那么测试的结果是:

我们可以使用 GeoIP processor 来找到发生请求的地方在哪里。 我们仿照上面的步骤,添加 geoip processor。我们针对 source.ip 字段来操作:

点击上面的 Add 按钮:

点击上面的 View output:

从上面,我们可以看到新增加的字段。 这样我们就添加了 GeoIP processor。

我们可以按照同样的套路来添加针对 User agent 这个部分的解析。我们使用 user agent processor:

点击上面的 Add 按钮:

从上面,我可以看到新增加的 user_agent 字段。

最后我们点击 Create pipeline:

我们可以看到已经使用到的 processors。到目前为止,我们已经创建了一个叫做 common_log_format 的 ingest pipeline。 点击上面的拷贝按钮:

[{"grok": {"field": "message","patterns": ["%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \\[%{HTTPDATE:@timestamp}\\] \"%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}\" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}"]}},{"date": {"field": "@timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"output_format": "yyyy-MMM-dd'T'HH:mm:ss Z"}},{"geoip": {"field": "source.ip","target_field": "source.geo"}},{"user_agent": {"field": "user_agent"}}
]

上面显示我们已经使用到的 processors。

使用 ingest pipeline API

我们也可以使用 ingest pipeline API 来创建 ingest pipeline。事实上,我们在上面的步骤中也可以得到这个请求的格式:

点击上面的拷贝 copy to clipboard,我们粘贴如下:

PUT _ingest/pipeline/common_log_format
{"description": "A pipeline to structure common logs ","processors": [{"grok": {"field": "message","patterns": ["%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \\[%{HTTPDATE:@timestamp}\\] \"%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}\" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}"]}},{"date": {"field": "@timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"output_format": "yyyy-MMM-dd'T'HH:mm:ss Z"}},{"geoip": {"field": "source.ip","target_field": "source.geo"}},{"user_agent": {"field": "user_agent"}}]
}

这个就是我们使用的具体 API 来创建这个 ingest pipeline。在实际的书写过程中,我们通常并不直接创建一个 ingest pipeline,而是采用如下的格式来先测试一下是否正确与否:

POST _ingest/pipeline/_simulate
{"pipeline": {"processors": [{"grok": {"description": "Extract fields from 'message'","field": "message","patterns": ["""%{IPORHOST:source.ip} %{USER:user.id} %{USER:user.name} \[%{HTTPDATE:@timestamp}\] "%{WORD:http.request.method} %{DATA:url.original} HTTP/%{NUMBER:http.version}" %{NUMBER:http.response.status_code:int} (?:-|%{NUMBER:http.response.body.bytes:int}) %{QS:http.request.referrer} %{QS:user_agent}"""]}},{"date": {"description": "Format '@timestamp' as 'dd/MMM/yyyy:HH:mm:ss Z'","field": "@timestamp","formats": ["dd/MMM/yyyy:HH:mm:ss Z"]}},{"geoip": {"description": "Add 'source.geo' GeoIP data for 'source.ip'","field": "source.ip","target_field": "source.geo"}},{"user_agent": {"description": "Extract fields from 'user_agent'","field": "user_agent"}}]},"docs": [{"_source": {"message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""}}]
}

在上面,我们使用 _simulate 终点来进行测试。我们使用了一个测试文档,看看输出的结果:

{"docs": [{"doc": {"_index": "_index","_id": "_id","_source": {"@timestamp": "2099-05-05T16:21:15.000Z","http": {"request": {"method": "GET","referrer": "\"-\""},"version": "1.1","response": {"body": {"bytes": 3638},"status_code": 200}},"source": {"geo": {"continent_name": "Europe","region_iso_code": "DE-BE","city_name": "Berlin","country_iso_code": "DE","country_name": "Germany","region_name": "Land Berlin","location": {"lon": 13.3878,"lat": 52.5312}},"ip": "212.87.37.154"},"message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"","user": {"name": "-","id": "-"},"url": {"original": "/favicon.ico"},"user_agent": {"name": "Chrome","original": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"","os": {"name": "Mac OS X","version": "10.11.6","full": "Mac OS X 10.11.6"},"device": {"name": "Mac"},"version": "52.0.2743.116"}},"_ingest": {"timestamp": "2022-08-09T06:31:12.648918Z"}}}]
}

显然,我们的输出结果是正确的。然后,我们再使用上面的 ingest pipeline API 来创建一个 pipeline。

一旦我们定义好一个 pipeline, 我们可以使用它。在文章 “Elasticsearch:Elastic可观测性 - 运用 pipeline 使数据结构化” 有比较详细的描述。我们使用如下的方法来写入一个文档:

PUT common_log/_doc/1?pipeline=common_log_format
{"message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
}

我们也可以在使用 Beats 写入数据时调用:

output.elasticsearch:hosts: ["http://localhost:9200"]pipeline: common_log_format

或者在使用 reindex 时这么调用:

POST _reindex
{"source": {"index": "source"},"dest": {"index": "dest","pipeline": "common_log_format"}
}

或者在使用 _bulk 命令时这么使用:

POST _bulk?pipeline=common_log_format
{"index":{"_index":"test","_id":"1"}}
{"message":"212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""}

我们甚至可以直接在索引的设置中配置这个 pipeline:

PUT test1
{"settings": {"index.default_pipeline": "common_log_format"}
}PUT test1/_doc/1
{"message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
}

你还可以将 pipeline 参数与通过 update_by_query 一起使用,比如:

PUT test2/_doc/1
{"message": "212.87.37.154 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
}POST test2/_update_by_query?pipeline=common_log_format

把日志数据写入到 data stream

我们首先来创建一个 index template。这个 index template 含有 data stream。

PUT _index_template/my-data-stream-template
{"index_patterns": [ "my-data-stream*" ],"data_stream": { },"priority": 500
}

我们使用如下的命令来向 data stream 写入一个数据:

POST my-data-stream/_doc?pipeline=common_log_format
{"message": "89.160.20.128 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
}

我们接下来验证一下 data stream 里的数据是否已经被结构化了。我们使用如下的命令来进行查看:

GET my-data-stream/_search?filter_path=hits.hits._source

上面的命令显示的结果为:

{"hits": {"hits": [{"_source": {"@timestamp": "2099-May-05T16:21:15 +0000","http": {"request": {"referrer": "\"-\"","method": "GET"},"response": {"status_code": 200,"body": {"bytes": 3638}},"version": "1.1"},"source": {"geo": {"continent_name": "Europe","region_iso_code": "SE-O","city_name": "Trollhättan","country_iso_code": "SE","country_name": "Sweden","region_name": "Västra Götaland County","location": {"lon": 12.2816,"lat": 58.2854}},"ip": "89.160.20.128"},"message": "89.160.20.128 - - [05/May/2099:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"","user": {"name": "-","id": "-"},"url": {"original": "/favicon.ico"},"user_agent": {"original": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"","os": {"name": "Mac OS X","version": "10.11.6","full": "Mac OS X 10.11.6"},"name": "Chrome","device": {"name": "Mac"},"version": "52.0.2743.116"}}}]}
}

很显然我们的数据已经是结构化的数据。我们可以使用 Kibana 强悍的可视化来对数据进行可视化,并挖掘数据里的洞察,比如分析那个时段是高峰期,那个 IP 地址下载的数据最多,那个国家的访问最多,那个访问的次数最多,哪种操作系统访问的最多,浏览器访问网站的分布是咋样的?

Elasticsearch:ingest pipeline 使用示例 - 解析常用日志格式相关推荐

  1. Elasticsearch Ingest Pipeline

    文章目录 1. 需求:修复与增强写入的数据 2. Ingest Node 3. Pipeline & Processor 4. 使用 Pipeline 切分字符串 5. 为 ES添加一个 Pi ...

  2. java syslog解析_syslog日志格式解析

    1.介绍 在Unix类操作系统上,syslog广泛应用于系统日志.syslog日志消息既可以记录在本地文件中,也可以通过网络发送到接收syslog的服务器.接收syslog的服务器可以对多个设备的sy ...

  3. mysql 日志mixed模式_[MySQL binlog]彻底解析Mixed日志格式的binlog

    MySQL binlog3种格式,row,mixed,statement. 解析工作 mysqlbinlog --base64-output=DECODE-ROWS -v mysql-bin.0001 ...

  4. Elasticsearch:Ingest pipeline 介绍

    Ingest pipeline 可让你在索引之前对数据执行常见转换. 例如,你可以使用 pipeline 删除字段.从文本中提取值并丰富你的数据. Pipeline 由一系列称为处理器(process ...

  5. Python 解析log日志

    Python 解析log日志 软件环境 环境搭建 待解析log日志格式 log解析脚本 解析后文本格式 软件环境 软件 版本 作用 Ubuntu 20.04 操作系统 python 3.8.10 py ...

  6. Elasticsearch:如何处理 ingest pipeline 中的异常

    在我之前的文章 "如何在 Elasticsearch 中使用 pipeline API 来对事件进行处理" 中,我详细地介绍了如何创建并使用一个 ingest pipeline.简 ...

  7. Elasticsearch:Ingest Pipeline 实践

    相比较 Logstash 而言,由于其丰富的 processors 而受到越来越多人的喜欢.最重要的一个优点就是它基于 Elasticsearch 极具可拓展性和维护性而受到开发者的喜欢.我在之前创建 ...

  8. Elasticsearch使用Ingest Pipeline进行数据预处理

    本文基于Elasticsearch7.x Elasticsearch可以使用自身的Ingest Pipeline功能进行数据预处理, 无须借助Logstash. Ingest Pipeline介绍 I ...

  9. Elasticsearch:如何使用 Elasticsearch ingest 节点来丰富日志和指标

    当导入数据到Elasticsearch中时,用其他信息丰富文档通常是有益的,这些信息以后可用于搜索或查看数据.丰富化是将权威来源的数据合并到文档中的过程,这些数据被摄入到 Elasticsearch ...

最新文章

  1. iOS绘制图片与文字
  2. linux nacos启动_Nacos集群安装配置
  3. 使用NoSQL实施实体服务–第5部分:使用云提高自治性
  4. Spring基于 XML 的声明式事务控制(配置方式)
  5. imagej链接资源
  6. imagereader java_java中ImageReader和BufferedImage获取图片尺寸实例
  7. 任正非:华为要防止内卷 精益求精不叫内卷
  8. c#对PL/SQL查询结果列复制的结果生成指定格式
  9. android 产品上线流程图,产品上线工作流程(试行)20050302.doc
  10. 电子表格计算机知识,EXCEL电子表格基本知识.doc
  11. webmax函数高级教程整理集
  12. c# NPOI按模板导出
  13. hdwiki的php架构,齐博CMS(原php168)整合百科系统(HDwiki)手记
  14. unable to load Private Key 6572:error:0906D06C:PEM routines:PEM_read_bio:no start line:.\crypto\pem\
  15. 韩顺平坦克大战项目0.2(画坦克并且移动)
  16. 得到app文稿导出_得到app学习笔记作为知识付费者,如何把所学内容快速输出?...
  17. softmax、softmax损失函数、cross-entropy损失函数
  18. Windows Server安全日志与系统事件变更审计
  19. spin_lock详解
  20. 数据集仓库 —— UCI Machine Learning Repository

热门文章

  1. 集线器,路由器,交换机的作用和区别是什么以及如何区分?
  2. ubuntu16.04 update 出现 aborted(core dumped)错误
  3. caffe 报错 Aborted(core dumped
  4. ESXi 社区版网卡驱动
  5. ms-sql数据类型和access数据类型大全
  6. TDengineGUI无法连接TDengine
  7. 51单片机的LCD12864电子秤设计
  8. tampermonkey如何寻找_Tampermonkey脚本安装问题及自用脚本推荐
  9. php 五舍六入,Golang浮点型的默认舍入规则——四舍六入五成双
  10. 为艺术而生的惊艳算法