[记录点滴] 小心 Hadoop Speculative 调度策略

文章目录

  • [记录点滴] 小心 Hadoop Speculative 调度策略
    • 0x00 摘要
    • 0x01 缘由
    • 0x02 代码示例
    • 0x03 排查过程
    • 0x04 Speculative execution
      • 4.1 掉队者
      • 4.2 推测执行
      • 4.3 问题所在
    • 0x05 问题解决
    • 0x06 map编写注意点
    • [0x07] 源码分析
    • 0xFF 参考

0x00 摘要

本文从一个bug入手,为大家展示Hadoop Speculative机制,以及编写mapreduce程序的注意点。

0x01 缘由

一个小弟来找我帮忙调试一个问题。

项目大概:用C++编写 map reduce程序,mapper会解析大文件为很多小文件。每个小文件名字是唯一确定的(小文件名字 = 大文件名字 + index),为了调试,文件名字又加入了生成文件的时间戳。

问题描述:程序运行成功,突然发现有很多“同名+不同时间戳”的文件。

这个小弟完全懵圈了,看代码没有任何异常,但就是同一个文件被生成多次。只能找我帮忙看看。

0x02 代码示例

map程序示例如下(仅仅是示例)

int main(int argc, char** argv) {string key;while(cin >> key) {key是文件名字,解析这个大文件为若干小文件,然后存储cout << key << "/t" << "1" << endl;}return 0;
}

reduce.cpp程序如下:

int main(int argc, char** argv) {string key, num;map<string, int> count; map<string, int>::iterator it;while(cin >> key >> num) {it = count.find(key);if(it != count.end()) {it->second++;}else {count.insert(make_pair(key, 1));}}for(it = count.begin(); it != count.end(); it++) {cout << it->first << "/t" << it->second << endl;}return 0;
}

看到这里,有的兄弟会提出疑问,这不就是wordcount的reduce代码嘛!

是的,这个就是wordcount的reduce代码。其实这个项目的reducer没有什么业务逻辑意义,业务完全在map程序中执行,或者说业务就是map的副产品而已。而这个就是最终问题所在

0x03 排查过程

其实从这个问题原因看,就是大致定位出问题点也没法找出最终原因。我们当时只能定位如下:

  • 从程序执行来看,map / reduce 没有任何问题。
  • 从文件名字和log看,同一个大文件被两个map node处理了,所以输出了“同名+不同时间戳”的文件。。
  • 但是从reduce结果看,“两个map node” 中有一个node没有输出(这里指的是输出到Reducer)

因为无法最终定位,所以只能从配置项,官方文档和网上资料中查找。

我们开始怀疑是Hadoop有备份或者高可用机制,这样就会启动多个task,但这样没法解释为啥有一个map node参与处理却没有参与最后输出。

最后觉得配置中有一个speculative.execution的字样平时没怎么关注,而且看起来和执行相关。于是就去查找,结果发现就是这家伙造成的。

0x04 Speculative execution

4.1 掉队者

作业(job)提交时,会被map-reduce 框架的 JobTracker 拆成一系列的map任务、reduce任务在整个hadoop 集群的机器上执行。

由于一些原因,可能是硬件老化,软件层面的不恰当配置,程序Bug,负载不均或者其他的一些问题,导致在一个JOB下的多个TASK速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,根据木桶原理,这些任务将成为整个JOB的短板。

Straggle(掉队者)是指那些跑的很慢但最终会成功完成的任务。一个掉队的Map任务会阻止Reduce任务开始执行。

4.2 推测执行

Hadoop不会尝试去诊断或者修复这些Straggle,但是可以识别那些跑的比较慢的任务。它会在集群的其他节点上去启动这些慢任务的多个实例作为备份,这就是hadoop的推测执行(speculative execution)。

如果集群启动了推测执行,这时为了最大限度的提高短板,Hadoop会为该task启动备份任务,让speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果,并且在运行完成后Kill掉另外一个任务

推测执行(Speculative Execution)是通过利用更多的资源来换取时间的一种优化策略。

4.3 问题所在

我们这个bug就是因为推测执行造成的。

