编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据。其中包括可以设置版本数、可以设置输入表的列导入设置(选取其中某几列)、可以设置输出表的列导出设置(选取其中某几列)。

原始表test1数据如下:

每个row key都有两个版本的数据,这里只显示了row key为1的数据

在hbase shell 中创建数据表:

create 'test2',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、无列导入设置、无列导出设置的数据

create 'test3',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、无列导入设置、有列导出设置的数据

create 'test4',{NAME => 'cf1',VERSIONS => 10} // 保存无版本、有列导入设置、无列导出设置的数据

create 'test5',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、无列导入设置、无列导出设置的数据

create 'test6',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、无列导入设置、有列导出设置的数据

create 'test7',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列导入设置、无列导出设置的数据

create 'test8',{NAME => 'cf1',VERSIONS => 10} // 保存有版本、有列导入设置、有列导出设置的数据

main函数入口:

package GeneralHBaseToHBase;

import org.apache.hadoop.util.ToolRunner;

public class DriverTest {

public static void main(String[] args) throws Exception {

// 无版本设置、无列导入设置,无列导出设置

String[] myArgs1= new String[]{

"test1", // 输入表

"test2", // 输出表

"0", // 版本大小数,如果值为0,则为默认从输入表导出最新的数据到输出表

"-1", // 列导入设置,如果为-1 ,则没有设置列导入

"-1" // 列导出设置,如果为-1,则没有设置列导出

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs1);

// 无版本设置、有列导入设置,无列导出设置

String[] myArgs2= new String[]{

"test1",

"test3",

"0",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs2);

// 无版本设置,无列导入设置,有列导出设置

String[] myArgs3= new String[]{

"test1",

"test4",

"0",

"-1",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs3);

// 有版本设置,无列导入设置,无列导出设置

String[] myArgs4= new String[]{

"test1",

"test5",

"2",

"-1",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs4);

// 有版本设置、有列导入设置,无列导出设置

String[] myArgs5= new String[]{

"test1",

"test6",

"2",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"-1"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs5);

// 有版本设置、无列导入设置,有列导出设置

String[] myArgs6= new String[]{

"test1",

"test7",

"2",

"-1",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs6);

// 有版本设置、有列导入设置,有列导出设置

String[] myArgs7= new String[]{

"test1",

"test8",

"2",

"cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",

"cf1:c1,cf1:c10,cf1:c14"

};

ToolRunner.run(HBaseDriver.getConfiguration(),

new HBaseDriver(),

myArgs7);

}

}

driver:

package GeneralHBaseToHBase;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import util.JarUtil;

public class HBaseDriver extends Configured implements Tool{

public static String FROMTABLE=""; //导入表

public static String TOTABLE=""; //导出表

public static String SETVERSION=""; //是否设置版本

// args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable}

@Override

public int run(String[] args) throws Exception {

if(args.length!=5){

System.err.println("Usage:\n demo.job.HBaseDriver "

+ ""

+"< versions >"

+ " like or "

+ " like or ");

return -1;

}

Configuration conf = getConf();

FROMTABLE = args[0];

TOTABLE = args[1];

SETVERSION = args[2];

conf.set("SETVERSION", SETVERSION);

if(!args[3].equals("-1")){

conf.set("COLUMNFROMTABLE", args[3]);

}

if(!args[4].equals("-1")){

conf.set("COLUMNTOTABLE", args[4]);

}

String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE;

Job job = Job.getInstance(conf, jobName);

job.setJarByClass(HBaseDriver.class);

Scan scan = new Scan();

// 判断是否需要设置版本

if(SETVERSION != "0" || SETVERSION != "1"){

scan.setMaxVersions(Integer.parseInt(SETVERSION));

}

// 设置HBase表输入:表名、scan、Mapper类、mapper输出键类型、mapper输出值类型

TableMapReduceUtil.initTableMapperJob(

FROMTABLE,

scan,

HBaseToHBaseMapper.class,

ImmutableBytesWritable.class,

Put.class,

job);

// 设置HBase表输出:表名,reducer类

TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job);

// 没有 reducers, 直接写入到 输出文件

job.setNumReduceTasks(0);

return job.waitForCompletion(true) ? 0 : 1;

}

