hello,大家好,我是 Jackpop,硕士毕业于哈尔滨工业大学,曾在华为、阿里等大厂工作,如果你对升学、就业、技术提升等有疑惑,不妨交个朋友:

我是Jackpop,我们交个朋友吧!

在本系列文章的第3部分关于实时流处理的文章中,我们学习了如何使用ElasticSearch的批量API以及利用REST API将.json航班数据文件导入ElasticSearch。

在这篇文章中,我们将介绍另一种方式,Logstash。

Logstash介绍

Logstash是一个开源的数据收集引擎,具有实时流水线功能。

它从多个源头接收数据,进行数据处理,然后将转化后的信息发送到stash,即存储。

Logstash允许我们将任何格式的数据导入到任何数据存储中,不仅仅是ElasticSearch。

它可以用来将数据并行导入到其他NoSQL数据库,如MongoDB或Hadoop,甚至导入到AWS。

数据可以存储在文件中,也可以通过流等方式进行传递。

Logstash对数据进行解析、转换和过滤。它还可以从非结构化数据中推导出结构,对个人数据进行匿名处理,可以进行地理位置查询等等。

一个Logstash管道有两个必要的元素,输入和输出,以及一个可选的元素,过滤器。

输入组件从源头消耗数据,过滤组件转换数据,输出组件将数据写入一个或多个目的地。

所以,我们的示例场景的Logstash架构基本如下。

我们从.json文件中读取我们的航班数据,我们对它们进行处理/转换,应用一些过滤器并将它们存储到ElasticSearch中。

Logstash安装

有几种选择来安装Logstash。

一种是访问网站下载你平台的存档,然后解压到一个文件夹。

你也可以使用你的平台的包管理器来安装,比如yum、apt-get或homebrew,或者作为docker镜像来安装。

确保你已经定义了一个环境变量JAVA_HOME,指向JDK 8或11或14的安装(Logstash自带嵌入式AdoptJDK)。

Logstash工作流

一旦你安装了它,让我们通过运行最基本的Logstash工作流来测试你的Logstash安装情况。

bin/logstash -e 'input { stdin { } } output { stdout {} }'

上面的工作流接受来自stdin(即你的键盘)的输入,并将其输出到stdout(即你的屏幕)。

上面的工作流中没有定义任何过滤器。一旦你看到logstash被成功启动的消息,输入一些东西(我输入的是Hello world),按ENTER键,你应该看到产生的消息的结构格式,像下面这样。

