最近着手处理大批量数据的任务。

现状是这样的,一个数据采集程序承载大批量数据的存储和检索。后期可能需要对大批量数据进行统计。

数据分布情况

13个点定时生成采集结果到4个文件(小文件生成周期是5分钟)

名称                                                 大小(b)
gather_1_2014-02-27-14-50-0.txt                      568497
gather_1_2014-02-27-14-50-1.txt                      568665
gather_1_2014-02-27-14-50-2.txt                      568172
gather_1_2014-02-27-14-50-3.txt                      568275

同步使用shell脚本对四个文件入到sybase_iq库的一张表tab_tmp_2014_2_27中.

每天数据量大概是3亿条,所以小文件的总量大概是3G。小文件数量大,单表容量大执行复合主键查询,由原来2s延时变成了,5~10分钟。

针对上述情况需要对目前的储存结构进行优化。

才是看了下相关系统 catior使用的是环状数据库,存储相关的数据优点方便生成MRTG图,缺点不利于数据统计。后来引入elasticsearch来对大数据检索进行优化。

测试平台

cpu: AMD Opteron(tm) Processor 6136 64bit 2.4GHz   * 32
内存: 64G
硬盘:1.5T
操作系统:Red Hat Enterprise Linux Server release 6.4 (Santiago)

读取文件的目录结构:

[test@test001 data]$ ls
0  1  2  3

简单测试代码:

