HBase数据导入

1. 背景

  1. 在实际生产中,海量数据一般都不是直接存储在HBase中,这时候就需要一个数据导入到HBase的步骤
  2. 上一篇博客讲述了可以通过java api的方式或者shell 客户端方式导入或者创建数据,但这对于实际生产中海量数据导入来说,速度和效率都太慢了,所以我们需要使用其他方式来解决海量输入导入到HBase的问题
  3. 利用HBase底层文件是HFile形式存储再HDFS中,所以如果能够直接生成HFile的话,这时候再让HBase从HFile中读取数据,就会快很多。

2. 批量数据导入方式

  1. shell 脚本命令,使用工具将HFile直接导入到HBase中
  2. 编写mapreduce程序,生成Hfile。然后再使用上shell方式,将HFile文件导入到HBase中
  3. 编写mapreduce程序,直接将文件中数据写入到Hbase中
  4. 将数据从HBase中一个地方(namespace、table)导入到HBase另外一个地方(namespace、table)

3.数据导入步骤

3.1 shell 脚本命令,使用工具将HFile直接导入到HBase中

3.1.1 将csv文件转为HFile

  1. 数据准备(数据形式需要是csv格式的数据)

    在windows或者linux节点服务器上准备好,然后通过hdfs dfs -put指令上传到hdfs文件系统中。这里选择在linux系统上创建,可以省去在windows上创建后还要再从windows上传到linux节点服务器的步骤
    文件名这里是:bulkload.csv
1,lenovo,black,3000,intel
2,lenovo,black,43000,intel
3,xiaomi,black,3500,amd
4,lenovo,black,3000,amd
5,huawei,black,3000,amd
6,xiaomi,black,3000,arm
7,xiaomi,black,3000,arm
8,lenovo,black,6000,arm
9,huawei,black,5000,intel
10,huawei,black,3000,intel
11,huawei,black,7000,intel
12,huawei,black,3000,intel
13,hp,black,3000,amd
14,dell,black,3000,amd
15,huawei,black,3000,amd
16,apple,silver,544000,arm
17,mechanic,black,3000,arm
18,mechnic,black,3000,amd
19,hasee,black,12000,amd
20,hasee,black,23000,amd
21,hp,black,3000,intel
22,acer,black,3000,intel
23,acer,black,33000,intel
24,dell,black,30040,intel
25,dell,black,3000,intel
26,huawei,white,3000,amd
27,founder,blue,30060,intel
28,huawei,pink,3000,intel
29,huawei,black,30300,intel
30,huawei,black,3000,arm
31,huawei,black,33000,arm
32,lenovo,black,30200,arm
33,huawei,black,33000,intel
34,lenovo,black,3000,intel
35,huawei,black,30500,intel
36,lenovo,black,3000,intel
  1. 在hdfs上创建输入文件的文件夹
hdfs dfs -mkdir -p /csv/input
  1. 将linux节点服务器上的csv文件上传到hdfs的输入文件文件夹中
hdfs dfs -put bulkload.csv /csv/input-- 上传之后,输入以下shell命令查看文件
hdfs dfs -ls /csv/input

文件上传后,查看上传结果

4. 在hdfs集群节点服务器上执行以下shell命令

  • 指定的shell命令
hbase  org.apache.hadoop.hbase.mapreduce.ImportTsv \
-Dimporttsv.separator=, \
-Dimporttsv.columns='HBASE_ROW_KEY,cf1:brand,cf1:color,cf1:price,cf2:cpu_brand'  \
-Dimporttsv.bulk.output=/csv/output \
doit:tb_computer_info \
/csv/input# org.apache.hadoop.hbase.mapreduce.ImportTsv这个是进行导入数据的工具类的类型,类似于javca执行一个jar包时,需要指定一个main方法的类的全类名
# Dimporttsv.separator=, 这里是csv文件的分割符号,csv文件按照百度百科标准,一般都是以逗号进行分割,但实际可以使用其他分隔符号
#  Dimporttsv.columns='HBASE_ROW_KEY,cf1:brand,cf1:color,cf1:price,cf2:cpu_brand' 这里就是将csv文件中一行数据的每个字段对应在hbase的表中的字段进行指定,使用逗号隔开,例如第一个就是HBASE_ROW_KEY,第二个就是cf1:brand(就是列族cf1下的brand字段)
# doit:tb_computer_info这是hbase中的哪个表,如果需要指定namespace,前面加上namespace名再加上冒号。不指定就默认是default这个namespace中的表。注意这个表需在执行shell命令前建立好
# /csv/input这个是输入数据源路径
# -Dimporttsv.bulk.output=/csv/output  这个是输出数据源路径
# \ 反斜杠是因为shell命令一行太长,使用反斜杠进行分割 
  • Dimporttsv的参数说明

