
split: a split is a logical slice. Before the map tasks of a MapReduce job start, the input file is divided into pieces of a given size; each piece is called a split. By default the split size equals the block size, i.e. 128 MB.

The split size is determined by minSize, maxSize, and blockSize. Let's trace this with the WordCount code as an example.

Locate job.waitForCompletion(true) and step into waitForCompletion:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // notice
        job.setJarByClass(WordCount.class);

        // set mapper's property
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // set reducer's property
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit
        job.waitForCompletion(true);
    }
}

Inside waitForCompletion(true), find submit():

/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}

Inside submit(), find return submitter.submitJobInternal(Job.this, cluster):

/**
 * Submit the job to the cluster and return immediately.
 * @throws IOException
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

Inside submitJobInternal(), find int maps = writeSplits(job, submitJobDir):

/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>
 *   Checking the input and output specifications of the job.
 *   </li>
 *   <li>
 *   Computing the {@link InputSplit}s for the job.
 *   </li>
 *   <li>
 *   Setup the requisite accounting information for the
 *   {@link DistributedCache} of the job, if necessary.
 *   </li>
 *   <li>
 *   Copying the job's jar and configuration to the map-reduce system
 *   directory on the distributed file-system.
 *   </li>
 *   <li>
 *   Submitting the job to the <code>JobTracker</code> and optionally
 *   monitoring it's status.
 *   </li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
            : SHUFFLE_KEY_LENGTH;
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(keyLen);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}

Inside writeSplits(), find maps = writeNewSplits(job, jobSubmitDir):

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
    Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  JobConf jConf = (JobConf) job.getConfiguration();
  int maps;
  if (jConf.getUseNewMapper()) {
    maps = writeNewSplits(job, jobSubmitDir);
  } else {
    maps = writeOldSplits(jConf, jobSubmitDir);
  }
  return maps;
}

Inside writeNewSplits(), find List<InputSplit> splits = input.getSplits(job):

private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
    InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

  List<InputSplit> splits = input.getSplits(job);
  T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}

Step into getSplits(); the following three lines are the key code:

long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

long splitSize = computeSplitSize(blockSize, minSize, maxSize);

/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  Stopwatch sw = new Stopwatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);

  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file: files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
            blkLocations[0].getCachedHosts()));
      }
    } else {
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", TimeTaken: " + sw.elapsedMillis());
  }
  return splits;
}
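
One detail worth noting in the loop above is SPLIT_SLOP (1.1 in this version of FileInputFormat): a file keeps being cut into splitSize-byte slices only while the remaining bytes exceed 1.1 × splitSize, so the last split of a file can be up to 10% larger than splitSize rather than producing a tiny trailing split. The standalone sketch below is my own illustration, not Hadoop source; the file lengths are made-up values chosen just to show how many splits a file yields.

import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of FileInputFormat's slicing loop (illustrative only). */
public class SplitSlopDemo {
    private static final double SPLIT_SLOP = 1.1;   // same constant as FileInputFormat

    /** Returns the lengths of the splits a single file of `length` bytes would produce. */
    static List<Long> slice(long length, long splitSize) {
        List<Long> splits = new ArrayList<>();
        long bytesRemaining = length;
        // keep cutting full-size splits while the remainder is > 1.1 * splitSize
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits.add(bytesRemaining);   // last split may be up to 1.1 * splitSize
        }
        return splits;
    }

    public static void main(String[] args) {
        long splitSize = 128L * 1024 * 1024;                        // 128 MB, the default
        // 260 MB file -> 2 splits of 128 MB and 132 MB (printed in bytes)
        System.out.println(slice(260L * 1024 * 1024, splitSize));
        // 129 MB file -> a single 129 MB split, because 129/128 < 1.1
        System.out.println(slice(129L * 1024 * 1024, splitSize));
    }
}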

Analysis:
(1) getFormatMinSplitSize() returns 1.
(2) getMinSplitSize(job):

/**
 * Get the minimum split size
 * @param job the job
 * @return the minimum number of bytes that can be in a split
 */
public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}

SPLIT_MINSIZE reads the configuration property mapreduce.input.fileinputformat.split.minsize, whose default is defined in mapred-default.xml inside hadoop-mapreduce-client-core-2.6.0.jar:

<property>
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <value>0</value>
  <description>The minimum size chunk that map input should be split
  into.  Note that some file formats may have minimum split sizes that
  take priority over this setting.</description>
</property>

(3) getMaxSplitSize(job):
SPLIT_MAXSIZE reads the configuration property mapreduce.input.fileinputformat.split.maxsize, which is not set in mapred-default.xml of hadoop-mapreduce-client-core-2.6.0.jar, so the default Long.MAX_VALUE = 2^63 - 1 applies.
So the maximum split size is:

/**
 * Get the maximum split size.
 * @param context the job to look at.
 * @return the maximum number of bytes a split can include
 */
public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE,
      Long.MAX_VALUE);
}

(4) long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
Math.max(1, 0) = 1
so minSize = 1

(5) The computeSplitSize method of FileInputFormat.java (around line 435):

protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

Analysis:
minSize = 1
maxSize = 2^63 - 1
blockSize = the block size, 128 MB by default
Math.min(maxSize, blockSize) = 128 MB (134217728 bytes)
Math.max(1, 128 MB) = 128 MB
So by default one block corresponds to one map task. The advantage is data locality: each map task can be scheduled on the node that already stores its block, so the input data does not have to be copied over to the map side.
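
To make the default arithmetic concrete, here is a minimal standalone sketch (my own illustration, not Hadoop code) that re-implements the computeSplitSize formula with the default inputs:

public class DefaultSplitSize {
    // same formula as FileInputFormat.computeSplitSize
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long minSize   = 1L;                  // max(getFormatMinSplitSize()=1, split.minsize=0)
        long maxSize   = Long.MAX_VALUE;      // split.maxsize not configured
        long blockSize = 128L * 1024 * 1024;  // default HDFS block size, 134217728 bytes

        // prints 134217728, i.e. splitSize == blockSize -> one map task per block
        System.out.println(computeSplitSize(blockSize, minSize, maxSize));
    }
}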

From these three parameters the split size can be computed, and the following conclusions follow:
The split-to-block relationship is not necessarily one-to-one, although one-to-one is the default.

FileInputFormat's getSplits() method splits each file using the formula
Math.max(minSize, Math.min(maxSize, blockSize)), where maxSize defaults to Long.MAX_VALUE:
1. If minSize < blockSize < maxSize, the split size is blockSize (one split per block);
2. If blockSize < maxSize and blockSize < minSize, the split size is minSize (one split spans several blocks);
3. If blockSize > maxSize and maxSize > minSize, the split size is maxSize (several splits per block);
4. If blockSize > maxSize and maxSize < minSize, the formula yields minSize (a contradictory configuration that does not occur in practice).
The sketch after this list plugs these cases into the formula.
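
The standalone sketch below (illustrative min/max values only, not from any real cluster) runs the non-default cases through the same formula so the claimed split sizes can be verified directly:

public class SplitSizeCases {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long MB = 1024L * 1024;
        long blockSize = 128 * MB;

        // Case 1: minSize < blockSize < maxSize -> split == blockSize (one-to-one)
        System.out.println(computeSplitSize(blockSize, 1, Long.MAX_VALUE) / MB);        // 128

        // Case 2: blockSize < minSize -> split == minSize (one split spans blocks)
        System.out.println(computeSplitSize(blockSize, 256 * MB, Long.MAX_VALUE) / MB); // 256

        // Case 3: blockSize > maxSize > minSize -> split == maxSize (several splits per block)
        System.out.println(computeSplitSize(blockSize, 1, 64 * MB) / MB);               // 64
    }
}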

When tuning a job, if you want to adjust the split size to control the number of map tasks, the rules are:

With the file size fixed and minSize at its default, to increase the number of map tasks, decrease maxSize: the split size shrinks.
With the file size fixed and maxSize at its default, to decrease the number of map tasks, increase minSize: the split size grows (a driver-side sketch of both knobs follows the note below).

Note: no matter how the split size is tuned, a split is always a slice of a single file; several small files cannot be packed into one split.
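
As a hedged example only (the 64 MB and 256 MB values are arbitrary illustrations, and the driver is a stand-in like the WordCount job above), these two knobs can be set either through FileInputFormat's helper methods or by writing the two properties from mapred-default.xml directly:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitTuning {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // More map tasks: cap maxSize below the block size (e.g. 64 MB),
        // so computeSplitSize returns maxSize instead of blockSize.
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

        // Fewer map tasks: raise minSize above the block size (e.g. 256 MB),
        // so computeSplitSize returns minSize instead of blockSize.
        // FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);

        // Equivalent raw properties (same keys as in mapred-default.xml):
        // job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);
        // job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", 256L * 1024 * 1024);
    }
}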
