Logstash具有一个有趣的功能,称为翻译过滤器 (translate filter)。 翻译过滤器用于根据字典或查找文件过滤传入数据中的特定字段。 然后,如果输入字段与字典查找文件中的条目匹配,则它将执行操作,例如,将字段添加到数据或发送电子邮件。这个和我们之前介绍的数据丰富是一样的。

一个简单的用例

也许你正在从Twitter索引数据,并想知道用户何时在其推文中提及某些特定单词。 创建一个被认为很有趣的单词列表。 每次在推文中提及该单词时,你都可以在数据中添加一个字段,以将数据标记为“interesting”。 现在,你可以轻松地在 Kibana 中制作一个图表,仅显示标记为“interesting”的推文。

在处理安全事件的日志文件时,这也是一个很好的功能,因为你可能希望对日志文件中的某些文件哈希或列入黑名单的IP发出警报。 在开始之前,我们先来看一个使用转换过滤器的 Logstash 配置文件的结构。

配置

你可以使用翻译过滤器的不同方式。 我将在下面内讨论其中两种方法。 翻译部分位于配置文件的过滤器部分。 见下文:

input {stdin {codec => json}
}filter {translate {# Your translate filter specific options go here.    ...}...
}output {stdout {codec => plain {charset => "ISO-8859-1"}}elasticsearch {index => "nginx_json_elk_example"document_type => "logs"}
}

现在你知道将与转换过滤器相关的选项放在何处。 我们需要在传入数据中定义一个要对其执行转换过滤器的字段。 在下面的示例中,我们正在 HTTP 代理服务器的日志中查看目标 IP。 通过以这种方式在转换过滤器中指定我们的字段,我们可以检查所有传入日志数据上的目标IP。

translate {field => "dst_ip"...
}

字典条目

你可以通过两种方法定义字典条目,以使翻译过滤器参考传入的数据。 他们是:

Dictionary configutation 选项

你可以在其中定义直接在配置文件中表示的值。 当你有少量数据表示不经常更改时,这很有用,比如

# Web server related ports and what they represent. Just an example.
dictionary => [     "80", "http",    "443", "https",    "8080", "http-alt"    ]

Dictionary lookup 文件

这对于大型数据集很有用,尤其是需要经常更新的数据。 例如,这对于包含许多条目并且经常每天更新的IP或电子邮件黑名单之类的东西很有用。比如:

# This is our Logstash configuration
translate {field => "agent"destination => "good_bot"dictionary_path => '/tmp/crawler_bot_list.yaml'
}

这是 yaml 文件的样子:

"Exalead Mozilla/5.0 (compatible; Exabot/3.0 (BiggerBetter); +http://www.exabot.com/go/robot)": "YES"
"Alexa Mozilla/5.0 (compatible; alexa site audit/1.0; +http://www.alexa.com/help/webmasters; siteaudit@alexa.com)": "YES"
"Grapeshot UK Mozilla/5.0 (compatible; GrapeshotCrawler/2.0; +http://www.grapeshot.co.uk/crawler.php)": "YES"
"Facebook facebookexternalhit/1.0 (+http://www.facebook.com/externalhit_uatext.php)": "YES"
"Archive.org Mozilla/5.0 (compatible; archive.org_bot +http://www.archive.org/details/archive.org_bot)": "YES"
"Shopstyle.com ShopStyle Server/1.0 (ShopStyle Server Agent; http://www.shopstyle.com/; info@shopstyle.com))": "YES"
"jobrapido.com Mozilla/5.0 (compatible; Jobrapido/1.1; +http://www.jobrapido.com)": "YES"

通过实际示例更容易理解这两种类型的用途。对于这些示例,我将制作一个所谓的 IP 黑名单。这些 IP 实际上并未被列入黑名单,我只是获取了数据中已经存在的 IP,并根据这些IP创建了虚假的黑名单。该示例将说明如何检查 nginx 日志中是否有列入黑名单的IP是否击中了你的网络服务器。

首先创建字典文件或 yaml 文件。我们将检查此文件中的 IP,以防传入 IP 访问Web服务器。关于该主题的文章还很少。但是,你可以通过以下 github 存储库在安全上下文中使用 ELK 堆栈查看一些不错的配置:GitHub - TravisFSmith/MyBroElk: Scripts for Bro IDS and ELK Stack。看一下logstash.conf。它将帮助你了解如何使用 Logstash 转换过滤器。

现在回到我们的例子。这是此示例的黑名单文件的样子:
(将这些内容放在名称为/tmp/blacklisted_ip.yaml的文件中)

"216.46.173.126": "true"
"180.179.174.219": "true"
"204.77.168.241": "true"
"65.39.197.164": "true"
"80.91.33.133": "true"
"84.208.15.12": "true"
"74.125.60.158": "true"

我们将在 remote_ip 字段中检查这些特定 IP 的传入日志数据。 如果我们在 remote_ip 字段中看到其中一个 IP,则将在文档中添加一个值为 true”的新字段 blacklisted_ip。 这是我们的配置:

logstash_translate.conf

input {stdin {codec => json}
}filter {date {match => ["time", "dd/MMM/YYYY:HH:mm:ss Z" ]locale => en}geoip {source => "remote_ip"target => "geoip"}translate {field => "remote_ip"destination => "blacklisted_ip"dictionary_path => '/tmp/blacklisted_ip.yaml'}grok {match => [ "request" , "%{WORD:request_action} %{DATA:request1} HTTP/%{NUMBER:http_version}" ]}
}output {stdout {codec => plain {charset => "ISO-8859-1"}}elasticsearch {index => "logstash-nginx"}
}

这利用了一些官方的 Elasticsearch 示例文件。 你需要先下载日志文件,然后才能尝试新的配置文件。

wget https://raw.githubusercontent.com/elastic/examples/master/Common%20Data%20Formats/nginx_json_logs/nginx_json_logs

现在,我们可以在日志文件上运行 Logstash。 确保你创建了字典查找文件 /tmp/blacklisted_ip.yaml

sudo cat nginx_json_logs | sudo ./bin/logstash -f ~/data/translate/logstash_translate.conf

上面的 logstash_translate.conf 的路径依赖于你的路径需要重新修改。我们在 Logstash 的 console 中可以看出来:

你可能会认为 “blacklisted_ip” 字段不是很有用。 你需要在 Kibana 进行一些尝试才能看到其价值。 打开 Kibana 并创建一个名为 “logstash-nginx-blacklisted” 的索引。 在发现选项卡中搜索:blacklisted_ip:true。

通过如下命令:

GET _cat/indices

我们可以看到一个新的叫做 logstash_nginx 的索引出现了:

我们为它创建一个index pattern:

我们在 Discover  中可以看到:

在上面我们可以看出来,有很多的文档具有 blacklisted_ip 为 true。

我们可以更进一步对这些 remote_ip 进行统计:

在上面,我们使用了 yaml 文件作为 dictionary_path。根据文档,我们实际上也可以使用 csv 格式的文档:

blacklisted_ip.csv 

"216.46.173.126","true"
"180.179.174.219","true"
"204.77.168.241","true"
"65.39.197.164","true"
"80.91.33.133","true"
"84.208.15.12","true"
"74.125.60.158","true"

根据文档的要求,CSV格式需要两列,第一列用作原始文本(查找键),第二列作为翻译。

在这种情况下,我们把 Logstash 的配置文件修改为:

logstash_translate_csv.conf

input {stdin {codec => json}
}filter {date {match => ["time", "dd/MMM/YYYY:HH:mm:ss Z" ]locale => en}geoip {source => "remote_ip"target => "geoip"}translate {field => "remote_ip"destination => "blacklisted_ip"dictionary_path => '/Users/liuxg/data/translate/blacklisted_ip.csv'}grok {match => [ "request" , "%{WORD:request_action} %{DATA:request1} HTTP/%{NUMBER:http_version}" ]}
}output {stdout {codec => rubydebug}elasticsearch {index => "logstash-nginx"}
}

重新启动 Logstash,我们将看到同样的结果。

丰富多个字段

在上面的演示中,我们发现,我们只能丰富一个字段。在实际的使用中,假如我们想丰富多个字段,那该怎么办呢?

我们重新修改之前的 blacklisted_ip.yaml 文件。

blacklisted_ip.yaml

"216.46.173.126": "true,good"
"180.179.174.219": "true,bad"
"204.77.168.241": "true,great"
"65.39.197.164": "true,teriffic"
"80.91.33.133": "true,fabulous"
"84.208.15.12": "true,excellent"
"74.125.60.158": "true,wonderful"

在上面,我们把除 IP 地址以外的数据写成一个字符串的模样。每当 translate 成功后,我们就会获取这个字符串。我们也同时修改 Logstash 的配置文件:

logstash_translate.conf  

input {stdin {codec => json}
}filter {date {match => ["time", "dd/MMM/YYYY:HH:mm:ss Z" ]locale => en}geoip {source => "remote_ip"target => "geoip"}translate {field => "remote_ip"destination => "csv_data"dictionary_path => '/Users/liuxg/data/translate/blacklisted_ip.yaml'}if ("" in [csv_data]) {csv {source => csv_dataseparator => ","columns => [ "blacklisted_ip", "comments"]}}grok {match => [ "request" , "%{WORD:request_action} %{DATA:request1} HTTP/%{NUMBER:http_version}" ]}
}output {stdout {codec => rubydebug}elasticsearch {index => "logstash-nginx"}
}

在上面,我们在 translate 过滤器中,把数据写入到 csv_data 这个字段,并在下面使用 csv 过滤器来对数据进行分析。

我们重新启动 Logstash:

sudo cat nginx_json_logs | sudo ./bin/logstash -f ~/data/translate/logstash_translate.conf 

最后,我们在 Discover 中查询:

我们会发现一个增加的字段 comments。

Logstash:Logstash translate 过滤器简介相关推荐

  1. logback filter过滤器简介说明

    转自: logback filter过滤器简介说明 下文笔者讲述logback中filter过滤器的简介说明,如下所示: Logback提供两类Filter Regular Filter;Turbo ...

  2. 过滤器一:过滤器简介;创建第一个Filter;

    本篇博客主要介绍过滤器的基本内容,同时编写一个过滤器小例子. 目录 一:过滤器简介 1.什么是过滤器? 2.过滤器执行过程: 3.过滤器开发的三要素:​ 二:过滤器范例 1.创建过滤器 2.测试过滤器 ...

  3. android 低频过滤器,Filters过滤器简介

    原标题:Filters过滤器简介 什么是过滤器? 滤波器是能够在衰减其他频率的同时传递(或放大)某些频率的电路.因此,滤波器可以从也包含不期望或不相关频率的信号中提取重要频率. 在电子领域,有许多过滤 ...

  4. HBase过滤器简介

    HBase过滤器简介 HBase过滤器(filter)提供非常强大的特性帮助用户提高其处理表中数据的效率. HBase中两种主要的数据读取函数是get()和scan(),他们都支持直接访问数据和通过指 ...

  5. Logstash~filter.json过滤器使用教程(附带示例)

    一.介绍 用来解析json格式数据的过滤器,默认会解析置顶字段的json数据并将其放置在Logstash事件的最顶级中,可以配置target配置项选择存放结果位置 如果解析的数据包含一个@timest ...

  6. Logstash [logstash.outputs.elasticsearch][main] retrying failed action with response code: 403

    问题描述: 今天发现当天的索引在ES中并没有创建,logstash中不停的报错: [2021-05-24T05:47:51,904][INFO ][logstash.outputs.elasticse ...

  7. Linux内核:网络过滤器简介与示例代码

    目录 简单的例子 实施 用户空间处理 Linux的一大优点是其网络功能.路由器,交换机等许多网络产品均基于嵌入式Linux操作系统. 网络过滤是Linux内核中很好的基础结构,它使我们能够过滤和操作网 ...

  8. 数字图像处理 拜耳过滤器简介

    1.概述 典型的图像传感器(例如我们在数码相机中使用的图像传感器)由许多单独的光电传感器组成,所有这些光电传感器都捕获光.这些光电传感器本身就能够捕捉光的强度,但不能捕捉其波长(颜色).因此,图像传感 ...

  9. Servlet过滤器---简介

    过滤器的基本概念 Servlet过滤器从字面上的字意理解为经过一层次的过滤处理才达到使用的要求,而其实Servlet过滤器就是服务器与客户端请求与响应的中间层组件,在实际项目开发中Servlet过滤器 ...

最新文章

  1. 059_arguments.callee和arguments.callee.caller
  2. Win7安装64位CentOS 6.4双系统详细过程
  3. 0.3秒定位解剖位置、定位精度提升超2.3%!
  4. php链接mysql验证用户登录,PHP连接mysql验证用户名是否存在
  5. Csv数据库CsvDb
  6. Instant Client 配置
  7. vue 识别身份证内容并映射到来源地区的js
  8. R语言| 缺失值判断
  9. yarn下载报错There appears to be trouble with your network connection. Retrying.
  10. [NOI2003] 文本编辑器
  11. 致江苏卫视《最强大脑第二季》节目组的一封信
  12. 荟研新材料 毕克BYK420 水性涂料和颜料浓缩浆用液态流变助剂 抗流挂剂
  13. gis核密度分析工具_抓取公共服务设施POI后,用GIS进行核密度分析的可视化过程...
  14. 解决ORA-01111, ORA-01110, ORA-01157
  15. 表白神器!教你如何使用 Python 绘制爱心墙获取芳心!
  16. python检索用人名查电话_创建一个将人名用作键的字典后,输入姓名查找值,返回错误...
  17. gentoo命令行刻录
  18. [SQL Server玩转Python] 二.T-SQL查询表格值及Python实现数据分析
  19. GD32E230开发笔记-GD32E230外设SPI的初始化
  20. 用python制作水仙花

热门文章

  1. 华为eNSP 配置DHCP自动分配IP地址
  2. 设计师使用的网站推荐
  3. 1GB等于多少MB?
  4. 文献阅读:Stylized Neural Painting
  5. 学习新体验-itron
  6. 机载点云单木分割方法和实现过程的概括介绍(论文赏析)
  7. 《创新创业实训》网课答案解析
  8. 计算机无法屏幕亮度,今天详解win10电脑屏幕亮度无法调节的具体解决手法
  9. 宝塔面板自定义404错误页面的设置方法
  10. 推荐一款学习app:Enki