Elasticsearch使用Ingest Pipeline进行数据预处理
本文基于Elasticsearch7.x
Elasticsearch可以使用自身的Ingest Pipeline功能进行数据预处理, 无须借助Logstash.
Ingest Pipeline介绍
Ingest Pipeline 就是在文档写入Data Node之前进行一系列的数据预处理, 进行数据预处理的就是processor, 一组处理器构成了Pipeline. 所有的预处理都在Ingest Node上执行, 默认情况下所有节点都是Ingest Node.
常用的processor
- split processor
字符串切分成数组 - join processor
数组转化成字符串 - gsub processor
字符串替换 - set processor
创建或替换一个字段. - remove processor
移除一个字段 - rename processor
重命名一个字段 - lowercase processor
字符串小写化 - upcase processor
字符串大写化 - script processor
使用painless脚本进行复杂的处理 - …
语法
(1) 创建pipeline
description是对pipeline的描述, processors定义了一组处理器.
PUT /_ingest/pipeline/my_pipeline_id
{"description": "to split blog tags","processors": [{"split": {"field": "tags","separator": ","}}]
}
(2) 查询
GET /_ingest/pipeline/my_pipeline_id
(3) 删除
DELETE /_ingest/pipeline/my_pipeline_id
实例
(1) 创建pipeline
PUT /_ingest/pipeline/my_pipeline_id
{"description": "to split blog tags","processors": [{"split": {"field": "tags","separator": ","}},{"set": {"field": "views","value": "0"}}]
}
创建一个pipeline, 它的作用是将tags字段按逗号切分成数组, 创建或替换一个views字段, 值为0.
(2) 新增文档
新增一个文档, 指定pipeline为my_pipeline_id.
PUT /blogs/_doc/1?pipeline=my_pipeline_id
{"title": "Introducing big data......","tags": "hadoop,elasticsearch,spark","content": "You konw, for big data"
}
(3) 查看文档
GET /blogs/_doc/1
结果:
{"_index" : "blogs","_type" : "_doc","_id" : "1","_version" : 1,"_seq_no" : 0,"_primary_term" : 1,"found" : true,"_source" : {"title" : "Introducing big data......","content" : "You konw, for big data","views" : "0","tags" : ["hadoop","elasticsearch","spark"]}
}
可以看到tags字符串被切分成数组了, 同时新增了一个view字段.
使用Painless脚本定义Processor
Elasticsearch内置了很多processor, 可以进行一些简单的数据预处理, 但如果我们想进行复杂的数据预处理, 就需要使用painless脚本来自定义processor.
(1) 创建pipeline
我们使用painless脚本来自定义一个processor, 如果存在一个content的字段, 则新增一个content_length, 值为content字段值的长度.
PUT /_ingest/pipeline/my_pipeline_id
{"description": "to split blog tags","processors": [{"split": {"field": "tags","separator": ","}},{"script": {"source": """if(ctx.containsKey("content")){ctx.content_length = ctx.content.length();}else{ctx.content_length = 0;}"""}}]
}
(2) 新增文档
PUT /blogs/_doc/1?pipeline=my_pipeline_id
{"title": "Introducing big data......","tags": "hadoop,elasticsearch,spark","content": "You konw, for big data"
}PUT /blogs/_doc/2?pipeline=my_pipeline_id
{"title":"Introducing cloud computering","tags":"openstack,k8s","content":"You konw, for cloud"
}
(3) 查看文档
GET /blogs/_search
结果:
"hits" : [{"_index" : "blogs","_type" : "_doc","_id" : "1","_score" : 1.0,"_source" : {"title" : "Introducing big data......","content" : "You konw, for big data","content_length" : 22,"tags" : ["hadoop","elasticsearch","spark"]}},{"_index" : "blogs","_type" : "_doc","_id" : "2","_score" : 1.0,"_source" : {"title" : "Introducing cloud computering","content" : "You konw, for cloud","content_length" : 19,"tags" : ["openstack","k8s"]}}
]
Ingest Pipeline 和 Logstash 的对比
对比 | Logstash | Ingest Pipeline |
---|---|---|
数据输入与输出 | 支持从不同的数据源读取, 并写入不同的数据源 | 支持从 ES REST API 获取数据, 并且写入 Elasticsearch |
数据缓冲 | 实现了简单的数据队列, 支持重写 | 不支持缓冲 |
数据处理 | 支持大量的插件, 也支持定制开发 | 内置的插件, 可以开发 Plugin 进行扩展(Plugin 更新需要重启) |
配置和使用 | 增加了一定的架构复杂度 | 无需额外部署 |
Elasticsearch使用Ingest Pipeline进行数据预处理相关推荐
- Elasticsearch:Ingest pipeline 介绍
Ingest pipeline 可让你在索引之前对数据执行常见转换. 例如,你可以使用 pipeline 删除字段.从文本中提取值并丰富你的数据. Pipeline 由一系列称为处理器(process ...
- Elasticsearch:Ingest Pipeline 实践
相比较 Logstash 而言,由于其丰富的 processors 而受到越来越多人的喜欢.最重要的一个优点就是它基于 Elasticsearch 极具可拓展性和维护性而受到开发者的喜欢.我在之前创建 ...
- Elasticsearch:ingest pipeline 使用示例 - 解析常用日志格式
在本示例教程中,你将在索引之前使用 ingest pipeline 以通用日志格式解析服务器日志. 在开始之前,请检查摄取管道的先决条件. 你要解析的日志类似于以下内容: 127.0.0.1 user ...
- 机器学习之数据预处理
在sklearn之数据分析中总结了数据分析常用方法,接下来对数据预处理进行总结 当我们拿到数据集后一般需要进行以下步骤: (1)明确有数据集有多少特征,哪些是连续的,哪些是类别的 (2)检查有没有缺失 ...
- Elasticsearch Ingest Pipeline
文章目录 1. 需求:修复与增强写入的数据 2. Ingest Node 3. Pipeline & Processor 4. 使用 Pipeline 切分字符串 5. 为 ES添加一个 Pi ...
- Elasticsearch:如何处理 ingest pipeline 中的异常
在我之前的文章 "如何在 Elasticsearch 中使用 pipeline API 来对事件进行处理" 中,我详细地介绍了如何创建并使用一个 ingest pipeline.简 ...
- Elasticsearch:创建 Ingest pipeline
在 Elasticsearch 针对数据进行分析之前,我们必须针对数据进行摄入.在摄入的过程中,我们需要对数据进行加工,这其中包括非结构化数据转换为结构化数据,数据的转换,丰富,删除,添加新的字段等等 ...
- Elasticsearch:从零开始创建一个 ingest pipeline 处理器
实际上在我之前的文章: Elasticsearch:创建属于自己的 Ingest processor Elasticsearch:创建一个 Elasticsearch Ingest 插件 我已经详述了 ...
- ElasticSearch实战(三十六)-Ingest Pipeline 多管道处理器
在前文我们已经讲了 Ingest Pipeline 使用方法,除了单管道处理方式以外,它还支持多管道组合并行处理的方式对数据进行清洗,同时支持管道动态扩展和灵活组合,让数据清洗更加强大和实用. ...
最新文章
- 在 MOSS2007 集成 SQL Server 2008 报表服务
- 人工智能、大数据、云计算、机器学习和深度学习,主要有什么关系?
- 数据中心的“芯”竞争
- JavaScript对TreeView的操作全解
- flatform installer web 安装php_Windows server 2019 安装 IIS PHP 环境无标题笔记
- zookeeper删除节点的权限_zookeeper权限管理
- Unit5 Survival Shooter笔记3
- c# 解析Xml文件
- 微机接口技术实用教程(第2版)-任向民,王克朝,宗明魁-课后答案
- java LineRecordReader类解析
- JAVA代码爬虫获取网站信息
- linux qt qpa linuxfb,Qt 5.4带有Tslib的Linux触摸屏输入在Raspberry Pi上无法使用LinuxFB QPA平台插件...
- CATIA V5实战培训设计视频教程-基础建模 逆向造型A面 装配工程图
- octobercms mysql_如何在Ubuntu 16.04 LTS上安装OctoberCMS
- Monero GUI Wallet发送交易源码分析
- java开灯问题_算法题-开灯问题
- 如何把应用程序和资料转移到新的硬盘?
- openGL, mac 上 glad 的环境搭建
- 求两个数的 最大公约数 和最小公倍数
- 关于如何设置收藏本站和设为首页