我们通常的做法是使用 Elasticsearch 的 ingest node 或者 Logstash 来对数据进行清洗。这其中包括删除,添加,丰富,转换等等。但是针对每个 beat 来讲,它们也分别有自己的一组 processors 来可以帮我们处理数据。我们可以访问  Elastic 的官方网站来查看针对 filebeat 的所有 processors。 也就是说,我们可以在配置 beats 的时候并同时配置相应的 processors 来对数据进行处理。每个 processor 能够修改经过它的事件。

你可以在配置中定义 processors 以在将事件发送到配置的输出之前对其进行处理。 libbeat 库提供以下处理器:

  • 减少导出字段的数量
  • 使用附加元数据增强事件
  • 执行额外的处理和解码

每个处理器接收一个事件,将定义的操作应用于该事件,然后返回该事件。 如果你定义处理器列表,它们将按照在 Beats 配置文件中定义的顺序执行。

event -> processor 1 -> event1 -> processor 2 -> event2 ...

如果你想了解 ingest pipeline 是如何清洗这些事件的,请阅读我之前的文章 “Elastic可观测性 - 运用 pipeline 使数据结构化”。在之前文章 “深入理解 Dissect ingest processor” 中,我讲述了 dissect ingest processor 的应用。在今天的文章中,我将使用同样的 beat processor 来说明如何对数据进行格式化。

使用  Filebeat 来对数据进行处理

在今天的实验中,我们将使用如下是例子来进行。我们创建一个叫做 sample.log 的文件,其内容如下:


"321 - App01 - WebServer is starting"
"321 - App01 - WebServer is up and running"
"321 - App01 - WebServer is scaling 2 pods"
"789 - App02 - Database is will be restarted in 5 minutes"
"789 - App02 - Database is up and running"
"789 - App02 - Database is refreshing tables"

由于 Filebeat 是以换行符来识别每一行的数据的,所以我在文件的最后一行也加上了一个换行符以确保最后一行的数据能被导入。

我们创建一个叫做 filebeat_processors.yml 的 filebeat 配置文件:



- type: logenabled: truepaths:- /Users/liuxg/data/beatsprocessors/sample.logprocessors:- dissect:tokenizer: '"%{pid|integer} - %{service.name} - %{service.status}"'field: "message"target_prefix: ""trim_values: "all"- drop_fields:fields: ["ecs", "agent", "log", "input", "host"]setup.template.enabled: false
setup.ilm.enabled: falseoutput.elasticsearch:hosts: ["localhost:9200"]index: "sample"bulk_max_size: 1000

请注意你需要依据自己 sample.log 的位置修改上面的 paths 中的路径。请注意在上面,我们在 tokenizer 中是这样定义的:

'"%{pid|integer} - %{service.name} - %{service.status}"'

我们使用了 ' 字符来定义字符串。我们也可以使用 " 符号来定义字符串。在这种情况下,我们必须使用 \" 来进行 escape(转义):

"\"%{pid|integer} - %{service.name} - %{service.status}\""


另外,如果我们不想要其中的一个字段,那么我们可以使用 %{},或者更加易懂的方式 %{?field},比如,在上面的例子中,我们不想要 service.name 这个字段,我们可以使用如下的方法:

'"%{pid|integer} - %{?service.name} - %{service.status}"'

那么经过 Beats processor 的处理过后,我们将看不到 service.name 这个字段了。



- type: logenabled: truepaths:- /Users/liuxg/data/beatsprocessors/sample.logprocessors:- dissect:tokenizer: '"%{pid|integer} - %{service.name} - %{service.status}"'field: "message"target_prefix: ""trim_values: "all"- drop_fields:fields: ["ecs", "agent", "log", "input", "host"]- drop_event:when:equals:pid: 789setup.template.enabled: false
setup.ilm.enabled: falseoutput.elasticsearch:hosts: ["localhost:9200"]index: "sample"bulk_max_size: 1000

