在本文中,我想快速了解 Elasticsearch 提供的一个(众多)有趣的功能,我也倾向于在生产设置中使用它,即 Ingest Pipelines。 此功能允许在实际文档索引发生之前预处理文档。 听起来很有趣,为什么……

这在很多方面都很有用,但我可以看到两个主要原因。 首先,当你处理(大)数据分析/处理方面时,你必须在原始数据到来时对其进行处理,并对它们进行预处理以满足你的数据需求。 其次,即使你打算更换生产商,你 A) 可能无法直接控制所有 producers(例如,由于某些组织方面,它们的遗留性质,例如)和/或 B) 仍然需要做好准备,以应对可能发生的变化相当长的一段时间(例如,考虑要在共享日志库的新版本上升级数十/数百个微服务等)。

对于这些情况,你可以考虑使用 Elastic 所提供的 ingest pipeline 来帮助处理数据。

Ingest pipeline 是如何工作的?

Ingest pipeline,也即摄取管道, 是 Logstash 长期占据的数据解析和转换领域的新成员。 Ingest 管道实际上并不能完全替代 Logstash。 但是 Logstash 的考虑是它不可避免地会为你的体系结构带来另一个软件组件,从而使其操作起来更加复杂。 摄取管道不是这种情况,因为它们由集群中的任何(或所有)常规 Elasticsearch 节点直接执行。 该节点只需要是 Ingest 节点的类型(默认情况下),因此你甚至不需要在开始使用它们时弄乱配置。

在索引操作期间,协调器节点收到请求后,就会立即在摄取节点上执行管道。 图片来源: elastic.co

有关数数据事如何索引到 Elasticsearch 中去的,请详细阅读如下的文章:

  • Elasticsearch:彻底理解 Elasticsearch 数据操作

  • Elasticsearch:索引数据是如何完成的

Pipeline 定义

通常,摄取管道是通过一个简单的 JSON 文档定义的,该文档包含一组处理器,这些处理器代表一组有序的步骤,这些步骤应用于所有传入文档并执行。 实际的处理器有多种类型,因此我强烈建议你浏览文档中的列表以熟悉它们。

处理器有一些共同点。 这些是:

  • 使用大括号模板 {{service-name}} 访问/引用定义中已处理文档的数据的能力
  • 能够使用 if 子句定义处理器的条件执行,允许仅在检查条件后执行步骤
  • 使用 on_failure 子句处理处理器故障的能力
  • 可以通过 tag 子句进行标记,这很有用,例如。 用于错误跟踪

测试环境启动

我们有了足够的理论,让我们在本地机器上的 Docker 中启动一个简单的(单节点)集群,并尝试第一个管道定义。 注意:如果你需要有关 docker 部分的更多信息,请在我之前的文章 “Elasticsearch:如何在 Docker 上运行 Elasticsearch 8.x 进行本地开发” 中找到它。 你也可以选择其他的方式来部署 Elasticsearch 及 Kibana。请参阅如下的文章:

  • 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch

  • Kibana:如何在 Linux,MacOS 及 Windows 上安装 Elastic 栈中的 Kibana

使用摄取管道来处理文档

我们首先想创建一个如下功能的 pipeline:

  • 使用 split processor 通过特定分隔符拆分一个字段的值
  • 下一步使用 foreach processor 遍历新数组的所有值并使用 uppercase processor 转换为大写字符

在 Kibana 中,我们使用如下的命令来进行模拟:

POST _ingest/pipeline/_simulate
{"pipeline": {"description": "User split, foreach and upper processors","processors": [{"split": {"field": "meta.tags","separator": " ","target_field": "meta.tags_parsed","ignore_missing": true}},{"foreach": {"field": "meta.tags_parsed","processor": {"uppercase": {"field": "_ingest._value"}}}}]},"docs": [{"_source": {"meta": {"tags": "good enjoyed recommended"}}}]
}

上面的命令的响应为:

{"docs": [{"doc": {"_index": "_index","_id": "_id","_version": "-3","_source": {"meta": {"tags_parsed": ["GOOD","ENJOYED","RECOMMENDED"],"tags": "good enjoyed recommended"}},"_ingest": {"_value": null,"timestamp": "2023-03-07T03:29:46.844735427Z"}}}]
}

