项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
欢迎大家star,留言,一起学习进步

Hbase里的数据量一般都小不了,因此MapReduce跟Hbase就成了天然的好搭档。本文中,本博主将给出最详细的用MR读取Hbase中数据的实例。

1.ZK授权表

首先一点来说,Hbase是强依赖于ZK的。博主所在的team,就经常出现ZK连接数太多被打爆然后Hbase挂了的情况。一般在访问Hbase表之前,需要通过访问ZK得到授权:

    /*** 为hbase表授权。** @param tableConfigKey 任意一个字符串。* @param tableName 需要授权的表名, scan涉及到的表不需要额外授权。* @param job 相关job。* @throws IOException*/public static void initAuthentication(String tableConfigKey, String tableName, Job job) throws IOException {Configuration peerConf = job.getConfiguration();peerConf.set(tableConfigKey, tableName);ZKUtil.applyClusterKeyToConf(peerConf, tableName);if (User.isHBaseSecurityEnabled(peerConf)) {LOGGER.info("Obtaining remote user authentication token with table:{}", tableName);try {User.getCurrent().obtainAuthTokenForJob(peerConf, job);} catch (InterruptedException ex) {LOGGER.info("Interrupted obtaining remote user authentication token");LOGGER.error("Obtained remote user authentication token with table:{}, error:\n", tableName, ex);Thread.interrupted();}LOGGER.info("Obtained remote user authentication token with table:{}", tableName);}}

代码相对比较简单,都是Hbase与ZK提供的一些辅助工具类。不解释。

2.thrift对象转化

本例中操作的对象为XX表,Column Family为"P", Qualifer 为"P"与"C",里面对应的value都是thrift对象。其中"P"对应的thrift对象为:

struct UserProfile {1: optional byte sex; 2: optional i32 age;3: optional string phoneBrand;4: optional string locationProvince;
}

"C"对应的thrift对象为:

struct UserClickInfo {1: required i32 totolAck;2: required i32 totalClick;3: optional map<i64, map<string, i32>> ackClick;
}

这个时候我们就需要经常将Bytes转化为thrift对象,通用的方法为:

    /*** convert byte array to thrift object.** @param <T> type of thrift object.* @param thriftObj an thrift object.* @return byte array if convert succeeded, <code>null</code> if convert failed.* @throws TException*/public static final <T extends TBase<T, ?>> T convertBytesToThriftObject(byte[] raw, T thriftObj) throws TException {if (ArrayUtils.isEmpty(raw)) {return null;}Validate.notNull(thriftObj, "thriftObj");TDeserializer serializer = new TDeserializer(new TBinaryProtocol.Factory());serializer.deserialize(thriftObj, raw);return thriftObj;}

3.Map阶段读取Hbase里的数据

在Map阶段对Hbase表扫描,得出数据

 //输出的KV均为Textstatic class ReadMapper extends TableMapper<Text,Text> {@Overrideprotected void map(ImmutableBytesWritable key, Result res, Context context) throws IOException,InterruptedException{String uuid = StringUtils.reverse(Bytes.toString(key.copyBytes()));if (res == null || res.isEmpty()) return;res.getFamilyMap(USER_FEATURE_COLUMN_FAMILY);for(KeyValue kv:res.list()) {String qualifier = Bytes.toString(kv.getQualifier());//String qualifier = kv.getKeyString();if(qualifier.equals("P")) {try {UserProfile userProfile = new UserProfile();convertBytesToThriftObject(kv.getValue(), userProfile);String profileRes = userProfile.getAge() + "," + userProfile.getSex() + ","+ userProfile.getPhoneBrand() + "," + userProfile.getLocationProvince();context.write(new Text(uuid),new Text(profileRes));} catch (Exception ex) {}}else if(qualifier.equals("C")) {UserClickInfo userClickInfo = new UserClickInfo();try {convertBytesToThriftObject(kv.getValue(), userClickInfo);Map<Long,Map<String,Integer>> resMap = userClickInfo.getAckClick();for(Map.Entry<Long,Map<String,Integer>> entry:resMap.entrySet()) {String appid = String.valueOf(entry.getKey());int click = entry.getValue().get("click");int ack = entry.getValue().get("ack");String all = appid + "," + String.valueOf(click) + "," + String.valueOf(ack);context.write(new Text(uuid),new Text(all));}int allClick = userClickInfo.getTotalClick();int allAck = userClickInfo.getTotolAck();String allNum = "999," + String.valueOf(allClick) + "," + String.valueOf(allAck);context.write(new Text(uuid),new Text(allNum));} catch (Exception ex) {}}}}}