Hadoop发现有的任务执行慢,就启动了备份任务。这时候两个任务都对同样的文件进行解析,生成新的文件。这样文件就被重复生成了。但是因为推测执行机制,最终只有一个任务顺利完成,这就是为什么reducer只看到一个任务执行完成。

0x05 问题解决

MapReduce任务有两个参数可以控制Speculative Task:

  • mapred.map.tasks.speculative.execution: mapper阶段是否开启推测执行
  • mapred.reduce.tasks.speculative.execution: reducer阶段是否开启推测执行

这两个参数默认都为true。

所以我们直接修改mapred-site.xml,如下:

<property><name>mapreduce.map.speculative</name><value>false</value>
</property><property><name>mapreduce.reduce.speculative</name><value>false</value>
</property>

则没有重复文件输出。

0x06 map编写注意点

map函数应该是幂等的,即同样的输入,如果map执行到一半退出,在另外一个节点重试这个map任务,则应该得到同样的业务逻辑和业务输出。

因此,编写map / reduce程序要特别小心,特别是和外部资源如数据库/文件相关时候。这时候如果误用了Speculative Task,则很容易发生重复读/写,产生异常。同时又额外消耗了节点资源。

[0x07] 源码分析

以下 hadoop 源码分析均摘录于 Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)。

这部分源码在 org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator

maybeScheduleASpeculation用于计算map或者reduce任务推断调度的可能性。

  • maybeScheduleASpeculation方法首先根据当前Task的类型(map或reduce)获取相应类型任务的需要分配Container数量的缓存containerNeeds,然后遍历containerNeeds。
  • 遍历containerNeeds的执行步骤如下:
    1. 如果当前Job依然有未分配Container的Task,那么跳过当前循环,继续下一次循环。这说明如果当前Job的某一类型的Task依然存在未分配Container的,则不会进行任务推断;
    2. 从当前应用的上下文AppContext中获取Job,并获取此Job的所有的Task(map或者reduce);
    3. 计算允许执行推断的Task数量numberAllowedSpeculativeTasks(map或者reduce)。其中MINIMUM_ALLOWED_SPECULATIVE_TASKS的值是10,PROPORTION_TOTAL_TASKS_SPECULATABLE的值是0.01。numberAllowedSpeculativeTasks取MINIMUM_ALLOWED_SPECULATIVE_TASKS与PROPORTION_TOTAL_TASKS_SPECULATABLE*任务数量之积之间的最大值。因此我们知道,当Job的某一类型(map或者reduce)的Task的数量小于1100时,计算得到的numberAllowedSpeculativeTasks等于10,如果Job的某一类型(map或者reduce)的Task的数量大于等于1100时,numberAllowedSpeculativeTasks才会大于10。numberAllowedSpeculativeTasks变量可以有效防止大量任务同时启动备份任务所造成的资源浪费。
    4. 遍历Job对应的map任务或者reduce任务集合,调用speculationValue方法获取每一个Task的推断值。并在迭代完所有的map任务或者reduce任务后,获取这一任务集合中的推断值bestSpeculationValue最大的任务ID。
    5. 再次计算numberAllowedSpeculativeTasks,其中PROPORTION_RUNNING_TASKS_SPECULATABLE的值等于0.1,numberRunningTasks是处于运行中的Task。numberAllowedSpeculativeTasks取numberAllowedSpeculativeTasks与PROPORTION_RUNNING_TASKS_SPECULATABLE*numberRunningTasks之积之间的最大值。因此我们知道当Job的某一类型(map或者reduce)的正在运行中的Task的数量小于110时(假设第3步得到的numberAllowedSpeculativeTasks等于10),计算得到的numberAllowedSpeculativeTasks等于10,如果Job的某一类型(map或者reduce)的正在运行中的Task的数量大于等于110时,numberAllowedSpeculativeTasks才会大于10。
    6. 如果numberAllowedSpeculativeTasks大于numberSpeculationsAlready(已经推断执行过的Task数量),则调用addSpeculativeAttempt方法将第4步中选出的任务的任务ID添加到推断尝试中。
