版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45728173

一、应用执行机制

一个应用的生命周期即,用户提交自定义的作业之后,Spark框架进行处理的一系列过程。

在这个过程中,不同的时间段里,应用会被拆分为不同的形态来执行。

1、应用执行过程中的基本组件和形态

Driver:
运行在客户端或者集群中,执行Application的main方法并创建SparkContext,调控整个应用的执行。

Application:
用户自定义并提交的Spark程序。

Job:
一个Application可以包含多个Job,每个Job由Action操作触发。

Stage:
比Job更小的单位,一个Job会根据RDD之间的依赖关系被划分为多个Stage,每个Stage中只存有RDD之间的窄依赖,即Transformation算子。

TaskSet:
每个Stage中包含的一组相同的Task。

Task:
最后被分发到Executor中执行的具体任务,执行Stage中包含的算子。

明确了一个应用的生命周期中会有哪些组件参与之后,再来看看用户是怎么提交Spark程序的。

2、应用的两种提交方式

Driver进程运行在客户端(Client模式):

即用户在客户端直接运行程序。
程序的提交过程大致会经过以下阶段:

  1. 用户运行程序。
  2. 启动Driver进行(包括DriverRunner和SchedulerBackend),并向集群的Master注册。
  3. Driver在客户端初始化DAGScheduler等组件。
  4. Woker节点向Master节点注册并启动Executor(包括ExecutorRunner和ExecutorBackend)。
  5. ExecutorBackend启动后,向Driver内部的SchedulerBackend注册,使得Driver可以找到计算节点。
  6. Driver中的DAGScheduler解析RDD生成Stage等操作。
  7. Driver将Task分配到各个Executor中并行执行。

Driver进程运行在集群中(某个Worker节点,Cluster模式):

即用户将Spark程序提交给Master分配执行。
大致会经过一下流程:

  1. 用户启动客户端,提交Spark程序给Master。
  2. Master针对每个应用分发给指定的Worker启动Driver进行。
  3. Worker收到命令之后启动Driver进程(即DriverRunner和其中的SchedulerBackend),并向Master注册。
  4. Master指定其他Worker启动Executor(即ExecutorRunner和其内部的ExecutorBackend)。
  5. ExecutorBackend向Driver中的SchedulerBackend注册。
  6. Driver中的DAGScheduler解析RDD生产Stage等。
  7. Executor内部启动线程池并行化执行Task。

可以看到,两种程序的提交方式在处理过程中,仅仅是在哪个地方启动Driver进程的区别而已。
为Client模式中时(使用Spark Shell直接执行的程序),Driver就在客户端上。
为Cluster模式时(提交Spark程序到Master),Driver运行与集群中的某个Worker节点。

二、调度与任务分配模块

Spark框架就像一个操作系统一样,有着自己的作业调度策略,当集群运行在不同的模式下,调度不同级别的单位,使用的策略也是有所不同的。

1、Application之间的调度

当有多个用户提交多个Spark程序时,Spark是如何调度这些应用并合理地分配资源呢?

Standalone模式下,默认使用FIFO,每个app会独占所有资源

可以通过以下几个参数调整集群相关的资源:

  • spark.cores.max:调整app可以在整个集群中申请的CPU core数量
  • spark.deploy.defaultCores:默认的CPU core数量
  • spark.executor.memory:限制每个Executor可用的内存

在Mesos模式下,可以使用

  • spark.mesos.coarse=true设置静态配置资源的策略
  • 使用mesos://URL且不配置spark.mesos.coarse=true(每个app会有独立固定的内存分配,空闲时其他机器可以使用其资源)

在Yarn模式下,提交作业时可以使用

  • 通过–num-executors控制分配多少个Executor给app
  • –executor-memory和–executor-cores分别控制Executor的内存和CPU core

2、Application内部的Job调度机制

一个Application中,由各个Action触发的多个Job之间也是存在调度关系的。

Action操作实现上是调用了SparkContext的runJob方法提交Job。

Spark中调度Job有两种策略

FIFO:

  • 第一个Job分配其所需的所有资源
  • 第二个Job如果还有剩余资源的话就分配,否则等待

FAIR:

  • 使用轮询的方式调度Job

可以通过配置spark.scheduler.mode调整Job的调度方式

另外也可以配置调度池,具体参考官方文档
或者参考conf/fairscheduler.xml.template文件。

