作者:过往记忆

从官方的文档我们可以知道, Spark 的部署方式有很多种:local、Standalone、Mesos、YARN…..不同部署方式的后台处理进程是不一样的,但是如果我们从代码的角度来看,其实流程都差不多。

从代码中,我们可以得知其实

Spark

的部署方式其实比官方文档中介绍的还要多,这里我来列举一下:

1、local:这种方式是在本地启动一个线程来运行作业;

2、local[N]:也是本地模式,但是启动了N个线程;

3、local[*]:还是本地模式,但是用了系统中所有的核;

4、local[N,M]:这里有两个参数,第一个代表的是用到的核个数;第二个参数代表的是容许该作业失败M次。上面的几种模式没有指定M参数,其默认值都是1;

5、local-cluster[N, cores, memory]:本地伪集群模式,参数的含义我就不说了,看名字就知道;式;

6、spark:// :这是用到了 Spark 的Standalone模

7、(mesos|zk)://:这是Mesos模式;

8、yarn-standalone\yarn-cluster\yarn-client:这是YARN模式。前面两种代表的是集群模式;后面代表的是客户端模式;

9、simr://:这种你就不知道了吧?simr其实是Spark In MapReduce的缩写。我们知道MapReduce 1中是没有YARN的,如果你在MapReduce 1中使用Spark,那么就用这种模式吧。

总体来说,上面列出的各种部署方式运行的流程大致一样:都是从SparkContext切入,在SparkContext的初始化过程中主要做了以下几件事:

1、根据SparkConf创建SparkEnv

 
  private [spark] val env = SparkEnv.create(
  conf,
  "<driver>" ,
  conf.get( "spark.driver.host" ),
  conf.get( "spark.driver.port" ).toInt,
  isDriver = true ,
  isLocal = isLocal,
  listenerBus = listenerBus)
  SparkEnv.set(env)

2、初始化executor的环境变量executorEnvs

这个步骤代码太多了,我就不贴出来。

3、创建TaskScheduler

  // Create and start the scheduler
  private [spark] var taskScheduler = SparkContext.createTaskScheduler(this , master)

