Spark源码阅读04-Spark运行架构之Local运行模式
Local运行模式
- 基本介绍
- 运行流程图
- 运行流程详细介绍
- 实现原理
- 环境搭建及案例
基本介绍
Spark的Local运行模式又叫本地运行模式、伪分布式模式。之所以这叫本地模式是因为在该模式的Spark的所有进程都运行在本地一台机器的虚拟机中,无需任何资源管理器。它主要是用单机的多个线程来模拟Spark分布式计算,一般是用来进行测试的用途。
本地模式的标准写法是Local[N]模式,这里面的N指的是前面提到的进行多线程模拟Spark分布计算的线程数。如果没有指定N,默认是1个线程(该线程有1个core)。如果是Local[*],则代表在本地运行Spark,其工作线程数与计算机上的逻辑内核数相同。
运行流程图
本地运行模式的运行流程如下图
运行流程详细介绍
1.启动应用程序
启动应用程序即启动SparkContext对象,本阶段主要是对调度器(DAGScheduler、TaskSchedulerImpl
)和本地终端点(LocalBackend、LocalEndpoint)的初始化。
private def createTaskScheduler(sc: SparkContext,master: String,deployMode: String): (SchedulerBackend, TaskScheduler) = {... //未指定运行线程数量时以单线程模式运行,运行时启动给一个线程处理任务case "local" =>val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)//启动单线程处理任务val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)scheduler.initialize(backend)(backend, scheduler)case LOCAL_N_REGEX(threads) =>//获取运行节点可以cpu核数,当匹配字符为local[*]时,启动cpu核数得进程数量def localCpuCount: Int = Runtime.getRuntime.availableProcessors()// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.val threadCount = if (threads == "*") localCpuCount else threads.toIntif (threadCount <= 0) {throw new SparkException(s"Asked to run locally with $threadCount threads")}val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)scheduler.initialize(backend)(backend, scheduler)case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>def localCpuCount: Int = Runtime.getRuntime.availableProcessors()// local[*, M] means the number of cores on the computer with M failures// local[N, M] means exactly N threads with M failuresval threadCount = if (threads == "*") localCpuCount else threads.toIntval scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)scheduler.initialize(backend)(backend, scheduler)... }
2.执行作业,创建Executor并运行任务
对作业的执行首先为对划分调度状态,形成任务集。然后将任务集按照拆分的顺序发送给本地终端点LocalEndpoint,其在接收到任务集后,就在本地启动Executor,启动后,直接在启动的Executor上执行接收到的任务集。
private[spark] class LocalEndpoint(override val rpcEnv: RpcEnv,userClassPath: Seq[URL],scheduler: TaskSchedulerImpl,executorBackend: LocalSchedulerBackend,private val totalCores: Int)extends ThreadSafeRpcEndpoint with Logging {...
//启动executor,启动islocal为真表示本地启动private val executor = new Executor(localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)...def reviveOffers() {val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,Some(rpcEnv.address.hostPort)))//根据设置线程数启动相应得线程处理任务for (task <- scheduler.resourceOffers(offers).flatten) {freeCores -= scheduler.CPUS_PER_TASKexecutor.launchTask(executorBackend, task)}}
}
如果设置了多线程,则启动多个Executor并行处理任务
3.反馈任务执行状态
Executor负责执行任务,本地终端点LocalEndpoint将任务执行的状态反馈给上层的作业调度器。上层的作业调度器根据接收到的消息更新任务状态,同时根据这个反馈,实时的调整整个任务集的状态。
private[spark] class LocalEndpoint(override val rpcEnv: RpcEnv,userClassPath: Seq[URL],scheduler: TaskSchedulerImpl,executorBackend: LocalSchedulerBackend,private val totalCores: Int)extends ThreadSafeRpcEndpoint with Logging {...//任务更新case StatusUpdate(taskId, state, serializedData) =>scheduler.statusUpdate(taskId, state, serializedData)if (TaskState.isFinished(state)) {freeCores += scheduler.CPUS_PER_TASKreviveOffers()}...}
如果该任务集完成,则进行下一个任务集
4.程序运行完成,回收资源
根据反馈状态,当所有的任务集完成之后,任务这个时候也就完成了。此时上层作业调度器就注销在LocalBackend中运行的Executor,然后释放DAGScheduler、TaskScheduler和LocalBackend等进程,最后注销SparkContext,进行资源回收。
实现原理
本地运行模式下类调用关系图如下
环境搭建及案例
环境搭建及案例
Spark源码阅读04-Spark运行架构之Local运行模式相关推荐
- Spark源码阅读(五) --- Spark的支持的join方式以及join策略
版本变动 2021-08-30 增加了对Broadcast Hash Join小表大小的评估内容 增加了对Sort Merge Join优于Shuffle Hash Join调用的解释 目录 Spar ...
- Windows + IDEA + SBT 打造Spark源码阅读环境
Spark源码阅读环境的准备 Spark源码是有Scala语言写成的,目前,IDEA对Scala的支持要比eclipse要好,大多数人会选在在IDEA上完成Spark平台应用的开发.因此,Spark源 ...
- Spark源码阅读——任务提交过程
2019独角兽企业重金招聘Python工程师标准>>> Spark 源码阅读--任务提交过程 当我们在使用spark编写mr作业是,最后都要涉及到调用reduce,foreach或者 ...
- 3000门徒内部训练绝密视频(泄密版)第5课:彻底精通Scala隐式转换和并发编程及Spark源码阅读
彻底精通Scala隐式转换和并发编程及Spark源码阅读 Akka ,Scala内部并发 隐式转换.隐式类.隐式参数 可以手动指定某种类型的对象或类转换成其他类型的对象或类.转换的原因是假设写好接口 ...
- 3000门徒内部训练绝密视频(泄密版)第2课:Scala面向对象彻底精通及Spark源码阅读
Scala面向对象彻底精通及Spark源码阅读 不用写public class中的public class Person {private var myName = "flink" ...
- 3000门徒内部训练绝密视频(泄密版)第3课:Scala中函数式编程彻底精通及Spark源码阅读
Scala中函数式编程彻底精通及Spark源码阅读 函数可以不依赖于类,函数可以作为函数的参数,函数可以作为函数的返回值 =>表明对左面的参数进行右面的加工 函数赋值给变量需要在函数名后面加空格 ...
- spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析
spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析 TransportContext 首先官方文档对Transpor ...
- Spark源码阅读——DirectInputDStream
2019独角兽企业重金招聘Python工程师标准>>> Spark源码分析--DirectInputDStream 在Spark-Streaming中,对流的抽象是使用DStream ...
- spark源码阅读总纲
spark使用了这么长时间,对于driver.master.worker.BlockManage.RDD.DAGScheduler.TaskScheduler这些概念或多或少都了解一些,但是对于其任务 ...
- [以浪为码]Spark源码阅读03 - 序列化介绍 serializer
版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/u013054888/article/details/90237348 系列文章专栏目录:小浪阅读 S ...
最新文章
- centOS防火墙中端口的开启和关闭
- 基于vue的颜色选择器vue-color-picker
- Part 2 — Making Sense of Smart Contracts
- alter id order by_声卡id查找表
- 千字谏言!Python入门:这两点绝对不能偷懒!否则工作后必后悔
- 计算机专业师资描述,计算机专业师资队伍建设6主持建设优质核心课或教科研课题相关材料.doc...
- 基于大中台小前台模式设计高并发电商架构
- jquery.text()和.html()的原理
- golang ide 环境搭建_新手引导 — Golang后端开发环境搭建
- 插个“USB”就能无线投影,DispalyTen想借传屏切入企业级会议市场
- Spring colud gateway 源码小计
- SpringMVC-狂神笔记
- 【翻译】智能制造中EDA 应用及益处系列之四:精密故障检测与分类(FDC)
- python实时股票数据折线图_股票分笔成交数据导出,python实时股票数据
- sqlite3错误原因
- 《深入理解计算机系统》学习记录
- 将mysql语句转换为sql_数据库-转换sql语句
- (一)大型电商详情页亿级缓存架构简介
- websocket自动重连
- 磁盘黑色未分配区域恢复成绿色逻辑分区