3、Job中的Stage调度

Stage是由DAGScheduler组件生产的,在源码中,有三个比较特殊的变量:

  • waitingStages:存储等待执行的Stages
  • runningStages:存储正在执行的Stages
  • failedStages:存储执行失败的Stage

Spark会通过广度优先遍历找到最开始的Stage执行,若有父Stage没有执行完则等待。

4、Stage中的Task调度

暂未了解。。。

三、I/O制度

Spark虽然是基于内存计算的框架,但是不可避免的也会接触到一些存储层,那么在和存储层交互的时候,Spark做了哪些工作?

1、序列化

序列化的本质就是将对象转换为字节流,可以理解为将链表中存储的非连续空间的数据存储转化为连续空间存储的数组中

Spark为什么要做序列化操作?

内存或磁盘中RDD会含有对象的存储,而在节点间数据的传输时,序列化之后的数据可以节约空间和提高效率。

2、压缩

压缩是日常生活中的一个常见操作,好处显而易见,节约空间,从而就可以获得时间上的效率。

Spark中序列化之后的数据可以进行压缩以减少空间开销。
Spark支持两种压缩算法

  • Snappy算法:高压缩速度
  • LZF算法:高压缩比

在不同的场景中选择不同的压缩算法可以有效的提高程序运行的效率。

压缩配置方式:

  1. 启动前在spark-env.sh中设置:export SPARK_JAVA_OPTS=”-Dspark.broadcast.compress”
  2. 在应用程序中配置
    conf.getBoolean(“spark.broadcast.compress,true”)
    conf.set(“spark.broadcast.compress”,true)

3、块管理

RDD从物理上看是一个元数据结构,记录着Block和Node之间的映射关系。

存储RDD是以Block块为单位的,每个分区对应一个块,PartitionID通过元数据信息可以映射到Block。

BlockManager管理和接口、块读写流程、数据块读写管理等细节待继续深入了解。

四、通信模块

Spark中使用Akka作为通信框架

  • Actors是一组包含状态和行为的对象
  • 一个Actor接收到其他Actor的信息之后可以根据需求做出各种反应
  • Client、Master、Worker等都是一个Actor

Spark各个组件的之间协调工作都是基于Akka机制来的,待深入了解的有:

  • Client Actor通信代码逻辑
  • Master Actor通信代码逻辑
  • Worker Actor消息处理逻辑

五、容错机制

之前讲过,RDD之间的算子操作会形成DAG图,RDD之间的依赖关系会形成Lineage。

要理解Lineage机制首先要明确两种依赖的概念:

  • Shuffle Dependencies(宽依赖)
    父分区可以被多个子分区所用
    即多对多的关系

  • Narrow Dependencies(窄依赖)
    父分区最多被一个子分区所用
    即一对一或者多对一的关系

当出现某个节点计算错误的时候,会顺着RDD的操作顺序往回走

一旦是Narrow Dependencies错误,重新计算父RDD分区即可,因为其不依赖其他节点

而如果Shuffle Dependencies错误,重算代价较高,因为一旦重新计算其依赖的父RDD分区,会造成冗余计算

这时候就需要人为的添加检查点来提高容错机制的执行效率

什么情况下需要加CheckPoint

  • DAG中的Lineage过长,如果重算开销太大,故在特定几个Shuffle Dependencies上做CheckPoint是有价值的。
  • Checkpoint会产生磁盘开销,因为其就是将数据持久化到磁盘中,所以做检查点的RDD最好是已经在内存中缓存了。

六、Shuffle机制

Shuffle的定义:对无规则的数据进行重组排序等过程

为什么要Shuffle:分布式计算中数据是分布在各个节点上计算的,而汇总统计等操作需要在所有数据上执行

Spark中Shuffle经历的阶段:

Shuffle Write将各个节点数据写入到指定分区1、根据下一个Stage分区数分成相应的Bucket2、将Bucket写入磁盘
Shuffle Fetch获取各个分区发送的数据1、在存储有Shuffle数据节点的磁盘Fetch需要的数据2、Fetch到本地之后进行自定义的聚集函数操作

最后记录一下提交Spark作业的方法
在spark的bin目录下
执行spark-submit脚本
./spark-submit \
–class 入口函数所在的类名全称 \
–master spark master节点的地址(默认端口7077)\
–executor-memory 指定worker中Executor的内存 \
–total-executor-cores 100 \
jar文件所在的目录 \

