五十三、通过MapReduce实现HBase操作
通过HBase的相关JavaAPI,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将HBase表中的数据拷贝到另外一张表。本文我们通过两个案例来进行实操一下,关注专栏《破茧成蝶——大数据篇》,查看更多相关的内容~
目录
一、将HBase表数据复制到另外一张表
1.1 需求说明
1.2 编码实现
1.3 测试
二、将HDFS的数据写入到HBase中
2.1 需求说明
2.2 编码实现
2.3 测试
一、将HBase表数据复制到另外一张表
1.1 需求说明
将xzw:people中的数据拷贝到xzw:user中。
1.2 编码实现
1、新建ScanDataMapper类用于获取xzw:people中的数据
package com.xzw.hbase_mr;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;import java.io.IOException;/*** @author: xzw* @create_date: 2021/3/17 13:46* @desc: 用于获取people中的数据* @modifier:* @modified_date:* @desc:*/
public class ScanDataMapper extends TableMapper<ImmutableBytesWritable, Put> {@Overrideprotected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException,InterruptedException {//运行Mapper,查询数据Put put = new Put(key.get());for (Cell cell :result.rawCells()) {put.addColumn(CellUtil.cloneFamily(cell),CellUtil.cloneQualifier(cell),CellUtil.cloneValue(cell));}context.write(key, put);}
}
2、新建InsertDataReducer类用于将读取到的数据存到xzw:user表中
package com.xzw.hbase_mr;import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;import java.io.IOException;/*** @author: xzw* @create_date: 2021/3/17 13:49* @desc: 用于将读到的数据存到user表中* @modifier:* @modified_date:* @desc:*/
public class InsertDataReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {@Overrideprotected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException,InterruptedException {//运行Reducer,增加数据for (Put put :values) {context.write(NullWritable.get(), put);}}
}
3、新建HBaseMapperReduceTool类用于组装运行job任务
package com.xzw.hbase_mr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;/*** @author: xzw* @create_date: 2021/3/17 10:55* @desc: 用于组装运行job任务* @modifier:* @modified_date:* @desc:*/
public class HBaseMapperReduceTool implements Tool {public int run(String[] strings) throws Exception {//作业Job job = Job.getInstance();job.setJarByClass(HBaseMapperReduceTool.class);//mapperTableMapReduceUtil.initTableMapperJob("xzw:people",new Scan(),ScanDataMapper.class,ImmutableBytesWritable.class,Put.class,job);//reducerTableMapReduceUtil.initTableReducerJob("xzw:user",InsertDataReducer.class,job);//执行作业boolean b = job.waitForCompletion(true);return b ? JobStatus.State.SUCCEEDED.getValue(): JobStatus.State.FAILED.getValue();}public void setConf(Configuration configuration) {}public Configuration getConf() {return null;}
}
4、新建T2TApplication类用于启动程序
package com.xzw.hbase_mr;import org.apache.hadoop.util.ToolRunner;/*** @author: xzw* @create_date: 2021/3/17 10:53* @desc:* @modifier:* @modified_date:* @desc:*/
public class T2TApplication {public static void main(String[] args) throws Exception {ToolRunner.run(new HBaseMapperReduceTool(), args);}
}
1.3 测试
1、将代码打包提交到服务器上
2、运行代码
yarn jar hbase.jar com.xzw.hbase_mr.T2TApplication
3、查看user表发现,数据已经拷贝过来
二、将HDFS的数据写入到HBase中
2.1 需求说明
将HDFS上面的user.csv文件导入到HBase中的xzw:user表中,文件内容如下所示:
2.2 编码实现
1、新建ReadDataFromHDFSMapper类用于读取HDFS上的数据
package com.xzw.hbase_hdfs;import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author: xzw* @create_date: 2021/3/18 9:46* @desc: 从HDFS上读取数据* @modifier:* @modified_date:* @desc:*/
public class ReadDataFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//从HDFS中读取的数据String[] data = value.toString().split(",");String rowkey = data[0];String name = data[1];String age = data[2];//初始化rowkeyImmutableBytesWritable immutableBytesWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey));//初始化put对象Put put = new Put(Bytes.toBytes(rowkey));put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));context.write(immutableBytesWritable, put);}
}
2、新建WriteDataToHBaseReducer类用于将读到的数据写入HBase表
package com.xzw.hbase_hdfs;import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;import java.io.IOException;/*** @author: xzw* @create_date: 2021/3/18 10:07* @desc: 写数据到HBase中* @modifier:* @modified_date:* @desc:*/
public class WriteDataToHBaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {@Overrideprotected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException,InterruptedException {//将读出来的数据写入到HBase中for (Put put :values) {context.write(NullWritable.get(), put);}}
}
3、新建HDFSToHBaseTool类用于组装运行job任务
package com.xzw.hbase_hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;import java.io.IOException;/*** @author: xzw* @create_date: 2021/3/18 10:21* @desc:* @modifier:* @modified_date:* @desc:*/
public class HDFSToHBaseTool implements Tool {public int run(String[] strings) throws Exception {//获取ConfigurationConfiguration conf = new Configuration();//创建job任务Job job = Job.getInstance(conf);job.setJarByClass(HDFSToHBaseTool.class);Path path = new Path("hdfs://master:9000/xzw/user.csv");FileInputFormat.addInputPath(job, path);//设置Mapperjob.setMapperClass(ReadDataFromHDFSMapper.class);job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(Put.class);//设置ReducerTableMapReduceUtil.initTableReducerJob("xzw:user", WriteDataToHBaseReducer.class, job);//设置Reducer的数量,最少一个job.setNumReduceTasks(1);boolean b = job.waitForCompletion(true);return b ? 0 : 1;}public void setConf(Configuration configuration) {}public Configuration getConf() {return null;}
}
4、新建H2TApplication类用于启动程序
package com.xzw.hbase_hdfs;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;/*** @author: xzw* @create_date: 2021/3/18 13:38* @desc:* @modifier:* @modified_date:* @desc:*/
public class H2TApplication {public static void main(String[] args) throws Exception {Configuration configuration = HBaseConfiguration.create();ToolRunner.run(configuration, new HDFSToHBaseTool(), args);}
}
2.3 测试
1、将代码打包提交到服务器上
2、运行jar包
yarn jar hbase_hdfs.jar com.xzw.hbase_hdfs.H2TApplication
3、查看user表发现,数据已经导入进来
好了,本文到此已经全部结束。你们在这个过程中遇到了什么问题,欢迎留言,让我看看你们遇到了什么问题~
五十三、通过MapReduce实现HBase操作相关推荐
- 从零开始大数据--Hadoop、HDFS、MapReduce、HBase、Hive
文章目录 概述 Hadoop HDFS HBase 实现原理 Regin服务器原理 HBase安装与使用 NoSQL数据库 MapReduce Hive 概述 IT领域每隔十五年就会迎来一次重大变革: ...
- 学习笔记Hadoop(十一)—— Hadoop基础操作(3)—— MapReduce常用Shell操作、MapReduce任务管理
四.MapReduce常用Shell操作 4.1.MapReduce常用Shell MapReduce Shell 此处指的是可以使用类似shell的命令来直接和MapReduce任务进行交互(这里不 ...
- OpenCV学习笔记(四十一)——再看基础数据结构core OpenCV学习笔记(四十二)——Mat数据操作之普通青年、文艺青年、暴力青年 OpenCV学习笔记(四十三)——存取像素值操作汇总co
OpenCV学习笔记(四十一)--再看基础数据结构core 记得我在OpenCV学习笔记(四)--新版本的数据结构core里面讲过新版本的数据结构了,可是我再看这部分的时候,我发现我当时实在是看得太马 ...
- 【正点原子STM32连载】第五十三章 DSP测试实验 摘自【正点原子】MiniPro STM32H750 开发指南_V1.1
1)实验平台:正点原子MiniPro H750开发板 2)平台购买地址:https://detail.tmall.com/item.htm?id=677017430560 3)全套实验源码+手册+视频 ...
- 实验5-9 使用函数输出水仙花数_正点原子STM32F407探索者开发板资料连载第五十三章 手写识别实验
1)实验平台:alientek 阿波罗 STM32F767 开发板 2)摘自<STM32F7 开发指南(HAL 库版)>关注官方微信号公众号,获取更多资料:正点原子 第五十三章 手写识别实 ...
- 【正点原子Linux连载】第五十三章 异步通知实验 -摘自【正点原子】I.MX6U嵌入式Linux驱动开发指南V1.0
1)实验平台:正点原子阿尔法Linux开发板 2)平台购买地址:https://item.taobao.com/item.htm?id=603672744434 2)全套实验源码+手册+视频下载地址: ...
- 熟悉常用的HBase操作
熟悉常用的HBase操作 1. 以下关系型数据库中的表和数据,要求将其转换为适合于HBase存储的表并插入数据: 学生表(Student)(不包括最后一列) 学号(S_No) 姓名(S_Name) 性 ...
- 五十三、开始算法刷题磨练
@Author: Runsen 从大一写Python文章,到现在其都有上百篇,现在到了五十三,我觉得这个时候应该去刷题了,其实很多学Python的都不是专科的,都是给培训机构宣传牛逼,其实在编程语言中 ...
- MapReduce实现join操作
前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到 ...
最新文章
- 全球首届APMCon,带你给“应用性能”把把脉
- Java 面向对象四大特性
- android mvc mvp 区别,谈谈Android框架 MVC、MVP、MVVM的区别
- 拼多多回应“二次上市”:公司现金储备充裕 暂无任何计划
- Mysql-5.5.3 主从同步不支持master-host问题的解决办法
- 迪米特法则(Law Of Demeter)
- maven学习二(dependencies)
- 【2020年度合辑】人工智能量化实验室原创推送合辑
- mysql联合查询的几种方式
- 利用gretna计算小世界网络属性等图论指标笔记
- Bootstrap响应式图表设计
- 浅淡静态代码分析工具
- unity资源包导入错误 Failed to import package with error Couldnt decompress package
- 2018年回顾:但行好事,无问前程
- 仿pinterest实现瀑布流布局效果
- 修改配置的时候提示token验证失败/微信公众号
- 硬盘格式化后数据如何恢复
- 大连 青少年 python 培训
- python获取a股数据_python获取A股数据列表的例子
- Motion Factory特效制作AE脚本五套 Particle/ActionFX/Glitch/TypeBuilder/Hi-Tech HUD