我们使用了 drop_event 这个 processor。当这个事件的 pid 的值为 789 时,那么整个事件将被删除,从而我们在 Elasticsearch 中看不到这个事件。

在上面,我们使用了 drop_fields , dissect 及 drop_event 三个 processors。当我们使用 dissect 处理器时,有时我们的日志里的空格是不规整的,比如有的是一个,有的是两个空格:

针对这种情况,我们可以使用 trim_values 来除去多余的空格:

我们使用如下的命令来运行 filebeat:

./filebeat -e -c ~/data/beatsprocessors/filebeat_processors.yml 


运行完上面的命令后,我们可以在 Kibana 中进行查询 sample 索引的内容:

GET sample/_search
{"took" : 0,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 6,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "sample","_type" : "_doc","_id" : "qrBscHYBpymojx8hDWuV","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:18:16.540Z","message" : "\"321 - App01 - WebServer is starting\"","service" : {"name" : "App01","status" : "WebServer is starting"},"pid" : 321}},{"_index" : "sample","_type" : "_doc","_id" : "q7BscHYBpymojx8hDWuV","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:18:16.541Z","pid" : 321,"message" : "\"321 - App01 - WebServer is up and running\"","service" : {"name" : "App01","status" : "WebServer is up and running"}}},{"_index" : "sample","_type" : "_doc","_id" : "rLBscHYBpymojx8hDWuV","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:18:16.541Z","message" : "\"321 - App01 - WebServer is scaling 2 pods\"","service" : {"name" : "App01","status" : "WebServer is scaling 2 pods"},"pid" : 321}},{"_index" : "sample","_type" : "_doc","_id" : "rbBscHYBpymojx8hDWuV","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:18:16.541Z","message" : "\"789 - App02 - Database is will be restarted in 5 minutes\"","pid" : 789,"service" : {"name" : "App02","status" : "Database is will be restarted in 5 minutes"}}},{"_index" : "sample","_type" : "_doc","_id" : "rrBscHYBpymojx8hDWuV","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:18:16.541Z","service" : {"name" : "App02","status" : "Database is up and running"},"pid" : 789,"message" : "\"789 - App02 - Database is up and running\""}},{"_index" : "sample","_type" : "_doc","_id" : "r7BscHYBpymojx8hDWuV","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:18:16.541Z","service" : {"status" : "Database is refreshing tables","name" : "App02"},"message" : "\"789 - App02 - Database is refreshing tables\"","pid" : 789}}]}

显然,我们得到了一个结构化的索引。在上面,我们对 pid 还进行了从字符串到整型值的转换。



- type: logenabled: truepaths:- /Users/liuxg/data/beatsprocessors/sample.logprocessors:- drop_fields:fields: ["ecs", "agent", "log", "input", "host"]- dissect:tokenizer: '"%{pid|integer} - %{service.name} - %{service.status}"'field: "message"target_prefix: "" - rename:fields:- from: "pid"to: "PID"ignore_missing: falsefail_on_error: true    setup.template.enabled: false
setup.ilm.enabled: falseoutput.elasticsearch:hosts: ["localhost:9200"]index: "sample"bulk_max_size: 1000


{"took" : 0,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 6,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "sample","_type" : "_doc","_id" : "UrB5cHYBpymojx8h7oCK","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:33:26.114Z","service" : {"status" : "WebServer is starting","name" : "App01"},"message" : "\"321 - App01 - WebServer is starting\"","PID" : 321}},...

之前的 pid 已经转换为 PID 字段。



- type: logenabled: truepaths:- /Users/liuxg/data/beatsprocessors/sample.logprocessors:- drop_fields:fields: ["ecs", "agent", "log", "input", "host"]- dissect:tokenizer: '"%{pid|integer} - %{service.name} - %{service.status}"'field: "message"target_prefix: "" - rename:fields:- from: "pid"to: "PID"ignore_missing: falsefail_on_error: true- script:lang: javascriptid: my_filterparams:pid: 789source: >var params = {pid: 0};function register(scriptParams) {params = scriptParams;}function process(event) {if (event.Get("PID") == params.pid) {event.Cancel();}}        setup.template.enabled: false
setup.ilm.enabled: falseoutput.elasticsearch:hosts: ["localhost:9200"]index: "sample"bulk_max_size: 1000

