Hadoop pipes编程
1. Hadoop pipes编程介绍
Hadoop pipes允许C++程序员编写mapreduce程序,它允许用户混用C++和Java的RecordReader, Mapper, Partitioner,Rducer和RecordWriter等五个组件。关于Hadoop pipes的设计思想,可参见我这篇文章:Hadoop Pipes设计原理。
本文介绍了Hadoop pipes编程的基本方法,并给出了若干编程示例,最后介绍了Hadoop pipes高级编程方法,包括怎样在MapReduce中加载词典,怎么传递参数,怎样提高效率等。
2. Hadoop pipes编程初体验
Hadoop-0.20.2源代码中自带了三个pipes编程示例,它们位于目录src/examples/pipes/impl中,分别为wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面简要介绍一下这三个程序。
(1) wordcount-simple.cc:Mapper和Reducer组件采用C++语言编写,RecordReader, Partitioner和RecordWriter采用Java语言编写,其中,RecordReader 为LineRecordReader(位于InputTextInputFormat中,按行读取数据,行所在的偏移量为key,行中的字符串为value),Partitioner为PipesPartitioner,RecordWriter为LineRecordWriter(位于InputTextOutputFormat中,输出格式为”key\tvalue\n”)
(2) wordcount-part.cc:Mapper,Partitioner和Reducer组件采用C++语言编写,其他采用Java编写
(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++编写
接下来简单介绍一下wordcount-simple.cc的编译和运行方法。
在Hadoop的安装目录下,执行下面命令:
1
|
ant -Dcompile.c++=yes examples
|
则wordcount-simple.cc生成的可执行文件wordcount-simple被保存到了目录build/c++-examples/Linux-amd64-64/bin/中,然后将该可执行文件上传到HDFS的某一个目录下,如/user/XXX/ bin下:
1
|
bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/
|
上传一份数据到HDFS的/user/XXX /pipes_test_data目录下:
1
|
bin/hadoop -put data.txt /user/XXX /pipes_test_data
|
直接使用下面命令提交作业:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader= true \
-D hadoop.pipes.java.recordwriter= true \
-D mapred.job.name= wordcount \
-input /user/XXX /pipes_test_data \
-output /user/XXX /pipes_test_output \
-program /user/XXX/ bin/wordcount-simple
|
3. Hadoop pipes编程方法
先从最基础的两个组件Mapper和Reducer说起。
(1) Mapper编写方法
用户若要实现Mapper组件,需继承HadoopPipes::Mapper虚基类,它的定义如下:
1
2
3
4
5
6
7
|
class Mapper: public Closable {
public :
virtual void map(MapContext& context) = 0;
};
|
用户必须实现map函数,它的参数是MapContext,该类的声明如下:
1
2
3
4
5
6
7
8
9
10
11
|
class MapContext: public TaskContext {
public :
virtual const std::string& getInputSplit() = 0;
virtual const std::string& getInputKeyClass() = 0;
virtual const std::string& getInputValueClass() = 0;
};
|
而TaskContext类地声明如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
class TaskContext {
public :
class Counter {
……
public :
Counter( int counterId) : id(counterId) {}
Counter( const Counter& counter) : id(counter.id) {}
……
};
virtual const JobConf* getJobConf() = 0;
virtual const std::string& getInputKey() = 0;
virtual const std::string& getInputValue() = 0;
virtual void emit( const std::string& key, const std::string& value) = 0;
virtual void progress() = 0;
…….
};
|
用户可以从context参数中获取当前的key,value,progress和inputsplit等数据信息,此外,还可以调用emit将结果回传给Java代码。
Mapper的构造函数带有一个HadoopPipes::TaskContext参数,用户可以通过它注册一些全局counter,对于程序调试和跟踪作业进度非常有用:
如果你想注册全局counter,在构造函数添加一些类似的代码:
1
2
3
4
5
6
7
|
WordCountMap(HadoopPipes::TaskContext& context) {
inputWords1 = context.getCounter(“group”, ”counter1”);
inputWords2 = context.getCounter(“group”, ”counter2”);
}
|
当需要增加counter值时,可以这样:
1
2
3
|
context.incrementCounter(inputWords1, 1);
context.incrementCounter(inputWords2, 1);
|
其中getCounter的两个参数分别为组名和组内计数器名,一个组中可以存在多个counter。
用户自定义的counter会在程序结束时,输出到屏幕上,当然,用户可以用通过web界面看到。
(2) Reducer编写方法
Reducer组件的编写方法跟Mapper组件类似,它需要继承虚基类public HadoopPipes::Reducer。
与Mapper组件唯一不同的地方时,map函数的参数类型为HadoopPipes::ReduceContext,它包含一个nextValue()方法,这允许用于遍历当前key对应的value列表,依次进行处理。
接下来介绍RecordReader, Partitioner和RecordWriter的编写方法:
(3) RecordReader编写方法
用户自定义的RecordReader类需要继承虚基类HadoopPipes::RecordReader,它的声明如下:
1
2
3
4
5
6
7
8
9
|
class RecordReader: public Closable {
public :
virtual bool next(std::string& key, std::string& value) = 0;
virtual float getProgress() = 0;
};
|
用户需要实现next和 getProgress两个方法。
用户自定义的RecordReader的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getInputSplit()的方法,用户可以获取经过序列化的InpuSplit对象,Java端采用不同的InputFormat可导致InputSplit对象格式不同,但对于大多数InpuSplit对象,它们可以提供至少三个信息:当前要处理的InputSplit所在的文件名,所在文件中的偏移量,它的长度。用户获取这三个信息后,可使用libhdfs库读取文件,以实现next方法。
下面介绍一下反序列化InputSplit对象的方法:
【1】 如果Java端采用的InputFormat为WordCountInpuFormat,可以这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
class XXXReader: public HadoopPipes::RecordReader {
public :
XXXReader (HadoopPipes::MapContext& context) {
std::string filename;
HadoopUtils::StringInStream stream(context.getInputSplit());
HadoopUtils::deserializeString(filename, stream);
……
};
|
【2】 如果Java端采用的InputFormat为TextInpuFormat,可以这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
|
class XXXReader: public HadoopPipes::RecordReader {
public :
XXXReader (HadoopPipes::MapContext& context) {
std::string filename;
HadoopUtils::StringInStream stream(context.getInputSplit());
readString(filename, stream);
int start = ( int )readLong(stream);
int len = ( int )readLong(stream);
……
private :
void readString(std::string& t, HadoopUtils::StringInStream& stream)
{
int len = readShort(stream);
if (len > 0) {
// resize the string to the right length
t.resize(len);
// read into the string in 64k chunks
const int bufSize = 65536;
int offset = 0;
char buf[bufSize];
while (len > 0) {
int chunkLength = len > bufSize ? bufSize : len;
stream.read(buf, chunkLength);
t.replace(offset, chunkLength, buf, chunkLength);
offset += chunkLength;
len -= chunkLength;
}
} else {
t.clear();
}
}
long readLong(HadoopUtils::StringInStream& stream) {
long n;
char b;
stream.read(&b, 1);
n = ( long )(b & 0xff) << 56 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 48 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 40 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 32 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 24 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 16 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) << 8 ;
stream.read(&b, 1);
n |= ( long )(b & 0xff) ;
return n;
}
};
|
(4) Partitioner编写方法
用户自定义的Partitioner类需要继承虚基类HadoopPipes:: Partitioner,它的声明如下:
1
2
3
4
5
6
7
8
9
|
class Partitioner {
public :
virtual int partition( const std::string& key, int numOfReduces) = 0;
virtual ~Partitioner() {}
};
|
用户需要实现partition方法和 析构函数。
对于partition方法,框架会自动为它传入两个参数,分别为key值和reduce task的个数numOfReduces,用户只需返回一个0~ numOfReduces-1的值即可。
(5) RecordWriter编写方法
用户自定义的RecordWriter类需要继承虚基类HadoopPipes:: RecordWriter,它的声明如下:
1
2
3
4
5
6
7
8
9
|
class RecordWriter: public Closable {
public :
virtual void emit( const std::string& key,
const std::string& value) = 0;
};
|
用户自定的RecordWriter的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getJobConf()可获取一个HadoopPipes::JobConf的对象,用户可从该对象中获取该reduce task的各种参数,如:该reduce task的编号(这对于确定输出文件名有用),reduce task的输出目录等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
class MyWriter: public HadoopPipes::RecordWriter {
public :
MyWriter(HadoopPipes::ReduceContext& context) {
const HadoopPipes::JobConf* job = context.getJobConf();
int part = job->getInt( "mapred.task.partition" );
std::string outDir = job->get( "mapred.work.output.dir" );
……
}
}
|
用户需实现emit方法,将数据写入某个文件。
4. Hadoop pipes编程示例
网上有很多人怀疑Hadoop pipes自带的程序wordcount-nopipe.cc不能运行,各个论坛都有讨论,在此介绍该程序的设计原理和运行方法。
该运行需要具备以下前提:
(1) 采用的InputFormat为WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中
(2) 输入目录和输出目录需位于各个datanode的本地磁盘上,格式为:file:///home/xxx/pipes_test (注意,hdfs中的各种接口同时支持本地路径和HDFS路径,如果是HDFS上的路径,需要使用hdfs://host:9000/user/xxx,表示/user/xxx为namenode 为host的hdfs上的路径,而本地路径,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test为本地路径。例如,bin/hadoop fs –ls file:///home/xxx/pipes_test表示列出本地磁盘上/home/xxx/pipes_tes下的文件)
待确定好各个datanode的本地磁盘上有输入数据/home/xxx/pipes_test/data.txt后,用户首先上传可执行文件到HDFS中:
1
|
bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-nopipe /user/XXX/bin/
|
然后使用下面命令提交该作业:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader= false \
-D hadoop.pipes.java.recordwriter= false \
-D mapred.job.name=wordcount \
-D mapred.input.format. class =org.apache.hadoop.mapred.pipes.WordCountInputFormat \
-libjars hadoop-0.20.2-test.jar \
-input file: ///home/xxx/pipes_test/data.txt \
-output file: ///home/xxx/pipes_output \
-program /user/XXX/bin/wordcount-nopipe
|
5. Hadoop pipes高级编程
如果用户需要在mapreduce作业中加载词典或者传递参数,可这样做:
(1) 提交作业时,用-files选项,将词典(需要传递参数可以放到一个配置文件中)上传给各个datanode,如:
1
2
3
4
5
6
7
8
9
10
11
|
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader= false \
-D hadoop.pipes.java.recordwriter= false \
-D mapred.job.name=wordcount \
-files dic.txt \
….
|
(2) 在Mapper或者Reducer的构造函数中,将字典文件以本地文件的形式打开,并把内容保存到一个map或者set中,然后再map()或者reduce()函数中使用即可,如:
1
2
3
4
5
6
7
|
WordCountMap(HadoopPipes::TaskContext& context) {
file = fopen (“dic.txt”, "r" ); //C库函数
…….
}
|
为了提高系能,RecordReader和RecordWriter最好采用Java代码实现(或者重用Hadoop中自带的),这是因为Hadoop自带的C++库libhdfs采用JNI实现,底层还是要调用Java相关接口,效率很低,此外,如果要处理的文件为二进制文件或者其他非文本文件,libhdfs可能不好处理。
6. 总结
Hadoop pipes使C++程序员编写MapReduce作业变得可能,它简单好用,提供了用户所需的大部分功能。
1.Hadoop pipes编程介绍
Hadoop pipes允许C++程序员编写mapreduce程序,它允许用户混用C++和Java的RecordReader,Mapper,Partitioner,Rducer和RecordWriter等五个组件。关于Hadoop pipes的设计思想,可参见我这篇文章:
本文介绍了Hadoop pipes编程的基本方法,并给出了若干编程示例,最后介绍了Hadoop pipes高级编程方法,包括怎样在MapReduce中加载词典,怎么传递参数,怎样提高效率等。
2.Hadoop pipes编程初体验
Hadoop-0.20.2源代码中自带了三个pipes编程示例,它们位于目录src/examples/pipes/impl中,分别为wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面简要介绍一下这三个程序。
(1)wordcount-simple.cc:Mapper和Reducer组件采用C++语言编写,RecordReader, Partitioner和RecordWriter采用Java语言编写,其中,RecordReader为LineRecordReader(位于InputTextInputFormat中,按行读取数据,行所在的偏移量为key,行中的字符串为value),Partitioner为PipesPartitioner,RecordWriter为LineRecordWriter(位于InputTextOutputFormat中,输出格式为”key\tvalue\n”)
(2)wordcount-part.cc:Mapper,Partitioner和Reducer组件采用C++语言编写,其他采用Java编写
(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++编写
接下来简单介绍一下wordcount-simple.cc的编译和运行方法。
在Hadoop的安装目录下,执行下面命令:
ant -Dcompile.c++=yes examples
则wordcount-simple.cc生成的可执行文件wordcount-simple被保存到了目录build/c++-examples/Linux-amd64-64/bin/中,然后将该可执行文件上传到HDFS的某一个目录下,如/user/XXX/ bin下:
bin/hadoop-putbuild/c++-examples/Linux-amd64-64/bin/wordcount-simple/user/XXX/ bin/
上传一份数据到HDFS的/user/XXX /pipes_test_data目录下:
bin/hadoop-putdata.txt/user/XXX /pipes_test_data
直接使用下面命令提交作业:
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-D mapred.job.name= wordcount \
-input /user/XXX /pipes_test_data \
-output /user/XXX /pipes_test_output \
-program /user/XXX/ bin/wordcount-simple
3.Hadoop pipes编程方法
先从最基础的两个组件Mapper和Reducer说起。
(1)Mapper编写方法
用户若要实现Mapper组件,需继承HadoopPipes::Mapper虚基类,它的定义如下:
class Mapper: public Closable {
public:
virtual void map(MapContext& context) = 0;
};
用户必须实现map函数,它的参数是MapContext,该类的声明如下:
class MapContext: public TaskContext {
public:
virtual const std::string& getInputSplit() = 0;
virtual const std::string& getInputKeyClass() = 0;
virtual const std::string& getInputValueClass() = 0;
};
而TaskContext类地声明如下:
class TaskContext {
public:
class Counter {
……
public:
Counter(int counterId) : id(counterId) {}
Counter(const Counter& counter) : id(counter.id) {}
……
};
virtual const JobConf* getJobConf() = 0;
virtual const std::string& getInputKey() = 0;
virtual const std::string& getInputValue() = 0;
virtual void emit(const std::string& key, const std::string& value) = 0;
virtual void progress() = 0;
…….
};
用户可以从context参数中获取当前的key,value,progress和inputsplit等数据信息,此外,还可以调用emit将结果回传给Java代码。
Mapper的构造函数带有一个HadoopPipes::TaskContext参数,用户可以通过它注册一些全局counter,对于程序调试和跟踪作业进度非常有用:
如果你想注册全局counter,在构造函数添加一些类似的代码:
WordCountMap(HadoopPipes::TaskContext& context) {
inputWords1 = context.getCounter(“group”, ”counter1”);
inputWords2 = context.getCounter(“group”, ”counter2”);
}
当需要增加counter值时,可以这样:
context.incrementCounter(inputWords1, 1);
context.incrementCounter(inputWords2, 1);
其中getCounter的两个参数分别为组名和组内计数器名,一个组中可以存在多个counter。
用户自定义的counter会在程序结束时,输出到屏幕上,当然,用户可以用通过web界面看到。
(2)Reducer编写方法
Reducer组件的编写方法跟Mapper组件类似,它需要继承虚基类public HadoopPipes::Reducer。
与Mapper组件唯一不同的地方时,map函数的参数类型为HadoopPipes::ReduceContext,它包含一个nextValue()方法,这允许用于遍历当前key对应的value列表,依次进行处理。
接下来介绍RecordReader,Partitioner和RecordWriter的编写方法:
(3)RecordReader编写方法
用户自定义的RecordReader类需要继承虚基类HadoopPipes::RecordReader,它的声明如下:
class RecordReader: public Closable {
public:
virtual bool next(std::string& key, std::string& value) = 0;
virtual float getProgress() = 0;
};
用户需要实现next和getProgress两个方法。
用户自定义的RecordReader的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getInputSplit()的方法,用户可以获取经过序列化的InpuSplit对象,Java端采用不同的InputFormat可导致InputSplit对象格式不同,但对于大多数InpuSplit对象,它们可以提供至少三个信息:当前要处理的InputSplit所在的文件名,所在文件中的偏移量,它的长度。用户获取这三个信息后,可使用libhdfs库读取文件,以实现next方法。
(4)Partitioner编写方法
用户自定义的Partitioner类需要继承虚基类HadoopPipes:: Partitioner,它的声明如下:
class Partitioner {
public:
virtual int partition(const std::string& key, int numOfReduces) = 0;
virtual ~Partitioner() {}
};
用户需要实现partition方法和析构函数。
对于partition方法,框架会自动为它传入两个参数,分别为key值和reduce task的个数numOfReduces,用户只需返回一个0~ numOfReduces-1的值即可。
(5)RecordWriter编写方法
用户自定义的RecordWriter类需要继承虚基类HadoopPipes:: RecordWriter,它的声明如下:
class RecordWriter: public Closable {
public:
virtual void emit(const std::string& key,
const std::string& value) = 0;
};
用户自定的RecordWriter的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getJobConf()可获取一个HadoopPipes::JobConf的对象,用户可从该对象中获取该reduce task的各种参数,如:该reduce task的编号(这对于确定输出文件名有用),reduce task的输出目录等。
class WordCountWriter: public HadoopPipes::RecordWriter {
public:
MyWriter(HadoopPipes::ReduceContext& context) {
const HadoopPipes::JobConf* job = context.getJobConf();
int part = job->getInt(“mapred.task.partition”);
std::string outDir = job->get(“mapred.work.output.dir”);
……
}
}
用户需实现emit方法,将数据写入某个文件。
4.Hadoop pipes编程示例
网上有很多人怀疑Hadoop pipes自带的程序wordcount-nopipe.cc不能运行,各个论坛都有讨论,在此介绍该程序的设计原理和运行方法。
该运行需要具备以下前提:
(1) 采用的InputFormat为WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中
(2) 输入目录和输出目录需位于各个datanode的本地磁盘上,格式为:file:///home/xxx/pipes_test(注意,hdfs中的各种接口同时支持本地路径和HDFS路径,如果是HDFS上的路径,需要使用hdfs://host:9000/user/xxx,表示/user/xxx为namenode为host的hdfs上的路径,而本地路径,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test为本地路径)
待确定好各个datanode的本地磁盘上有输入数据/home/xxx/pipes_test/data.txt后,用户首先上传可执行文件到HDFS中:
bin/hadoop-putbuild/c++-examples/Linux-amd64-64/bin/wordcount-simple/user/XXX/ bin/
然后使用下面命令运行该程序:
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=false \
-D hadoop.pipes.java.recordwriter=false \
-D mapred.job.name=wordcount \
-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \
-libjars hadoop-0.20.2-test.jar \
-input file:/home/xxx/pipes_test/data.txt \
-output file:/home/xxx/pipes_output \
-program /user/XXX/ bin/wordcount-nopipe
5.Hadoop pipes高级编程
如果用户需要在mapreduce作业中加载词典或者传递参数,可这样做:
(1) 提交作业时,用-files选项,将词典(需要传递参数可以放到一个配置文件中)上传给各个datanode,如
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=false \
-D hadoop.pipes.java.recordwriter=false \
-D mapred.job.name=wordcount \
-files dic.txt \
….
(2)在Mapper或者Reducer的构造函数中,将字典文件以本地文件的形式打开,并把内容保存到一个map或者set中,然后再map()或者reduce()函数中使用即可,如
WordCountMap(HadoopPipes::TaskContext& context) {
file = fopen(“dic.txt”, “r”); //C库函数
…….
}
为了提高系能,RecordReader和RecordWriter最好采用Java代码实现(或者重用Hadoop中自带的),这是因为Hadoop自带的C++库libhdfs采用JNI实现,底层还是要调用Java相关接口,效率很低,此外,如果要处理的文件为二进制文件或者其他非文本文件,libhdfs可能不好处理。
6.总结
1. Hadoop pipes编程介绍
Hadoop pipes允许C++程序员编写mapreduce程序,它允许用户混用C++和Java的RecordReader,Mapper,Partitioner,Rducer和RecordWriter等五个组件。关于Hadoop pipes的设计思想,可参见我这篇文章:
本文介绍了Hadoop pipes编程的基本方法,并给出了若干编程示例,最后介绍了Hadoop pipes高级编程方法,包括怎样在MapReduce中加载词典,怎么传递参数,怎样提高效率等。
2. Hadoop pipes编程初体验
Hadoop-0.20.2源代码中自带了三个pipes编程示例,它们位于目录src/examples/pipes/impl中,分别为wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面简要介绍一下这三个程序。
(1) wordcount-simple.cc:Mapper和Reducer组件采用C++语言编写,RecordReader, Partitioner和RecordWriter采用Java语言编写,其中,RecordReader 为LineRecordReader(位于InputTextInputFormat中,按行读取数据,行所在的偏移量为key,行中的字符串为value),Partitioner为PipesPartitioner,RecordWriter为LineRecordWriter(位于InputTextOutputFormat中,输出格式为”key\tvalue\n”)
(2) wordcount-part.cc:Mapper,Partitioner和Reducer组件采用C++语言编写,其他采用Java编写
(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++编写
接下来简单介绍一下wordcount-simple.cc的编译和运行方法。
在Hadoop的安装目录下,执行下面命令:
ant -Dcompile.c++=yes examples
则wordcount-simple.cc生成的可执行文件wordcount-simple被保存到了目录build/c++-examples/Linux-amd64-64/bin/中,然后将该可执行文件上传到HDFS的某一个目录下,如/user/XXX/ bin下:
bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/
上传一份数据到HDFS的/user/XXX /pipes_test_data目录下:
bin/hadoop -put data.txt /user/XXX /pipes_test_data
直接使用下面命令提交作业:
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-D mapred.job.name= wordcount \
-input /user/XXX /pipes_test_data \
-output /user/XXX /pipes_test_output \
-program /user/XXX/ bin/wordcount-simple
3. Hadoop pipes编程方法
先从最基础的两个组件Mapper和Reducer说起。
(1) Mapper编写方法
用户若要实现Mapper组件,需继承HadoopPipes::Mapper虚基类,它的定义如下:
class Mapper: public Closable {
public:
virtual void map(MapContext& context) = 0;
};
用户必须实现map函数,它的参数是MapContext,该类的声明如下:
class MapContext: public TaskContext {
public:
virtual const std::string& getInputSplit() = 0;
virtual const std::string& getInputKeyClass() = 0;
virtual const std::string& getInputValueClass() = 0;
};
而TaskContext类地声明如下:
class TaskContext {
public:
class Counter {
……
public:
Counter(int counterId) : id(counterId) {}
Counter(const Counter& counter) : id(counter.id) {}
……
};
virtual const JobConf* getJobConf() = 0;
virtual const std::string& getInputKey() = 0;
virtual const std::string& getInputValue() = 0;
virtual void emit(const std::string& key, const std::string& value) = 0;
virtual void progress() = 0;
…….
};
用户可以从context参数中获取当前的key,value,progress和inputsplit等数据信息,此外,还可以调用emit将结果回传给Java代码。
Mapper的构造函数带有一个HadoopPipes::TaskContext参数,用户可以通过它注册一些全局counter,对于程序调试和跟踪作业进度非常有用:
如果你想注册全局counter,在构造函数添加一些类似的代码:
WordCountMap(HadoopPipes::TaskContext& context) {
inputWords1 = context.getCounter(“group”, ”counter1”);
inputWords2 = context.getCounter(“group”, ”counter2”);
}
当需要增加counter值时,可以这样:
context.incrementCounter(inputWords1, 1);
context.incrementCounter(inputWords2, 1);
其中getCounter的两个参数分别为组名和组内计数器名,一个组中可以存在多个counter。
用户自定义的counter会在程序结束时,输出到屏幕上,当然,用户可以用通过web界面看到。
(2) Reducer编写方法
Reducer组件的编写方法跟Mapper组件类似,它需要继承虚基类public HadoopPipes::Reducer。
与Mapper组件唯一不同的地方时,map函数的参数类型为HadoopPipes::ReduceContext,它包含一个nextValue()方法,这允许用于遍历当前key对应的value列表,依次进行处理。
接下来介绍RecordReader, Partitioner和RecordWriter的编写方法:
(3) RecordReader编写方法
用户自定义的RecordReader类需要继承虚基类HadoopPipes::RecordReader,它的声明如下:
class RecordReader: public Closable {
public:
virtual bool next(std::string& key, std::string& value) = 0;
virtual float getProgress() = 0;
};
用户需要实现next和 getProgress两个方法。
用户自定义的RecordReader的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getInputSplit()的方法,用户可以获取经过序列化的InpuSplit对象,Java端采用不同的InputFormat可导致InputSplit对象格式不同,但对于大多数InpuSplit对象,它们可以提供至少三个信息:当前要处理的InputSplit所在的文件名,所在文件中的偏移量,它的长度。用户获取这三个信息后,可使用libhdfs库读取文件,以实现next方法。
(4) Partitioner编写方法
用户自定义的Partitioner类需要继承虚基类HadoopPipes:: Partitioner,它的声明如下:
class Partitioner {
public:
virtual int partition(const std::string& key, int numOfReduces) = 0;
virtual ~Partitioner() {}
};
用户需要实现partition方法和 析构函数。
对于partition方法,框架会自动为它传入两个参数,分别为key值和reduce task的个数numOfReduces,用户只需返回一个0~ numOfReduces-1的值即可。
(5) RecordWriter编写方法
用户自定义的RecordWriter类需要继承虚基类HadoopPipes:: RecordWriter,它的声明如下:
class RecordWriter: public Closable {
public:
virtual void emit(const std::string& key,
const std::string& value) = 0;
};
用户自定的RecordWriter的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getJobConf()可获取一个HadoopPipes::JobConf的对象,用户可从该对象中获取该reduce task的各种参数,如:该reduce task的编号(这对于确定输出文件名有用),reduce task的输出目录等。
class WordCountWriter: public HadoopPipes::RecordWriter {
public:
MyWriter(HadoopPipes::ReduceContext& context) {
const HadoopPipes::JobConf* job = context.getJobConf();
int part = job->getInt(“mapred.task.partition”);
std::string outDir = job->get(“mapred.work.output.dir”);
……
}
}
用户需实现emit方法,将数据写入某个文件。
4. Hadoop pipes编程示例
网上有很多人怀疑Hadoop pipes自带的程序wordcount-nopipe.cc不能运行,各个论坛都有讨论,在此介绍该程序的设计原理和运行方法。
该运行需要具备以下前提:
(1) 采用的InputFormat为WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中
(2) 输入目录和输出目录需位于各个datanode的本地磁盘上,格式为:file:///home/xxx/pipes_test (注意,hdfs中的各种接口同时支持本地路径和HDFS路径,如果是HDFS上的路径,需要使用hdfs://host:9000/user/xxx,表示/user/xxx为namenode 为host的hdfs上的路径,而本地路径,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test为本地路径)
待确定好各个datanode的本地磁盘上有输入数据/home/xxx/pipes_test/data.txt后,用户首先上传可执行文件到HDFS中:
bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/
然后使用下面命令运行该程序:
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=false \
-D hadoop.pipes.java.recordwriter=false \
-D mapred.job.name=wordcount \
-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \
-libjars hadoop-0.20.2-test.jar \
-input file:/home/xxx/pipes_test/data.txt \
-output file:/home/xxx/pipes_output \
-program /user/XXX/ bin/wordcount-nopipe
5. Hadoop pipes高级编程
如果用户需要在mapreduce作业中加载词典或者传递参数,可这样做:
(1) 提交作业时,用-files选项,将词典(需要传递参数可以放到一个配置文件中)上传给各个datanode,如
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=false \
-D hadoop.pipes.java.recordwriter=false \
-D mapred.job.name=wordcount \
-files dic.txt \
….
(2) 在Mapper或者Reducer的构造函数中,将字典文件以本地文件的形式打开,并把内容保存到一个map或者set中,然后再map()或者reduce()函数中使用即可,如
WordCountMap(HadoopPipes::TaskContext& context) {
file = fopen(“dic.txt”, “r”); //C库函数
…….
}
为了提高系能,RecordReader和RecordWriter最好采用Java代码实现(或者重用Hadoop中自带的),这是因为Hadoop自带的C++库libhdfs采用JNI实现,底层还是要调用Java相关接口,效率很低,此外,如果要处理的文件为二进制文件或者其他非文本文件,libhdfs可能不好处理。
6. 总结
Hadoop pipes使C++程序员编写MapReduce作业变得可能,它简单好用,提供了用户所需的大部分功能。
Hadoop pipes使C++程序员编写MapReduce作业变得可能,它简单好用,提供了用户所需的大部分功能。
原创文章,转载请注明: 转载自董的博客
本文链接地址: http://dongxicheng.org/mapreduce/hadoop-pipes-programming/
Hadoop pipes编程相关推荐
- Hadoop Streaming 编程
1.概述 Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,例如: 采用shell脚本语言中的一些命令作为ma ...
- Hadoop Pipes编程之C++实现WordCount
2019独角兽企业重金招聘Python工程师标准>>> Hadoop虽然用java实现,但是同样可以支持其他语言的Map和Reduce.由于需要学习C++方面的Hadoop实现,所以 ...
- Hadoop Streaming编程实例
Hadoop Streaming是Hadoop提供的多语言编程工具,通过该工具,用户可采用任何语言编写MapReduce程序,本文将介绍几个Hadoop Streaming编程实例,大家可重点从以下几 ...
- Hadoop pipes设计原理
1. 什么是Hadoop pipes? Hadoop pipes允许用户使用C++语言进行MapReduce程序设计.它采用的主要方法是将应用逻辑相关的C++代码放在单独的进程中,然后通过Socket ...
- 在Hadoop 2.3上运行C++程序各种疑难杂症(Hadoop Pipes选择、错误集锦、Hadoop2.3编译等)
首记 感觉Hadoop是一个坑,打着大数据最佳解决方案的旗帜到处坑害良民.记得以前看过一篇文章,说1TB以下的数据就不要用Hadoop了,体现不出太大的优势,有时候反而会成为累赘.因此Hadoop的使 ...
- Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)
不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce. ...
- Hadoop hdfs编程案例和java交互
Hadoop hdfs编程案例 一. HDFS编程实践 二.利用Java API与HDFS进行交互 三.应用程序的部署 一. HDFS编程实践 启动hadoop 切换到hadoop安装目录 cd /u ...
- Hadoop MapReduce编程 API入门系列之查找相同字母组成的字谜(三)
找出相同单词的所有单词.现在,是拿取部分数据集(如下)来完成本项目. 项目需求 一本英文书籍包含成千上万个单词或者短语,现在我们需要在大量的单词中,找出相同字母组成的所有anagrams(字谜). 思 ...
- 《Hadoop高级编程》之为Hadoop实现构建企业级安全解决方案
本章内容提要 ● 理解企业级应用的安全顾虑 ● 理解Hadoop尚未为企业级应用提供的安全机制 ● 考察用于构建企业级安全解决方案的方法 第10章讨论了Hadoop安全性以及Hado ...
最新文章
- NVIDIA DGX SUPERPOD 企业解决方案
- WCF如何通过契约加编码方式调用
- web框架flask(12)——国际化和本地化
- python培训Day1 随笔
- 掌握这几种平面设计思维类型,让你设计水平上新台阶
- 如何缩短IDEA行号的距离
- LIS路径记录(UVA481)
- 天文学中常用的坐标系
- cmd编译java文件中文乱码_乱码 HelloWorld 世界你好 cmd 执行输出的中文java 显示乱码 解决 另附 win无法执行编译运行javac java编译文件的解决方案...
- 英伟达2022财年第二季度获得创纪录营收65.1亿美元
- 详解Java类对象执行顺序
- Arcgis for android 100.4 getFieldType ()
- Dart的数据库操作
- window服务器搭建私有Git详解
- 基于AE+C#实现在TOCControl中实现指定图层删除
- 20145210 20145226实验一
- mysql可以建立个人数据库吗_mysql怎么建立数据库?
- mysql双机热备份_MySQL双机热备份试验
- Android 自动检测版本更新(包含强制更新)并安装
- ORAN专题系列-29:运营商O-RAN扩展皮站测试的硬件架构
热门文章
- 华为诺亚方舟实验室主任李航:神经符号处理开启自然语言处理新篇章
- 人人皆可大数据!SACC教你玩转阿里ODPS
- Visual Studio 2013开发 mini-filter driver step by step (1) - 创建 mini filter driver 工程
- jvm性能调优 - 19G1分代回收原理深度图解
- 白话Elasticsearch43-深入聚合数据分析之案例实战__排序:按每种颜色的平均销售额升序排序
- Git-将已有的项目转换为GIT项目托管到 GITHUB 仓库
- mysql groupby 取值_mysql获取groupby总记录行数的方法
- 关于source /etc/profile命令
- Kotlin-如何创建一个好用的协程作用域
- 射影几何教程: 1 射影几何介绍