Logstash 6.2 参考指南(开始使用Logstash)

https://segmentfault.com/a/1190000015237808#articleHeader3

Logstash官网

https://www.elastic.co/cn/products/logstash

开始使用Logstash

本节将指导您安装Logstash并验证一切正常运行的过程,在学习如何存储你的第一件事之后,接下来,你将创建一个更高级的管道,该管道将Apache web日志作为输入,解析日志,并将解析后的数据写入到Elasticsearch集群中,然后,你将学习如何将多个输入和输出插件组合在一起,以统一来自各种不同来源的数据。

本部分包括以下主题:

  • 安装Logstash
  • 存储你的第一个事件
  • 使用Logstash解析日志
  • 将多个输入和输出插件拼接在一起

安装Logstash

注意
Logstash需要Java 8,不支持Java 9,使用官方的Oracle发行版或开源发行版,如OpenJDK

要检查Java版本,请运行以下命令:

java -version

在安装了Java的系统上,该命令产生的输出类似于以下内容:

java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)

在一些Linux系统上,在尝试安装之前,您可能还需要导出JAVA_HOME环境,尤其是在从tarball中安装Java时,这是因为Logstash在安装过程中使用Java自动检测环境并安装正确的启动方法(SysV init脚本、Upstartsystemd)。如果在包安装期间,Logstash无法找到JAVA_HOME环境变量,你可能会得到一条错误消息,并且Logstash将无法正常启动。

从下载的二进制文件中安装

下载与主机环境匹配的Logstash安装文件并解压文件,不要将日志文件安装到包含冒号(:)字符的目录路径中。

在支持的Linux操作系统上,你可以使用包管理器来安装Logstash

从包存储库安装

我们也有用于APTYUM的发行版的存储库,注意,我们只提供二进制包,但不提供源包,因为包是作为Logstash构建的一部分创建的。

我们将Logstash包存储库按版本划分为不同的url,以避免在主要版本之间意外升级,所有6.x.y releases使用6.x作为版本号。

我们使用PGP密钥D88E42B4,即Elastic的签名密钥,带有指纹

4609 5ACC 8548 582C 1A26 99A9 D27D 666C D88E 42B4

在所有的包上签名,它可以从https://pgp.mit.edu获得。

APT

下载并安装公开签名密钥:

wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add

在进行之前,你可能需要在Debian上安装apt-transport-https软件包:

sudo apt-get install apt-transport-https

将存储库定义保存到/etc/apt/sources.list.d/elastic-6.x.list

echo "deb https://artifacts.elastic.co/packages/6.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-6.x.list

警告
使用上面描述的echo方法添加Logstash存储库,不要使用add-apt-repository,因为它也会添加deb-src条目,但是我们不提供源包。如果你已经添加了deb-src条目,你将会看到如下错误:

Unable to find expected entry 'main/source/Sources' in Release file (Wrong sources.list entry or malformed file)

只需从/etc/apt/sources.list中删除deb-src条目文件并安装应该按照预期工作。

运行sudo apt-get更新,存储库就可以使用了,你可以这样安装:

sudo apt-get update && sudo apt-get install logstash

管理Logstash作为系统服务请参阅运行Logstash文档。

YUM

下载并安装公开签名密钥:

rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

/etc/yum.repos.d/目录添加以下内容到.repo后缀的文件中,例如logstash.repo

