Spark源码分析之Worker
Spark支持三种模式的部署:YARN、Standalone以及Mesos。 Worker节点是Spark的工作节点,用于执行提交的作业。我们先从Worker节点的启动开始介绍。
Spark中Worker的启动有多种方式,但是最终调用的都是org.apache.spark.deploy.worker.Worker类,启动Worker节点的时候可以传很多的参数:内存、核、工作目录等。如果你不知道如何传递,没关系,help一下即可:
[wyp @iteblog spark]$ ./bin/spark- class org.apache.spark.deploy.worker.Worker -h
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Usage: Worker [options] <master>
Master must be a URL of the form spark: //hostname:port
Options:
-c CORES, --cores CORES Number of cores to use
-m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)
-d DIR, --work-dir DIR Directory to run apps in ( default : SPARK_HOME/work)
-i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)
-h HOST, --host HOST Hostname to listen on
-p PORT, --port PORT Port to listen on ( default : random)
--webui-port PORT Port for web UI ( default : 8081 )
|
从上面的输出我们可以看出Worker的启动支持多达7个参数!这样每个都这样输入岂不是很麻烦?其实,我们不用担心,Worker节点启动地时候将先读取conf/spark-env.sh里面的配置,这些参数配置的解析都是由Worker中的WorkerArguments类进行解析的。如果你没有设置内存,那么将会把Worker启动所在机器的所有内存(会预先留下1G内存给操作系统)分给Worker,具体的代码实现如下:
def inferDefaultMemory() : Int = {
val ibmVendor = System.getProperty( "java.vendor" ).contains( "IBM" )
var totalMb = 0
try {
val bean = ManagementFactory.getOperatingSystemMXBean()
if (ibmVendor) {
val beanClass = Class.forName( "com.ibm.lang.management.OperatingSystemMXBean" )
val method = beanClass.getDeclaredMethod( "getTotalPhysicalMemory" )
totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024 ).toInt
} else {
val beanClass = Class.forName( "com.sun.management.OperatingSystemMXBean" )
val method = beanClass.getDeclaredMethod( "getTotalPhysicalMemorySize" )
totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024 ).toInt
}
} catch {
case e : Exception = > {
totalMb = 2 * 1024
System.out.println( "Failed to get total physical memory. Using " + totalMb + " MB" )
}
}
// Leave out 1 GB for the operating system, but don't return a negative memory size
math.max(totalMb - 1024 , 512 )
}
|
同样,如果你没设置cores,那么Spark将会获取你机器的所有可用的核作为参数传进去。解析完参数之后,将运行preStart函数,进行一些启动相关的操作,比如判断是否已经向Master注册过,创建工作目录,启动Worker的WEB UI,向Master进行注册等操作,如下:
override def preStart() {
assert(!registered)
logInfo( "Starting Spark worker %s:%d with %d cores, %s RAM" .format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo( "Spark home: " + sparkHome)
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI( this , workDir, Some(webUiPort))
webUi.bind()
registerWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
}
|
Worker向Master注册的超时时间为20秒,如果在这20秒内没有成功地向Master注册,那么将会进行重试,重试的次数为3,如过重试的次数大于等于3,那么将无法启动Worker,这时候,你就该看看你的网络环境或者你的Master是否存在问题了。
Worker在运行的过程中将会触发许多的事件, 比如:RegisteredWorker、SendHeartbeat、WorkDirCleanup以及MasterChanged等等,收到不同的事件,Worker进行不同的操作。比如,如果需要运行一个作业,Worker将会启动一个或多个ExecutorRunner,具体的代码可参见receiveWithLogging函数:
override def receiveWithLogging = {
case RegisteredWorker(masterUrl, masterWebUiUrl) = >
case SendHeartbeat = >
case WorkDirCleanup = >
case MasterChanged(masterUrl, masterWebUiUrl) = >
case Heartbeat = >
case RegisterWorkerFailed(message) = >
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores _ , memory _ ) = >
case ExecutorStateChanged(appId, execId, state, message, exitStatus) = >
case KillExecutor(masterUrl, appId, execId) = >
case LaunchDriver(driverId, driverDesc) = > {
case KillDriver(driverId) = > {
case DriverStateChanged(driverId, state, exception) = > {
case x : DisassociatedEvent if x.remoteAddress == masterAddress = >
case RequestWorkerState = > {
}
|
上面的代码是经过处理的,其实receiveWithLogging 方法是从ActorLogReceive继承下来的。
当Worker节点Stop的时候,将会执行postStop函数,如下:
override def postStop() {
metricsSystem.report()
registrationRetryTimer.foreach( _ .cancel())
executors.values.foreach( _ .kill())
drivers.values.foreach( _ .kill())
webUi.stop()
metricsSystem.stop()
}
|
杀掉所有还未执行完的executors、drivers等,操作。这方法也是从Actor继承下来的。
Spark源码分析之Worker相关推荐
- Spark源码分析之Worker启动通信机制
Worker是spark的工作节点,主要负责接受Master指令,启动或者杀掉Executor,Driver等;汇报Driver或者Executor状态到Master;发送心跳请求到Master等等 ...
- Spark源码分析之七:Task运行(一)
在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在T ...
- spark 源码分析之十九 -- DAG的生成和Stage的划分
上篇文章 spark 源码分析之十八 -- Spark存储体系剖析 重点剖析了 Spark的存储体系.从本篇文章开始,剖析Spark作业的调度和计算体系. 在说DAG之前,先简单说一下RDD. 对RD ...
- Spark 源码分析
2019独角兽企业重金招聘Python工程师标准>>> 一. 启动篇 (一) 引子 在spark-shell终端执行 val arr = Array(1,2,3,4) val rdd ...
- Spark源码分析之九:内存管理模型
Spark是现在很流行的一个基于内存的分布式计算框架,既然是基于内存,那么自然而然的,内存的管理就是Spark存储管理的重中之重了.那么,Spark究竟采用什么样的内存管理模型呢?本文就为大家揭开Sp ...
- spark 源码分析 Blockmanager
原文链接 参考, Spark源码分析之-Storage模块 对于storage, 为何Spark需要storage模块?为了cache RDD Spark的特点就是可以将RDD cache在memo ...
- Spark源码分析 – DAGScheduler
DAGScheduler的架构其实非常简单, 1. eventQueue, 所有需要DAGScheduler处理的事情都需要往eventQueue中发送event 2. eventLoop Threa ...
- spark 源码分析之十八 -- Spark存储体系剖析
本篇文章主要剖析BlockManager相关的类以及总结Spark底层存储体系. 总述 先看 BlockManager相关类之间的关系如下: 我们从NettyRpcEnv 开始,做一下简单说明. Ne ...
- spark 源码分析之二十 -- Stage的提交
引言 上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分. 本篇文章主要剖析,Stage是如何提交的. rdd的依赖 ...
最新文章
- 1-NET UX1000-实战-配置-Lync Server 2010-集成
- 一生都要Debug,我们最需要掌握哪些硬技能?
- PKI/CA (4)根CA信任模型“概述”
- RabbitMQ路由模式
- C# 并行运算方法简析
- 7-19 求链式线性表的倒数第K项 (20 分)(思路分析+极简代码+超容易理解)
- P3449-[POI2006]PAL-Palindromes【结论题,字符串hash】
- 【博弈论】【SG函数】bzoj1777 [Usaco2010 Hol]rocks 石头木头
- 值得电商美工借鉴的购物APP页面设计,让人无法自拔
- 确认无疑,.NET 6是迄今为止最快的.NET
- echo 在shell及脚本中显示色彩及闪烁警告效果
- SpringBoot日志logback-spring.xml分环境
- 电子秤PCBA方案的功能及设计
- 线性代数之——矩阵乘法和逆矩阵
- 爬虫项目5[爬取拉钩网招聘数据]
- 你知道吗?一个比房地产更大的超级泡沫正风靡全国!
- 广东省清远市谷歌卫星地图下载
- 2021年安徽省职业院校技能大赛网络搭建与应用竞赛
- 人生25句最美丽的名句
- 开课吧JAVAEE学习首周感受
热门文章
- eclipse怎么导出一个Java项目(莫要错过,最详细教程!)
- java 捕获特定异常_java – 使用特定消息捕获异常
- mac VMware Fusion 虚拟机键盘可以使用,鼠标无法使用排查思路及解决方法
- 教室工资管理系统c语言课程设计csdn,工资管理系统(C编写)
- 均匀白噪声的定义及特点_噪声的物理本质是什么?
- php5.5 集成环境,windows下配置php5.5开发环境及开发扩展_PHP
- python 3.6.0新语法_详解Python3.6正式版新特性
- java servlet获取url参数_Java Servlet如何获取请求的参数值?
- java程序设计基础29_java程序设计基础实验29
- c语言课程设计平时成绩,计算中心