我们知道我们可以使用 Filebeat 很方便地把我们的 log 数据收集进来并直接写入到我们的 Elasticsearch 之中。

就像我们上面的这个图显示的一样。这样我们就不需要另外一个 Logstash 的部署了。Logstash 可以很方便地帮我们对数据进行处理,比如对数据进行转换, 丰富数据等等。有一种情况,我们不想部署自己的 Logstash,但是我们还是像对我们的数据进行一些处理,那么我们该怎么办?我们其实可以利用 ingest node 所提供的 Pipeline 帮我们对数据进行处理。

Ingest node

如果大家还不知道如何配置我们的 node 为一个 ingest node 的话,可以参阅我之前的文章 “Elasticsarch中的一些重要概念:cluster, node, index, document, shards及replica”。我们需要在 Elasticsearch 中的配置文件 elasticsearch.yml 文件中配置:

node.ingest: true

ingest node 提供了在对文档建立索引之前对其进行预处理的功能:

  • 接收节点拦截索引或批量 API 请求
  • 运用转换(transformation)
  • 将文档传递回索引或批量 API

什么是pipeline呢?

如果大家想对pipleline有更多的了解,请参阅我的文章“如何在Elasticsearch中使用pipeline API来对事件进行处理”。

一个pipleline就是一套处理器:

  • 一个processor就像是Logstash里的一个filter
  • 拥有对通过管道(pipeline)的文档的读写权限

那么 Elastic 到底提供了哪些 processor 呢?我们可以参阅 Elastic 的官方文档,我们可以看到许多的 pocessors 可以被利用。

大家如果有兴趣的话,请查阅我们的官方文档做更一步的阅读。

定义一个 Pipleline

定义一个 Pipeline 也是非常直接的,你可以使用 PUT 命令配合 Ingest API 来操作。它是存在于 cluster state 里的。

PUT _ingest/pipeline/my-pipeline-id
{"description": "DESCRIPTION","processors": [{...}],"on_failure": [{...}]
}

这里my-pipleline-id是我们自己命令的在该cluster唯一标识是的pipleline ID。在里面,我们可以定义我们喜欢的processors数组。在处理失败后,我们也可以定义相应的processors来完成。

例子:

在今天的这个例子里,我们来使用 Filebeat 来读取一个 log 文件,并使用 processors 对这个 log 的数据进行处理。

准备数据

我们在网址 https://logz.io/sample-data 下载一个叫做 apache-daily-access.lo g的 log 文件。我们用 Atom 编辑器打开这个文件,显示有 17279 条数据:

每一条的数据是这样的格式:

20.168.183.41 - - [11/Sep/2019:00:00:05 +0000] "GET /category/health HTTP/1.1" 200 132 "/item/software/623" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"

配置 Filebeat

如果大家还不知如何安装自己的 Filebeat 的话,请参阅我之前的文章 “Beats:通过 Filebeat 把日志传入到 Elasticsearch”。

为了能够 filebeat 把这个 log 数据传输到 Elasticsearch,我们可以使用如下的配置文件。我们创建一个叫做 filebeat_processor.yml 文件:

filebeat.inputs:
- type: logenabled: truefields:apache: truepaths:- /Users/liuxg/data/apache-daily-access.logoutput.elasticsearch:hosts: ["localhost:9200"]pipeline: "my_pipeline_id"

在这里请注意,我把上面下载的 apache-daily-access.log 文件置于我自己电脑的 /User/liuxg/data 目录下。在你自己做练习的时候,需要根据自己的文件路径进行调整。在这里,我们使用了一个叫做 my_pipleline_id 的 pipeline。它的定义如下:

PUT _ingest/pipeline/my_pipeline_id
{"description": "Drop ECS field and add one new field","processors": [{"remove": {"field": "ecs"},"set": {"field": "added_field","value": 0}}]
}

在上面,我们定义了两个 processor: remove 及 set。一个是删除一个叫做 ecs 的项,另外一个是添加一个叫做 added_field 的项,并把它的值设置为0。