[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

你的存储库已经准备好使用了,你可以使用如下命令安装:

sudo yum install logstash

警告
这些存储库不能使用仍然使用rpm v3的旧的基于rpm的发行版,比如CentOS5

管理Logstash作为系统服务请参阅运行Logstash文档。

Docker

镜像可以作为Docker容器运行Logstash,它们可以从Elastic Docker注册表获得。

有关如何配置和运行Logstash Docker容器的详细信息,请参阅Docker上的运行日志。

存储你的第一个事件

首先,让我们通过运行最基本的Logstash管道来测试你的Logstash安装。

Logstash管道有两个必需的元素,输入和输出,以及一个可选的元素,过滤器。输入插件使用来自源的数据,过滤器插件根据您的指定修改数据,输出插件将数据写入目的地。

要测试你的Logstash安装,请运行最基本的Logstash管道,例如:

cd logstash-6.2.4
bin/logstash -e 'input { stdin { } } output { stdout {} }'

注意
bin目录的位置因平台而异,请参阅目录布局以找到你的系统上bin/logstash的位置。

-e标志允许你直接从命令行指定配置,在命令行中指定配置可以让您快速测试配置,而不必在迭代之间编辑文件。示例中的管道接受来自标准输入的输入stdin,并将输入移动到标准输出stdout,在一个结构化的格式。

启动Logstash后,等待看到"Pipeline main started",然后在命令提示符下输入hello world:

hello world
2013-11-21T01:22:14.405+0000 0.0.0.0 hello world

Logstash向消息添加时间戳和IP地址信息,通过在运行Logstash的shell中发出CTRL-D命令来退出Logstash

恭喜你!您已经创建并运行了一个基本的Logstash管道,接下来,你将学习如何创建更实际的管道。

使用Logstash解析日志

在存储第一个事件时,你创建了一个基本的Logstash管道来测试你的Logstash设置,在现实情况中,Logstash管道要复杂一些:它通常有一个或多个输入、过滤器和输出插件。

在本节中,你将创建一个Logstash管道,该管道使用FilebeatApache web日志作为输入,解析这些日志以从日志中创建特定的、命名的字段,并将解析后的数据写入到Elasticsearch集群中,与其在命令行中定义管道配置,不如在配置文件中定义管道。

首先,到这里下载本示例中使用的示例数据集,然后解压文件。

配置Filebeat,将日志行发送到Logstash

在创建Logstash管道之前,你将配置Filebeat将日志行发送到LogstashFilebeat客户端是一个轻量级的、资源友好的工具,它从服务器上的文件收集日志,并将这些日志转发给您的Logstash实例进行处理。Filebeat是为可靠性和低延迟而设计的,Filebeat在主机上占用的资源较少,而Beats input插件将Logstash实例上的资源需求最小化。

注意
在一个典型的用例中,Filebeat运行在一台单独的机器上,而不是运行Logstash实例的机器上,为了本教程的目的,LogstashFilebeat在同一台机器上运行。

默认的Logstash安装包含Beats input插件,Beats input插件允许LogstashElastic Beats框架接收事件,也就是说任何Beat written使用Beats框架工作,例如PacketbeatMetricbeat,也可以将事件数据发送到Logstash

要在数据源机器上安装Filebeat,请从Filebeat产品页面下载适当的包。你还可以参考Beats文档中的开始使用Filebeat来获得更多的安装说明。

安装Filebeat后,需要对其进行配置,打开位于Filebeat安装目录中的filebeat.yml文件,并将内容替换为以下几行,确保路径指向Apache日志示例文件logstash-tutorial.log,你先前下载的日志:

filebeat.prospectors:
- type: logpaths:- /path/to/file/logstash-tutorial.log
output.logstash:hosts: ["localhost:5044"]

Filebeat处理的文件或文件的绝对路径

保存你的更改。

为了简化配置,你不会像在现实场景中那样指定TLS/SSL设置。

在数据源机器上,使用以下命令运行Filebeat:

sudo ./filebeat -e -c filebeat.yml -d "publish"

注意
如果以root身份运行Filebeat,则需要更改配置文件的所有权(请参阅Beats平台引用中的配置文件所有权和权限)

Filebeat将尝试在端口5044上连接,在使用活动的Beats插件开始之前,该端口上不会有任何答案,因此您看到的任何关于在该端口上连接失败的消息目前都是正常的。直到Logstash以活动的Beats插件开始之前,那个端口不会有任何答复,因此,现在您看到的任何关于在该端口上连接失败的消息都是正常的。

Filebeat输入配置Logstash

接下来,你将创建一个Logstash配置管道,使用Beats input插件从Beats接收事件。

下面的文本表示配置管道的概要:

# The # character at the beginning of a line indicates a comment. Use
# comments to describe your configuration.
input {
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {
}

这个概要是非功能性的,因为输入和输出部分没有定义任何有效的选项。

为了开始,在你的Logstash主目录中,复制并粘贴配置管道概要到一个文件中,并命名为first-pipeline.conf

接下来,通过添加以下几行到first-pipeline.conf文件中的输入部分来配置你的Logstash实例来使用Beats输入插件:

beats {port => "5044"
}

稍后你将配置Logstash以写入到Elasticsearch,现在,您可以向output部分添加以下一行,以便在运行Logstash时将输出打印到stdout:

stdout { codec => rubydebug }

完成后,first-pipeline.conf文件内容应该看起来向这样:

input {beats {port => "5044"}
}
# The filter part of this file is commented out to indicate that it is
# optional.
# filter {
#
# }
output {stdout { codec => rubydebug }
}

要验证你的配置,请运行以下命令:

bin/logstash -f first-pipeline.conf --config.test_and_exit

--config.test_and_exit选项解析配置文件并报告任何错误。

如果配置文件通过了配置测试,则使用以下命令启动Logstash:

bin/logstash -f first-pipeline.conf --config.reload.automatic

--config.reload.automatic选项允许自动重新加载配置文件,这样您不必在每次修改配置文件时停止并重新启动Logstash

Logstash启动时,你可能会看到一个或多个关于Logstash忽略pipelines.yml文件的警告消息,你可以忽略这个警告,pipelines.yml文件用于在一个Logstash实例中运行多个管道,对于这里显示的示例,您正在运行一个管道。

如果你的管道工作正常,你应该会看到一系列事件,如下面写到控制台的:

{"@timestamp" => 2017-11-09T01:44:20.071Z,"offset" => 325,"@version" => "1","beat" => {"name" => "My-MacBook-Pro.local","hostname" => "My-MacBook-Pro.local","version" => "6.0.0"},"host" => "My-MacBook-Pro.local","prospector" => {"type" => "log"},"source" => "/path/to/file/logstash-tutorial.log","message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"","tags" => [[0] "beats_input_codec_plain_applied"]
}
...

使用Grok过滤器插件解析Web日志

现在你有了一个可以从Filebeat读取日志行的工作管道,但是你会注意到日志消息的格式并不理想,你希望解析日志消息,以便从日志中创建特定的、命名的字段,为此你将使用grok过滤器插件。

grok过滤器插件是在Logstash中默认可用的几个插件之一,有关如何管理Logstash插件的详细信息,请参阅插件管理器的参考文档。

grok过滤器插件允许你将非结构化日志数据解析为结构化和可查询的数据。

因为grok过滤器插件在传入的日志数据中寻找模式,配置插件需要你决定关于如何定义对你的用例有意义的模式,web服务器日志示例代表性的行如下:

83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] "GET /presentations/logstash-monitorama-2013/images/kibana-search.png
HTTP/1.1" 200 203023 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel
Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"

行开头的IP地址很容易识别,方括号中的时间戳也很容易识别,要解析数据,可以使用%{COMBINEDAPACHELOG} grok模式,该模式使用以下模式来构造来自Apache日志的行:

  • 信息:IP Address

    • 字段名:clientip
  • 信息:User ID

    • 字段名:ident
  • 信息:User Authentication

    • 字段名:auth
  • 信息:timestamp

    • 字段名:timestamp
  • 信息:HTTP Verb

    • 字段名:verb
  • 信息:Request body

    • 字段名:request
  • 信息:HTTP Version

    • 字段名:httpversion
  • 信息:HTTP Status Code

    • 字段名:response
  • 信息:Bytes served

    • 字段名:bytes
  • 信息:Referrer URL

    • 字段名:referrer
  • 信息:User agent

    • 字段名:agent

提示
如果你需要帮助构建grok模式,请尝试使用grok调试器,Grok调试器是基本许可证下的X-Pack特性,因此可以自由使用。

编辑first-pipeline.conf文件,用以下文本替换整个过滤器部分:

filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}"}}
}

