我们按照图中的流程,梳理一下MapReduce的任务流程。

  • 初始时,是上述的一个文本。MapReduce接收到作业输入后,会先进行数据拆分。

  • 数据拆分完成之后,会有多个 小文本 数据,每个小文本都会作为一个Map任务的输入。这样一个大的MapReduce作业,会被分解为多个小的Map任务。

  • Combiner会处理Map生成的数据,需要注意的是,此时Map生产的仅仅是中间结果。Combiner是一个可选的组件,用户不设置,他就不存在。

  • 之后,数据会到达Partitioner,Partitioner组件会将中间数据按照哈希函数的对应规则,将中间结果分配到对应的Reducer所在节点上。

  • Reducer会处理中间数据,得到最终的结果。

这就是,一个完整的MapReduce作业的生老病死的概括,其真实的流程自然远不止此,我们会在后面娓娓道来。

先让我们仔仔细细地了解一下上述过程的每一个组件。

一、扯一扯Map

有了上述的内容,我们可以进行下一步了。

按照我们说的,我们应该将这个小短文分成几个部分。也就是图中的数据划分。

(1)首先进行数据划分

当我们开启一个MapReduce程序,一般传入的输入都是一个体积巨大的数据。MapReduce接收到数据后,需要对数据进行划分。通俗来讲,就是我们前文说的,我们该如果将一个小短文划分成多行,分配个多个人进行统计。

MapReduce中有一个InputFormat类,它会完成如下三个任务:

  • 验证作业数据的输入形式和格式

  • 将输入数据分割为若干个逻辑意义上的InputSplit,其中每一个InputSplit都将单独作为Map任务的输入。也就是说,InputSplit的个数,代表了Map任务的个数。需要注意,这里并没有做实际切分,仅仅是将数据进行逻辑上的切分。

  • 提供一个RecordReader,用于将Map的输入转换为若干个记录。虽然MapReduce作业可以接受很多种格式的数据,但是Map任务接收的任务其实是键值对类型的数据,因此需要将初始的输入数据转化为键值对。RecordReader对象会从数据分片中读取出数据记录,然后转化为 Key-Value 键值对,逐个输入到Map中进行处理。

问题在于,这个InputFormat类该如何进行划分呢?在FileInputFormat类中,会有一个getSplits函数,这个函数所做的事情其实就是进行数据切分的过程。我们稍微看一下这个函数:

public List<InputSplit> getSplits(JobContext job) throws IOException {    StopWatch sw = new StopWatch().start();    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));    long maxSize = getMaxSplitSize(job);    //...    for (FileStatus file: files) {        if (isSplitable(job, path)) {          long blockSize = file.getBlockSize();          long splitSize = computeSplitSize(blockSize, minSize, maxSize);            //...        }        //...    }    //...}protected long computeSplitSize(long blockSize, long minSize,                                long maxSize) {    return Math.max(minSize, Math.min(maxSize, blockSize));}
  • minSize :每个split的最小值,默认为1.getFormatMinSplitSize()为代码中写死,固定返回1,除非修改了hadoop的源代码.getMinSplitSize(job)取决于参数mapreduce.input.fileinputformat.split.minsize,如果没有设置该参数,返回1.故minSize默认为1.

  • maxSize:每个split的最大值,如果设置了mapreduce.input.fileinputformat.split.maxsize,则为该值,否则为Long的最大值。

  • blockSize :默认为HDFS设置的文件存储BLOCK大小。注意:该值并不一定是唯一固定不变的。HDFS上不同的文件该值可能不同。故将文件划分成split的时候,对于每个不同的文件,需要获取该文件的blocksize。

  • splitSize :根据公式,默认为blockSize 。

从上述代码中可以看到,这个InputSize在 [minSize, maxSize] 之间。

(2)这样,我们可以理一理划分逻辑

  • 1)遍历输入目录中的每个文件,拿到该文件

  • 2)计算文件长度,A:如果文件长度为0,如果mapred.split.zero.file.skip=true,则不划分split ; 如果mapred.split.zero.file.skip为false,生成一个length=0的split .B:如果长度不为0,跳到步骤3

  • 3)判断该文件是否支持split :如果支持,跳到步骤4;如果不支持,该文件不切分,生成1个split,split的length等于文件长度。

  • 4)根据当前文件,计算splitSize

  • 5)判断剩余待切分文件大小/splitsize是否大于SPLIT_SLOP(该值为1.1,代码中写死了) 如果true,切分成一个split,待切分文件大小更新为当前值-splitsize ,再次切分。生成的split的length等于splitsize;如果false 将剩余的切到一个split里,生成的split length等于剩余待切分的文件大小。之所以需要判断剩余待切分文件大小/splitsize,主要是为了避免过多的小的split。比如文件中有100个109M大小的文件,如果splitSize=100M,如果不判断剩余待切分文件大小/splitsize,将会生成200个split,其中100个split的size为100M,而其中100个只有9M,存在100个过小的split。MapReduce首选的是处理大文件,过多的小split会影响性能。