-Dimporttsv.skip.bad.lines=false - 若遇到无效行则失败
-Dimporttsv.separator=, - 使用特定分隔符,默认是tab也就是\t
-Dimporttsv.timestamp=currentTimeAsLong - 使用导入时的时间戳
-Dimporttsv.mapper.class=my.Mapper - 使用用户自定义Mapper类替换TsvImporterMapper
-Dmapreduce.job.name=jobName - 对导入使用特定mapreduce作业名
-Dcreate.table=no - 避免创建表,注:如设为为no,目标表必须存在于HBase中
-Dno.strict=true - 忽略HBase表列族检查。默认为false
-Dimporttsv.bulk.output=/user/yarn/output 作业的输出目录

  • 输入指令后,开始执行,按下enter键执行
  • 执行过程日志,可以看出其实就是执行了一个mapreduce的程序



  • 查看输出的文件
    注意这里按照列族数量,生成了2个文件夹,和hbase存储在hdfs中的文件规则是一致的。

    注意点击进去cf1文件夹查看,可以看到有一个文件,这就是生成的hfile文件

3.1.2 将HFile导入到HBase中

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /csv/output  doit:tb_computer_info
  • 输入指令后,可以看到如下日志,可以看出这时候就是HBase内部的程序,而不是mapreduce
  • 查看hbase中对应表格中数据

3.2 使用mapreduce方式导入hbase

3.2.1 环境准备

  1. pom文件
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties>
<dependencies>
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>3.2.1</version>
</dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>compile</scope>
</dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.2.5</version>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.1</version>
</dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.1</version>
</dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>2.2.5</version>
</dependency><!-- 使用mr程序操作hbase 数据的导入 -->
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-mapreduce</artifactId><version>2.2.5</version>
</dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version>
</dependency><!-- phoenix 凤凰 用来整合Hbase的工具 -->
<dependency><groupId>org.apache.phoenix</groupId><artifactId>phoenix-core</artifactId><version>5.0.0-HBase-2.0</version>
</dependency></dependencies><build>
<plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.6</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><!-- bind to the packaging phase --><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
  1. 集群环境准备
  • hdfs集群 hdfs
  • zookeeper集群 zk
  • hbase集群 hbase
    注意,需要都启动,启动顺序,先启动hdfs,然后是zookeeper,然后是hbase
  1. windows10,安装好 idea 2020版本和jdk 1.8 JDK8
  2. maven环境 maven
  3. 数据准备, 这里手动制造一些json数据,一行一行存放。
{"movie":"2294","rate":"4","timeStamp":"978824291","uid":"1"}
{"movie":"3186","rate":"4","timeStamp":"978300019","uid":"1"}
{"movie":"1566","rate":"a","timeStamp":"978824330","uid":"1"}
{"movie":"588","rate":"4","timeStamp":"978824268","uid":"1"}
{"movie":"1907","rate":"4","timeStamp":"978824330","uid":"1"}
{"movie":"783","rate":"4","timeStamp":"978824291","uid":"1"}
{"movie":"1836","rate":"5","timeStamp":"978300172","uid":"1"}
{"movie":"1022","rate":"5","timeStamp":"978300055","uid":"1"}
{"movie":"2762","rate":"4","timeStamp":"978302091","uid":"1"}
{"movie":"150","rate":"5","timeStamp":"978301777","uid":"1"}