完成后,first-pipeline.conf的内容应该是这样的:

input {beats {port => "5044"}
}
filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}"}}
}
output {stdout { codec => rubydebug }
}

保存你的更改,因为你已经启用了自动配置重载,你不必重新启动Logstash来获取更改,但是,你需要强制Filebeat来从头读取日志文件。要做到这一点,请转到正在运行Filebeat的终端窗口并按Ctrl+C关闭Filebeat,然后删除Filebeat注册表文件,例如,运行:

sudo rm data/registry

由于Filebeat存储它在注册表中获取的每个文件的状态,因此删除注册表文件将迫使Filebeat从头读取它正在获取的所有文件。

接下来,使用以下命令重新启动Filebeat:

sudo ./filebeat -e -c filebeat.yml -d "publish"

如果需要等待Logstash重新加载配置文件,那么在Filebeat开始处理事件之前可能会有一点延迟。

使用grok模式后,事件将会有以下JSON表示:

{"request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png","agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"","offset" => 325,"auth" => "-","ident" => "-","verb" => "GET","prospector" => {"type" => "log"},"source" => "/path/to/file/logstash-tutorial.log","message" => "83.149.9.216 - - [04/Jan/2015:05:13:42 +0000] \"GET /presentations/logstash-monitorama-2013/images/kibana-search.png HTTP/1.1\" 200 203023 \"http://semicomplete.com/presentations/logstash-monitorama-2013/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"","tags" => [[0] "beats_input_codec_plain_applied"],"referrer" => "\"http://semicomplete.com/presentations/logstash-monitorama-2013/\"","@timestamp" => 2017-11-09T02:51:12.416Z,"response" => "200","bytes" => "203023","clientip" => "83.149.9.216","@version" => "1","beat" => {"name" => "My-MacBook-Pro.local","hostname" => "My-MacBook-Pro.local","version" => "6.0.0"},"host" => "My-MacBook-Pro.local","httpversion" => "1.1","timestamp" => "04/Jan/2015:05:13:42 +0000"
}

