利用Hadoop Streaming处理二进制格式文件
Hadoop Streaming是Hadoop提供的多语言编程工具,用户可以使用自己擅长的编程语言(比如python、php或C#等)编写Mapper和Reducer处理文本数据。Hadoop Streaming自带了一些配置参数可友好地支持多字段文本数据的处理,参与Hadoop Streaming介绍和编程,可参考我的这篇文章: “Hadoop Streaming编程实例” 。然而,随着Hadoop应用越来越广泛,用户希望Hadoop Streaming不局限在处理文本数据上,而是具备更加强大的功能,包括能够处理二进制数据;能够支持多语言编写Combiner等组件。随着Hadoop 2.x的发布,这些功能已经基本上得到了完整的实现,本文将介绍如何使用Hadoop Streaming处理二进制格式的文件,包括 SequenceFile , HFile 等。
注:本文用到的程序实例可在百度云:hadoop-streaming-binary-examples 下载。
在详细介绍操作步骤之前,先介绍本文给出的实例。假设有这样的SequenceFile,它保存了手机通讯录信息,其中,key是好友名,value是描述该好友的一个结构体或者对象,为此,本文使用了google开源的protocol buffer这一序列化/反序列化框架,protocol buffer结构体定义如下:
1
2
3
4
5
6
7
8
9
|
option java_package = "" ;
option java_outer_classname= "PersonInfo" ;
message Person {
optional string name = 1 ;
optional int32 age = 2 ;
optional int64 phone = 3 ;
optional string address = 4 ;
}
|
SequenceFile文件中的value便是保存的Person对象序列化后的字符串,这是典型的二进制数据,不能像文本数据那样可通过换行符解析出每条记录,因为二进制数据的每条记录中可能包含任意字符,包括换行符。
一旦有了这样的SequenceFile之后,我们将使用Hadoop Streaming编写这样的MapReduce程序:这个MapReduce程序只有Map Task,任务是解析出文件中的每条好友记录,并以name \t age,phone,address的文本格式保存到HDFS上。
1. 准备数据
首先,我们需要准备上面介绍的SequenceFile数据,生成数据的核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
final SequenceFile.Writer out =
SequenceFile.createWriter(fs, getConf(), new Path(args[ 0 ]),
Text. class , BytesWritable. class );
Text nameWrapper = new Text();
BytesWritable personWrapper = new BytesWritable();
System.out.println( "Generating " + num + " Records......" );
for ( int i = 0 ; i < num; i++) {
genOnePerson(nameWrapper, personWrapper);
System.out.println( "Generating " + i + " Records," + nameWrapper.toString() + "......" );
out.append(nameWrapper, personWrapper);
}
out.close();
|
当然,为了验证我们产生的数据是否正确,需要编写一个解析程序,核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
|
Reader reader = new Reader(fs, new Path(args[ 0 ]), getConf());
Text key = new Text();
BytesWritable value = new BytesWritable();
while (reader.next(key, value)) {
System.out.println( "key:" + key.toString());
value.setCapacity(value.getSize()); // Very important!!! Very Tricky!!!
PersonInfo.Person person = PersonInfo.Person.parseFrom(value.getBytes());
System.out.println( "age:" + person.getAge()
+ ",address:" + person.getAddress()
+ ",phone:" + person.getPhone());
}
reader.close();
|
需要注意的,Value保存类型为BytesWritable,使用这个类型非常容易犯错误。当你把一堆byte[]数据保存到BytesWritable后,通过BytesWritable.getBytes()再读到的数据并不一定是原数据,可能变长了很多,这是因为BytesWritable采用了自动内存增长算法,你保存的数据长度为size时,它可能将数据保存到了长度为capacity(capacity>size)的buffer中,这时候,你通过BytesWritable.getBytes()得到的数据最后一些字符是多余的,如果里面保存的是protocol buffer序列化后的字符串,则无法反序列化,这时候可以使用BytesWritable.setCapacity (value.getSize())将后面多余空间剔除掉。
2. 使用Hadoop Streaming编写C++程序
为了说明Hadoop Streaming如何处理二进制格式数据,本文仅仅以C++语言为例进行说明,其他语言的设计方法类似。
先简单说一下原理。当输入数据是二进制格式时,Hadoop Streaming会对输入key和value进行编码后,通过标准输入传递给你的Hadoop Streaming程序,目前提供了两种编码格式,分别是rawtypes和 typedbytes,你可以设计你想采用的格式,这两种编码规则如下(具体在文章“Hadoop Streaming高级编程”中已经介绍了):
rawbytes:key和value均用【4个字节的长度+原始字节】表示
typedbytes:key和value均用【1字节类型+4字节长度+原始字节】表示
本文将采用第一种编码格式进行说明。采用这种编码意味着你不能想文本数据那样一次获得一行内容,而是依次获得key和value序列,其中key和value都由两部分组成,第一部分是长度(4个字节),第二部分是字节内容,比如你的key是dongxicheng,value是goodman,则传递给hadoop streaming程序的输入数据格式为11 dongxicheng 7 goodman。为此,我们编写下面的Mapper程序解析这种数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
int main() {
string key, value;
while (!cin.eof()) {
if (!FileUtil::ReadString(key, cin))
break ;
FileUtil::ReadString(value, cin);
Person person;
ProtoUtil::ParseFromString(value, person);
cout << person.name() << "\t" << person.age()
<< "," << person.address()
<< "," << person.phone() << endl;
}
return 0;
}
|
其中,辅助函数实现如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
class ProtoUtil {
public :
static bool ParseFromString( const string& str, Person &person) {
if (person.ParseFromString(str))
return true ;
return false ;
}
};
class FileUtil {
public :
static bool ReadInt(unsigned int *len, istream &stream) {
if (!stream.read(( char *)len, sizeof (unsigned int )))
return false ;
*len = bswap_32(*len);
return true ;
}
static bool ReadString(string &str, istream &stream) {
unsigned int len;
if (!ReadInt(&len, stream))
return false ;
str.resize(len);
if (!ReadBytes(&str[0], len, stream))
return false ;
return true ;
}
static bool ReadBytes( char *ptr, unsigned int len, istream &stream) {
stream.read(ptr, sizeof (unsigned char ) * len);
if (stream.eof()) return false ;
return true ;
}
};
|
该程序需要注意以下几点:
(1)注意大小端编码规则,解析key和value长度时,需要对长度进行字节翻转。
(2)注意循环结束条件,仅仅靠!cin.eof()判定是不够的,仅靠这个判定会导致多输出一条重复数据。
(3)本程序只能运行在linux系统下,windows操作系统下将无法运行,因为windows下的标准输入cin并直接支持二进制数据读取,需要将其强制以二进制模式重新打开后再使用。
3. 程序测试与运行
程序写好后,第一步是编译C++程序。由于该程序需要运行在多节点的Hadoop集群上,为了避免部署或者分发动态库带来的麻烦,我们直接采用静态编译方式,这也是编写Hadoop C++程序的基本规则。为了静态编译以上MapReduce程序,安装protocol buffers时,需采用以下流程(强调第一步),
./configure –disable-shared
make –j4
make install
然后使用以下命令编译程序,生成可执行文件ProtoMapper:
g++ -o ProtoMapper ProtoMapper.cpp person.pb.cc `pkg-config –cflags –static –libs protobuf` -lpthread
在正式将程序提交到Hadoop集群之前,需要先在本地进行测试,本地测试运行脚本如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
#!/bin/bash
HADOOP_HOME= /opt/dong/yarn-client
INPUT_PATH= /tmp/person . seq
OUTPUT_PATH= file : ///tmp/output111
echo "Clearing output path: $OUTPUT_PATH"
$HADOOP_HOME /bin/hadoop fs -rmr $OUTPUT_PATH
${HADOOP_HOME} /bin/hadoop jar\
${HADOOP_HOME} /share/hadoop/tools/lib/hadoop-streaming-2 .2.0.jar\
-D mapred.reduce.tasks=0\
-D stream.map.input=rawbytes\
-files ProtoMapper\
-jt local \
-fs local \
-input $INPUT_PATH\
-output $OUTPUT_PATH\
-inputformat SequenceFileInputFormat\
-mapper ProtoMapper
|
注意以下几点:
(1)使用stream.map.input指定输入数据解析成rawbytes格式
(2) 使用-jt和-fs两个参数将程序运行模式设置为local模式
(3)使用-inputformat指定输入数据格式为SequenceFileInputFormat
(4)使用mapred.reduce.tasks将Reduce Task数目设置为0
在本地tmp/output111目录下查看测试结果是否正确,如果没问题,可改写该脚本(去掉-fs和-jt两个参数,将输入和输出目录设置成HDFS上的目录),将程序直接运行在Hadoop上。
原创文章,转载请注明: 转载自董的博客
本文链接地址: http://dongxicheng.org/mapreduce-nextgen/hadoop-streaming-process-binary-data/
利用Hadoop Streaming处理二进制格式文件相关推荐
- vue 文件转换二进制_Vue利用Blob下载原生二进制数组文件
本文实例为大家分享了Vue利用Blob下载原生二进制数组文件的具体代码,供大家参考,具体内容如下 在服务端推送过来的二进制数组(JSON格式),在前端要处理成JS原生数组以后才能做成Blob,有两个地 ...
- 利用转换流将GBK格式文件以UTF-8输出
3.利用转换流将GBK格式文件以UTF-8输出 解题思路: 1,InputStreamReader(File file,"gbk");读入文件 2,Outp ...
- [音视媒体制作][教程]利用tmpgenc实现rm,rmvb格式文件的VCD制作
撰稿人:chnechen (修改于2006-01-11 ) 需要软件: 1.tmpgenc plu 2.53/2.54(俗称:小日本) 2.HappyShow V4.11 (或者K-Lite Meg ...
- Python 技术篇-利用pdfkit库实现html格式文件转换PDF文档实例演示
准备: 首先需要安装 pdfkit 库,使用 pip install pdfkit 命令就好了. 还需要安装 wkhtmltopdf 工具,本质就是利用这个工具来进行转换,pdfkit 库就是作为接口 ...
- Hadoop Streaming高级编程
1. 概要 本文主要介绍了Hadoop Streaming的一些高级编程技巧,包括,怎样在mapredue作业中定制输出输出格式?怎样向mapreduce作业中传递参数?怎么在mapreduce作业中 ...
- Hadoop Streaming
Hadoop Streaming Hadoop Streaming Hadoop Streaming Streaming工作原理 将文件打包到提交的作业中 Streaming选 ...
- mysql 二进制 存储格式化_解析MYSQL BINLOG 二进制格式(2)--FORMAT_DESCRIPTION_EVENT
原创:转载请说明出处谢谢! 上接 http://blog.itpub.net/7728585/viewspace-2133188/ 参考源: 1.源码log_event.h log_event.cc ...
- opencv把图片转换成二进制_Python+OpenCV实现将图像转换为二进制格式
Python+OpenCV实现将图像转换为二进制格式 发布时间:2020-09-20 20:30:58 来源:脚本之家 阅读:68 作者:大蛇王 在学习tensorflow的过程中,有一个问题,ten ...
- mariadb通用二进制格式安装
一.MariaDB安装介绍: 对于通用二进制格式的包,我们只需要解压缩后就能够使用数据库,听起来很容易,但必须要注意一些问题.二进制 格式的程序包是已经编译好的二进制程序,所以里边有很多脚本都是在固定 ...
最新文章
- Transformer又出新变体∞-former:无限长期记忆,任意长度上下文
- 【java】简单的方式实现文本文件的读写
- 查询GC得到森林里主域和子域的帐号
- WCF分布式开发常见错误(26):Authentication failed
- Redis(5种数据类型)
- EvenBus源码分析
- JVM从入门到精通(九):JVM调优实战 - arthas 的使用
- EventUtil.addHandler方法
- [html] 网页上的验证码是为了解决什么问题?说说你了解的验证码种类有哪些
- ASP.net中实现双表格同步缩放不变形
- 强化学习组队学习task06——DDPG 算法
- 如何用Python实现杨辉三角和心
- AnyForWeb告诉你什么才是“最好的”编程语言
- python中空间的位置怎么放置_如何在空间中对齐一个位置?
- 两种方式打开jar文件
- java 连接sybase数据库_Jdbc连Sybase数据库的几种方法_MySQL
- 亦真亦幻,A股区块链板块含金量几何?
- word脚注、尾注小技巧|怎么删除尾注的横线|怎么快速删除页眉的横线|怎么快速将尾注和脚注转为带方括号的格式
- Redis的那些事儿:关系型和非关系型数据库,非关系型数据库的类型,redis数据类型、编码格式、高性能、可以做什么、分布式锁失效的原因,string为采用sds数据类型,为什么是二进制安全的,
- 【C语言初阶】求最小公倍数的三种方法
热门文章
- sensor曝光量和曝光行的区别_4个要点,告诉你拼多多新的产品怎么增加曝光量!...
- MyBatis-02 MyBatis XML方式概述及配置步骤
- SQL Server数据库基本操作(一)
- IDEA中的项目没有被SVN管理解决办法
- 剑指offer07.重建二叉树
- 华为Android9.0谷歌框架,华为Mate9怎样登陆谷歌商店 Mate9如何安装谷歌服务框架【详解】...
- Ubuntu下C语言Debug工具GDB【1】安装和使用
- mysql 1个月多少天_在MySQL日期间隔中,1个月是否与30天相同? 1个季度与3个月相同吗?等等?...
- matlab 矩阵序列R6(n),MATLAB___09年试题加答案
- html内容显示重叠了,HTML:将DIV内容并排放置而不重叠