为什么使用spark的原因是早期的编程模式MapReduce缺乏对数据共享的高效元语,会造成磁盘I/O 以及序列号等开销,spark提出了统一的编程抽象---弹性分布式数据集(RDD),该模型可以令并行计算阶段间高效地进行数据共享。spark处理数据时,会将计算转化为一个有向无环图(DAG)的任务集,RDD能够有效的恢复DAG中故障和慢节点执行的任务,并且RDD提供一种基于粗粒度变换的接口,记录创建数据集的“血统”,能够实现高效的容错性。

spark的作业和任务调度系统是其核心,它能够有效的进行调度的根本原因是因为对任务划分DAG和容错。

DAG,有向无环图,Directed Acyclic Graph的缩写,常用于建模。Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之Dependency,DAG在Spark中的对应的实现为DAGScheduler。

基础概念
介绍DAGScheduler中的一些概念,有助于理解后续流程。
作业(Job)  调用RDD的一个action,如count,即触发一个Job,spark中对应实现为ActiveJob,DAGScheduler中使用集合activeJobs和jobIdToActiveJob维护Job
调度阶段(Stage )   代表一个Job的DAG,会在发生shuffle处被切分,切分后每一个部分即为一个Stage,Stage实现分为ShuffleMapStage和ResultStage,一个Job切分的结果是0个或多个ShuffleMapStage加一个ResultStage
任务(Task )   最终被发送到Executor执行的任务,和stage的ShuffleMapStage和ResultStage对应,其实现分为ShuffleMapTask和ResultTask

DAG每个节点代表啥?代表的一个RDD


1) 一个RDD生成两个RDD:

RDD2 = RDD1.filter(xxxxx)

RDD3 = RDD1.filter(yyyy)

是从RDD1到RDD2,RDD3这样的过程

2) Union是两个RDD合并成一个的过程

则是RDD2 RDD3变成RDD4的过程

3) filter/map/reduceByKey 应该都是一条直线

是从RDD4到RDD5这样的过程

上述都是transformation

RDD5.collect();  //action

RDD5.foreach();  //action

这种则会生成两个job,会顺序提交,前一个job执行结束之后才会提交下一个job(假设上述代码都在一个线程中)

(可以和上一章中Persist中的job提交对照看)

(二)、RDD依赖关系
RDD依赖关系,也就是有依赖的RDD之间的关系,比如RDD1------->RDD2(RDD1生成RDD2),RDD2依赖于RDD1。这里的生成也就是RDDtransformation操作

窄依赖(也叫narrow依赖)

从父RDD角度看:一个父RDD只被一个子RDD分区使用。父RDD的每个分区最多只能被一个Child RDD的一个分区使用

从子RDD角度看:依赖上级RDD的部分分区     精确知道依赖的上级RDD分区,会选择和自己在同一节点的上级RDD分区,没有网络IO开销,高效。如map,flatmap,filter

宽依赖(也叫shuffle依赖/wide依赖)

从父RDD角度看:一个父RDD被多个子RDD分区使用。父RDD的每个分区可以被多个Child RDD分区依赖

从子RDD角度看:依赖上级RDD的所有分区     无法精确定位依赖的上级RDD分区,相当于依赖所有分区(例如reduceByKey)  计算就涉及到节点间网络传输

父分区,都只有一根箭头                         父分区,都有多个箭头

子分区,来自部分父分区                         子分区,来自全部父分区

Spark之所以将依赖分为narrow和 shuffle:

(1) narrow dependencies可以支持在同一个集群Executor上,以pipeline管道形式顺序执行多条命令,例如在执行了map后,紧接着执行filter。分区内的计算收敛,不需要依赖所有分区的数据,可以并行地在不同节点进行计算。所以它的失败恢复也更有效,因为它只需要重新计算丢失的parent partition即可,

(2)shuffle dependencies 则需要所有的父分区都是可用的,必须等RDD的parent partition数据全部ready之后才能开始计算,可能还需要调用类似MapReduce之类的操作进行跨节点传递。从失败恢复的角度看,shuffle dependencies 牵涉RDD各级的多个parent partition。

如图所示,左边的都是右边的父分区

