Elasticsearch:Ingest Pipeline 实践
相比较 Logstash 而言,由于其丰富的 processors 而受到越来越多人的喜欢。最重要的一个优点就是它基于 Elasticsearch 极具可拓展性和维护性而受到开发者的喜欢。我在之前创建了很多关于 Ingest Pipeline 的文章。你可以参阅文章 “Elastic:开发者上手指南” 中的 Ingest pipeline 章节。
Ingest pipeline 可让你在索引之前对数据执行常见转换。 例如,你可以使用 pipeline 删除字段、从文本中提取值并丰富你的数据。
Pipeline 由一系列称为处理器(processors)的可配置任务组成。 每个处理器按顺序运行,对传入文档进行特定更改。 处理器运行后,Elasticsearch 会将转换后的文档添加到您的数据流或索引中。
你可以使用 Kibana 的 Ingest Pipelines 功能或 ingest APIs 创建和管理摄取管道。 Elasticsearch 以集群状态存储管道。
前提条件:
- 具有 ingest 角色的节点处理管道处理。 要使用 pipeline,你的集群必须至少有一个具有 ingest 角色的节点。 对于大量摄取负载,我们建议创建专用摄取节点。有关节点角色的描述,你可以阅读我之前的文章 “Elasticsearch:Node roles 介绍 - 7.9 之后版本”。
- 如果启用了 Elasticsearch 安全功能,你必须具有 manage_pipeline 集群权限才能管理摄取管道。 要使用 Kibana 的 Ingest Pipelines 功能,你还需要 cluster:monitor/nodes/info 集群权限。
- 包括 enrich 处理器的管道需要额外的设置。 请参阅丰富你的数据。你也可以阅读文章 “Elasticsearch:enrich processor (7.5发行版新功能)”。
在今天的文章中,我想同一个一个例子来展示两种创建 Ingest Pipeline 的方法尽管在我之前的文章中都有介绍:
- 通过 API 的方法来创建
- 通过 Kibana 的界面来进行创建
通过 API 的方法来创建
通过 API 的方法来创建其实也是非常容易和直接的。在今天的练习中,我将使用一个简单的文档来说明:
PUT demo/_doc/1
{"@timestamp": "2021-06-18T18:50:53","message": "950020004L-1,1,Rue du Clos Saint-Martin,95450,Ableiges,OSM,49.06789,1.969323"
}
在上面的文档中,我们可以看到 message 字段是一个非结构化的数据。这个数据用结构化的语句可以表述为:
_id
address.number
address.street_name
address.zipcode
address.city
source
location.lat
location.lon
为了能够使得数据在 Kibana 中得到更好的搜索及展示,我们需要把上面的数据进行结构化。我们使用 API 的方法有几种。我们首先能够想到的就是这个数据的每一项是由一个逗号分开,所以我们想到使用 split 处理器。我们可以通过如下的方式来测试我们的 Pipeline:
POST _ingest/pipeline/_simulate
{"description": "interpret a message","pipeline": {"processors": [{"split": {"field": "message","separator": ","}},{"set": {"field": "_id","value": "{{message.0}}"}},{"set": {"field": "address.number","value": "{{message.1}}"}},{"set": {"field": "address.street_name","value": "{{message.2}}"}},{"set": {"field": "address.zipcode","value": "{{message.3}}"}},{"set": {"field": "address.city","value": "{{message.4}}"}},{"set": {"field": "source","value": "{{message.5}}"}},{"set": {"field": "location.lat","value": "{{message.6}}"}},{"set": {"field": "location.lon","value": "{{message.7}}"}},{"remove": {"field": "message"}}]},"docs": [{"_source": {"message": "950020004L-1,1,Rue du Clos Saint-Martin,95450,Ableiges,OSM,49.06789,1.969323"}}]
}
我们在 Kibana 的 console 中运行,我们可以看到如下的输出:
{"docs" : [{"doc" : {"_index" : "_index","_type" : "_doc","_id" : "950020004L-1","_source" : {"location" : {"lon" : "1.969323","lat" : "49.06789"},"address" : {"zipcode" : "95450","number" : "1","city" : "Ableiges","street_name" : "Rue du Clos Saint-Martin"},"source" : "OSM"},"_ingest" : {"timestamp" : "2021-06-22T07:21:30.694064Z"}}}]
}
从输出中,我们可以看到之前的 message 字段完全变成了结构化的字段。基于上面的测试,我们可以定义如下的 Pipeline:
PUT _ingest/pipeline/decode
{"processors": [{"split": {"field": "message","separator": ","}},{"set": {"field": "_id","value": "{{message.0}}"}},{"set": {"field": "address.number","value": "{{message.1}}"}},{"set": {"field": "address.street_name","value": "{{message.2}}"}},{"set": {"field": "address.zipcode","value": "{{message.3}}"}},{"set": {"field": "address.city","value": "{{message.4}}"}},{"set": {"field": "source","value": "{{message.5}}"}},{"set": {"field": "location.lat","value": "{{message.6}}"}},{"set": {"field": "location.lon","value": "{{message.7}}"}},{"remove": {"field": "message"}}]
}
当我们导入数据时,我们可以使用如下的方式来进行调用:
PUT demo/_doc/1?pipeline=decode
{"@timestamp": "2021-06-18T18:50:53","message": "950020004L-1,1,Rue du Clos Saint-Martin,95450,Ableiges,OSM,49.06789,1.969323"
}
这样我们的文档就变成了:
GET demo/_doc/950020004L-1
请注意在之前的 processor 中,我们重新设置了_id。它被设置为 message 的第一个字段值。
{"_index" : "demo","_type" : "_doc","_id" : "950020004L-1","_version" : 2,"_seq_no" : 1,"_primary_term" : 1,"found" : true,"_source" : {"@timestamp" : "2021-06-18T18:50:53","address" : {"zipcode" : "95450","number" : "1","city" : "Ableiges","street_name" : "Rue du Clos Saint-Martin"},"location" : {"lon" : "1.969323","lat" : "49.06789"},"source" : "OSM"}
}
上面显示它正是我们需要的结果。
在上面,可能有的人觉得里面用的 processor 确实太大,有没有更好的办法呢?我们可以参考我之前的文章 “Elastic可观测性 - 运用 pipeline 使数据结构化”。我们可以使用 dissect 处理器来完成。
POST _ingest/pipeline/_simulate
{"pipeline": {"processors": [{"dissect": {"field": "message","pattern": "%{_id},%{address.number},%{address.street_name},%{address.zipcode},%{address.city},%{source},%{location.lat},%{location.lon}"}},{"remove": {"field": "message"}}]},"docs": [{"_source": {"message": "950020004L-1,1,Rue du Clos Saint-Martin,95450,Ableiges,OSM,49.06789,1.969323"}}]
}
在上面,我们只使用了两个 processor,但是它同样达到了解析 message 字段的目的。上面的命令运行的结果是:
{"docs" : [{"doc" : {"_index" : "_index","_type" : "_doc","_id" : "950020004L-1","_source" : {"location" : {"lon" : "1.969323","lat" : "49.06789"},"address" : {"zipcode" : "95450","number" : "1","city" : "Ableiges","street_name" : "Rue du Clos Saint-Martin"},"source" : "OSM"},"_ingest" : {"timestamp" : "2021-06-22T07:36:19.674056Z"}}}]
}
通过 Kibana 界面来完成 Pipeline
在 Kibana 中,它有一个设计非常好的界面让我们快速地设计并测试 Ingest Pipeline:
我们取名 Ingest Pipeline 为 interpret。点击上面的 Add a processor。在这次的练习中,我将使用 CSV 处理器:
我们按照上面的顺序填入 Target fields。点击 Add 按钮:
我们点击 Add documents:
在上面,我们可以看到需要测试的文档的格式。我们把如下的文件拷贝进去:
{"_index" : "demo_csv","_id" : "Vb4IIHoBgxQVs4WbvcxH","_source" : {"@timestamp" : "2021-06-18T18:50:58","message" : "950140640A-6,6,Rue des Prés,95580,Andilly,C+O,48.999518,2.299499"}}
这样就变成了:
从上面的结果中,我们可以看出来是我们想要的结果。仔细查看,我们发现 location.lon 及 location.lat 是字符串。我们需要把它们进行转为为浮点数,这样才能表示经纬度。我们接着添加 convert 处理器:
同样地,我们针对 location.lat 也做一个转换。当我添加完毕后,我们直接点击 View output:
这次,我们看到 location.lon 及 location.lat 都变成了 float 类型的数据了。在上面,我们可以看到 message 字段还在,我们希望删除这个字段。我们接着添加一个 remove 处理器:
我们再次点击 View output:
这次,我们看到没有 message 字段在输出中了。
在上面,我们看到所有的处理器都是按照我们规定的动作在处理。在实际的使用中,可能有时我们的数据并不是那么完美,这其中包括格式,或者数据类型。那么这些处理器可能会发生这样或者那样的错误。那么,我们该如何处理这些错误呢?
比如,当我们的测试文档是这样的一个格式:
{"_index" : "demo_csv","_id" : "Vb4IIHoBgxQVs4WbvcxH","_source" : {"@timestamp" : "2021-06-18T18:50:58","message" : "950140640A-6,6,Rue des Prés,95580,Andilly,C+O,NOT_NUM,NOT_NUM"}}
在上面,我们的经纬度不再是数值,而是一个不可以转换为数值的文字,那么我们该如何处理这个错误呢?我们重新运行 Pipeline:
处理这种错误,可以在两个级别来进行处理。一种是在 processor 的基本,另外是在整个 pipeline 的级别来处理。比如在上面的 convert 中,我们可以针对这个 processor 来进行异常处理。我们可以选择 ignore_failure:
在上面,我们可以看到错误信息被忽视,但是在最终的结果中,它并不是我们想要的。我们还是取消之前的启动 ignore_failure。我们有两种途径处理这种错误。一种方法是针对这个 processor 完成一个 failure handler 来处理这种错误:
另外一种途径是针对整个 pipeline 来做错误处理:
在上面,我们添加了一个 Set 处理器。它设置了一个 error 字段,并把它的内容设置为 "document is not correctly parsed"。点击上面的 Add 按钮:
重新点击 View output:
这一次,我们没有看到像上次的那种错误了,但是 location 里的内容还是不正确。一种解决办法就是删除这个 location 字段。在上面的 Failure processors 中,我们再添加一个 remove 处理器:
我们再次点击 View output:
从上面的输出中我们可以看到 location 字段被删除了,同时我们还可以看到 message 字段还在,它表明在遇到错误时,位于 Convert 后面的 remove 处理器没有被运行,所以 message 字段没有被删除。
我们可以点击 Show request 来查看如何使用 API 来生成这个 Ingest Pipeline:
你可以甚至拷贝到 Dev Tools 中去运行。我们接下来需要点击 Save Pipeline 来保存这个 Ingest Pipeline。
到目前为止,我们已经通过 Kibana 的界面生成了我们想要的 Ingest Pipeline。我们可以在 Dev Tools 通过如下的方法来使用:
PUT demo/_doc/1?pipeline=interpret
{"@timestamp": "2021-06-18T18:50:53","message": "950020004L-1,1,Rue du Clos Saint-Martin,95450,Ableiges,OSM,49.06789,1.969323"
}
我们通过如下的方法来查询已经导入的文档:
GET demo/_doc/950020004L-1
上面的命令显示的结果为:
{"_index" : "demo","_type" : "_doc","_id" : "950020004L-1","_version" : 3,"_seq_no" : 2,"_primary_term" : 1,"found" : true,"_source" : {"@timestamp" : "2021-06-18T18:50:53","address" : {"zipcode" : "95450","number" : "1","city" : "Ableiges","street_name" : "Rue du Clos Saint-Martin"},"location" : {"lon" : 1.969323,"lat" : 49.06789},"source" : "OSM"}
}
从上面,我们可以看出来 location.lon 及 location.lat 都已经是浮点数了。我们的 Ingest Pipeline 确实是在工作了。
为时序数据添加 timestamp
对于时序数据来说,timestamp 是非常重要的。假如有一种情况,我们的数据里缺少这个 timestamp,那么我改怎么办呢?一种简单的办法就是添加 ingest pipeline 运行的时间为事件的时间。我们可以采用如下的 ingest pipeline:
PUT _ingest/pipeline/test
{"processors": [{"set": {"field": "hello","value": "world"}},{"set": {"if": "!ctx.containsKey('@timestamp')","field": "@timestamp","value": "{{_ingest.timestamp}}"}}]
}
在上面,我们检查 @timestamp 字段是否存在。如果不存在的话,我们使用 _ingest.timestamp 作为事件的时间。
Elasticsearch:Ingest Pipeline 实践相关推荐
- Elasticsearch Ingest Pipeline
文章目录 1. 需求:修复与增强写入的数据 2. Ingest Node 3. Pipeline & Processor 4. 使用 Pipeline 切分字符串 5. 为 ES添加一个 Pi ...
- Elasticsearch:如何处理 ingest pipeline 中的异常
在我之前的文章 "如何在 Elasticsearch 中使用 pipeline API 来对事件进行处理" 中,我详细地介绍了如何创建并使用一个 ingest pipeline.简 ...
- Elasticsearch:Ingest pipeline 介绍
Ingest pipeline 可让你在索引之前对数据执行常见转换. 例如,你可以使用 pipeline 删除字段.从文本中提取值并丰富你的数据. Pipeline 由一系列称为处理器(process ...
- Elasticsearch:从零开始创建一个 ingest pipeline 处理器
实际上在我之前的文章: Elasticsearch:创建属于自己的 Ingest processor Elasticsearch:创建一个 Elasticsearch Ingest 插件 我已经详述了 ...
- Elasticsearch:ingest pipeline 使用示例 - 解析常用日志格式
在本示例教程中,你将在索引之前使用 ingest pipeline 以通用日志格式解析服务器日志. 在开始之前,请检查摄取管道的先决条件. 你要解析的日志类似于以下内容: 127.0.0.1 user ...
- Elasticsearch:创建 Ingest pipeline
在 Elasticsearch 针对数据进行分析之前,我们必须针对数据进行摄入.在摄入的过程中,我们需要对数据进行加工,这其中包括非结构化数据转换为结构化数据,数据的转换,丰富,删除,添加新的字段等等 ...
- Elasticsearch使用Ingest Pipeline进行数据预处理
本文基于Elasticsearch7.x Elasticsearch可以使用自身的Ingest Pipeline功能进行数据预处理, 无须借助Logstash. Ingest Pipeline介绍 I ...
- ElasticSearch实战(三十六)-Ingest Pipeline 多管道处理器
在前文我们已经讲了 Ingest Pipeline 使用方法,除了单管道处理方式以外,它还支持多管道组合并行处理的方式对数据进行清洗,同时支持管道动态扩展和灵活组合,让数据清洗更加强大和实用. ...
- Elasticsearch:创建一个 Elasticsearch Ingest 插件
在前面的一篇文章 "Elasticsearch:创建属于自己的 Ingest processor" 中,我相信地介绍了如何使用一个模板来创建 Ingest 插件.在今天的文章中,我 ...
最新文章
- 2021年大数据常用语言Scala(三十七):scala高级用法 高阶函数用法
- js observer 添加_简单了解4种JS设计模式
- Page_Load的问题
- 安卓USB开发教程 一 USB Host 与 Accessory
- elementUI Drawer 抽屉 Dialog 对话框 等弹出组件的遮罩层以及多层遮罩解决办法
- 《MapReduce 2.0源码分析与编程实战》一第2章 入门
- ROS笔记(37) 抓取和放置
- Extjs创建多个application实现多模块MVC动态加载。。
- Dubbo学习总结(6)——Dubbo开源现状与未来规划
- Java 多线程的创建
- 注入攻击-SQL注入和代码注入
- 机器学习问题的十个实例
- 小夜曲dsd使用foobar2000测试dsd输出
- java jpanel 数据刷新6_刷新swing
- 北邮数电期末复习——第三章
- 组合数学——特征方程与线性递推方程
- 先进半导体材料与器件Chapter4
- java-php-python-ssm漠河旅游官网计算机毕业设计
- 使用两次Hash的Hash表——Twice_Hash_Map
- 人生的意义是什么,活着的意义是什么?