概述

推测任务是指对于一个Stage里面拖后腿的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上运行的实例。spark推测式执行默认是关闭的,可通过spark.speculation属性来开启。

检测是否有需要推测式执行的Task

在SparkContext创建了schedulerBackend和taskScheduler后,立即调用了taskScheduler 的start方法:

override def start() {backend.start()if (!isLocal && conf.getBoolean("spark.speculation", false)) {logInfo("Starting speculative execution thread")speculationScheduler.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryOrStopSparkContext(sc) {checkSpeculatableTasks()}}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)}}

可以看到,TaskScheduler在启动SchedulerBackend后,在非local模式前提下检查推测式执行功能是否开启(默认关闭,可通过spark.speculation开启),若开启则会启动一个线程每隔SPECULATION_INTERVAL_MS(默认100ms,可通过spark.speculation.interval属性设置)通过checkSpeculatableTasks方法检测是否有需要推测式执行的tasks:

// Check for speculatable tasks in all our active jobs.def checkSpeculatableTasks() {var shouldRevive = falsesynchronized {shouldRevive = rootPool.checkSpeculatableTasks()}if (shouldRevive) {backend.reviveOffers()}}

然后又通过rootPool的方法判断是否有需要推测式执行的tasks,若有则会调用SchedulerBackend的reviveOffers去尝试拿资源运行推测任务。继续看看检测逻辑是什么样的:

override def checkSpeculatableTasks(): Boolean = {var shouldRevive = falsefor (schedulable <- schedulableQueue.asScala) {shouldRevive |= schedulable.checkSpeculatableTasks()}shouldRevive}

在rootPool里又调用了schedulable的方法,schedulable是ConcurrentLinkedQueue[Schedulable]类型,队列里面放的都是TaskSetMagager,再看TaskSetMagager的checkSpeculatableTasks方法,终于找到检测根源了:

 override def checkSpeculatableTasks(): Boolean = { // 如果task只有一个或者所有task都不需要再执行了就没有必要再检测if (isZombie || numTasks == 1) {  return false}var foundTasks = false // 所有task数 * SPECULATION_QUANTILE(默认0.75,可通过spark.speculation.quantile设置) val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toIntlogDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation) // 成功的task数是否超过总数的75%,并且成功的task是否大于0if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {val time = clock.getTimeMillis() // 过滤出成功执行的task的执行时间并排序val durations = taskInfos.values.filter(_.successful).map(_.duration).toArrayArrays.sort(durations) // 取这多个时间的中位数val medianDuration = durations(min((0.5 * tasksSuccessful).round.toInt, durations.length - 1)) // 中位数 * SPECULATION_MULTIPLIER (默认1.5,可通过spark.speculation.multiplier设置)val threshold = max(SPECULATION_MULTIPLIER * medianDuration, 100)logDebug("Task length threshold for speculation: " + threshold) // 遍历该TaskSet中的task,取未成功执行、正在执行、执行时间已经大于threshold 、 // 推测式执行task列表中未包括的task放进需要推测式执行的列表中speculatableTasksfor ((tid, info) <- taskInfos) {val index = info.indexif (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&!speculatableTasks.contains(index)) {logInfo("Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms".format(index, taskSet.id, info.host, threshold))speculatableTasks += indexfoundTasks = true}}}foundTasks}

检查逻辑代码中注释很明白,当成功的Task数超过总Task数的75%(可通过参数spark.speculation.quantile设置)时,再统计所有成功的Tasks的运行时间,得到一个中位数,用这个中位数乘以1.5(可通过参数spark.speculation.multiplier控制)得到运行时间门限,如果在运行的Tasks的运行时间超过这个门限,则对它启用推测。简单来说就是对那些拖慢整体进度的Tasks启用推测,以加速整个Stage的运行。
算法大致流程如图:

推测式任务什么时候被调度

在TaskSetMagager在延迟调度策略下为一个executor分配一个task时会调用dequeueTask方法:

private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] ={for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {return Some((index, TaskLocality.NODE_LOCAL, false))}}......// find a speculative task if all others tasks have been scheduleddequeueSpeculativeTask(execId, host, maxLocality).map {case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}}

该方法的最后一段就是在其他任务都被调度后为推测式任务进行调度,看看起实现:

protected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] ={// 从推测式执行任务列表中移除已经成功完成的task,因为从检测到调度之间还有一段时间,// 某些task已经成功执行speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set// 判断task是否可以在该executor对应的Host上执行,判断条件是:// task没有在该host上运行;// 该executor没有在task的黑名单里面(task在这个executor上失败过,并还在'黑暗'时间内)def canRunOnHost(index: Int): Boolean =!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)if (!speculatableTasks.isEmpty) {// 获取能在该executor上启动的taskIndexfor (index <- speculatableTasks if canRunOnHost(index)) {// 获取task的优先位置val prefs = tasks(index).preferredLocations val executors = prefs.flatMap(_ match {case e: ExecutorCacheTaskLocation => Some(e.executorId)case _ => None});// 优先位置若为ExecutorCacheTaskLocation并且数据所在executor包含当前executor,// 则返回其task在taskSet的index和Locality Levelsif (executors.contains(execId)) {speculatableTasks -= indexreturn Some((index, TaskLocality.PROCESS_LOCAL))}}// 这里的判断是延迟调度的作用,即使是推测式任务也尽量以最好的本地性级别来启动if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {for (index <- speculatableTasks if canRunOnHost(index)) {val locations = tasks(index).preferredLocations.map(_.host)if (locations.contains(host)) {speculatableTasks -= indexreturn Some((index, TaskLocality.NODE_LOCAL))}}}........}None}

代码太长只列了前面一部分,不过都是类似的逻辑,代码中注释也很清晰。先过滤掉已经成功执行的task,另外,推测执行task不在和正在执行的task同一Host执行,不在黑名单executor里执行,然后在延迟调度策略下根据task的优先位置来决定是否在该executor上以某种本地性级别被调度执行。

[spark] spark推测式执行相关推荐

  1. 从Storm和Spark 学习流式实时分布式计算的设计

    转自:http://www.dataguru.cn/thread-341168-1-1.html 流式实时分布式计算系统在互联网公司占有举足轻重的地位,尤其在在线和近线的海量数据处理上.而处理这些海量 ...

  2. Spark Steaming流式日志过滤与分析

    Spark Steaming流式日志过滤与分析 这篇大概讲的是 spark steaming 监听 hdfs 的某个目录,当你在终端A使用 spark-submit 运行 Log2DB.py 文件后, ...

  3. Spark架构与作业执行流程简介

    2019独角兽企业重金招聘Python工程师标准>>> Spark架构与作业执行流程简介 博客分类: spark Local模式 运行Spark最简单的方法是通过Local模式(即伪 ...

  4. Spark——Spark概述

    一.Spark是什么 二.Spark and Hadoop 在之前的学习中,Hadoop的MapReduce是大家广为熟知的计算框架,那为什么咱们还要学习新的计算框架Spark呢,这里就不得不提到Sp ...

  5. [Spark]Spark Streaming 指南四 输入DStreams和Receivers

    1. 输入DStream与Receiver 输入DStreams表示从源中获取输入数据流的DStreams.在指南一示例中,lines表示输入DStream,它代表从netcat服务器获取的数据流.每 ...

  6. 推测式提问,先“复述你对别人的话语理解”

    我们要意识到沟通的过程中,我们的话语含义常常会被误解或者曲解,不论是你的意思,还是别人的意思,都一样.这种误读往往造成我们彼此之间深深的误会,还不清楚哪里出了问题.    所以我们建议,在沟通过程中使 ...

  7. Spark转换算子和执行算子

    在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种. 一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行 ...

  8. Spark Streaming 流式计算实战

    这篇文章由一次平安夜的微信分享整理而来.在Stuq 做的分享,原文内容. 业务场景 这次分享会比较实战些.具体业务场景描述: 我们每分钟会有几百万条的日志进入系统,我们希望根据日志提取出时间以及用户名 ...

  9. 大数据学习系列----基于Spark Streaming流式计算

    2019独角兽企业重金招聘Python工程师标准>>> 个性化的需求 随着互联网知识信息指数级膨胀,个性化的需求对于用户来说越来越重要,通过推荐算法和用户点击行为的流式计算可以很简单 ...

  10. Spark Streaming中流式计算的困境与解决之道

    Spark streaming  在各种流程处理框架生态中占着举足轻重的位置, 但是不可避免地也会面对网络波动带来的数据延迟的问题,所以必须要进行增量数据的累加. 在更新Spark 应用的时候或者其他 ...

最新文章

  1. Vue开发跨端应用(四)electron发布web应用并打包app
  2. 高可靠芯片搭配视觉演算法,影像式ADAS满足车规要求
  3. 深入学习keepalived之一 keepalived的启动
  4. js 字符串加减法_JavaScript (+) 加法运算符
  5. javaweb----DAO模型设计
  6. R 读取excel的方法
  7. php获取html中文本框内容_小猿圈Python入门之批量获取html内body内容的方法
  8. Delphi 7学习开发控件
  9. 按钮灭了_劣质灭火器整瓶都灭不掉一盆火 教你几招辨别消防产品真假
  10. 脚本、脚本语言、写脚本都是什么呀???
  11. 调试铁通与联通专线遇到的问题
  12. 2021年江阴各高中高考成绩查询,江阴高考,全市12所高中高考成绩比较
  13. Win7设置wifi热点
  14. 一机玩转docker之六:搭建crucible
  15. wordpress 瀑布流ajax,WordPress瀑布流主题:蛋花儿Free版
  16. Python制作七夕表白实例项目-让你的情人心动起来
  17. 苹果电脑上android环境的搭建
  18. 推荐系统领域对比学习和数据增强论文及代码集锦
  19. 《机器学习实战 学习笔记》(二):端到端的机器学习项目
  20. raid0 raid1 raid5 raid6 raid10的优缺点

热门文章

  1. Windows 10 数据恢复与预防数据丢失指南
  2. 设备综合效率OEE:基于数采的OEE优化分析
  3. 用 Python 画哆啦 A 梦
  4. 茶道形式、用具及要素
  5. JSP报错 At least one JAR was scanned for TLDs yet contained no TLDs
  6. java话费充值_手机话费充值-java示例
  7. Unity发布windows程序,Fullscreen Mode设置为Windowed,可运行总是全屏
  8. inherits在java中是什么属性_在Java中,要想让一个类继承另一个类,可以使用哪个关键字?()...
  9. OPPO R7Plusm(全网通)root、刷入twrp recovery、卡刷刷入CM系统教程合集_ recovery.img文件下载 联想A7600-m线刷刷机教程 手机卡在双4G双百兆无法开
  10. 【校园网环境下知网研学下载文献出现“下载文献 当前IP没有获取权限或服务器异常”处理办法】