private int maybeScheduleASpeculation(TaskType type) {int successes = 0;long now = clock.getTime();//根据当前Task的类型(map或reduce)获取相应类型任务的需要分配Container数量的缓存containerNeedsConcurrentMap<JobId, AtomicInteger> containerNeeds= type == TaskType.MAP ? mapContainerNeeds : reduceContainerNeeds;//遍历containerNeedsfor (ConcurrentMap.Entry<JobId, AtomicInteger> jobEntry : containerNeeds.entrySet()) {// This race conditon is okay.  If we skip a speculation attempt we//  should have tried because the event that lowers the number of//  containers needed to zero hasn't come through, it will next time.// Also, if we miss the fact that the number of containers needed was//  zero but increased due to a failure it's not too bad to launch one//  container prematurely.//如果当前Job依然有未分配Container的Task,那么跳过当前循环,继续下一次循环。这说明如果当前Job的某一类型的Task依然存在未分配Container的,则不会进行任务推断;if (jobEntry.getValue().get() > 0) {continue;}int numberSpeculationsAlready = 0;int numberRunningTasks = 0;// loop through the tasks of the kind//从当前应用的上下文AppContext中获取Job,并获取此Job的所有的Task(map或者reduce)Job job = context.getJob(jobEntry.getKey());Map<TaskId, Task> tasks = job.getTasks(type);//计算允许执行推断的Task数量numberAllowedSpeculativeTasks(map或者reduce)int numberAllowedSpeculativeTasks= (int) Math.max(minimumAllowedSpeculativeTasks,proportionTotalTasksSpeculatable * tasks.size());TaskId bestTaskID = null;long bestSpeculationValue = -1L;// this loop is potentially pricey.// TODO track the tasks that are potentially worth looking at//遍历Job对应的map任务或者reduce任务集合,在迭代完所有的map任务或者reduce任务后,获取这一任务集合中的推断值bestSpeculationValue最大的任务ID。for (Map.Entry<TaskId, Task> taskEntry : tasks.entrySet()) {//调用speculationValue方法获取每一个Task的推断值。long mySpeculationValue = speculationValue(taskEntry.getKey(), now);if (mySpeculationValue == ALREADY_SPECULATING) {++numberSpeculationsAlready;}if (mySpeculationValue != NOT_RUNNING) {++numberRunningTasks;}//获取这一任务集合中的推断值bestSpeculationValue最大的任务IDif (mySpeculationValue > bestSpeculationValue) {bestTaskID = taskEntry.getKey();bestSpeculationValue = mySpeculationValue;}}//再次计算numberAllowedSpeculativeTasksnumberAllowedSpeculativeTasks= (int) Math.max(numberAllowedSpeculativeTasks,proportionRunningTasksSpeculatable * numberRunningTasks);// If we found a speculation target, fire it off//如果numberAllowedSpeculativeTasks大于numberSpeculationsAlready(已经推断执行过的Task数量),则调用addSpeculativeAttempt方法将第4步中选出的任务的任务ID添加到推断尝试中。if (bestTaskID != null&& numberAllowedSpeculativeTasks > numberSpeculationsAlready) {addSpeculativeAttempt(bestTaskID);++successes;}}return successes;
}

speculationValue方法主要用于估算每个任务的推断值。主要是“备份任务实例运行”的结束时间 和 “原任务实例的结束时间” 的差值,越大则调度执行的价值越大。

