当我们需要创建 Elasticsearch 索引时,数据源通常没有规范化,无法直接导入。 原始数据可以存储在数据库、原始 CSV/XML 文件中,甚至可以从第三方 API 获取。 在这种情况下,我们需要对数据进行预处理以使其与 Bulk API 一起使用。 在本教程中,我们将演示如何使用简单的 Python 代码从 CSV 文件中索引 Elasticsearch 文档。 将使用原生 Elasticsearch bulk API 和 helpers 模块中的 API。 你将学习如何在不同的场合使用合适的工具来索引 Elasticsearch 文档。

在之前的文章 “Elasticsearch:关于在 Python 中使用 Elasticsearch 你需要知道的一切 - 8.x”,我展示了如何使用 bulk API 来索引文档到 Elasticsearch 中。细心的开发者可能观察到,如果我们的文档很多,数据量很大,那个方法可能并不适用,这是因为所以的操作都是在内存里进行操作的。如果我们的原始文档很大,这极有可能造成内存不够的情况。在今天的文章中,我将探讨使用 Python 里的 generator 来实现。

为了方便测试,我们的数据可以从 https://github.com/liu-xiao-guo/py-elasticsearch8 中获取。data.csv 将是我们使用的原始数据。

安装

为了方便进行测试,我们将采用我之前的文章 “Elasticsearch:如何在 Docker 上运行 Elasticsearch 8.x 进行本地开发” 来进行部署。在这里我们采用 docker compose 来进行安装 Elasticsearch 及 Kibana。我们将不采用安全设置。更多关于如何在具有安全性的条件下使用 Python 来连接 Elasticsearch,请参考之前的文章 “Elasticsearch:关于在 Python 中使用 Elasticsearch 你需要知道的一切 - 8.x”。我们可以参考那篇文章来进行安装所需要的 Python 包。

在 Python 中创建索引

我们将创建与之前文章中演示的相同的 latops-demo 索引。  首先,我们将使用 Elasticsearch 客户端直接创建索引。 此外,settings 和 mappings 将作为顶级参数传递,而不是通过 body 参数传递。创建索引的命令是:

main.py

