MapReduce Job作业的提交过程可以分为本地提交模式与集群模式提交,这两种提交模式与org.apache.hadoop.mapred.LocalJobRunner、org.apache.hadoop.mapred.YARNRunner这两个类相关。在本篇文章中,将剖析Job作业本地的提交过程。用到了JVM的远程调试,具体操作请见这篇 “eclipse中远程调试JVM(以启动namenode进程为例)” 
所写的MapReduce程序、debug的操作步骤以及本文中涉及到的内容均以整理好打包上传,下载地址。

Job作业本地提交过程简略表示

先将整理得出的提交过程简略标示图,以方便阅读后文所写的关键代码解析

        job.submit() --> JobSubmitter.submitJobInternal() --> LocalJobRunner.submitJob(..)--> 转换Job为LocalJobRunner.Job的对象(线程,并启动)new LocalJobRunner(){...this.start();              //启动job线程}--> LocalJobRunner.run()1.创建mapRunnables集合          //map数量取决于切片的数量2.runTasks(mapRunnable集合)       3.创建reduceRunnables集合       //reduce数量需要手动进行设置4.runTasks(reduceRunnable集合)--> runTasks(...)for(Runnable r : runnables){service.submit(r);}--> MapTaskRunnable.run() \ ReduceTaskRunnable.run(){MapTask task = new MapTask(); \ ReduceTask task = new ReduceTask();task.run();}--> MapTask.run(){MyMaxTempMapper.run(){setup();while(){map(...);}cleanup();}}ReduceTask.run(){MyMaxTempMapper.run(){setup();while(){reduce(...);}cleanup();}}

源码解析

org.apache.hadoop.mapreduce.Job类

waitForCompletion()方法

  /*** 提交作业给集群并等待完成* Submit the job to the cluster and wait for it to finish.* @param verbose print the progress to the user* @return true if the job succeeded* @throws IOException thrown if the communication with the *         <code>JobTracker</code> is lost*/public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {//状态的提取if (state == JobState.DEFINE) {//调用自己的提交方法submit();}if (verbose) {monitorAndPrintJob();} else {// get the completion poll interval from the client.int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());while (!isComplete()) {try {Thread.sleep(completionPollIntervalMillis);} catch (InterruptedException ie) {}}}return isSuccessful();}

submit()方法

  /*** 提交作业给集群立即返回* Submit the job to the cluster and return immediately.* @throws IOException*/public void submit() throws IOException, InterruptedException, ClassNotFoundException {//确认状态ensureState(JobState.DEFINE);//设置新型APIsetUseNewAPI();//连接,连接到集群connect();//创建了一个作业提交器,从集群中取出文件系统和Client,得到文件系统和客户端的值;通过集群的文件系统和集群的客户端,从而得到作业提交器final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());//匿名内部类new PrivilegedExceptionAction<JobStatus>()status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {//调用了JobSubmitter的submitJobInternal()方法(内部的提交方法)  最重要一步return submitter.submitJobInternal(Job.this, cluster);}});state = JobState.RUNNING;LOG.info("The url to track the job: " + getTrackingURL());}

connect()方法

  //使用了匿名内部类对象new PrivilegedExceptionAction<Cluster>()//目的就是创建集群对象,通过匿名内部类创建出一个集群对象,返回给cluster(cluster是Job类的成员变量)private synchronized void connect()throws IOException, InterruptedException, ClassNotFoundException {if (cluster == null) {cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {public Cluster run()throws IOException, InterruptedException, ClassNotFoundException {//通过getConfiguration()方法将配置传给集群return new Cluster(getConfiguration());}});}}  

org.apache.hadoop.mapreduce.JobSubmitter类

JobSubmitter类中的submitJobInternal()方法

JobSubmitter.submitJobInternal(){...JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {//validate the jobs output specs   检查空间,如果输出目录存在就会抛出异常checkSpecs(job);//拿到job的配置Configuration conf = job.getConfiguration();//把conf放入分布式缓存(可以先不管)addMRFrameworkToDistributedCache(conf);//进入作业的阶段性区域(hdfs的临时目录)Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);//取得本地客户IP(作业提交客户端的ip地址,即在哪一台机子上进行提交)InetAddress ip = InetAddress.getLocalHost();if (ip != null) {//得到本机的提交地址submitHostAddress = ip.getHostAddress();//得到本机的主机名字submitHostName = ip.getHostName();//获取之后,在配置文件中进行submitHostName和submitHostAddress的设置conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);//创建一个新的作业ID(每个Job作业都有一个唯一的ID)JobID jobId = submitClient.getNewJobID();//对JobID进行设置job.setJobID(jobId);//通过jobid和临时目录构造出一个新的路径Path submitJobDir = new Path(jobStagingArea, jobId.toString());//作业状态JobStatus status = null;try{//设置用户名称conf.set(MRJobConfig.USER_NAME,UserGroupInformation.getCurrentUser().getShortUserName());//设置过滤器初始化conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");//设置作业目录//submitJobDir的值发生了改变,值为file:/tmp/hadoop-陶/mapred/staging/ì635285396/.staging/job_local635285396_0001//说明在./.staging/目录下还要创造一个子文件夹来存放作业conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());...//接下来的一长串操作是进行安全性控制,进行安全认证(hadoop可以加入安全认证的插件)// generate a secret to authenticate shuffle transfersif (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {KeyGenerator keyGen;try {keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);keyGen.init(SHUFFLE_KEY_LENGTH);} catch (NoSuchAlgorithmException e) {throw new IOException("Error generating shuffle secret key", e);}SecretKey shuffleKey = keyGen.generateKey();TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),job.getCredentials());}if (CryptoUtils.isEncryptedSpillEnabled(conf)) {conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);LOG.warn("Max job attempts set to 1 since encrypted intermediate" +"data spill is enabled");}//拷贝并配置文件copyAndConfigureFiles(job, submitJobDir);Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);// Create the splits for the job    为作业创建切片(map任务的个数取决于切片的个数)LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));int maps = writeSplits(job, submitJobDir);//执行完writeSplits()方法(该方法就是创建切片的过程)之后,在./.staging/job_local635285396_0001/目录下生成了四个文件//.job.split.crc、.job.splitmetainfo.crc校验和文件//job.split切片文件         job.splitmetainfo切片元信息文件//设置MapReduce的map任务数conf.setInt(MRJobConfig.NUM_MAPS, maps);//打印map信息LOG.info("number of splits:" + maps);//作业提交到作业队列中去,由作业队列进行管理// write "queue admins of the queue to which job is being submitted"// to job file.String queue = conf.get(MRJobConfig.QUEUE_NAME,JobConf.DEFAULT_QUEUE_NAME);AccessControlList acl = submitClient.getQueueAdmins(queue);conf.set(toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());...//写入job file到提交目录中(提交job.xml到提交目录)//job.xml文件里包含了四个配置文件里面所设置的所有参数writeConf(conf, submitJobFile);...//submitClient=LocalJobRunner//LocalJobRunner.submitJob()  通过执行器提交作业status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());}}}...
}

JobSubmitter类中的copyAndConfigureFiles()方法

JobSubmitter{...//使用命令行参数设置conf信息private void copyAndConfigureFiles(Job job, Path jobSubmitDir) throws IOException {JobResourceUploader rUploader = new JobResourceUploader(jtFs);rUploader.uploadFiles(job, jobSubmitDir);// Get the working directory. If not set, sets it to filesystem working dir// This code has been added so that working directory reset before running// the job. This is necessary for backward compatibility as other systems// might use the public API JobConf#setWorkingDirectory to reset the working// directory.job.getWorkingDirectory();}...
}

org.apache.hadoop.mapred.LocalJobRunner类

LocalJobRunner类中的submitJob方法

LocalJobRunner{...public org.apache.hadoop.mapreduce.JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,Credentials credentials) throws IOException {//创建LocalJobRunner.job的内部类对象  Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);job.job.setCredentials(credentials);return job.status;}...
}

LocalJobRunner类中的内部类Job 
在Job类中涉及到了run、runTasks以及Job的内部类MapTaskRunnable与ReduceTaskRunnable

LocalJobRunner.Job extends Thread implements TaskUmbilicalProtocol{...public Job(JobID jobid, String jobSubmitDir) throws IOException {//通过临时目录下的job.xml创建JobConf对象this.systemJobDir = new Path(jobSubmitDir);this.systemJobFile = new Path(systemJobDir, "job.xml");this.id = jobid;JobConf conf = new JobConf(systemJobFile);//得到本地文件系统this.localFs = FileSystem.getLocal(conf);String user = UserGroupInformation.getCurrentUser().getShortUserName();//得到本地目录this.localJobDir = localFs.makeQualified(new Path(new Path(conf.getLocalPath(jobDir), user), jobid.toString()));//得到本地文件this.localJobFile = new Path(this.localJobDir, id + ".xml");// Manage the distributed cache.  If there are files to be copied,// this will trigger localFile to be re-written again.localDistributedCacheManager = new LocalDistributedCacheManager();localDistributedCacheManager.setup(conf);//通过流写入一个配置文件// Write out configuration file.  Instead of copying it from// systemJobFile, we re-write it, since setup(), above, may have// updated it.OutputStream out = localFs.create(localJobFile);try {//写入流。     写入job_local2086240933_0001.xmlconf.writeXml(out);} finally {//写入成功   生成job_local2086240933_0001.xml.crc文件和job_local2086240933_0001.xml文件out.close();}//上述一大段代码都是在准备目录、配置文件、配置空间,至此还没有执行任何的任务//通过localJobFile创建一个文件this.job = new JobConf(localJobFile);...//最终目的:启动线程this.start();...

Job类中的run()方法

    @Overridepublic void run() {//得到作业的IDJobID jobId = profile.getJobID();//拿到作业的上下文JobContext jContext = new JobContextImpl(job, jobId);org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;try {outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);} catch (Exception e) {LOG.info("Failed to createOutputCommitter", e);return;}try {//任务切片元信息,得到切片信息TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);//得到Reduce任务数int numReduceTasks = job.getNumReduceTasks();//安装作业  设置作业outputCommitter.setupJob(jContext);//设置作业进度status.setSetupProgress(1.0f);//输出文件Map<TaskAttemptID, MapOutputFile> mapOutputFiles =Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());//得到mappper对应的runnable个数(Runner.Job.MapTaskRunnable)    org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnableList<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(taskSplitMetaInfos, jobId, mapOutputFiles);//初始化计数器initCounters(mapRunnables.size(), numReduceTasks);//创建一个map的线程池的执行器ExecutorService mapService = createMapExecutor();//开始运行map任务任务//注意:mapreduce的运行过程中,使用了线程池的技术(放到队列当中,在将来的某个时刻进行执行)runTasks(mapRunnables, mapService, "map");try {//如果reduce的个数大于0,再去执行reduce阶段if (numReduceTasks > 0) {//计算reduce对应的runnable个数List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(jobId, mapOutputFiles);ExecutorService reduceService = createReduceExecutor();//开始运行reduce任务runTasks(reduceRunnables, reduceService, "reduce");}} finally {for (MapOutputFile output : mapOutputFiles.values()) {output.removeAll();}}// delete the temporary directory in output directoryoutputCommitter.commitJob(jContext);status.setCleanupProgress(1.0f);if (killed) {this.status.setRunState(JobStatus.KILLED);} else {this.status.setRunState(JobStatus.SUCCEEDED);}JobEndNotifier.localRunnerNotification(job, status);} catch (Throwable t) {try {outputCommitter.abortJob(jContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED);} catch (IOException ioe) {LOG.info("Error cleaning up job:" + id);}status.setCleanupProgress(1.0f);if (killed) {this.status.setRunState(JobStatus.KILLED);} else {this.status.setRunState(JobStatus.FAILED);}LOG.warn(id, t);JobEndNotifier.localRunnerNotification(job, status);} finally {try {fs.delete(systemJobFile.getParent(), true);  // delete submit dirlocalFs.delete(localJobFile, true);              // delete local copy// Cleanup distributed cachelocalDistributedCacheManager.close();} catch (IOException e) {LOG.warn("Error cleaning up "+id+": "+e);}}}

Job类中的runTasks()方法

    .../** Run a set of tasks and waits for them to complete. */private void runTasks(List<RunnableWithThrowable> runnables,ExecutorService service, String taskType) throws Exception {// Start populating the executor with work units.// They may begin running immediately (in other threads).for (Runnable r : runnables) {//进行提交  是一个线程池,执行map和reduceservice.submit(r);}...}

Job的内部类MapTaskRunnable

    MapTaskRunnable{...public void run() {try {//生成一个maptask的IDTaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, taskId), 0);LOG.info("Starting task: " + mapId);//将mapID加入到mapIds这个集合里来mapIds.add(mapId);//作业文件(实质上是一个job.xml文件,可通过watch查看),mapid,任务id,切片信息  去构造一个MapTask类型的对象MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,info.getSplitIndex(), 1);map.setUser(UserGroupInformation.getCurrentUser().getShortUserName());//设置目录//map为MapTask类型的一个值,例如本次调试中所获取的值为:attempt_local335618588_0001_m_000000_0//localConf()加载配置文件的信息setupChildMapredLocalDirs(map, localConf);//创建一个map输出文件MapOutputFile mapOutput = new MROutputFiles();//设置配置信息mapOutput.setConf(localConf);mapOutputFiles.put(mapId, mapOutput);//localJobFile.toString()的值为file:/tmp/hadoop-陶/mapred/local/localRunner/ì?/job_local335618588_0001/job_local335618588_0001.xml//实质上是一个job_local335618588_0001.xml文件map.setJobFile(localJobFile.toString());localConf.setUser(map.getUser());map.localizeConfiguration(localConf);map.setConf(localConf);try {map_tasks.getAndIncrement();//launchMap()方法,进行启动mapmyMetrics.launchMap(mapId);map.run(localConf, Job.this);myMetrics.completeMap(mapId);} finally {map_tasks.getAndDecrement();}LOG.info("Finishing task: " + mapId);} catch (Throwable e) {this.storedException = e;}}...}}

Job的内部类ReduceTaskRunnable

    ReduceTaskRunnable{...public void run() {try {TaskAttemptID reduceId = new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, taskId), 0);LOG.info("Starting task: " + reduceId);ReduceTask reduce = new ReduceTask(systemJobFile.toString(),reduceId, taskId, mapIds.size(), 1);reduce.setUser(UserGroupInformation.getCurrentUser().getShortUserName());setupChildMapredLocalDirs(reduce, localConf);reduce.setLocalMapFiles(mapOutputFiles);if (!Job.this.isInterrupted()) {reduce.setJobFile(localJobFile.toString());localConf.setUser(reduce.getUser());reduce.localizeConfiguration(localConf);reduce.setConf(localConf);try {reduce_tasks.getAndIncrement();// 进行启动reducemyMetrics.launchReduce(reduce.getTaskID());// 开始运行reduce任务reduce.run(localConf, Job.this);myMetrics.completeReduce(reduce.getTaskID());} finally {reduce_tasks.getAndDecrement();}LOG.info("Finishing task: " + reduceId);...}}}...}
}

org.apache.hadoop.mapred.MapTask

MapTask{...@Overridepublic void run(final JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, ClassNotFoundException, InterruptedException {this.umbilical = umbilical;//判断是否是map任务if (isMapTask()) {// If there are no reducers then there won't be any sort. Hence the map // phase will govern the entire attempt's progress.//如果没有reduce就不做任何排序if (conf.getNumReduceTasks() == 0) {mapPhase = getProgress().addPhase("map", 1.0f);} else {// If there are reducers then the entire attempt's progress will be // split between the map phase (67%) and the sort phase (33%).mapPhase = getProgress().addPhase("map", 0.667f);//进行排序sortPhase  = getProgress().addPhase("sort", 0.333f);}}//启动一个汇报TaskReporter reporter = startReporter(umbilical);boolean useNewApi = job.getUseNewMapper();//进行初始化工作initialize(job, getJobID(), reporter, useNewApi);// check if it is a cleanupJobTaskif (jobCleanup) {runJobCleanupTask(umbilical, reporter);return;}if (jobSetup) {runJobSetupTask(umbilical, reporter);return;}if (taskCleanup) {runTaskCleanupTask(umbilical, reporter);return;}//判断是否是新型Apiif (useNewApi) {//调用自己所写的mapper类//作业信息,切片元信息,,报告runNewMapper(job, splitMetaInfo, umbilical, reporter);} else {//调用自己所写的mapper类runOldMapper(job, splitMetaInfo, umbilical, reporter);}done(umbilical, reporter);}...@SuppressWarnings("unchecked")private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException,InterruptedException {// make a task context so we can get the classes// 通过创建任务的上下文对象,获取类的对象org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter);// make a mapper// 创建一个mapper,通过反射的方式得到mapper(值为class com.zhaotao.hadoop.mr.MyMaxTempMapper)// taskContext.getMapperClass()的值为class com.zhaotao.hadoop.mr.MyMaxTempMapperorg.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =(org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getMapperClass(), job);// make the input format// 通过反射获取输入格式的对象org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);// rebuild the input split// 重建输入的切片org.apache.hadoop.mapreduce.InputSplit split = null;split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());LOG.info("Processing split: " + split);org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext);job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());org.apache.hadoop.mapreduce.RecordWriter output = null;// get an output object// 判断是否有reduceif (job.getNumReduceTasks() == 0) {output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);} else {// 新建一个输出收集器(与自己重写的mapper方法种Context对象相关)===>等价于Context对象output = new NewOutputCollector(taskContext, job, umbilical, reporter);}// 将新建的输出收集器对象output放入MapContextImpl中,以此得到MapContext对象org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> mapContext = new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);// 对mapContext进行包装,得到定义的Context类型的mapperContextorg.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context mapperContext = new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);try {// 先做初始化input.initialize(split, mapperContext);//具体内容看Mapper类中的run()方法mapper.run(mapperContext);mapPhase.complete();setPhase(TaskStatus.Phase.SORT);statusUpdate(umbilical);input.close();input = null;output.close(mapperContext);output = null;} finally {closeQuietly(input);closeQuietly(output, mapperContext);}}...
}

org.apache.hadoop.mapreduce.Mapper

Mapper{...// map的执行过程中分为三个阶段// a.安装阶段// b.循环调用map()阶段   (只有该阶段是自己定义的)   在Java中,这是一个典型的回调机制(只有在重写方法的时候进行回调机制)// c.清除阶段// 传入了Context类型的参数public void run(Context context) throws IOException, InterruptedException {setup(context);try {// 使用循环,不断调用map方法(实质上就是自己重写的map方法,对其不断的调用,用于处理文本行)// 判断是否存在下一个Key - Valuewhile (context.nextKeyValue()) {// 存在,取出Key值、Value值,并传入上下文contextmap(context.getCurrentKey(), context.getCurrentValue(), context);}} finally {// 最后对其进行清理cleanup(context);}}...
}

org.apache.hadoop.mapreduce.lib.map.WrappedMapper

WrappedMapper{...@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {// 返回Key值return mapContext.nextKeyValue();}@Overridepublic Counter getCounter(Enum<?> counterName) {// 返回Value值return mapContext.getCounter(counterName);}...
}

org.apache.hadoop.mapred.ReduceTask

ReduceTask{...@Override@SuppressWarnings("unchecked")public void run(JobConf job, final TaskUmbilicalProtocol umbilical)throws IOException, InterruptedException, ClassNotFoundException {job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());// 判断是Map或者Reduceif (isMapOrReduce()) {// 添加一个拷贝阶段copyPhase = getProgress().addPhase("copy");// 增加一个排序阶段sortPhase  = getProgress().addPhase("sort");// 增加一个reduce化简阶段reducePhase = getProgress().addPhase("reduce");}// start thread that will handle communication with parentTaskReporter reporter = startReporter(umbilical);// 判断是否使用了新的Reducer  Apiboolean useNewApi = job.getUseNewReducer();// 利用上述值进行初始化操作initialize(job, getJobID(), reporter, useNewApi);// check if it is a cleanupJobTaskif (jobCleanup) {runJobCleanupTask(umbilical, reporter);return;}if (jobSetup) {runJobSetupTask(umbilical, reporter);return;}if (taskCleanup) {runTaskCleanupTask(umbilical, reporter);return;}// Initialize the codec// 检查map的输出是否被压缩了codec = initCodec();// 一个迭代器RawKeyValueIterator rIter = null;// shuffle洗牌ShuffleConsumerPlugin shuffleConsumerPlugin = null;Class combinerClass = conf.getCombinerClass();CombineOutputCollector combineCollector = (null != combinerClass) ? new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;Class<? extends ShuffleConsumerPlugin> clazz =job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);// 正式开启reduce之前的shuffle过程ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, codec, combinerClass, combineCollector, spilledRecordsCounter, reduceCombineInputCounter,shuffledMapsCounter,reduceShuffleBytes, failedShuffleCounter,mergedMapOutputsCounter,taskStatus, copyPhase, sortPhase, this,mapOutputFile, localMapFiles);// shuffle完成shuffleConsumerPlugin.init(shuffleContext);// rIter是一个MergeQueue合并队列rIter = shuffleConsumerPlugin.run();// free up the data structuresmapOutputFilesOnDisk.clear();// 排序已经完成sortPhase.complete();                         // sort is completesetPhase(TaskStatus.Phase.REDUCE); statusUpdate(umbilical);Class keyClass = job.getMapOutputKeyClass();Class valueClass = job.getMapOutputValueClass();RawComparator comparator = job.getOutputValueGroupingComparator();// 判断reduce的Api是否为新版本if (useNewApi) {// 开始运行reduce任务runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);} else {runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);}shuffleConsumerPlugin.close();done(umbilical, reporter);}...@SuppressWarnings("unchecked")private <INKEY,INVALUE,OUTKEY,OUTVALUE>void runNewReducer(JobConf job,final TaskUmbilicalProtocol umbilical,final TaskReporter reporter,RawKeyValueIterator rIter,RawComparator<INKEY> comparator,Class<INKEY> keyClass,Class<INVALUE> valueClass) throws IOException,InterruptedException, ClassNotFoundException {// wrap value iterator to report progress.final RawKeyValueIterator rawIter = rIter;rIter = new RawKeyValueIterator() {public void close() throws IOException {rawIter.close();}public DataInputBuffer getKey() throws IOException {return rawIter.getKey();}public Progress getProgress() {return rawIter.getProgress();}public DataInputBuffer getValue() throws IOException {return rawIter.getValue();}public boolean next() throws IOException {boolean ret = rawIter.next();reporter.setProgress(rawIter.getProgress().getProgress());return ret;}};// make a task context so we can get the classes// 创建一个任务上下文来获取classorg.apache.hadoop.mapreduce.TaskAttemptContext taskContext =new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,getTaskID(), reporter);// make a reducer// 通过反射的方式获取reducer对象// taskContext.getReducerClass()的值为com.zhaotao.hadoop.mr.MyMaxTempReducerorg.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);job.setBoolean("mapred.skip.on", isSkipping());job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, getTaskID(),rIter, reduceInputKeyCounter, reduceInputValueCounter, trackedRW,committer,reporter, comparator, keyClass,valueClass);try {// 进入并开始reduce的运行reducer.run(reducerContext);} finally {trackedRW.close(reducerContext);}}...
}

org.apache.hadoop.mapreduce.Reducer

Reducer{...// 分三阶段运行,与mapper的run()方法运行的一样public void run(Context context) throws IOException, InterruptedException {setup(context);try {while (context.nextKey()) {reduce(context.getCurrentKey(), context.getValues(), context);// If a back up store is used, reset itIterator<VALUEIN> iter = context.getValues().iterator();if(iter instanceof ReduceContext.ValueIterator) {((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        }}} finally {cleanup(context);}}...
}

Job作业本地提交流程分析图

MapReduce Job本地提交过程源码跟踪及分析相关推荐

  1. MapReduce Job集群提交过程源码跟踪及分析

    继上篇文章对MapReduce Job本地提交过程进行分析之后  在本篇文章中,同样将通过debug的方式,对Job作业的集群提交过程进行分析  Job作业集群的提交有别于本地的提交方式,本地的提交采 ...

  2. MapReduce中Client提交Job源码分析

    回顾 在进行submit源码分析之前,先来回顾一下WordCount案例(点击查看WordCount案例).仔细回想一下曾经Client都干了点啥?获取对象-->一通set-->job.w ...

  3. Android耳机耳机,Android 耳机插拔流程源码跟踪浅析

    Android 开发过程中,使用耳机控制拍照,控制音乐播放,控制打电话等,线控再到蓝牙控... 耳机也在不断升级,耳机插拔的程序这一块也在不断完善.因此,在定制开发过程中,阅读这部分流程代码是必修的功 ...

  4. .net webim 源码_Netty服务器启动过程源码带你分析「你能坚持看完吗?」

    基本说明 1.只有看过Netty源码,才能说是真正的掌握了Netty框架: 2.在io.netty.example包下,有很多netty源码案例,可以用来分析: 3.源码分析,是针对有Java项目经验 ...

  5. Android端WebRTC本地音视频采集流程源码分析

    WebRTC源码版本为:org.webrtc:google-webrtc:1.0.32006 本文仅分析Java层源码,在分析之前,先说明一下一些重要类的基本概念. MediaSource:WebRT ...

  6. hadoop源码分析_Spark2.x精通:Job触发流程源码深度剖析(一)

    , 一.概述  之前几篇文章对Spark集群的Master.Worker启动流程进行了源码剖析,后面直接从客户端角度出发,讲解了spark-submit任务提交过程及driver的启动:集群启动.任务 ...

  7. Myth源码解析系列之六- 订单下单流程源码解析(发起者)

    前面一章我们走完了服务启动的源码,这次我们进入下单流程的源码解析~ 订单下单流程源码解析(发起者) 首先保证myth-demo-springcloud-order.myth-demo-springcl ...

  8. 【SRIO】5、Xilinx RapidIO核例子工程源码分析

    目录 一.软件平台与硬件平台 二.打开例子工程 三.例子工程详解 3.1 工程概述 3.2 工程结构 3.3 工程分析 四.工程源码分析 3.1 顶层模块srio_example_top.v源码分析 ...

  9. 5.Xilinx RapidIO核例子工程源码分析

    https://www.cnblogs.com/liujinggang/p/10091216.html 一.软件平台与硬件平台 软件平台: 操作系统:Windows 8.1 64-bit 开发套件:V ...

最新文章

  1. Java笔记整理-02.Java基础语法
  2. spring data实现自定义的repository实现类,实现跟jpa联通
  3. oracle中更改列明和更改显示列长度
  4. c++面试题之标准模板库
  5. 树莓派:树莓派的刷机和登录,以及更新新版vim方便使用
  6. Java中使用Jedis连接池连接Redis数据库流程
  7. 你还在 996 ?提升工作效率请看这几个公众号
  8. 前端学习(3103):vue+element今日头条管理-hello-react案例
  9. Chapter1-3_Speech_Recognition(CTC, RNN-T and more)
  10. 数据可视化及数据保存
  11. 时间序列-LSTM长期预测
  12. 选择符/选择器 深入了解 全面的选择器学习
  13. SQL注入-DNSlog注入(手工加工具使用)
  14. 首次 golang爬虫插件gocolly/colly 使用经历
  15. Android实现AirPlay,DriodAirPlay开发
  16. Oracle-数据库的备份与恢复
  17. 腐蚀rust服务器命令_服务器指令_腐蚀rust服务器命令一览 腐蚀rust有哪些服务器命令_3DM单机...
  18. edge保存页面html,Edge浏览器怎么保存网页 保存网页方法介绍
  19. 分析评估和定位声音质量
  20. 计算机组装拆卸 心得,学习组装电脑的心得体会怎么写?

热门文章

  1. python 关键字参数为什么只能出现在最后_Python笔记2——默认参数,可变参数,关键字参数,参数组合...
  2. recyclerview item点击无效_Android列表布局专题(一)RecyclerView
  3. axure如何实现跳转_Axure教程:网易云音乐首页原型设计
  4. m3u8地址_「波波带你手动提取网页视频」04讲 Network和Elements提取m3u8链接
  5. Android程序员 2019,VS 2019开发APP(一)界面和代码
  6. cad自动标注界址点_这样绘制cad施工图,提升效率至少3倍以上!
  7. sql数据迁移到oracle数据库,从Oracle到SQL Server数据库主键的迁移
  8. python常用运维模块_python常用模块之一
  9. 阿里云mysql写入性能_插入mysql类型
  10. 计算机网络的组成和结构ppt,常见的计算机网络拓扑结构PPT课件.pptx