speculationValue方法的执行步骤如下:

  1. 如果任务还没有被推断执行,那么调用estimator的thresholdRuntime方法获取任务可以接受的运行时长acceptableRuntime。如果acceptableRuntime等于Long.MAX_VALUE,则将ON_SCHEDULE作为返回值,ON_SCHEDULE的值是Long.MIN_VALUE,以此表示当前任务的推断值很小,即被推断尝试的可能最小。
  2. 如果任务的运行实例数大于1,则说明此任务已经发生了推断执行,因此返回ALREADY_SPECULATING。ALREADY_SPECULATING等于Long.MIN_VALUE + 1。
  3. 调用estimator的estimatedRuntime方法获取任务运行实例的估算运行时长estimatedRunTime。
  4. 调用estimator的attemptEnrolledTime方法获取任务实例开始运行的时间,此时间即为startTimes中缓存的start。这个值是在任务实例启动时导致DefaultSpeculator的processSpeculatorEvent方法处理Speculator.EventType.ATTEMPT_START类型的SpeculatorEvent事件时保存的。
  5. estimatedEndTime表示估算任务实例的运行结束时间,estimatedEndTime = estimatedRunTime + taskAttemptStartTime。
  6. 调用estimator的estimatedNewAttemptRuntime方法估算如果此时重新为任务启动一个实例,此实例运行结束的时间estimatedReplacementEndTime。
  7. 如果缓存中没有任务实例的历史统计信息,那么将estimatedRunTime、任务实例进度progress,当前时间封装为历史统计信息缓存起来。
  8. 如果缓存中存在任务实例的历史统计信息,如果缓存的estimatedRunTime和本次估算的estimatedRunTime一样并且缓存的实例进度progress和本次获取的任务实例进度progress一样,说明有一段时间没有收到心跳了,则模拟一次心跳。如果缓存的estimatedRunTime和本次估算的estimatedRunTime不一样或者缓存的实例进度progress和本次获取的任务实例进度progress不一样,那么将estimatedRunTime、任务实例进度progress,当前时间更新到任务实例的历史统计信息中。
  9. 如果estimatedEndTime小于当前时间,则说明任务实例的进度良好,返回PROGRESS_IS_GOOD,PROGRESS_IS_GOOD等于Long.MIN_VALUE + 3。
  10. 如果estimatedReplacementEndTime大于等于estimatedEndTime,则说明即便启动备份任务实例也无济于事,因为它的结束时间达不到节省作业总运行时长的作用。
  11. 计算本次估算的结果值result,它等于estimatedEndTime - estimatedReplacementEndTime,当这个差值越大表示备份任务实例运行后比原任务实例的结束时间就越早,因此调度执行的价值越大。
  12. 如果numberRunningAttempts等于0,则表示当前任务还没有启动任务实例,返回NOT_RUNNING,NOT_RUNNING等于Long.MIN_VALUE + 4。
  13. 重新计算acceptableRuntime,处理方式与第1步相同。
  14. 返回result。