private static Configuration configuration;

public static Configuration getConfiguration(){

if(configuration==null){

/**

* TODO 了解如何直接从Windows提交代码到Hadoop集群

* 并修改其中的配置为实际配置

*/

configuration = new Configuration();

configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平台提交任务

configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode

configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架

configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager

configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定资源分配器

configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver

configuration.set("hbase.master", "master:16000");

configuration.set("hbase.rootdir", "hdfs://master:8020/hbase");

configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");

configuration.set("hbase.zookeeper.property.clientPort", "2181");

//TODO 需export->jar file ; 设置正确的jar包所在位置

configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 设置jar包路径

}

return configuration;

}

}

mapper:

package GeneralHBaseToHBase;

import java.io.IOException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.HashSet;

import java.util.Map.Entry;

import java.util.NavigableMap;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.KeyValue;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.util.Bytes;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

public class HBaseToHBaseMapper extends TableMapper {

Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class);

private static int versionNum = 0;

private static String[] columnFromTable = null;

private static String[] columnToTable = null;

private static String column1 = null;

private static String column2 = null;

@Override

protected void setup(Context context)

throws IOException, InterruptedException {

Configuration conf = context.getConfiguration();

versionNum = Integer.parseInt(conf.get("SETVERSION", "0"));

column1 = conf.get("COLUMNFROMTABLE",null);

if(!(column1 == null)){

columnFromTable = column1.split(",");

}

column2 = conf.get("COLUMNTOTABLE",null);

if(!(column2 == null)){

columnToTable = column2.split(",");

}

}

@Override

protected void map(ImmutableBytesWritable key, Result value,

Context context)

throws IOException, InterruptedException {

context.write(key, resultToPut(key,value));

}

/***

* 把key,value转换为Put

* @param key

* @param value

* @return

* @throws IOException

*/

private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException {

HashMap fTableMap = new HashMap<>();

HashMap tTableMap = new HashMap<>();

Put put = new Put(key.get());

if(! (columnFromTable == null || columnFromTable.length == 0)){

fTableMap = getFamilyAndColumn(columnFromTable);

}

if(! (columnToTable == null || columnToTable.length == 0)){

tTableMap = getFamilyAndColumn(columnToTable);

}

if(versionNum==0){

if(fTableMap.size() == 0){

if(tTableMap.size() == 0){

for (Cell kv : value.rawCells()) {

put.add(kv); // 没有设置版本,没有设置列导入,没有设置列导出

}

return put;

} else{

return getPut(put, value, tTableMap); // 无版本、无列导入、有列导出

}

} else {

if(tTableMap.size() == 0){

return getPut(put, value, fTableMap);// 无版本、有列导入、无列导出

} else {

return getPut(put, value, tTableMap);// 无版本、有列导入、有列导出

}

}

} else{

if(fTableMap.size() == 0){

if(tTableMap.size() == 0){

return getPut1(put, value); // 有版本,无列导入,无列导出

}else{

return getPut2(put, value, tTableMap); //有版本,无列导入,有列导出

}

}else{

if(tTableMap.size() == 0){

return getPut2(put,value,fTableMap);// 有版本,有列导入,无列导出

}else{

return getPut2(put,value,tTableMap); // 有版本,有列导入,有列导出

}

}

}

}

/***

* 无版本设置的情况下,对于有列导入或者列导出

* @param put

* @param value

* @param tableMap

* @return

* @throws IOException

*/

private Put getPut(Put put,Result value,HashMap tableMap) throws IOException{

for(Cell kv : value.rawCells()){

byte[] family = kv.getFamily();

if(tableMap.containsKey(new String(family))){

String columnStr = tableMap.get(new String(family));

ArrayList columnBy = toByte(columnStr);

if(columnBy.contains(new String(kv.getQualifier()))){

put.add(kv); //没有设置版本,没有设置列导入,有设置列导出

}

}

}

return put;

}

