前言

我们上一节讲了关于 MapReduce 中的应用场景和架构分析,最后还使用了一个CountWordDemo来进行演示,关于MapReduce的具体操作。如果还不了解的朋友可以看看上篇文章:[初识MapReduce的应用场景(附JAVA和Python代码)](初识MapReduce的应用场景(附JAVA和Python代码) - 掘金)

接下来,我们会讲解关于MapReduce的编程模型,这篇文章的主要目的就是讲清楚Mapreduce的编程模型有多少种,它们之间是怎么协调合作的,会尽量从源码的角度来解析,最后就是讲解不同的语言是如何调用Hadoop中的MapreduceAPI的。

目录

  • MapReduce 编程模型的框架
  • 五种编程模型的详解
  • InputFormat
  • OutPutFormat
  • Mapper
  • Reducer
  • Partitioner
  • 总结

MapReduce 编程模型的框架

我们先来看一张图,关于MapReduce的编程模型

MapReduce的框架图
  • 用户程序层

用户程序层是指用户用编写好的代码来调用MapReduce的接口层。

  • 工具层
  • Job control 是为了监控`Hadoop`中的`MapReduce`向集群提交复杂的作业任务,提交了任务到集群中后,形成的任务是一个有向图。每一个任务都有两个方法

submit()waitForCompletion(boolean)submit()方法是向集群中提交作业,然后立即返回,waitForCompletion(boolean)就是等待集群中的作业是否已经完成了,如果完成了,得到的结果可以当作下个任务的输入。

  • chain Mapperchain Reducer 的这个模块,是为了用户编写链式作业,形式类似于 Map + Reduce Map *,表达的意思就是只有一个Reduce,在Reduce的前后可以有多个Map
  • Hadoop Streaming支持的是脚本语言,例PythonPHP等来调用Hadoop的底层接口,Hadoop Pipes支持的是 C++来调用。
  • 编程接口层,这一层是全部由Java语言来实现的,如果是Java来开发的话,那么可以直接使用这一层。

详解五种编程模型

InputFormat

作用

对输入进入MapReduce的文件进行规范处理,主要包括InputSplitRecordReader两个部分。TextOutputFormat是默认的文件输入格式。

InputForMat中的流程图

InputSplit

这个是指对输入的文件进行逻辑切割,切割成一对对Key-Value值。有两个参数可以定义InputSplit的块大小,分别是mapred.max.split.size(记为minSize)和mapred.min.split.size(记为maxSize)。

RecordReader

是指作业在InputSplit中切割完成后,输出Key-Value对,再由RecordReader进行读取到一个个Mapper文件中。如果没有特殊定义,一个Mapper文件的大小就是由Hadoopblock_size决定的,Hadoop 1.x中的block_size64M,在Hadoop 2.x中的

block_size的大小就是128M

切割块的大小

Hadoop2.x以上的版本中,一个splitSize的计算公式为

OutputFormat

作用

对输出的文件进行规范处理,主要的工作有两个部分,一个是检查输出的目录是否已经存在,如果存在的话就会报错,另一个是输出最终结果的文件到文件系统中,`TextOutputFormat`是默认的输出格式。

OnputForMat中的流程图

OutputCommiter

OutputCommiter的作用有六点:

  • 作业(job)的初始化
//进行作业的初始化,建立临时目录。
//如果初始化成功,那么作业就会进入到 Running 的状态
public abstract void setupJob(JobContext var1) throws IOException;
```
- 作业运行结束后就删除作业
```
//如果这个job完成之后,就会删除掉这个job。
//例如删除掉临时的目录,然后会宣布这个job处于以下的三种状态之一,SUCCEDED/FAILED/KILLED
@Deprecatedpublic void cleanupJob(JobContext jobContext) throws IOException {}

  • 初始化 Task
//初始化Task的操作有建立Task的临时目录
public abstract void setupTask(TaskAttemptContext var1) throws IOException;

  • 检查是否提交`Task`的结果
//检查是否需要提交Task,为的是Task不需要提交的时候提交出去
public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;

  • 提交Task
//任务结束的时候,需要提交任务
public abstract void commitTask(TaskAttemptContext var1) throws IOException;

  • 回退Task
//如果Task处于KILLED或者FAILED的状态,这Task就会进行删除掉临时的目录
//如果这个目录删除不了(例如出现了异常后,处于被锁定的状态),另一个同样的Task会被执行
//然后使用同样的attempt-id去把这个临时目录给删除掉,也就说,一定会把临时目录给删除干净public abstract void abortTask(TaskAttemptContext var1) throws IOException;

