TaskTracker执行map或reduce任务的过程(二)
上次说到,当MapLauncher或ReduceLancher(用于执行任务的线程,它们扩展自TaskLauncher),从它们所维护的LinkedList也即队列中获取到TaskInProgress,并且TaskTracker有空闲的slot时,该线程就调用了TaskTracker的startNewTask(tip)方法,如下所示:
public void run() {while (!Thread.interrupted()) {try {TaskInProgress tip;Task task;synchronized (tasksToLaunch) {while (tasksToLaunch.isEmpty()) {tasksToLaunch.wait();//当队列为空时呗阻塞,知道有新的tip到来才会被唤醒 }//get the TIPtip = tasksToLaunch.remove(0);task = tip.getTask();......//当有空闲的slot时执行启动一个任务 startNewTask(tip);......}}
接下了来就让我们看下startNewTask(tip)的神秘面纱吧,由于在其内部通过实习Runnable创建了一个线程,我们只需分析线程体的run方法即可,关键代码如下,为便于说明,给3个核心语句分别标识为**1,**2:
public void run() {try {RunningJob rjob = localizeJob(tip); //**1tip.getTask().setJobFile(rjob.getLocalizedJobConf().toString()); // task本地化已经完成,此刻如果rjob.jobConf或者rjob.ugi为空的话,会抛出异常 launchTaskForJob(tip, new JobConf(rjob.getJobConf()), rjob); //**2 ...... } }
**1的源码如下,
Task t = tip.getTask();JobID jobId = t.getJobID();RunningJob rjob = addTaskToJob(jobId, tip);InetSocketAddress ttAddr = getTaskTrackerReportAddress();
从中我们可以看出,首先创建了一个该任务所属的RunningJob,并把它放入到一个该TaskTracker所维护的TreeMap<jobId,RunningJob>中,同时在RunningJob中记录将要执行的task,也即把tip放入到RunningJob.tasks(一个HashSet<TaskInProgress>)中。由此,我们可以知道,每个TaskTracker都维护者一个TreeMap用以记录它正在执行的哪个作业的哪些任务(map、reduce任务)。
接下来localizeJob(tip)要做的就是调用initializeJob(t, rjob, ttAddr)初始化工作目录,并下载相应的job.xml以及job.jar(TaskController负责)文件,TaskController最后调用RunJar.unJar()将包解压到相应的工作目录,,至此初始化工作完成,调用launchTaskForJob开始执行Task。
**2的核心代码为:
protected void launchTaskForJob(TaskInProgress tip, JobConf jobConf,RunningJob rjob) throws IOException {synchronized (tip) {jobConf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY,localStorage.getDirsString());tip.setJobConf(jobConf);tip.setUGI(rjob.ugi);tip.launchTask(rjob);}}
由此看出,它主要是调用TaskTracker.TaskInProgress的launchTask()方法,在该方法中它创建了一个TaskRunner线程,并启这个线程执行这个task,其run方法核心代码如下:
public final void run() { //设置工作目录 final File workDir = new File(new Path(localdirs[rand.nextInt(localdirs.length)], TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(), taskid.toString(),t.isTaskCleanupTask())).toString());...... // 设置环境变量List<String> classPaths = getClassPaths(conf, workDir,taskDistributedCacheManager);....... //启动Task子进程launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);}}
未完待续......
转载于:https://www.cnblogs.com/yueliming/p/3287810.html
TaskTracker执行map或reduce任务的过程(二)相关推荐
- TaskTracker获取并执行map或reduce任务的过程(一)
我们知道TaskTracker在默认情况下,每个3秒就行JobTracker发送一个心跳包,也就是在这个心跳包中包含对任务的请求.JobTracker返回给TaskTracker的心跳包中包含有各种a ...
- MapReduce剖析笔记之五:Map与Reduce任务分配过程
转载:https://www.cnblogs.com/esingchan/p/3940565.html 在上一节分析了TaskTracker和JobTracker之间通过周期的心跳消息获取任务分配结果 ...
- Python基础知识——函数的基本使用、函数的参数、名称空间与作用域、函数对象与闭包、 装饰器、迭代器、生成器与yield、函数递归、面向过程与函数式(map、reduce、filter)
文章目录 1 函数的基本使用 一 引入 二 定义函数 三 调用函数与函数返回值 2 函数的参数 一 形参与实参介绍 二 形参与实参的具体使用 2.1 位置参数 2.2 关键字参数 2.3 默认参数 2 ...
- Yarn中map、reduce任务运行容器YarnChild分析
在对Yarn上MRAppMaster组件详解以及任务资源申请.启动的源码分析的分析中可以知道,真正用于执行MapTask任务.ReduceTask任务的进程容器为YarnChild进程,接下来对该Ya ...
- Spark RDD API:Map和Reduce
参考文章: http://blog.csdn.net/jewes/article/details/39896301 http://homepage.cs.latrobe.edu.au/zhe/Zhen ...
- Python函数式编程——map()、reduce()
提起map和reduce想必大家并不陌生,Google公司2003年提出了一个名为MapReduce的编程模型[1],用于处理大规模海量数据,并在之后广泛的应用于Google的各项应用中,2006年A ...
- 函数式编程filter、map、reduce
函数式编程filter.map.reduce (本文是一篇学习笔记和自己对filter.map.reduce的理解 参考:link. 作为一名半路出家的程序员,还记得第一门学习的编程语言是C,那种面向 ...
- c++ map用法_Python专题——五分钟带你了解map、reduce和filter
点击上方蓝字,和我一起学技术.今天是Python专题第6篇文章,给大家介绍的是Python当中三个非常神奇的方法:map.reduce和filter.不知道大家看到map和reduce的时候有没有什么 ...
- java filter函数的用法_5分钟掌握Python | Map、Reduce和Filter如何运用?
文末有惊喜哦 天给大家介绍的是Python当中三个非常神奇的方法:map.reduce和filter. 一.Map Map除了地图之外,另一个英文本意是映射.在C++和Java一些语言当中,将map进 ...
最新文章
- 理解 iOS 和 macOS 的内存管理
- MySQL exists的用法介绍
- 第五十四期:Libra盟友纷纷“跳船”,联盟链还有戏吗?
- perl-regexp_使用Regexp :: Common在Perl中轻松进行数据验证
- csrf防御 php,跨站请求伪造CSRF的防御实例(PHP版本)
- C# Get请求携带body
- 基于javaEE的医院病历管理系统的设计与实现
- Mugeda (木疙瘩)H5案例课—交互动画类H5制作-岑远科-专题视频课程
- 爬取《NBA30支球队》“现役球员信息”,分别存储到3种不同的数据库!
- 正弦波、方波、三角波的产生和两两之间相互转换
- Android 快速集成文档校正能力 超简单
- Excel中如何将文本链接转成跳转链接
- 解决网易服务器延迟大,网易WOW服务器延迟
- Linux系统管理员之日志管理
- Visa在2020年东京奥运会和残奥会前公布“Visa之队”阵容
- Springboot Application 集成 OSGI 框架开发
- 计算机应用期刊三次外审,期刊论文一般外审几次
- 10、Health Check、及机制Liveness 探测、机制Readiness 探测、在Scale Up(伸缩)中使用Health Check、
- 龙芯软件开发 10 --龙芯2E指令
- 【干货分享|建议收藏】2w字爆肝详解 JavaScript对象
热门文章
- 我的内核学习笔记5:proc目录文件创建及读写
- 学业水平考试容易过吗_天水普通高中冬季学业水平考试圆满结束
- charles 代理手机连不上网_「技巧」不想接电话?这样可以让手机变成空号,还不影响上网...
- 05-netty小例子
- 【Redis】Redis 基础知识 常用命令 命令积累
- 【Elasticsearch】Elasticsearch之别名
- 【Flink】Flink Serving 天池快速上手 【视频笔记】
- 【JVM】G1垃圾回收器
- Spring : @Value注解
- 【zookeeper】zookeeper的命令行操作zkCli.sh