在Hadoop中,启动作业运行的方式有很多,可以用命令行格式把打包好后的作业提交还可以,用Hadoop的插件进行应用开发,在这么多的方式中,都会必经过一个流程,作业会以JobInProgress的形式提交到JobTracker中。什么叫JobTracker呢,也许有些人了解Hadoop只知道他的MapReduce计算模型,那个过程只是其中的Task执行的一个具体过程,比较微观上的流程,而JobTrack是一个比较宏观上的东西。涉及到作业的提交的过程。Hadoop遵循的是Master/Slave的架构,也就是主从关系,对应的就是JobTracker/TaskTracker,前者负责资源管理和作业调度,后者主要负责执行由前者分配过来的作业。这样说的话,简单明了。JobTracker里面的执行的过程很多,那就得从开头开始分析,也就是作业最最开始的提交流程开始。后面的分析我会结合MapReduce的代码穿插式的分析,便于大家理解。

其实在作业的提交状态之前,还不会到达JobTacker阶段的,首先是到了MapReduce中一个叫JobClient的类中。也就是说,比如用户通过bin/hadoop jar xxx.jar把打包的jar包上传到系统中时,首先会触发的就是JobClient.。

public RunningJob submitJob(String jobFile) throws FileNotFoundException, InvalidJobConfException, IOException {// Load in the submitted job detailsJobConf job = new JobConf(jobFile);return submitJob(job);}

之后人家根据配置文件接着调用submitJob()方法

public RunningJob submitJob(JobConf job) throws FileNotFoundException,IOException {try {//又继续调用的是submitJobInternal方法return submitJobInternal(job);} catch (InterruptedException ie) {throw new IOException("interrupted", ie);} catch (ClassNotFoundException cnfe) {throw new IOException("class not found", cnfe);}}

来到了submitJobInternal的主要方法了

...jobCopy = (JobConf)context.getConfiguration();// Create the splits for the job 为作业创建输入信息FileSystem fs = submitJobDir.getFileSystem(jobCopy);LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));int maps = writeSplits(context, submitJobDir);jobCopy.setNumMapTasks(maps);// write "queue admins of the queue to which job is being submitted"// to job file.String queue = jobCopy.getQueueName();AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);jobCopy.set(QueueManager.toFullPropertyName(queue,QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());// Write job file to JobTracker's fs        FSDataOutputStream out = FileSystem.create(fs, submitJobFile,new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));try {jobCopy.writeXml(out);} finally {out.close();}//// Now, actually submit the job (using the submit name)//printTokens(jobId, jobCopy.getCredentials());//所有信息配置完毕,作业的初始化工作完成,最后将通过RPC方式正式提交作业status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials());JobProfile prof = jobSubmitClient.getJobProfile(jobId);

在这里他会执行一些作业提交之前需要进行的初始化工作,最后会RPC调用远程的提交方法。下面是一个时序图

至此我们知道,我们作业已经从本地提交出去了,后面的事情就是JobTracker的事情了,这个时候我们直接会触发的是JobTacker的addJob()方法。

private synchronized JobStatus addJob(JobID jobId, JobInProgress job) throws IOException {totalSubmissions++;synchronized (jobs) {synchronized (taskScheduler) {jobs.put(job.getProfile().getJobID(), job);//观察者模式,会触发每个监听器的方法for (JobInProgressListener listener : jobInProgressListeners) {listener.jobAdded(job);}}}myInstrumentation.submitJob(job.getJobConf(), jobId);job.getQueueMetrics().submitJob(job.getJobConf(), jobId);LOG.info("Job " + jobId + " added successfully for user '" + job.getJobConf().getUser() + "' to queue '" + job.getJobConf().getQueueName() + "'");AuditLogger.logSuccess(job.getUser(), Operation.SUBMIT_JOB.name(), jobId.toString());return job.getStatus();}

在这里设置了很多监听器,监听作业的一个情况。那么分析到这里,我们当然也也要顺便学习一下JobTracker的是怎么运行开始的呢。其实JobTracker是一个后台服务程序,他有自己的main方法入口执行地址。上面的英文是这么对此进行描述的:

/*** Start the JobTracker process.  This is used only for debugging.  As a rule,* JobTracker should be run as part of the DFS Namenode process.* JobTracker也是一个后台进程,伴随NameNode进程启动进行,main方法是他的执行入口地址*/public static void main(String argv[]) throws IOException, InterruptedException

上面说的很明白,作为NameNode的附属进程操作,NameNode跟JonTracker一样,全局只有一个,也是Master/Slave的关系对应的是DataNode数据结点。这些是HDFS相关的东西了。

public static void main(String argv[]) throws IOException, InterruptedException {StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);try {if(argv.length == 0) {//调用startTracker方法开始启动JobTrackerJobTracker tracker = startTracker(new JobConf());//JobTracker初始化完毕,开启里面的各项线程服务tracker.offerService();}else {if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) {dumpConfiguration(new PrintWriter(System.out));}else {System.out.println("usage: JobTracker [-dumpConfiguration]");System.exit(-1);}}} catch (Throwable e) {LOG.fatal(StringUtils.stringifyException(e));System.exit(-1);}}

里面2个主要方法,初始化JobTracker,第二个开启服务方法。首先看startTracker(),最后会执行到new JobTracker()构造函数里面去了:

JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm) throws IOException, InterruptedException { .....    //初始化安全相关操作secretManager = new DelegationTokenSecretManager(secretKeyInterval,tokenMaxLifetime,tokenRenewInterval,DELEGATION_TOKEN_GC_INTERVAL);secretManager.startThreads();......// Read the hosts/exclude files to restrict access to the jobtracker.this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),conf.get("mapred.hosts.exclude", ""));//初始化ACL访问控制列表aclsManager = new ACLsManager(conf, new JobACLsManager(conf), queueManager);LOG.info("Starting jobtracker with owner as " +getMROwner().getShortUserName());// Create the schedulerClass<? extends TaskScheduler> schedulerClass= conf.getClass("mapred.jobtracker.taskScheduler",JobQueueTaskScheduler.class, TaskScheduler.class);//初始化Task任务调度器taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);// Set service-level authorization security policyif (conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());}int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf, secretManager);if (LOG.isDebugEnabled()) {Properties p = System.getProperties();for (Iterator it = p.keySet().iterator(); it.hasNext();) {String key = (String) it.next();String val = p.getProperty(key);LOG.debug("Property '" + key + "' is " + val);}}

里面主要干了这么几件事:

1.初始化ACL访问控制列表数据

2.创建TaskSchedule任务调度器

3.得到DPC Server。

4.还有其他一些零零碎碎的操作....

然后第2个方法offService(),主要开启了各项服务;

public void offerService() throws InterruptedException, IOException {// Prepare for recovery. This is done irrespective of the status of restart// flag.while (true) {try {recoveryManager.updateRestartCount();break;} catch (IOException ioe) {LOG.warn("Failed to initialize recovery manager. ", ioe);// wait for some timeThread.sleep(FS_ACCESS_RETRY_PERIOD);LOG.warn("Retrying...");}}taskScheduler.start();.....this.expireTrackersThread = new Thread(this.expireTrackers,"expireTrackers");//启动该线程的主要作用是发现和清理死掉的任务this.expireTrackersThread.start();this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");//启动该线程的作用是清理长时间驻留在内存中且已经执行完的任务this.retireJobsThread.start();expireLaunchingTaskThread.start();if (completedJobStatusStore.isActive()) {completedJobsStoreThread = new Thread(completedJobStatusStore,"completedjobsStore-housekeeper");//该线程的作用是把已经运行完成的任务的信息保存到HDFS中,以便后续的查询completedJobsStoreThread.start();}// start the inter-tracker server once the jt is readythis.interTrackerServer.start();synchronized (this) {state = State.RUNNING;}LOG.info("Starting RUNNING");this.interTrackerServer.join();LOG.info("Stopped interTrackerServer");}

主要3大线程在这个方法里被开开启了,expireTrackersThread,retireJobsThread,completedJobsStoreThread,还有1个RPC服务的开启,interTrackerServer.start(),还有细节的操作就不列举出来了。好了JobTraker的close方法的流程刚刚好和以上的操作相反,之前启动过的线程统统关掉。

void close() throws IOException {//服务停止if (this.infoServer != null) {LOG.info("Stopping infoServer");try {this.infoServer.stop();} catch (Exception ex) {LOG.warn("Exception shutting down JobTracker", ex);}}if (this.interTrackerServer != null) {LOG.info("Stopping interTrackerServer");this.interTrackerServer.stop();}if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {LOG.info("Stopping expireTrackers");//执行线程中断操作this.expireTrackersThread.interrupt();try {//等待线程执行完毕再执行后面的操作this.expireTrackersThread.join();} catch (InterruptedException ex) {ex.printStackTrace();}}if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {LOG.info("Stopping retirer");this.retireJobsThread.interrupt();try {this.retireJobsThread.join();} catch (InterruptedException ex) {ex.printStackTrace();}}if (taskScheduler != null) {//调度器的方法终止taskScheduler.terminate();}if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {LOG.info("Stopping expireLaunchingTasks");this.expireLaunchingTaskThread.interrupt();try {this.expireLaunchingTaskThread.join();} catch (InterruptedException ex) {ex.printStackTrace();}}if (this.completedJobsStoreThread != null &&this.completedJobsStoreThread.isAlive()) {LOG.info("Stopping completedJobsStore thread");this.completedJobsStoreThread.interrupt();try {this.completedJobsStoreThread.join();} catch (InterruptedException ex) {ex.printStackTrace();}}if (jobHistoryServer != null) {LOG.info("Stopping job history server");try {jobHistoryServer.shutdown();} catch (Exception ex) {LOG.warn("Exception shutting down Job History server", ex);}}DelegationTokenRenewal.close();LOG.info("stopped all jobtracker services");return;}

至此,JobTracker的执行过程总算有了一个了解了吧,不算太难。后面的过程分析。JobTracker是如何把任务进行分解和分配的,从宏观上去理解Hadoop的工作原理。下面是以上过程的一个时序图

JobTracker作业启动过程分析相关推荐

  1. 嵌入式linux启动过程分析,嵌入式Linux裸机开发(二)——S5PV210启动过程分析

    嵌入式Linux裸机开发(二)--S5PV210启动过程分析 一.iROM启动方式简介 友善之臂Smart210开发板的SoC为三星S5PV210,S5PV210采用iROM启动方式进行启动,通过查阅 ...

  2. OpenWrt启动过程分析+添加自启动脚本【转】

    一.OpenWrt启动过程分析 转自: http://www.eehello.com/?post=107 总结一下OpenWrt的启动流程:1.CFE->2.linux->3./etc/p ...

  3. pixhawk PX4FMU和PX4IO最底层启动过程分析

    摘要: pixhawk PX4FMU和PX4IO最底层启动过程分析1.1 主处理器和协处理器的固件烧写和运行流程首先,大体了解PX4IO 与PX4FMU各自的任务.PX4IO(STM32F100)为P ...

  4. linux 重定位arm,Arm linxu启动过程分析(一)

    本文着重分析 FS2410 平台 linux-2.6.14 内核启动的详细过程,主要包括: zImage 解压缩阶段. vmlinux 启动汇编阶段. startkernel 到创建第一个进程阶段三个 ...

  5. linxu 启动过程分析

    linxu 启动过程分析 Linux启动过程如下:当用户打开PC的电源,BIOS开机自检,按BIOS中设置的启动设备(通常是硬盘)启动,接着启动设备上安装的引导程序lilo或grub开始引导Linux ...

  6. 开机SystemServer到ActivityManagerService启动过程分析

    开机SystemServer到ActivityManagerService启动过程 一 从Systemserver到AMS zygote-> systemserver:java入层口: /*** ...

  7. AliOS Things的启动过程分析(一)

    AliOS Things的启动过程分析(一) 在本篇文章中,我们以developerkit开发板为例,介绍AliOS Things的启动过程.AliOS Things支持多种工具链进行编译链接的方式生 ...

  8. Slurm作业启动原理

    文章目录 作业启动原理 交互式作业启动 批处理作业启动 分配式作业启动 作业启动原理 ​ 在slurm下,用户可以在三种模式下运行作业. 第一种也是最简单的模式是交互模式,其中stdout和stder ...

  9. Chromium的GPU进程启动过程分析

    Chromium除了有Browser进程和Render进程,还有GPU进程.GPU进程负责Chromium的GPU操作,例如Render进程通过GPU进程离屏渲染网页,Browser进程也是通过GPU ...

  10. Android开发入门教程2-Android init 启动过程分析

    Android init 启动过程分析   分析android的启动过程,从内核之上,我们首先应该从文件系统的init开始,因为 init 是内核进入文件系统后第一个运行的程序,通常我们可以在linu ...

最新文章

  1. 记录下,我们平时开发当中不得不知道的HTTP状态码
  2. 读书笔记:怪侠一枝梅 看后感
  3. jsp图片墙_JS实现的非常漂亮的3D立体照片墙显示效果
  4. 如何使用可外部化的接口在Java中自定义序列化
  5. Tomcat 配置 login 和 gas
  6. LeetCode 1259. 不相交的握手(DP)
  7. React个人入门总结《五》
  8. php判断目录是否有写的权限,PHP版目录权限检测
  9. Map对象与实体类Object对象转换
  10. 请领导过目文件怎么说_职场话题:当领导说“你定吧”,你会怎么做?
  11. wpf中使用ListView
  12. 用摄像管替换电视机电路里的显现管的摄像机
  13. 安装register
  14. 锐化pdf文件(图片形式)
  15. GBase8S_RSS配置
  16. POJ-2387:Dijkstra模板题
  17. 【Iriun Webcam】
  18. Plugin “GsonFormat“ is incompatible (supported only in IntelliJ IDEA).报错
  19. 导出iphone手机安装包的几种方法
  20. 如何从 Red Hat Enterprise Linux 6 升级到 Red Hat Enterprise Linux 7?

热门文章

  1. python字符串只保留字母_在字符串中只保留字母字符(多语言)
  2. 微信支付服务器白名单,微信测试号白名单怎么回事?如何进行微信支付测试?
  3. Python中关于with open file as 的用法
  4. 华为核心交换如何配置源地址转换_华为路由器和交换机配置地址转换
  5. Less系列之导入(Importing)
  6. 每天吃一个核桃好处多多,坚持半年以上,身体会发生五种变化
  7. 抖音GIF表情包制作教程 如何制作QQ动态表情包
  8. 运放参数的详细解释和分析-压摆率(SR)
  9. 微信支付-委托代扣学习资料整理
  10. AirServer投屏轻松地将iPhone、iPad投屏到Mac上面教程