mapreduce 读hbase数据 写入hdfs
java代码如下

import com.google.common.collect.Lists;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;public class HbaseToHdfs
{private static final Logger logger = Logger.getLogger(HbaseToHdfs03.class);public static Configuration conf = HBaseConfiguration.create();static {//hbase的ip和端口conf.set("hbase.master", "xxx:xxx");conf.set("mapreduce.output.fileoutputformat.compress", "false");//不进行压缩//hadoop的配置文件位置conf.addResource(new Path("xxx/core-site.xml"));conf.addResource(new Path("xxx/hdfs-site.xml"));//hbase的配置文件位置conf.addResource(new Path("xxx/hbase-site.xml"));conf.set("hbase.client.pause","2000");conf.set("hbase.client.retries.number","100");conf.set("hbase.client.operation.timeout","500000");}public static void main(String[] args)throws Exception{InputStream foin = new FileInputStream(args[2]);Properties prop = new Properties();prop.load(foin);foin.close();String cloumns = prop.getProperty(args[0]).trim();conf.set("cloumns", cloumns);//xxx为自己起的任务名Job job = Job.getInstance(conf, "xxx");job.setJarByClass(HbaseToHdfs.class);job.setMapperClass(HbaseToHdfs.MyMapper.class);job.setNumReduceTasks(0);TableMapReduceUtil.initTableMapperJob(initScans(job, args[0]), HbaseToHdfs03.MyMapper.class,NullWritable.class, Text.class, job);FileSystem fs = FileSystem.get(conf);if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]));}FileOutputFormat.setOutputPath(job, new Path(args[1]));long start = System.currentTimeMillis();try{job.waitForCompletion(true);}finally{fs.setPermission(new Path(args[1]), new FsPermission("777"));FileStatus[] files = fs.listStatus(new Path(args[1]));for (FileStatus fileStatus : files){Path p = fileStatus.getPath();fs.setPermission(p, new FsPermission("777"));}fs.close();long end = System.currentTimeMillis();logger.info("Job<" + job.getJobName() + ">是否执行成功:" + job.isSuccessful() + ";开始时间:" + start + "; 结束时间:" + end + "; 用时:" + (end - start) + "ms");}}private static List<Scan> initScans(Job job, String tableName){Configuration conf = job.getConfiguration();Scan scan = new Scan();scan.setAttribute("scan.attributes.table.name", Bytes.toBytes(tableName));return Lists.newArrayList(new Scan[] { scan });}public static class MyMapperextends TableMapper<NullWritable, Text>{String cloumns = "";protected void map(ImmutableBytesWritable key, Result r, Mapper<ImmutableBytesWritable, Result, NullWritable, Text>.Context context)throws IOException, InterruptedException{if (r != null){String all = "";int j = 0;for (String cloumn : this.cloumns.split(",")){j++;String s = "";try{//xxx为列簇名byte[] p = r.getValue("xxx".getBytes(), cloumn.getBytes());if (p != null){//设置编码集及去除一下跟分隔符冲突的内容,这里可以自定义s = new String(p, "UTF-8");s = s.replaceAll("\\n", "").replaceAll("\\r", "");s = s.replaceAll(",", ".");s = s.replaceAll(";", ".");if ("NULL".equals(s)) {s = "";}}}catch (Exception e){System.out.println("111");s = "";}if (j == 1) {all = s;} else {//这里设置到hdfs的分隔符为逗号all = all + "," + s;}}context.write(NullWritable.get(), new Text(all));}}protected void setup(Mapper<ImmutableBytesWritable, Result, NullWritable, Text>.Context context)throws IOException, InterruptedException{Configuration conf = context.getConfiguration();this.cloumns = conf.get("cloumns");}}
}

使用方法如下:
传入三个参数
第一个为hbase的表名
第二个为hdfs的写入路径
第三个为一个配置文件
里面写的格式为:
hbase表名=准备写到hive的字段列表,以逗号分开
maven依赖如下:

<dependencies><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>1.10.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.6.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.0.0-cdh5.5.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>1.0.0-cdh5.5.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase</artifactId><version>1.0.0-cdh5.5.0</version><type>pom</type></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-protocol</artifactId><version>1.0.0-cdh5.5.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.0.0-cdh5.5.0</version></dependency></dependencies>

java mapreduce 读hbase数据 写入hdfs 含maven依赖相关推荐

  1. spark读Hbase数据集成Hbase Filter(过滤器)

    文章目录 过滤器简介 spark 读Hbase集成Filter TableInputFormat 源码 代码示例 基于hbase版本2.3.5 过滤器简介 Hbase 提供了种类丰富的过滤器(filt ...

  2. Logstash读取Kafka数据写入HDFS详解

    强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃 通常日志数据除了要入ES提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇ELK的文章介绍过利用lo ...

  3. HBase - 数据写入流程解析

    本文由  网易云 发布. 作者:范欣欣 本篇文章仅限内部分享,如需转载,请联系网易获取授权. 众所周知,HBase默认适用于写多读少的应用,正是依赖于它相当出色的写入性能:一个100台RS的集群可以轻 ...

  4. elasticsearch备份与恢复4_使用ES-Hadoop将ES中的索引数据写入HDFS中

    背景知识见链接:elasticsearch备份与恢复3_使用ES-Hadoop将HDFS数据写入Elasticsearch中 项目参考<Elasticsearch集成Hadoop最佳实践> ...

  5. Flink读写系列之-读HBase并写入HBase

    这里读HBase提供两种方式,一种是继承RichSourceFunction,重写父类方法,一种是实现OutputFormat接口,具体代码如下: 方式一:继承RichSourceFunction p ...

  6. java数据写入文件方案,Java如何将字符串数据写入文件?

    package org.nhooo.example.commons.io; import org.apache.commons.io.FileUtils; import java.io.File; i ...

  7. Python 读pdf数据写入Excel表中

    ​ ​ 活动地址:CSDN21天学习挑战赛 目录 一.Python操作PDF的库有很多 二.pdflumber作为案例讲解使用 2.安装配置 2.加载PDF 3.读取pdf文档信息 1)读取pdf文档 ...

  8. java 生成parquet文件格式,使用Java API将Parquet格式写入HDFS,而不使用Avro和MR

    What is the simple way to write Parquet Format to HDFS (using Java API) by directly creating Parquet ...

  9. java单元测试读文件数据_如何将文本文件资源读入Java单元测试?

    本问题已经有最佳答案,请猛点这里访问. 我有一个单元测试需要使用位于src/test/resources/abc.xml中的XML文件.把文件的内容放到String中最简单的方法是什么? 另外,sta ...

最新文章

  1. 为什么 P8 程序员的代码你写不出来?零拷贝了解一下
  2. 基于Spark的大规模推荐系统特征工程
  3. 浏览器上网 (Safari Chrome)
  4. RedHat6.2 x86手动配置LNMP环境
  5. 要闻君说:谷歌云重磅发布两大技术平台;以后可以打飞滴了?SAP重组动荡;微软宣布 Azure Functions 支持 Java...
  6. 测试结果表明开车打手机比酒后开车更危险
  7. Objectove-c单例模式
  8. 任正非:外籍员工可当华为 CEO,但有条件;苹果欲研发“智能戒指”;MySQL 8.0.18 稳定版发布 | 极客头条...
  9. ceph的读写性能测试
  10. I2c驱动i2c_master_send()和i2c_master_recv()用法
  11. MFC中将view内容保存为bmp
  12. 天联高级版服务器信息怎么查,天联高级版
  13. 畅捷通(chanjet)T3各版本
  14. 图片去水印的原理_图片去水印方法 图片如何去掉水印
  15. 利用学信网免费激活PyCharm企业版(也适用所有其它JetBrains的IDE)
  16. python计算圆周率_用python程序求圆周率到任意位
  17. vmware下虚拟机自动换ip的解决办法
  18. 读易[2]·该出手时就出手(乾卦)
  19. 能带图最好的理解——克朗尼格-朋奈模型(Kronig-Penney模型)
  20. 十二星座 谁是 “小笨猪” ?

热门文章

  1. EXCEL不同表头的多表合并VBA
  2. js 对一个字段去重_js面试
  3. 3分钟:带你看懂Comos Hub的治理模型
  4. iphone录制脚本_如何在iPhone上录制和编辑慢动作视频
  5. layui获取复选框的值
  6. android 实现半个圆角长方形
  7. Ghostscript和Gsview下载安装,matlab图形去白边方法汇总
  8. 统计学基础理论学习(1)
  9. 企业计算机管理水平评价,企业两化融合管理水平诊断评估问卷
  10. 顶层设计理论和方法概述