1、何为Spark中的宽依赖和窄依赖

1.1、官方源码解释

1.1.1、NarrowDependency(窄依赖)

/*** :: DeveloperApi ::* Base class for dependencies where each partition of the child RDD depends on a small number* of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {/*** Get the parent partitions for a child partition.* @param partitionId a partition of the child RDD* @return the partitions of the parent RDD that the child partition depends upon*/def getParents(partitionId: Int): Seq[Int]override def rdd: RDD[T] = _rdd
}

如上源码注释所讲,这些最基本的依赖关系,其中子RDD的每个分区依赖于父RDD的少量分区,并且窄依赖允许通过一个管道来执行。

总结来说:父RDD的一个分区只会被子RDD的1个分区所依赖,并且其中是不会产生shuffle过程的。

下图可以帮助理解

1.1.2、ShuffleDependency(宽依赖)

/*** :: DeveloperApi ::* Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,* the RDD is transient since we don't need it on the executor side.** @param _rdd the parent RDD* @param partitioner partitioner used to partition the shuffle output* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set*                   explicitly then the default serializer, as specified by `spark.serializer`*                   config option, will be used.* @param keyOrdering key ordering for RDD's shuffles* @param aggregator map/reduce-side aggregator for RDD's shuffle* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)*/
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]] {if (mapSideCombine) {require(aggregator.isDefined, "Map-side combine without Aggregator specified!")}override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getNameprivate[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName// Note: It's possible that the combiner class tag is null, if the combineByKey// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.private[spark] val combinerClassName: Option[String] =Option(reflect.classTag[C]).map(_.runtimeClass.getName)val shuffleId: Int = _rdd.context.newShuffleId()val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, _rdd.partitions.length, this)_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

源码注释可以理解为:表示对shuffle阶段的输出的依赖关系。注意,在shuffle的情况下,RDD是暂时的,因为我们不需要在executor端使用它。其实看dependency名字就知道了,顾名思义,只有发生了shuffle才可以称之为宽依赖

可以总结如下:父RDD的一个分区会被子RDD的多个分区所依赖

下图可以帮助理解

2、为什么需要宽窄依赖

2.1、DAG和Stage概念

2.1.1、DAG

Spark的DAG:就是spark任务/程序执行的流程图,

DAG的开始:从创建RDD开始

DAG的结束:到Action结束

一个Spark程序中有几个Action操作就有几个DAG,如何区分算子是否是action还是transformation,如果api返回的是RDD,则这个api算子一定是transformation,反之则为action。

2.1.2、Stage

Stage:是DAG中根据shuffle划分出来的阶段,前面的阶段执行完才可以执行后面的阶段,同一个阶段中的各个任务可以并行执行无需等待。

2.2、详解如何宽窄依赖之间是如何切分成Stage的、

如图:

总结:

窄依赖: 并行化+容错

宽依赖: 进行阶段划分(shuffle后的阶段需要等待shuffle前的阶段计算完才能执行)

3、哪些算子是宽依赖那些算子是窄依赖

map、flatMap、filter等等常规情况下都是窄依赖,不会产生shuffle

reduceBykey、groupByKey等等常规情况下都是宽依赖,会产生shuffle

但是其实宽窄依赖不能光靠算子名称来划分,需要根据定义来划分,因为有的时候,join操作可能是窄依赖,有的时候就是宽依赖。具体的还是要看stage具体执行的时候划分。

4、根据Spark UI解读join何时是窄依赖何时是宽依赖

4.1、join窄依赖

当join和前一个父RDD被划分到同一个Stage中的时候,就可以认为这是一个窄依赖

        SparkConf conf = new SparkConf().setAppName("Java-Test-WordCount").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);List<Tuple2<String, Integer>> tuple2List1 = Arrays.asList(new Tuple2<>("Alice", 15), new Tuple2<>("Bob", 18), new Tuple2<>("Thomas", 20), new Tuple2<>("Catalina", 25));List<Tuple3<String, String, String>> tuple3List = Arrays.asList(new Tuple3<>("Alice", "Female", "NanJ"), new Tuple3<>("Thomas", "Male", "ShangH"), new Tuple3<>("Tom", "Male", "BeiJ"));//通过parallelize构建第一个RDDJavaRDD<Tuple2<String, Integer>> javaRDD1 = jsc.parallelize(tuple2List1);//通过parallelize构建第二个RDDJavaRDD<Tuple3<String, String, String>> javaRDD2 = jsc.parallelize(tuple3List);//通过mapToPair根据第一个RDD构建第三个RDDJavaPairRDD<String, Integer> javaRDD3 = javaRDD1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<String, Integer> tuple2) {return tuple2;}});//通过partitionBy根据第三个RDD构建第五个RDDJavaPairRDD<String, Integer> javaRDD31 = javaRDD3.partitionBy(new HashPartitioner(2));//通过mapToPair根据第二个RDD构建第四个RDDJavaPairRDD<String, Tuple2<String, String>> javaRDD4 = javaRDD2.mapToPair(new PairFunction<Tuple3<String, String, String>, String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, Tuple2<String, String>> call(Tuple3<String, String, String> tuple3) {return new Tuple2<>(tuple3._1(), new Tuple2<>(tuple3._2(), tuple3._3()));}});//通过partitionBy根据第四个RDD构建第六个RDDJavaPairRDD<String, Tuple2<String, String>> javaRDD41 = javaRDD4.partitionBy(new HashPartitioner(2));//通过join 根据第五和第六个RDD构建出第七个RDDJavaPairRDD<String, Tuple2<Integer, Tuple2<String, String>>> javaRDD6 = javaRDD31.join(javaRDD41);javaRDD6.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Tuple2<String, String>>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Integer, Tuple2<String, String>>> stringTuple2Tuple2) throws Exception {System.out.print(stringTuple2Tuple2);}});

查看Spark Web UI

Note:通过如上UI显示,可以看出,Stage5中,partitionBy 和 join在同一个Stage中,并且join是子RDD的算子,故而可以得出结论,在此Stage中,join就是一个窄依赖

4.2、join宽依赖

        SparkConf conf = new SparkConf().setAppName("Java-Test-WordCount").setMaster("local[*]");JavaSparkContext jsc = new JavaSparkContext(conf);List<Tuple2<String, Integer>> tuple2List1 = Arrays.asList(new Tuple2<>("Alice", 15), new Tuple2<>("Bob", 18), new Tuple2<>("Thomas", 20), new Tuple2<>("Catalina", 25));List<Tuple3<String, String, String>> tuple3List = Arrays.asList(new Tuple3<>("Alice", "Female", "NanJ"), new Tuple3<>("Thomas", "Male", "ShangH"), new Tuple3<>("Tom", "Male", "BeiJ"));//通过parallelize构建第一个RDDJavaRDD<Tuple2<String, Integer>> javaRDD1 = jsc.parallelize(tuple2List1);//通过parallelize构建第二个RDDJavaRDD<Tuple3<String, String, String>> javaRDD2 = jsc.parallelize(tuple3List);//通过mapToPair根据第一个RDD构建第三个RDDJavaPairRDD<String, Integer> javaRDD3 = javaRDD1.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(Tuple2<String, Integer> tuple2) {return tuple2;}});//通过mapToPair根据第二个RDD构建第四个RDDJavaPairRDD<String, Tuple2<String, String>> javaRDD4 = javaRDD2.mapToPair(new PairFunction<Tuple3<String, String, String>, String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, Tuple2<String, String>> call(Tuple3<String, String, String> tuple3) {return new Tuple2<>(tuple3._1(), new Tuple2<>(tuple3._2(), tuple3._3()));}});//通过join 根据第三个RDD和第四个RDD构建得出第五个RDDJavaPairRDD<String, Tuple2<Integer, Tuple2<String, String>>> javaRDD5 = javaRDD3.join(javaRDD4);javaRDD5.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Tuple2<String, String>>>>() {@Overridepublic void call(Tuple2<String, Tuple2<Integer, Tuple2<String, String>>> stringTuple2Tuple2) throws Exception {System.out.print(stringTuple2Tuple2);}});

查看Spark Web UI

Note:通过如上UI显示,可以看出Stage2中就只有join一个算子操作,故而此处的join算子就是一个宽依赖。

浅谈Spark中的宽依赖和窄依赖相关推荐

  1. Spark 浅谈Spark中的各种join

    众所周知,Join的种类丰富: 按照**关联形式(**Join type)划分: 有内关联,外关联,左关联,右关联,半关联,逆关联等,由业务逻辑决定的关联形式决定了Spark任务的运行结果; 按照关联 ...

  2. python中 是什么类型_浅谈python中的变量默认是什么类型

    浅谈python中的变量默认是什么类型 1.type(变量名),输出的结果就是变量的类型: 例如 >>> type(6) 2.在Python里面变量在声明时,不需要指定变量的类型,变 ...

  3. 浅谈Linux中ldconfig和ldd的用法

    ldd 查看程序依赖库 ldd 作用:用来查看程式运行所需的共享库,常用来解决程式因缺少某个库文件而不能运行的一些问题. 示例:查看test程序运行所依赖的库: /opt/app/todeav1/te ...

  4. 浅谈Spark应用程序的性能调优

    浅谈Spark应用程序的性能调优 :http://geek.csdn.net/news/detail/51819 下面列出的这些API会导致Shuffle操作,是数据倾斜可能发生的关键点所在  1. ...

  5. 视频基础知识:浅谈视频会议中H.264编码标准的技术发展

    浅谈视频会议中H.264编码标准的技术发展 浅谈视频会议中H.264编码标准的技术发展 数字视频技术广泛应用于通信.计算机.广播电视等领域,带来了会议电视.可视电话及数字电视.媒体存储等一系列应用,促 ...

  6. python中gil锁和线程锁_浅谈Python中的全局锁(GIL)问题

    CPU-bound(计算密集型) 和I/O bound(I/O密集型) 计算密集型任务(CPU-bound) 的特点是要进行大量的计算,占据着主要的任务,消耗CPU资源,一直处于满负荷状态.比如复杂的 ...

  7. 宽依赖和窄依赖_Spark RDD中的依赖关系:宽依赖和窄依赖narrow/widedependency

    前言:前面我们讲过,RDD的转化Transformation操作是一个RDD生成另一个新的RDD的过程,那么新的RDD必然依赖原来的RDD.那么RDD的依赖分为几种,分别是什么含义呢?为什么要分类呢? ...

  8. Spark RDD的宽依赖和窄依赖

    系列文章目录 Spark RDD 的宽窄依赖关系 一.RDD的依赖关系? 在 Spark 中,RDD 分区的数据不支持修改,是只读的.如果想更新 RDD 分区中的数据,那么只 能对原有 RDD 进行转 ...

  9. linux中whoami命令的作用是,浅谈linux中的whoami与 who指令

    浅谈linux中的whoami与 who指令 whoami 功能说明: 显示用户名称 语法: whoami 补充说明: 显示自身的用户名称,本指令相当于执行  id -un 指令 whoami 与 w ...

  10. php css定位到图片上,CSS_浅谈css中图片定位之所有图标放在一张图上,如今做网页为了使网站丰富多 - phpStudy...

    浅谈css中图片定位之所有图标放在一张图上 如今做网页为了使网站丰富多彩,富于表现力,往往需要应用大量的图片/图标.如何处理这些图片,使其尽量不影响网页载入,解析等速度,是一个不大不小的问题.如果你的 ...

最新文章

  1. 分解例题及解析_【高考物理】考前梳理,高中物理经典常考例题(带解析),收藏起来考试不低于90+!...
  2. java arraylist json_java Arraylist转json 对象转json
  3. 安卓高手之路之java层Binder
  4. 虚拟化涉及的关键技术都有哪些,分别实现了什么功能?
  5. 用c语言编写图书成绩管理系统,学生成绩管理系统(c语言编写).doc
  6. 2015/Province_C_C++_C/6/奇妙的数字
  7. 高效大数据开发之 bitmap 思想的应用
  8. 计算机怎么删除表格,电脑中删除Excel2010表格多余图片的三种方法
  9. 平台策略:从Portlet到OpenSocial小工具再到渐进式Web应用程序:最新技术
  10. luogu P3244 [HNOI2015]落忆枫音
  11. netty客户端源码
  12. Intel Core Enhanced Core架构/微架构/流水线 (9) - 执行单元发射口旁路时延
  13. Thrift协议的服务模型
  14. css3 clac函数的易错整理
  15. 实现基于小米的电子商务平台架构
  16. zip_longest
  17. R语言︱常用统计方法包+机器学习包(名称、简介)
  18. 个人成长:谈谈工作之余如何提高个人技术水平
  19. Ubuntu新手-谈第一次在Ubuntu升级VMware Tolls
  20. linux 投影仪只显示桌面背景,投影仪只显示桌面背景,不显示桌面图标,怎么解决...

热门文章

  1. jmeter安装配置教程
  2. 计算机网络技术知识点
  3. linkedin 爬虫
  4. 使用socket模块写一个飞秋炸弹
  5. python线程池超过最大数量_讨论一下Python线程池大小设置?
  6. PyCharm2021设置成中文版
  7. 不可以! 南阳理工ACM 题目1071
  8. 学习信号与系统的看过来~~
  9. 开源练习linux网络编程,如何学习Linux网络编程
  10. 无需U盘在Windows下安装Linux系统实现双系统(非子系统)