在上面,当 PID 的值为 789 时,我们将过滤这个事件。重新运行 filebeat:

{"took" : 0,"timed_out" : false,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0},"hits" : {"total" : {"value" : 3,"relation" : "eq"},"max_score" : 1.0,"hits" : [{"_index" : "sample","_type" : "_doc","_id" : "5bCBcHYBpymojx8hrIup","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:41:53.478Z","PID" : 321,"service" : {"status" : "WebServer is starting","name" : "App01"},"message" : "\"321 - App01 - WebServer is starting\""}},{"_index" : "sample","_type" : "_doc","_id" : "5rCBcHYBpymojx8hrIup","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:41:53.479Z","message" : "\"321 - App01 - WebServer is up and running\"","service" : {"status" : "WebServer is up and running","name" : "App01"},"PID" : 321}},{"_index" : "sample","_type" : "_doc","_id" : "57CBcHYBpymojx8hrIup","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T11:41:53.479Z","service" : {"status" : "WebServer is scaling 2 pods","name" : "App01"},"message" : "\"321 - App01 - WebServer is scaling 2 pods\"","PID" : 321}}]}

我们发现所有关于 PID 为789 的事件都被过滤掉了。

我们设置可以通过 script 的方法为事件添加一个 tag。当然由于这是一种 Javascript 的脚本编程,我们甚至可以依据一些条件对事件添加不同的 tag。


- type: logenabled: truepaths:- /Users/liuxg/data/beatsprocessors/sample.logprocessors:- drop_fields:fields: ["ecs", "agent", "log", "input", "host"]- dissect:tokenizer: '"%{pid|integer} - %{service.name} - %{service.status}"'field: "message"target_prefix: "" - rename:fields:- from: "pid"to: "PID"ignore_missing: falsefail_on_error: true- script:lang: javascriptid: my_filterparams:pid: 789source: >var params = {pid: 0};function register(scriptParams) {params = scriptParams;}function process(event) {if (event.Get("PID") == params.pid) {event.Cancel();}event.Tag("myevent")}setup.template.enabled: false
setup.ilm.enabled: falseoutput.elasticsearch:hosts: ["localhost:9200"]index: "sample"bulk_max_size: 1000