从输出中,我们可以看到,meta.tags 以空格为分隔符进行拆分,之后再经过 foreach 处理器,分别对每个拆分后的词进行大写。一旦我们使用上面的 simulate 端点测试好以后,我们就可以使用如下的命令来进行定义:

PUT _ingest/pipeline/my_simple_pipeline
{"processors": [{"split": {"field": "meta.tags","separator": " ","target_field": "meta.tags_parsed","ignore_missing": true}},{"foreach": {"field": "meta.tags_parsed","processor": {"uppercase": {"field": "_ingest._value"}}}}]
}

在上面,我们定义了一个叫做 my_simple_pipeline 的摄取管道。现在让我们创建一个索引,该索引将使用此管道作为其默认入口管道。

PUT my_index
{"settings": {"index.default_pipeline": "my_simple_pipeline"}
}

最后让我们索引一个将由该管道处理的虚拟文档:

POST my_index/_doc
{"meta": {"tags": "good enjoyed recommended"}
}

我们可以对 my_index 进行检索:

GET my_index/_search

我们可以看到如下的内容:

{"took": 222,"timed_out": false,"_shards": {"total": 1,"successful": 1,"skipped": 0,"failed": 0},"hits": {"total": {"value": 1,"relation": "eq"},"max_score": 1,"hits": [{"_index": "my_index","_id": "vc4muoYB6XeJoCxQWgC_","_score": 1,"_source": {"meta": {"tags_parsed": ["GOOD","ENJOYED","RECOMMENDED"],"tags": "good enjoyed recommended"}}}]}
}

基于值的脚本和条件执行

现在让我们尝试两件事,为了有效,让我们在一个处理器中同时使用它们 :)

  • 使用 if 子句的条件执行(这对所有处理器都是可能的)
  • 以及使用 script processor 进行更复杂的处理(当预制处理器不够用时)

两者都使用 Painless 脚本语言(基于 Groovy)。 阅读 Painless 指南以获取更多信息。 然而重要的是:

  • 你可以通过 ctx 变量在脚本中访问你处理过的文档(所谓的摄取处理器上下文)
  • ctx 包含提取的 JSON 的 Map 结构(通过方括号 ctx['my_field'] 引用各个字段)
  • 可以通过修改 ctx 变量的值来增加、修改或删除文档的字段

简单示例:如果其余部分对我们的目的没有用,我们可以使用它来仅索引原始文本的一部分(子字符串)。 所以只需更换管道......

