JobTracker是整个MapReduce计算框架中的主服务,相当于集群的“管理者”,负责整个集群的作业控制和资源管理。本文对JobTracker的启动过程及心跳接收与应答两个主要功能进行分析。

1 JobTracker启动过程

1.1 各种线程功能

函数offerService()会启动JobTracker内部几个比较重要的后台服务进程,分别是expireTrackersThread、retireJobsThread、expireLaunchingTaskThread和completedJobsStoreThread。相关代码如下:

public class JobTracker {

... ...

ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();

Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,

"expireLaunchingTasks");

... ...

public void offerService() throws InterruptedException, IOException {

... ...

// expireTrackersThread后台服务进程。

this.expireTrackersThread = new Thread(this.expireTrackers,

"expireTrackers");

this.expireTrackersThread.start();

// retireJobsThread后台服务进程。

this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");

this.retireJobsThread.start();

// expireLaunchingTaskThread后台服务进程。

expireLaunchingTaskThread.start();

// completedJobsStoreThread后台服务进程。

if (completedJobStatusStore.isActive()) {

completedJobsStoreThread = new Thread(completedJobStatusStore,

"completedjobsStore-housekeeper");

completedJobsStoreThread.start();

}

... ...

}

}

下面分别介绍这几个服务线程。

1) expireTrackersThread线程

该线程主要用于发现和清理死掉的TaskTracker。每个TaskTracker会周期性地通过心跳向JobTracker汇报信息,而JobTracker会记录每个TaskTracker最近的汇报心跳时间。如果某个TaskTracker在10分钟内未汇报心跳,则JobTracker认为它已死掉,并将经的相关信息从数据结构trackToJobsToCleanup、trackerToTasksToCleanup、trackerToMarkedTasksMap中清除,同时将正在运行的任务状态标注为KILLED_UNCLEAN。

2) retireJobsThread线程

该线程主要用于清理长时间驻留在内存中的已经运行完成的作业信息。JobTracker会将已经运行完成的作业信息存放到内存中,以便外部查询,但随着完成的作业越来越多,势必会占用JobTracker的大量内存,为此,JobTracker通过该线程清理驻留在内存中较长时间的已经运行完成的作业信息。

当一个作业满足如下条件1、2或者条件1、3时,将被从数据结构jobs转移到过期作业队列中。

条件1  作业已经运行完成,即运行状态为SUCCESSED、FAILED或KILLED。

条件2  作业完成时间距现在已经超过24小时(可通过参数mapred.jobtracker.retirejob.interval配置)。

条件3  作业拥有者已经完成作业总数超过100(可通过参数mapred.jobtracker.completeuserjobs.maximum配置)个。

过期作业被统一保存到过期队列中。当过期作业超过1000个(可通过参数mapred.job.tracker.retiredjobs.cache.size配置)时,将会从内存中彻底删除。

3) expireLaunchingTaskThread线程

该线程用于发现已经被分配给某个TaskTracker但一直未汇报信息的任务。当JobTracker将某个任务分配给TaskTracker后,如果该任务在10分钟内未汇报进度,则JobTracker认为该任务分配失败,并将其状态标注为FAILED。

4) completedJobsStoreThread线程

该线程将已经运行完成的作业运行信息保存到HDFS上,并提供了一套存取这些信息的API。该线程能够解决以下两个问题。

n 用户无法获取很久之前的作业运行信息:前面提到线程retireJobsThread会清除长时间驻留在内存中的完成作业,这会导致用户无法查询很久之前某个作业的运行信息。

n JobTracker重启后作业运行信息丢失:当JobTracker因故障重启后,所有原本保存到内存中的作业信息将会全部丢失。

该线程通过保存作业运行日志的方式,使得用户可以查询任意时间提交的作业和还原作业的运行信息。

默认情况下,该线程不会启用,可以通过下表所示的几个参数配置并启用该线程。

配置参数

参数含义

mapred.job.tracker.persist.jobstatus.active

是否启用该线程

mapred.job.tracker.persist.jobstatus.hours

作业运行信息保存时间

mapred.job.tracker.persist.jobstatus.dir

作业运行信息保存路径

1.2 作业恢复

在MapReduce中,JobTracker存在单点故障问题。如果它因异常退出后重启,那么所有正在运行的作业运行时信息将丢失。如果不采用适当的作业恢复机制对作业信息进行恢复,则所有作业需重新提交,且已经计算完成的任务需重新计算。这势必造成资源浪费。