3.2.2 数据导入思路

  1. 从外部导入数据到HBase中,由于HBase本身不支持多维度的数据查询,所以需要进行HBase表数据字段和结构设计
  2. 外部数据可能有错误或者缺失信息,需要考虑如何处理(数据预处理或者在mapreduce中处理都可以)
  3. rowkey设计
  • 这里假定业务场景是根据movie id进行数据查询,所以rowkey的设计需要把movie id放进去,
  • 同时movie id长度不一,这时候采取补位处理,让所有movie id的长度一致。考虑到不损失其原本含义,采用前面补0,否则后面补0会损失id的原本含义。
  • 看数据,如果采用movie id作为rowkey,则数据会发生覆盖,这时候如果把rate、时间戳、uid作为后缀加进去。uid和rate会有重复,但时间戳没有,所以选择时间戳作为后缀来确保rowkey的唯一性。这样一条数据对应hbase中一条数据,不会发生数据覆盖问题。
  1. 数据分区考虑,注意这里的数据(手动造了100万条左右),数据量比较大,为了合理存放这些数据,也就是相对均匀划分到不同region server上,同时确保相关数据尽可能在一起。可以考虑在设计好的rowkey基础上,看是否做分区处理。如果需要,可以执行SPLITS设置。本文这里暂不演示

3.2.3 代码实现

  1. 因为数据是json格式,所以需要有一个java bean存放数据。这个java bean会参与mapreduce过程,所以需要遵守writable协议
  2. map过程就是把一行一行文本转换为一个一个的java bean,注意这里的rowkey需要
  3. reduce就是把map阶段产生的key、value进行输出,写入到hdfs的文件中或者直接写入hbase中
  • mapper
class HBaseDataImportMapper extends Mapper<LongWritable, Text, Text, MovieBean> {// 数据解析使用GSONGson gson = new Gson();Text rowkeyText = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {try{// 读取一行一行的数据String line = value.toString();// 使用GSON进行解析MovieBean movieBean = gson.fromJson(line, MovieBean.class);// 拼接处理rowkeyString movie = movieBean.getMovie();String timeStamp = movieBean.getTimeStamp();String rowkey = StringUtils.leftPad(movie, 5, '0') + "_" + timeStamp;// 这里复用Text对象rowkeyText.set(rowkey);// 输出context.write(rowkeyText, movieBean);}catch (Exception e) {e.printStackTrace();}}
}
  • reducer