在正常的情况下,如果在我们的配置文件中没有定义那个 pipleline 的情况下,那么他们的结果是:

      {"_index" : "filebeat-7.3.0-2019.09.11-000001","_type" : "_doc","_id" : "637VIG0BJD_DqHjgqvC5","_score" : 1.0,"_source" : {"@timestamp" : "2019-09-11T14:58:55.902Z","message" : """144.228.123.71 - - [11/Sep/2019:01:52:35 +0000] "GET /category/games HTTP/1.1" 200 117 "/search/?c=Books+Software" "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"""","input" : {"type" : "log"},"fields" : {"apache" : true},"ecs" : {"version" : "1.0.1"},"host" : {"name" : "localhost"},"agent" : {"hostname" : "localhost","id" : "c88813ba-fdea-4a98-a0be-468fb53566f3","version" : "7.3.0","type" : "filebeat","ephemeral_id" : "ec3328d6-f7f0-4134-a2b6-8ff0c5141cc5"},"log" : {"offset" : 300352,"file" : {"path" : "/Users/liuxg/data/apache-daily-access.log"}}}}

你可以参考我的文章 “Beats:通过Filebeat把日志传入到Elasticsearch” 来查看这个结果。显然这里是有 ecs 这个 field 的。

运行 Filebeat

接下来,我们在 Filebeat 的安装目录,运行如下的命令:

./filebeat -c filebeat_processor.yml

我们在 Kibana 中可以通过如下的命令来查看:

GET _cat/indices?v

显然我们已经看到了一个已经生产的以 filebeat 为开头的文件名。我们可以通过如下的命令来查看它的数据:

GET filebeat-7.4.2/_search

那么其中的一个文档的 soure 是这样的:

       "_source" : {"agent" : {"hostname" : "localhost","id" : "45832d40-b664-466b-a523-3bc58890ea50","type" : "filebeat","ephemeral_id" : "dbbba131-9c33-4e82-a00a-9e8e09d3e799","version" : "7.4.2"},"log" : {"file" : {"path" : "/Users/liuxg/data/apache-daily-access.log"},"offset" : 11497},"message" : """164.51.31.185 - - [11/Sep/2019:00:04:15 +0000] "GET /item/giftcards/232 HTTP/1.1" 200 130 "/category/electronics" "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.56 Safari/535.11"""","input" : {"type" : "log"},"@timestamp" : "2019-11-23T13:11:57.478Z","host" : {"name" : "localhost"},"fields" : {"apache" : true},"added_field" : 0}

显然 ecs 这个 field 已经不见了,而另外一个叫做 added_field 新的 field 被成功添加进来了。这个说明我们的 pipleline 是起作用的。

Beats: Filebeat 和 pipeline processors相关推荐

  1. ES Filebeat 使用 Pipeline 处理日志中的 @timestamp

    推荐阅读 Helm3(K8S 资源对象管理工具)视频教程:https://edu.csdn.net/course/detail/32506 Helm3(K8S 资源对象管理工具)博客专栏:https: ...

  2. Beats: 使用 Filebeat 进行日志结构化 - Python

    结构化日志背后的想法很简单:让应用程序直接编写 JSON 对象,而不是让应用程序将需要通过正则表达式解析的日志写入到你索引到 Elasticsearch 的 JSON 对象中. 举例来说,假设你正在编 ...

  3. filebeat收集日志到elsticsearch中并使用ingest node的pipeline处理

    filebeat收集日志到elsticsearch中 一.需求 二.实现 1.filebeat.yml 配置文件的编写 2.创建自定义的索引模板 3.加密连接到es用户的密码 1.创建keystore ...

  4. Elastic Stack之Beats(Filebeat、Metricbeat)、Kibana、Logstash教程

    如果你没有听说过Elastic Stack,那你一定听说过ELK,实际上ELK是三款软件的简称,分别是Elasticsearch.Logstash.Kibana组成,在发展的过程中,又有新成员Beat ...

  5. Beats:Beats processors

    我们通常的做法是使用 Elasticsearch 的 ingest node 或者 Logstash 来对数据进行清洗.这其中包括删除,添加,丰富,转换等等.但是针对每个 beat 来讲,它们也分别有 ...

  6. FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战)

    文章目录 FileBeat + Pipeline 解析日志 保存至ElasticSearch(实战) 下载地址 目的 日志数据 模拟Pipeline 创建pipeline 查看Pipeline是否创建 ...

  7. ELK结合Beats工具的搭建使用(Metricbeat、Filebeat、Topbeat)

    ELK之间的合作机制: L(Logstash)作为信息收集者,主要是用来对日志的搜集.分析.过滤,支持大量的数据获取方式,一般工作方式为c/s架构,client端安装在需要收集日志的主机上,serve ...

  8. Beats:在 Docker 里运行 Filebeat

    Docker 是一套平台即服务(PaaS)产品,它使用操作系统级虚拟化来以称为容器的软件包交付软件.容器彼此隔离,并将它们自己的软件,库和配置文件捆绑在一起: 他们可以通过定义明确的渠道相互交流.所有 ...

  9. Beats:如何在 Docker 容器中运行 Filebeat

    今天在这篇博客中,我们将学习如何在容器环境中运行 Filebeat. 为了快速了解 Filebeat 是做什么用的: Filebeat用于转发和集中日志数据 它重量轻,小型化,使用的资源更少 它作为代 ...

最新文章

  1. Linux常用服务安装部署
  2. 【Apache】Apache的安装和配置
  3. 451 Research发布《2019年数据中心服务和基础设施预测》
  4. Spring Boot 接入 GitHub 第三方登录,只要两行配置!
  5. Winform中实现文件另存为后并打开文件
  6. 【One by One系列】IdentityServer4(四)授权码流程
  7. 使用BigQuery分析GitHub上的C#代码
  8. 钉钉项目任务怎么添加审批表单
  9. 大数据与实体经济深度融合全国行首站将于7月10日在贵州举办
  10. matlab gui实例_App Designer 自学实例8
  11. 全网首发:SHELL多个判断条件,不会短路
  12. 按键精灵和python功能对比_AutoIt3和按键精灵的功能对比第2/2页
  13. 发票验证出现服务器证书出错,网上认证发票平台证书密码出现错误怎么办?
  14. Unity 动态切换天空盒\反射天空盒材质
  15. android歌词控件
  16. 解构领域驱动设计--思维导图
  17. 霍尔在光伏发电系统中的应用与产品选型
  18. 全文标明引文报告html,知网查重报告之全文(标明引文)报告单参数详解
  19. sau交流学习社区--基于thinkjs+vue+redis+mysql+es6开发的周报企业管理系统weekly
  20. 关于cookie的跨域(一级域)

热门文章

  1. Ubuntu系统连接Android真机测试
  2. 我的GH60 - 极客定制GK61XS : eclipse软件开发常用快捷键新增绑定 (亦适用于不使用方向键/HOME/END情况的大牛)
  3. 用python画小星星
  4. maven依赖本地宝
  5. SCIENCE ROBOTICS:与机器人的相互注视会影响人类的神经活动,延迟决策过程
  6. 使用socket实现聊天对话
  7. 湘潭大学自考计算机,湘潭大学自考专业
  8. PPT高手常掉进的7个陷阱(下)
  9. ppt 深度学习绘图_最全中文深度学习入门书:小白易入,课程代码PPT全有 | 复旦邱锡鹏出品...
  10. 客户案例-Vibration Research