划分好Split之后,这些数据进入Map任务,按照用户设计处理逻辑进行处理。Map可以由用户定义设计处理逻辑。

二、聊一聊Combiner

Combiner组件并不是一个必须部分,用户可以按照实际的需求灵活的添加。Combiner组件的主要作用是 减少网络传输负载,优化网络数据传输优化

当我们Map任务处理完成之后,上述的文本会变成一个一个的 Key-Value 对。

(This, 1)(distribution, 1)...

在没有Combiner组件前提下,这些键值对会直接传输到Reducer端,进行最后的统计工作。但是这一步是可以优化的,因为Map端仅仅是将每行的词拆分了,但是其实可以再做一步统计的。

例如,我们假设在Map任务A这里出现了两次 (This, 1),我们可以做一次统计,将这个Map任务上的This做一次统计,生成(This, 2)。在大数据场合,千万个这样的相同词的合并会显著降低网络负载。

但是并不是所有的场合都适用Combiner,这个组件是可有可无的,用户需要按照自己的需求灵活决定

因为Combiner可以存在,也可以不存在,所有,我们设计Combiner时,要保证Combiner的key-value和Map的key-value一致 。这也意味着,若你设计的Combiner改变了原先Map的键值对设计,那么你的Combiner设计就是不合法的。

三、瞅一瞅Partitioner

为了保证所有主键相同的键值对会传输到同一个Reducer节点,以便Reducer节点可以在不访问其他Reducer节点的情况下就可以计算出最终的结果,我们需要对来自Map(如果有Combiner,就是Combiner之后的结果)中间键值对进行分区处理,Partitioner主要就是进行分区处理的。

Partitioner 默认的分发规则

根据 keyhashcode%reduce task 数来分发,所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分区)组件 Partitioner

  • Partition 的 key value, 就是Mapper输出的key value

    public interface Partitioner<K2, V2> extends JobConfigurable {    /**    * Get the paritition number for a given key (hence record) given the total    * number of partitions i.e. number of reduce-tasks for the job.   *      * 

    Typically a hash function on a all or a subset of the key.

      *   * @param key 用来partition的key值。   * @param value 键值对的值。   * @param numPartitions 分区数目。   * @return the partition number for the key.   */  int getPartition(K2 key, V2 value, int numPartitions);}

    输入是Map的结果对和Reducer的数目,输出则是分配的Reducer(整数编号)就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)

  • MapReduce 中会将 map 输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask 默认的分发规则为:根据 key 的 hashcode%reduce task 数来分发,所以:如果要按照我们自 己的需求进行分组,则需要改写数据分发(分组)组件 Partitioner, 自定义一个 CustomPartitioner 继承抽象类:Partitioner

  • 因此, Partitioner 的执行时机, 是在Map输出 key-value 对之后

四、MapReduce中的Sort

MapReduce中的很多流程都涉及到了排序,我们会在后面详细说明。

从整个MapReduce的程序执行来看,整个过程涉及到了 快排、归并排序、堆排 三种排序方法。

五、遛一遛Reduce

Reduce会处理上游(Map,也可能有Combiner)的中间结果。

需要注意的是,Map到Reduce整个过程中,键值的变化是不一样的

  1. 初始是文本内容,会被RecordReader处理为键值对

  2. 经过Map(也可能有Combiner)后,仍然是键值对形式

  3. 经过Partition,到达Reduce的结果是 key - list(value) 形式。所以在Reduce处理的value其实一个整体。

Reduce会把所有的结果处理完成,输出到对应的输出路径。

弊端

MapReduce的Reduce处理结果最后都是需要落盘的,当一个project中含有多个MapReduce的 作业(job)时,无法有效利用内存。

