MapReduce编程基础

1. WordCount示例及MapReduce程序框架

2.  MapReduce程序执行流程

3.  深入学习MapReduce编程(1)

4. 参考资料及代码下载

<1>. WordCount示例及MapReduce程序框架 

首先通过一个简单的程序来实际运行一个MapReduce程序,然后通过这个程序我们来哦那个结一下MapReduce编程模型。

下载源程序:/Files/xuqiang/WordCount.rar,将该程序打包成wordcount.jar下面的命令,随便写一个文本文件,这里是WordCountMrtrial,并上传到hdfs上,这里的路径是/tmp/WordCountMrtrial,运行下面的命令:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ ./bin/hadoop jar wordcount.jar WordCount /tmp/WordCountMrtrial /tmp/result

如果该任务运行完成之后,将在hdfs的/tmp/result目录下生成类似于这样的结果:

gentleman 11

get 12
give 8
go 6
good 9
government 16

运行一个程序的基本上就是这样一个过程,我们来看看具体程序:

main函数中首先生成一个Job对象, Job job = new Job(conf, "word count");然后设置job的MapperClass,ReducerClass,设置输入文件路径FileInputFormat.addInputPath(job, new Path(otherArgs[0]));设置输出文件路径:FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));等待程序运行完成:System.exit(job.waitForCompletion(true) ? 0 : 1);可以看出main中仅仅是启动了一个job,然后设置该job相关的参数,具体实现MapReduce是mapper类和reducer类。

TokenizerMapper类中map函数将一行分割成<K2, V2>,然后IntSumReducer的reduce将<K2, list<V2>>转换成最终结果<K3, V3>。

通过这个示例基本上也能总结出简单的MapReduce编程的模型:一个Mapper类,一个Reducer类,一个Driver类。

<2>. MapReduce程序执行流程 

这里所描述的执行流程更加注重是从程序的角度去理解,更加全面的流程可参考[这里]。

首先用户指定待处理的文件,在WordCount就是文件WordCountMrtrial,这是hadoop根据设定的InputDataFormat来将输入文件分割成一个record(key/value对),然后将这些record传递给map函数,在WordCount示例中,对应的record就是<line_number行号, line_content该行内容>;

然后map函数根据输入的record,形成<K2, V2>,在WordCount示例中形成<K2, V2>就是<single_word, word_count>,例如<"a", 1>;

如果map过程完成之后,hadoop将这些生成的<K2, V2>按照K2进行分组,形成<K2,list(V2) >,之后传递给reduce函数,在该函数中最终得到程序的输出结果<K3, V3>。

<3>. 深入学习MapReduce编程(1)

3.1 hadoop data types

由于在hadoop需要将key/value对序列化,然后通过网络network发送到集群中的其他机器上,所以说hadoop中的类型需要能够序列化。

具体而言,自定义的类型,如果一个类class实现了Writable interface的话,那么这个可以作为value类型,如果一个class实现了WritableComparable<T> interface的话,那么这个class可以作为value类型或者是key类型。

hadoop本身已经实现了一些预定义的类型predefined classes,并且这些类型实现了WritableComparable<T>接口。

3.2 Mapper

如果一个类想要成为一个mapper,那么该类需要实现Mapper接口,同时继承自MapReduceBase。在MapReduceBase类中,两个方法是特别需要注意的:

void configure( JobConf job):这个方法是在任务被运行之前调用

void close():在任务运行完成之后调用

剩下的工作就是编写map方法,原型如下:

void map(Object key, Text value, Context context

) throws IOException, InterruptedException;

这个方法根据<K1, V1>生成<K2, V2>,然后通过context输出。

同样的在hadoop中预先定义了如下的Mapper:

3.3 Reducer

如果一个类想要成为Reducer的话,需要首先实现Reducer接口,然后需要继承自MapReduceBase。

当reducer接收从mapper传递而来的key/value对,然后根据key来排序,分组,最终生成<K2, list<V2>> ,然后reducer根据<K2, list<V2>>生成<K3, V3>.

同样在hadoop中预定义了一些Reducer:

3.4 Partitioner

Partitioner的作用主要是将mapper运行的结果“导向directing”到reducer。如果一个类想要成为Partitioner,那么需要实现Partitioner接口,该接口继承自JobConfigurable,定义如下:

public interface Partitioner<K2, V2> extends JobConfigurable {
  /** 
   * Get the paritition number for a given key (hence record) given the total 
   * number of partitions i.e. number of reduce-tasks for the job.
   *   
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be paritioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  int getPartition(K2 key, V2 value, int numPartitions);

hadoop将根据方法getPartition的返回值确定将mapper的值发送到那个reducer上。返回值相同的key/value对将被“导向“至同一个reducer。

3.5 Input Data Format and Output Data Format

3.5.1 Input Data Format

上面我们的假设是MapReduce程序的输入是key/value对,也就是<K1, V1>,但是实际上一般情况下MapReduce程序的输入是big file的形式,那么如何将这个文件转换成<K1, V1>,即file -> <K1, V1>。这就需要使用InputFormat接口了。

下面是几个常用InputFormat的实现类:

当然除了使用hadoop预先定义的InputDataFormat之外,还可以自定义,这是需要实现InputFormat接口。该接口仅仅包含两个方法:

InputSplit[] getSplits(JobConf job, int numSplits) throws  IOException;该接口实现将大文件分割成小块split。
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job, 

Reporter reporter) throws IOException;

该方法输入分割成的split,然后返回RecordReader,通过RecordReader来遍历该split内的record。

3.5.2 Output Data Format

每个reducer将自己的输出写入到结果文件中,这是使用output data format来配置输出的文件的格式。hadoop预先实现了:

3.6 Streaming in Hadoop

3.6.1 执行流程

我们知道在linux中存在所谓的“流”的概念,也就是说我们可以使用下面的命令:

cat input.txt | RandomSample.py 10 >sampled_output.txt

同样在hadoop中我们也可以使用类似的命令,显然这样能够在很大程度上加快程序的开发进程。下面来看看hadoop中流执行的过程:

hadoop streaming从标砖输入STDIN读取数据,默认的情况下使用\t来分割每行,如果不存在\t的话,那么这时正行的内容将被看作是key,而此时的value内容为空;

然后调用mapper程序,输出<K2, V2>;

之后,调用Partitioner来将<K2, V2>输出到对应的reducer上;

reducer根据输入的<K2, list(V2)> 得到最终结果<K3, V3>并输出到STDOUT上。

3.6.2 简单示例程序

下面我们假设需要做这样一个工作,输入一个文件,文件中每行是一个数字,然后得到该文件中的数字的最大值(当然这里可以使用streaming中自带的Aggregate)。 首先我们编写一个python文件(如果对python不是很熟悉,看看[这里]):

3.6.2.1 准备数据

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >url1

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >url2

上传到hdfs上:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -mkdir urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url1 urls/
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url2 urls/

3.6.2.2 编写mapper multifetch.py

#!/usr/bin/env python

import sys, urllib, re

title_re = re.compile("<title>(.*?)</title>",
        re.MULTILINE | re.DOTALL | re.IGNORECASE)

for line in sys.stdin:
    # We assume that we are fed a series of URLs, one per line
    url = line.strip()
    # Fetch the content and output the title (pairs are tab-delimited)
    match = title_re.search(urllib.urlopen(url).read())
    if match:
        print url, "\t", match.group(1).strip()

该文件的主要作用是给定一个url,然后输出该url代表的html页面的title部分。

在本地测试一下该程序:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >urls
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >>urls

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ sudo chmod u+x ./multifetch.py

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py 将输出:

http://www.cs.brandeis.edu  Computer Science Department | Brandeis University

http://www.nytimes.com The New York Times - Breaking News, World News &amp; Multimedia

3.6.2.3 编写reducer reducer.py

编写reducer.py文件

 #!/usr/bin/env python

from operator import itemgetter
import sys

for line in sys.stdin:
    line = line.strip()
    print line

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ chmod u+x ./reducer.py

现在我们的mapper和reducer已经准备好了,那么首先在本地上运行测试一下程序的功能,下面的命令模拟在hadoop上运行的过程:

首先mapper从stdin读取数据,这里是一行;

然后读取该行的内容作为一个url,然后得到该url代表的html的title的内容,输出<url, url-title-content>;

调用sort命令将mapper输出排序;

将排序完成的结果交给reducer,这里的reducer仅仅是将结果输出。

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py | sort | ./reducer.py 
http://www.cs.brandeis.edu     Computer Science Department | Brandeis University
http://www.nytimes.com     The New York Times - Breaking News, World News &amp; Multimedia  

显然程序能够正确

3.6.2.4 在hadoop streaming上运行

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop jar ./mapred/contrib/streaming/hadoop-0.21.0-streaming.jar \
> -mapper /home/xuqiang/hadoop/src/hadoop-0.21.0/multifetch.py \
> -reducer /home/xuqiang/hadoop/src/hadoop-0.21.0/reducer.py \
> -input urls/* \

> -output titles 

程序运行完成之后,查看运行结果:

xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -cat titles/part-00000

<4>. 参考资料及代码下载

http://pages.cs.brandeis.edu/~cs147a/lab/hadoop-example/

http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Hadoop+Streaming

<Hadoop In Action>

出处:http://www.cnblogs.com/xuqiang/archive/2011/06/05/2071935.html

MapReduce编程基础相关推荐

  1. Hadoop生态圈(二十一)- MapReduce编程基础

    目录 1. MapReduce Partition.Combiner 1.1 MapReduce Partition分区 1.1.1 默认情况下MR输出文件个数 1.1.2 修改reducetask个 ...

  2. 【小白视角】大数据基础实践(五) MapReduce编程基础操作

    目录 1. MapReduce 简介 1.1 起源 1.2 模型简介 1.3 MRv1体系结构 1.4 YARN 1.4.1 YARN体系结构 1.4.2 YARN工作流程 2. MapReduce ...

  3. MapReduce编程:单词计数--《大数据基础教程》

    MapReduce编程:单词计数 文章目录 MapReduce编程:单词计数 1.实验描述 2.实验环境 3.相关技能 4.相关知识点 5.实现效果 6.实验步骤 7.参考答案 8.总结 1.实验描述 ...

  4. [Hadoop入门] - 1 Ubuntu系统 Hadoop介绍 MapReduce编程思想

    Ubuntu系统 (我用到版本号是140.4) ubuntu系统是一个以桌面应用为主的Linux操作系统,Ubuntu基于Debian发行版和GNOME桌面环境.Ubuntu的目标在于为一般用户提供一 ...

  5. 大数据之Hadoop学习——动手实战学习MapReduce编程实例

    文章目录 一.MapReduce理论基础 二.Hadoop.Spark学习路线及资源收纳 三.MapReduce编程实例 1.自定义对象序列化 需求分析 报错:Exception in thread ...

  6. mapreduce 编程模型

    MapReduce是在总结大量应用的共同特点的基础上抽象出来的分布式计算框架,它适用的应用场景往往具有一个共同的特点:任务可被分解成相互独立的子问题.基于该特点,MapReduce编程模型给出了其分布 ...

  7. 第七章-mapreduce编程实战实验

    实验的推进模式 先配置好eclipse for hadoop 直接借用WordCount去测试 利用WordCount的基本程序框架,编写自己的代码 要点:map/reduce的所在的类和方法的数据类 ...

  8. hadoop中使用MapReduce编程实例

    原文链接:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html 从网上搜到的一篇hadoop的编程实例,对于初学者真是帮助太大 ...

  9. 关于介绍编程前景的html文档,HTML编程基础稿件(32页)-原创力文档

    * * 第2章 HTML编程基础 优选文档 * 内容提要 本章首先介绍HTML的发展历史,然后介绍HTML的基本框架 详细介绍了HTML的各种常用标记:文字标记.图片标记和超级链接标记,等等. 介绍C ...

最新文章

  1. SharePoint2013 Online中InfoPath 无法调用WebService
  2. elasticsearch插件大全(不断更新)
  3. php中isdefin,在PHP中定義全局常量數組的最“優雅”方法是什么
  4. Unity-Find-Script-References 查找脚本的引用
  5. java 8 update 11_从Java 8升级到Java 11应该注意的问题
  6. kmeans算法中的sse_聚类算法入门:k-means
  7. Ubuntu中Python3找不到_sqlite3模块
  8. stm32 工业按键检测_基于STM32芯片的能谱仪设计
  9. 前n个正整数相乘的时间复杂度为_初一数学常考的21个知识点,掌握好,轻松110+!...
  10. ad18修改过孔和走线间距_PCB设计之“过孔”
  11. Spring核心原理
  12. 书单 | 深度学习修炼秘籍
  13. 一元四次方程求根实现
  14. 通过 Teardrop 攻击程序学习自制 IP 包及了解包的结构
  15. 微信 WCDB 正式开源——高效易用的移动数据库框架
  16. HP打印机连不上解决办法
  17. 凯撒密码C语言用ASCII码,凯撒密码帮助ASCII循环
  18. 对项目成本和进度的监控----挣值分析
  19. 微服务网关Gateway实战
  20. iBatis延迟加载

热门文章

  1. 记一次mongoDB-@Document(collection = “XXX“)配置的探索
  2. python 读写utf8文件_Python关于 文件读写的总结
  3. 布道微服务_06微服务调用的监控
  4. MySQL-在线处理大表数据 在线修改大表的表结构
  5. Quartz- Quartz API以及Jobs 和Triggers介绍
  6. Python-爬取自己博客文章的URL
  7. matlab 添加环境变量,CentOS 添加环境变量的三种方法
  8. java round指令_Java PApplet.round方法代码示例
  9. java更新 位置_请求位置信息更新  |  Android 开发者  |  Android Developers
  10. kali linux获取root,kali linux 安装keybase 并使用root来运行keybase