java mapreduce 读hbase数据 写入hdfs 含maven依赖
mapreduce 读hbase数据 写入hdfs
java代码如下
import com.google.common.collect.Lists;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;public class HbaseToHdfs
{private static final Logger logger = Logger.getLogger(HbaseToHdfs03.class);public static Configuration conf = HBaseConfiguration.create();static {//hbase的ip和端口conf.set("hbase.master", "xxx:xxx");conf.set("mapreduce.output.fileoutputformat.compress", "false");//不进行压缩//hadoop的配置文件位置conf.addResource(new Path("xxx/core-site.xml"));conf.addResource(new Path("xxx/hdfs-site.xml"));//hbase的配置文件位置conf.addResource(new Path("xxx/hbase-site.xml"));conf.set("hbase.client.pause","2000");conf.set("hbase.client.retries.number","100");conf.set("hbase.client.operation.timeout","500000");}public static void main(String[] args)throws Exception{InputStream foin = new FileInputStream(args[2]);Properties prop = new Properties();prop.load(foin);foin.close();String cloumns = prop.getProperty(args[0]).trim();conf.set("cloumns", cloumns);//xxx为自己起的任务名Job job = Job.getInstance(conf, "xxx");job.setJarByClass(HbaseToHdfs.class);job.setMapperClass(HbaseToHdfs.MyMapper.class);job.setNumReduceTasks(0);TableMapReduceUtil.initTableMapperJob(initScans(job, args[0]), HbaseToHdfs03.MyMapper.class,NullWritable.class, Text.class, job);FileSystem fs = FileSystem.get(conf);if (fs.exists(new Path(args[1]))) {fs.delete(new Path(args[1]));}FileOutputFormat.setOutputPath(job, new Path(args[1]));long start = System.currentTimeMillis();try{job.waitForCompletion(true);}finally{fs.setPermission(new Path(args[1]), new FsPermission("777"));FileStatus[] files = fs.listStatus(new Path(args[1]));for (FileStatus fileStatus : files){Path p = fileStatus.getPath();fs.setPermission(p, new FsPermission("777"));}fs.close();long end = System.currentTimeMillis();logger.info("Job<" + job.getJobName() + ">是否执行成功:" + job.isSuccessful() + ";开始时间:" + start + "; 结束时间:" + end + "; 用时:" + (end - start) + "ms");}}private static List<Scan> initScans(Job job, String tableName){Configuration conf = job.getConfiguration();Scan scan = new Scan();scan.setAttribute("scan.attributes.table.name", Bytes.toBytes(tableName));return Lists.newArrayList(new Scan[] { scan });}public static class MyMapperextends TableMapper<NullWritable, Text>{String cloumns = "";protected void map(ImmutableBytesWritable key, Result r, Mapper<ImmutableBytesWritable, Result, NullWritable, Text>.Context context)throws IOException, InterruptedException{if (r != null){String all = "";int j = 0;for (String cloumn : this.cloumns.split(",")){j++;String s = "";try{//xxx为列簇名byte[] p = r.getValue("xxx".getBytes(), cloumn.getBytes());if (p != null){//设置编码集及去除一下跟分隔符冲突的内容,这里可以自定义s = new String(p, "UTF-8");s = s.replaceAll("\\n", "").replaceAll("\\r", "");s = s.replaceAll(",", ".");s = s.replaceAll(";", ".");if ("NULL".equals(s)) {s = "";}}}catch (Exception e){System.out.println("111");s = "";}if (j == 1) {all = s;} else {//这里设置到hdfs的分隔符为逗号all = all + "," + s;}}context.write(NullWritable.get(), new Text(all));}}protected void setup(Mapper<ImmutableBytesWritable, Result, NullWritable, Text>.Context context)throws IOException, InterruptedException{Configuration conf = context.getConfiguration();this.cloumns = conf.get("cloumns");}}
}
使用方法如下:
传入三个参数
第一个为hbase的表名
第二个为hdfs的写入路径
第三个为一个配置文件
里面写的格式为:
hbase表名=准备写到hive的字段列表,以逗号分开
maven依赖如下:
<dependencies><dependency><groupId>org.apache.parquet</groupId><artifactId>parquet-hadoop</artifactId><version>1.10.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-mapreduce-client-core --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.6.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.0.0-cdh5.5.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>1.0.0-cdh5.5.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase --><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase</artifactId><version>1.0.0-cdh5.5.0</version><type>pom</type></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-protocol</artifactId><version>1.0.0-cdh5.5.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.0.0-cdh5.5.0</version></dependency></dependencies>
java mapreduce 读hbase数据 写入hdfs 含maven依赖相关推荐
- spark读Hbase数据集成Hbase Filter(过滤器)
文章目录 过滤器简介 spark 读Hbase集成Filter TableInputFormat 源码 代码示例 基于hbase版本2.3.5 过滤器简介 Hbase 提供了种类丰富的过滤器(filt ...
- Logstash读取Kafka数据写入HDFS详解
强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃 通常日志数据除了要入ES提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇ELK的文章介绍过利用lo ...
- HBase - 数据写入流程解析
本文由 网易云 发布. 作者:范欣欣 本篇文章仅限内部分享,如需转载,请联系网易获取授权. 众所周知,HBase默认适用于写多读少的应用,正是依赖于它相当出色的写入性能:一个100台RS的集群可以轻 ...
- elasticsearch备份与恢复4_使用ES-Hadoop将ES中的索引数据写入HDFS中
背景知识见链接:elasticsearch备份与恢复3_使用ES-Hadoop将HDFS数据写入Elasticsearch中 项目参考<Elasticsearch集成Hadoop最佳实践> ...
- Flink读写系列之-读HBase并写入HBase
这里读HBase提供两种方式,一种是继承RichSourceFunction,重写父类方法,一种是实现OutputFormat接口,具体代码如下: 方式一:继承RichSourceFunction p ...
- java数据写入文件方案,Java如何将字符串数据写入文件?
package org.nhooo.example.commons.io; import org.apache.commons.io.FileUtils; import java.io.File; i ...
- Python 读pdf数据写入Excel表中
活动地址:CSDN21天学习挑战赛 目录 一.Python操作PDF的库有很多 二.pdflumber作为案例讲解使用 2.安装配置 2.加载PDF 3.读取pdf文档信息 1)读取pdf文档 ...
- java 生成parquet文件格式,使用Java API将Parquet格式写入HDFS,而不使用Avro和MR
What is the simple way to write Parquet Format to HDFS (using Java API) by directly creating Parquet ...
- java单元测试读文件数据_如何将文本文件资源读入Java单元测试?
本问题已经有最佳答案,请猛点这里访问. 我有一个单元测试需要使用位于src/test/resources/abc.xml中的XML文件.把文件的内容放到String中最简单的方法是什么? 另外,sta ...
最新文章
- 为什么 P8 程序员的代码你写不出来?零拷贝了解一下
- 基于Spark的大规模推荐系统特征工程
- 浏览器上网 (Safari Chrome)
- RedHat6.2 x86手动配置LNMP环境
- 要闻君说:谷歌云重磅发布两大技术平台;以后可以打飞滴了?SAP重组动荡;微软宣布 Azure Functions 支持 Java...
- 测试结果表明开车打手机比酒后开车更危险
- Objectove-c单例模式
- 任正非:外籍员工可当华为 CEO,但有条件;苹果欲研发“智能戒指”;MySQL 8.0.18 稳定版发布 | 极客头条...
- ceph的读写性能测试
- I2c驱动i2c_master_send()和i2c_master_recv()用法
- MFC中将view内容保存为bmp
- 天联高级版服务器信息怎么查,天联高级版
- 畅捷通(chanjet)T3各版本
- 图片去水印的原理_图片去水印方法 图片如何去掉水印
- 利用学信网免费激活PyCharm企业版(也适用所有其它JetBrains的IDE)
- python计算圆周率_用python程序求圆周率到任意位
- vmware下虚拟机自动换ip的解决办法
- 读易[2]·该出手时就出手(乾卦)
- 能带图最好的理解——克朗尼格-朋奈模型(Kronig-Penney模型)
- 十二星座 谁是 “小笨猪” ?