Spark源码学习:sparkContext的初始化分析

spark可以运行在本地模式local下,可以运行在yarn和standalone模式下,但是本地程序是通过什么渠道和这些集群交互的呢?那就是sparkContext,他在spark生态系统中的作用不言而喻,绝对是最重要的,整体架构如图所示:

这里我们简单的来剖析一下,sparkContext在初始化最重要的流程和大致框架。spark代码第一句都是先创建sparkConf之后作为参数传递给sparkContext在进行创建sc,之后的一切操作都离不开sparkContext。

val conf = new SparkConf().setAppName(appName).setMaster(master) sc=new SparkContext(conf)

在debug模式下进入源代码。

可以看到这里的sparkContext使用sparkConf作为参数来进行传递参数。变量allowMultipleContext是用来决定是否在spark中指运行一个任务,markPartiallyContructed来确保实例的唯一性。

上面是对sparkConf进行复制之后,然后对配置信息进行校验。上面的代码显示spark运行必须指定master和name,否则就会抛出异常,结束初始化过程,在后面我们会看到master是用来设置部署模式,name用来指定程序名称,相对简单。

下面重要的是sparkEnv的初始化,sparkEnv是spark的执行环境对象,包括很多与Executor执行相关的对象。在local模式下Driver会创建Executor,local-cluster部署模式或者standalone部署模式下worker另起CoarseGrainedExecutorBackend进程中创建executor,继而创建taskRunner方法,运行runtask运行任务,这个方法会有自己的具体实现类,shufflemaptask和resulttask有具体的实现。所以sparkEnv存在于driver或者CoarseGrainedExecutorBackend进程中。代码如下:

我们进入createDriverEnv方法,它隶属于SparkEnv的方法。主要保存spark运行时环境变量。这里阐述最重要的变量初始化,至于什么页面监控之类的就不看了。

(1) Akka的分布式消息系统actorSystem的初始化。

(2) 创建map任务输出跟踪器mapoutputTracker,主要就是跟踪map任务把数据结果写到哪里去了,reduce也可以去取数据map,reduce,shuffle都有自己所对应的ID,着重介绍一下MapOutputTrackerMaxter,它内部使用mapStatuses来维护跟中map任务的输出状态,这个数据结构是一个map,其中key对应shufleID,value存储各个map任务对应的状态信息mapStatus。由于mapStatus是由blockmanagerid(key)和bytesSize(value)组成,key是表示这些计算的中间结果存放在那个blockManager,value表示不同的reduceID要读取的数据大小这时reduce就知道从哪里fetch数据,并且判断数据的大小(和0比较来确保确实获得了数据)。

driver和executor处理mapOutputtTrackermMaster的方式不同:

Driver:创建mapOutputtTrackermMaster,然后创建mapOutputtTrackermMasterActor,并且注册到ActorSystem.

Executor:创建mapOutputtTrackerm,并从ActorSystem中找到mapOutputtTrackermMasterActor。

有那么多的executor,当然就有跟多的mapTask,那driver是怎样知道各个mapTask的执行任务信息呢?那就靠我们上面的mapOutputtTrackermMasterActor啦,哈哈。map任务的状态就是有Executor向持有mapOutputtTracker-MasterActor来发送消息,把map任务状态同步到mapOutputtTracker的mapstatuses上去。问题又来了,executor怎样找到mapOutputtTrackermMasterActor呢?那就靠registe'OrLookup函数啦,它后台使用ActorSystem提供的分布式消息机制实现的。

那就来看看具体的代码吧:

(3)实例化ShuffleManager。它主要是负责管理本地和远程的block的shuffle操作。ShuffleManager通过反射机制来生成默认的SortShuffleManager。可以通过修改spark.shuffle.manager设置为hash来显示的控制使用HashShuffle-Manager。这里的sort主要是指在shuffle中的key-value,key是默认排序好的。具体代码如下:

(4)块儿传输服务BlockTransferServervice。这里默认的是NettyBlockTransferServervice。可以通过配置属性spark.shuffle.blockTransferService使用NioBlockTransferService。NettyBlockTransferServervice使用Netty提供的异步事件驱动网络应用框架,提供web服务及客户端,获取远程节点上的Block集合。代码如下:

(5)创建BlockManagerMaster。它是负责block的管理和协调,具体的操作是依赖于BlockManagerMaster-Actor,因为它需要与Executor上的BlockManager通过Actor进行通信。(这里的Endpoint相当于Actor)

(6) 创建BlockManager。他主要运行在worker节点上。虽然这里创建了,但是只有在他的init初始化函数之后才是有效的。

(7)创建广播管理器BroadcastManager。主要是负责把序列化时候的RDD,job以及shuffleDependence等,以及配置信息存储在本地,有时候还会存储到其他的节点以保持可靠性。

(8) 创建存储管理器CacheManager。他主要用于缓存RDD某个分区计算的中间结果,缓存计算结果在迭代计算的时候发生。他很有用,可以减少磁盘IO,加快执行速度。

(9) HTTP文件服务器httpFileServer。退工对jar以及其他文件的http访问。例如jar包的上传等等。他的端口号由spark.fileserver.port来配置,默认情况下是0,表示随机生成端口号。

(10)最后创建测量系统MetricsSystem,是spark的测量系统,不用深究,这里我们不管他。

上面的10条主要说的是sparkEnv的创建,执行环境也就刚刚初始化完毕,核心的核心还没开始呢。

具体的变量创建等等下篇我们再来介绍。

spark的sparkContext初始化中的sparkEnv相关的概念以及重要的组成部分在上一部分简单的介绍 ,这篇简单的介绍一下和DAGScheduler,taskSetManager,SchedulerBackend,taskScheduler等等一些和sparkContext初始化相关的概念,这里讨论的是重中之重,下面就来大体的来看看。首先看一下最重要最核心的一个片段:

在上面的一片代码中我们可以看到,首先是SparkContext.createTaskScheduler来创建SchedulerBackend以及TaskScheduler。理解清楚这两个概念极为重要。首先来看看SchedulerBackend,从源码的注释上可以知道Scheduler-Backend仅仅是后台调度系统的一个抽象接口,目前在1.5.2接口有四个实现类。TaskScheduler主要是底层人物调度接口,仅仅是有一个实现类TaskSchedulerImpl。TaskSchedulerImpl就是通过schedulerBackend来在不同的cluster上调度任务,例如可以设置变量isLocal为true来使用lLocalBackend。

createTaskScheduler方法的两个参数一个sc一个master,master主要就是用来匹配对应的运行模型,例如:

上面的这个model代表的是standalone模式,SparkDeploySchedulerBackend就是standalone的运行模式。接下来进入initlalize方法看看:

上面的这个初始化方法就是在taskSchedulerImpl下初始化的,会发现在任务的调度模式中选择了两种调度方式,先进先出和公平调度方法。

继而开始了taskscheduler的start方法,进入taskSchedluer的run方法,是一个抽象接口,抽象接口的实现task-SchedluerImpl的run方法来看看。backend就是schedulerBackend后台,那么这个start方法是有4个实现的start方法。

那么就加入loacl模式下的LocalBackend代码:

[java] view plaincopy print?
  1. private[spark] class LocalBackend(
  2. conf: SparkConf,
  3. scheduler: TaskSchedulerImpl,
  4. val totalCores: Int)
  5. extends SchedulerBackend with ExecutorBackend with Logging {
  6. private val appId = "local-" + System.currentTimeMillis
  7. var localEndpoint: RpcEndpointRef = null
  8. override def start()
  9. override def stop()
  10. override def reviveOffers()
  11. override def defaultParallelism()
  12. override def killTask(taskId: Long, executorId: String, interruptThread: Boolean)
  13. override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
  14. override def applicationId(): String = appId

这里还有一部分中最关键的DAGScheduler的创建相关的代码,下篇进行价绍。

sparkContext之一:sparkContext的初始化分析相关推荐

  1. SECTIONS bss段初始化分析

    SECTIONS bss段初始化分析 都知道bss段需要初始化,但是这是为什么呢? 通过浏览资料,都会发现,bss段是不会出现在程序下载文件(*.bin .hex)中的,因为全都是0.如果把出现在程序 ...

  2. linux libata初始化分析

    进来分析libata模块,颇有所感,记录如下,希望能对大家有所帮助,同时也对自己的理解进一步深入. linux版本:linux-2.6.24.3 注:因完全是个人理解,理解不当难免,恳请批评指正!!! ...

  3. celery源码分析-worker初始化分析(下)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的worker启动 在上文中分析到了Hub类的初始化,接下来继续分析Pool类的 ...

  4. celery源码分析-wroker初始化分析(上)

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery与Django的配合使用 首先,在安装有django的环境中创建一个django ...

  5. Service Manger的初始化分析

    在init.rc文件中,有定义serviceservicemanager /system/bin/servicemanager,在系统初始化会解析init.rc文件时,遇到这句话,将会把service ...

  6. Android编译系统环境过程初始化分析【转】

    本文转载自:http://blog.csdn.net/luoshengyang/article/details/18928789 Android源代码在编译之前,要先对编译环境进行初始化,其中最主要就 ...

  7. STM32_3 时钟初始化分析

    在startup文件中,调用了2个函数,一个是System_Init, 另一个是main. System_Init()在system_stm32f10x.c 这个文件中,先看一下时钟树,再分析一下这个 ...

  8. 基于arm64的linux kernel cpu 初始化分析

    从start_kernel开始分析 init/main.cstart_kernel|smp_setup_processor_id|setup_arch|setup_nr_cpu_ids|setup_p ...

  9. PowerPC基于vxWorks的中断初始化分析

    1. 本文简介 本文主要介绍P2020芯片中vxWorks中断初始化过程(部分讲解是以linux为例).P2020属于PPC85XX系列,内核为e500v2,它是PowerPC体系结构中主要应用于通信 ...

最新文章

  1. java 判断ocx是否存在_OCX控件的注册卸载,以及判断是否注册
  2. js insertBefore insertAfter appendChild
  3. Sublime Text 3(中文)在Windows下的配置、安装、运行
  4. 使用ANY和ALL条件
  5. 阿里云esc云服务器IP不能访问的解决办法
  6. 下载网页中的图片到本地
  7. 解剖几个有点难度的C笔试题
  8. Kubernetes API 与 Operator,不为人知的开发者战争
  9. html5中Canvas、绘制线条模糊、常见绘制工具、绘制基本图形、绘制图片、面向对象的方式绘制图形图片、绘制文本、帧动画绘制
  10. Sqoop是一款开源的工具,主要用于在HADOOP(Hive)与传统的数据库(mysql、oracle...)间进行数据的传递...
  11. 清华大学 计算机系 研究生导师,清华大学计算机科学与技术系研究生导师简介-胡事民...
  12. C++题解:百钱买百鸡数量
  13. nmap扫描服务器端口不稳定,端口扫描命令nmap
  14. adb shell使用教程+sqlite3使用示例
  15. 测试工程师进阶之测试用例发散思维(一)
  16. smartforms 黑底白字的标签logo制作
  17. Homepod评测:一款音质超棒但低智商的音箱
  18. AMESim锂离子电池包电化学机理模型
  19. 生鲜配送管理系统_升鲜宝V2.0 供应商协同系统设计思想及设计效果展现(一)...
  20. python中 math模块下 atan 和 atan2的区别

热门文章

  1. 再见,谷歌!再见,算法!
  2. 微软转向,鸿蒙拥抱,炒房团都来了,Linux 为何引各方英雄竞折腰?
  3. 中国开源正在走向成熟!
  4. 洗礼灵魂,修炼python(85)-- 知识拾遗篇 —— 深度剖析让人幽怨的编码
  5. 【Android游戏开发二十七】讲解游戏开发与项目下的hdpi 、mdpi与ldpi资源文件夹以及游戏高清版本的设置...
  6. 查找命令find的用法
  7. 转【C#调用DLL的几种方法,包括C#调用C\C++\C#DLL】
  8. [unity3d]水果忍者-界面搭建
  9. python操作Mysql基础
  10. 常用的CSS命名规则