前提

刚进公司就给我整个Json文件解析入库ES的任务,丢了个网站给我(https://opendata.rapid7.com/sonar.fdns_v2/,有兴趣的可以下载一个玩玩),要我下载一个最大的包解析,讲真的,有点懵也不敢说,就下载30g的那个,解压出来将近300g。当时脑子里只有一句话:不能往内存里读。我寻思这起码有几十亿的数据把。突然就达到我职业生涯(实习生)以来见过最大的数据。

需求

以下数据是我下载的一个大概几十m的小文件,里面是这样的数据,每个文件也有些出入,键是一样的,值可能会有不同。:

然后呢,需要先解析一下,换成我们需要的格式,大致是这个样子:

name字段提取出域名,然后提取域名主域,分别赋值给domain和sub_domian字段,value提取 value字段里的第一个ip4地址,time取timestamp。
基本就是这样,根据文档的不同,解析条件会有点不同。

实践1 logstash

logstash这部分我处理的文件是那个几十m的小文件,因为那个30g的json解压花了我很多时间,所以中间的等待时间就先练下手。

我刚开始是用java,将文件解析出来 然后再写到另一个文件,然后使用logstash直接导入json文件到ES。这里如何解析的我就不说了,反正我整了个csv文件出来,这里logstash可以导入json也可以导入csv文件。

步骤1

准备解析好的csv文件,安装logstash,这里随便装一下就行了,也没啥好配置的,因为我就只要使用导入下数据,我们需要在bin里面,编写这个logstash.conf的文件。

步骤2

logstash.conf:

input {file{#设置csv文件路径,多个文件路径可设置成数组[],模糊匹配用*#指定单一文件path => "D:/logstash-7.12.0/data/plugins/inputs/file/last.csv"#可设置成begining或end,begining表示从头开始读取文件,end表示读取最新数据,可和ignore_older一起使用#begining只针对首次启动是否需要读取所有的历史数据,而当csv文件修改了之后,同样会自动增量更新新数据start_position => "beginning"#codec => plain {#  charset => "ISO-8859-1"#}}}
#2.过滤格式化数据阶段
filter {#读取csv文件csv {#设置拆分符为逗号separator => ","#指定csv文件的字段,必须要和csv文件中的字段顺序一致columns => ["domain","sub_domain","time","type","value"]}#mutate{#删除无效的字段,可自定义#   remove_field => ["@version","message","host","path"]#}
}
#3.数据输出到ES阶段
output {stdout {codec => json_lines}elasticsearch {hosts => ["localhost:9200"]index => "test"#document_id => "%{mhlkdzjlbh}%{gmsfhm}"}
}

步骤3

先启动es的服务,然后再到 logstash 的bin目录下运行 logstash -f logstash.conf

有一说一,今天竟然运行成功了,我上次使用这个的时候还报了一些错,大致是说数据量太大es处理不过来,然后拒绝插入数据了。

这可能和电脑内存使用也有点关系吧,可能那天服务开太多了。这个logstash读取文件的速度很快,然后往es里面传,es可能会处理不过来,然后数据堆积,导致拒绝连接,错误这里一时半会整不出来了,有机会再更新一下。

实践2 python +bulk

这里就是开始300g的json文件解析了。因为数据需要处理后在使用,所以得写个解析的程序先把文件读一遍,如果继续使用LogStash的话应该需要解析json后然后再写到一个文件进行一些切片处理。这个话,后面可以考虑一下,现在既然使用到python了就先用python掉bulk接口批量插入试一下。

首先看一下这个json解析的处理代码:

# coding:utf8
import io
import json
import datetime
import logging
import timefrom es_process import ElasticObj'''处理json数据部分,将json解析成需要的格式'''# 打印日志,粗略记录一下是否是报错
logging.basicConfig(level=logging.WARNING,filename="log_file/json_es1018.log",filemode="w",format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')class json_analysis:def __init__(self, file_name, num, es_index, es_type, es_ip):self.es_ip = es_ipself.es_type = es_typeself.es_index = es_indexself.num = numself.file_name = file_name# 这是个计数的 删掉也行line_count = 0def read_json_all(self):with io.open(self.file_name, 'r', encoding='utf-8') as f:self.read_json_thousand(f)def read_json_thousand(self, f):global line_countnew_line_dict_array = []for line in f:try:line_dict = json.loads(line)new_line_dict = {}# print (line_count)# 获取类型 只需要类型为a的if line_dict["type"] is None or line_dict["type"] != "a":continuenew_line_dict["type"] = line_dict["type"]# 获取值if line_dict["value"] is None:continuenew_line_dict["value"] = line_dict["value"]# 格式化时间if line_dict["timestamp"] is not None:dateArray = datetime.datetime.fromtimestamp(long(line_dict["timestamp"]))new_line_dict["time"] = dateArray.strftime("%Y-%m-%d")# todo 需要将name中的主域名提取出来  先把name直接传进去使用if line_dict["name"] is not None:new_line_dict["domain"] = line_dict["name"]new_line_dict["sub_domain"] = line_dict["name"]# if line_dict["name"] is not None and re.match("_mta-sts.",line_dict["name"]):#     line_dict["name"]=re.new_line_dict_array.append(new_line_dict)line_count += 1if len(new_line_dict_array) % self.num == 0:logging.error("startTime:" + time.strftime("%H:%M:%S", time.localtime(time.time())))# es导入操作# 设置 index type ip地址obj = ElasticObj(self.es_index, self.es_type, ip=self.es_ip)obj.create_index()obj.insert_data_array(new_line_dict_array)logging.error("endTime:" + time.strftime("%H:%M:%S", time.localtime(time.time())))new_line_dict_array = []except Exception as e:logging.debug(e)if __name__ == "__main__":logging.error("==startTime==:" + time.strftime("%H:%M:%S", time.localtime(time.time())))# 读取文件位置 读取的数据量 index type ip地址# 30000是准备bulk一次批量处理的数据json_analysis = json_analysis(r"./data_file/2021-08-01-1627776546-fdns_txt_mx_mta-sts.json", 30000,"yyq_test_1018","domain", "127.0.0.1")json_analysis.read_json_all()logging.error("==endTime==:" + time.strftime("%H:%M:%S", time.localtime(time.time())))

然后再是es处理的代码:

# coding:utf8
import loggingfrom elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import io'''这是处理es创建索引,调用bulk之类的代码'''class ElasticObj:def __init__(self, index_name, index_type, ip):self.index_name = index_nameself.index_type = index_type# 无用户名密码状态self.es = Elasticsearch([ip])# 用户名密码状态# self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)def create_index(self):# 创建映射_index_mappings = {"mappings": {self.index_type: {"properties": {"sub_domain": {'type': 'string'},"type": {'type': 'string'},"value": {'type': 'string'},"domain": {'type': 'string'},"time": {'type': 'date'}}}}}if self.es.indices.exists(index=self.index_name) is not True:# 创建index 忽略400错误res = self.es.indices.create(index=self.index_name, body=_index_mappings, ignore=400)print(res)# 插入文件数据def insert_data_array(self, data):ACTIONS = []i = 1bulk_num = len(data)for list_line in data:action = {"_index": self.index_name,"_type": self.index_type,# "_id": i,  # _id 也可以默认生成,不赋值"_source": {"sub_domain": list_line["sub_domain"],"type": list_line["type"],"value": list_line["value"],"domain": list_line["domain"],"time": list_line["time"]},}i += 1ACTIONS.append(action)# 批量处理if len(ACTIONS) == bulk_num:success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)del ACTIONS[0:len(ACTIONS)]logging.error("success")if len(ACTIONS) > 0:success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)del ACTIONS[0:len(ACTIONS)]print('Performed %d actions' % success)if __name__ == '__main__':# 设置 index type ip地址obj = ElasticObj("======101101=====", "en", ip="127.0.0.1")obj.create_index()obj.insert_data(r"D:\Download\2021-08-27-1630023156-fdns_any.json")

目前就写了这两个代码,能用是真的,但是我觉得效率不是很高。之前是一次读取10w条数据调用bulk准备一次性传入es,刚开始是没什么问题,但是后面插入到接近两亿数据的时候,es就崩了……

现在以问答的形式记录一些问题和待处理的事项

Q1:为什么不用多线程和异步来提高效率

  • 首先说一下为啥没有异步处理,刚开始是有想到一边读取数据然后一边调用bulk来加快处理效率的,但是考虑到es本身的处理效率不够,如果一直往es里批量插入的,可能会导致数据堆积太多,es处理不过来直接给压垮不工作了。
  • 第二个多线程,是有想使用多线程来处理读取数据这一块的,但是python这块的多线程着实还没弄明白,之前试图使用python的多线程成来处理这个文件,结果python封装的多线程需要把这个文件全部读出来才能进行下一步多线程分配,但是如果全部读出来,电脑也就宕机了。所以就还没想好要咋用多线程处理这个文件= =,兄弟们要是有什么好方法可以提点我一下。

Q2:处理速度

  • 读取10w条数据大概是个两三秒的样子(后续我把我当时开发用的电脑配置贴一下),而es处理10w条数据则是很慢,也没用监控它一秒能插入多少数据(不会),就肉眼看了一下,快的时候大概5k-8k tps,慢的时候都可以慢到500tps,至于为什么慢了,我也不太清楚,可能是服务器内存?或者网络原因?

Q3:300g文件到底插入成功了没有

  • 没有。这个程序执行了有好几次,有因为网络原因中断了,有因为es处理不过来然后拒绝处理了……大部分时候都是因为es的原因拒绝处理,代码还是可以用的,处理个百万千万的数据应该不是什么问题。之前也没记录那些报错信息,如果后续再处理到这个es的问题的时候会把报错信息沾上来(最近再处理别的工作)

Q4:es配置

这个es的版本比较低,pip安装库的时候记得选择和es版本相对应的库,不然可能会报错。

有试图跟着网上的一些教程修改过一些参数,cpu也跑到百分之一百多……但是没什么很大的效果。然后领导说这个es配置不让动……就只能按着这个配置进行插入操作了

Q5:开发机的配置

因为工作环境这边连不上研发中心的某些网段,所以只能向日葵到远程机上进行开发了,开发机的配置就这样了

总结

因为最近再忙别的工作了,这一块需求也不是很紧急,目前就暂停在这个地方,后续再启动研究。我觉得代码可优化的空间挺小的,就那个读取json文件的地方需要处理一下,最主要的还是研究一下这个es的处理速度问题,看一下es集群的读写是怎么个工作的。现在就记录到这了,有兴趣的兄弟可以跟我探讨一下,给我点建议啥的 ^ o ^ !

【记录1】300G Json文件入库ES相关推荐

  1. python es 数据库_Python将json文件写入ES数据库的方法

    1.安装Elasticsearch数据库 PS:在此之前需首先安装Java SE环境 下载elasticsearch-6.5.2版本,进入/elasticsearch-6.5.2/bin目录,双击执行 ...

  2. 利用shell脚本将json文件导入es

    现要将保存在一个目录下所有后缀名为json文件导入es集群,每个json文件中都按回车行分隔的json数据,下面是利用shell脚本完成此功能的代码. 在运行脚本之前,先创建索引. [root@nod ...

  3. 把json数据导入linux,使用json文件给es中导入数据

    使用json文件可以给es中导入数据,10万条左右的数据可以一次导入,数量太大时导入就会报错.大数量的到导入还是需要用bulk方式. accounts.json文件格式如下: {"index ...

  4. python列表json_python-带有列表的JSON_normalize JSON文件包含字...

    这是我正在处理2条记录的示例json文件: [{"Time":"2016-01-10", "ID" :13567, "Conten ...

  5. 微信小程序云开发——常用功能2:操作云数据库一键批量导入数据(导入json文件)

    微信小程序云开发--常用功能2:操作云数据库一键批量导入数据(导入json文件) 今天我们要添加100条数据.下面的过程是先创建一条记录,然后导出这条数据看json文件中是如何编辑字段的,然后仿照这个 ...

  6. es elasticsearch 几种常见查询场景 二次分组 java读取es的查询json文件

    大家好,我是烤鸭: es中几种常见的查询场景,使用java读取es的json文件进行查询. es 中文使用手册. https://www.elastic.co/guide/cn/elasticsear ...

  7. JSON文件的应用——记录类型的用户数据存储

    章节索引 前提 从问题出发 JSON和XML 专精JSON JSON文件读与写 (1)读JSON文件 (2)正向映射 (3)反向映射 (4)写JSON文件 后记 前提 之前一直有个问题没有弄清楚,就是 ...

  8. 日常小记录json文件(json.load()、json.loads()、json.dump()、json.dumps())

    使用模型预测图片文件时,为了加快速度,早点得到预测结果,可以将预测图片分成多份多开几个命令窗口进行预测. 具体的处理方法是: ①读取需预测图片文件列表, ②把列表分成多份写入多个json文件 ③预测时 ...

  9. rk3588 rkaiq_3A_server 无法解析json文件记录

    1.RKISP Tuner v2.0.6 for RK3588转换出来的json无法被 最新rkaiq_3A_server 版本为v3.0x9.1解析识别 老版本的rkaiq_3A_server是可以 ...

最新文章

  1. 解决问题 inner element must either be a resource reference or empty.
  2. 【数据结构与算法】之单向循环链表的创建/遍历/插⼊/查找/删除算法实现
  3. Linux下开启/关闭防火墙命令
  4. .net get set 初始化_.NET项目升级:可为空引用
  5. 2016.3.16(Java图形用户界面)
  6. 详解CSS position属性
  7. linux入门_Linux从入门到入土(抽奖送10本新书)
  8. sql查询百分之20到百分之40的数据_FOCUS数据管理之ETL监控
  9. cppcheck下载及使用
  10. python-opencv 帧差法目标检测
  11. 问卷星如何设置调查人利用账号登录_端起你的小板凳,快来听我讲制作调查问卷啦!...
  12. 抖音java解析_Java版抖音解析接口
  13. 损失函数(MSE和交叉熵)
  14. java文本域添加滚动条实例_java文本域滚动条
  15. 一次培训机构的面试经历
  16. Elasticsearch集群原理
  17. docker tag 删除images_docker实现重新打tag并删除原tag的镜像
  18. 微软Power Platform平台低代码
  19. Java:输入单个字符
  20. H3CIE机试C套需求

热门文章

  1. 关于0xF0的一些认识
  2. python诞生的时间地点人物事件_教程|计算任意视频中各人物的出镜时间(附Python实现)...
  3. Linux网络管理(上)
  4. PotPlayer不支持S/W HEVC(H.265)解码怎么办
  5. 电子白板的开源项目【whiteboard】
  6. python mayavi_python – mayavi mlab.savefig()给出一个空图像
  7. Django blog项目《二十五》:项目优化《1》使用celery异步任务和定时任务
  8. BUAA Object Oriented Unit 1 Summary
  9. Mysql中间件Atlas读写分离原理与实战
  10. 应用安全系列之一:SQL注入