在介绍Spark中的任务和资源之前先解释几个名词:

Dirver Program:运行Application的main函数(用户提交的jar包中的main函数)并新建SparkContext实例的程序,称为驱动程序,通常用SparkContext代表驱动程序(任务的驱动程序)。

Cluster Manager:集群管理器是集群资源管理的外部服务。Spark上现在主要有Standalone、YARN、Mesos3种集群资源管理器。Spark自带的Standalone模式能满足绝大部分

        Spark计算环境中对集群资源管理的需求,基本只有在集群中运行多套计算框架时才考虑使用YARN和Mesos。通常说的Spark on YARN或者Standalone指的就是

        不同的集群资源管理方式(资源管理器)。

Worker Node:集群中可以运行Application代码的工作节点(计算资源)。

Executor:  在Worker Node上为Application启动的一个工作进程,在进程中负责任务(Task)的运行,并且负责将数据存放在内存或者磁盘上,在Excutor内部通过多线程(线程池)

       并发处理应用程序的具体任务(在计算资源上运行的工作进程)。

       每个Application都有各自独立的Executors,因此应用程序之间是相互隔离的。

Task:    任务是指被Driver送到Executor上的工作单元。通常一个任务会处理一个Partition的数据,每个Partition一般是一个HDFS的Block块的大小(在工作进程中运行的任务

       线程)。

Application: 是创建了SparkContext实例对象的Spark用户程序,包含一个Driver Program和集群中的多个Executor(运行在Spark集群上的应用程序)。

Job:    和Spark的action对应,每个action都会对应一个Job实例,每个Job会拆分成多个Stage,一个Stage包含一个任务集(TaskSet),任务集中的各个任务通过一定的

        调度机制发送到工作单位(Executor)上并行执行(Application中进行任务切分的粒度)。

0. 资源的调度管理YARN vs Standalone:

Standalone:此模式下由Master节点负责,Worker节点是在Master节点的调度下启动的Executor。此时集群的部署为典型的Master/Slave架构。

Spark on YARN:yarn-cluster模式提交,首先它会和ResourceManager通信,发送请求给ResourceManager,请求启动ApplicationMaster,ResourceManager接收到请求之后,

     会给它分配一个Container,然后在某个NodeManager上启动ApplicationMaster。ApplicationMaster启动之后,会和ResourceManager通信,ApplicationMaster(AM)

     就相当于Driver。AM找RM,请求container,启动Executor,RM会给它分配一批Container,用于启动Exectutor。此时AM会去连接其他NM,去启动Executor,NM

     就相当于Worker。Executor启动之后,向AM反向注册。与standalone相比,AM就相当于Driver,NM相当于Worker,RM相当于Master。NM上启动Executor之后还是

     会反向向AM注册,后面的流程与之前的结构是一样的,这就是yarn-cluster提交模式。

参考:https://zhuanlan.zhihu.com/p/61902619

1. Standalone模式下任务与资源的关系

从上文可知,Spark集群中的资源主要为计算资源,在YARN模式下对应的是Container,Standalone模式下对应的是Worker,Application是用户开发的Spark应用程序,提交到

集群上运行,运行开始时集群给Application分配资源,运行结束后集群会回收资源(Application会释放资源);同一个集群上可以同时运行多个Application,Application之间相互隔离,每个Application对应一个SparkContext对象,由该SparkContext维护与集群之间的关系。

Worker的资源由Master进行管理,Application任务注册到Master之后,由Master根据集群当前Worker的工作状况进行资源分配。分配的资源由SparkContext创建的三个核心对象DAGScheduler,TaskScheduler,SchedulerBackend根据Application进行相应的任务划分和调度。SparkContext创建核心对象及获取计算资源的流程如下图

2. DAGScheduler

DAGSchedule是针对Application对任务进行规划。

DAGScheduler是面向Stage的高层级调度器,DAGScheduler把DAG拆分成很多Task,每组Task都是一个Stage,解析时以Shuffle(宽依赖进行数据同步时会产生Shuffle)为边界反向解析构建Stage;每次遇到Shuffle会产生新的Stage,然后以一个个TaskSet(每个Stage中的Tasks会封装成一个TaskSet)的形式提交给底层的任务调度器TaskScheduler。DAGScheduler需要记录那些RDD被存入磁盘,寻求Task的最优化调度(如Stage内部数据的本地性),监视Shuffle跨节点数据的状态,失败重新提交该Stage。

