Spark系列(八)Worker工作原理
工作原理图
源代码分析
包名:org.apache.spark.deploy.worker
启动driver入口点:registerWithMaster方法中的case LaunchDriver
1 | case LaunchDriver(driverId, driverDesc) => { |
2 | logInfo(s"Asked to launch driver $driverId") |
3 | // 创建DriverRunner对象启动Driver |
4 | val driver = new DriverRunner( |
5 | conf, |
6 | driverId, |
7 | workDir, |
8 | sparkHome, |
9 | driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), |
10 | self, |
11 | akkaUrl) |
12 | // 将driver加入本地缓存 |
13 | drivers(driverId) = driver |
14 | driver.start() |
15 | |
16 | // 增加已使用core |
17 | coresUsed += driverDesc.cores |
18 | // 增加已使用内存 |
19 | memoryUsed += driverDesc.mem |
20 | } |
DriverRunner
管理一个driver的执行,包括失败时自动重启driver,这种方式仅仅适用于standalone集群部署模式
DriverRunner类中start方法实现
1 | def start() = { |
2 | // 创建新线程 |
3 | new Thread("DriverRunner for " + driverId) { |
4 | override def run() { |
5 | try { |
6 | // 创建driver工作目录 |
7 | val driverDir = createWorkingDirectory() |
8 | // 下载应用所需的的Jar包 |
9 | val localJarFilename = downloadUserJar(driverDir) |
10 | |
11 | def substituteVariables(argument: String): String = argument match { |
12 | case "{{WORKER_URL}}" => workerUrl |
13 | case "{{USER_JAR}}" => localJarFilename |
14 | case other => other |
15 | } |
16 | |
17 | // TODO: If we add ability to submit multiple jars they should also be added here |
18 | // 构建ProcessBuilder对象,传入启动driver命令(所需内存大小) |
19 | val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem, |
20 | sparkHome.getAbsolutePath, substituteVariables) |
21 | // 启动driver进程 |
22 | launchDriver(builder, driverDir, driverDesc.supervise) |
23 | } |
24 | catch { |
25 | case e: Exception => finalException = Some(e) |
26 | } |
27 | |
28 | // Driver退出状态处理 |
29 | val state = |
30 | if (killed) { |
31 | DriverState.KILLED |
32 | } else if (finalException.isDefined) { |
33 | DriverState.ERROR |
34 | } else { |
35 | finalExitCode match { |
36 | case Some(0) => DriverState.FINISHED |
37 | case _ => DriverState.FAILED |
38 | } |
39 | } |
40 | |
41 | finalState = Some(state) |
42 | // 向Driver所属worker发送DriverStateChanged消息 |
43 | worker ! DriverStateChanged(driverId, state, finalException) |
44 | } |
45 | }.start() |
46 | } |
LaunchExecutor
管理LaunchExecutor的启动
1 | case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => |
2 | if (masterUrl != activeMasterUrl) { |
3 | logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") |
4 | } else { |
5 | try { |
6 | logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) |
7 | |
8 | // Create the executor's working directory |
9 | // 创建executor本地工作目录 |
10 | val executorDir = new File(workDir, appId + "/" + execId) |
11 | if (!executorDir.mkdirs()) { |
12 | throw new IOException("Failed to create directory " + executorDir) |
13 | } |
14 | |
15 | // Create local dirs for the executor. These are passed to the executor via the |
16 | // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the |
17 | // application finishes. |
18 | val appLocalDirs = appDirectories.get(appId).getOrElse { |
19 | Utils.getOrCreateLocalRootDirs(conf).map { dir => |
20 | Utils.createDirectory(dir).getAbsolutePath() |
21 | }.toSeq |
22 | } |
23 | appDirectories(appId) = appLocalDirs |
24 | // 创建ExecutorRunner对象 |
25 | val manager = new ExecutorRunner( |
26 | appId, |
27 | execId, |
28 | appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)), |
29 | cores_, |
30 | memory_, |
31 | self, |
32 | workerId, |
33 | host, |
34 | webUi.boundPort, |
35 | publicAddress, |
36 | sparkHome, |
37 | executorDir, |
38 | akkaUrl, |
39 | conf, |
40 | appLocalDirs, ExecutorState.LOADING) |
41 | // executor加入本地缓存 |
42 | executors(appId + "/" + execId) = manager |
43 | manager.start() |
44 | // 增加worker已使用core |
45 | coresUsed += cores_ |
46 | // 增加worker已使用memory |
47 | memoryUsed += memory_ |
48 | // 通知master发送ExecutorStateChanged消息 |
49 | master ! ExecutorStateChanged(appId, execId, manager.state, None, None) |
50 | } |
51 | // 异常情况处理,通知master发送ExecutorStateChanged FAILED消息 |
52 | catch { |
53 | case e: Exception => { |
54 | logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) |
55 | if (executors.contains(appId + "/" + execId)) { |
56 | executors(appId + "/" + execId).kill() |
57 | executors -= appId + "/" + execId |
58 | } |
59 | master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, |
60 | Some(e.toString), None) |
61 | } |
62 | } |
63 | } |
总结
1、Worker、Driver、Application启动后都会向Master进行注册,并缓存到Master内存数据模型中
2、完成注册后发送LaunchExecutor、LaunchDriver到Worker
3、Worker收到消息后启动executor和driver进程,并调用Worker的ExecutorStateChanged和DriverStateChanged方法
4、发送ExecutorStateChanged和DriverStateChanged消息到Master的,根据各自的状态信息进行处理,最重要的是会调用schedule方法进行资源的重新调度
转载于:https://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BWorker%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html
Spark系列(八)Worker工作原理相关推荐
- Git使用 从入门到入土 收藏吃灰系列(四) Git工作原理
文章目录 一.前言 一.Git基本理论(核心) 1.1工作区 1.2工作流程 一.前言 参考安装Git 详细安装教程 参考视频B站 Git最新教程通俗易懂,这个有点长,感觉讲的精华不多 参考视频『Gi ...
- Git使用 从入门到入土 收藏吃灰系列 (八) 什么是分支 分支的作用
文章目录 一.前言 二.Git分支 2.1什么是分支? 2.2 分支有什么用? 一.前言 参考安装Git 详细安装教程 参考视频B站 Git最新教程通俗易懂,这个有点长,感觉讲的精华不多 参考视频『G ...
- JAVA开发运维(nginx工作原理)
nginx源码目录结构: . ├── auto 自动检测系统环境以及编译相关的脚本 │ ├── cc 关于编译器相关的编译选项的检测脚本 │ ├── lib nginx编译所需要的一些库的检测脚本 │ ...
- iommu 工作原理解析之dma remapping
深入了解iommu系列二:iommu 工作原理解析之dma remapping: https://zhuanlan.zhihu.com/p/479963917
- spark任务shell运行_大数据系列:Spark的工作原理及架构
介绍 本Apache Spark教程将说明Apache Spark的运行时架构以及主要的Spark术语,例如Apache SparkContext,Spark shell,Apache Spark应用 ...
- 深入浅出理解 Spark:环境部署与工作原理
一.Spark 概述 Spark 是 UC Berkeley AMP Lab 开源的通用分布式并行计算框架,目前已成为 Apache 软件基金会的顶级开源项目.Spark 支持多种编程语言,包括 Ja ...
- 菜鸟学Kubernetes(K8s)系列——(七)关于Kubernetes底层工作原理
菜鸟学Kubernetes(K8s)系列--(七)关于Kubernetes底层工作原理 Kubernetes系列文章 主要内容 菜鸟学Kubernetes(K8s)系列--(一)关于Pod和Names ...
- Spark工作原理入门
Spark工作原理入门 文章目录 Spark工作原理入门 1.功能概要 基本描述 运用场景 实际使用 2.模块组成 HDFS MLlib Mesos Tachyon GraphX Spark SQL ...
- 【Nginx系列】Nginx配置使用与工作原理
热门系列: [Linux系列]Linux实践(一):linux常用命令 程序人生,精彩抢先看 目录 1.Nginx介绍 1.1 什么是Nginx? 1.2 Nginx能做什么 1.3 为什么要选择用N ...
最新文章
- 创建一个水平盒子java_盒子模型理解
- 登录form php一个页面跳转页面,form表单页面跳转方式提交练习
- vue方法传值到data_Vue 组件传值几种常用方法【总结】
- Spork: Pig on Spark实现分析
- 让无线网卡同时工作在 AP 和 STA 模式
- 算数运算符与关系运算符_【Flutter 110】Flutter手把手教程Dart语言——运算符
- Html代码打包后如何修改,html代码打包封装成APP教程
- BZOJ5312 冒险 势能分析、线段树
- 如何速成java_极*Java速成教程 - (2)
- c语言一个字符串怎么做除法,c语言实数除法怎样保留小数部分
- Excel如何构建简单的透视表
- 谈谈反向代理Nginx
- 如何做客户分析?客户分析的内容有哪些?
- 2 什么是计算机网络的拓扑结构,什么是网络拓扑?
- ArcGIS中消除两幅卫星影像之间色带问题
- LeetCode:20 vaild parentless
- PPT文件不能编辑,什么情况?
- join语句的执行顺序
- codeforces E. Sum of Digits
- 危险漫步_有关2010年糖尿病漫步的详细信息和感谢
热门文章
- (easyui datagrid+mvc+json)之asp.net分页查询
- 查询数据库,处理NULL值问题
- 分布式经典书籍--深入分布式缓存 从原理到实践
- 后台开发经典书籍--高性能mysql
- c:线性表的链式表示
- es6 Set的几种使用场景
- iOS_“图片浏览选择”功能的编写思路
- django 文档生成器
- python Gevent – 高性能的Python并发框架
- 一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序...