4、创建DAGScheduler

  @ volatile private [spark] var dagScheduler : DAGScheduler = _
  try {
  dagScheduler = new DAGScheduler( this )
  } catch {
  case e : Exception = > throw
  new SparkException( "DAGScheduler
  cannot be initialized due to %s" .format(e.getMessage))
  }

5、启动TaskScheduler

 
 
  // constructor
  taskScheduler.start()

那么,DAGScheduler和TaskScheduler都是什么?

DAGScheduler称为作业调度,它基于Stage的高层调度模块的实现,它为每个Job的Stages计算DAG,记录哪些RDD和Stage的输出已经实物化,然后找到最小的调度方式来运行这个Job。然后以Task Sets的形式提交给底层的任务调度模块来具体执行。

TaskScheduler称为任务调度。它是低层次的task调度接口,目前仅仅被TaskSchedulerImpl实现。这个接口可以以插件的形式应用在不同的task调度器中。每个TaskScheduler只给一个SparkContext调度task,这些调度器接受来自DAGScheduler中的每个stage提交的tasks,并负责将这些tasks提交给cluster运行。如果提交失败了,它将会重试;并处理stragglers。所有的事件都返回到DAGScheduler中。

在创建DAGScheduler的时候,程序已经将taskScheduler作为参数传进去了,代码如下:

  def this (sc : SparkContext, taskScheduler : TaskScheduler) = {
  this (
  sc,
  taskScheduler,
  sc.listenerBus,
  sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
  sc.env.blockManager.master,
  sc.env)
  }
 
  def this (sc : SparkContext) = this (sc, sc.taskScheduler)

也就是DAGScheduler封装了TaskScheduler。TaskScheduler中有两个比较重要的方法:

 
  def submitTasks(taskSet : TaskSet) : Unit
 
  // Cancel a stage.
  def cancelTasks(stageId : Int, interruptThread : Boolean)

这些方法在DAGScheduler中被调用,而TaskSchedulerImpl实现了TaskScheduler,为各种调度模式提供了任务调度接口,在TaskSchedulerImpl中还实现了resourceOffers和statusUpdate两个接口给Backend调用,用于提供调度资源和更新任务状态。在YARN模式中,还提供了YarnClusterScheduler类,他只是简单地继承TaskSchedulerImpl类,主要重写了getRackForHost(hostPort: String)和postStartHook() 方法。继承图如下:

在 上文 我们谈到了 Spark Context的初始化过程会做好几件事情,其中做了一件重要的事情就是 创建TaskScheduler 。

  // Create and start the scheduler
  private [spark] var taskScheduler = < span class ="wp_keywordlink_affiliate" >< a href ="http://www.iteblog.com/archives/tag/spark" title = "" target = "_blank"data-original-title = "View all posts in Spark" > Spark < /a >< /span >Context.createTaskScheduler( this , master)

在createTaskScheduler方法中,会根据用户传进来的master URL分别初始化不同的SchedulerBackend和ExecutorBackend。而且从代码中我们可以看到master URL多大九种格式。但是在代码中SchedulerBackend的种类可没九种,只有五种;而ExecutorBackend只有三种,我们先来看看这些SchedulerBackend和ExecutorBackend的类继承关系:

每一个Application都对应了一个SchedulerBackend和多个ExecutorBackend。下面我们分别介绍各种运行模式所涉及到的类

1、Local模式

local模式出了伪集群模式(local-cluster),所有的local都是用到了LocalBackend和TaskSchedulerImpl类。LocalBackend接收来自TaskSchedulerImpl的receiveOffers()调用,并根据运行Application传进来的CPU核生成WorkerOffer,并调用scheduler.resourceOffers(offers)生成Task,最后通过 executor.launchTask来执行这些Task。

2、Standalone

Standalone模式使用SparkDeploySchedulerBackend和TaskSchedulerImpl,SparkDeploySchedulerBackend是继承自CoarseGrainedSchedulerBackend类,并重写了其中的一些方法。

CoarseGrainedSchedulerBackend是一个粗粒度的资源调度类,在Spark job运行的整个期间,它会保存所有的Executor,在task运行完的时候,并不释放该Executor,也不向Scheduler申请一个新的Executor。Executor的启动方式有很多中,需要根据Application提交的Master URL进行判断。在CoarseGrainedSchedulerBackend中封装了一个DriverActor类,它接受Executor注册(RegisterExecutor)、状态更新(StatusUpdate)、响应Scheduler的ReviveOffers请求、杀死Task等等。

在本模式中将会启动一个或者多个CoarseGrainedExecutorBackend。具体是通过AppClient类向Master请求注册Application。当注册成功之后,Master会向Client进行反馈,并调用schedule启动Driver和CoarseGrainedExecutorBackend,启动的Executor会向DriverActor进行注册。然后CoarseGrainedExecutorBackend通过aunchTask方法启动已经提交的Task。

3、yarn-cluster

yarn-cluster集群模式涉及到的类有YarnClusterScheduler和YarnClusterSchedulerBackend。YarnClusterSchedulerBackend同样是继承自CoarseGrainedSchedulerBackend。而YarnClusterScheduler继承自TaskSchedulerImpl,它只是简单地对TaskSchedulerImpl进行封装,并重写了getRackForHost和postStartHook方法。

Client类通过YarnClient在Hadoop集群上启动一个Container,并在其中运行ApplicationMaster,并通过Yarn提供的接口在集群中启动多个Container用于运行CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor进行注册。

yarn-cluster模式作业从提交到运行的整个过程请参见本博客文章: 《Spark on YARN集群模式作业运行全过程分析》

4、yarn-client

yarn-cluster集群模式涉及到的类有YarnClientClusterScheduler和YarnClientSchedulerBackend。YarnClientClusterScheduler继承自TaskSchedulerImpl,并对其中的getRackForHost方法进行了重写。Yarn-client模式下,会在集群外面启动一个ExecutorLauncher来作为driver,并想集群申请Container,来启动CoarseGrainedExecutorBackend,并向CoarseGrainedSchedulerBackend中的DriverActor进行注册。

5、Mesos

Mesos模式调度方式有两种:粗粒度和细粒度。粗粒度涉及到的类有CoarseMesosSchedulerBackend和TaskSchedulerImpl类;而细粒度涉及到的类有MesosSchedulerBackend和TaskSchedulerImpl类。CoarseMesosSchedulerBackend和 MesosSchedulerBackend都继承了MScheduler(其实是Mesos的Scheduler),便于注册到Mesos资源调度的框架中。选择哪种模式可以通过spark.mesos.coarse参数配置。默认的是MesosSchedulerBackend。

上面涉及到Spark的许多部署模式,究竟哪种模式好这个很难说,需要根据你的需求,如果你只是测试Spark Application,你可以选择local模式。而如果你数据量不是很多,Standalone 是个不错的选择。当你需要统一管理集群资源(Hadoop、Spark等)那么你可以选择Yarn,但是这样维护成本就会变高。yarn-cluster和yarn-client模式内部实现还是有很大的区别。如果你需要用于生产环境,那么请选择yarn-cluster;而如果你仅仅是Debug程序,可以选择yarn-client。

文章出处: 过往记忆 《Spark源码分析:多种部署方式之间的区别与联系》

Spark源码分析:多种部署方式之间的区别与联系相关推荐

  1. Spark源码分析之七:Task运行(一)

    在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...

  2. Spark源码分析之九:内存管理模型

    Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...

  3. spark 源码分析 Blockmanager

    原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD  Spark的特点就是可以将RDD cache在memo ...

  4. Spark源码分析 – DAGScheduler

    DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...

  5. spark 源码分析之十八 -- Spark存储体系剖析

    本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...

  6. Spark源码分析之Sort-Based Shuffle读写流程

    一 概述 我们知道Spark Shuffle机制总共有三种: # 未优化的Hash Shuffle:每一个ShuffleMapTask都会为每一个ReducerTask创建一个单独的文件,总的文件数是 ...

  7. Spark 源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...

  8. spark 源码分析之二十 -- Stage的提交

    引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...

  9. spark 源码分析之十九 -- DAG的生成和Stage的划分

    上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...

最新文章

  1. R语言基础练习与入门实践
  2. 20个开源项目助你快速掌握区块链应用开发,JS Go PHP Python Ruby
  3. [编写高质量代码:改善java程序的151个建议]建议57 推荐在复杂字符串操作中使用正则表达式...
  4. php内置邮件sendmail发送,PHP发送邮件函数sendmail()
  5. STM32f103 can的两个接收fifo使用方法
  6. flink HA高可用Standalone集群搭建
  7. gitlab常规维护命令
  8. pat 1025 反转链表
  9. gin 前端文件打包_远程URL文件批量下载打包的方法
  10. Android开发基础(四大组件及Intent)
  11. Windows 命令行改主机名、加域、退域、更改IP
  12. asp.net服务器端对话框控件的简单实现(附源码)
  13. oracle 英文 简历,英文优秀个人简历模板范文
  14. 方正璞华:硬核攻关,自主创新,推动印前处理自动化云流程走向世界
  15. 犹太人和你想的不一样
  16. QChart入门教程-绘制正弦曲线
  17. 看华为生态大学 如何玩转人才生态?
  18. Axis2创建web service(一) - eclipse安装Axis2插件
  19. 贝叶斯神经网络 BNN
  20. IC数字常见问题(一)时钟

热门文章

  1. 2. APIS官网剖析(博主推荐)
  2. JavaScript高级程序设计之什么是原型模式
  3. 在索引列上正确使用LIKE运算符
  4. [z] Flare-兼容Memcached协议的分布式(key/value store)键值存储系统
  5. 大班体育游戏 电子计算机,【大班户外游戏】_幼儿园大班体育游戏活动设计40篇...
  6. STM32_Systick学习及例程改写
  7. 递推——覆盖墙壁(洛谷 P1990)
  8. 下载的长数据怎么分开R语言_TCGA数据库单基因gsea作业之COAD-READ
  9. 本周4天4场直播,解决你对Oracle的种种疑惑,还有第2期大咖讲坛讨论敏捷开发中的性能质量管控...
  10. 给数据库减负的八个思路,盘它!