处理Task Side-Effect File

Hadoop中有一种特殊的文件和特殊的操作,那就是Side-Eddect File,这个文件的存在是为了解决某一个Task因为网络或者是机器性能的原因导致的运行时间过长,从而导致拖慢了整体作业的进度,所以会为每一个任务在另一个节点上再运行一个子任务,然后选择两者中处理得到的结果最快的那个任务为最终结果,这个时候为了避免文件都输入在同一个文件中,所以就把备胎任务输出的文件取作为 Side-Effect File

RecordWriter

这个是指输出KEY-VALUE对到文件中。

Mapper和Reducer

详解Mapper

InputFormat为每一个 InputSplit 生成一个 map 任务,mapper的实现是通过job中的setMapperClass(Class)方法来配置写好的map类,如这样

//设置要执行的mapper类
job.setMapperClass(WordMapper.class);

其内部是调用了map(WritableComparable, Writable, Context)这个方法来为每一个键值对写入到InputSplit,程序会调用cleanup(Context)方法来执行清理任务,清理掉不需要使用到的中间值。

关于输入的键值对类型不需要和输出的键值对类型一样,而且输入的键值对可以映射到0个或者多个键值对。通过调用context.write(WritableComparable, Writable)来收集输出的键值对。程序使用Counter来统计键值对的数量,

Mapper中的输出被排序后,就会被划分到每个Reducer中,分块的总数目和一个作业的reduce任务的数目是一样的。

需要多少个Mapper任务

关于一个机器节点适合多少个map任务,官方的文档的建议是,一个节点有10个到100个任务是最好的,如果是cpu低消耗的话,300个也是可以的,最合理的一个map任务是需要运行超过1分钟。

详解Reducer

Reducer任务的话就是将Mapper中输出的结果进行统计合并后,输出到文件系统中。

用户可以自定义Reducer的数量,使用Job.setNumReduceTasks(int)这个方法。

在调用Reducer的话,使用的是Job.setReducerClass(Class)方法,内部调用的是 reduce(WritableComparable, Iterable<Writable>, Context)这个方法,最后,程序会调用cleanup(Context)来进行清理工作。如这样:

//设置要执行的reduce类
job.setReducerClass(WordReduce.class);

Reducer实际上是分三个阶段,分别是ShuffleSortSecondary Sort

shuffle

这个阶段是指Reducer的输入阶段,系统会为每一个Reduce任务去获取所有的分块,通过的是HTTP的方式

sort

这个阶段是指在输入Reducer阶段的值进行分组,sortshuffle是同时进行的,可以这么理解,一边在输入的时候,同时在一边排序。

Secondary Sort

这个阶段不是必需的,只有在中间过程中对key的排序和在reduce的输入之前对key的排序规则不同的时候,才会启动这个过程,可以通过的是Job.setSortComparatorClass(Class)来指定一个Comparator进行排序,然后再结合 Job.setGroupingComparatorClass(Class)来进行分组,最后可以实现二次排序。

在整个`reduce`中的输出是没有排序

需要多少个 Reducer 任务

建议是0.95或者是1.75*mapred.tasktracker.reduce.tasks.maximum。如果是0.9的话,那么就可以在mapper任务结束时,立马就可以启动`Reducer`任务。如果是1.75的话,那么运行的快的节点就可以在map任务完成的时候先计算一轮,然后等到其他的节点完成的时候就可以计算第二轮了。当然,Reduce任务的个数不是越多就越好的,个数多会增加系统的开销,但是可以在提升负载均衡,从而降低由于失败而带来的负面影响。

Partitioner

这个模块用来划分键值空间,控制的是map任务中的key值分割的分区,默认使用的算法是哈希函数,HashPartitioner是默认的Partitioner

总结

这篇文章主要就是讲了MapReduce的框架模型,分别是分为用户程序层、工具层、编程接口层这三层,在编程接口层主要有五种编程模型,分别是InputFomatMapperReducePartitionerOnputFomatReducer。主要是偏理论,代码的参考例子可以参考官方的例子:[WordCount_v2.0](MapReduce Tutorial)

这是MapReduce系列的第二篇,接下来的一篇会详细写关于MapReduce的作业配置和环境,结合一些面试题的汇总,所以接下来的这篇还是干货满满的,期待着就好了。