4.run方法里配置相关驱动

run方法里需要配置一些相关的参数,保证任务的顺利进行。
其中,TableMapReduceUtil.addDependencyJars方法添加了完成任务一些必要的类。

    public int run(String[] args) throws Exception{Configuration conf = HBaseConfiguration.create();Job job = Job.getInstance(conf,"read_data_from_hbase");job.setJarByClass(ReadDataFromHbase.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setReducerClass(ReadReducer.class);job.setSpeculativeExecution(false);TableMapReduceUtil.addDependencyJars(job.getConfiguration(),StringUtils.class, TimeUtils.class, Util.class,CompressionCodec.class, TStructDescriptor.class, ObjectMapper.class, CompressionCodecName.class, BytesInput.class);Scan scan = new Scan();//对整个CF扫描scan.addFamily(USER_FEATURE_COLUMN_FAMILY);String table = "XXX";initAuthentication(table,table,job);TableMapReduceUtil.initTableMapperJob(table,scan,ReadMapper.class,Text.class,Text.class,job);String output = "";FileSystem.get(job.getConfiguration()).delete(new Path(output), true);FileOutputFormat.setOutputPath(job,new Path(output));return job.waitForCompletion(true) ? 0 : 1;}

5.完整的代码

package XXX.XXX.XXX.mr_job_and_tools.task;import com.twitter.elephantbird.thrift.TStructDescriptor;
import XXX.XXX.XXX.XXX.common.util.TimeUtils;
import XXX.XXX.XXX.thrift.UserClickInfo;
import XXX.XXX.XXX.thrift.UserProfile;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shaded.parquet.org.codehaus.jackson.map.ObjectMapper;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** Created by WangLei on 17-3-13.*/
public class ReadDataFromHbase extends Configured implements Tool{private static final Logger LOGGER = LoggerFactory.getLogger(ReadDataFromHbase.class);public static final byte[] USER_FEATURE_COLUMN_FAMILY = Bytes.toBytes("P");/*** convert byte array to thrift object.** @param <T> type of thrift object.* @param thriftObj an thrift object.* @return byte array if convert succeeded, <code>null</code> if convert failed.* @throws TException*/public static final <T extends TBase<T, ?>> T convertBytesToThriftObject(byte[] raw, T thriftObj) throws TException {if (ArrayUtils.isEmpty(raw)) {return null;}Validate.notNull(thriftObj, "thriftObj");TDeserializer serializer = new TDeserializer(new TBinaryProtocol.Factory());serializer.deserialize(thriftObj, raw);return thriftObj;}/*** 为hbase表授权。** @param tableConfigKey 任意一个字符串。* @param tableName 需要授权的表名, scan涉及到的表不需要额外授权。* @param job 相关job。* @throws IOException*/public static void initAuthentication(String tableConfigKey, String tableName, Job job) throws IOException {Configuration peerConf = job.getConfiguration();peerConf.set(tableConfigKey, tableName);ZKUtil.applyClusterKeyToConf(peerConf, tableName);if (User.isHBaseSecurityEnabled(peerConf)) {LOGGER.info("Obtaining remote user authentication token with table:{}", tableName);try {User.getCurrent().obtainAuthTokenForJob(peerConf, job);} catch (InterruptedException ex) {LOGGER.info("Interrupted obtaining remote user authentication token");LOGGER.error("Obtained remote user authentication token with table:{}, error:\n", tableName, ex);Thread.interrupted();}LOGGER.info("Obtained remote user authentication token with table:{}", tableName);}}static class ReadMapper extends TableMapper<Text,Text> {@Overrideprotected void map(ImmutableBytesWritable key, Result res, Context context) throws IOException,InterruptedException{String uuid = StringUtils.reverse(Bytes.toString(key.copyBytes()));if (res == null || res.isEmpty()) return;res.getFamilyMap(USER_FEATURE_COLUMN_FAMILY);for(KeyValue kv:res.list()) {String qualifier = Bytes.toString(kv.getQualifier());//String qualifier = kv.getKeyString();if(qualifier.equals("P")) {try {UserProfile userProfile = new UserProfile();convertBytesToThriftObject(kv.getValue(), userProfile);String profileRes = userProfile.getAge() + "," + userProfile.getSex() + ","+ userProfile.getPhoneBrand() + "," + userProfile.getLocationProvince();context.write(new Text(uuid),new Text(profileRes));} catch (Exception ex) {}}else if(qualifier.equals("C")) {UserClickInfo userClickInfo = new UserClickInfo();try {convertBytesToThriftObject(kv.getValue(), userClickInfo);Map<Long,Map<String,Integer>> resMap = userClickInfo.getAckClick();for(Map.Entry<Long,Map<String,Integer>> entry:resMap.entrySet()) {String appid = String.valueOf(entry.getKey());int click = entry.getValue().get("click");int ack = entry.getValue().get("ack");String all = appid + "," + String.valueOf(click) + "," + String.valueOf(ack);context.write(new Text(uuid),new Text(all));}int allClick = userClickInfo.getTotalClick();int allAck = userClickInfo.getTotolAck();String allNum = "999," + String.valueOf(allClick) + "," + String.valueOf(allAck);context.write(new Text(uuid),new Text(allNum));} catch (Exception ex) {}}}}}static class ReadReducer extends Reducer<Text,Text,Text,Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{List<String> resultList = new ArrayList<String>();for(Text each:values) {resultList.add(each.toString());}String res = StringUtils.join(resultList,":");context.write(key,new Text(res));}}@Overridepublic int run(String[] args) throws Exception{Configuration conf = HBaseConfiguration.create();Job job = Job.getInstance(conf,"read_data_from_hbase");job.setJarByClass(ReadDataFromHbase.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setReducerClass(ReadReducer.class);job.setSpeculativeExecution(false);TableMapReduceUtil.addDependencyJars(job.getConfiguration(),StringUtils.class, TimeUtils.class, Util.class,CompressionCodec.class, TStructDescriptor.class, ObjectMapper.class, CompressionCodecName.class, BytesInput.class);Scan scan = new Scan();//对整个CF扫描scan.addFamily(USER_FEATURE_COLUMN_FAMILY);String table = "XXX";initAuthentication(table,table,job);TableMapReduceUtil.initTableMapperJob(table,scan,ReadMapper.class,Text.class,Text.class,job);String output = "";FileSystem.get(job.getConfiguration()).delete(new Path(output), true);FileOutputFormat.setOutputPath(job,new Path(output));return job.waitForCompletion(true) ? 0 : 1;}public static void main(String[] args) throws Exception{System.exit(ToolRunner.run(new ReadDataFromHbase(),args));}
}

然后将代码打包,提交到集群上运行对应的shell脚本即可。

6.版本信息

本文中的代码,对应的hadoop版本为2.6,Hbase版本为0.98。

MapReduce操作Hbase史上最完整范例相关推荐

  1. 史上最完整的文件和目录操作类

    [文件操作类]史上最完整的文件和目录操作类 <a target=_blank href="http://bbs.cskin.net/thread-114-1-1.html"& ...

  2. poi操作ppt图表史上最完整示例演示

    poi操作ppt图表史上最完整示例演示和内嵌excel的获取添加数据简单示例 ,POI3.15版本. 在模板中构造几中基本图表进行测试就行了. 完整下载地址:http://download.csdn. ...

  3. ML之FE:利用【数据分析+数据处理】算法对国内某平台上海2020年6月份房价数据集【12+1】进行特征工程处理(史上最完整,建议收藏)

    ML之FE:利用[数据分析+数据处理]算法对国内某平台上海2020年6月份房价数据集[12+1]进行特征工程处理(史上最完整,建议收藏) 目录 利用[数据分析+数据处理]算法对链家房价数据集[12+1 ...

  4. 史上最完整交互设计基本原则|推荐收藏

    史上最完整交互设计基本原则|推荐收藏 人人都是产品经理 •  2 小时前 摘要:如何设计出具有优秀用户体验的产品是交互设计师始终面临的一道难题,"好的产品设计一定是建立在对用户需求的深刻理解 ...

  5. 史上最完整的MySQL注入 1

    作者:Passerby2 原文来自:史上最完整的MySQL注入 免责声明:本教程仅用于教育目的,以保护您自己的SQL注释代码. 在阅读本教程后,您必须对任何行动承担全部责任. 0x00 ~ 背景 这篇 ...

  6. 终于把单点登录完整流程图画明白了!史上最完整的CAS单点登录完整图解!

    CAS单点登录 本人也是初次接触CAS,有问题还请指正. 什么是单点登录 比如说百度,在浏览器中登录百度贴吧之后,百度的其他网站也同步登录了:退出百度贴吧之后,百度的其他网站也同步退出了. 简单来说就 ...

  7. php编写六十甲子纳音表_史上最完整的六十甲子纳音表详细说明

    六十甲子是汉族人民最早和最大的发明,其历史已有上千年,其用途是纪年.纪月.纪日.纪时.在古时候,就是我们的时钟,以六十年为一个周期,纪月为五年一个周期,纪日为六十天一个周期,纪时为五天一个周期.接下来 ...

  8. 史上最完整的5G NR介绍

    史上最完整的5G NR介绍 目录 史上最完整的5G NR介绍 5G部署选项 5G NR频谱 5G NR物理层 5G部署选项 一说到"部署选项"这事,说实话,我觉得自己有点" ...

  9. 史上最完整的2012(2011年度)SCI(SSCI)影响因子

    史上最完整的2012(2011年度)SCI(SSCI)影响因子 (1)按因子if大小排序的版本 按if排序的版本.xls (2)2012年(2011年度)按学科领域细分 按学科细分.xls (3)各种 ...

  10. HBase学习(5)-MapReduce操作HBase

    原文来自:扎心了,老铁的<HBase学习之路 (五)MapReduce操作Hbase>

最新文章

  1. React 组件js文件中如何引入其他的js 文件数组
  2. BCH双花成功率极低——零确认交易安全性高达99.9%
  3. NAS设置NFS共享便于KODI添加视频的方式
  4. 大数据量及海量数据处理算法总结
  5. 大文件做分割处理的方法——winRAR压缩分割法
  6. Object-C时间与字符串的转化 因多语言设置中造成返回Nil的解决方法
  7. gradle 构建完成自动删除_Gradle 6.6 RC6 发布,引入配置缓存特性,大幅提升构建性能
  8. 我与电脑1-初识电脑
  9. 华为服务器修改密码命令,服务器用户名密码修改
  10. html js控制页面蒙版,js实现蒙版效果
  11. 博士申请 | 西湖大学智能无人系统实验室招收空中机器人方向全奖博士生
  12. 介绍一种AI的抠图方法
  13. 智能车阳光算法(含大津法)
  14. nrf52 ESB通信协议底层探讨
  15. Diffusion-weighted in MRI 学习笔记
  16. Python 去除字符串中空格(删除指定字符)的3种方法
  17. SAP MM模块业务流程
  18. 关键点估计之 PCK, PCKh, PDJ 评价度量
  19. MPEG2-TS格式
  20. 写在腾讯大讲堂演讲之后

热门文章

  1. Mac 抓包工具wireshark使用
  2. 一步步打造一个移动端手势库
  3. [macOS] git忽略所有的.DS_Store文件
  4. [转载]github在线更改mysql表结构工具gh-ost
  5. idea使用jrebel热部署插件
  6. JavaScript高级编程II
  7. Eclipse常用的一些设置
  8. delphi之鼠标模拟
  9. 如何实现 java 接口中的部分方法
  10. python 2个dict如何合并