Spark 推测执行是一种优化技术。

在Spark中,可以通过推测执行,即Speculative Execution,来识别并在其他节点的Executor上重启某些运行缓慢的Task,并行处理同样的数据,谁先完成就用谁的结果,并将另一个未完成的Task Kill掉,从而加快Task处理速度。适用于某些Spark任务中部分Task被hang住或运行缓慢,从而拖慢了整个任务运行速度的场景。

注意:

1. 不是所有运行缓慢的Spark任务,都可以用推测执行来解决。

2. 使用推测执行时应谨慎。需要合适的场景、合适的参数,参数不合理可能会导致大量推测执行Task占用资源。

3. 如Spark Streaming写Kafka缓慢,若启用推测执行,可能会导致数据重复。

4. 被推测的Task不会再次被推测。

Spark推测执行参数

spark.speculation :默认false。是否开启推测执行。

spark.speculation.interval :默认100ms。多久检查一次要推测执行的Task。

spark.speculation.multiplier :默认1.5。一个Stage中,运行时间比成功完成的Task的运行时间的中位数还慢1.5倍的Task才可能会被推测执行。

spark.speculation.quantile: 默认0.75。推测的分位数。即一个Stage中,至少要完成75%的Task才开始推测。

Spark推测执行源码解析

源码解析

  /*** TaskScheduleImpl在启动时,会判断是否启动Task的推测执行。*/override def start() {backend.start()if (!isLocal && conf.getBoolean("spark.speculation", false)) {logInfo("Starting speculative execution thread")// scheduleWithFixedDelay 位于`java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay`// scheduleWithFixedDelay 指的是系统启动等待`第一个SPECULATION_INTERVAL_MS 时间后`,开始执行定时任务,每隔`第二个SPECULATION_INTERVAL_MS 时间`执行一次。// SPECULATION_INTERVAL_MS 可通过`spark.speculation.interval`参数设置speculationScheduler.scheduleWithFixedDelay(new Runnable {override def run(): Unit = Utils.tryOrStopSparkContext(sc) {// 检查需要推测执行的TaskcheckSpeculatableTasks()}}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)}}

如果开启Spark推测执行(即设置参数spark.speculation=true),且不是Local模式运行,则TaskScheduleImpl在启动spark.speculation.interval(即上述第一个SPECULATION_INTERVAL_MS)时间后,会每隔spark.speculation.interval(即上述第二个SPECULATION_INTERVAL_MS)时间启动一个线程去检查需要推测执行的Task。

点击checkSpeculatableTasks()方法,跳转到org.apache.spark.scheduler.checkSpeculatableTasks,如下代码:

 def checkSpeculatableTasks() {var shouldRevive = falsesynchronized {// MIN_TIME_TO_SPECULATION 在原始副本运行至少这段时间后,才会启动任务的重复副本。 shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)}if (shouldRevive) {// 如果有需要推测执行的Task,则SchedulerBackend向ApplicationMaster发送reviveOffers消息,获取集群中可用的executor列表,发起taskbackend.reviveOffers()}}

可以看到,该方法内部调用的是rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION),如下代码:

  override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {var shouldRevive = false//schedulableQueue是ConcurrentLinkedQueue[Schedulable]类型,而Schedulable Trait有两种类型的调度实体:Pool、TaskSetManagerfor (schedulable <- schedulableQueue.asScala) {shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)}shouldRevive}

可以看到,最终调用的是schedulable.checkSpeculatableTasks(minTimeToSpeculation)方法。

schedulable是schedulableQueue中的对象,schedulableQueue是ConcurrentLinkedQueue[Schedulable]类型,而Schedulable Trait有两种类型的调度实体:Pool、TaskSetManager。

通过查看org.apache.spark.scheduler.TaskSetManager#checkSpeculatableTasks方法可看到真正检测推测Task的逻辑。如下:

//真正检测推测执行Task的逻辑override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {// Can't speculate if we only have one task, and no need to speculate if the task set is a// zombie or is from a barrier stage.if (isZombie || isBarrier || numTasks == 1) {return false}var foundTasks = false// minFinishedForSpeculation=SPECULATION_QUANTILE * numTasks// SPECULATION_QUANTILE即spark.speculation.quantile// numTasks即某个Stage中Taskset的任务总数。val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toIntlogDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)// 1)已经成功的Task数必须要大于等于`spark.speculation.quantile * numTasks`,才开始处理这个TaskSetif (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {val time = clock.getTimeMillis()// medianDuration: 已经成功的Task的运行时间的中位数// threshold=max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)// SPECULATION_MULTIPLIER:即spark.speculation.multiplierval medianDuration = successfulTaskDurations.medianval threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)// TODO: Threshold should also look at standard deviation of task durations and have a lower// bound based on that.logDebug("Task length threshold for speculation: " + threshold)// 2)遍历TaskSet中的每一个Taskfor (tid <- runningTasksSet) {val info = taskInfos(tid)val index = info.index// 3)如果还未运行成功 且 正在执行 且 运行时间已经超过threshold 且 当前不是推测运行的Task// 就将该Task取出放到需要推测执行的列表中if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&!speculatableTasks.contains(index)) {logInfo("Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms".format(index, taskSet.id, info.host, threshold))speculatableTasks += index// 4)最终由DAGScheduler将Task提交到待执行的队列中,后台线程将对提交的任务进行处理sched.dagScheduler.speculativeTaskSubmitted(tasks(index))foundTasks = true}}}foundTasks}

检测推测Task的大致流程

Spark推测执行示例

代码示例

package com.bigdata.sparkimport org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory/*** Author: Wang Pei* License: Copyright(c) Pei.Wang* Summary: *   Spark推测执行*/
object SparkSpeculative {def main(args: Array[String]): Unit = {@transient lazy val logger = LoggerFactory.getLogger(this.getClass)val spark=SparkSession.builder()//启用Spark推测执行.config("spark.speculation",true).config("spark.speculation.interval",1000).config("spark.speculation.multiplier",1.5).config("spark.speculation.quantile",0.10).getOrCreate()logger.info("开始处理.........................................")//设置5个并行度,一个Stage中,5个Task同时运行//为保证5个Task同时运行,Spark Submit提交任务时给5个核//这样,方便观察第4个Task被推测执行spark.sparkContext.parallelize(0 to 50,5).foreach(item=>{if(item ==38){Thread.sleep(200000)}val taskContext = TaskContext.get()val stageId = taskContext.stageId()val taskAttemptId = taskContext.taskAttemptId()logger.info(s"当前Stage:${stageId},Task:${taskAttemptId},打印的数字..............${item}..................")})logger.info("处理完成.........................................")}
}

任务提交

/data/apps/spark-2.4.0-bin-2.7.3.2.6.5.3-10/bin/spark-submit \--master yarn \--deploy-mode cluster \--driver-memory 1g \--executor-memory 1g \--executor-cores  1 \--num-executors  5 \--queue offline \--name SparkSpeculative \--class com.bigdata.spark.SparkSpeculative \bigdata_spark.jar

Yarn日志查看

在Yarn上可以看到如下日志:

19/03/31 04:21:39 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 318 ms on x.x.x.x (executor 4) (1/5)
19/03/31 04:21:39 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 321 ms on x.x.x.x (executor 2) (2/5)
19/03/31 04:21:39 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 338 ms on x.x.x.x (executor 1) (3/5)
19/03/31 04:21:39 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 327 ms on x.x.x.x (executor 3) (4/5)
#task 3被标记为推测执行
19/03/31 04:21:40 INFO scheduler.TaskSetManager: Marking task 3 in stage 0.0 (on x.x.x.x) as speculatable because it ran more than 486 ms
#启动task 3的推测执行task(taskID=5)
19/03/31 04:21:40 INFO scheduler.TaskSetManager: Starting task 3.1 in stage 0.0 (TID 5, x.x.x.x, executor 3, partition 3, PROCESS_LOCAL, 7855 bytes)
#kill掉task 3的推测执行task(taskID=5),由于原来的task已经成功
19/03/31 04:24:59 INFO scheduler.TaskSetManager: Killing attempt 1 for task 3.1 in stage 0.0 (TID 5) on x.x.x.x as the attempt 0 succeeded on x.x.x.x
19/03/31 04:24:59 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 200311 ms on x.x.x.x (executor 5) (5/5)

Spark WebUI查看

在Spark WebUI上可以看到如下结果:

Spark 推测执行相关推荐

  1. Spark 推测执行 /spark.speculation=true /spark.speculation.quantile=0.75/spark.speculation.multiplier=1.5

    在Spark中任务会以DAG图的方式并行执行,每个节点都会并行的运行在不同的executor中,但是有的任务可能执行很快,有的任务执行很慢,比如网络抖动.性能不同.数据倾斜等等.有的Task很慢就会成 ...

  2. 智慧出行/spark Streaming-Dstream流优化:1.消费并行度,2.序列化,3.限流,压背,冷启4.cpu空转时间,5.不要在代码中判断这个表是否存在,6.推测执行7.开启动态资源分配

    1.设置合理的消费并行度 最优的方案是:kafka分区数:broker *3/6/9 kafka分区能不能增加,能不能减少? kafka分区数是可以增加的,但是不能减少 2.序列化 java的序列化, ...

  3. 87-Spark推测执行spark.speculation

    1. 背景 hadoop的推测执行 推测执行(Speculative Execution)是指在分布式集群环境下,因为程序BUG,负载不均衡或者资源分布不均等原因,造成同一个job的多个task运行速 ...

  4. Forerunner:首个面向“多未来”的推测执行技术

    来源:微软研究院AI头条 编者按:10月26-29日,系统领域的全球顶会 SOSP 2021 在线上举办.在本届大会上,微软亚洲研究院研究员陈洋.郭众鑫.李润怀(实习生,浙江大学).陈硕.周礼栋.张宪 ...

  5. 深入理解Spark Streaming执行模型

    摘要:Spark Streaming是Spark中最常用的组件之一,将会有越来越多的有流处理需求的用户踏上Spark的使用之路.本文描述了Spark Streaming的架构并解释如何去提供上述优势, ...

  6. Spark详解(五):Spark作业执行原理

    Spark的作业和任务调度系统是其核心,它能够有效地进行调度的根本原因是对任务的划分DGG和容错.下面我们介绍一下相关术语: 作业(Job):RDD中由行动操作所生成的一个或者多个调度阶段 调度阶段( ...

  7. Spark任务执行期间写临时文件报错导致失败

    spark任务在执行期间,有时候会遇到临时目录创建失败,导致任务执行错误. java.io.IOException: Failed to create local dir in -- spark执行过 ...

  8. Hadoop之资源调度器与任务推测执行

    Hadoop之资源调度器 目录 资源调度器概述 先进先出调度器(FIFO) 容量调度器(Capacity Scheduler) 公平调度器(Fair Scheduler) 任务的推测执行 1. 资源调 ...

  9. 【大数据开发】SparkCore——Spark作业执行流程、RDD编程的两种方式、简单算子

    文章目录 一.Spark作业执行流程(重点) 二.RDD编程 2.1创建RDD的⼆种⽅式: 2.2Transformation算⼦ 2.3Action算子 三.简单算子(必须掌握) 3.1 map.m ...

  10. Spark任务执行流程

    这是Spark官方给的图,大致意思就是: 四个步骤 1.构建DAG(有向无环图)(调用RDD上的方法) 2.DAGScheduler将DAG切分Stage(切分的依据是Shuffle),将Stage中 ...

最新文章

  1. mysql的聚合函数综合案例_MySQL常用聚合函数详解
  2. Python基础教程:列表字典的键值修改
  3. bzoj5252 [2018多省省队联测]林克卡特树
  4. [Vue.js] 基础 -- Vue简介
  5. 利用计算机语言实现ID3算法,机器学习之决策树学习-id3算法-原理分析及c语言代码实现.pdf...
  6. Linux学习总结(30)——优秀程序员喜欢用Linux操作系统
  7. SSL 1105——【USACO 2.1】顺序的分数(递归+二分)
  8. Scala 类型、数值类型及类型转换
  9. “21天好习惯“第一期-4
  10. java 字符串和整型的相互转换
  11. win10多合一原版系统_微软Win10专业版制作多合一系统安装盘教程
  12. 15b万用表怎么测电容_怎么判断启动电容好坏_启动电容怎么测量好坏_数字万用表测电容好坏...
  13. WEEK-3 实战作业
  14. 排序算法系列之(二)——冒泡排序名字最为形象的一个
  15. c语言人民邮电出版社课后答案,C语言程序设计教程(人民邮电出版社) 课后习题解答6-10...
  16. 2018计算机考研时间表,2018年考研时间与考试各科目的日程安排
  17. Typora保存图片,上传图片,分享图片,图片丢失
  18. mysql搜索所有符合 条件的列名
  19. JQuery基础教程:入门
  20. 2021安道拓企业研究数据报告_爱普搜汽车

热门文章

  1. Android P 网络请求相关总结
  2. java.sql.SQLException: Access denied for user '''localhost' (using password: NO) 的处理方法
  3. 设备综合效率OEE:基于数采的OEE优化分析
  4. 《上古卷轴5:天际》控制台代码之装备
  5. android 充话费接口,调用手机话费充值API的SDK编写思路
  6. 概率论复习大纲 | 速成
  7. anisotropy texture filtering
  8. MATLAB TIFF转Shape、TIFF和Shape的读写
  9. python文件重命名加日期_Python文件创建日期和Critiqu的重命名请求
  10. java麻将算法_Java实现的麻将胡牌算法