# Import Elasticsearch package
from elasticsearch import Elasticsearch
import csv
import json# Connect to Elasticsearch cluster
es = Elasticsearch( "http://localhost:9200")
resp = es.info()
print(resp)settings = {"index": {"number_of_replicas": 2},"analysis": {"filter": {"ngram_filter": {"type": "edge_ngram","min_gram": 2,"max_gram": 15,}},"analyzer": {"ngram_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "ngram_filter"],}}}
}mappings = {"properties": {"id": {"type": "long"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword"},"ngrams": {"type": "text", "analyzer": "ngram_analyzer"},}},"brand": {"type": "text","fields": {"keyword": {"type": "keyword"},}},"price": {"type": "float"},"attributes": {"type": "nested","properties": {"attribute_name": {"type": "text"},"attribute_value": {"type": "text"},}}}
}configurations = {"settings": {"index": {"number_of_replicas": 2},"analysis": {"filter": {"ngram_filter": {"type": "edge_ngram","min_gram": 2,"max_gram": 15,}},"analyzer": {"ngram_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "ngram_filter"],}}}},"mappings": {"properties": {"id": {"type": "long"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword"},"ngrams": {"type": "text", "analyzer": "ngram_analyzer"},}},"brand": {"type": "text","fields": {"keyword": {"type": "keyword"},}},"price": {"type": "float"},"attributes": {"type": "nested","properties": {"attribute_name": {"type": "text"},"attribute_value": {"type": "text"},}}}}
}INDEX_NAME = "laptops-demo"# check the existence of the index. If yes, remove it
if(es.indices.exists(index=INDEX_NAME)):print("The index has already existed, going to remove it")es.options(ignore_status=404).indices.delete(index=INDEX_NAME)# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings,mappings=mappings)
print(res)# The following is another way to create the index, but it is deprecated
# es.indices.create(index = INDEX_NAME, body =configurations )

现在索引已创建。我们可以在 Kibana 中使用如下的命令来进行查看:

GET _cat/indices

我们可以开始向其中添加文档。

使用原生 Elasticsearch 批量 API

当你有一个小数据集要加载时,使用原生 Elasticsearch 批量 API 会很方便,因为语法与原生 Elasticsearch 查询相同,可以直接在 Dev 控制台中运行。 你不需要学习任何新东西。

将要加载的数据文件可以从这个链接下载。 将其保存为 data.csv,将在下面的 Python 代码中使用:

main.py

# Import Elasticsearch package
from elasticsearch import Elasticsearch
import csv
import json# Connect to Elasticsearch cluster
es = Elasticsearch( "http://localhost:9200")
resp = es.info()
# print(resp)settings = {"index": {"number_of_replicas": 2},"analysis": {"filter": {"ngram_filter": {"type": "edge_ngram","min_gram": 2,"max_gram": 15,}},"analyzer": {"ngram_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "ngram_filter"],}}}
}mappings = {"properties": {"id": {"type": "long"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword"},"ngrams": {"type": "text", "analyzer": "ngram_analyzer"},}},"brand": {"type": "text","fields": {"keyword": {"type": "keyword"},}},"price": {"type": "float"},"attributes": {"type": "nested","properties": {"attribute_name": {"type": "text"},"attribute_value": {"type": "text"},}}}
}configurations = {"settings": {"index": {"number_of_replicas": 2},"analysis": {"filter": {"ngram_filter": {"type": "edge_ngram","min_gram": 2,"max_gram": 15,}},"analyzer": {"ngram_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "ngram_filter"],}}}},"mappings": {"properties": {"id": {"type": "long"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword"},"ngrams": {"type": "text", "analyzer": "ngram_analyzer"},}},"brand": {"type": "text","fields": {"keyword": {"type": "keyword"},}},"price": {"type": "float"},"attributes": {"type": "nested","properties": {"attribute_name": {"type": "text"},"attribute_value": {"type": "text"},}}}}
}INDEX_NAME = "laptops-demo"# check the existence of the index. If yes, remove it
if(es.indices.exists(index=INDEX_NAME)):print("The index has already existed, going to remove it")es.options(ignore_status=404).indices.delete(index=INDEX_NAME)# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings,mappings=mappings)
print(res)# The following is another way to create the index, but it is deprecated
# es.indices.create(index = INDEX_NAME, body =configurations )with open("data.csv", "r") as fi:reader = csv.DictReader(fi, delimiter=",")actions = []for row in reader:action = {"index": {"_index": INDEX_NAME, "_id": int(row["id"])}}doc = {"id": int(row["id"]),"name": row["name"],"price": float(row["price"]),"brand": row["brand"],"attributes": [{"attribute_name": "cpu", "attribute_value": row["cpu"]},{"attribute_name": "memory", "attribute_value": row["memory"]},{"attribute_name": "storage","attribute_value": row["storage"],},],}actions.append(action)actions.append(doc)es.bulk(index=INDEX_NAME, operations=actions, refresh=True)# Check the results:
result = es.count(index=INDEX_NAME)
print(result)
print(result.body['count'])

我们运行上面的代码:

$ python main.py
The index has already existed, going to remove it
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'laptops-demo'}
{'count': 200, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}}
200

注意:在上面的 bulk 指令中,我们需要使用 refresh=True,否则当我们读出 count 的时候,它的值可能是 0。

在上面的代码中,有一个致命的问题就是我们在内存里创建 actions。如果我们的数据比较大的话,那么 actions 所需要的内存也会比较大。它显然不适合很大的数据的情况。

请注意,我们使用 csv 库方便地从 CSV 文件中读取数据。 可以看出,原生 bulk API 的语法非常简单,可以跨不同语言(包括 Dev Tools Console)使用。