private long speculationValue(TaskId taskID, long now) {Job job = context.getJob(taskID.getJobId());Task task = job.getTask(taskID);Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();long acceptableRuntime = Long.MIN_VALUE;long result = Long.MIN_VALUE;//如果任务还没有被推断执行,那么调用estimator的thresholdRuntime方法获取任务可以接受的运行时长acceptableRuntimeif (!mayHaveSpeculated.contains(taskID)) {acceptableRuntime = estimator.thresholdRuntime(taskID);if (acceptableRuntime == Long.MAX_VALUE) {return ON_SCHEDULE;}}TaskAttemptId runningTaskAttemptID = null;int numberRunningAttempts = 0;for (TaskAttempt taskAttempt : attempts.values()) {if (taskAttempt.getState() == TaskAttemptState.RUNNING|| taskAttempt.getState() == TaskAttemptState.STARTING) {//如果任务的运行实例数大于1,则说明此任务已经发生了推断执行,因此返回ALREADY_SPECULATING。if (++numberRunningAttempts > 1) {return ALREADY_SPECULATING;}runningTaskAttemptID = taskAttempt.getID();//调用estimator的estimatedRuntime方法获取任务运行实例的估算运行时长estimatedRunTime。long estimatedRunTime = estimator.estimatedRuntime(runningTaskAttemptID);//调用estimator的attemptEnrolledTime方法获取任务实例开始运行的时间,此时间即为startTimes中缓存的start。long taskAttemptStartTime= estimator.attemptEnrolledTime(runningTaskAttemptID);if (taskAttemptStartTime > now) {// This background process ran before we could process the task//  attempt status change that chronicles the attempt startreturn TOO_NEW;}//estimatedEndTime表示估算任务实例的运行结束时间long estimatedEndTime = estimatedRunTime + taskAttemptStartTime;//调用estimator的estimatedNewAttemptRuntime方法估算如果此时重新为任务启动一个实例,此实例运行结束的时间estimatedReplacementEndTime。long estimatedReplacementEndTime= now + estimator.estimatedNewAttemptRuntime(taskID);float progress = taskAttempt.getProgress();TaskAttemptHistoryStatistics data =runningTaskAttemptStatistics.get(runningTaskAttemptID);if (data == null) {//如果缓存中没有任务实例的历史统计信息,那么将estimatedRunTime、任务实例进度progress,当前时间封装为历史统计信息缓存起来。runningTaskAttemptStatistics.put(runningTaskAttemptID,new TaskAttemptHistoryStatistics(estimatedRunTime, progress, now));} else {//如果缓存中存在任务实例的历史统计信息if (estimatedRunTime == data.getEstimatedRunTime()&& progress == data.getProgress()) {//如果缓存的estimatedRunTime和本次估算的estimatedRunTime一样并且缓存的实例进度progress和本次获取的任务实例进度progress一样// Previous stats are same as same statsif (data.notHeartbeatedInAWhile(now)|| estimator.hasStagnatedProgress(runningTaskAttemptID, now)) {// Stats have stagnated for a while, simulate heart-beat.TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();taskAttemptStatus.id = runningTaskAttemptID;taskAttemptStatus.progress = progress;taskAttemptStatus.taskState = taskAttempt.getState();// Now simulate the heart-beat//说明有一段时间没有收到心跳了,则模拟一次心跳。handleAttempt(taskAttemptStatus);}} else {//如果缓存的estimatedRunTime和本次估算的estimatedRunTime不一样或者缓存的实例进度progress和本次获取的任务实例进度progress不一样// Stats have changed - update our data structure//将estimatedRunTime、任务实例进度progress,当前时间更新到任务实例的历史统计信息中。data.setEstimatedRunTime(estimatedRunTime);data.setProgress(progress);data.resetHeartBeatTime(now);}}//如果estimatedEndTime小于当前时间,则说明任务实例的进度良好,返回PROGRESS_IS_GOOD,PROGRESS_IS_GOOD等于Long.MIN_VALUE + 3。if (estimatedEndTime < now) {return PROGRESS_IS_GOOD;}//如果estimatedReplacementEndTime大于等于estimatedEndTime,则说明即便启动备份任务实例也无济于事,因为它的结束时间达不到节省作业总运行时长的作用。if (estimatedReplacementEndTime >= estimatedEndTime) {return TOO_LATE_TO_SPECULATE;}//计算本次估算的结果值result,它等于estimatedEndTime - estimatedReplacementEndTime,当这个差值越大表示备份任务实例运行后比原任务实例的结束时间就越早,因此调度执行的价值越大。result = estimatedEndTime - estimatedReplacementEndTime;}}// If we are here, there's at most one task attempt.if (numberRunningAttempts == 0) {//如果numberRunningAttempts等于0,则表示当前任务还没有启动任务实例,返回NOT_RUNNING,NOT_RUNNING等于Long.MIN_VALUE + 4。return NOT_RUNNING;}//重新计算acceptableRuntime,处理方式与第1步相同。if (acceptableRuntime == Long.MIN_VALUE) {acceptableRuntime = estimator.thresholdRuntime(taskID);if (acceptableRuntime == Long.MAX_VALUE) {return ON_SCHEDULE;}}return result;
}

0xFF 参考

Hadoop中Speculative Task调度策略

Hadoop之推测执行

Hadoop map-reduce的 speculative execution(推测执行)

Hadoop2.6.0运行mapreduce之推断(speculative)执行(上)

★★★★★★关于生活和技术的思考★★★★★★
微信公众账号:罗西的思考
如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,可以扫描下面二维码(或者长按识别二维码)关注个人公众号)。

