Hbase几种数据入库(load)方式比较
1. 预先生成HFile入库
这个地址有详细的说明http://blog.csdn.net/dajuezhao/archive/2011/04/26/6365053.aspx
2. 通过MapReduce入库
/* MapReduce 读取hdfs上的文件,以HTable.put(put)的方式在map中完成数据写入,无reduce过程*/
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class HBaseImport extends Configured implements Tool{
static final Log LOG = LogFactory.getLog(HBaseImport.class);
public static final String JOBNAME = "MRImport ";
public static class Map extends Mapper<LongWritable , Text, NullWritable, NullWritable>{
Configuration configuration = null;
HTable xTable = null;
private boolean wal = true;
static long count = 0;
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
super.cleanup(context);
xTable.flushCommits();
xTable.close();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String all[] = value.toString().split("/t");
If(all.length==2){
put = new Put(Bytes.toBytes(all[0]))); put.add(Bytes.toBytes("xxx"),Bytes.toBytes("20110313"),Bytes.toBytes(all[1]));
}
if (!wal) {
put.setWriteToWAL(false);
}
xTable.put(put);
if ((++count % 100)==0) {
context.setStatus(count +" DOCUMENTS done!");
context.progress();
System.out.println(count +" DOCUMENTS done!");
}
}
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
super.setup(context);
configuration = context.getConfiguration();
xTable = new HTable(configuration,"testKang");
xTable.setAutoFlush(false);
xTable.setWriteBufferSize(12*1024*1024);
wal = true;
}
}
@Override
public int run(String[] args) throws Exception {
String input = args[0];
Configuration conf = HBaseConfiguration.create(getConf());
conf.set("hbase.master", "m0:60000");
Job job = new Job(conf,JOBNAME);
job.setJarByClass(HBaseImport.class);
job.setMapperClass(Map.class);
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.setInputPaths(job, input);
job.setOutputFormatClass(NullOutputFormat.class);
return job.waitForCompletion(true)?0:1;
}
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
int res = 1;
try {
res = ToolRunner.run(conf, new HBaseImport (), otherArgs);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}
3. 通过Java程序入库
/* Java多线程读取本地磁盘上的文件,以HTable.put(put)的方式完成数据写入*/
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
public class InsertContactJava {
public static long startTime;
public static long rowkey = 0; //起始rowkey
public static final int lineCount = 100000; //每次提交时录入的行数
public static String tableName = "usercontact_kang"; //录入目的表名
public static int countLie = 8; //表的列数
public static void main(String[] args) throws IOException {
startTime = System.currentTimeMillis() / 1000;
System.out.println("start time = " + startTime);
Thread t1 = new Thread() {
@Override
public void run() {
try {
insert_one("/run/jar/123");
//loadByLieWithVector("/run/jar/123");
//loadByLieWithArrayList("/run/jar/123");
} catch (IOException e) {
e.printStackTrace();
}
}
};
t1.start();
}
public static void insert_one(String path) throws IOException {
Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, tableName);
File f = new File(path);
ArrayList<Put> list = new ArrayList<Put>();
BufferedReader br = new BufferedReader(new FileReader(f));
String tmp = br.readLine();
int count = 0;
while (tmp != null) {
if (list.size() > 10000) {
table.put(list);
table.flushCommits();
list.clear();
} else {
String arr_value[] = tmp.toString().split("/t", 10);
String first[] = arr_value[0].split("~", 5);
String second[] = arr_value[1].split("~", 5);
String rowname = getIncreasRowKey();
String firstaccount = first[0];
String firstprotocolid = first[1];
String firstdomain = first[2];
String inserttime = Utils.getToday("yyyyMMdd");
String secondaccount = second[0];
String secondprotocolid = second[1];
String seconddomain = second[2];
String timescount = Integer.valueOf(arr_value[2]).toString();
Put p = new Put(rowname.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTACCOUNT".getBytes(),
firstaccount.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTDOMAIN".getBytes(),
firstdomain.getBytes());
p.add(("ucvalue").getBytes(), "FIRSTPROTOCOLID".getBytes(),
firstprotocolid.getBytes());
p.add(("ucvalue").getBytes(), "INSERTTIME".getBytes(),
inserttime.getBytes());
p.add(("ucvalue").getBytes(), "SECONDACCOUNT".getBytes(),
secondaccount.getBytes());
p.add(("ucvalue").getBytes(), "SECONDDOMAIN".getBytes(),
seconddomain.getBytes());
p.add(("ucvalue").getBytes(), "SECONDPROTOCOLID".getBytes(),
secondprotocolid.getBytes());
p.add(("ucvalue").getBytes(), "TIMESCOUNT".getBytes(),
timescount.getBytes());
list.add(p);
}
tmp = br.readLine();
count++;
}
if (list.size() > 0) {
table.put(list);
table.flushCommits();
}
table.close();
System.out.println("total = " + count);
long endTime = System.currentTimeMillis() / 1000;
long costTime = endTime - startTime;
System.out.println("end time = " + endTime);
System.out.println(path + ": cost time = " + costTime);
}
4. 入库方式比较
Ø 生成HFile方式:
生成HFile的过程比较慢,生成HFile后写入hbase非常快,基本上就是hdfs上的mv过程.对于生成HFile方式入库的时候有一个改进的方案,就是先对数据排序,然后生成HFile。
HFile方式在所有的加载方案里面是最快的,不过有个前提——数据是第一次导入,表是空的。如果表中已经有了数据。HFile再导入到hbase的表中会触发split操作,最慢的时候这种操作会耗时1小时。
Ø MapReduce方式:
开始会很快,但是由于mr和hbase竞争资源,到一个特定的时间点会变很慢
Ø Java程序方式:
多客户端,多线程同时入库,目前看来是最好的方式,client和regionserver分开,硬盘读写分开,瓶颈只在网络和内存上。咨询了一些牛人,大多推荐这种方式,并且一定要多客户端,多线程。关于入库效率的调优,在我另一篇博客中有说明。
Hbase几种数据入库(load)方式比较相关推荐
- Mixup vs. SamplePairing:ICLR2018投稿论文的两种数据增广方式
在碎片化阅读充斥眼球的时代,越来越少的人会去关注每篇论文背后的探索和思考. 在这个栏目里,你会快速 get 每篇精选论文的亮点和痛点,时刻紧跟 AI 前沿成果. 点击本文底部的「阅读原文」即刻加入社区 ...
- 用户画像2种数据存储的方式
目前,越来越多的企业,在大数据应用上,都会选择用户画像这一主题,为什么呢?因为用户画像相对于做推荐以及机器学习等简单容易的多,做画像,更多是就是对用户数据的整合,然后做一些用户聚类.用推荐算法,比如基 ...
- redis迁移至linux,redis几种数据导出导入方式
环境说明: 172.20.0.1 redis源实例 172.20.0.2 redis目标实例 172.20.0.3 任意linux系统 一.redis-dump方式 1.安装redis-dump工具[ ...
- 动态网页常用的两种数据加载方式ajax和js动态请求
欢迎关注"生信修炼手册"! 对于静态网页,我们只需要访问对应的URL就可以获得全部的数据了,动态网页则没有这么简单.比如以下网站 http://q.10jqka.com.cn/zj ...
- 浅谈Entity Framework中的数据加载方式
如果你还没有接触过或者根本不了解什么是Entity Framework,那么请看这里http://www.entityframeworktutorial.net/EntityFramework-Arc ...
- PaddleClas-图像分类中的8种数据增广方法(cutmix, autoaugment,..)
本文主要来源于PaddleClas这个代码仓库中的数据增广文档:https://github.com/PaddlePaddle/PaddleClas/blob/master/docs/zh_CN/ad ...
- 数据中心交换机布线方式 EOR/MOR/TOR
由于数据中心服务器的形态差异(机架式服务器.刀片服务器),POD中服务器机柜和网络机柜的布线方式也存在差异.本文介绍EOR.MOR.TOR和刀片服务器几种数据中心布线方式的特点和差异. 数据中心机房平 ...
- Hbase表两种数据备份方法-导入和导出示例
Hbase表两种数据备份方法-导入和导出示例 本文将提供两种备份方法 -- 1) 基于Hbase提供的类对hbase中某张表进行备份 2) 基于Hbase snapshot数据快速备份方法 场合:由于 ...
- HIVE的安装配置、mysql的安装、hive创建表、创建分区、修改表等内容、hive beeline使用、HIVE的四种数据导入方式、使用Java代码执行hive的sql命令
1.上传tar包 这里我上传的是apache-hive-1.2.1-bin.tar.gz 2.解压 mkdir -p /home/tuzq/software/hive/ tar -zxvf apach ...
最新文章
- 黎曼曲面Riemann Surface
- linux svn 开机启动
- 用Windows Storage Server 2008做iSCSI存储服务器
- 814. Binary Tree Pruning
- Java高并发编程(八):Java并发容器和框架
- Redis主从同步和持久化
- leetcode LCP 19. 秋叶收藏集(dp)
- ddos应急处理_写给十九大安保应急的兄弟们 来看看DDOS攻击应急预案
- 市场份额一般是多少_虹口区武进路疏通坐便器通一次收多少钱
- grpc框架_grpc的入门使用
- 测试工程师,必须掌握的shell变量知识
- 【HNOI2003】【BZOJ1216】操作系统(模拟,优先队列)
- 「代码随想录」322. 零钱兑换 【动态规划】力扣详解!
- 打造属于自己的 linux版(硬盘版或电子盘)view5 终端
- 批处理路径中含有空格的处理办法
- 十一新疆之旅中邂逅的一首诗《黄河,母亲之河》
- 自2018年2月28日起 iCloud 由云上贵州运营
- 商业汇票的背书、贴现与质押
- 微信小程序 向下跳动箭头
- 电脑系统重装篇7:使用GHO镜像文件安装系统(OneKey)