HBase MapReduce
1. HBase to HBase
Mapper 继承 TableMapper,输入 key 为 RowKey(ImmutableBytesWritable),value 为该行的查询结果 Result。
// HBase API reference (quoted as-is): TableMapper pins the Mapper input types to what a
// table scan produces — the row key as ImmutableBytesWritable and the row as Result —
// leaving only the output types KEYOUT/VALUEOUT for subclasses to choose.
public abstract class TableMapper<KEYOUT, VALUEOUT> extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> { public TableMapper() { }}
package com.scb.jason.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * HBase-to-HBase map phase: for each row of the source table, copies the
 * "info:name" and "info:age" cells into a {@link Put} keyed by the same row key.
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicMapper extends TableMapper<ImmutableBytesWritable, Put> {

    // Reused across map() calls to avoid one allocation per row.
    private final ImmutableBytesWritable mapOutputKey = new ImmutableBytesWritable();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Output key is the source row key, unchanged.
        mapOutputKey.set(key.get());
        Put put = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            // Decode family/qualifier once per cell (the original decoded the
            // qualifier twice) and keep only the two migrated columns.
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("name".equals(qualifier) || "age".equals(qualifier)) {
                    put.add(cell);
                }
            }
        }
        // BUGFIX: writing an empty Put makes TableOutputFormat throw and fails the
        // task; skip rows that have neither info:name nor info:age.
        if (!put.isEmpty()) {
            context.write(mapOutputKey, put);
        }
    }
}
Reducer 继承 TableReducer
// HBase API reference (quoted as-is): TableReducer fixes the Reducer output value type
// to Mutation (Put/Delete), which TableOutputFormat then applies to the target table;
// the output key is ignored by the writer, so reducers conventionally emit a null key.
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT> extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> { public TableReducer() { }}
package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Pass-through reducer for the HBase-to-HBase job: forwards every Put it
 * receives straight to the output table. The key written is null because
 * TableOutputFormat ignores it — each Put already carries its own row key.
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put mutation : values) {
            context.write(null, mutation);
        }
    }
}
Driver
package com.scb.jason.driver;

import com.scb.jason.mapper.User2BasicMapper;
import com.scb.jason.reducer.User2BasicReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Driver for the HBase-to-HBase job: scans the "user" table and writes the
 * selected info:name / info:age columns into the "basic" table.
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicDriver extends Configured implements Tool {

    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);       // Scan's default of 1 means one RPC per row — bad for MR jobs
        scan.setCacheBlocks(false); // one-shot scan: don't evict hot data from the block cache
        // set other scan attrs here if needed

        TableMapReduceUtil.initTableMapperJob(
                "user",                        // input table
                scan,                          // Scan instance to control CF and attribute selection
                User2BasicMapper.class,        // mapper class
                ImmutableBytesWritable.class,  // mapper output key
                Put.class,                     // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                "basic",                  // output table
                User2BasicReducer.class,  // reducer class
                job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        // BUGFIX: the Tool / process-exit convention is 0 = success, non-zero =
        // failure. The original returned 1 on success, so every successful run
        // exited with a failure status.
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2BasicDriver(), args);
        System.exit(status);
    }
}
2. HBase to File
Mapper
package com.scb.jason.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * HBase-to-file map phase: emits one tab-separated line per row of the "user"
 * table — row key, then the info:name and info:age values.
 *
 * Created by Administrator on 2017/8/16.
 */
public class User2FileMapper extends TableMapper<Text, Text> {