注意,该事件包含原始消息,但是日志消息也被分解为特定的字段。

使用Geoip过滤器插件增强数据

除了解析日志数据以获得更好的搜索之外,过滤器插件可以从现有数据中获得补充信息,作为一个例子,geoip插件查找IP地址,从地址中获取地理位置信息,并将该位置信息添加到日志中。

通过向first-pipeline.conf文件的filter部分添加以下代码行,配置您的Logstash实例以使用geoip过滤器插件:

geoip {source => "clientip"
}

geoip插件配置要求你指定包含要查找的IP地址的源字段的名称来查找,在本例中,clientip字段包含IP地址。

因为过滤器是按顺序计算的,确保geoip部分位于配置文件的grok部分之后,并且grokgeoip部分都嵌套在filter部分中。

完成后,first-pipeline.conf的内容应如下:

input {beats {port => "5044"}
}filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}"}}geoip {source => "clientip"}
}
output {stdout { codec => rubydebug }
}

保存你的更改,如前所述,要强制Filebeat从头读取日志文件,请关闭Filebeat(按Ctrl+C),删除注册表文件,然后使用以下命令重新启动Filebeat:

sudo ./filebeat -e -c filebeat.yml -d "publish"

注意,事件现在包含地理位置信息:

{"request" => "/presentations/logstash-monitorama-2013/images/kibana-search.png","agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36\"","geoip" => {"timezone" => "Europe/Moscow","ip" => "83.149.9.216","latitude" => 55.7485,"continent_code" => "EU","city_name" => "Moscow","country_name" => "Russia","country_code2" => "RU","country_code3" => "RU","region_name" => "Moscow","location" => {"lon" => 37.6184,"lat" => 55.7485},"postal_code" => "101194","region_code" => "MOW","longitude" => 37.6184},...

将数据索引到Elasticsearch

现在web日志被分解成特定的字段,Logstash管道可以将数据索引到一个Elasticsearch集群中,编辑first-pipeline.conf文件并将整个输出部分替换为以下文本:

output {elasticsearch {hosts => [ "localhost:9200" ]}
}

使用这个配置,Logstash使用http协议连接到Elasticsearch,上面的例子假设在相同的实例上运行了LogstashElasticsearch。你可以使用主机配置指定一个远程的Elasticsearch实例:hosts => [ "es-machine:9092" ]

此时,你的first-pipeline.conf文件已经正确配置了输入、过滤和输出部分,并且看起来如下:

input {beats {port => "5044"}
}filter {grok {match => { "message" => "%{COMBINEDAPACHELOG}"}}geoip {source => "clientip"}
}
output {elasticsearch {hosts => [ "localhost:9200" ]}
}

保存你的更改,如前所述,要强制Filebeat从头读取日志文件,请关闭Filebeat(按Ctrl+C),删除注册表文件,然后使用以下命令重新启动Filebeat:

sudo ./filebeat -e -c filebeat.yml -d "publish"

测试你的管道

现在已经配置了Logstash管道,将数据索引到一个Elasticsearch集群中,你可以查询Elasticsearch

尝试一个基于grok过滤器插件创建的字段的Elasticsearch测试查询,将$DATE替换为当前日期,使用YYYY.MM.DD格式:

curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=response=200'

注意
索引名中使用的日期基于UTC,而不是Logstash所在的时区,如果查询返回index_not_found_exception,请确保logstash-$DATE反映了索引的实际名称,要查看可用索引的列表,可以使用这个查询:curl 'localhost:9200/_cat/indices?v'

你应该得到多份命中返回,例如:

{"took": 50,"timed_out": false,"_shards": {"total": 5,"successful": 5,"skipped": 0,"failed": 0},"hits": {"total": 98,"max_score": 2.793642,"hits": [{"_index": "logstash-2017.11.09","_type": "doc","_id": "3IzDnl8BW52sR0fx5wdV","_score": 2.793642,"_source": {"request": "/presentations/logstash-monitorama-2013/images/frontend-response-codes.png","agent": """"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"""","geoip": {"timezone": "Europe/Moscow","ip": "83.149.9.216","latitude": 55.7485,"continent_code": "EU","city_name": "Moscow","country_name": "Russia","country_code2": "RU","country_code3": "RU","region_name": "Moscow","location": {"lon": 37.6184,"lat": 55.7485},"postal_code": "101194","region_code": "MOW","longitude": 37.6184},"offset": 2932,"auth": "-","ident": "-","verb": "GET","prospector": {"type": "log"},"source": "/path/to/file/logstash-tutorial.log","message": """83.149.9.216 - - [04/Jan/2015:05:13:45 +0000] "GET /presentations/logstash-monitorama-2013/images/frontend-response-codes.png HTTP/1.1" 200 52878 "http://semicomplete.com/presentations/logstash-monitorama-2013/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1700.77 Safari/537.36"""","tags": ["beats_input_codec_plain_applied"],"referrer": """"http://semicomplete.com/presentations/logstash-monitorama-2013/"""","@timestamp": "2017-11-09T03:11:35.304Z","response": "200","bytes": "52878","clientip": "83.149.9.216","@version": "1","beat": {"name": "My-MacBook-Pro.local","hostname": "My-MacBook-Pro.local","version": "6.0.0"},"host": "My-MacBook-Pro.local","httpversion": "1.1","timestamp": "04/Jan/2015:05:13:45 +0000"}},...

尝试另一个搜索从IP地址派生的地理信息,将$DATE替换为当前日期,使用YYYY.MM.DD格式:

curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=geoip.city_name=Buffalo'

一些日志条目来自Buffalo,因此查询产生以下响应:

{"took": 9,"timed_out": false,"_shards": {"total": 5,"successful": 5,"skipped": 0,"failed": 0},"hits": {"total": 2,"max_score": 2.6390574,"hits": [{"_index": "logstash-2017.11.09","_type": "doc","_id": "L4zDnl8BW52sR0fx5whY","_score": 2.6390574,"_source": {"request": "/blog/geekery/disabling-battery-in-ubuntu-vms.html?utm_source=feedburner&utm_medium=feed&utm_campaign=Feed%3A+semicomplete%2Fmain+%28semicomplete.com+-+Jordan+Sissel%29","agent": """"Tiny Tiny RSS/1.11 (http://tt-rss.org/)"""","geoip": {"timezone": "America/New_York","ip": "198.46.149.143","latitude": 42.8864,"continent_code": "NA","city_name": "Buffalo","country_name": "United States","country_code2": "US","dma_code": 514,"country_code3": "US","region_name": "New York","location": {"lon": -78.8781,"lat": 42.8864},"postal_code": "14202","region_code": "NY","longitude": -78.8781},"offset": 22795,"auth": "-","ident": "-","verb": "GET","prospector": {"type": "log"},"source": "/path/to/file/logstash-tutorial.log","message": """198.46.149.143 - - [04/Jan/2015:05:29:13 +0000] "GET /blog/geekery/disabling-battery-in-ubuntu-vms.html?utm_source=feedburner&utm_medium=feed&utm_campaign=Feed%3A+semicomplete%2Fmain+%28semicomplete.com+-+Jordan+Sissel%29 HTTP/1.1" 200 9316 "-" "Tiny Tiny RSS/1.11 (http://tt-rss.org/)"""","tags": ["beats_input_codec_plain_applied"],"referrer": """"-"""","@timestamp": "2017-11-09T03:11:35.321Z","response": "200","bytes": "9316","clientip": "198.46.149.143","@version": "1","beat": {"name": "My-MacBook-Pro.local","hostname": "My-MacBook-Pro.local","version": "6.0.0"},"host": "My-MacBook-Pro.local","httpversion": "1.1","timestamp": "04/Jan/2015:05:29:13 +0000"}},...

如果你正在使用Kibana来可视化你的数据,你还可以在Kibana中探索Filebeat数据:

有关如何为Filebeat加载Kibana索引模式的信息,请参见Filebeat入门文档。

你已经成功地创建了一个管道,它使用FilebeatApache web日志作为输入,解析这些日志以创建特定的日志,从日志中命名字段,并将解析后的数据写入到Elasticsearch集群。接下来,学习如何创建使用多个输入和输出插件的管道。

将多个输入和输出插件拼接在一起

你需要管理的信息通常来自多个不同的数据源,并且用例可以为你的数据要求多个目的地,你的日志存储管道可以使用多个输入和输出插件来处理这些需求。

在本节中,你将创建一个Logstash管道,该管道接收来自Twitter提要和Filebeat客户机的输入,然后将信息发送到一个Elasticsearch集群,并将信息直接写入文件。

读取来自Twitter的提要

要添加Twitter提要,请使用Twitter输入插件,要配置插件,需要以下信息:

  • 一个消费者key,它唯一地标识您的Twitter应用程序
  • 一个消费者secret,作为你的Twitter应用的密码
  • 在传入的提要中搜索一个或多个关键字,这个例子显示了使用“cloud”作为关键字,但是你可以使用任何你想要的
  • 一个oauth token,它标识使用这个应用程序的Twitter帐户

访问https://dev.twitter.com/apps来建立一个Twitter账户,生成你的消费keysecret,以及你的访问tokensecret,如果你不确定如何生成这些key,请参阅twitter输入插件的文档。

就像前面使用Logstash解析日志时所做的那样,创建一个包含配置管道概要的配置文件(称为second-pipeline.conf),如果需要,可以重用前面创建的文件,但请确保在运行Logstash时传递正确的配置文件名。

将以下几行添加到second-pipeline.conf文件的输入部分,将此处所示的占位符值替换为你的值:

twitter {consumer_key => "enter_your_consumer_key_here"consumer_secret => "enter_your_secret_here"keywords => ["cloud"]oauth_token => "enter_your_access_token_here"oauth_token_secret => "enter_your_access_token_secret_here"}

配置Filebeat,将日志行发送到Logstash

正如你在配置Filebeat以将日志行发送到Logstash时所了解的那样,Filebeat客户机是一个轻量级的、资源友好的工具,它可以从服务器上的文件中收集日志,并将这些日志转发到您的Logstash实例进行处理。

安装Filebeat后,需要对其进行配置,打开位于Filebeat安装目录中的filebeat.yml文件,并使用以下代码替换内容,确保路径指向你的系统日志:

filebeat.prospectors:
- type: logpaths:- /var/log/*.log fields:type: syslog
output.logstash:hosts: ["localhost:5044"]
  • paths:Filebeat处理的文件或文件的绝对路径
  • 将一个名为type值为syslog的字段添加到该事件中

保存你的更改。

为了简化配置,你不会像在现实场景中那样指定TLS/SSL设置。

通过在second-pipeline.conf文件的输入部分添加以下代码行,将您的Logstash实例配置为使用Filebeat输入插件:

beats {port => "5044"
}

Logstash数据写入文件

你可以配置你的Logstash管道,以便使用file输出插件将数据直接写入文件。

通过在second-pipeline.conf文件的输出部分添加以下行,配置你的Logstash实例以使用文件输出插件:

file {path => "/path/to/target/file"
}

写入到多个Elasticsearch节点

对多个Elasticsearch节点的写入可以减轻给定Elasticsearch节点的资源需求,并在特定节点不可用时为集群提供多余的入口点。

若要将你的Logstash实例配置为写入多个Elasticsearch节点,请编辑second-pipeline.conf文件的输出部分以供读取:

output {elasticsearch {hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]}
}

使用主机行中的Elasticsearch集群中的三个非主节点的IP地址,当hosts参数列出多个IP地址时,Logstash就会使用地址列表中负载均衡请求。还请注意,Elasticsearch的默认端口是9200,可以在上面的配置中省略。

测试管道

此时,你的second-pipeline.conf文件如下所示:

input {twitter {consumer_key => "enter_your_consumer_key_here"consumer_secret => "enter_your_secret_here"keywords => ["cloud"]oauth_token => "enter_your_access_token_here"oauth_token_secret => "enter_your_access_token_secret_here"}beats {port => "5044"}
}
output {elasticsearch {hosts => ["IP Address 1:port1", "IP Address 2:port2", "IP Address 3"]}file {path => "/path/to/target/file"}
}

Logstash使用的数据来自你配置的Twitter提要,接收来自Filebeat的数据,并将此信息索引到Elasticsearch集群中的三个节点,并将其写入文件。

在数据源机器上,使用以下命令运行Filebeat:

sudo ./filebeat -e -c filebeat.yml -d "publish"

Filebeat将尝试在端口5044上连接,在Logstash使用激活的Beats插件启动之前,该端口上不会有任何回应,因此您你看到的任何关于在该端口上连接失败的消息目前都是正常的。

要验证你的配置,请运行以下命令:

bin/logstash -f second-pipeline.conf --config.test_and_exit

--config.test_and_exit选项解析配置文件并报告任何错误,当配置文件通过配置测试时,使用以下命令启动Logstash:

bin/logstash -f second-pipeline.conf

使用grep工具在目标文件中搜索,以验证信息是否存在:

grep syslog /path/to/target/file

运行一个Elasticsearch查询,在Elasticsearch集群中找到相同的信息:

curl -XGET 'localhost:9200/logstash-$DATE/_search?pretty&q=fields.type:syslog'

使用YYYY.MM.DD格式将$DATE替换为当前日期。

要查看来自Twitter提要的数据,请尝试以下查询:

curl -XGET 'http://localhost:9200/logstash-$DATE/_search?pretty&q=client:iphone'

同样,请记住使用YYYY.MM.DD格式将$DATE替换为当前日期。


下一篇:Logstash是如何工作的

Logstash 6.2 参考指南(开始使用Logstash)相关推荐

  1. metricbeat mysql_Metricbeat 参考指南(目录)

    Metricbeat 参考指南 版本:v6.4 更新日期:2018-10-15 概述 Metricbeat是一个轻量级的托运工,你可以在服务器上安装它,定期从操作系统和服务器上运行的服务收集指标,Me ...

  2. logstash读取Elasticsearch数据保存为json,logstash接收log数据写入kafka生产者

    [提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...

  3. Java面试参考指南(二)

    2019独角兽企业重金招聘Python工程师标准>>> 访问修饰符 对于基本的OOPS(面向对象)概念,请看Java面试参考指南的第一部分.访问修饰符规定了一个类如何访问另一个类及它 ...

  4. Spring5参考指南:IOC容器

    文章目录 为什么使用Spring5 什么是IOC容器 配置元数据 实例化容器 XML嵌套 groovy bean定义DSL 使用容器 最近在翻译Spring Framework Documentati ...

  5. 史无前例的 HTML5 资源参考指南

    2019独角兽企业重金招聘Python工程师标准>>> 尽管 HTML5 规范在 2014 年之前不会有正式版本,很多设计师已经开始试水高级浏览器已经支持的部分 HTML5 功能.H ...

  6. java 面试指南_Java面试参考指南–第1部分

    java 面试指南 JAVA面向对象的概念 Java in基于面向对象的概念,它允许更高级别的抽象以实际方式解决任何问题. 面向对象的方法将实际对象中的问题解决方案概念化,更易于在整个应用程序中重用. ...

  7. Java面试参考指南–第1部分

    JAVA面向对象的概念 Java基于面向对象的概念,它允许更高级别的抽象以实际方式解决任何问题. 面向对象的方法将实际对象中的问题解决方案概念化,从而更易于在整个应用程序中重用. 例如椅子,风扇,狗, ...

  8. 信息安全技术网络安全等级保护定级指南_行业标准 |报业网络安全等级保护定级参考指南V2.0发布,明确保护对象、定级要求...

    近期,中国新闻技术工作者联合会正式发布<报业网络安全等级保护定级参考指南V2.0>. 该指南由中国新闻技术工作者联合会组织网络安全领域的专家.报业技术专家以及业务专家经过多次调研.学习.探 ...

  9. Python - SIP参考指南 - 介绍

    介绍 本文是SIP4.18的参考指南.SIP是一种Python工具,用于自动生成Python与C.C++库的绑定.SIP最初是在1998年用PyQt开发的,用于Python与Qt GUI toolki ...

最新文章

  1. php upload ctf,强网杯CTF防御赛ez_upload Writeup
  2. bzoj[1835][ZJOI2010]base 基地选址
  3. linux查看网卡速度
  4. Java Enum 使用
  5. python按字节读取文件_Python读取二进制文件
  6. 几个免费高质量图标搜索引擎。
  7. Android 自定义操作成功的loading动画
  8. 数据与运算(以及补码)
  9. html5 尾迹特效,如何设置ECharts线图的特效
  10. 多线程读取视频及深度学习推理
  11. qvodplayer.hta:按个人使用习惯自写易用的整合搜索资源的p2p电影搜索/播放程序(能自动升级)
  12. VS2008编译的程序运行提示“由于应用程序配置不正确,应用程序未能启动”
  13. 被Gartner评为十大安全技术的IAST是什么
  14. beyong经典之作
  15. 一个华为人在华为工作十年的感悟
  16. Hive 性能调优大全
  17. 嘉和美康科创板IPO:阿里健康是股东,副总姬铮并非核心技术人员
  18. WMI与CIM的区别
  19. 最常见加密方式和Python实现
  20. 程序员必须知道的9大数据挖掘工具

热门文章

  1. Java实现迷宫城堡(强连通图的判定)
  2. uml 菱形_UML类图的各符号含义
  3. date 减去固定时长_案例一:shell脚本指定日期减去一天
  4. 2018-狗年年终总结
  5. 《先驱者》服务器修复,Steam一周大事件:先驱者服务器修复,1分钟掉线3次不再可能...
  6. 面向对象程序设计(Java) 课程设计——三少五子棋(Final)
  7. 马云的创业故事及他人生中的摆渡人-第一个双十一(九)
  8. Http的Header里面包含哪些字段,http和https 的区别
  9. 【VUE】VUE 运行项目的时候报错 throw er; // Unhandled 'error' event
  10. Python之虚拟环境venv实战详解