/***

* (有版本,无列导入,有列导出)或者(有版本,有列导入,无列导出)

* @param put

* @param value

* @param tTableMap

* @return

*/

private Put getPut2(Put put,Result value,HashMap tableMap){

NavigableMap>> map=value.getMap();

for(byte[] family:map.keySet()){

if(tableMap.containsKey(new String(family))){

String columnStr = tableMap.get(new String(family));

log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr);

ArrayList columnBy = toByte(columnStr);

NavigableMap> familyMap = map.get(family);//列簇作为key获取其中的列相关数据

for(byte[] column:familyMap.keySet()){ //根据列名循坏

log.info("!!!!!!!!!!!"+new String(column));

if(columnBy.contains(new String(column))){

NavigableMap valuesMap = familyMap.get(column);

for(Entry s:valuesMap.entrySet()){//获取列对应的不同版本数据,默认最新的一个

System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue()));

put.addColumn(family, column, s.getKey(),s.getValue());

}

}

}

}

}

return put;

}

/***

* 有版本、无列导入、无列导出

* @param put

* @param value

* @return

*/

private Put getPut1(Put put,Result value){

NavigableMap>> map=value.getMap();

for(byte[] family:map.keySet()){

NavigableMap> familyMap = map.get(family);//列簇作为key获取其中的列相关数据

for(byte[] column:familyMap.keySet()){ //根据列名循坏

NavigableMap valuesMap = familyMap.get(column);

for(Entry s:valuesMap.entrySet()){ //获取列对应的不同版本数据,默认最新的一个

put.addColumn(family, column, s.getKey(),s.getValue());

}

}

}

return put;

}

// str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}

/***

* 得到列簇名与列名的k,v形式的map

* @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}

* @return map => {"cf1" => "c1,c2,c10,c11,c14"}

*/

private static HashMap getFamilyAndColumn(String[] str){

HashMap map = new HashMap<>();

HashSet set = new HashSet<>();

for(String s : str){

set.add(s.split(":")[0]);

}

Object[] ob = set.toArray();

for(int i=0; i

String family = String.valueOf(ob[i]);

String columns = "";

for(int j=0;j < str.length;j++){

if(family.equals(str[j].split(":")[0])){

columns += str[j].split(":")[1]+",";

}

}

map.put(family, columns.substring(0, columns.length()-1));

}

return map;

}

private static ArrayList toByte(String s){

ArrayList b = new ArrayList<>();

String[] sarr = s.split(",");

for(int i=0;i

b.add(sarr[i]);

}

return b;

}

}

程序运行完之后,在hbase shell中查看每个表,看是否数据导入正确:

test2:(无版本、无列导入设置、无列导出设置)