[记录点滴] 小心 Hadoop Speculative 调度策略相关推荐

  1. HBase中此类异常解决记录org.apache.hadoop.ipc.RemoteException(java.io.IOException):

    HBase中此类异常解决记录org.apache.hadoop.ipc.RemoteException(java.io.IOException): 参考文章: (1)HBase中此类异常解决记录org ...

  2. 记录点滴27(回家的诱惑)

    记录点滴27 2012年11月5日星期一 大学三年级 第一学期第9周结束第10周开始 第九周的记录内容可能会比以往要丰富一点,在学校的生活基本上就是三点一线,课室.饭堂.宿舍的三点一线,没有任何新意, ...

  3. 去公司面试,记录下的hadoop最新面试题

    转载自:hadoop面试题 1.简要描述如何安装配置一个apache开源版hadoop,描述即可,列出步骤更好 1) 安装JDK并配置环境变量(/etc/profile) 2) 关闭防火墙 3) 配置 ...

  4. 记录一次hadoop的空间清理

    使用CDH遇到机器并没有跑什么生产项目,但是产生了将近300G/每个节点的数据. 过程中使用hadoop fs -du -h / 来查看hadoop根目录下文件的空间占用量,在这里看到spark占用了 ...

  5. 记录docker开发hadoop,解决bug Datanode denied communication with namenode because hostname cannot be

    使用docker快速开发了一个单节点的hadoop 首先去docker hub上搜索hadoop 使用hadoop3.1.3的TAG即可,分别拉去datanode和namenode的镜像到本地 官方提 ...

  6. 基于Django实现Linux运维管理平台的整个实现过程和各种API接口调用以及Echarts绘图项目介绍(一)记录点滴生活

    基于Django实现Linux运维管理平台整个实现过程和各种API接口调用以及Echarts绘图的使用介绍 项目内容涉及技术直通车: 我的项目仓库:MyGitHub https://github.co ...

  7. 【分享】5款记录点滴的App,人生路上的轨迹一定要珍藏~

    生活永远是让人陷入无尽思考的代名词,试问一下,我们是否对它抱怨过,期待过,那些生活中的一些小事带给我们的感动我们还是否记得,我们是否对自己来到这个世界有更深刻的理解,这里推荐5款App,让我们随时随地 ...

  8. 2012毕业找工作记录点滴

    从2011年9月18日找工作至今,一个多月的奔波总算告一段落,签了经过综合考虑,自己还算比较满意的公司,除此外,手里也拿了若干个offer, 相对于其它同学来说,或许可以称得上"小牛&quo ...

  9. Win API记录点滴

    这里记叙了在Windows下GDI编程的一些函数,用GDI函数可以有效的控制视窗的显示.尤其涉及到Windows图形编程,GDI函数将非常有用.    关于种种函数,在<Windows程序设计& ...

最新文章

  1. 计算机专业考研可以转专业不,学术可以转专业吗考研
  2. 从RocketMQ看长轮询(Long Polling)
  3. 重庆理工大学国际学院计算机图形学试题,哈尔滨理工大学-第一学期考试试题答案B卷考试.doc...
  4. win10易升_微软Win10版本20H2正式版官方ISO镜像下载大全_windows10_Windows系列_操作系统...
  5. 三维网格精简算法java版_ISMAR 2020 | 商汤提出手机端实时单目三维重建系统
  6. 【jQuery笔记】新浪微博案例笔记
  7. python基础课程第12章_流畅的python学习笔记-第12章
  8. (4)FPGA面试技能提升篇(数字信号处理基础)
  9. Idea创建一个springboot多模块项目
  10. 大型企业Exchange 2010部署方案 – 分割全局地址列表
  11. 计算机效果图线稿的制作方法,如何只用PS将线稿图变成高大上的效果图?
  12. Excel中无法查找和替换
  13. 站长咪咪网整理的Linux命令大全
  14. 互联网创业公司是否需要技术外包?
  15. 一名Java大佬跳槽之旅,离开京东,14面面试经验和收获
  16. 软件工程第四次作业 石墨文档IOS
  17. 微软最爽命令行工具将成 Win11 默认终端
  18. petalinux添加AD9361驱动
  19. CTFHub 工控组态分析 WP
  20. 网络游戏装备是计算机数据,DNF装备搭配计算器_17173DNF专区_17173.com中国游戏门户站...

热门文章

  1. 微信小程序获取并修改app.js中的值
  2. 净化自己的内心,扫除内心的尘埃
  3. Youtube到底怎么读?你读对了吗?
  4. Microsoft Teams通话质量仪表盘(CQD)怎么玩?
  5. 史上最强吃鸡电脑配件更新!这张显卡都能买一套房子了
  6. Masking Adversarial Damage: Finding Adversarial Saliency for Robust and Sparse Network
  7. java jws web_java 用jws发布一个简单的webservice
  8. 鸿蒙应用开发培训笔记01:HarmonyOS介绍
  9. CodeForces 954A Diagonal Walking
  10. Win7和Win10操作系统优劣对比,看完你就懂了!