1. HBase to HBase

The Mapper extends TableMapper; its input key is the row key (ImmutableBytesWritable) and its input value is a Result.

public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
    public TableMapper() {
    }
}
package com.scb.jason.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicMapper extends TableMapper<ImmutableBytesWritable, Put> {

    private ImmutableBytesWritable mapOutputKey = new ImmutableBytesWritable();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Get rowKey and reuse it as both the map output key and the Put's row
        mapOutputKey.set(key.get());
        Put put = new Put(key.get());
        // Copy only the info:name and info:age cells into the Put
        for (Cell cell : value.rawCells()) {
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    put.add(cell);
                }
                if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    put.add(cell);
                }
            }
        }
        context.write(mapOutputKey, put);
    }
}

The Reducer extends TableReducer; its output value type is fixed to Mutation.

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
    public TableReducer() {
    }
}
package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // The key is ignored by TableOutputFormat, so null is fine;
        // each Put is written to the output table as-is
        for (Put put : values) {
            context.write(null, put);
        }
    }
}

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.User2BasicMapper;
import com.scb.jason.reducer.User2BasicReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicDriver extends Configured implements Tool {

    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "user",                        // input table
                scan,                          // Scan instance to control CF and attribute selection
                User2BasicMapper.class,        // mapper class
                ImmutableBytesWritable.class,  // mapper output key
                Put.class,                     // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                "basic",                  // output table
                User2BasicReducer.class,  // reducer class
                job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;  // exit code 0 signals success
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2BasicDriver(), args);
        System.exit(status);
    }
}
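
Where the driver says "set other scan attrs", the Scan can also be narrowed so that only the columns the mapper actually reads are shipped to it. A small sketch, assuming the same user table layout as above:

// Fetch only info:name and info:age instead of entire rows
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));

With the Scan restricted this way, no other cells ever reach the mapper, which also makes its qualifier checks largely redundant.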


2. HBase to File

Mapper

package com.scb.jason.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2FileMapper extends TableMapper<Text, Text> {

    private Text rowKeyText = new Text();
    private Text valueText = new Text();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Get rowKey and use it as the output key
        rowKeyText.set(key.get());
        byte[] infoName = null;
        byte[] infoAge = null;
        for (Cell cell : value.rawCells()) {
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    infoName = CellUtil.cloneValue(cell);
                }
                if ("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    infoAge = CellUtil.cloneValue(cell);
                }
            }
        }
        // Emit "name<TAB>age" as a plain text line
        valueText.set(Bytes.toString(infoName) + "\t" + Bytes.toString(infoAge));
        context.write(rowKeyText, valueText);
    }
}

No Reducer is needed: the job leaves the reducer class unset, so the default identity reducer simply passes the mapper output through.

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.User2FileMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2FileDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "user",                 // input table
                scan,                   // Scan instance to control CF and attribute selection
                User2FileMapper.class,  // mapper class
                Text.class,             // mapper output key
                Text.class,             // mapper output value
                job);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;  // exit code 0 signals success
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new User2FileDriver(), args);
        System.exit(status);
    }
}


3. File to HBase

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.File2HbaseMapper;
import com.scb.jason.reducer.File2HBaseReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Administrator on 2017/8/16.
 */
public class File2BasicDriver extends Configured implements Tool {

    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        job.setMapperClass(File2HbaseMapper.class);
        FileInputFormat.addInputPath(job, new Path("F:\\Workspace\\File"));
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        TableMapReduceUtil.initTableReducerJob(
                "basic",                  // output table
                File2HBaseReducer.class,  // reducer class
                job);
        job.setNumReduceTasks(1);

        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;  // exit code 0 signals success
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration, new File2BasicDriver(), args);
        System.exit(status);
    }
}

Mapper

package com.scb.jason.mapper;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by Administrator on 2017/8/17.
 */
public class File2HbaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private ImmutableBytesWritable mapOutputKey = new ImmutableBytesWritable();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Each input line is expected to hold: <rowkey> <name> <age>, whitespace-separated
        String lineValue = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
        String rowkey = stringTokenizer.nextToken();
        String name = stringTokenizer.nextToken();
        String age = stringTokenizer.nextToken();

        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));

        // Key the map output by the HBase row key rather than the file offset
        mapOutputKey.set(Bytes.toBytes(rowkey));
        context.write(mapOutputKey, put);
    }
}

Reducer

package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/25.
 */
public class File2HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // Pass each Put through to the output table; the key is ignored by TableOutputFormat
        for (Put put : values) {
            context.write(null, put);
        }
    }
}


4. HBase to RDBMS

public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Connection c = null;

    public void setup(Context context) {
        // create DB connection...
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // do summarization
        // in this example the keys are Text, but this is just an example
    }

    public void cleanup(Context context) {
        // close db connection
    }
}
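
The skeleton above leaves the connection handling and summarization elided. A reducer is free to write anywhere it likes, so it can push its output to a relational database over plain JDBC. Below is a minimal sketch of one way to fill the skeleton in; the JDBC URL, credentials, and the stats table are hypothetical, and in practice they would be read from the job Configuration rather than hardcoded.

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Hbase2RdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Connection connection;
    private PreparedStatement statement;

    @Override
    protected void setup(Context context) throws IOException {
        try {
            // Hypothetical connection settings; read them from the Configuration in real code
            connection = DriverManager.getConnection(
                    "jdbc:mysql://dbhost:3306/demo", "user", "password");
            statement = connection.prepareStatement(
                    "INSERT INTO stats(word, total) VALUES (?, ?)");
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Summarize the counts for this key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        try {
            // Write the aggregate straight to the RDBMS instead of to HDFS
            statement.setString(1, key.toString());
            statement.setInt(2, sum);
            statement.executeUpdate();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        try {
            if (statement != null) statement.close();
            if (connection != null) connection.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}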

5. File -> HFile -> HBase (bulk load)

http://www.cnblogs.com/shitouer/archive/2013/02/20/hbase-hfile-bulk-load.html
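
The linked post covers the details; the core idea is that the MapReduce job writes sorted HFiles directly with HFileOutputFormat2 and then hands them to the table with LoadIncrementalHFiles, bypassing the normal write path entirely. Below is a minimal driver sketch, assuming the HBase 1.x client API and reusing File2HbaseMapper from section 3; the table name, job name, and paths are illustrative.

package com.scb.jason.driver;

import com.scb.jason.mapper.File2HbaseMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class File2HFileDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Path input = new Path(args[0]);    // raw text input
        Path hfileDir = new Path(args[1]); // staging directory for the generated HFiles

        try (Connection connection = ConnectionFactory.createConnection(conf)) {
            TableName tableName = TableName.valueOf("basic");
            Table table = connection.getTable(tableName);
            RegionLocator regionLocator = connection.getRegionLocator(tableName);

            Job job = Job.getInstance(conf, "file-to-hfile-bulkload");
            job.setJarByClass(File2HFileDriver.class);
            job.setMapperClass(File2HbaseMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(Put.class);
            FileInputFormat.addInputPath(job, input);
            FileOutputFormat.setOutputPath(job, hfileDir);

            // Wires in PutSortReducer, TotalOrderPartitioner and HFileOutputFormat2
            // so the HFiles come out sorted and aligned with the region boundaries
            HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

            if (job.waitForCompletion(true)) {
                // Move the finished HFiles into the table's regions
                new LoadIncrementalHFiles(conf).doBulkLoad(
                        hfileDir, connection.getAdmin(), table, regionLocator);
            }
        }
    }
}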

Reposted from: https://www.cnblogs.com/xdlaoliu/p/7406789.html