POST _ingest/pipeline/_simulate
{"pipeline": {"description": "extract only part of the comments","processors": [{"script": {"source": """ctx.comment = ctx.comment.substring(0,20) + " ..."""","if": "ctx.containsKey('comment') && ctx['comment'].length() > 20"}}]},"docs": [{"_source": {"comment": "Hello, this is a message which deserves a hair cut."}},{"_source": {"comment": "This is a short one"}}]
}

上面命令生成的结果为:

{"docs": [{"doc": {"_index": "_index","_id": "_id","_version": "-3","_source": {"comment": "Hello, this is a mes ..."},"_ingest": {"timestamp": "2023-03-07T03:54:32.402316754Z"}}},{"doc": {"_index": "_index","_id": "_id","_version": "-3","_source": {"comment": "This is a short one"},"_ingest": {"timestamp": "2023-03-07T03:54:32.402361462Z"}}}]
}

提取结构化字段(解析默认的 NGINX 日志行格式)

如果你正在处理以某种明确定义的格式构建的数据(但未在单个字段中提取),你可以试用 dissect Processor。 只需用带百分号 %{my_field} 的大括号括起要提取的各个字段来描述模式。

我们可以使用这个处理器从默认的 NGINX 日志行格式(组合/主)中解析结构化字段,它具有以下结构。

log_format main '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent"';

有关 NGINX 日志记录的更多信息,请参阅日志记录模块 ngx_http_log_module 的文档。 我们还可以使用 date processor 提取 @timestamp,因为原始值默认为非标准格式。

让我们把它放在一个新的管道中:

POST _ingest/pipeline/_simulate
{"pipeline": {"description": "structure NGINX logs","processors": [{"dissect": {"field": "message","pattern": "%{remote_addr} - %{remote_user} [%{time_local}] \"%{request}\" %{status} %{body_bytes_sent} \"%{http_referer}\" \"%{http_user_agent}\" \"%{http_x_forwarded_for}\""}},{"date": {"field": "time_local","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"timezone": "Europe/Prague"}}]},"docs": [{"_source": {"message": "172.17.0.1 - - [24/Dec/2019:10:09:42 +0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\""}}]
}

上面命令运行的结果为:

{"docs": [{"doc": {"_index": "_index","_id": "_id","_version": "-3","_source": {"remote_user": "-","remote_addr": "172.17.0.1","request": "GET / HTTP/1.1","@timestamp": "2019-12-24T11:09:42.000+01:00","http_referer": "-","body_bytes_sent": "95","http_x_forwarded_for": "-","time_local": "24/Dec/2019:10:09:42 +0000","message": "172.17.0.1 - - [24/Dec/2019:10:09:42 +0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\"","status": "200","http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"},"_ingest": {"timestamp": "2023-03-07T04:07:13.2829158Z"}}}]
}

你可以对不同格式的内容使用类似的处理器,例如 CSV processor(从 csv 中提取字段)、KV processor(解析键=值对)或基于正则表达式的 Grok processor。

我们可以创建 nginx_pipeline 摄取管道:

PUT _ingest/pipeline/nginx_pipeline
{"processors": [{"dissect": {"field": "message","pattern": "%{remote_addr} - %{remote_user} [%{time_local}] \"%{request}\" %{status} %{body_bytes_sent} \"%{http_referer}\" \"%{http_user_agent}\" \"%{http_x_forwarded_for}\""}},{"date": {"field": "time_local","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"timezone": "Europe/Prague"}}]
}

我们可以通过如下的方式来摄取一个文档:

PUT nginx_index/_doc/1?pipeline=nginx_pipeline
{"message": "172.17.0.1 - - [24/Dec/2019:10:09:42 +0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\""
}

用另一个索引的值丰富

我们将使用的最后一件事是 enrich Processor,它可用于使用来自另一个索引的数据来丰富你摄取的文档。 酷吧!

在你的管道中使用它之前,它具有三个先决条件:

  • 你需要有我们将从中获取丰富数据的源索引
  • 你需要定义一个丰富的策略来定义源索引、匹配字段和附加字段
  • 您需要 _execute 丰富策略来为该策略创建丰富索引

让我们扩展前面的示例并使用已知 IP 地址的源索引。我们将检查以附加一些数据并查看 IP 是否不在潜在的黑名单中。

首先创建源索引和一个文档(注意:使用 refresh 查询参数以确保索引立即可用于搜索):

PUT ip_source_index/_doc/1?refresh=wait_for
{"ip": "172.17.0.1","black_listed": false,"user_category": "test"
}

接下来将创建 enrich 策略。 很简单 —— 只需链接我们的源索引(我们在上面创建的),匹配 ip 字段并列出相关字段。

PUT _enrich/policy/ip_policy
{"match": {"indices": "ip_source_index","match_field": "ip","enrich_fields": ["black_listed","user_category"]}
}

我们需要执行它来创建丰富的索引:

POST _enrich/policy/ip_policy/_execute

现在我们终于可以将 enrich Processor 添加到我们之前的 nginx 管道中了。 我们需要引用丰富策略、我们将匹配的字段(我们在上一步中提取的 remote_addr)、丰富数据的目标字段和 max_matches(要包含的匹配文档的最大数量)。 将以下内容添加到处理器中……

PUT _ingest/pipeline/nginx_pipeline
{"processors": [{"dissect": {"field": "message","pattern": "%{remote_addr} - %{remote_user} [%{time_local}] \"%{request}\" %{status} %{body_bytes_sent} \"%{http_referer}\" \"%{http_user_agent}\" \"%{http_x_forwarded_for}\""}},{"date": {"field": "time_local","formats": ["dd/MMM/yyyy:HH:mm:ss Z"],"timezone": "Europe/Prague"}},{"enrich": {"policy_name": "ip_policy","field": "remote_addr","target_field": "meta.ip","max_matches": "1"}}]
}

现在只需(重新)索引之前的文档(使用相同的 nginx 日志行)并提取日志内容,然后从我们的 ip_source_index 索引中丰富。

DELETE nginx_indexPUT nginx_index/_doc/1?pipeline=nginx_pipeline
{"message": "172.17.0.1 - - [24/Dec/2019:10:09:42 +0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\""
}GET nginx_index/_search?filter_path=**.hits

上面命令返回的文档结果为:

{"hits": {"hits": [{"_index": "nginx_index","_id": "1","_score": 1,"_source": {"remote_user": "-","remote_addr": "172.17.0.1","request": "GET / HTTP/1.1","@timestamp": "2019-12-24T11:09:42.000+01:00","http_referer": "-","meta": {"ip": {"black_listed": false,"user_category": "test","ip": "172.17.0.1"}},"body_bytes_sent": "95","http_x_forwarded_for": "-","time_local": "24/Dec/2019:10:09:42 +0000","message": "172.17.0.1 - - [24/Dec/2019:10:09:42 +0000] \"GET / HTTP/1.1\" 200 95 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36\" \"-\"","status": "200","http_user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"}}]}
}

“生产” 建议

最后,我想向你指出其他概念和功能,当你想在实际场景中开始使用管道时,摄取管道本身或与它们结合使用时很有用。

与索引模板一起使用

如果你 rollover 索引(在处理时间序列数据时可能应该这样做,大小超过数十 GB 等),甚至更多,如果你通过索引生命周期管理(我倾向于这样做)实现这些自动化,绝对推荐使用索引模板。 这些有助于为从索引模板创建的所有索引形式化配置(设置和映射)。

从管道的角度来看,你可以在设置中指定:

  • index.default_pipeline 如果在请求中没有直接指定其他管道(如果默认被覆盖),则默认应用管道。
  • 每次在默认管道或请求管道之后运行的 index.final_pipeline

在包含这些之前,只需确保你的管道存在,否则你的请求将失败。

管道模拟

绝对有用(当你的管道就位并需要执行一些更改时)是通过 _simulate API 测试这些。 你可以在请求正文中指定新的管道定义以及几个测试文档并获得结果,就好像这些已被处理一样。

POST /_ingest/pipeline/_simulate
{"pipeline": {"processors": [...]},"docs": [{"_index": "index","_id": "id","_source": {// your doc here}}]
}

在上面的代码中,我已经展示了如何使用 _smulate 这个端点来测试你的管道。

故障处理

确保处理管道执行期间发生的最终故障。 通过在特定处理器级别或整个管道级别定义 on_failure 块(由其他一些处理器组成)来实现。

例如。 将有问题的文档传递给不同的索引。

{"processors": [ … ],"on_failure": [{"set": {"field": "_index","value": "failed-{{ _index }}"}}]
}

更多阅读,请参考我之前的另外一篇文章 “Elasticsearch:如何正确处理 Elasticsearch 摄取管道故障”。

空值的处理

对于需要条件执行的情况(即仅当字段存在时),请确保处理引用字段缺失或具有无效值的情况。 你可以通过某些处理器(转换、重命名、删除……)提供的 ignore_missing 属性或在 if 块中执行此操作。

"if": "ctx.containsKey('foo') && ctx['foo'].containsKey('bar')"

重建索引 - reindex

通常当你发现摄取管道时,你的现有索引中已经有大量数据。 要让新的闪亮管道处理这些数据,只需创建一个新索引并使用 reindex API 来复制数据。 要处理数据,请在索引设置或重新索引请求正文中指定管道。

POST /_reindex
{"source": {"index": "source-index-*"},"dest": {"index": "destination-index","pipeline": "my_pipeline"}
}

注意:使用 ILM 重建索引时要更加小心。

总结

我们研究了进气管道的各种功能并对其进行了测试。 希望你喜欢这个介绍,并且你可以在你的场景中看到一些潜力。

Elasticsearch:从零开始到搜索 - 使用 Elasticsearch 摄取管道玩转你的数据相关推荐

  1. Elasticsearch:如何正确处理 Elasticsearch 摄取管道故障

    在我之前的文章 "Elastic:开发者上手指南" 中的 "Ingest pipeline" 章节中个,我有很多文章是关于 ingest pipeline 的. ...

  2. 【Elasticsearch】如何使用 Elasticsearch 6.2 搜索中文、日语和韩语文本 - 第 3 部分:语言检测工具

    1.概述 翻译:https://www.elastic.co/cn/blog/how-to-search-ch-jp-kr-part-3 这是我有关中文.日语和韩语文本搜索的系列文章的第 3 部分.如 ...

  3. 搜索引擎|全文搜索技术Elasticsearch

    总结搜索引擎技术的知识归纳,工作中用到过 ES,以此拓展知识面. 文章目录 1 全文检索技术 2 倒排索引 3 ES及其优点 4 ES术语及其概念 5 ES对外提供的接口形式 6 索引 7 映射 8 ...

  4. 一文详解 | 开放搜索兼容Elasticsearch做召回引擎

    简介:开放搜索发布开源兼容版,支持阿里云Elasticsearch做搜索召回引擎,本文详细介绍阿里云ES用户如何通过接入开放搜索兼容版丰富行业分词库,提升查询语义理解能力,无需开发.算法投入,即可获得 ...

  5. Laravel 使用 scout 集成 elasticsearch 做全文搜索

    安装需要的组件 composer require tamayo/laravel-scout-elastic composer require laravel/scout 如果composer requ ...

  6. 【Elasticsearch】十九种Elasticsearch字符串搜索方式

    1.概述 十九种Elasticsearch字符串搜索方式 刚开始接触Elasticsearch的时候被Elasticsearch的搜索功能搞得晕头转向,每次想在Kibana里面查询某个字段的时候,查出 ...

  7. 【Elasticsearch】十九种Elasticsearch字符串搜索方式终极介绍 各种 查询

    本文为博主九师兄(QQ:541711153 欢迎来探讨技术)原创文章,未经允许博主不允许转载. 可以加我问问题,免费解答,有问题可以先私聊我,本人每天都在线,会帮助需要的人. 但是本博主因为某些原因, ...

  8. 【Elasticsearch】如何使用 Elasticsearch 6.2 搜索中文、日文和韩文文本 - 第 2 部分: 多字段

    1.概述 翻译:https://www.elastic.co/cn/blog/how-to-search-ch-jp-kr-part-2 如何使用 Elasticsearch 6.2 搜索中文.日文和 ...

  9. elasticsearch的rest搜索--- 查询

    目录: 一. 针对这次装B 的解释 二.下载,安装插件elasticsearch-1.7.0   三.索引的mapping 四. 查询 五.对于相关度的大牛的文档 四. 查询 1. 查询的官网的文档 ...

最新文章

  1. C# JSON使用的常用技巧(一)
  2. 从一道面试题谈linux下fork的运行机制
  3. JuPyter(IPython) Notebooks中使用pip安装Python的模块
  4. python 主要内容,介绍一些有关python的重要内容
  5. word List29
  6. mysql语法替换字符串
  7. Java日常错误及需要注意细节,持续更新......
  8. (计算机组成原理)第二章数据的表示和运算-第一节3:字符与字符串在计算机中的表示详解
  9. MySQL 大表优化方案,收藏了细看!
  10. java 提取视频缩略图_如何使用java提取视频缩略图或某一帧的图片
  11. SpringBoot Web项目 解析
  12. 作为项目经理你应该掌握的关键链法
  13. Java反射原理和实际用法
  14. Centos安装postgresql数据库
  15. 《上古卷轴5:天际》控制台代码之特技
  16. Servlet Jsp(个人笔记)
  17. Scratch软件编程等级考试四级——20200319
  18. iOS各机型参数对比
  19. 整理任正非思想:从二则空难事故看员工培训的重要性-1994
  20. 那些被疯狂追求的女孩,后来怎么样了?

热门文章

  1. CSDN明星博主Leo新作——《程序员羊皮卷》即将隆重上市
  2. 华为鸿蒙新平板,华为鸿蒙Beta 3.0 版本推送更新 预装鸿蒙新平板获入网许可
  3. 我成功转型大厂数据分析师后,总结了这些经验…
  4. matlab计算下列极限,MATLAB微积分计算极限,又快又好
  5. 使用Java 2D绘制黑白太极图案
  6. 【电泳仪品牌】生科必知的电泳仪品牌
  7. 图片转视频python/ffmpeg
  8. 记微信开发者工具登录网络连接失败
  9. 短语、直接短语、句柄、素短语
  10. 崔老哥python scrapy爬虫框架入门