class HBaseDataImportReducer extends TableReducer<Text, MovieBean, ImmutableBytesWritable> {@Overrideprotected void reduce(Text key, Iterable<MovieBean> values, Context context) throws IOException, InterruptedException {try {String rowkey = key.toString();for (MovieBean value : values) {// 获取java bean中的属性String movie = value.getMovie();double rate = value.getRate();String timeStamp = value.getTimeStamp();String uid = value.getUid();// 构建一个HBase数据操作的Put对象// 传入rowkey,这是一行的rowkeyPut put = new Put(Bytes.toBytes(rowkey));// 设置列族、字段等信息put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("movie"), Bytes.toBytes(movie));put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("rate"), Bytes.toBytes(rate));put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("timeStamp"), Bytes.toBytes(timeStamp));put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("uid"), Bytes.toBytes(uid));// 输出context.write(null, put);}}catch (Exception e) {e.printStackTrace();}}
}
  • driver
public static void main(String[] args) {// 获取HBaseConfiguration对象,这个本身就是继承自hadoop的Configuration 类Configuration conf = HBaseConfiguration.create();// 设置hbase的zookeeper入口(hbase的元数据服务器节点位置等信息是保存在zookeeper上的)conf.set("hbase.zookeeper.quorum", "linux100:2181,linux101:2181,linux102:2181");try {// 创建一个job对象,代表一个完整的mapreduce程序Job job = Job.getInstance(conf);// 设置mapper类job.setMapperClass(HBaseDataImportMapper.class);// 设置map阶段输出的key value类job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(MovieBean.class);// 设置输入数据源FileInputFormat.setInputPaths(job, new Path("E:\\movie\\input"));// 这里设置reducer的类、需要导出的hbase表名,"doit:movie"中doit是namespace,movie是表名TableMapReduceUtil.initTableReducerJob("doit:movie", HBaseDataImportReducer.class, job);boolean b = job.waitForCompletion(true);if(b) {System.out.println("成功了");} else {System.out.println("失败了");}} catch (Exception e) {e.printStackTrace();}}

  • java bean
public class MovieBean implements Writable {// {"movie":"2294","rate":"4","timeStamp":"978824291","uid":"1"}private String movie;private double rate;private String timeStamp;private String uid;@Overridepublic String toString() {return "MovieBean{" +"movie='" + movie + '\'' +", rate=" + rate +", timeStamp='" + timeStamp + '\'' +", uid='" + uid + '\'' +'}';}public String getMovie() {return movie;}public void setMovie(String movie) {this.movie = movie;}public double getRate() {return rate;}public void setRate(double rate) {this.rate = rate;}public String getTimeStamp() {return timeStamp;}public void setTimeStamp(String timeStamp) {this.timeStamp = timeStamp;}public String getUid() {return uid;}public void setUid(String uid) {this.uid = uid;}/*** private String movie;*     private double rate;*     private String timeStamp;*     private String uid;**     注意这里的读写字段的顺序需要一致* */@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(movie);out.writeDouble(rate);out.writeUTF(timeStamp);out.writeUTF(uid);}@Overridepublic void readFields(DataInput in) throws IOException {movie = in.readUTF();rate = in.readDouble();timeStamp = in.readUTF();uid = in.readUTF();}
}

导入后,去hbase查看,进入hbase shell ,使用count指令查看。百万条数据

3.3从hbase中将数据导入另一个hbase表格中

3.1环境准备

  1. 准备好hbase中对应的表user,列族是f,字段是name和gender
  2. 准备好一张空表user2,列族是f

3.2 java代码

  1. driver类
public class HBaseTransferFromHBase {public static void main(String[] args) {// 获取整合的初始化对象Configuration conf = HBaseConfiguration.create();// 连接zookeeper集群的的位置,给多个集群节点地址,这样就算一个无法连接,还可以连接其他zookeeper集群的节点conf.set("hbase.zookeeper.quorum", "linux100:2181,linux101:2181,linux102:2181");// 获取job对象Job job = null;try {job = Job.getInstance(conf);// 创建扫描对象用来扫描源hbase中的所有的数据Scan scan = new Scan();// 接收的扫描的数据的行数scan.setCaching(200);scan.setCacheBlocks(false);job.setJarByClass(HBaseTransferFromHBase.class);// 初始化  源表,这里没有写namespace,默认就是defaul表格中TableMapReduceUtil.initTableMapperJob("user", scan, HBaseTransferFromHBaseMapper.class, Text.class, Text.class, job);// 插入数据的表要存在TableMapReduceUtil.initTableReducerJob("user2", HBaseTransferFromHBaseReducer.class, job);boolean b = job.waitForCompletion(true);if (b){System.out.println("ok");} else {System.out.println("not ok");}} catch (Exception e) {e.printStackTrace();}}
}
  1. mapper类
class HBaseTransferFromHBaseMapper extends TableMapper<Text, Text> {// 参数一是 rowkey 参数二是结果  参数三是输出的key  参数四是 输出的value@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {// 获取字符串的 rowkeyString k = new Text(new String(key.copyBytes())).toString();// 获取指定的属性的值String name = Bytes.toString(value.getValue("f".getBytes(), "name".getBytes()));String gender = Bytes.toString(value.getValue("f".getBytes(), "gender".getBytes()));System.out.println(k + "  " + name);// 以行键为key 以多个属性组装的结果为value传递 到reduce中context.write(new Text(k), new Text(name + ":" + gender));}
}
  1. reducer类
class HBaseTransferFromHBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {@Overrideprotected void reduce(Text key, Iterable<Text> iters,Reducer<Text, Text, ImmutableBytesWritable, Mutation>.Context context)throws IOException, InterruptedException {//创建put对象Put put = new Put(key.getBytes());// 获取接收的map的value值Text next = iters.iterator().next();// 将value转换成字符串String v = next.toString();//处理字符串获取 各个属性的值String[] split = v.split(":");String name = split[0];String gender = split[1];// 将各个属性的值添加到对应的列中put.addColumn("f".getBytes(), "name".getBytes(), Bytes.toBytes(name));put.addColumn("f".getBytes(), "gender".getBytes(), Bytes.toBytes(gender));// 将put对象写出去context.write(null, put);}
}
  1. 运行结束后,如下图所示,hbase的shell客户端中执行scan 扫描如下,确实完整导入了。如果需要更多限制,可以限制指定范围内的row才导入,这里就不做演示。可以自行尝试

HBase数据大批量导入方式总结和对比相关推荐

  1. 【redis】三种redis数据导出导入方式

    文章目录 1.概述 一.redis-dump方式 二.aof方式导入 三.rdb文件迁移方式 1.概述 转载:三种redis数据导出导入方式 一.redis-dump方式 redis-dump安装 y ...

  2. mysql 导入 rdb_几种redis数据导出导入方式

    几种redis数据导出导入方式 1 环境说明: 192.168.1.101 node1 redis源实例 192.168.1.102 node2 redis目标实例 192.168.1.103 nod ...

  3. 【Hbase】HBase数据快速导入之ImportTsv

    1.在前面的博客中使用程序导入数据,但是当数据量太大了,会非常的慢,因为他是一行一行读取的,然后put上去的,我尝试put2亿条数据结果用了一天 2.现在要用改进板的,使用hbase自带的工具Impo ...

  4. redis迁移至linux,redis几种数据导出导入方式

    环境说明: 172.20.0.1 redis源实例 172.20.0.2 redis目标实例 172.20.0.3 任意linux系统 一.redis-dump方式 1.安装redis-dump工具[ ...

  5. HBase数据快速导入之ImportTsvBulkload

    2019独角兽企业重金招聘Python工程师标准>>> 导入数据最快的方式,可以略过WAL直接生产底层HFile文件 (环境:centos6.5.Hadoop2.6.0.HBase0 ...

  6. hbase 数据的导入导出

    2019独角兽企业重金招聘Python工程师标准>>> 导出 hbase org.apache.hadoop.hbase.mapreduce.Driver export 表名 导出存 ...

  7. TP框架 数据大批量导入数据库

    2秒钟 一次性添加十万条数据 public function addQrcode(){$arr = [];for($i=0;$i<100000;$i++){$data = ['goodsid'= ...

  8. matlab将图片导入工作区,matlab数据的导入和导出,以matlab工作区workspace为source和destination...

    MATLAB支持工作区的保存.用户可以将工作区或工作区中的变量以文件的形式保存,以备在需要时再次导入. 保存工作区可以通过菜单进行,也可以通过命令窗口进行. 数据导出 1. 保存整个工作区 选择Fil ...

  9. HBase 数据导入功能实现方式解释

    https://www.ibm.com/developerworks/cn/opensource/os-cn-data-import/index.html 预备知识:启动 HBase 清单 1. 修改 ...

最新文章

  1. 手机 x PC 交叉感染?360 安全研究员演示“混血攻击”
  2. python运行系统找不到指定文件_“系统无法找到指定的文件”当调用Python中的subprocess.Popen...
  3. 文件上传 upload-labs 1~20做题记录
  4. LFS安装ifconfig命令
  5. 10.static_extern
  6. LINQ - 對付 SQL Injection 的 免費補洞策略 (转)
  7. 使用Eclipse创建maven项目
  8. WEB安全基础-SQL注入演示
  9. plsql 存储过程 批量提交_Spring Batch 批量处理策略
  10. 华为HCNE专题一:网络基础知识
  11. JAVA多线程-CountDownLatch计数器
  12. 闭包会造成内存泄漏吗?
  13. [蓝桥杯]试题 基础练习 完美的代价
  14. 专利与论文-4:专利申请流程与生命周期及费用
  15. [IMX6Q][Android4.4] Audio添加控制MIC左右声道接口
  16. apt-get: relocation error:/libapt-private.so.0.0 version APTPKG_5.0 not defined in file libapt-pkg
  17. jvm 内存查看与分析工具
  18. 用Discuz 搭建个人论坛
  19. 帮百度AI干脏活累活的公司,都死了
  20. Linux安装使用及命令大全

热门文章

  1. 类的继承层次结构的宽度和深度
  2. Day82_ELK(一)
  3. c语言 return0作用
  4. mac mini u盘安装系统_用PE系统U盘启动盘安装操作系统
  5. 基于数据结构和C语言实现公交管理系统(含文档和代码)数据结构课程设计
  6. linux txt file busy,linux使用cp报错 Text file busy
  7. CDH 配置CM Server的数据库时错误
  8. 时间格式的转换 例如:(2021-05-10 14:20:43) 转为( 2021年5月10日 14时20分43秒)
  9. 盛大Everbox邀请码[2011-01-20]
  10. LevelDB源码解析(1) Arena内存分配器