2019独角兽企业重金招聘Python工程师标准>>>

简介

通过本章节,您可以学习到:

  1. Job的提交流程
  2. InputFormat数据切片的机制

1、Job提交流程源码分析

1)job提交流程源码详解
waitForCompletion()
submit();
// 1建立连接
connect();
// 1)创建提交job的代理
new Cluster(getConfiguration());// (1)判断是本地yarn还是远程initialize(jobTrackAddr, conf); // 2 提交job
submitter.submitJobInternal(Job.this, cluster)// 1)创建给集群提交数据的Stag路径Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);// 2)获取jobid ,并创建job路径JobID jobId = submitClient.getNewJobID();// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);   rUploader.uploadFiles(job, jobSubmitDir);
// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);maps = writeNewSplits(job, jobSubmitDir);input.getSplits(job);
// 5)向Stag路径写xml配置文件
writeConf(conf, submitJobFile);conf.writeXml(out);
// 6)提交job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

注意以上代码只是大过程的提取,并不是连续的某处的代码。要了解详细的过程,可以通过编译器打断点了解。

2、InputFomat数据切片机制

2.1、FileInputFormat图解分析

红色划分是均分方式,这种方式比较低下。

而当前采用的是蓝色方式,以一个块为一个切片。大致流程如下:

  1. 找到你数据输入的目录。
  2. 开始遍历处理(规划切片)目录下的每一个文件
  3. 循环执行4-6步骤,直接遍历完所有输入文件。
  4. 遍历第一个文件test1.file
    • 获取文件大小fs.sizeOf(ss.txt);
    • 计算切片大小computeSliteSize(Math.max(minSize,Math.max(maxSize,blocksize)))=blocksize;
    • 默认情况下,切片大小=blocksize
  5. 开始切片,形成第1个切片:test1.file—0:128M;第2个切片test1.file—128:256M 第3个切片test1.file—256M:300M(每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
  6. 将切片信息写到一个切片规划文件中。
    • 整个切片的核心过程在getSplit()方法中完成。需要注意的是数据切片只是在逻辑上对输入数据进行分片,并不会再磁盘上将其切分成分片进行存储。InputSplit只记录了分片的元数据信息,比如起始位置、长度以及所在的节点列表等。
  7. 提交切片规划文件到yarn上,yarn上的MrAppMaster就可以根据切片规划文件计算开启maptask个数。

block是HDFS上物理上存储的存储的数据,切片是对数据逻辑上的划分。

2.2、FileInputFormat中默认的切片机制

通过以下的学习,我们可以总结出以下三个结论:

  • 切片过程只是简单地按照文件的内容长度进行切片
  • 切片大小默认等于block大小
  • 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

举个例子加入我们有以下两个文件

file1.txt    320M
file2.txt    10M

经过FileInputFormat的切片机制运算后,默认配置下形成的切片信息如下:

file1.txt.split1--  0~128
file1.txt.split2--  128~256
file1.txt.split3--  256~320
file2.txt.split1--  0~10M

2.3、FileInputFormat切片大小的参数配置

通过分析源码org.apache.hadoop.mapreduce.lib.input.FileInputFormat,我们先来看看他的父类InputFormat

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//package org.apache.hadoop.mapreduce;import java.io.IOException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;@Public
@Stable
public abstract class InputFormat<K, V> {public InputFormat() {}public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

父类规定了两个抽象方法getSplits以及RecordReader。

再来看看FileInputFormat计算分片大小的相关代码:

public List<InputSplit> getSplits(JobContext job) throws IOException {StopWatch sw = (new StopWatch()).start();long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));long maxSize = getMaxSplitSize(job);List<InputSplit> splits = new ArrayList();List<FileStatus> files = this.listStatus(job);Iterator var9 = files.iterator();while(true) {while(true) {while(var9.hasNext()) {FileStatus file = (FileStatus)var9.next();Path path = file.getPath();long length = file.getLen();if (length != 0L) {BlockLocation[] blkLocations;if (file instanceof LocatedFileStatus) {blkLocations = ((LocatedFileStatus)file).getBlockLocations();} else {FileSystem fs = path.getFileSystem(job.getConfiguration());blkLocations = fs.getFileBlockLocations(file, 0L, length);}if (this.isSplitable(job, path)) {long blockSize = file.getBlockSize();long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);long bytesRemaining;int blkIndex;for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));}if (bytesRemaining != 0L) {blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));}} else {splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));}} else {splits.add(this.makeSplit(path, 0L, length, new String[0]));}}job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());sw.stop();if (LOG.isDebugEnabled()) {LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));}return splits;}}}

从中我们可以了解到,计算分片大小的逻辑为

// 初始化值
long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
...
// 计算分片大小long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);...protected long computeSplitSize(long blockSize, long minSize, long maxSize) {return Math.max(minSize, Math.min(maxSize, blockSize));}...
// minSize默认值为1Lprotected long getFormatMinSplitSize() {return 1L;}

也就说,切片主要由这几个值来运算决定

mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue

因此,默认情况下,切片大小=blocksize。我们不难得到,要想修改分片的大小,完全可以通过配置文件的mapreduce.input.fileinputformat.split.minsize以及mapreduce.input.fileinputformat.split.maxsize进行配置:

  • mapreduce.input.fileinputformat.split.maxsize(切片最大值):参数如果调得比blocksize小,则会让切片变小。 mapreduce.input.fileinputformat.split.minsize (切片最小值):参数调的比blockSize大,则可以让切片变得比blocksize还大。

2.4、继承树

FileInputFormat有多个底层实现,2.7版本的jdk具有如下的继承树

默认情况下Job任务使用的是

2.5、获取切片信息API

// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();
// 获取切片的文件名称
String name = inputSplit.getPath().getName();

3、CombineTextInputFormat切片机制

默认情况下TextInputformat对任务的切片机制是按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个maptask,这样如果有大量小文件,就会产生大量的maptask,处理效率极其低下。最好的办法,在数据处理系统的最前端(预处理/采集),将小文件先合并成大文件,再上传到HDFS做后续分析。

如果已经是大量小文件在HDFS中了,可以使用另一种InputFormat来做切片(CombineTextInputFormat),它的切片逻辑跟TextFileInputFormat不同:它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个maptask。

优先满足最小切片大小,不超过最大切片大小

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

举例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m

如果不设置InputFormat,它默认用的是TextInputFormat.class,因此我们需要手动指定InputFormat类型,在执行job之前指定:

job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

通过此设置之后,分片会变得更少一些,不会像之前一样,一个文件形成一个分片(文件过小的情况尤其浪费)。

转载于:https://my.oschina.net/u/3091870/blog/3000619

【hadoop】20.MapReduce-InputFormat数据切片机制相关推荐

  1. MapReduce之FileInputFormat切片机制

    1.切片机制 切片大小,默认等于Block(128M)大小 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片 简单地按照文件的内容长度进行切片 2.源码中计算公式可以调整切片大小(默认等于Blo ...

  2. Hadoop之MapReduce理论篇01

    2019独角兽企业重金招聘Python工程师标准>>> 1. Writable序列化 序列化就是把内存中的对象,转换成字节序列 (或其他数据传输协议) 以便于存储 (持久化) 和网络 ...

  3. 他来了他来了,Hadoop序列化和切片机制了解一下?

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 切片机制 一个超大文件在HDFS上存储时,是以多个Block存储在不同的节点上,比如一个512M的 ...

  4. Hadoop MapReduce Splits 切片源码分析及切片机制

    本文从Job提交,逐步分析Splits相关源码. 数据块:Block是HDFS物理上把数据分成一块一块的. 数据切片:数据切片只是在物理上输入进行分片,并不会在磁盘上将其分成片进行存储. 文件路径 o ...

  5. 大数据技术之Hadoop(MapReduce)

    大数据技术之Hadoop(MapReduce) (作者:大数据研发部) 版本:V1.4 第1章MapReduce入门 map 计算 reduce 规约 1.1 MapReduce定义 Mapreduc ...

  6. MapTask并行度决定机制、FileInputFormat切片机制、map并行度的经验之谈、ReduceTask并行度的决定、MAPREDUCE程序运行演示(来自学笔记)

    1.3 MapTask并行度决定机制 maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度 那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢? 1.3 ...

  7. Hadoop之InputFormat数据输入详解

    Hadoop之InputFormat数据输入详解 Job提交流程和切片源码详解 FileInputFormat切片机制 CombineTextInputFormat切片机制 InputFormat接口 ...

  8. (超详细)大数据Hadoop之MapReduce组件

    一.MapReduce 简介 1.1 MapReduce的概述 在Hadoop生态圈中,MapReduce属于核心,负责进行分布式计算. MapReduce 核心功能是将用户编写的业务逻辑代码和自带默 ...

  9. Hadoop中的FileInputFormat切片机制、FileInputFormat切片大小的参数配置、TextInputFormat、CombineTextInputFormat切片机制

    文章目录 13.MapReduce框架原理 13.1InputFormat数据输入 13.1.4FileInputFormat切片机制 13.1.4.1切片机制 13.1.4.2案例分析 13.1.4 ...

最新文章

  1. Asp.net团队疯了(同时发布WebMatrix, Razor, MVC3和Orchard)
  2. python socket coding
  3. 我的 2021 之感谢有你们(上篇)
  4. ubuntu14.04下通过.frm, .MYD,.MYI文件恢复建立mysql数据库
  5. 每天一道LeetCode-----将单词数组分成多行,每行长度相同,单词之间用空格分隔,要求空格尽量均匀分布
  6. mse均方误差计算公式_PCA的两种解读:方差最大与均方误差最小的推导
  7. Unity 生成APK 出错的解决方法
  8. 基础教程之Spin旋转篇
  9. (剑指Offer)面试题37:两个链表的第一个公共结点
  10. 智慧医疗整体解决方案及功能介绍
  11. Js求时间差、并转换为字符串
  12. 国内有哪些小众但很有意思的网站?这6个网站值得收藏
  13. 网站运营手册_分享几款运营必备软件合集,欢迎补充
  14. 一些适合程序员玩的游戏
  15. 联想手机里的照片误删怎么恢复
  16. 计算机网络 之 网络应用
  17. 经典网络命令(搜集、概括)
  18. 苹果已冻结招聘 VS 推特员工每周狂干 84 小时,防止被裁员
  19. java sbt_SBT管理java项目
  20. 愿天下团圆,愿天下再无团圆

热门文章

  1. mysql 常规命令操作_mysql数据库常规命令操作
  2. java 一般方法_一般覆盖Java中的方法
  3. 4g通信模块怎么连接sim卡_你好eSIM,再见SIM卡
  4. 贝叶斯分类器的matlab实现_贝叶斯实验
  5. malloc 初始化_你真的了解 NSObject 对象的初始化吗?
  6. 服务器连接异常系统无法登录,win10系统无法登录LOL提示“服务器连接异常”的解决方法...
  7. nginx 反向代理跨域访问配置_nginx反向代理配置去除前缀
  8. 图像降噪算法——小波硬阈值滤波(下)
  9. 【星球知识卡片】图像风格化与翻译都有哪些核心技术,如何对其长期深入学习...
  10. 【每周CV论文推荐】 掌握残差网络必读的10多篇文章