(三)、划分stage
由于shuffle依赖必须等RDD的parent RDD partition数据全部ready之后才能开始计算,因此spark的设计是让parent RDD将结果写在本地,完全写完之后,通知后面的RDD。后面的RDD则首先去读之前的本地数据作为input,然后进行运算。

由于上述特性,将shuffle依赖就必须分为两个阶段(stage)去做:

第一个阶段(stage)需要把结果shuffle到本地,例如reduceByKey,首先要聚合某个key的所有记录,才能进行下一步的reduce计算,这个汇聚的过程就是shuffle

第二个阶段(stage)则读入数据进行处理。

同一个stage里面的task是可以并发执行的,下一个stage要等前一个stage ready

(和mapreduce的reduce需要等map过程ready 一脉相承)

(为什么要写在本地:后面的RDD多个partition都要去读这个信息,如果放到内存,如果出现数据丢失,后面的所有步骤全部不能进行,违背了之前所说的需要parent RDD partition数据全部ready的原则。为什么要保证parent RDD要ready,如下例,如果有一个partition未生成或者在内存中丢失,那么直接导致计算结果是完全错误的:

写到文件中更加可靠。Shuffle会生成大量临时文件,以免错误时重新计算,其使用的本地磁盘目录由spark.local.dir指定,缓存到磁盘的RDD数据。最好将这个属性设定为访问速度快的本地磁盘。可以配置多个路径到多个磁盘,增加IO带宽

在Spark 1.0 以后,SPARK_LOCAL_DIRS(Standalone, Mesos) or LOCAL_DIRS (YARN)参数会覆盖这个配置。比如Spark On YARN的时候,Spark Executor的本地路径依赖于Yarn的配置,而不取决于这个参数。)

对于transformation操作,以shuffle依赖为分隔,分为不同的Stages。

窄依赖------>tasks会归并在同一个stage中,(相同节点上的task运算可以像pipeline一样顺序执行,不同节点并行计算,互不影响)

shuffle依赖------>前后拆分为两个stage,前一个stage写完文件后下一个stage才能开始

action操作------>和其他tasks会归并在同一个stage(在没有shuffle依赖的情况下,生成默认的stage,保证至少一个stage)。

(四)、小实验验证
例一:

在spark-shell里面跑小程序,然后在yarn里面观察

val rdd =  sc.parallelize(Array(1,2,3,7)) (因为分的资源是两个核,所以默认设置为两个partition)

rdd.count()

Count是一个action操作。一个action会触发一个job,Count()这个action在整个job没有stage的情况下会生成一个默认的stage

结果:一个job,一个stage,两个task(因为有两个partition)

例二:

scala> val rdd= sc.parallelize(Array("hello","spark","hello","zhangvalue"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24scala> val pairs = rdd.map(s=>(s,1))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:25scala> val count = pairs.reduceByKey((a,b) => a+b)
count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[13] at reduceByKey at <console>:25scala> count.count()
res4: Long = 3


最终这个生成一个job,因为reducebykey是shuffle依赖,所以这里划分为两个stage

parallelize和map被分在一起,为stage0,map最后进行了ShuffleWrite

reduceByKey和count()被划分到一个stage1里面了,最开始要进行shuffle read


Stage0的tasks如下图,两个partitions(两个tasks)都进行了shuffle write。两个task互相独立,并不需要依赖彼此做完或者怎样,所以他们在一个stage里面并发执行

Stage1的tasks如下:Stage1是依赖之前的stage0完成shuffle的,reduceByKey开始需要ShuffleRead stage0的计算结果

如果后面还有其他操作,这些操作是要等上面这个shuffle执行完的

reduceByKey 则在下一阶段,shuffleRead读到数据

所以根据shuffle依赖必须分为多个stage

但一个stage内部,多个task(partition)是独立并发执行的,互不打扰

Spark基础 DAG相关推荐

  1. 分布式离线计算—Spark—基础介绍

    原文作者:饥渴的小苹果 原文地址:[Spark]Spark基础教程 目录 Spark特点 Spark相对于Hadoop的优势 Spark生态系统 Spark基本概念 Spark结构设计 Spark各种 ...

  2. 【Spark】Spark基础教程知识点

    第 1 部分 Spark 基础 Spark 概述 本章介绍 Spark 的一些基本认识. Spark官方地址 一:什么是 Spark Spark 是一个快速(基于内存), 通用, 可扩展的集群计算引擎 ...

  3. Spark学习之路一——Spark基础及环境搭建

    Spark学习之路一--Spark基础及环境搭建 文章目录 一. Spark 概述 1.1 概述 1.2 优势特性 1.2.1 运行速度快 1.2.2 容易使用 1.2.3 通用性 1.2.4 运行模 ...

  4. Spark基础知识解答

    Spark基础知识解答 一. Spark基础知识 1. Spark是什么? UCBerkeley AMPlab所开源的类HadoopMapReduce的通用的并行计算框架. Spark基于mapred ...

  5. 2022年Spark基础学习笔记目录

    一.Spark学习笔记 在私有云上创建与配置虚拟机 Spark基础学习笔记01:初步了解Spark Spark基础学习笔记02:Spark运行时架构 Spark基础学习笔记03:搭建Spark单机版环 ...

  6. Spark基础学习笔记19:RDD的依赖与Stage划分

    文章目录 零.本讲学习目标 一.RDD的依赖 (一)窄依赖 1.map()与filter()算子 2.union()算子 3.join()算子 (二)宽依赖 1.groupBy()算子 2.join( ...

  7. Spark基础学习笔记16:创建RDD

    文章目录 零.本讲学习目标 一.RDD为何物 (一)RDD概念 (二)RDD示例 (三)RDD主要特征 二.做好准备工作 (一)准备文件 1.准备本地系统文件 2.准备HDFS系统文件 (二)启动Sp ...

  8. Spark基础学习笔记10:Scala集成开发环境

    文章目录 零.本讲学习目标 一.搭建Scala的Eclipse开发环境 (一)安装Scala插件 (二)创建Scala项目 二.搭建Scala的IntelliJ IDEA开发环境 (一)启动IDEA ...

  9. Spark基础学习笔记04:搭建Spark伪分布式环境

    文章目录 零.本讲学习目标 一.搭建伪分布式Hadoop (一)登录ied虚拟机 (二)配置免密登录 1.生成密钥对 2.将生成的公钥发送到本机(虚拟机ied) 3.验证虚拟机是否能免密登录本机 (三 ...

最新文章

  1. 实时音视频助力在线教育风口
  2. php中命名空间、面向对象、访问控制、接口
  3. 6-4 开班简介1
  4. 电脑一直弹出传奇游戏网页弹窗怎么办
  5. 长虹电视+刷回android,【原创教程】长虹智能电视Q3T手动升级and刷机救砖教程
  6. 2018年银行业网络金融黑产分析报告
  7. 5G到底厉害在什么地方?和4G有什么不同?
  8. LeetCode题解:707.设计链表
  9. python函数之间变量的调用
  10. 英语知识点整理day02
  11. html中斜体样式怎么写,html i 斜体标签
  12. 热乎乎的蚂蚁金服面经分享,offer已到手建议收藏(Java岗、附答案解析)
  13. 【技能树共建】Python 列表推导式
  14. 假如银行利率如下所示,请分别计算存款10000元,活期1年、活期2年、定期1年、定期2年后的本息合计。(结果四舍五入,不保留小数位。使用Math.round(double d)实现
  15. ubuntu 下语系编码转换
  16. Oracle-实现Boolean类型字段
  17. WinDbg基本使用
  18. 【VirtualAPP 双开系列07】第三方 APP Service、Provider 加载分析
  19. 复旦大学 计算机专业课,当计算机专业课与校史相遇:复旦学子“硬核学史”...
  20. LLBL Gen Pro 3.X 下使用 Template 模版绑定(一)

热门文章

  1. Hadoop pig进阶语法
  2. deepin linux 安装最新版node方法
  3. 利用python一键修改host 一键上网
  4. 为什么别人的成长叫蓝图,你的成长始终是流浪!
  5. Servlet相关技术的使用与分析
  6. 回首三年,拥报2009(四)-终结篇
  7. macOS Monterey 12.6.8 (21G725) 正式版发布,ISO、IPSW、PKG 下载
  8. typeScript(小满版本);
  9. 西部数码 php版本,西部数码云主机如何安装PHP版本
  10. 揭开邮票界《一轮生肖猴》的神秘面纱