public class FileReader
{private File file;private String splitCharactor;private Map<String, Class<?>> colNames;private static final Logger LOG = Logger.getLogger(FileReader.class);/*** @param path*            文件路径* @param fileName*            文件名* @param splitCharactor*            拆分字符* @param colNames*            主键名称*/public FileReader(File file, String splitCharactor, Map<String, Class<?>> colNames){this.file = file;this.splitCharactor = splitCharactor;this.colNames = colNames;}/*** 读取文件* * @return* @throws Exception*/public List<Map<String, Object>> readFile() throws Exception{List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();if (!file.isFile()){throw new Exception("File not exists." + file.getName());}LineIterator lineIterator = null;try{lineIterator = FileUtils.lineIterator(file, "UTF-8");while (lineIterator.hasNext()){String line = lineIterator.next();String[] values = line.split(splitCharactor);if (colNames.size() != values.length){continue;}Map<String, Object> map = new HashMap<String, Object>();Iterator<Entry<String, Class<?>>> iterator = colNames.entrySet().iterator();int count = 0;while (iterator.hasNext()){Entry<String, Class<?>> entry = iterator.next();Object value = values[count];if (!String.class.equals(entry.getValue())){value = entry.getValue().getMethod("valueOf", String.class).invoke(null, value);}map.put(entry.getKey(), value);count++;}list.add(map);}}catch (IOException e){LOG.error("File reading line error." + e.toString(), e);}finally{LineIterator.closeQuietly(lineIterator);}return list;}
}
public class StreamIntoEs
{public static class ChildThread extends Thread{int number;public ChildThread(int number){this.number = number;}@Overridepublic void run(){Settings settings = ImmutableSettings.settingsBuilder().put("client.transport.sniff", true).put("client.transport.ping_timeout", 100).put("cluster.name", "elasticsearch").build();TransportClient client = new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("192.168.32.228",9300));File dir = new File("/export/home/es/data/" + number);LinkedHashMap<String, Class<?>> colNames = new LinkedHashMap<String, Class<?>>();colNames.put("aa", Long.class);colNames.put("bb", String.class);colNames.put("cc", String.class);colNames.put("dd", Integer.class);colNames.put("ee", Long.class);colNames.put("ff", Long.class);colNames.put("hh", Long.class);int count = 0;long startTime = System.currentTimeMillis();for (File file : dir.listFiles()){int currentCount = 0;long startCurrentTime = System.currentTimeMillis();FileReader reader = new FileReader(file, "\\$", colNames);BulkResponse resp = null;<strong>BulkRequestBuilder bulkRequest = client.prepareBulk();</strong>try{List<Map<String, Object>> results = reader.readFile();for (Map<String, Object> col : results){bulkRequest.add(client.prepareIndex("flux", "fluxdata").setSource(JSON.toJSONString(col)).setId(col.get("getway")+"##"+col.get("port_info")+"##"+col.get("device_id")+"##"+col.get("collecttime")));count++;currentCount++;}resp = bulkRequest.execute().actionGet();}catch (Exception e){// TODO Auto-generated catch blocke.printStackTrace();}long endCurrentTime = System.currentTimeMillis();System.out.println("[thread-" + number + "-]per count:" + currentCount);System.out.println("[thread-" + number + "-]per time:"+ (endCurrentTime - startCurrentTime));System.out.println("[thread-" + number + "-]per count/s:"+ (float) currentCount / (endCurrentTime - startCurrentTime)* 1000);System.out.println("[thread-" + number + "-]per count/s:"+ resp.toString());}long endTime = System.currentTimeMillis();System.out.println("[thread-" + number + "-]total count:" + count);System.out.println("[thread-" + number + "-]total time:"+ (endTime - startTime));System.out.println("[thread-" + number + "-]total count/s:" + (float) count/ (endTime - startTime) * 1000);// IndexRequest request =// = client.index(request);}}public static void main(String args[]){for (int i = 0; i < 4; i++){ChildThread childThread = new ChildThread(i);childThread.start();}}
}

起了4个线程来做入库,每个文件解析完成进行一次批处理。

初始化脚本:

curl -XDELETE 'http://192.168.32.228:9200/twitter/'
curl -XPUT 'http://192.168.32.228:9200/twitter/' -d '
{"index" :{"number_of_shards" : 5,"number_of_replicas ": 0,<strong>"index.refresh_interval": "-1","index.translog.flush_threshold_ops": "100000"</strong>}
}'
curl -XPUT 'http://192.168.32.228:9200/twiter/twiterdata/_mapping' -d '
{"<span style="font-size: 1em; line-height: 1.5;">twiterdata</span><span style="font-size: 1em; line-height: 1.5;">": {</span>"aa" : {"type" : "long", "index" : "not_analyzed"},"bb" : {"type" : "String", "index" : "not_analyzed"},"cc" : {"type" : "String", "index" : "not_analyzed"},"dd" : {"type" : "integer", "index" : "not_analyzed"},"ee" : {"type" : "long", "index" : "no"},"ff" : {"type" : "long", "index" : "no"},"gg" : {"type" : "long", "index" : "no"},"hh" : {"type" : "long", "index" : "no"},"ii" : {"type" : "long", "index" : "no"},"jj" : {"type" : "long", "index" : "no"},"kk" : {"type" : "long", "index" : "no"},}
}

执行效率参考:

不开启refresh_interval
[test@test001 bin]$ more StreamIntoEs.out|grep total
[thread-2-]total count:1199411
[thread-2-]total time:1223718
[thread-2-]total count/s:980.1368
[thread-1-]total count:1447214
[thread-1-]total time:1393528
[thread-1-]total count/s:1038.5253
[thread-0-]total count:1508043
[thread-0-]total time:1430167
[thread-0-]total count/s:1054.4524
[thread-3-]total count:1650576
[thread-3-]total time:1471103
[thread-3-]total count/s:1121.9989
4195.1134开启refresh_interval
[test@test001 bin]$ more StreamIntoEs.out |grep total
[thread-2-]total count:1199411
[thread-2-]total time:996111
[thread-2-]total count/s:1204.0938
[thread-1-]total count:1447214
[thread-1-]total time:1163207
[thread-1-]total count/s:1244.1586
[thread-0-]total count:1508043
[thread-0-]total time:1202682
[thread-0-]total count/s:1253.9
[thread-3-]total count:1650576
[thread-3-]total time:1236239
[thread-3-]total count/s:1335.1593
5037.3117开启refresh_interval  字段类型转换
[test@test001 bin]$ more StreamIntoEs.out |grep total
[thread-2-]total count:1199411
[thread-2-]total time:1065229
[thread-2-]total count/s:1125.9653
[thread-1-]total count:1447214
[thread-1-]total time:1218342
[thread-1-]total count/s:1187.8552
[thread-0-]total count:1508043
[thread-0-]total time:1230474
[thread-0-]total count/s:1225.5789
[thread-3-]total count:1650576
[thread-3-]total time:1274027
[thread-3-]total count/s:1295.5581
4834.9575开启refresh_interval  字段类型转换 设置id
[thread-2-]total count:1199411
[thread-2-]total time:912251
[thread-2-]total count/s:1314.7817
[thread-1-]total count:1447214
[thread-1-]total time:1067117
[thread-1-]total count/s:1356.1906
[thread-0-]total count:1508043
[thread-0-]total time:1090577
[thread-0-]total count/s:1382.7937
[thread-3-]total count:1650576
[thread-3-]total time:1128490
[thread-3-]total count/s:1462.6412
5516.4072

580M的数据平均用时大概是20分钟。索引文件大约为1.76G

相关测试结果可以参考这里:

elasticsearch 性能测试

转载于:https://www.cnblogs.com/new0801/p/6175978.html

ElasticSearch大批量数据入库相关推荐

  1. java之CSV大批量数据入库

    CSV数据的入库 需求 前期准备 环境 代码展示 pom文件 关键代码及思路 多线程处理数据,否则8k万数据太慢了 获取文件数据 根据文件名创建相应数据库 关键的SQL语句 saveOrUpdateB ...

  2. 使用spoon(kettle)工具抽取Elasticsearch的数据并入库

    1.需求:使用 spoon工具抽取Elasticsearch的数据,并且入库,为此自己本地搭建了一套spoon,请求Elasticsearch抽取数据并入库: 2.本次搭建的的资源请见我百度网盘分享地 ...

  3. elasticsearch 大数据场景下使用scroll实现分页查询

    es查询大批量数据的"可能方案" 当使用es来请求大批量数据时,通常有三种办法,其一:直接查询获取全量数据:其二:使用setFrom以及setSize解决:其三:使用es自带的sc ...

  4. 每日生产万亿消息数据入库,腾讯如何突破大数据分析架构瓶颈

    背景介绍 对于腾讯庞大的大数据分析业务,几千台的 Hadoop 集群,近百 P 级的存储总量,每日产生万亿的消息数据入库,需要针对几十亿 IMEI 手机设备去重,并关联数千亿的历史全表,进行曝光.点击 ...

  5. python将ElasticSearch索引数据读入pandas dataframe实战

    python将ElasticSearch索引数据读入pandas dataframe实战 # 导入基础包和库 import pandas as pdpd.set_option('display.max ...

  6. python连接elasticsearch查询数据

    python连接elasticsearch获取数据 原文:https://blog.csdn.net/ziqiaowang/article/details/54972279 # -*- encodin ...

  7. 基于Elasticsearch的数据报表方案

    文  | 闵令超 网易智企高级应用开发工程师 前言 数据报表分析对于企业管理者的分析决策有着至关重要的作用,因此数据报表的灵活可用以及数据的准确性显得至关重要.本文会介绍基于 Elasticsearc ...

  8. 19_clickhouse,数据查询与写入优化,分布式子查询优化,外部聚合/排序优化,基于JOIN引擎的优化,SQL优化案例,物化视图提速,查询优化常用经验法则,选择和主键不一样的排序键,数据入库优化

    25.数据查询与写入优化 25.1.分布式子查询优化 25.1.1.分布式表的IN查询示例1(普通IN子查询.IN子查询为本地表) 25.1.2.分布式表的IN查询示例2(普通IN子查询.IN子查询为 ...

  9. DELETE大批量数据的性能优化

    http://litterbaby.itpub.net/post/16841/276327 DELETE大批量数据的性能优化 问题的提出: 一个表有上千万的数据,欲从该表中删除部分数据: 在线用的生产 ...

最新文章

  1. 一文读懂PID控制算法
  2. MIPS中的异常处理和系统调用【转】
  3. SQLServer存储过程
  4. python资料书-《Python数据分析与应用》——图书配套资料下载
  5. 5天玩转C#并行和多线程编程 —— 第五天 多线程编程大总结
  6. 【概念信息】成本中心计划
  7. RHEL5.4在线调整磁盘分区大小
  8. redis 查询缓存_Redis缓存总结:淘汰机制、缓存雪崩、数据不一致....
  9. office文档 在线预览 (doc、ppt、xls)
  10. 【比赛】智源计算所-互联网虚假新闻检测挑战赛(冠军)方案分享,代码已开源...
  11. JAVA版村庄哨塔种子_我的世界:top16种子,出生5村庄、地狱堡垒、2哨塔和一堆遗迹...
  12. win10子系统安装php,win10 ubuntu 子系统安装php
  13. 北京环球度假区宣布首批21家旅游渠道官方授权合作伙伴
  14. 机器学习中激活函数的作用
  15. w10系统桌面的计算机找不到,w10桌面我的电脑图标不见了怎么办
  16. 数据结构与算法--线性表
  17. js 判断对象数组是否存在某一个对象(全)
  18. 网站开发之HTML基础知识及超链接(二)
  19. Excel 和Word 常用小技巧
  20. 精品收藏:GitHub人工智能AI开源项目

热门文章

  1. 校招c语言面试题目及答案,C/C++学习之路(一)校招后端面试题及答案(作者回忆版)...
  2. java 物理地址_java中的==与equals的区别是什么,什么是物理地址?
  3. sql 拼接int类型的字段_SQL 基础教程—第一章:4. 表的创建
  4. java语言的多态性及特点_Java中的方法的多态性有两种形式:( )和( )。_学小易找答案...
  5. java 基本的数据类型_Java的基本数据类型介绍
  6. .net 导出excel_java导出excel(easypoi)
  7. mciSendString 的两个小坑
  8. 基于 Express 与 Mongodb 的简易 实现一个多人 blog
  9. 使用dwz框架搭建网站后台
  10. dispatchTouchEvent onInterceptTouchEvent onTouchEvent