    // Reused across map() calls to avoid per-row allocations.
    private final Text rowKeyText = new Text();
    private final Text valueText = new Text();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        rowKeyText.set(key.get());
        // FIX: dropped the original's unused `Put put = new Put(key.get())` —
        // this mapper only exports text, it never writes a Put.
        byte[] infoName = null;
        byte[] infoAge = null;
        for (Cell cell : value.rawCells()) {
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("name".equals(qualifier)) {
                    infoName = CellUtil.cloneValue(cell);
                } else if ("age".equals(qualifier)) {
                    infoAge = CellUtil.cloneValue(cell);
                }
            }
        }
        // BUGFIX: the original called new String(...) on both arrays unconditionally,
        // throwing NullPointerException for any row missing info:name or info:age.
        // Emit an empty field instead so sparse rows survive the export. Bytes.toString
        // decodes as UTF-8, unlike new String(byte[]) which uses the platform charset.
        String name = infoName == null ? "" : Bytes.toString(infoName);
        String age = infoAge == null ? "" : Bytes.toString(infoAge);
        valueText.set(name + "\t" + age);
        context.write(rowKeyText, valueText);
    }
}
No Reducer — map-only 作业,Mapper 的输出直接写入文件,无需自定义 Reducer。
Driver
package com.scb.jason.driver;import com.scb.jason.mapper.User2BasicMapper; import com.scb.jason.mapper.User2FileMapper; import com.scb.jason.reducer.User2BasicReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;/*** Created by Administrator on 2017/8/16.*/ public class User2FileDriver extends Configured implements Tool{public int run(String[] args) throws Exception {Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());job.setJarByClass(this.getClass());Scan scan = new Scan();scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobsscan.setCacheBlocks(false); // don't set to true for MR jobs// set other scan attrs TableMapReduceUtil.initTableMapperJob("user", // input tablescan, // Scan instance to control CF and attribute selectionUser2FileMapper.class, // mapper classText.class, // mapper output keyText.class, // mapper output value job);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setOutputFormatClass(TextOutputFormat.class);FileOutputFormat.setOutputPath(job,new Path(args[0]));job.setNumReduceTasks(1);boolean isSuccess = job.waitForCompletion(true);return isSuccess?1:0;}public static void main(String[] args) throws Exception {Configuration configuration = HBaseConfiguration.create();int status = ToolRunner.run(configuration,new User2FileDriver(),args);System.exit(status);}}
3. File to HBase
Driver
package com.scb.jason.driver;import com.scb.jason.mapper.File2HbaseMapper; import com.scb.jason.mapper.User2BasicMapper; import com.scb.jason.reducer.File2HBaseReducer; import com.scb.jason.reducer.User2BasicReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;/*** Created by Administrator on 2017/8/16.*/ public class File2BasicDriver extends Configured implements Tool{public int run(String[] strings) throws Exception {Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());job.setJarByClass(this.getClass());job.setMapperClass(File2HbaseMapper.class);FileInputFormat.addInputPath(job,new Path("F:\\Workspace\\File"));job.setMapOutputKeyClass(ImmutableBytesWritable.class);job.setMapOutputValueClass(Put.class);TableMapReduceUtil.initTableReducerJob("basic", // output tableFile2HBaseReducer.class, // reducer class job);job.setNumReduceTasks(1);boolean isSuccess = job.waitForCompletion(true);return isSuccess?1:0;}public static void main(String[] args) throws Exception {Configuration configuration = HBaseConfiguration.create();int status = ToolRunner.run(configuration,new File2BasicDriver(),args);System.exit(status);}}
Mapper
package com.scb.jason.mapper;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * File-to-HBase map phase: parses whitespace-separated "rowkey name age"
 * lines into Puts on the info column family.
 *
 * Created by Administrator on 2017/8/17.
 */
public class File2HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    // Reused across map() calls to avoid one allocation per line.
    private final ImmutableBytesWritable mapOutputKey = new ImmutableBytesWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String lineValue = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(lineValue);
        // Robustness: skip blank/malformed lines instead of failing the whole
        // task with NoSuchElementException on the missing token.
        if (tokenizer.countTokens() < 3) {
            return;
        }
        String rowkey = tokenizer.nextToken();
        String name = tokenizer.nextToken();
        String age = tokenizer.nextToken();

        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));

        // BUGFIX: the original set the output key from the file byte offset
        // (key.get()), so the shuffle grouped records by offset instead of by
        // HBase row key. Key on the row key so it matches the Put's row.
        mapOutputKey.set(Bytes.toBytes(rowkey));
        context.write(mapOutputKey, put);
    }
}
Reducer
package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * Pass-through reducer for the file-to-HBase job: hands every Put straight to
 * TableOutputFormat. The key written is null on purpose — the output format
 * ignores it and takes the target row from the Put itself.
 *
 * Created by Administrator on 2017/8/25.
 */
