背景

hadoop的推测执行 
  推测执行(Speculative Execution)是指在分布式集群环境下,因为程序BUG,负载不均衡或者资源分布不均等原因,造成同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task(比如:一个job的某个task进度只有10%,而其他所有task已经运行完毕),则这些task拖慢了作业的整体执行进度,为了避免这种情况发生,Hadoop会为该task启动备份任务让该speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果。 
  推测执行优化机制采用了典型的以空间换时间的优化策略,它同时启动多个相同task(备份任务)处理相同的数据块哪个完成的早,则采用哪个task的结果,这样可防止拖后腿Task任务出现,进而提高作业计算速度,但是,这样却会占用更多的资源,在集群资源紧缺的情况下,设计合理的推测执行机制可在多用少量资源情况下,减少大作业的计算时间。

这里可以类比spark的推测执行

  推测任务是指对于一个Stage里面拖后腿的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上运行的实例。spark推测式执行默认是关闭的,可通过spark.speculation属性来开启。

2. 注意事项

  1. 谨慎使用,严重的会造成所有资源被全部占用,不能及时释放

3. 源代码

    /*** TaskScheduleImpl的初始化和启动是在SparkConext中,进行的,初始化的时候会* 传入SparkDeploySchedulerBackend对象。启动则直接调用start方法。在Start* 方法中,会判断是否启动任务的推测执行,由spark.speculation属性指定,默认不执行*/if (!isLocal && conf.getBoolean("spark.speculation", false)) {logInfo("Starting speculative execution thread")speculationScheduler.scheduleWithFixedDelay(new Runnable {override def run(): Unit = Utils.tryOrStopSparkContext(sc) {// 检查我们所有活跃的job中是否有可推测的任务。checkSpeculatableTasks()}}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)

若开启则会启动一个线程每隔SPECULATION_INTERVAL_MS(默认100ms,可通过spark.speculation.interval属性设置)通过checkSpeculatableTasks方法检测是否有需要推测式执行的tasks:

  // How often to check for speculative tasks  多久检查一次推测任务val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")// Duplicate copies of a task will only be launched if the original copy has been running for// at least this amount of time. This is to avoid the overhead of launching speculative copies// of tasks that are very short.// 只有在原始副本至少运行了这么多时间的情况下,才会启动任务的副本。这是为了避免产生非常短的任务的推测性副本的开销。val MIN_TIME_TO_SPECULATION = 100
  // Check for speculatable tasks in all our active jobs.// 检查我们所有活跃的job中是否有可推测的任务。def checkSpeculatableTasks() {var shouldRevive = falsesynchronized {shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)}if (shouldRevive) {backend.reviveOffers()}}

然后又通过rootPool的方法判断是否有需要推测式执行的tasks,若有则会调用SchedulerBackend的reviveOffers去尝试拿资源运行推测任务。继续看看检测逻辑是什么样的:

  override def checkSpeculatableTasks(): Boolean = {var shouldRevive = falsefor (schedulable <- schedulableQueue.asScala) {shouldRevive |= schedulable.checkSpeculatableTasks()}shouldRevive}

直接点开schedulable.checkSpeculatableTasks() 调用的是 
private[spark] trait Schedulable 特质中的接口

  def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean

然后看schedulable <- schedulableQueue.asScala,继续看schedulableQueue的定义,

// 存储(pools或者TaskSetManagers)的链表
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]

ool里又调用了schedulable的方法,schedulable是ConcurrentLinkedQueue[Schedulable]类型,队列里面放的都是TaskSetMagager,

最后再看TaskSetMagager的checkSpeculatableTasks方法,终于找到检测根源了:

/*** Check for tasks to be speculated and return true if there are any. This is called periodically* by the TaskScheduler.**/override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {// Can't speculate if we only have one task, and no need to speculate if the task set is a// zombie.// 如果task只有一个或者所有task都不需要再执行了就没有必要再检测if (isZombie || numTasks == 1) {return false}var foundTasks = false// 所有task数 * SPECULATION_QUANTILE(默认0.75,可通过spark.speculation.quantile设置)val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toIntlogDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)// 成功的task数是否超过总数的75%,并且成功的task是否大于0if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {val time = clock.getTimeMillis()// 取这多个task任务执行成功时间的中位数var medianDuration = successfulTaskDurations.median// 中位数 * SPECULATION_MULTIPLIER (默认1.5,可通过spark.speculation.multiplier设置)val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)// TODO: Threshold should also look at standard deviation of task durations and have a lower// bound based on that.logDebug("Task length threshold for speculation: " + threshold)// 遍历该TaskSet中的task,取未成功执行、正在执行、执行时间已经大于threshold 、// 推测式执行task列表中未包括的task放进需要推测式执行的列表中speculatableTasksfor (tid <- runningTasksSet) {val info = taskInfos(tid)val index = info.indexif (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&!speculatableTasks.contains(index)) {logInfo("Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms".format(index, taskSet.id, info.host, threshold))speculatableTasks += indexfoundTasks = true}}}foundTasks}

检查逻辑代码中注释很明白,当成功的Task数超过总Task数的75%(可通过参数spark.speculation.quantile设置)时,再统计所有成功的Tasks的运行时间,得到一个中位数,用这个中位数乘以1.5(可通过参数spark.speculation.multiplier控制)得到运行时间门限,如果在运行的Tasks的运行时间超过这个门限,则对它启用推测。简单来说就是对那些拖慢整体进度的Tasks启用推测,以加速整个Stage的运行。
算法大致流程如图:

3.1 推测式任务什么时候被调度

在TaskSetMagager在延迟调度策略下为一个executor分配一个task时会调用dequeueTask方法:

/*** Dequeue a pending task for a given node and return its index and locality level.* Only search for tasks matching the given locality constraint.** 将给定节点的挂起任务删除,并返回其索引和位置级别。只搜索与给定区域约束匹配的任务。** @return An option containing (task index within the task set, locality, is speculative?)*         包含(任务集中的任务索引,地点,是推测的?)*/private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] ={// dequeueTaskFromList()方法:从给定的列表中取消一个挂起的任务并返回它的索引。如果列表为空,则返回None。// PROCESS_LOCAL: 数据在同一个 JVM 中,即同一个 executor 上。这是最佳数据 locality。for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}// NODE_LOCAL: 数据在同一个节点上。比如数据在同一个节点的另一个 executor上;或在 HDFS 上,// 恰好有 block 在同一个节点上。速度比 PROCESS_LOCAL 稍慢,因为数据需要在不同进程之间传递或从文件中读取if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {return Some((index, TaskLocality.NODE_LOCAL, false))}}// NO_PREF: 数据从哪里访问都一样快,不需要位置优先if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {// Look for noPref tasks after NODE_LOCAL for minimize cross-rack trafficfor (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}}// RACK_LOCAL: 数据在同一机架的不同节点上。需要通过网络传输数据及文件 IO,比 NODE_LOCAL 慢if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {for {rack <- sched.getRackForHost(host)index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))} {return Some((index, TaskLocality.RACK_LOCAL, false))}}// ANY: 数据在非同一机架的网络上,速度最慢if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {return Some((index, TaskLocality.ANY, false))}}// find a speculative task if all others tasks have been scheduled// 如果所有其他任务都安排好了,就去找一个推测的任务。dequeueSpeculativeTask(execId, host, maxLocality).map {case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}}

该方法的最后一段就是在其他任务都被调度后为推测式任务进行调度,看看如何实现:

/*** Return a speculative task for a given executor if any are available. The task should not have* an attempt running on this host, in case the host is slow. In addition, the task should meet* the given locality constraint.** 如果有任何可用的执行器,返回一个推测任务。任务不应该在主机上运行,以防主机运行缓慢。此外,* 该任务还应满足给定的局部约束条件。*/// Labeled as protected to allow tests to override providing speculative tasks if necessaryprotected def dequeueSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)] ={// 从推测式执行任务列表中移除已经成功完成的task,因为从检测到调度之间还有一段时间,// 某些task已经成功执行// 从set集合中删除完成的任务speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set// 判断task是否可以在该executor对应的Host上执行,判断条件是:// task没有在该host上运行;// 该executor没有在task的黑名单里面(task在这个executor上失败过,并还在'黑暗'时间内)def canRunOnHost(index: Int): Boolean = {!hasAttemptOnHost(index, host) &&!isTaskBlacklistedOnExecOrNode(index, execId, host)}// // 推测执行任务集合是否为空if (!speculatableTasks.isEmpty) {// Check for process-local tasks; note that tasks can be process-local// on multiple nodes when we replicate cached blocks, as in Spark Streaming// 获取能在该executor上启动的taskIndexfor (index <- speculatableTasks if canRunOnHost(index)) {// 获取task的优先位置val prefs = tasks(index).preferredLocationsval executors = prefs.flatMap(_ match {case e: ExecutorCacheTaskLocation => Some(e.executorId)case _ => None});// 优先位置若为ExecutorCacheTaskLocation并且数据所在executor包含当前executor,// 则返回其task在taskSet的index和Locality Levelsif (executors.contains(execId)) {speculatableTasks -= indexreturn Some((index, TaskLocality.PROCESS_LOCAL))}}// Check for node-local tasks// 这里的判断是延迟调度的作用,即使是推测式任务也尽量以最好的本地性级别来启动if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {for (index <- speculatableTasks if canRunOnHost(index)) {val locations = tasks(index).preferredLocations.map(_.host)if (locations.contains(host)) {speculatableTasks -= indexreturn Some((index, TaskLocality.NODE_LOCAL))}}}// Check for no-preference tasksif (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {for (index <- speculatableTasks if canRunOnHost(index)) {val locations = tasks(index).preferredLocationsif (locations.size == 0) {speculatableTasks -= indexreturn Some((index, TaskLocality.PROCESS_LOCAL))}}}// Check for rack-local tasksif (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {for (rack <- sched.getRackForHost(host)) {for (index <- speculatableTasks if canRunOnHost(index)) {val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)if (racks.contains(rack)) {speculatableTasks -= indexreturn Some((index, TaskLocality.RACK_LOCAL))}}}}// Check for non-local tasksif (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {for (index <- speculatableTasks if canRunOnHost(index)) {speculatableTasks -= indexreturn Some((index, TaskLocality.ANY))}}}None}

先过滤掉已经成功执行的task,另外,推测执行task不在和正在执行的task同一Host执行,不在黑名单executor里执行,然后在延迟调度策略下根据task的优先位置来决定是否在该executor上以某种本地性级别被调度执行。

Spark推测执行spark.speculation相关推荐

  1. Spark 推测执行 /spark.speculation=true /spark.speculation.quantile=0.75/spark.speculation.multiplier=1.5

    在Spark中任务会以DAG图的方式并行执行,每个节点都会并行的运行在不同的executor中,但是有的任务可能执行很快,有的任务执行很慢,比如网络抖动.性能不同.数据倾斜等等.有的Task很慢就会成 ...

  2. 87-Spark推测执行spark.speculation

    1. 背景 hadoop的推测执行 推测执行(Speculative Execution)是指在分布式集群环境下,因为程序BUG,负载不均衡或者资源分布不均等原因,造成同一个job的多个task运行速 ...

  3. 智慧出行/spark Streaming-Dstream流优化:1.消费并行度,2.序列化,3.限流,压背,冷启4.cpu空转时间,5.不要在代码中判断这个表是否存在,6.推测执行7.开启动态资源分配

    1.设置合理的消费并行度 最优的方案是:kafka分区数:broker *3/6/9 kafka分区能不能增加,能不能减少? kafka分区数是可以增加的,但是不能减少 2.序列化 java的序列化, ...

  4. 深入理解Spark Streaming执行模型

    摘要:Spark Streaming是Spark中最常用的组件之一,将会有越来越多的有流处理需求的用户踏上Spark的使用之路.本文描述了Spark Streaming的架构并解释如何去提供上述优势, ...

  5. Spark详解(五):Spark作业执行原理

    Spark的作业和任务调度系统是其核心,它能够有效地进行调度的根本原因是对任务的划分DGG和容错.下面我们介绍一下相关术语: 作业(Job):RDD中由行动操作所生成的一个或者多个调度阶段 调度阶段( ...

  6. Spark-sql:以编程方式执行Spark SQL查询(通过反射的方式推断出Schema,通过StrutType直接指定Schema)

    1.编写Spark SQL查询语句 在这之前创建Maven项目.创建的过程如:http://blog.csdn.net/tototuzuoquan/article/details/74571374 在 ...

  7. Spark任务执行期间写临时文件报错导致失败

    spark任务在执行期间,有时候会遇到临时目录创建失败,导致任务执行错误. java.io.IOException: Failed to create local dir in -- spark执行过 ...

  8. dataframe 如何选中某列的一行_PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)

    作者:Pinar Ersoy 翻译:孙韬淳 校对:陈振东 本文约2500字,建议阅读10分钟 本文通过介绍Apache Spark在Python中的应用来讲解如何利用PySpark包执行常用函数来进行 ...

  9. 【Spark】Spark应用执行机制

    Spark应用概念 Spark应用(Application)是用户提交的应用程序.执行模式又Local.Standalone.YARN.Mesos.根据Spark Application的Driver ...

  10. Spark 提交执行源码学习

    SparkSubmit 执行后,执行环境准备工作 private def runDriver(): Unit = {addAmIpFilter(None, System.getenv(Applicat ...

最新文章

  1. 应用人工智能有助心理学发展
  2. python语言入门编程猫-编程猫推出海龟编程器,打造Python教育产品矩阵
  3. 图解Reformer:一种高效的Transformer
  4. jaydebeapi可以连接_Python安装jpype调用java,安装jaydebeapi通过jdbc连接数据库
  5. matlab中std和std2的区别,matlab 关于std,std2,mean,mean2初步认识
  6. java动态变量名反射_Java动态性—反射 - Eclipse666的个人空间 - OSCHINA - 中文开源技术交流社区...
  7. 微软亚研院:CV领域2019年重点论文推荐
  8. [VJ][bfs]Catch That Cow
  9. python实时连接oracle_python连接oracle数据库
  10. 微电子学与计算机期刊2019,微电子与通信工程学院研究生两篇论文被人工智能顶级会议AAAI 2019接收...
  11. ubuntu jdk1.7升级到1.8
  12. 我的Windows初始安装软件(技术型办公用电脑)
  13. Maven报:Unable to import maven project: See logs for details
  14. 卡片跳转快应用指定页面,如何点返回直接退出快应用回到卡片
  15. 第十七届全国大学生智能汽车竞赛讯飞-家庭服务机器人挑战赛全国选拔赛规则
  16. nginx中配置gzip_static on提示nginx: [emerg] unknown directive “gzip_static“ in
  17. R数据分析:随机截距交叉滞后RI-CLPM与传统交叉滞后CLPM
  18. 【猿如意】中的『XMind』工具详情介绍
  19. Electron MAC 打包签名生成
  20. 数据库设计(理论实例)

热门文章

  1. justify-content: space-evenly 在移动端部分机型无效
  2. Coverage [minx,miny,maxx,maxy] is [12, 4, 13, 6, 3], index [x,y,z] is [2, 5, 3]错误原因及其解决方式...
  3. 还在用网盘备份同步3D图纸?你落伍了
  4. Error: unable to connect to node rabbit@localhost: nodedown 创建消息队列用户报错
  5. 我的世界服务器怎么制作头颅,我的世界怎么刷生物头颅 生物头颅制作方法
  6. 《蔡康永的说话之道》读书笔记
  7. 关于身份证加密展示处理以及update table set cou=(select from ..)
  8. 微博平台的RPC服务化实践
  9. mysql filtered_为什么Mysql explain extended中的filtered列值总是100%
  10. OBD系统系族分类规则