Local运行模式

  • 基本介绍
  • 运行流程图
    • 运行流程详细介绍
  • 实现原理
  • 环境搭建及案例

基本介绍

Spark的Local运行模式又叫本地运行模式、伪分布式模式。之所以这叫本地模式是因为在该模式的Spark的所有进程都运行在本地一台机器的虚拟机中,无需任何资源管理器。它主要是用单机的多个线程来模拟Spark分布式计算,一般是用来进行测试的用途。

本地模式的标准写法是Local[N]模式,这里面的N指的是前面提到的进行多线程模拟Spark分布计算的线程数。如果没有指定N,默认是1个线程(该线程有1个core)。如果是Local[*],则代表在本地运行Spark,其工作线程数与计算机上的逻辑内核数相同。

运行流程图

本地运行模式的运行流程如下图

运行流程详细介绍

1.启动应用程序

启动应用程序即启动SparkContext对象,本阶段主要是对调度器(DAGScheduler、TaskSchedulerImpl
)和本地终端点(LocalBackend、LocalEndpoint)的初始化。

 private def createTaskScheduler(sc: SparkContext,master: String,deployMode: String): (SchedulerBackend, TaskScheduler) = {... //未指定运行线程数量时以单线程模式运行,运行时启动给一个线程处理任务case "local" =>val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)//启动单线程处理任务val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)scheduler.initialize(backend)(backend, scheduler)case LOCAL_N_REGEX(threads) =>//获取运行节点可以cpu核数,当匹配字符为local[*]时,启动cpu核数得进程数量def localCpuCount: Int = Runtime.getRuntime.availableProcessors()// local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.val threadCount = if (threads == "*") localCpuCount else threads.toIntif (threadCount <= 0) {throw new SparkException(s"Asked to run locally with $threadCount threads")}val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)scheduler.initialize(backend)(backend, scheduler)case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>def localCpuCount: Int = Runtime.getRuntime.availableProcessors()// local[*, M] means the number of cores on the computer with M failures// local[N, M] means exactly N threads with M failuresval threadCount = if (threads == "*") localCpuCount else threads.toIntval scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)scheduler.initialize(backend)(backend, scheduler)... }

2.执行作业,创建Executor并运行任务

对作业的执行首先为对划分调度状态,形成任务集。然后将任务集按照拆分的顺序发送给本地终端点LocalEndpoint,其在接收到任务集后,就在本地启动Executor,启动后,直接在启动的Executor上执行接收到的任务集。

private[spark] class LocalEndpoint(override val rpcEnv: RpcEnv,userClassPath: Seq[URL],scheduler: TaskSchedulerImpl,executorBackend: LocalSchedulerBackend,private val totalCores: Int)extends ThreadSafeRpcEndpoint with Logging {...
//启动executor,启动islocal为真表示本地启动private val executor = new Executor(localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)...def reviveOffers() {val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,Some(rpcEnv.address.hostPort)))//根据设置线程数启动相应得线程处理任务for (task <- scheduler.resourceOffers(offers).flatten) {freeCores -= scheduler.CPUS_PER_TASKexecutor.launchTask(executorBackend, task)}}
}

如果设置了多线程,则启动多个Executor并行处理任务

3.反馈任务执行状态

Executor负责执行任务,本地终端点LocalEndpoint将任务执行的状态反馈给上层的作业调度器。上层的作业调度器根据接收到的消息更新任务状态,同时根据这个反馈,实时的调整整个任务集的状态。

 private[spark] class LocalEndpoint(override val rpcEnv: RpcEnv,userClassPath: Seq[URL],scheduler: TaskSchedulerImpl,executorBackend: LocalSchedulerBackend,private val totalCores: Int)extends ThreadSafeRpcEndpoint with Logging {...//任务更新case StatusUpdate(taskId, state, serializedData) =>scheduler.statusUpdate(taskId, state, serializedData)if (TaskState.isFinished(state)) {freeCores += scheduler.CPUS_PER_TASKreviveOffers()}...}

如果该任务集完成,则进行下一个任务集

4.程序运行完成,回收资源

根据反馈状态,当所有的任务集完成之后,任务这个时候也就完成了。此时上层作业调度器就注销在LocalBackend中运行的Executor,然后释放DAGScheduler、TaskScheduler和LocalBackend等进程,最后注销SparkContext,进行资源回收。

实现原理

本地运行模式下类调用关系图如下

环境搭建及案例

环境搭建及案例

Spark源码阅读04-Spark运行架构之Local运行模式相关推荐

  1. Spark源码阅读(五) --- Spark的支持的join方式以及join策略

    版本变动 2021-08-30 增加了对Broadcast Hash Join小表大小的评估内容 增加了对Sort Merge Join优于Shuffle Hash Join调用的解释 目录 Spar ...

  2. Windows + IDEA + SBT 打造Spark源码阅读环境

    Spark源码阅读环境的准备 Spark源码是有Scala语言写成的,目前,IDEA对Scala的支持要比eclipse要好,大多数人会选在在IDEA上完成Spark平台应用的开发.因此,Spark源 ...

  3. Spark源码阅读——任务提交过程

    2019独角兽企业重金招聘Python工程师标准>>> Spark 源码阅读--任务提交过程 当我们在使用spark编写mr作业是,最后都要涉及到调用reduce,foreach或者 ...

  4. 3000门徒内部训练绝密视频(泄密版)第5课:彻底精通Scala隐式转换和并发编程及Spark源码阅读

    彻底精通Scala隐式转换和并发编程及Spark源码阅读 Akka ,Scala内部并发 隐式转换.隐式类.隐式参数 可以手动指定某种类型的对象或类转换成其他类型的对象或类.转换的原因是假设写好接口 ...

  5. 3000门徒内部训练绝密视频(泄密版)第2课:Scala面向对象彻底精通及Spark源码阅读

    Scala面向对象彻底精通及Spark源码阅读 不用写public class中的public class Person {private var myName = "flink" ...

  6. 3000门徒内部训练绝密视频(泄密版)第3课:Scala中函数式编程彻底精通及Spark源码阅读

    Scala中函数式编程彻底精通及Spark源码阅读 函数可以不依赖于类,函数可以作为函数的参数,函数可以作为函数的返回值 =>表明对左面的参数进行右面的加工 函数赋值给变量需要在函数名后面加空格 ...

  7. spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析

    spark 源码分析之八--Spark RPC剖析之TransportContext和TransportClientFactory剖析 TransportContext 首先官方文档对Transpor ...

  8. Spark源码阅读——DirectInputDStream

    2019独角兽企业重金招聘Python工程师标准>>> Spark源码分析--DirectInputDStream 在Spark-Streaming中,对流的抽象是使用DStream ...

  9. spark源码阅读总纲

    spark使用了这么长时间,对于driver.master.worker.BlockManage.RDD.DAGScheduler.TaskScheduler这些概念或多或少都了解一些,但是对于其任务 ...

  10. [以浪为码]Spark源码阅读03 - 序列化介绍 serializer

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/u013054888/article/details/90237348 系列文章专栏目录:小浪阅读 S ...

最新文章

  1. centOS防火墙中端口的开启和关闭
  2. 基于vue的颜色选择器vue-color-picker
  3. Part 2 — Making Sense of Smart Contracts
  4. alter id order by_声卡id查找表
  5. 千字谏言!Python入门:这两点绝对不能偷懒!否则工作后必后悔
  6. 计算机专业师资描述,计算机专业师资队伍建设6主持建设优质核心课或教科研课题相关材料.doc...
  7. 基于大中台小前台模式设计高并发电商架构
  8. jquery.text()和.html()的原理
  9. golang ide 环境搭建_新手引导 — Golang后端开发环境搭建
  10. 插个“USB”就能无线投影,DispalyTen想借传屏切入企业级会议市场
  11. Spring colud gateway 源码小计
  12. SpringMVC-狂神笔记
  13. 【翻译】智能制造中EDA 应用及益处系列之四:精密故障检测与分类(FDC)
  14. python实时股票数据折线图_股票分笔成交数据导出,python实时股票数据
  15. sqlite3错误原因
  16. 《深入理解计算机系统》学习记录
  17. 将mysql语句转换为sql_数据库-转换sql语句
  18. (一)大型电商详情页亿级缓存架构简介
  19. websocket自动重连
  20. 磁盘黑色未分配区域恢复成绿色逻辑分区

热门文章

  1. 如何生成存储器配置文件?
  2. 【 C 】动态内存分配案例分析
  3. [java手把手教程][第二季]java后端博客系统文章系统——No10
  4. SQL SERVER的统计信息
  5. Docker - Tips
  6. Angular中ngCookies模块介绍
  7. (笔试题)不用除法操作符,实现两个整数的除法
  8. Fedora 17 下安装codeblocks
  9. 查看mysql数据库大小、表大小和最后修改时间
  10. 最强杀毒软件NOD32免费升级ID(保持最新)