为了解决JobTracker面临的单点故障问题,Hadoop设计了作业恢复机制,过程如下:作业从提交到运行结束的整个过程中,JobTracker会为一些关键事件记录日志(由JobHistory类完成)。对于作业而言,关键事件包括作业提交、作业创建、作业开始运行、作业运行完成、作业运行失败、作业被杀死等;对于任务而言,关键事件包括任务创建、任务开始运行、任务运行结束、任务运行失败、任务被杀死等。当JobTracker因故障重启后(重启过程中,所有TaskTracker仍然活着),如果管理员启用了作业恢复功能(将参数mapred.jobtracker.restart.recover置为true),则JobTracker会检查是否存在需要恢复运行状态的作业,如果有,则通过日志恢复这些作业的运行状态(由RecoveryManager类完成),并重新调度那些未运行完成的任务(包括产生部分结果的任务)。

2 心跳接收与应答

心跳是沟通TaskTracker和JobTracker的桥梁,它实际上是一个RPC函数。TaskTracker周期性地调用该函数汇报节点和任务状态信息,从而形成心跳。在Hadoop中,心跳主要有三个作用:

n 判断TaskTracker是否活着。

n 及时让JobTracker获取各个节点上的资源使用情况和任务运行状态。

n 为TaskTracker分配任务。

TaskTracker周期性地调用RPC函数heartbeat向JobTracker汇报信息和领取任务。该函数定义如下:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,boolean initialContact,boolean acceptNewTasks,

short responseId)

该函数的各个参数含义如下。

status

该参数封装了TaskTracker上的各种状态信息。包括:

String trackerName;//TaskTracker名称

String host;//TaskTracker主机名

int httpPort;//TaskTracker对外的HTTP端口号

int failures;//该TaskTracker上已经失败的任务总数

List<TaskStatus> taskReports;//正在运行的各个任务运行状态

volatile long lastSeen;//上次汇报心跳的时间

private int maxMapTasks;/*Map slot总数,即允许同时运行的Map Task总数,由参数mapred.tasktracker.map.tasks.maximum设定*/

private int maxReduceTasks;//Reduce slot总数

private TaskTrackerHealthStatus healthStatus;//TaskTracker健康状态

private ResourceStatus resStatus;//TaskTracker资源(内存,CPU等)信息

restarted

表示TaskTracker是否刚刚重新启动。

initialContact

表示TaskTracker是否初次连接JobTracker

acceptNewTasks

表示TaskTracker是否可以接收新任务,这通常取决于slot是否有剩余和节点健康状态等。

responseId

表示心跳响应编号,用于防止重复发送心跳。每接收一次心跳后,该值加1。

该函数的返回值为一个HeartbeatResponse对象,该对象主要封装了JobTracker向TaskTracker下达的命令,具体如下:

class HeartbeatResponse implements Writable, Configurable {

... ...

short responseId;    // 心跳响应编号

int heartbeatInterval;    // 下次心跳的发送间隔

TaskTrackerAction[] actions;   // 来自JobTracker的命令,可能包括杀死作业等

Set<JobID> recoveredJobs = new HashSet<JobID>();  // 恢复完成的作业列表。

... ...

}

该函数的内部实现逻辑主要分为两个步骤:更新状态和下达命令。JobTracker首先将TaskTracker汇报的最新任务运行状态保存到相应数据结构中,然后根据这些状态信息和外界需求为其下达相应的命令。

2.1 更新状态

函数heartbeat首先会更新TaskTracker/Job/Task的状态信息。相关代码如下:

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted, boolean initialContact, boolean acceptNewTasks,

short responseId)  {

... ...

/* Make sure heartbeat is from a tasktracker allowed by the jobtracker.

当一个TaskTracker在host list(由参数mapred.hosts指定)中,但不在exclude list(由参数mapred.hosts.exclude指定)中时,可接入到JobTracker */

if (!acceptTaskTracker(status)) {

throw new DisallowedTaskTrackerException(status);

}

// 如果该TaskTracker被重启了,则将之标注为健康的TaskTracker,并从黑名单或者灰名单中清除,否则,启动TaskTracker容错机制以检查它是否处于健康状态。

if (restarted) {

faultyTrackers.markTrackerHealthy(status.getHost());

} else {

faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now);

}

......

// Process this heartbeat

short newResponseId = (short)(responseId + 1);

// 记录心跳发送时间,以发现在一定时间内未发送心跳的TaskTracker,交将其标注为死亡的TaskTracker,此后不可再向其分配新任务。

status.setLastSeen(now);

if (!processHeartbeat(status, initialContact, now)) { // 处理心跳

... ...

}

... ...

}

接下来,跟踪进入函数processHeartbeat内部。该函数首先进行一系列异常情况检查,然后更新TaskTracker/Job/Task的状态信息。相关代码如下:

private synchronized boolean processHeartbeat(

TaskTrackerStatus trackerStatus,

boolean initialContact,

long timeStamp) throws UnknownHostException {

... ...

updateTaskStatuses(trackerStatus);    // 更新Task状态信息

updateNodeHealthStatus(trackerStatus, timeStamp); // 更新节点健康状态

... ...

}

2.2 下达命令

更新完状态信息后,JobTracker要为TaskTracker构造一个HeartbeatResponse对象作为心跳应答。该对象主要有两部分内容:下达给TaskTracker的命令和下次汇报心跳的时间间隔。下面分别对它们进行介绍:

1. 下达命令

JobTracker将下达给TaskTracker的命令封装成TaskTrackerAction类,主要包括ReinitTrackerAction(重新初始化)、LauchTaskAction(运行新任务)、KillTaskAction(杀死任务)、KillJobAction(杀死作业)和CommitTaskAction(提交任务)五种。下面依次对这几个命令进行介绍。

1) ReinitTrackerAction

JobTracker收到TaskTracker发送过来的心跳信息后,首先要进行一致性检查,如果发现异常情况,则会要求TaskTracker重新对自己进行初始化,以恢复到一致的状态。当出现以下两种不一致情况时,JobTracker会向TaskTracker下达ReinitTrackerAction命令。

n 丢失上次心跳应答信息:JobTracker会保存向每个TaskTracker发送的最近心跳应答信息,如果JobTracker未刚刚重启且一个TaskTracker并非初次连接JobTracker(initialContact!=true),而最近的心跳应答信息丢失了,则这是一种不一致状态。

n 丢失TaskTracker状态信息:JobTracker接收到任何一个心跳信息后,会将TaskTracker状态(封装在类TaskTrackerStatus中)信息保存起来。如果一个TaskTracker非初次连接JobTracker但状态信息却不存在,则也是一种不一致状态。

相关代码如下:

public class JobTracker {

... ...

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

... ...

return new HeartbeatResponse(responseId,

new TaskTrackerAction[] {new ReinitTrackerAction()});

... ...

}

}

2) LauchTaskAction

该类封装了TaskTracker新分配的任务。TaskTracker接收到该命令后会启动一个子进程运行该任务。Hadoop将一个作业分解后的任务分成两大类:计算型任务和辅助型任务。其中,计算型任务是处理实际数据的任务,包括Map Task和Reduce Task两种(对应TaskType类中的MAP和REDUCE两种类型),由专门的任务调度器对它们进行调度;而辅助型任务则不会处理实际的数据,通常用于同步计算型任务或者清理磁盘上无用的目录,包括job-setup task、job-cleanup task和task-cleanup task三种(对应TaskType类中的JOB_SETUP,JOB_CLEANUP和TASK_CLEANUP三种类型),其中,job-setup task和job-cleanup task分别用作计算型任务开始运行同步标识和结束运行同步标识,而task-cleanup task则用于清理失败的计算型任务已经写到磁盘上的部分结果,这种任务由JobTracker负责调度,且运行优先级高于计算型任务。

如果一个正常(不在黑名单中)的TaskTracker尚有空闲slot(acceptNewTasks为true),则JobTracker会为该TaskTracker分配新任务,任务选择顺序是:先辅助型任务,再计算型任务。而对于辅助型任务,选择顺序依次为job-cleanup task、task-cleanup task和job-setup task,具体代码如下:

public class JobTracker {

... ...

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

... ...

List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);

// 如果没有辅助型任务,则选择计算型任务

if (tasks == null ) {

// 由任务调度器选择一个或多个计算型任务

tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));

}

if (tasks != null) {

for (Task task : tasks) {

expireLaunchingTasks.addNewTask(task.getTaskID());

if(LOG.isDebugEnabled()) {

LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());

}

// 将分配的任务封装成LauchTAskAction

actions.add(new LaunchTaskAction(task));

}

... ...

}

... ...

}

3) KillTaskAction

该类封装了TaskTracker需杀死的任务。TaskTracker收到该命令后会杀掉对应任务、清理工作目录和释放slot。导致JobTracker向TaskTracker发送该命令的原因有很多,主要包括以下几个场景:

n 用户使用命令“bin/hadoop job -kill-task”或者“bin/hadoop job -fail-task”杀死一个任务或者使一个任务失败。

n 启用推测执行机制后,同一份数据可能同时由两个Task Attempt处理。当其中一个Task Attempt执行成功后,另外一个处理相同数据的Task Attempt将被杀掉。

