python连续写入数据之间用什么隔开_elasticsearch之使用Python批量写入数据
顺序写入100条
现在我们如果有大量的文档(例如10000000万条文档)需要写入es的某条索引中,该怎么办呢?之前学过的一次插入一条肯定不行:
import time
from elasticsearch import Elasticsearch
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗时约 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def create_data():
""" 写入数据 """
for line in range(100):
es.index(index='s2', doc_type='doc', body={'title': line})
if __name__ == '__main__':
create_data() # 执行结果大约耗时 7.79 秒
上例为顺序向es的s2索引(该索引已存在)写入100条文档,而且值也仅是数字。却花费了大约7秒左右,这种速度在大量数据的时候,肯定不行。那怎么办呢?
批量写入100条
现在,来介绍一种批量写入的方式:
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗时约 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def create_data():
""" 写入数据 """
for line in range(100):
es.index(index='s2', doc_type='doc', body={'title': line})
@timer
def batch_data():
""" 批量写入数据 """
action = [{
"_index": "s2",
"_type": "doc",
"_source": {
"title": i
}
} for i in range(10000000)]
helpers.bulk(es, action)
if __name__ == '__main__':
# create_data()
batch_data() # MemoryError
我们通过elasticsearch模块导入helper,通过helper.bulk来批量处理大量的数据。首先我们将所有的数据定义成字典形式,各字段含义如下:
_index对应索引名称,并且该索引必须存在。
_type对应类型名称。
_source对应的字典内,每一篇文档的字段和值,可有有多个字段。
首先将每一篇文档(组成的字典)都整理成一个大的列表,然后,通过helper.bulk(es, action)将这个列表写入到es对象中。
然后,这个程序要执行的话——你就要考虑,这个一千万个元素的列表,是否会把你的内存撑爆(MemoryError)!很可能还没到没到写入es那一步,却因为列表过大导致内存错误而使写入程序崩溃!很不幸,我的程序报错了。下图是我在生成列表的时候,观察任务管理器的进程信息,可以发现此时Python消耗了大量的系统资源,而运行es实例的Java虚拟机却没什么变动。
解决办法是什么呢?我们可以分批写入,比如我们一次生成长度为一万的列表,再循环着去把一千万的任务完成。这样, Python和Java虚拟机达到负载均衡。
下面的示例测试10万条数据分批写入的速度:
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗时约 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def batch_data():
""" 批量写入数据 """
# 分批写
# for i in range(1, 10000001, 10000):
# action = [{
# "_index": "s2",
# "_type": "doc",
# "_source": {
# "title": k
# }
# } for k in range(i, i + 10000)]
# helpers.bulk(es, action)
# 使用生成器
for i in range(1, 100001, 1000):
action = ({
"_index": "s2",
"_type": "doc",
"_source": {
"title": k
}
} for k in range(i, i + 1000))
helpers.bulk(es, action)
if __name__ == '__main__':
# create_data()
batch_data()
注释的内容是使用列表完成,然后使用生成器完成。结果耗时约93.53 秒。
较劲,我就想一次写入一千万条
经过洒家多年临床经验发现,程序员为什么掉头发?都是因为爱较劲!
上面的例子已经不错了,但是仔细观察,还是使用了两次for循环,但是代码可否优化,答案是可以的,我们直接使用生成器:
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
es = Elasticsearch()
def timer(func):
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
print('共耗时约 {:.2f} 秒'.format(time.time() - start))
return res
return wrapper
@timer
def gen():
""" 使用生成器批量写入数据 """
action = ({
"_index": "s2",
"_type": "doc",
"_source": {
"title": i
}
} for i in range(100000))
helpers.bulk(es, action)
if __name__ == '__main__':
# create_data()
# batch_data()
gen()
我们将生成器交给es去处理,这样,Python的压力更小了,你要说Java虚拟机不是压力更大了,无论是分批处理还是使用生成器,虚拟机的压力都不小,写入操作本来就耗时嘛!上例测试结果大约是耗时90秒钟,还行,一千万的任务还是留给你去测试吧!
欢迎斧正,that's all
python连续写入数据之间用什么隔开_elasticsearch之使用Python批量写入数据相关推荐
- 怎么将excel中的一列数据变成一行数据,且数据之间用逗号隔开。
系统:window 10 工具:office-excel 原始数据如图: 第一步 :选中需要设置的单元格,右键选择->> 设置单元格样式->>数字那一列选择自定义 第二步:在类 ...
- Python代码循环执行exe文件,并传入命令参数实现批量处理数据。
python调用exe程序 传入命令参数,并实现循环批处理文件 1.问题背景 2.面临的问题 3.代码实现(案例) 4.总结 1.问题背景 参加MARS数据医疗赛道进行计算机视觉的比赛,比赛内容为赛题 ...
- MySQL单表数据量超1亿,根据 索引列 批量删除数据
我的场景:MySQL8有个表数据量超1亿,然后我要根据某个例(一对多)删除数据, 我直接用:delete from 表 where 字段 in (select 其他表) 条件用in的方式执行报 ...
- c语言把字符变成asc11值,PLC字符与数据之间如何进行转换?
学习PLC编程过程中不可避免会接触到数据运算.比较等操作,当两个操作数类型不同时就需要进行转换,所以数据转换指令也是编程的重要指令.但是很多人往往不是太了解ASCII字符及字符与数据之间如何转换,本文 ...
- 大数据时代来临了,你需要了解什么是大数据
现在的社会是一个高速发展的社会,科技发达,信息流通,人们之间的交流越来越密切,生活也越来越方便,大数据就是这个高科技时代的产物. 大数据又称巨量资料,指的是需要新处理模式才能具有更强的决策力.洞察力和 ...
- 2015年《大数据》高被引论文Top10文章No.2——大数据时代的数据挖掘 —— 从应用的角度看大数据挖掘(上)...
2015年<大数据>高被引论文Top10文章展示 [编者按]本刊将把2015年<大数据>高被引论文Top10的文章陆续发布,欢迎大家关注!本文为高被引Top10论文的No.2, ...
- flink批量写入oracle,批量写入目标表存在重复写入问题
测试场景 源表字符串类型数据存储到目标表整型数据类型,批量写入,发现有重复写入目标表问题 测试的时候发现:当批量写入抛出异常的时候,产生2个结果,1.写入部分数据 2.程序转到处理异常的代码块,在异常 ...
- 循环——批量处理数据
一.循环的意义 1.我们使用循环来做重复的事情,从而达到效率的提升. 2.在Python中有两种类型的循环,第一种叫做for循环(for-loop),第二种叫做while循环(while-loop). ...
- 如何快速插入大量/批量随机数据到数据库(oracle/sqlserver/mysql/postgresql)
在日常的数据库开发和测试中,需要创建一些测试的表,并构造一下假的数据.这时就需要向表中插入随机数据,特别是插入大量随机数据以获取更好的验证.笔者在开发和应用中,也遇到了很多类似的问题,对于不同的数据库 ...
- 数据库批量添加数据的方法
数据库批量添加数据的方法 使用查询列表的办法来批量添加数据. 比如: insert into 表名 (字段名) select 字段名 from 自定义表 在数据库函数这里写dbo.split方法,分割 ...
最新文章
- Java 17正式发布, Oracle宣布免费提供!“版本任你发,我用Java 8”或成历史?...
- 常用API-1(Object类、String类、StringBuffer类、StringBuilder类)
- PLS-00215:字符串长度限制在范围
- Qt工作笔记-对qmake的认识【两篇转载结合】
- 最流行的轻量级php框架,推荐20个最近很流行的优秀PHP框架
- JAVA-数据类型、变量、常量
- SDR与DDR的区别
- python报错Nonetype object is not iterable
- Idea Debug多线程不进断点问题处理
- html在线生成字体,手写字体在线生成
- 自定义 Spring Starter
- Chapter8(周旭)
- day24.open 打开文件操作
- 【BZOJ5405】platform(二分,SA,线段树)
- 【MRI】解决DPABI计算功能连接时路径冲突
- .NET的数据库编程技术
- EEGLAB | 创建events事件导入eeglab
- 现有的画笔和创建自己的画笔6zhongGDI
- iOS 自定义验证码输入框
- 汽车二自由度相关数学表达式(持续更新)
热门文章
- 【手写字母识别】基于matlab GUI模板匹配手写大写字母识别【含Matlab源码 115期】
- 【游戏】基于matlab GUI可调电扇设计【含Matlab源码 1110期】
- 【优化预测】基于matlab狼群算法优化BP神经网络预测【含Matlab源码 658期】
- 【语音编码】基于matlab LPC编解码【含Matlab源码 554期】
- mfc 制作不同的文档模板mdi不同的子窗体_法律行业python教程——利用python批量制作律师函...
- 创建dqn的深度神经网络_深度Q网络(DQN)-III
- 自然语言理解gpt_GPT-3:自然语言处理的创造潜力
- 社会达尔文主义 盛行时间_新达尔文主义的心理理论
- 【reproject_inter】fits头文件的映射(1,改变fits文件的数据范围,2,对坐标系进行投影转换)
- 例4.7 素数 - 九度教程第51题(素数筛法)