在HBase上跑MapReduce有个很麻烦的问题:多HTable+多Scan作为Map的Input对象。以往都需要开发人员去写TableInputFormat类的重载方法。

HBase 0.94.6版本加入了一个新的Class::MultiTableInputFormatBase。(这里我提醒一下HBase 0.94.6有致命BUG,请用0.94.7)

这个方法可以帮助MapReduce开发人员快速实现多HTable+多Scan的Input。

MultiTableInputFormatBase的实现源于HBASE-3996:Support multiple tables and scanners as input to the mapper in map/reduce jobs

MultiTableInputFormatBase的加入的同时,补丁还调整了TableMapReduceUtil,并加入了以下方法:

//多输入方法
void org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(List<Scan> scans, Class<? extends TableMapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException//单输入方法
void org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(String table, Scan scan, Class<? extends TableMapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException

多输入方法相比单输入方法的最大区别在于:去掉了Table Name的传入参数,取而代之的是一个List<Scan>

现在问题来了,新方法只有Scan,没有HTable名称。如何让MapReduce知道数据从哪几张表获取呢?

经过几番周折,我在MultiTableInputFormatBase.java源码的第116行找到了答案

      byte[] tableName = scan.getAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME);if (tableName == null) throw new IOException("A scan object did not have a table name");HTable table = new HTable(context.getConfiguration(), tableName);

原来TableName是从Scan.SCAN_ATTRIBUTES_TABLE_NAME中获取的

只要对Scan对象的SCAN_ATTRIBUTES_TABLE_NAME进行TableName赋值就行了。

本文是CSDN-撸大湿原创,如要转载请注明出处,谢谢。

如有任何异议请留言,或去CSDN-Hadoop论坛找我,欢迎拍砖~

最后贴上今天刚写的利用MultiTableInput的MapReduce源码:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;public class mr_ccu implements StaticObject {public static class CCUMapper extends TableMapper<Text, Text>{Text outputkey = new Text();Text outputvalue = new Text();int i = 0;public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {int LogType = Integer.valueOf(new String(value.getValue("_0".getBytes(), "lgtp".getBytes())));if (LogType == PLAYER_LOGINZONE || LogType == PLAYER_LOGOUT) {String key = new String(value.getValue("_0".getBytes(), "area".getBytes()))+ DEFAULT_DELIMITER+ ("00" + new String(value.getValue("_0".getBytes(), "wid".getBytes()))).substring(1)+ DEFAULT_DELIMITER+ new String(value.getValue("_0".getBytes(), "pid".getBytes()));outputkey.set(key);String val = new String(value.getValue("_0".getBytes(), "uts".getBytes()))+ DEFAULT_DELIMITER+ LogType;outputvalue.set(val);context.write(outputkey, outputvalue);i++;}}protected void cleanup(Context context) throws IOException, InterruptedException {context.getCounter("CCU", "Mapper Count").setValue(i);};}public static class CCUCombiner extends Reducer<Text, Text, Text, Text>{Text outputkey = new Text();Text outputvalue = new Text();int i = 0;public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {Long LastLoginTime = 0L;for (Text val : values) {if (LastLoginTime < Long.valueOf(val.toString().split(DEFAULT_DELIMITER)[0])) {LastLoginTime = Long.valueOf(val.toString().split(DEFAULT_DELIMITER)[0]);outputvalue.set(val);}}context.write(key, outputvalue);i++;}protected void cleanup(org.apache.hadoop.mapreduce.Reducer<Text, Text, Text, Text>.Context context)throws IOException, InterruptedException {context.getCounter("CCU", "Combiner Count").setValue(i);};}public static class CCUReducer extends TableReducer<Text, Text, ImmutableBytesWritable>{HashMap<String, Integer> CCUMap = new HashMap<String, Integer>();String CCUTime_StringTime = "";String CCUTime_UnixTime = "";protected void setup(org.apache.hadoop.mapreduce.Reducer<Text, Text,ImmutableBytesWritable, org.apache.hadoop.io.Writable>.Context context)throws IOException, InterruptedException {CCUTime_StringTime = context.getConfiguration().get("CCUTime_StringTime");CCUTime_UnixTime = String.valueOf(Long.MAX_VALUE - Long.valueOf(context.getConfiguration().get("CCUTime_UnixTime")));};public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {Long LastLoginTime = 0L;String value = "";for (Text val : values) {if (LastLoginTime < Long.valueOf(val.toString().split(DEFAULT_DELIMITER)[0])) {LastLoginTime = Long.valueOf(val.toString().split(DEFAULT_DELIMITER)[0]);value = val.toString();}}if (Integer.valueOf(value.split(DEFAULT_DELIMITER)[1]) == PLAYER_LOGINZONE) {int CCU = DEFAULT_ZERO_TYPE;String[] tmpStr = key.toString().split(DEFAULT_DELIMITER);String MapKey = tmpStr[0] + DEFAULT_DELIMITER + tmpStr[1] + DEFAULT_DELIMITER + CCUTime_UnixTime;if (CCUMap.containsKey(MapKey))CCU = CCUMap.get(MapKey) + 1;CCUMap.put(MapKey, CCU);}}protected void cleanup(Context context) throws java.io.IOException, java.lang.InterruptedException{Iterator<Entry<String, Integer>> iter = CCUMap.entrySet().iterator();while (iter.hasNext()) {Entry<String, Integer> PutData = iter.next();Put put = new Put(PutData.getKey().getBytes());put.add(Bytes.toBytes("_0"), Bytes.toBytes("ccu"), Bytes.toBytes(String.valueOf(PutData.getValue())));put.add(Bytes.toBytes("_0"), Bytes.toBytes("ts"), Bytes.toBytes(String.valueOf(CCUTime_StringTime)));context.write(null, put);}}}public static void main(String[] args) throws Exception {MyHBase myHBase = new MyHBase();String TempStr = "";String AreaID = "";String StartTime = "";String StopTime = "";if (args.length < 2) {TempStr = "" +"00,01,02" + "\t" +"20130528000000" + "\t" +"20130528210000";AreaID = TempStr.split("\t")[0];StartTime = TempStr.split("\t")[1];StopTime = TempStr.split("\t")[2];} else {AreaID = args[0];StartTime = args[1];StopTime = args[2];}String sourceTable = "ps2cb.login.event";String targetTable = "ps2cb.tbl.ccu";List<Scan> ScanList = new ArrayList<Scan>();for (String aid : AreaID.split(",")) {String StartRow = aid + DEFAULT_DELIMITER + StartTime;String StopRow = aid + DEFAULT_DELIMITER + StopTime;Scan scan = new Scan(StartRow.getBytes(), StopRow.getBytes());scan.setCaching(5000);scan.setCacheBlocks(false);scan.addColumn("_0".getBytes(), "lgtp".getBytes());scan.addColumn("_0".getBytes(), "area".getBytes());scan.addColumn("_0".getBytes(), "wid".getBytes());scan.addColumn("_0".getBytes(), "pid".getBytes());scan.addColumn("_0".getBytes(), "uts".getBytes());scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, sourceTable.getBytes());ScanList.add(scan);}myHBase.myConf.set("CCUTime_StringTime", FormatTime.getStringDatetoString(StopTime));myHBase.myConf.set("CCUTime_UnixTime", FormatTime.toUnixTimeBySecond(StopTime));Job job = new Job(myHBase.myConf, "CCU");job.setJarByClass(mr_ccu.class);job.setCombinerClass(CCUCombiner.class);TableMapReduceUtil.setScannerCaching(job, 5000);TableMapReduceUtil.initTableMapperJob(ScanList,CCUMapper.class,Text.class,Text.class,job);TableMapReduceUtil.initTableReducerJob(targetTable, // output tableCCUReducer.class, // reducer classjob);System.exit(job.waitForCompletion(true) ? 0 : 1);}
}

HBase MapReduce MultiTableInput首次测试相关推荐

  1. HBase常用功能和HBase+MapReduce使用总结

    1.HBase如果加了列限定,如果该列不存在时返回的结果为empty. 2.HBase在scan时指定的StartRow里面不能加- 3.HBase在scan时过滤掉指定列不存在的记录 4.利用Map ...

  2. 基于Solr的HBase多条件查询测试

    背景: 某电信项目中采用HBase来存储用户终端明细数据,供前台页面即时查询.HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力.针对HBase ...

  3. HBase MapReduce

    1. HBase to HBase Mapper 继承 TableMapper,输入为Rowkey和Result. public abstract class TableMapper<KEYOU ...

  4. HBase - MapReduce - HBase 作为输出源的示例 | 那伊抹微笑

    博文作者: 那伊抹微笑 csdn 博客地址: http://blog.csdn.net/u012185296 itdog8 地址链接 : http://www.itdog8.com/thread-20 ...

  5. FMT开源自驾仪 | FMT固定翼飞控系统首次测试

    FMT作为国内首个基于模型开发的开源飞控系统,此前在多旋翼无人机F200上的各项测试结果均达预期.在圆满完成F200的测试后,FMT项目组开始着手对固定翼飞控系统进行开发,目前FMT固定翼已实现自稳. ...

  6. 五十三、通过MapReduce实现HBase操作

    通过HBase的相关JavaAPI,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将HBase表中的数据拷贝到另外一张表.本文我们通过两个案例来进行实操一下,关注专 ...

  7. HBase结合MapReduce批量导入

    Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapR ...

  8. MapReduce的方式进行HBase向HDFS导入和导出

    附录代码: HBase---->HDFS 1 import java.io.IOException; 2 3 import org.apache.hadoop.conf.Configuratio ...

  9. MapReduce操作HBase

    运行HBase时常会遇到个错误,我就有这样的经历. ERROR: org.apache.hadoop.hbase.MasterNotRunningException: Retried 7 times ...

  10. java导出hbase表数据_通用MapReduce程序复制HBase表数据

    编写MR程序,让其可以适合大部分的HBase表数据导入到HBase表数据.其中包括可以设置版本数.可以设置输入表的列导入设置(选取其中某几列).可以设置输出表的列导出设置(选取其中某几列). 原始表t ...

最新文章

  1. java map 迭代删除元素,java – 如何在迭代时删除和添加元素到TreeMap?
  2. 1.5 为什么正则化有利于预防过拟合-深度学习第二课《改善深层神经网络》-Stanford吴恩达教授
  3. 【图文并茂】RNN、LSTM、GRU、ConvLSTM、ConvGRU、ST-LSTM的总结
  4. Tensorflow-相关API-交叉熵
  5. Caffe源码解析—核函数
  6. Ansible:Ansibl项目生产环境快速布局
  7. TCP UDP HTTP 的关系和区别
  8. “高仿版拼多多”宣告破产!曾一年收割1.3亿用户,如今自救失败负债16亿
  9. ROS-PCL读取pcd点云数据并在rviz中进行显示
  10. iOS 循环引用 委托 (实例说明)
  11. 安卓平台中的动态加载技术分析
  12. C++11 enable_shared_from_this
  13. js 判断是否为mac电脑 、还是windows操作系统
  14. vmware fusion 7 序列号
  15. Maven系列第4篇:仓库详解
  16. SQL 2008客户端ODBC配置DSN时使用网络登录ID的windows NT验证登录时 报18452错误
  17. 目标规划之问题数学化(建模)
  18. 1714. 混合牛奶
  19. 广告传媒实际税负怎么计算_文化传媒 广告行业企业怎么来合法节税,税收案例展示...
  20. 信息系统安全等级保护相关法规及重要国家标准汇总目录

热门文章

  1. 一年月份大小月口诀_《认识年月日》大小月记忆法知识点教学设计
  2. Shaolin - HDU 4585 - 树堆
  3. 小米平板2可以装鸿蒙系统,搞定LOL?Win10版小米平板2游戏性能实测
  4. arm开发板烧写linux系统,ARM开发板烧写linux系统的步骤
  5. xp计算机组策略怎么打开,WinXP系统打开组策略的命令是什么?
  6. smp irq affinity介绍
  7. 数据库表结构设计,什么是概念模型、逻辑模型、物理模型
  8. c 语言 农历,C++算法系列之中国农历的算法
  9. HTML5个人学习笔记(一)
  10. 通信领域的宽带信号和窄带信号到底是什么??