[2021-02-11T21:52:57,120][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
Hello world
{"message" => "Hello world","@version" => "1","@timestamp" => 2021-02-11T19:57:46.208Z,"host" => "MacBook-Pro.local"
}

然而,通常Logstash是通过配置文件来工作的,配置文件告诉它该做什么,即在哪里找到它的输入,如何转换它,在哪里存储它。Logstash配置文件的结构基本上包括三个部分:输入、过滤和输出。

你在输入部分指定数据的来源,在输出部分指定目的地。在过滤器部分,你可以使用支持的过滤器插件来操作、测量和创建事件。

配置文件的结构如下面的代码示例所示。

input {...}
filter {...}
output{...}

你需要创建一个配置文件,指定你要使用的组件和每个组件的设置。在config文件夹中已经存在一个配置文件样本,logstash-sample.conf。

其内容如下所示。

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.input {beats {port => 5044}
}output {elasticsearch {hosts => ["http://localhost:9200"]index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"#user => "elastic"#password => "changeme"}
}

这里input部分定义了Logstash应该从哪里获取数据。这里有一个可用的输入插件列表。

我们的输入不是来自Beats组件,而是来自文件系统,所以我们使用文件输入组件。

input {file {start_position => "beginning"path => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json"codec => "json"}
}

我们使用start_position参数来告诉插件从头开始读取文件。

需要注意,数据路径必须是绝对的。

我们使用的是json编解码器,除了json,还可以使用纯文本形式。

在下载的数据中,可以找到一个名为test.json的文件。它只由2条航班数据组成的文件。

输出块定义了Logstash应该在哪里存储数据。我们将使用ElasticSearch来存储我们的数据。

我们添加了第二个输出作为我们的控制台,并使用rubydebugger格式化输出,第三个输出作为文件系统,最后两个用于测试我们的输出。 我们将输出存储在output.json中。

output {elasticsearch {hosts => ["http://localhost:9200"]index => "testflight"}file {path => "/usr/local/Cellar/logstash-full/7.11.0/data/output.json"}stdout {codec => rubydebug}
}

此外,还可以定义过滤器来对数据进行转换。

Logstash提供了大量的过滤器,下面介绍一些非常常用的的过滤器:

  • grok:解析任何任意文本并添加结构,它包含120种内置模式
  • mutate:对字段进行一般的转换,例如重命名、删除、替换和修改字段
  • drop:丢弃一个数据
  • clone:复制一个数据,可能增加或删除字段
  • geoip:添加IP地址的地理位置信息
  • split:将多行消息、字符串或数组分割成不同的数据

可以通过执行下方命令查看 Logstash 安装中安装的全部插件列表。

$ bin/logstash-plugin list

你会注意到,有一个JSON过滤器插件。这个插件可以解析.json文件并创建相应的JSON数据结构。

正确地选择和配置过滤器是非常重要的,否则,你最终的输出中没有数据。

所以,在我们的过滤块中,我们启用json插件,并告诉它我们的数据在消息字段中。

filter {json {source => "message"}
}

到此为止,完成的配置文件config/testflight.conf内容如下:

input {file {start_position => "beginning"path => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json"codec => "json"}
}filter {json {source => "message"}
}output {
#   elasticsearch {
#   hosts => ["http://localhost:9200/"]
#   index => "testflight"
# }file {path => "/usr/local/Cellar/logstash-full/7.11.0/data/output.json"}stdout {codec => rubydebug}
}

你可以通过如下命令进行一下测试:

bin/logstash -f config/testflight.conf --config.test_and_exit
...
Configuration OK
[2021-02-11T23:15:38,997][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

如果配置文件通过了配置测试,用以下命令启动Logstash。

bin/logstash -f config/testflight.conf --config.reload.automatic
...

–config.reload.automatic配置选项可以实现自动重载配置,这样你就不必每次修改配置文件时都要停止并重新启动Logstash。

如果一切顺利,你应该会看到如下的输出结果。

{"CMsgs" => 1,"@version" => "1","PosTime" => 1467378028852,"Rcvr" => 1,"EngMount" => 0,"Tisb" => false,"Mil" => false,"Trt" => 2,"Icao" => "A0835D","Long" => -82.925616,"InHg" => 29.9409447,"VsiT" => 1,"ResetTrail" => true,"CallSus" => false,"@timestamp" => 2021-02-14T18:32:16.337Z,"host" => "MacBook-Pro.local","OpIcao" => "RPA","Man" => "Embraer","GAlt" => 2421,"TT" => "a","Bad" => false,"HasSig" => true,"TSecs" => 1,"Vsi" => 2176,"EngType" => 3,"Reg" => "N132HQ","Alt" => 2400,"Species" => 1,"FlightsCount" => 0,"WTC" => 2,"Cos" => [[0] 39.984322,[1] -82.925616,[2] 1467378028852.0,[3] nil],"message" => "{"Id":10519389,"Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false,"Reg":"N132HQ","FSeen":"\/Date(1467378028852)\/","TSecs":1,"CMsgs":1,"Alt":2400,"GAlt":2421,"InHg":29.9409447,"AltT":0,"Lat":39.984322,"Long":-82.925616,"PosTime":1467378028852,"Mlat":true,"Tisb":false,"Spd":135.8,"Trak":223.2,"TrkH":false,"Type":"E170","Mdl":"2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR","Man":"Embraer","CNum":"17000216","Op":"REPUBLIC AIRLINE INC     - INDIANAPOLIS, IN","OpIcao":"RPA","Sqk":"","Vsi":2176,"VsiT":1,"WTC":2,"Species":1,"Engines":"2","EngType":3,"EngMount":0,"Mil":false,"Cou":"United States","HasPic":false,"Interested":false,"FlightsCount":0,"Gnd":false,"SpdTyp":0,"CallSus":false,"ResetTrail":true,"TT":"a","Trt":2,"Year":"2008","Cos":[39.984322,-82.925616,1467378028852.0,null]}","Lat" => 39.984322,"TrkH" => false,"Op" => "REPUBLIC AIRLINE INC     - INDIANAPOLIS, IN","Engines" => "2","Sqk" => "","Id" => 10519389,"Gnd" => false,"CNum" => "17000216","path" => "/usr/local/Cellar/logstash-full/7.11.0/data/flightdata/test.json","Cou" => "United States","HasPic" => false,"FSeen" => "/Date(1467378028852)/","Interested" => false,"Mdl" => "2008 EMBRAER-EMPRESA BRASILEIRA DE ERJ 170-200 LR","Spd" => 135.8,"Sig" => 0,"Trak" => 223.2,"Year" => "2008","SpdTyp" => 0,"AltT" => 0,"Type" => "E170","Mlat" => true
}

数据转换

首先,让我们从输出中删除path, @version, @timestamp, host和message,这些都是logstash添加的。

filter {json {source => "message"}mutate {remove_field => ["path", "@version", "@timestamp", "host", "message"]}
}

mutate过滤器组件可以删除不需要的字段。

重新运行:

bin/logstash -f config/flightdata-logstash.conf –-config.test_and_exit
bin/logstash -f config/flightdata-logstash.conf --config.reload.automatic

接下来,我们将_id设置为Id。

output {elasticsearch {hosts => ["http://localhost:9200"]index => "testflight"document_id => "%{Id}"}

我们在输出组件中通过设置document_id来实现。

然而,如果你重新运行logstash,你会发现Id字段仍然存在。

有一个窍门,在过滤插件中把它改名为[@metadata][Id],然后在输出中使用,@metadata字段被自动删除。

filter {json {source => "message"}mutate {remove_field => ["path", "@version", "@timestamp", "host", "message"]rename => { "[Id]" => "[@metadata][Id]" }}
}output {elasticsearch {hosts => ["http://localhost:9200"]index => "flight-logstash"document_id => "%{[@metadata][Id]}"}
...

现在让我们尝试解析日期。如果你还记得,这是我们在上一篇文章中没有做的事情,我们需要将日期转换为更适合人们熟悉的格式。

例如:

"FSeen" => "/Date(1467378028852)/"

需要将时间1467378028852转化成容易阅读的格式,并且去掉前后多余的字符串,通过gsub组件可以实现这项功能:

gsub => [# get rid of /Date("FSeen", "/Date(", "",# get rid of )/"FSeen", ")/", ""]

这里通过gsub去掉了数据中/Date()\等多余部分,输出结果为:

"FSeen" : "1467378028852"

然后把时间戳转换成熟悉的格式:

date {timezone => "UTC"match => ["FSeen", "UNIX_MS"]target => "FSeen"
}

UNIX_MS是UNIX时间戳,单位是毫秒。我们匹配字段FSeen并将结果存储在同一字段中,输出结果为:

"FSeen" : "2016-07-01T13:00:28.852Z",

上述转换的完整代码如下:

mutate {gsub => [# get rid of /Date("FSeen", "/Date(", "",# get rid of )/"FSeen", ")/", ""]
}
date {timezone => "UTC"match => ["FSeen", "UNIX_MS"]target => "FSeen"
}

在这部分中,我们学习了如何使用Logstash将.json航班数据批量文件导入到ElasticSearch中。Logstash是一个非常方便的方式,它有很多过滤器,支持很多数据类型,你只需要学习如何编写一个配置文件就可以了!

Logstash是否适合实时数据处理?

答案是:要看情况

Logstash主要是为批处理数据而设计的,比如日志数据,也许不适合处理来自传感器的实时航班数据。

不过,你可以参考一些参考资料,这些资料描述了如何创建可以扩展的Logstash部署,并使用Redis作为Logstash代理和Logstash中央服务器之间的中介,以便处理许多事件并实时处理它们。

ElasticSearch从入门到精通:Logstash妙用相关推荐

  1. ElasticSearch第一讲:ElasticSearch从入门到精通

    ElasticSearch第一讲:ElasticSearch从入门到精通 业内目前来说事实上的一个标准,就是分布式搜索引擎一般大家都用elasticsearch.本文是ElasticSearch第一讲 ...

  2. ElasticSearch从入门到精通--第七话(自动补全、拼音分词器、自定义分词、数据同步方案)

    ElasticSearch从入门到精通–第七话(自动补全.拼音分词器.自定义分词.数据同步方案) 使用拼音分词 可以引入elasticsearch的拼音分词插件,地址:https://github.c ...

  3. Elasticsearch从入门到精通 理论 集群 优化 框架集成

    Elasticsearch 入门 Elasticsearch 安装 下载软件 Elasticsearch 的官方地址:https://www.elastic.co/cn/ Elasticsearch ...

  4. Elasticsearch从入门到精通

    1.Elasticsearch简介 Elasticsearch是一个基于Apache Lucene(TM)的开源搜索引擎,无论在开源还是专有领域,Lucene可以被认为是迄今为止最先进.性能最好的.功 ...

  5. ElasticSearch从入门到精通,史上最全(持续更新,未完待续,每天一点点)

    目录 1.ElasticSearch的简介 2.用数据库实现搜素的功能 3.ES的核心概念 3.1 NRT(Near Realtime)近实时 3.2 cluster集群,ES是一个分布式的系统 3. ...

  6. kibana从入门到精通-Kibana安装

    作者其他ELK快速入门系列文章 Elasticsearch从入门到精通 logstash快速入门实战指南 简介 Kibana 是一款开源的数据分析和可视化平台,它是 Elastic Stack 成员之 ...

  7. java框架 零基础从入门到精通的学习路线 附开源项目面经等(超全)

    目录 前言 1. 学习路线 2. 学习方法 前言 这篇文章主要总结我之前所学过的框架以及学习路线 从实打实的零基础到框架再到项目 之后实习工作 也在这篇博客中记录我的学习笔记 以及在笔记中遇到的配置安 ...

  8. redis 中一个字段 修改map_Redis bitmap 位图 从入门到精通 基础 实战 妙用

    1.bitmap介绍 位图不是真正的数据类型,它是定义在字符串类型中,一个字符串类型的值最多能存储512M字节的内容 位上限:2^(9(512)+10(1024)+10(1024)+3(8b=1B)) ...

  9. Elasticsearch入门到精通教程 - 学习资料综合

    背景 因经常被问到Elasticsearch相关一些资料教程,这里特例整理一份以前自己的学习资料,希望对你有用. 信息资料 1. 基本API用法教程 Elasticsearch JAVA API教程G ...

  10. Elasticsearch7从入门到精通(简介、部署、原理、开发、ELK)

    Elasticsearch7从入门到精通(简介.部署.原理.开发.ELK) 第1章.Elasticsearch简介 1-1.Elasticsearch介绍 Elasticsearch官方网站:http ...

最新文章

  1. Unity5.6+ 导出Android 应用程序apk的环境配置及导出过程
  2. 前沿|8种面部表情实时追踪,你的喜怒哀乐全被AI看穿了
  3. ASP.Net Core WebApi几种版本控制对比
  4. python的16.1节课后练习16-2比较希特卡和死亡谷的气温的问题,最后图像为什么出现乱码?
  5. 使用Tomcat配置域名
  6. 百万年薪的腾讯员工买得起深圳房子吗?
  7. 分析Vector、ArrayList、Hashtable、HashMap数据结分享一下
  8. 负载均衡原理与实践详解 第五篇 负载均衡时数据包流程详解
  9. httprequest存储的是字符内容 而文本内容是以字节形式上传的;所以普通的取值方式无法从httprequest取到值...
  10. 初学者python编辑器用geany可以吗_面向初学者的Python编辑器Mu
  11. JVM内存管理------GC算法精解
  12. 山东大学软件学院计算机组成原理课程设计整机实验(1)
  13. 基于python技术的超市仓库管理系统
  14. C语言指针学习(超详细)
  15. xdb 服务_localhost 8080 XDB服务器需要用户名和密码的问题
  16. 汇编:根据段大小计算偏移地址
  17. 51单片机 独立按键k1控制数码管移位 k2控制数值加
  18. 神经网络学说的主要观点,神经网络研究属于下列
  19. CVPR 2021 华南理工和微信的Transformer:UP-DETR无监督预训练检测器
  20. ffmpeg中如何设置不含SEI,如何自定义SEI

热门文章

  1. Mac M1芯片安装打开Axure9
  2. Matlab数据库工具箱的简单使用
  3. 终极算法 机器学习和人工智能如何重塑世界
  4. 微信浏览器ISO系统底部导航栏
  5. 项目版本号的命名规范
  6. 100行JS代码实现❤坦克大战js小游戏源码 HTML5坦克大战游戏代码(HTML+CSS+JavaScript )...
  7. Python做一个简单的在线编辑器
  8. 天地图 + geojson 绘制中国行政区划
  9. matlab cramer法则,玩转线性代数(8)第一章第七节_克拉姆法则与秘密武器
  10. C#使用Aforge对uvc协议摄像头亮度属性的更改