n 某个作业运行失败,它的所有任务将被杀掉。

n TaskTracker在一定时间内未汇报心跳,则JobTracker认为其死掉,它上面的所有Task均被标注为死亡。

相关代码如下:

public class JobTracker {

... ...

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

... ...

// Check for tasks to be killed

List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);

if (killTasksList != null) {

actions.addAll(killTasksList);

}

... ...

}

}

4) KillJobAciton

该类封装了TaskTracker待清理的作业。TaskTracker接收到该命令后,会清理作业的临时目录。导致JobTracker向TaskTracker发送该命令的原因有很多,主要包括以下几个场景:

n 用户使用命令“”或者“”杀死一个作业或者是使一个作业失败。

n 作业运行完成,通知TaskTracker清理该作业的工作目录。

n 作业运行失败,即同一个作业失败的Task数目超过一定比例。

相关代码如下:

public class JobTracker {

... ...

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

... ...

// Check for jobs to be killed/cleanedup

List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);

if (killJobsList != null) {

actions.addAll(killJobsList);

}

... ...

}

}

5) CommitTaskAction

该类封装了TaskTracker需提交的任务。为了防止同一个TaskInProgress的两个同时运行的Task Attempt(比如打开推测执行功能,一个任务可能存在备份任务)同时打开一个文件或者往一个文件中写数据而产生冲突,Hadoop让每个Task Attempt写到单独一个文件(以TaskAttemptID命名,比如attempt_201412031706_0008_r_000000_0)中。通常而言,Hadoop让每个Task Attempt成功运行完成后,再将运算结果转移到最终目录${mapred.output.dir}中。Hadoop将一个成功运行完成的Task Attempt结果文件从临时目录“提升”至最终目录的过程,称为“任务提交”。当TaskInProgress中一个任务被提交后,其他任务将被杀死,同时意味着该TaskInProgress运行完成。相关代码如下:

public class JobTracker {

... ...

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

... ...

// Check for tasks whose outputs can be saved

List<TaskTrackerAction> commitTasksList = getTasksToSave(status);

if (commitTasksList != null) {

actions.addAll(commitTasksList);

}

... ...

}

}

2. 调整心跳间隔

TaskTracker心跳时间间隔大小应该适度,如果太小,则JobTracker需要处理高并发的心跳连接请求,必然产生不小的并发压力;如果太大,空闲的资源不能及时汇报给JobTracker(进而为之分配新的Task),造成资源空闲,进而降低系统吞吐率。

TaskTracker汇报心跳的时间间隔并不是一成不变的,它会随着集群规模的动态调整(比如节点死掉或者用户动态添加新节点)而变化,以便能够合理利用JobTracker的并发处理能力。在Hadoop MapReduce中,只有JobTracker知道某一时刻集群的规模,因此由JobTracker为每个TaskTracker计算下一次汇报心跳的时间间隔,并通过心跳机制告诉TaskTracker。

JobTracker允许用户通过参数配置心跳的时间间隔加速比,即每增加mapred.heartbeats.in.second(默认是100,最小是1)个节点,心跳时间间隔增加mapreduce.jobtracker.heartbeats.scaling.factor(默认是1,最小是0.01)秒。同时,为了防止用户参数设置不合理而对JobTracker产生较大负载,JobTracker要示心跳时间间隔至少为3秒。具体计算方法如下:

public class JobTracker implements MRConstants, InterTrackerProtocol,

JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,

RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,

JobTrackerMXBean {

... ...

/**

* Calculates next heartbeat interval using cluster size.

* Heartbeat interval is incremented by 1 second for every 100 nodes by default.

* @return next heartbeat interval.

*/

public int getNextHeartbeatInterval() {

// get the no of task trackers

int clusterSize = getClusterStatus().getTaskTrackers();

int heartbeatInterval =  Math.max(

(int)(1000 * HEARTBEATS_SCALING_FACTOR *

Math.ceil((double)clusterSize /NUM_HEARTBEATS_IN_SECOND)),

HEARTBEAT_INTERVAL_MIN) ;

return heartbeatInterval;

}

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

... ...

// calculate next heartbeat interval and put in heartbeat response

int nextInterval = getNextHeartbeatInterval();

response.setHeartbeatInterval(nextInterval);

... ...

}

... ...

}

