mapreduce工作流程_详解MapReduce中的五大编程模型
前言
我们上一节讲了关于 MapReduce
中的应用场景和架构分析,最后还使用了一个CountWord
的Demo
来进行演示,关于MapReduce
的具体操作。如果还不了解的朋友可以看看上篇文章:[初识MapReduce的应用场景(附JAVA和Python代码)](初识MapReduce的应用场景(附JAVA和Python代码) - 掘金)
接下来,我们会讲解关于MapReduce
的编程模型,这篇文章的主要目的就是讲清楚Mapreduce
的编程模型有多少种,它们之间是怎么协调合作的,会尽量从源码的角度来解析,最后就是讲解不同的语言是如何调用Hadoop
中的Mapreduce
的API
的。
目录
- MapReduce 编程模型的框架
- 五种编程模型的详解
- InputFormat
- OutPutFormat
- Mapper
- Reducer
- Partitioner
- 总结
MapReduce 编程模型的框架
我们先来看一张图,关于MapReduce
的编程模型
- 用户程序层
用户程序层是指用户用编写好的代码来调用MapReduce
的接口层。
- 工具层
- Job control 是为了监控`Hadoop`中的`MapReduce`向集群提交复杂的作业任务,提交了任务到集群中后,形成的任务是一个有向图。每一个任务都有两个方法
submit()
和waitForCompletion(boolean)
,submit()
方法是向集群中提交作业,然后立即返回,waitForCompletion(boolean)
就是等待集群中的作业是否已经完成了,如果完成了,得到的结果可以当作下个任务的输入。
chain Mapper
和chain Reducer
的这个模块,是为了用户编写链式作业,形式类似于Map + Reduce Map *
,表达的意思就是只有一个Reduce
,在Reduce
的前后可以有多个Map
Hadoop Streaming
支持的是脚本语言,例Python
、PHP
等来调用Hadoop
的底层接口,Hadoop Pipes
支持的是C++
来调用。
- 编程接口层,这一层是全部由
Java
语言来实现的,如果是Java
来开发的话,那么可以直接使用这一层。
详解五种编程模型
InputFormat
作用
对输入进入MapReduce
的文件进行规范处理,主要包括InputSplit
和RecordReader
两个部分。TextOutputFormat
是默认的文件输入格式。
InputSplit
这个是指对输入的文件进行逻辑切割,切割成一对对Key-Value
值。有两个参数可以定义InputSplit
的块大小,分别是mapred.max.split.size
(记为minSize
)和mapred.min.split.size
(记为maxSize
)。
RecordReader
是指作业在InputSplit
中切割完成后,输出Key-Value
对,再由RecordReader
进行读取到一个个Mapper
文件中。如果没有特殊定义,一个Mapper
文件的大小就是由Hadoop
的block_size
决定的,Hadoop 1.x
中的block_size
是64M
,在Hadoop 2.x
中的
block_size
的大小就是128M
。
切割块的大小
在Hadoop2.x
以上的版本中,一个splitSize
的计算公式为
OutputFormat
作用
对输出的文件进行规范处理,主要的工作有两个部分,一个是检查输出的目录是否已经存在,如果存在的话就会报错,另一个是输出最终结果的文件到文件系统中,`TextOutputFormat`是默认的输出格式。
OutputCommiter
OutputCommiter
的作用有六点:
- 作业(
job
)的初始化
//进行作业的初始化,建立临时目录。
//如果初始化成功,那么作业就会进入到 Running 的状态
public abstract void setupJob(JobContext var1) throws IOException;
```
- 作业运行结束后就删除作业
```
//如果这个job完成之后,就会删除掉这个job。
//例如删除掉临时的目录,然后会宣布这个job处于以下的三种状态之一,SUCCEDED/FAILED/KILLED
@Deprecatedpublic void cleanupJob(JobContext jobContext) throws IOException {}
- 初始化
Task
//初始化Task的操作有建立Task的临时目录
public abstract void setupTask(TaskAttemptContext var1) throws IOException;
- 检查是否提交`Task`的结果
//检查是否需要提交Task,为的是Task不需要提交的时候提交出去
public abstract boolean needsTaskCommit(TaskAttemptContext var1) throws IOException;
- 提交
Task
//任务结束的时候,需要提交任务
public abstract void commitTask(TaskAttemptContext var1) throws IOException;
- 回退
Task
//如果Task处于KILLED或者FAILED的状态,这Task就会进行删除掉临时的目录
//如果这个目录删除不了(例如出现了异常后,处于被锁定的状态),另一个同样的Task会被执行
//然后使用同样的attempt-id去把这个临时目录给删除掉,也就说,一定会把临时目录给删除干净public abstract void abortTask(TaskAttemptContext var1) throws IOException;
处理Task Side-Effect File
在Hadoop
中有一种特殊的文件和特殊的操作,那就是Side-Eddect File
,这个文件的存在是为了解决某一个Task
因为网络或者是机器性能的原因导致的运行时间过长,从而导致拖慢了整体作业的进度,所以会为每一个任务在另一个节点上再运行一个子任务,然后选择两者中处理得到的结果最快的那个任务为最终结果,这个时候为了避免文件都输入在同一个文件中,所以就把备胎任务输出的文件取作为 Side-Effect File
RecordWriter
这个是指输出KEY-VALUE
对到文件中。
Mapper和Reducer
详解Mapper
InputFormat
为每一个 InputSplit
生成一个 map
任务,mapper
的实现是通过job
中的setMapperClass(Class)
方法来配置写好的map
类,如这样
//设置要执行的mapper类
job.setMapperClass(WordMapper.class);
其内部是调用了map(WritableComparable, Writable, Context)
这个方法来为每一个键值对写入到InputSplit
,程序会调用cleanup(Context)
方法来执行清理任务,清理掉不需要使用到的中间值。
关于输入的键值对类型不需要和输出的键值对类型一样,而且输入的键值对可以映射到0
个或者多个键值对。通过调用context.write(WritableComparable, Writable)
来收集输出的键值对。程序使用Counter
来统计键值对的数量,
在Mapper
中的输出被排序后,就会被划分到每个Reducer
中,分块的总数目和一个作业的reduce
任务的数目是一样的。
需要多少个Mapper任务
关于一个机器节点适合多少个map
任务,官方的文档的建议是,一个节点有10
个到100
个任务是最好的,如果是cpu
低消耗的话,300
个也是可以的,最合理的一个map
任务是需要运行超过1
分钟。
详解Reducer
Reducer
任务的话就是将Mapper
中输出的结果进行统计合并后,输出到文件系统中。
用户可以自定义Reducer
的数量,使用Job.setNumReduceTasks(int)
这个方法。
在调用Reducer
的话,使用的是Job.setReducerClass(Class)
方法,内部调用的是 reduce(WritableComparable, Iterable<Writable>, Context)
这个方法,最后,程序会调用cleanup(Context)
来进行清理工作。如这样:
//设置要执行的reduce类
job.setReducerClass(WordReduce.class);
Reducer
实际上是分三个阶段,分别是Shuffle
、Sort
和Secondary Sort
。
shuffle
这个阶段是指Reducer
的输入阶段,系统会为每一个Reduce
任务去获取所有的分块,通过的是HTTP
的方式
sort
这个阶段是指在输入Reducer
阶段的值进行分组,sort
和shuffle
是同时进行的,可以这么理解,一边在输入的时候,同时在一边排序。
Secondary Sort
这个阶段不是必需的,只有在中间过程中对key
的排序和在reduce
的输入之前对key
的排序规则不同的时候,才会启动这个过程,可以通过的是Job.setSortComparatorClass(Class)
来指定一个Comparator
进行排序,然后再结合 Job.setGroupingComparatorClass(Class)
来进行分组,最后可以实现二次排序。
在整个`reduce`中的输出是没有排序
需要多少个 Reducer 任务
建议是0.95
或者是1.75
*mapred.tasktracker.reduce.tasks.maximum
。如果是0.9
的话,那么就可以在mapper
任务结束时,立马就可以启动`Reducer`任务。如果是1.75
的话,那么运行的快的节点就可以在map
任务完成的时候先计算一轮,然后等到其他的节点完成的时候就可以计算第二轮了。当然,Reduce
任务的个数不是越多就越好的,个数多会增加系统的开销,但是可以在提升负载均衡,从而降低由于失败而带来的负面影响。
Partitioner
这个模块用来划分键值空间,控制的是map
任务中的key
值分割的分区,默认使用的算法是哈希函数,HashPartitioner
是默认的Partitioner
。
总结
这篇文章主要就是讲了MapReduce
的框架模型,分别是分为用户程序层、工具层、编程接口层这三层,在编程接口层主要有五种编程模型,分别是InputFomat
、MapperReduce
、Partitioner
、OnputFomat
和Reducer
。主要是偏理论,代码的参考例子可以参考官方的例子:[WordCount_v2.0](MapReduce Tutorial)
这是MapReduce
系列的第二篇,接下来的一篇会详细写关于MapReduce
的作业配置和环境,结合一些面试题的汇总,所以接下来的这篇还是干货满满的,期待着就好了。
mapreduce工作流程_详解MapReduce中的五大编程模型相关推荐
- java 死锁 内存消耗_详解Java中synchronized关键字的死锁和内存占用问题
先看一段synchronized 的详解: synchronized 是 java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码. 一.当两个并 ...
- [转载] python中for语句用法_详解Python中for循环的使用_python
参考链接: 在Python中将else条件语句与for循环一起使用 这篇文章主要介绍了Python中for循环的使用,来自于IBM官方网站技术文档,需要的朋友可以参考下 for 循环 本系列前面 &q ...
- python xlrd安装_详解python中xlrd包的安装与处理Excel表格
一.安装xlrd 地址 下载后,使用 pip install .whl安装即好. 查看帮助: >>> import xlrd >>> help(xlrd) Help ...
- python的装饰器迭代器与生成器_详解python中的生成器、迭代器、闭包、装饰器
迭代是访问集合元素的一种方式.迭代器是一个可以记住遍历的位置的对象.迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束.迭代器只能往前不会后退. 1|1可迭代对象 以直接作用于 for ...
- python中heapq的库是什么_详解Python中heapq模块的用法
详解Python中heapq模块的用法 来源:中文源码网 浏览: 次 日期:2018年9月2日 [下载文档: 详解Python中heapq模块的用法.txt ] (友情提示:右键点上行t ...
- python中get函数是什么意思_详解python中get函数的用法(附代码)_后端开发
strncmp函数用法详解_后端开发 strncmp函数为字符串比较函数,其函数语法为"int strncmp ( const char * str1, const char * str2, ...
- python中break怎么用_详解Python中break语句的用法
详解Python中break语句的用法 在Python中的break语句终止当前循环,继续执行下一个语句,就像C语言中的break一样. break最常见的用途是当一些外部条件被触发,需要从一个循环中 ...
- python中filepath路径怎么写_详解Python中的路径问题
1. 绝对路径引入 Python 在搜索模块时,依次搜索sys.path里的位置,直到找到模块为止.下面命令可以查看当前的搜索路径: import sys print(sys.path) sys.pa ...
- python中for语句用法_详解Python中for循环的使用_python
这篇文章主要介绍了Python中for循环的使用,来自于IBM官方网站技术文档,需要的朋友可以参考下 for 循环 本系列前面 "探索 Python,第 5 部分:用 Python 编程&q ...
最新文章
- Microservices Reference Architecture - with Spring Boot, Spring Cloud and Netflix OSS--转
- 软件测试报告bug统计,软件测试中如何有效地写Bug报告
- mysql表定义外键语法_mysql设置外键的语法怎么写?
- leetcode234 回文链表
- 利用Linux的强大移植性和兼容性将操作系统轻松安装到硬盘
- 这三种程序员,是时代的溺水者
- 查看器_「图」Firefox 70将启用全新证书查看器 允许关闭画中画图标
- 初笔,JAVA.HelloWorld代码详解
- 降准对房价与股市的影响!
- HLOJ486 种花小游戏
- python函数图像绘制、函数不固定_Python中函数图像快速绘制的方法
- 在d盘创建文件夹,里面有aaa.txt/bbb.txt/ccc.txt,然后遍历出aaa文件夹下的文件(新手用于记录每天的作业)...
- html新闻公告滚动效果,好用的滚动公告HTML代码
- eBPF-4-perf_map的丢失事件lost_event解读
- 关于Tomcat以及我是个小机灵鬼这回事
- 无法打开编译的html,解决VS在编译的时候无法打开...obj文件的问题
- [机器学习实战] 深度学习为黑白图像着彩色
- 天道酬勤系列之Python 希尔排序
- 目标检测中Regional Proposal到底是什么,RPN和Region Proposal、Proposals三者联系
- html静态网页制作的博客,[推荐]初学制作静态网页HTML推荐标准_
热门文章
- flex的mxmlc命令行编译as3文件成swf
- 撕起来了!谁说数据少就不能用深度学习?这锅俺不背!
- [杭电ACM]3336Count the string
- Tries and Ternary Search Trees in Python and Javascript
- 升级SharePoint数据库到SQL Server 2005的一点心得
- golang float string int 相互转换 保留小数位
- golang 结构体 map 转化为 json
- spring security oauth rce (cve-2016-4977) 漏洞分析
- linux curl编译 arm交叉编译
- metasploit 快速入门(二)信息收集和扫描-续