Spark(四) -- Spark工作机制相关推荐

  1. Spark Streaming的工作机制

    1. Spark Streaming的工作机制 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理. 支持从多种数据源获取数据,包括K ...

  2. Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制

    主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...

  3. Spark SQL架构工作原理及流程解析

    Spark SQL架构工作原理及流程解析,spark sql从shark发展而来,Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析.逻辑执行计划翻译.执行计划优化等逻辑. Sp ...

  4. 详细解释什么是hadoop和spark,及其工作原理

    目录 一.hadoop一些知识点: 1.HDFS知识: 1.1 HDFS工作机制 1.2 HDFS 写数据流程 1.3 HDFS 读数据流程 2.MapReduce原理 2.1 什么是 MapRedu ...

  5. Spark系列之Spark应用程序运行机制

    声明:         文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除.感谢.转载请注明出处,感谢. By luoye ...

  6. 第35课: 打通Spark系统运行内幕机制循环流程

    第35课: 打通Spark系统运行内幕机制循环流程 Spark通过DAGScheduler面向整个Job划分出了不同的Stage,划分Stage之后,Stage从后往前划分,执行的时候从前往后执行,每 ...

  7. 一、Spark内核之运行机制

    Spark内核系列目录 一.Spark内核的运行机制 二.Spark内核的通讯架构 文章目录 Spark内核系列目录 前言 一.Spark核心组件 1.Driver 2. Executor 3. Sp ...

  8. Spark中的checkpoint机制

    一.Spark Core中的checkpoint def main(args: Array[String]) {val spark = SparkSession.builder().appName(& ...

  9. 2021年大数据HBase(十四):HBase的原理及其相关的工作机制

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的原理及其相关的工作机制 一.HBase的flus ...

  10. sparkstreaming监听hdfs目录如何终止_Spark笔试题:Spark Streaming 反压机制

    Spark Streaming 反压机制是1.5版本推出的特性,用来解决处理速度比摄入速度慢的情况,简单来讲就是做流量控制.当批处理时间(Batch Processing Time)大于批次间隔(Ba ...

最新文章

  1. 取代Python多进程!伯克利开源分布式框架Ray
  2. 自学HTML5第三节(拖放效果)
  3. UART串口通信浅谈之(二)--寄存器设置
  4. javaone_JavaOne 2012:101种改进Java的方法-开发人员参与为何如此重要
  5. python联科_联科集团携手阿里云发布科研混合云平台 共建科研教育新生态
  6. mysql中起飞到达城市查询_让mysql慢慢起飞 - 初识慢日志
  7. JavaScript学习(八十五)—数据类型的转换
  8. IntelliJ IDEA提示忽略大小写
  9. zookeeper之理论基础
  10. 数据库概述 数据库入门
  11. SSDP协议内容解析
  12. Windows进行远程桌面连接后如何彻底删除远程记录
  13. Office word中去掉首页的页眉
  14. 如何查找电脑的MAC地址?(上)
  15. Klayout入门(1)基本图形绘制
  16. 华为确定发布鸿蒙的时间了吗,Mate40系列首发,华为鸿蒙OS手机版发布时间确定...
  17. html 设置图片显示比例,css巧妙设置等比例图片显示
  18. bzoj2754JZOJ2834【SCOI2012】喵星球上的点名 AC自动机+STL
  19. cad线性标注命令_CAD尺寸标注命令
  20. 推荐一款超级好用的AI模型训练平台——Tesra超算网络!

热门文章

  1. 安装Oracle Database 11g 找不到文件“WFMLRSVCApp.ear” .
  2. [BZOJ1079][SCOI2008]着色方案 dp
  3. CM记录-选择合适的硬件
  4. python查看删除你微信的账号
  5. Android Fragment详解(二):Fragment创建及其生命周期
  6. 《皇帝:中国的崛起》从入门到精通
  7. 全国计算机一级office2010,2018年全国计算机等级考试一级office2010真题一套(中档题)...
  8. c#json对象转数组_数组和对象的区别
  9. 远程计算机管理权限,肿么获得远程计算机管理员权限
  10. spark sql 上个月_SPARK-SQL内置函数之时间日期类