mapreduce工作流程_详解MapReduce中的五大编程模型相关推荐

  1. java 死锁 内存消耗_详解Java中synchronized关键字的死锁和内存占用问题

    先看一段synchronized 的详解: synchronized 是 java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码. 一.当两个并 ...

  2. [转载] python中for语句用法_详解Python中for循环的使用_python

    参考链接: 在Python中将else条件语句与for循环一起使用 这篇文章主要介绍了Python中for循环的使用,来自于IBM官方网站技术文档,需要的朋友可以参考下 for 循环 本系列前面 &q ...

  3. python xlrd安装_详解python中xlrd包的安装与处理Excel表格

    一.安装xlrd 地址 下载后,使用 pip install .whl安装即好. 查看帮助: >>> import xlrd >>> help(xlrd) Help ...

  4. python的装饰器迭代器与生成器_详解python中的生成器、迭代器、闭包、装饰器

    迭代是访问集合元素的一种方式.迭代器是一个可以记住遍历的位置的对象.迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束.迭代器只能往前不会后退. 1|1可迭代对象 以直接作用于 for ...

  5. python中heapq的库是什么_详解Python中heapq模块的用法

    详解Python中heapq模块的用法 来源:中文源码网    浏览: 次    日期:2018年9月2日 [下载文档:  详解Python中heapq模块的用法.txt ] (友情提示:右键点上行t ...

  6. python中get函数是什么意思_详解python中get函数的用法(附代码)_后端开发

    strncmp函数用法详解_后端开发 strncmp函数为字符串比较函数,其函数语法为"int strncmp ( const char * str1, const char * str2, ...

  7. python中break怎么用_详解Python中break语句的用法

    详解Python中break语句的用法 在Python中的break语句终止当前循环,继续执行下一个语句,就像C语言中的break一样. break最常见的用途是当一些外部条件被触发,需要从一个循环中 ...

  8. python中filepath路径怎么写_详解Python中的路径问题

    1. 绝对路径引入 Python 在搜索模块时,依次搜索sys.path里的位置,直到找到模块为止.下面命令可以查看当前的搜索路径: import sys print(sys.path) sys.pa ...

  9. python中for语句用法_详解Python中for循环的使用_python

    这篇文章主要介绍了Python中for循环的使用,来自于IBM官方网站技术文档,需要的朋友可以参考下 for 循环 本系列前面 "探索 Python,第 5 部分:用 Python 编程&q ...

最新文章

  1. Microservices Reference Architecture - with Spring Boot, Spring Cloud and Netflix OSS--转
  2. 软件测试报告bug统计,软件测试中如何有效地写Bug报告
  3. mysql表定义外键语法_mysql设置外键的语法怎么写?
  4. leetcode234 回文链表
  5. 利用Linux的强大移植性和兼容性将操作系统轻松安装到硬盘
  6. 这三种程序员,是时代的溺水者
  7. 查看器_「图」Firefox 70将启用全新证书查看器 允许关闭画中画图标
  8. 初笔,JAVA.HelloWorld代码详解
  9. 降准对房价与股市的影响!
  10. HLOJ486 种花小游戏
  11. python函数图像绘制、函数不固定_Python中函数图像快速绘制的方法
  12. 在d盘创建文件夹,里面有aaa.txt/bbb.txt/ccc.txt,然后遍历出aaa文件夹下的文件(新手用于记录每天的作业)...
  13. html新闻公告滚动效果,好用的滚动公告HTML代码
  14. eBPF-4-perf_map的丢失事件lost_event解读
  15. 关于Tomcat以及我是个小机灵鬼这回事
  16. 无法打开编译的html,解决VS在编译的时候无法打开...obj文件的问题
  17. [机器学习实战] 深度学习为黑白图像着彩色
  18. 天道酬勤系列之Python 希尔排序
  19. 目标检测中Regional Proposal到底是什么,RPN和Region Proposal、Proposals三者联系
  20. html静态网页制作的博客,[推荐]初学制作静态网页HTML推荐标准_

热门文章

  1. flex的mxmlc命令行编译as3文件成swf
  2. 撕起来了!谁说数据少就不能用深度学习?这锅俺不背!
  3. [杭电ACM]3336Count the string
  4. Tries and Ternary Search Trees in Python and Javascript
  5. 升级SharePoint数据库到SQL Server 2005的一点心得
  6. golang float string int 相互转换 保留小数位
  7. golang 结构体 map 转化为 json
  8. spring security oauth rce (cve-2016-4977) 漏洞分析
  9. linux curl编译 arm交叉编译
  10. metasploit 快速入门(二)信息收集和扫描-续