在上面,我们添加了 event.Tag("myevent")。重新运行我们可以看到:

    "hits" : [{"_index" : "sample","_type" : "_doc","_id" : "C7CScHYBpymojx8hkKVy","_score" : 1.0,"_source" : {"@timestamp" : "2020-12-17T12:00:20.365Z","message" : "\"321 - App01 - WebServer is starting\"","PID" : 321,"service" : {"name" : "App01","status" : "WebServer is starting"},"tags" : ["myevent"]}},

在上面,我们可以看到 tags 字段里有一个叫做 myevent 的值。
在今天的介绍中,我就当是抛砖引玉。更多关于 Filebeat 的 Beats processors,请参阅链接 Define processors | Filebeat Reference [7.16] | Elastic

在今天的文章中,我们介绍了一种数据处理的方式。这种数据处理可以在 beats 中进行实现,而不需要在 Elasticsearch 中的 ingest node 中实现。在实际的使用中,你需要依据自己的架构设计来实现不同的设计方案。

Beats:Beats processors相关推荐

  1. Beats:Beats 入门教程 (一)

    在今天的这个教程里,我们来针对初学者如何快速地了解 Beats 是什么,并如何快速地部署 Beats.如果你想了解更多关于 Beats 方面的知识,可以参阅我的文章. 在我们开始 Beats 知识的讲 ...

  2. Beats:Beats 入门教程 (二)

    这篇文章是 "Beats 入门教程 (一)"的续篇.在上一篇文章,我们主要讲述了 Beats 的一些理论方面的知识.在这篇文章中,我们将具体展示如何使用 Filebeat 及 Me ...

  3. Beats:Beats 在 Kibana 中的集中管理

    我们可以通过在命令行中对我们的 Beats 进行管理,比如我们可以启动 metric 几个模块,我们可以通过如下的命令来执行: ./metricbeat modules enable apache m ...

  4. Beats:如何调试 Beats processors

    在之前的 "Beats:Beats processors" 文章中,我详细地描述了如何使用 Beat 的 processors 对数据进行清洗.在很多情况下它是非常有用的一种方法. ...

  5. Beats:使用 Elastic Stack 记录 Python 应用日志

    日志记录实际上是每个应用程序都必须具备的功能.无论你选择基于哪种技术,都需要监视应用程序的运行状况和操作.随着应用程序扩展,这变得越来越困难,你需要查看不同的文件,文件夹甚至服务器来查找所需的信息.虽 ...

  6. Beats:为 Filebeat 配置 inputs

    Filebeat 模块为常见日志格式提供最快的入门体验.如果你对如何使用 Filebeat 模块还不是挺了解的话,请参阅我之前的文章: Beats:Beats 入门教程 (一) Beats:Beats ...

  7. Beats:在 Docker 里运行 Filebeat

    Docker 是一套平台即服务(PaaS)产品,它使用操作系统级虚拟化来以称为容器的软件包交付软件.容器彼此隔离,并将它们自己的软件,库和配置文件捆绑在一起: 他们可以通过定义明确的渠道相互交流.所有 ...

  8. Beats:如何在 Docker 容器中运行 Filebeat

    今天在这篇博客中,我们将学习如何在容器环境中运行 Filebeat. 为了快速了解 Filebeat 是做什么用的: Filebeat用于转发和集中日志数据 它重量轻,小型化,使用的资源更少 它作为代 ...

  9. Beats:Elastic Beats 介绍 及和 Logstash 的比较

    Elastic Stack 传统上由三个主要组件(Elasticsearch,Logstash 和 Kibana)组成,早已脱离了这种组合,现在也可以与名为 "Beats" 的第四 ...


  1. python爬虫系列(5.3-动态网站的爬取的策略)
  2. 可转债数据一览表集思录_学习先进的可转债投资策略
  3. libevent多线程使用bufferevent的那些事
  4. Flash学习笔记(01)
  5. Pygame初始-模仿windows待机画面
  6. Vue项目中使用svg文件
  7. java seconds_Java LocalTime minusSeconds()用法及代码示例
  8. 如何通过虚拟私有云保障服务安全【华为云分享】
  9. 访问自己的网站有病毒提示,为什么?
  10. Acwing第 38 场周赛
  11. 【赛尔原创】用对比集成式方法理解基于文档的对话
  12. 基于sklearn的线性分类器
  13. SVN客户端详细说明
  14. latex怎么看论文字数_如何确定latex文档字数
  15. IDC最新中国BI市场报告,永洪科技等中国厂商领跑
  16. ftrack出免费软件了--cineSync Paly
  17. 同城信息v6.6.3
  18. window7调用计算机,教你查看win7系统电脑使用记录的具体方法
  19. python还款程序_python 之简单模拟银行系统功能(卡号申请、还款、支付、取现)...
  20. 根据账户名称筛选收支明细


  1. 使用castor实现xml和java对象的转换
  2. 计算机dll修复,DLL缺失怎么办 DLL怎么修复
  3. SpringBoot 偷懒小工具:项目种子生成器
  4. 3dmax常用快捷键来了!!
  5. mail命令发送html格式的电子邮件
  6. GB/T 17626.2-2018下载网址
  7. 正版示波器软件安装教程NS-Scope
  8. Vue实战电商系统-五商品管理
  9. 【MATLAB】matlab曲线拟合与矩阵计算技巧
  10. Obiee+echarts实例之饼图(2)