DAGScheduler的核心工作是进行Stage的划分,Stage划分的依据是RDD的宽窄依赖,父RDD的一个分区同时被多个子RDD分区依赖称为宽依赖,父RDD分区只被一个子RDD分区依赖称为窄依赖。

Spark Application因为不同的Action出发多个Job,每个Job由一个或多个Stage组成,后面的Stage依赖于前面的Stage。Spark在在Job的提交过程中进行Stage的划分以及确定Task的最佳位置,Stage划分以后才进行计算;Task的最佳位置及利用本地数据进行计算,本地数据即为数据就在当前内存中。DAGScheduler利用RDD自身的getPreferedLocations中的数据计算数据的本地性,getPreferedLocations中标明了每个Partition的数据本地性。

 3.TaskScheduler

TaskScheduler针对Task具体的执行过程,也是针对任务而言。

TaskScheduler的核心任务是提交TaskSet到集群进行运算并汇报结果。

  a. 为TaskSet创建和维护一个TaskSetManager,并追踪任务的本地性以及错误信息

  b. 遇到Straggle任务时,会放到其他节点重试

  c. 向DAGScheduler汇报任务执行情况,包括在Shuffle输出丢失的时候报告fetch failed错误等信息。

TaskSchduler需要确定Task任务使用的计算资源,即需要根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中。

TaskScheduler是从具体计算的角度考虑本地性,区别于DAGScheduler从数据层面考虑的本地性。

TaskSchedulerImpl是TaskScheduler的子类,通过resourceOffers方法确定Task任务具体运行的ExecutorBackend,具体过程如下:

  1. 通过Random.shuffle方法重新洗牌所有计算资源,以寻求计算的负载均衡;

  2. ExecutorBackend的cores个数声明类型为TaskDescription的ArrayBuffer数组;

  3. 如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdded来获得新的完整的可用计算资源;

  4. 寻求最高级别的优先级本地性;

 /*** Called by cluster manager to offer resources on slaves. We respond by asking our active task* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so* that tasks are balanced across the cluster.*/def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {...// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    val sortedTaskSets = rootPool.getSortedTaskSetQueuefor (taskSet <- sortedTaskSets) {logDebug("parentName: %s, name: %s, runningTasks: %s".format(taskSet.parent.name, taskSet.name, taskSet.runningTasks))if (newExecAvail) {
        taskSet.executorAdded()
      }}//以下代码计算最高级别的优先级本地性// Take each TaskSet in our scheduling order, and then offer it each node in increasing order// of locality levels so that it gets a chance to launch local tasks on all of them.// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANYfor (taskSet <- sortedTaskSets) {var launchedAnyTask = falsevar launchedTaskAtCurrentMaxLocality = falsefor (currentMaxLocality <- taskSet.myLocalityLevels) {do {launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)launchedAnyTask |= launchedTaskAtCurrentMaxLocality} while (launchedTaskAtCurrentMaxLocality)}if (!launchedAnyTask) {taskSet.abortIfCompletelyBlacklisted(hostToExecutors)}}if (tasks.size > 0) {hasLaunchedTask = true}return tasks}

  5. 通过调用TaskSetManager的resourceOffer最终确定每个Task具体运行的ExecutorBackend的Locality Level;

  6. 通过launchTasks把任务发送给ExecutorBackend执行。launchTasks首先会进行序列化,序列化的大小不能超过默认设置128M,否则报错。由参数    

      spark.rpc.message.maxSize设置。

 4. SchedulerBackend

  SchedulerBackend针对资源,所以该接口在不同的部署模式下会创建不同的子类对象(YarnSchedulerBackend)来进行资源管理,如StandaloneSchedulerBackend是在Standalone模式下的管理对象,负责收集和分配资源给Task使用。

  StandaloneSchedulerBackend在接收到TaskSchedulerImpl的submitTasks后,会调用父类CoarseGrainedSchedulerBackend中的reviveOffers方法,最终调用makOffers方法分配资源执行Task。

  makOffers方法的执行过程:

  1. 首先过滤出Active状态的Executor,然后构建代表Executor资源可用的WorkerOffer(此处为构建可用的资源);

  2. 调用TaskSchedulerImpl的resourceOffers得到TaskDescrition的二维数组,包含Task ID、Executor ID、Task Index等Task执行需要的信息;

  3. 回调DriverEndPoint的launchTask给每个Task对应的Executor发执行Task的LaunchTask信息。

    // Make fake resource offers on all executorsprivate def makeOffers() {// Filter out executors under killingval activeExecutors = executorDataMap.filterKeys(executorIsAlive)val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toIndexedSeqlaunchTasks(scheduler.resourceOffers(workOffers))}

  