mapreduce 文件可以切分吗_MapReduce的任务流程相关推荐

  1. Linux文件的切分和结合

    Linux文件的切分和结合 2008-04-28 15:13 1.文件的切分及结合工具: 可能我们遇到这种情况,有时文件比较大,想上传到服务器上,但由于服务器管理员为了安全考 虑,把上传空间作了限制, ...

  2. python 文件路径切分

    见代码 # 文件路径切分 import ospath = "/Users/test0712.xlsx" # 利用路径分隔符进行切分,再用索引进行选择 print(path.spli ...

  3. 文件的切分split和结合工具cat介绍

    从服务器上下载大文件,但网络环境不好,因此将大文件分成多个小文件,在使用rsync同步,会比直接同步或下载好许多. 按行(50000)将大文件分成小文件,每个小文件以'file_'为前缀,数字为其后缀 ...

  4. 大文件的切分--split命令选项详解

    由于工作中需要处理很大的数据文件,使用split命令将其切分成较小的文件后再进行处理是一个不错的选择. 在默认情况下,split以1000行为单位进行切分,如果不足1000行的会另外输出到一个文件. ...

  5. 虚拟机centos7 git clone特别慢_从文件生命周期看GIT的提交流程

    上一篇GIT的理论知识比较枯燥无味,理论性较强,也是难以引起共鸣! 波罗学:谈谈版本管理GIT之理论与架构​zhuanlan.zhihu.com 紧接上篇,今天从实在操作方面说一下GIT使用中,使用最 ...

  6. linux 文件拆分 合并,Linux下文件的切分与合并的简单方法

    linux下文件分割可以通过split命令来实现,可以将一个大文件拆分成指定大小的多个文件,并且拆分速度非常的快,可以指定按行数分割和安大小分割两种模式.Linux下文件合并可以通过cat命令来实现, ...

  7. mapreduce复制连接的代码_MapReduce:在大型集群上简化数据处理(2)

    特别说明 这是一个由simviso团队所组织进行的基于mit分布式系统课程翻译的系列,由知秋带领和其他成员一起翻译的课程以及课程当中涉及的论文翻译. 由于微信排版功能有限,想要看最新版文档的小伙伴,请 ...

  8. linux中split分割文件打开方式,Linux使用split对文件进行切分和合并的方法

    linux下文件分割可以通过split命令来实现,可以将一个大文件拆分成指定大小的多个文件,并且拆分速度非常的快,可以指定按行数分割和安大小分割两种模式.Linux下文件合并可以通过cat命令来实现, ...

  9. Python机器学习数据预处理:读取txt数据文件并切分为训练和测试数据集

    背景信息 在使用Python进行机器学习时,经常需要自己完成数据的预处理,本节主要实现对txt文本数据的读取,该文本满足如下要求: 每行为一条样本数据,包括特征值与标签,标签在最后 样本数据的特征值之 ...

最新文章

  1. python 使用 redis expire属性设置访问时间间隔
  2. Bilateral Filtering(双边滤波) for SSAO
  3. 【c++】9.深拷贝、浅拷贝、拷贝构造函数 、移动构造函数
  4. 配置oracle驱动_Myeclipse中添加Oracle
  5. Java设计模式-桥接模式 理论代码相结合
  6. 阿里巴巴大规模应用Flink的踩坑经验:如何大幅降低 HDFS 压力?
  7. HDU Problem - 1533 Going Home(费用流板子题)
  8. UVa 11324 最大团(强连通分量缩点)
  9. java第二章_零基础学Java第二章
  10. 【报告分享】2021全球职场调研中国报告:期待与忐忑,职场人的心声-普华永道.pdf(附下载链接)...
  11. 通俗易懂:说说 Python 里的线程安全、原子操作
  12. 【数据清洗】异常点的理解与处理方法(1)
  13. SAP BW:0FI_GL_4 的特殊增强
  14. 下载地址url中带有中文是url转换方法
  15. IDEA2019开发WebService实例
  16. 威尔逊定理 及其拓展
  17. 蜂鸟数据Trochil:论述制定策略的两种主要方法:市场假设和瑞士法郎案例研究-构建更好的策略1
  18. ih5手机版怎么登录服务器未响应,ih5 与服务器链接教程
  19. stm32外部中断问题(每次stm32进行系统复位按键控制NRST=0,程序立马进入中断服务函数)
  20. mac安装MongoDB与启动

热门文章

  1. Python:序列的copy() 方法和 copy 模块
  2. python服务端多进程压测工具
  3. python 用turtle库画围棋棋盘和正、余弦函数图形
  4. 进度条设置_项目功能分解4:MATLAB GUI如何设计有特色的进度条。
  5. linux进程VSZ(虚拟内存)
  6. python 多进程 multiprocessing 进程池 pool apply_async()函数与apply()函数的用法
  7. 如何绘制计算机软件程序流程图?
  8. numpy的常规使用(数组合并、拼接、添加)
  9. java 特殊符号正则_java利用正则表达式处理特殊字符的方法实例
  10. synchronousqueue场景_java并发队列之SynchronousQueue