HBase结合MapReduce批量导入
Hbase是Hadoop生态体系配置的数据库,我们可以通过HTable api中的put方法向Hbase数据库中插入数据,但是由于put效率太低,不能批量插入大量的数据,文本将详细介绍如何通过MapReduce运算框架向Hbase数据库中导入数据。
开篇先介绍业务场景:将电信手机上网日志中的数据导入到Hbase数据库中,将部分数据以及相应字段描述列出:
图片格式描述:
先介绍一个日期格式的转换:public class TestDate {public static void main(String[] args){Date d = new Date();SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String time = df.format(d);System.out.println(time);} } /*2016-05-14 13:32:24*/
在Java当中,我们经常利用SimpledateFormat这个类将给定的日期转化成指定的格式。接下来在归纳一下Hbase结合MapReduce批量导入数据的时候,在代码当中应该注意的事项:①MyReducer类继承的是TableReduce类,而不在是MapReduce中常用的Reducer类②的数值类型没有什么用,通常将k3的数值类型设置为NullWritable即可③只设置map函数的输出类型,不在设置reduce函数的输出类型,因为②的原因④指定对输出文件格式化处理的类改为TableOutputFormat,而不在是TextOutputFormat⑤输出文件的路径改为指定的表名,在Configuration中进行设定,而不在是path的方式⑥如果想过jar包的方式运行程序,貌似还需要加入什么jar包,我没有整出来。接下来将贴出我在编程的时候第一次写出的业务代码:当然遇到了很多的问题。
package IT01; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; public class HbaseApp {public static String path1 = "hdfs://hadoop80:9000/FlowData.txt";public static void main(String[] args) throws Exception{Configuration conf = new Configuration();conf.set("hbaser.rootdir","hdfs://hadoop80:9000/hbase");conf.set("hbase.zookeeper.quorum","hadoop80");conf.set(TableOutputFormat.OUTPUT_TABLE,"wlan_log");//在这里需要指定表的名字:相当于输出文件的路径conf.set("dfs.socket.timeout","2000");Job job = new Job(conf,"HbaseApp");FileInputFormat.setInputPaths(job, new Path(path1));job.setInputFormatClass(TextInputFormat.class);job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setNumReduceTasks(1);job.setPartitionerClass(HashPartitioner.class);job.setReducerClass(MyReducer.class); // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(NullWritable.class);job.setOutputFormatClass(TableOutputFormat.class); // FileOutputFormat.setOutputPath(job, new Path(path2));job.waitForCompletion(true);}public static class MyMapper extends Mapper{protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException{String[] splited = v1.toString().split("\t");String reportTime = splited[0];String msisdn = splited[1];Date date = new Date(Long.parseLong(reportTime));String time = DateConvert.dateParse(date);String rowkey = msisdn+":"+time;//获取到行健context.write(new Text(rowkey),new Text(v1.toString())); }}public static class MyReducer extends TableReducer{protected void reduce(Text k2, Iterablev2s,Context context)throws IOException, InterruptedException{for (Text v2 : v2s){String[] splited = v2.toString().split("\t");/**添加记录的时候需要指定行健、列族、列名、数值***/Put put = new Put(k2.toString().getBytes());put.add("cf".getBytes(),"reportTime".getBytes(), splited[0].getBytes());put.add("cf".getBytes(),"msisdn".getBytes(), splited[1].getBytes());put.add("cf".getBytes(),"apmac1".getBytes(), splited[2].getBytes());put.add("cf".getBytes(),"apmac2".getBytes(), splited[3].getBytes());put.add("cf".getBytes(),"host".getBytes(), splited[4].getBytes());put.add("cf".getBytes(),"sitetype".getBytes(), splited[5].getBytes());put.add("cf".getBytes(),"upPackNum".getBytes(), splited[6].getBytes());put.add("cf".getBytes(),"downPackNum".getBytes(), splited[7].getBytes());put.add("cf".getBytes(),"upPayLoad".getBytes(), splited[8].getBytes());put.add("cf".getBytes(),"downPayLoad".getBytes(), splited[9].getBytes());put.add("cf".getBytes(),"httpstatus".getBytes(), splited[10].getBytes());context.write(NullWritable.get(),put);} }} } class DateConvert {public static String dateParse(Date date){SimpleDateFormat df = new SimpleDateFormat("yyyyMMddhhmmss");//构造一个日期解析器return df.format(date); } }
程序运行完之后:显示如下异常NumberFormatException
显示的是数字格式异常, 于是我在map函数当中又加了一个throws NumberFormatExceptionprotected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException,NumberFormatException{String[] splited = v1.toString().split("\t");String reportTime = splited[0];String msisdn = splited[1];Date date = new Date(Long.parseLong(reportTime));String time = DateConvert.dateParse(date);String rowkey = msisdn+":"+time;//获取到行健context.write(new Text(rowkey),new Text(v1.toString())); }
但是这样我发现也不对,因为当我追踪Mapp这个类的源代码时,我发现父类的map方法并没有抛出NumberFormatException这个异常,根据子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围,我又将上面这段代码用try——catch这种异常处理方式进行处理:
protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException{try{String[] splited = v1.toString().split("\t");String reportTime = splited[0];String msisdn = splited[1];Date date = new Date(Long.parseLong(reportTime));String time = DateConvert.dateParse(date);String rowkey = msisdn+":"+time;//获取到行健context.write(new Text(rowkey),new Text(v1.toString())); }catch(Exception e){Counter counter = context.getCounter("NumberExceptionNum", "num");counter.increment(1L);}}
当我将代码改成这样的时候,此时程序并没有显示抛出NumberFormatException这个异常,说明异常得到了处理,但是当我去查看Hbase数据的时候,发现HDFS中的日志数据并没有导入到Hbase数据库中,于是我又查看了一下MapReduce的运行日志:
也就是我的22行数据在map函数中当中并没有输出,这个问题就匪夷所思了,为什么22行数据都会抛出数字格式异常呢,而且都没有输出,于是我想到可能是SimpleDateFZ喎"/kf/ware/vc/" target="_blank" class="keylink">vcm1hdNXiuPbA4LXEzsrM4qOs09rKx87S09a/qsq8uPfW1rDZtsijrLeiz9bN+MnPuty24M7E1cK2vMrHxfrF0NXiuPbA4LXEo6zX7tbV1tXT2tXStb3By87KzOK1xL3ivva3vbC4o6zTw3RyaW0oKdXiuPa3vbeoyKWz/dfWt/u0rsewuvO1xL/VuPG8tL/JoaM8L3A+DQo8cHJlIGNsYXNzPQ=="brush:java;">protected void map(LongWritable k1, Text v1,Context context)throws IOException, InterruptedException { try { String[] splited = v1.toString().split("\t"); String reportTime = splited[0].trim(); String msisdn = splited[1].trim(); Date date = new Date(Long.parseLong(reportTime)); String time = DateConvert.dateParse(date); String rowkey = msisdn+":"+time;//获取到行健 context.write(new Text(rowkey),new Text(v1.toString())); }catch(Exception e) { Counter counter = context.getCounter("NumberExceptionNum", "num"); counter.increment(1L); } }
于是我又开始运行程序,但是当我运行完之后,从MapReduce的计数器当中,我发现第一条数据文本并没有导入:因为数字格式异常的这个原因估计在运行过程中被终止了。下面是计数器的显示:
于是我又想到了一个解决方案,将第一条数据多复制一条即可,然后重写将数据上传到HDFS中。
此时在一次 运行程序,显示正确,此时数据也全部导入到Hbase数据库中。
Hbase中数据查看核实:
将HDFS中的数据通过MapReduce导入到Hbase数据库时,总结如下:
核心步骤:先将数据文件上传到HDFS,然后用MapReduce进行处理,将处理后的数据插入到 hbase中
注意事项:
1>子类重写方法抛出异常的范围不能大于父类被重写方法抛出异常的范围
2>用trim()这个方法可以去除字符串前后的空格,换行符。
3>既然第一条数据总是显示数字格式异常,将第一条数据复制为2份即可。
转载于:https://blog.51cto.com/2226894115/1896517
HBase结合MapReduce批量导入相关推荐
- java hdfs导入hbase_使用BulkLoad批量导入数据到HBase中
说明 BulkLoad将数据批量导入HBase中.支持通过命令行和API两种操作方式. 命令行包含如下两个步骤: ImportTsv CompleteBulkLoad 准备工作 在HDFS创建临时目录 ...
- ubuntu导入python的包_在ubuntu环境下怎么利用python将数据批量导入数据hbase
斯蒂芬大帝 能够单条导入就能够批量导入配置 thriftpython使用的包 thrift个人使用的python 编译器是pycharm community edition. 在工程中设置中,找到pr ...
- HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引
1. Hbase高级应用 1.1建表高级属性 下面几个shell 命令在hbase操作中可以起到很到的作用,且主要体现在建表的过程中,看下面几个create 属性 1. BLOOMFILTER 默认是 ...
- Hbase-day05_bulkLoad实现批量导入_HBase中rowkey的设计_二级索引_Phoenix二级索引
hbase-day05 1.bulkLoad实现批量导入 优点: 如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 "Bul ...
- HBase 与 MapReduce 集成
6. HBase 与 MapReduce 集成 6.1 官方 HBase 与 MapReduce 集成 查看 HBase 的 MapReduce 任务的执行:bin/hbase mapredcp; 环 ...
- HBase与MapReduce
HBase API操作 MapReduce 通过HBase的相关JavaAPI,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将数据从本地文件系统导入到HBase的 ...
- HBase:HBase与MapReduce的集成
HBase与MapReduce的集成 HBase当中的数据最终都是存储在HDFS上面的,HBase天生的支持MR的操作,我们可以通过MR直接处理HBase当中的数据,并且MR可以将处理后的结果直接存储 ...
- 怎么接收layui上传的文件_layui 上传文件_批量导入数据UI的方法
使用layui的文件上传组件,可以方便的弹出文件上传界面. 效果如下: 点击[批量导入]按钮调用js脚本importData(config)就可以实现数据上传到服务器. 脚本: /*** * 批量导入 ...
- AD下批量导入域用户
如果您的域环境比较大,那么设置用户可能会不方便,就"新建用户"都可能重复做上几十遍....是不是很.....呵呵... 下面介绍一个工具"csvde.exe", ...
最新文章
- elasticsearch查询及logstash简介
- Java Math 类中的新功能--浮点数
- 使用IDEA创建Maven项目和Maven使用入门(配图详解)
- Hbase2.0版本安装教程
- php常见漏洞修复,phpstudy漏洞修复方法
- 如何在Red Hat Linux上安装和配置FreeIPA
- pyqt5 -——菜单和工具栏
- 互联网之达芬奇密码:浪潮揭秘:与中国五亿网民互为影响的互联网DNA
- 通过矩阵操作实现点的2D线性变换(几何变换、仿射变换)
- 51单片机和52单片机区别是什么?51仿真器有必要买吗?
- windows屏幕放大镜
- java自定义font_java – 设置自定义字体
- 最详细的Keycloak教程:Keycloak实现手机号、验证码登陆——(一)Keycloak的下载与使用
- 如何实现3台计算机网络传递文件,两台电脑如何实现对拷,三种办法轻松搞定!...
- python之网络部分
- JAVA开发中常见问题
- 关于我开始使用博客的这件事
- Linux上开启tftp服务,Linux中配置tftp服务
- 2021-04-24
- opencart seo优化_OpenCart商品与目录页标题SEO优化
热门文章
- 0.数据结构学习笔记大纲
- 【转】Android 面试题笔记-------android五种布局及其作用
- SQL 左外连接,右外连接,全连接,内连接
- java学习规划-转的
- hashmap hashtable 的区别
- linux时间轮算法,关于时间轮的设计 linux hashed Hierarchical timing wheel
- 作为一个程序员。数学重要吗,下面python大牛告诉你
- Java中intentfiler_【Android - 组件】之IntentFilter的匹配规则
- 如何访问Linux服务器中RabbitMQ管理页面
- 来看看企业如何拥抱混合云?