转载于:https://www.cnblogs.com/beichenroot/p/11414173.html

Spark中资源与任务的关系相关推荐

  1. 在vc++中,资源和类有怎样的关系?

    严格的讲,资源不属于某一个类.资源是属于整个工程.在整个工程中,任何地方都可以通过指针.类对象声明.资源ID号.使用头文件.引用宏.使用资源序号.调用动态库文件等方式,来使用这些资源.对话框资源,要使 ...

  2. 使用Spark中DataFrame的语法与SQL操作,对人类数据进行处理,比较学历与离婚率的关系

    简介 整理Kaggle上的人类信息数据 Machine-Learning-Databases,这个数据集已经有二十多年的历史,虽然历史久远,但是格式明确,是比较好的入门数据集. 通过Spark中的Da ...

  3. Spark中Task,Partition,RDD、节点数、Executor数、core数目(线程池)、mem数

    Spark中Task,Partition,RDD.节点数.Executor数.core数目的关系和Application,Driver,Job,Task,Stage理解 from:https://bl ...

  4. spark中local模式与cluster模式使用场景_Spark 知识点 ( 架构 RDD Task )

    1. Spark介绍 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一 ...

  5. 什么是spark的惰性计算?有什么优势?_spark——spark中常说RDD,究竟RDD是什么?

    本文始发于个人公众号:TechFlow,原创不易,求个关注 今天是spark专题第二篇文章,我们来看spark非常重要的一个概念--RDD. 在上一讲当中我们在本地安装好了spark,虽然我们只有lo ...

  6. Spark精华问答 | 谈谈spark中的宽窄依赖

    总的来说,Spark采用更先进的架构,使得灵活性.易用性.性能等方面都比Hadoop更有优势,有取代Hadoop的趋势,但其稳定性有待进一步提高.我总结,具体表现在如下几个方面. 1 Q:Spark ...

  7. Spark Shuffle系列-----1. Spark Shuffle与任务调度之间的关系

    本文转自http://blog.csdn.net/u012684933/article/details/49074185,所有权力归原作者所有,仅供学习. Spark根据RDD间的依赖关系是否是Shu ...

  8. Spark系列之Spark的资源调优

    title: Spark系列 第十一章 Spark的资源调优 11.1 概述 ​ 在开发完Spark作业之后,就该为作业配置合适的资源了.Spark的资源参数,基本都可以在sparksubmit命令中 ...

  9. Spark中内存模型管理

    一.概述 Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色.理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优.本文 ...

最新文章

  1. MySQL——统计某个表每天的总量和增量问题解决方案
  2. 把List对象或者单值对象转换为Json格式
  3. koa --- [MVC实现之三]换个角度重新开始-初始化
  4. 无线网络受限制或无连接处理方法
  5. 什么是Intel LBR(上次分支记录),BTS(分支跟踪存储)和AET(体系结构事件跟踪)?
  6. 矩阵方程求解最快c语言算法,求助! C语言用矩阵求解方程组
  7. mysql缺少函数_总结零散的 MySQL 基础知识
  8. 方法二 NTC热敏电阻转换温度的计算方式
  9. ADMM之1范数理解
  10. 基于 VIVADO 的 AM 调制解调(2)工程实现
  11. 小米蓝牙广播数据解析(MiBeacon)
  12. Android组件化开发实践和案例分享 1
  13. 课程学习:让神经机器翻译模型像人类一样学习
  14. UPDATE STATISTICS
  15. Java Web应用开发
  16. 利用描点绘图法求解复杂函数
  17. 记一次神奇的CVPR 2021 Rebuttal 经历
  18. 简单几步:图解——VS2012发布网站详细步骤
  19. 聊聊线性代数(14)SVD的应用--2
  20. Python笔记_13_推导式_集合推导式_字典推导式_生成器

热门文章

  1. 北京师范大学计算机试题二答案,北京师范大学期末计算机试题二
  2. Locally Differential for Frequency Estimation
  3. ngx_thread_pool_init()
  4. javaSE (十六)Randon、System类的方法(生成随机数、主动垃圾回收、终止jvm、对程序进行记时、System自带的复制数组的方法)
  5. 1、零基础学工控——初识plc
  6. Keil更改背景颜色
  7. Openvpn 客户端路由配置
  8. 一文解决安装Anaconda后C盘不断增加的问题、修改默认配置
  9. 带你认识40G单纤双向光模块-QSFP+ BiDi光模块
  10. 尝试加载 Oracle 客户端库时引发 BadImageFormatException