public class File2HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        for (Put mutation : values) {
            context.write(null, mutation);
        }
    }
}
4. HBase to RDBMS
// Skeleton from the HBase reference guide: a reducer that writes summarized results to an
// RDBMS over JDBC instead of back to HBase/HDFS. The connection lives for the whole reduce
// task: opened once in setup(), used for every reduce() call, released in cleanup().
public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // One JDBC connection per reduce task. A reduce task is single-threaded, so
    // sharing this un-synchronized field across reduce() calls is safe.
    private Connection c = null;

    public void setup(Context context) {
        // create DB connection...
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // do summarization
        // in this example the keys are Text, but this is just an example
    }

    public void cleanup(Context context) {
        // close db connection
    }
}
5. File -> HFile -> HBase 批量导入
http://www.cnblogs.com/shitouer/archive/2013/02/20/hbase-hfile-bulk-load.html
转载于:https://www.cnblogs.com/xdlaoliu/p/7406789.html
HBase MapReduce相关推荐
- HBase常用功能和HBase+MapReduce使用总结
1.HBase如果加了列限定,如果该列不存在时返回的结果为empty. 2.HBase在scan时指定的StartRow里面不能加- 3.HBase在scan时过滤掉指定列不存在的记录 4.利用Map ...
- HBase - MapReduce - HBase 作为输出源的示例 | 那伊抹微笑
博文作者: 那伊抹微笑 csdn 博客地址: http://blog.csdn.net/u012185296 itdog8 地址链接 : http://www.itdog8.com/thread-20 ...
- HBase结合MapReduce批量导入
Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapR ...
- MapReduce的方式进行HBase向HDFS导入和导出
附录代码: HBase---->HDFS 1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuratio ...
- MapReduce操作HBase
运行HBase时常会遇到个错误,我就有这样的经历. ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times ...
- java导出hbase表数据_通用MapReduce程序复制HBase表数据
编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据.其中包括可以设置版本数.可以设置输入表的列导入设置(选取其中某几列).可以设置输出表的列导出设置(选取其中某几列). 原始表t ...
- 在cdh5.1.3中在mapreduce使用hbase
环境:centos6.5 .cdh5.1.3 一.hadoop命令找不到hbase相关类 (一)观察hadoop classpath的输出: 1,classpath包含了/etc/hadoop/con ...
- MapReduce on Hbase
org.apache.hadoop.hbase.mapreduce TableMapper TableReducer 一个region对应一个map import java.io.IOExcepti ...
- java mapreduce 读hbase数据 写入hdfs 含maven依赖
mapreduce 读hbase数据 写入hdfs java代码如下 import com.google.common.collect.Lists; import java.io.FileInputS ...
最新文章
- Android Studio 工具栏消失了 设置回来
- mysql查询表的数据大小
- mysql 的 VARCHAR VARCHAR2
- Spring Security——实现登录后跳转到登录前页面
- Angular ng-container元素的学习笔记
- 博士生Science发文:很庆幸导师要求每周交工作进展汇报!
- mac电脑开发环境配置
- Mysql(8)_存储引擎之InnoDB
- Myeclipse6.0安装svn插件
- 22-微信小程序商城 我的订单(微信小程序商城开发、小程序毕业设计、小程序源代码)(黄菊华-微信小程序开发教程)
- 如何做好数据分析师的职业规划?
- foobox 2.11(foobar2000 CUI配置)
- 搭建frida+木木模拟器运行环境
- 【Vue】Emitted value instead of an instance of Error
- Excel 2013 如何分列操作
- 微软 Microsoft
- 个人永久性免费-Excel催化剂功能第96波-地图数据挖宝之全国天气查询(区域最细可到区县,最长预报4天)...
- Kong 插件ACL的使用方法(访问控制列表黑名单)
- Vue3(撩课学院)笔记07-vueRouter路由,创建路由,路由重定向,路由跳转,动态路由,路由懒加载,路由嵌套,路由参数穿传递的两种形式,通过事件来跳转路由
- ibm服务器开不了机维修,IBM X3500服务器故障开不了机