test3 (无版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

test4(无版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

test5(有版本、无列导入设置、无列导出设置)

test6(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、无列导出设置)

test7(有版本、无列导入设置、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

test8(有版本、有列导入设置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列导出设置("cf1:c1,cf1:c10,cf1:c14"))

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

java导出hbase表数据_通用MapReduce程序复制HBase表数据相关推荐

  1. 微信小程序系统教程Java版[3/3阶段]_微信小程序电商系统-翟东平-专题视频课程...

    微信小程序系统教程Java版[3/3阶段]_微信小程序电商系统-2445人已学习 课程介绍         微信小程序系统教程[初级阶段],微信小程序0基础学起,讲解微信小程序开发的基础知识. 微信小 ...

  2. 视频教程-微信小程序系统教程Java版[3/3阶段]_微信小程序电商系统-微信开发

    微信小程序系统教程Java版[3/3阶段]_微信小程序电商系统 微信企业号星级会员.10多年软件从业经历,国家级软件项目负责人,主要从事软件研发.软件企业员工技能培训.已经取得计算机技术与软件资格考试 ...

  3. java hbase 删除数据结构_「从零单排HBase 09」Hbase的那些数据结构和算法

    在之前学习MySQL的时候,我们知道存储引擎常用的索引结构有B+树索引和哈希索引. 而对HBase的学习,也离不开索引结构的学习,它使用了一种LSM树((Log-Structured Merge-Tr ...

  4. 查询a表有但是b表没有的数据_牛逼!一个上亿数据的报表竟然能做到秒查~

    数据背景 首先项目是西门子中国在我司实施部署的MES项目,由于项目是在产线上运作(3 years+),数据累积很大.在项目的数据库中,大概上亿条数据的表有5个以上,千万级数据的表10个以上,百万级数据 ...

  5. oracle中join另一个表后会查询不出一些数据_阿里规定超过3张表,禁止JOIN,为何?

    一. 问题提出 <阿里巴巴JAVA开发手册>里面写超过三张表禁止join,这是为什么? 二.问题分析 对这个结论,你是否有怀疑呢?也不知道是哪位先哲说的不要人云亦云,今天我设计sql,来验 ...

  6. java爬取网页数据_利用Python做数据分析—对前程无忧数据类岗位进行分析

    引言 随着时代的发展,我国在各行各业都需要大量的人才引进,处于近几年最热门的行业也称"最火行业":大数据.数据分析.数据挖掘.机器学习.人工智能,这五门行业各有不同又互有穿插.近几 ...

  7. 数据透视表 筛选_筛选列表可见行中的数据透视表

    数据透视表 筛选 When you create a pivot table in Excel, it doesn't matter if there are filters applied in t ...

  8. power bi 跨表计算_现代企业BI使命之打造数据文化——微软Build大会PowerBI解读

    现代企业BI使命之打造数据文化--微软Build大会PowerBI解读 5月19日晚在线举行的微软Build2020开发者大会上,几位PowerBI(以下简称PBI)团队的主要成员与大家介绍了PBI的 ...

  9. 刷新table数据_经典 - 一文轻松看懂数据透视表

    [导语]也许大多数人都知道 Excel 中的数据透视表,也体会到了它的强大功能,那么 Pandas 也提供了一个类似的功能,也就是pivot_table.因为考虑到直接学 pivot_table 会有 ...

最新文章

  1. 通过define _CRTDBG_MAP_ALLOC宏来检测windows上的code是否有内存泄露
  2. 江湖永在:金庸先生和阿里人的那些记忆
  3. 导航能力堪比GPS!动物们是这样做到的
  4. “进度条”博客——第四周
  5. 生产场景不同角色linux服务器分区案例分享
  6. python基础之01数据类型-变量-运算浅解
  7. 决定了 [2007-10-11]
  8. Python网络爬虫
  9. geoserver三维_基于geoserver的伪三维地图制作
  10. 音频系统测试软件:Smaart for Mac
  11. 文件删除需要管理员权限
  12. 用Python分析《都挺好》中的人物关系
  13. 用python祝福父亲节_python 计算 父亲节
  14. 万恶的prototype
  15. python中quadratic,Python: Using CVXOPT for quadratic programming
  16. IntelliJ IDEA 文件只读
  17. 网易微专业Android实战教程
  18. 视频字幕 硬字幕 软字幕 外挂字幕 简介
  19. 猫眼电影经典电影爬取
  20. /bin/sh: 1: x86_64-linux-gnu-gcc: not found

热门文章

  1. 求序列最长不下降子序列_最长不下降子序列nlogn算法详解
  2. 火星云分发全网视频_好用的短视频一键分发软件,让工作效率提高10倍
  3. nginx管理面板_吸塑包装自建网站上线,阿里云ecs+bt面板+WordPress
  4. linux服务器上网页变形,Linux服务器上用iScanner删除网页恶意代码的方法
  5. lombok有参构造注解_Java高效开发工具: Lombok
  6. 查询学生选修课程管理系统java_JAVA数据库课程设计学生选课管理系统的
  7. 广义典型相关分析_重复测量数据分析及结果详解(之二)——广义估计方程
  8. 数据结构与算法(C#版)第二章 C#语言与面向对象技术(中)V1.0
  9. 【转】GitHub 从单机到联机:玩转 Pull Request
  10. Dynamics CRM 2016 安装