使用批量助手 - bulk helper

如上所述,原生 bulk API 的一个问题是所有数据都需要先加载到内存,然后才能被索引。 当我们有一个大数据集时,这可能会出现问题并且效率很低。 为了解决这个问题,我们可以使用 bulk helper,它可以从迭代器(iterators)或生成器(generators)中索引 Elasticsearch 文档。 因此,它不需要先将所有数据加载到内存中,这在内存方面非常高效。 然而,语法有点不同,我们很快就会看到。

在我们使用 bulk helper 索引文档之前,我们应该删除索引中的文档以确认 bulk helper 确实成功工作。这个已经在我们上面的代码中已经完成了。然后我们可以运行以下代码使用批量助手将数据加载到 Elasticsearch:

main.py

# Import Elasticsearch package
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import csv
import json# Connect to Elasticsearch cluster
es = Elasticsearch( "http://localhost:9200")
resp = es.info()
# print(resp)settings = {"index": {"number_of_replicas": 2},"analysis": {"filter": {"ngram_filter": {"type": "edge_ngram","min_gram": 2,"max_gram": 15,}},"analyzer": {"ngram_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "ngram_filter"],}}}
}mappings = {"properties": {"id": {"type": "long"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword"},"ngrams": {"type": "text", "analyzer": "ngram_analyzer"},}},"brand": {"type": "text","fields": {"keyword": {"type": "keyword"},}},"price": {"type": "float"},"attributes": {"type": "nested","properties": {"attribute_name": {"type": "text"},"attribute_value": {"type": "text"},}}}
}configurations = {"settings": {"index": {"number_of_replicas": 2},"analysis": {"filter": {"ngram_filter": {"type": "edge_ngram","min_gram": 2,"max_gram": 15,}},"analyzer": {"ngram_analyzer": {"type": "custom","tokenizer": "standard","filter": ["lowercase", "ngram_filter"],}}}},"mappings": {"properties": {"id": {"type": "long"},"name": {"type": "text","analyzer": "standard","fields": {"keyword": {"type": "keyword"},"ngrams": {"type": "text", "analyzer": "ngram_analyzer"},}},"brand": {"type": "text","fields": {"keyword": {"type": "keyword"},}},"price": {"type": "float"},"attributes": {"type": "nested","properties": {"attribute_name": {"type": "text"},"attribute_value": {"type": "text"},}}}}
}INDEX_NAME = "laptops-demo"# check the existence of the index. If yes, remove it
if(es.indices.exists(index=INDEX_NAME)):print("The index has already existed, going to remove it")es.options(ignore_status=404).indices.delete(index=INDEX_NAME)# Create the index with the correct configurations
res = es.indices.create(index=INDEX_NAME, settings=settings,mappings=mappings)
print(res)# The following is another way to create the index, but it is deprecated
# es.indices.create(index = INDEX_NAME, body =configurations )def generate_docs():with open("data.csv", "r") as fi:reader = csv.DictReader(fi, delimiter=",")for row in reader:doc = {"_index": INDEX_NAME,"_id": int(row["id"]),"_source": {"id": int(row["id"]),"name": row["name"],"price": float(row["price"]),"brand": row["brand"],"attributes": [{"attribute_name": "cpu","attribute_value": row["cpu"],},{"attribute_name": "memory","attribute_value": row["memory"],},{"attribute_name": "storage","attribute_value": row["storage"],},],},}yield dochelpers.bulk(es, generate_docs())
# (200, [])   -- 200 indexed, no errors.es.indices.refresh()# Check the results:
result = es.count(index=INDEX_NAME)
print(result.body['count'])

运行上面的代码。显示的结果如下:

$ python main.py
The index has already existed, going to remove it
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'laptops-demo'}
200

从上面的结果中我们可以看出来,我们已经成功地摄入了 200 个文档。

Elasticsearch:如何在 Python 中使用批量 API 为 Elasticsearch 索引文档相关推荐

  1. java过滤器python是啥_过滤器如何在python中使用softlayer API

    尝试以下python脚本: """ This script retrieves storage identifier through name Important man ...

  2. python批量读取csv并入库pg_如何通读CSV然后在Python中发布批量API调用

    看看这个片段:user_list = get_users() # ['user_1', 'user_2', ... , 'user_n'] TOTAL_USERS = len(user_list) s ...

  3. python读json文件数组_如何在python中从json文件读取json对象数组

    我有一个名为example.json的json文件,包含以下内容[{ "product/productId" : "XXX", "product/ti ...

  4. 如何在Python中解析YAML文件

    如何在Python中解析YAML文件? #1楼 不依赖C标头的最简单,最纯净的方法是PyYaml( 文档 ): #!/usr/bin/env pythonimport yamlwith open(&q ...

  5. linux中python如何调用matlab的数据_特征锦囊:如何在Python中处理不平衡数据

    今日锦囊 特征锦囊:如何在Python中处理不平衡数据 ? Index 1.到底什么是不平衡数据 2.处理不平衡数据的理论方法 3.Python里有什么包可以处理不平衡样本 4.Python中具体如何 ...

  6. python隐藏启动台_如何在Python中启动后台进程?

    如何在Python中启动后台进程? 我正在尝试将shell脚本移植到更易读的python版本. 原始shell脚本在后台使用"&"启动多个进程(实用程序,监视器等). 如何 ...

  7. float在python_如何在python中读取.float文件? - python

    Improve this question 我正在处理大脑MRI数据,它是.float数据. 您知道如何在python中使用它吗? 与 with open('[43x25520].float') as ...

  8. 如何在Python中获得当前的CPU和RAM使用率?

    本文翻译自:How to get current CPU and RAM usage in Python? What's your preferred way of getting current s ...

  9. 如何在python中找到两个日期时间对象之间的时差?

    本文翻译自:How do I find the time difference between two datetime objects in python? 如何分辨两个datetime对象之间的时 ...

最新文章

  1. iframe标签快速使用
  2. iOS使用 xcconfig配置文件的若干坑
  3. java聊天室程序_Java简易聊天室程序socket
  4. rust(19)-进制
  5. maven工程servlet实例之jar包冲突解决
  6. 多智能体连续行为空间问题求解——MADDPG
  7. 李宏毅机器学习(八)ELMo、BERT、GPT、XLNet、MASS、BART、UniLM、ELECTRA、others
  8. FckEditor配置详解
  9. tensorflow2 目标检测_BJX 系列 小型远距离检测型光电传感器代理报价
  10. asp.net 上传大文件控件
  11. 数据库基础:什么是SQL
  12. python语法学习第十天--类与对象
  13. matlab如何求距平,matlab编程语句
  14. Windows CMD常用命令大全(值得收藏)
  15. CAD快速看图软件中孔轴投影教程
  16. 兄弟们,以后不能愉快的抽烟
  17. 二进制转十六进制 算法实现思想
  18. html设置字体透明度,css怎么设置字体不透明度?
  19. Mac 好用的 Android 模拟器整理(玩游戏、装应用、支持咸鱼、拼多多...)
  20. 关于微信公众号,无法接受服务器消息的原因

热门文章

  1. 卷积神经网络CNN+唐宇迪博士CNN课程学习笔记
  2. 使用Eclipse调试Java程序的10个技巧
  3. ffmpeg采集视频,转码成h264格式,并播放
  4. 帆软初级证书 第一部分 Fine Report 答案
  5. 智能手环两极分化:Fitbit卖身,华为小米们扶摇直上
  6. Eclipse免费下载(附安装教程)
  7. 安装NetBeans提示找不到JDK
  8. 在ubuntu下完美安装RTX(解决离线、乱码问题)
  9. 【医学图像】图像分割系列.4
  10. 助力 VR/AR 等复杂图像场景极致高清,火山引擎夺得 NTIRE 大赛双料冠军