大数据框架hadoop之JobTracker主要功能分析相关推荐

  1. 大数据框架Hadoop篇之Hadoop入门

    1. 写在前面 今天开始,想开启大数据框架学习的一个新系列,之前在学校的时候就会大数据相关技术很是好奇,但苦于没有实践场景,对这些东西并没有什么体会,到公司之后,我越发觉得大数据的相关知识很重要,不管 ...

  2. 大数据学习笔记二:Ubuntu/Debian 下安装大数据框架Hadoop

    文章目录 安装Java 为Hadoop创建用户 安装Hadoop 配置Hadoop 配置环境变量 设置配置文件 格式化namenode 启动hadoop集群 访问hadoop集群 大数据学习系列文章: ...

  3. 五种大数据框架你必须要知道

    学习大数据不可不知的五种大数据框架,码笔记分享大数据框架Hadoop.Storm.Samza.Spark和Flink五种大数据框架详解: 一:Hadoop大数据框架 Hadoop 大数据框架?第一映入 ...

  4. 打怪升级之小白的大数据之旅(四十一)<大数据与Hadoop概述>

    打怪升级之小白的大数据之旅(四十) Hadoop概述 上次回顾 好了,经过了java,mysql,jdbc,maven以及Linux和Shell的洗礼,我们终于开始正式进入大数据阶段的知识了,首先我会 ...

  5. 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink——flink支持SQL,待看

    简介 大数据是收集.整理.处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称.虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性.规模,以及价值在最近几年才 ...

  6. 细细品味大数据--初识hadoop

    初识hadoop 前言 之前在学校的时候一直就想学习大数据方面的技术,包括hadoop和机器学习啊什么的,但是归根结底就是因为自己太懒了,导致没有坚持多长时间,加上一直为offer做准备,所以当时重心 ...

  7. 大数据和hadoop的一些基础知识

    一.前言 大数据这个概念不用我提大家也听过很多了,前几年各种公开论坛.会议等场合言必及大数据,说出来显得很时髦似的.有意思的是最近拥有这个待遇的名词是"人工智能/AI",当然这是后 ...

  8. GitChat · 大数据 | 一步一步学习大数据:Hadoop 生态系统与场景

    目录(?)[-] Hadoop概要 Hadoop相关组件介绍 HDFS Yarn Hive HBase Spark Other Tools Hadoop集群硬件和拓扑规划 硬件配置 软件配置 Hado ...

  9. 【快速入门大数据】hadoop和它的hdfs、yarn、mapreduce

    文章目录 导学 大数据概述 初识Hadoop 概述 核心组件 HDFS分布式文件系统 资源调度系统YARN MapReduce 优势 发展史 生态系统 发行版本选择 企业应用案例 第3章 分布式文件系 ...

最新文章

  1. 北电PBX资料_LD 11數位電話機設定
  2. mysql 修改密码演练
  3. java基础经典面试题10道
  4. Educational Codeforces Round 7
  5. 面试记录:冒泡排序都不会,大哥你会编程吗
  6. Gartner预测公有云将迎来“双头垄断”局面
  7. c语言编写keil 设置memory model的编辑器,keil C51的Memory Model 说明[三种Model的选择对编译的影响]】...
  8. 模式识别实验报告--贝叶斯分类器设计
  9. 如何在visio中插入矩阵公式
  10. matlab h系统控制器,Matlab的H_inf鲁棒控制器的设计.pdf
  11. 矩阵按层级内容排序——Power BI
  12. 基于springboot,vue旅游信息推荐系统
  13. 软件工程——2021软科中国大学专业排名
  14. banner图第三版
  15. 游戏直播视频太大了,怎么一键压缩视频?--QVE视频压缩
  16. 对于软件,我是认真的
  17. 手机照片删除了还能找回来吗?高手就是高手,精彩
  18. GridView使用【GridViewHelper】分组统计
  19. 用计算机视觉描述机器人,一文读懂计算机视觉和机器人视觉
  20. 论文邮箱不是导师的_为什么你迟迟收不到研究生导师的回复邮件?

热门文章

  1. python + 数学公式 + 图像 表白 LOVE YOU❤
  2. 记录一个使用imgkit库转图片在windows上可能会出现的问题 iis OSError: [WinError 6] 句柄无效
  3. VGA带音频转HDMI转换芯片|VGA转HDMI 转换器方案|VGA转HDMI1.4转换器芯片介绍
  4. 如何利用vga接口的显示器做笔记本的副屏
  5. 哔哩哔哩 B站挂了!完了,5个9可靠性,超时了!
  6. Silverlight游戏设计(Game Design):(十三)帝国时代II (Demo) 之 “战争艺术”②
  7. pause()与sigsuspend()的用法
  8. 有没有游泳可以戴的耳机、防水耳机能戴着游泳
  9. 房屋检测中七大类别的适用范围是什么?
  10. 学python可以改善思维_基于培养思维能力的Python语言程序设计教学