宽窄依赖以及shuffle的部分源码理解
相关文章:Hadoop/Spark的shuffle理论知识
注:此文不谈宽窄依赖的概念以及计算失败时恢复的区别【关于这方面的博文,书籍都有不少但几乎雷同的讲解,搜索即可,比如】,仅从源码的角度学习下实现区别,并验证一些说法。
搜索Dependency,有以下类:
Dependency.scala
Dependency(总的基类)
/*** :: DeveloperApi ::* Base class for dependencies.* 所有依赖的父类(基类),抽象类*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {def rdd: RDD[T]
}
- 可以看到,这个类就仅仅继承了Serializable接口,按照Java的思路的话,按理是implements实现接口 。但是Scala中无论是继承还是实现trait都用关键字extends。
- java.io.Serializable :
- 序列化:对象的寿命通常随着生成该对象的程序的终止而终止,有时候需要把在内存中的各种对象的状态(也就是实例变量,不是方法)保存下来,并且可以在需要时再将对象恢复。
- Java 序列化技术可以使你将一个对象的状态写入一个Byte 流里(系列化),并且可以从其它地方把该Byte 流里的数据读出来(反序列化)。
- 用途:想把的内存中的对象状态保存到一个文件中或者数据库中时候;想把对象通过网络进行传播的时候
- 如果是通过网络传输的话,如果serialVersionUID不一致,那么反序列化就不能正常进行。
- 序列化保存的是对象的状态,静态变量属于类的状态,因此 序列化并不保存静态变量。
- 当某些变量不想被序列化,同是又不适合使用static关键字声明,那么此时就需要用transient关键字来声明该变量。
- 当一个父类实现序列化,子类自动实现序列化,不需要显式实现Serializable接口。
参考[1]
- 序列化与持久化的区别:
- 简单的说持久化就是save/load;序列化就是把结构化的对象转化成无结构的字节流。
- 序列化是为了解决对象的传输问题,传输可以在线程之间、进程之间、内存外存之间、主机之间进行。
- 你也可以说将一个对象序列化后存储在数据库中,但是你也不能说是对象持久化
NarrowDependency(窄依赖基类)
/*** 窄依赖的基类,窄依赖允许流水线执行.*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {/*** Get the parent partitions for a child partition.* 通过partitionId得到子RDD的某个partition依赖父RDD的所有partitions.* @param partitionId a partition of the child RDD* @return the partitions of the parent RDD that the child partition depends upon(子RDD的每个分区依赖少量(一个或多个)父RDD分区)*/// 为什么返回的是Seq[Int]?def getParents(partitionId: Int): Seq[Int]override def rdd: RDD[T] = _rdd
}
这里的getParents也是抽象方法
ShuffleDependency(宽依赖实现类)
该类主要实现创建新的shuffleId,以及新的shuffle,并把该shuffle相关的信息捆绑于shuffleHandle,shuffleManager用shuffleHandle传递信息给Tasks,并进行一些旧shuffle的清理工作。比如shuffleId,状态,广播变量等。
@DeveloperApi
// ClassTag参考:https://docs.scala-lang.org/overviews/reflection/typetags-manifests.html,https://blog.csdn.net/dax1n/article/details/77036447.
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]] {override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getNameprivate[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName// Note: It's possible that the combiner class tag is null, if the combineByKey// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.private[spark] val combinerClassName: Option[String] =Option(reflect.classTag[C]).map(_.runtimeClass.getName)// 产生新的shuffleId,表明要进行新的Shuffle操作.val shuffleId: Int = _rdd.context.newShuffleId()// ShuffleManager使用一个shuffle句柄将关于它的信息传递给Task,// 获取注册该RDD的SparkContext,再获取构建SparkContext时创建的SparkEnv,向SparkEnv中创建的shuffleManager注册shuffle,注册的信息捆绑于shuffleHandle,shuffleManager用shuffleHandle传递信息给Tasksval shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, _rdd.partitions.length, this)
// 清除该Shuffle的一些信息,比如状态,与之相关的广播变量等_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
- 关于ClassTag,参考:https://docs.scala-lang.org/overviews/reflection/typetags-manifests.html,https://blog.csdn.net/dax1n/article/details/77036447.
- 关于Product2:
- Product2 is a Cartesian product of 2 components.
- Product, Equals, Any的子类,Tuple2的父类
- MORE:https://www.scala-lang.org/api/current/scala/Product2.html
- shuffle垃圾清理
/** Register a ShuffleDependency for cleanup when it is garbage collected. */def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))}
/** Register a ShuffleDependency for cleanup when it is garbage collected. */def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))}
case CleanShuffle(shuffleId) =>doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
真正执行清理工作的方法是:ContextCleaner.scala中的doCleanupShuffle,unregisterShuffle等
mapStatuses、cachedSerializedStatuses、shuffleIdLocks都是线程安全的ConcurrentHashMap,cachedSerializedBroadcast是HashMap
/** Unregister shuffle data */override def unregisterShuffle(shuffleId: Int) {// 移除多个HashMap中的shuffleIdmapStatuses.remove(shuffleId)cachedSerializedStatuses.remove(shuffleId)// 还要移除该shuffle上的所有广播变量.cachedSerializedBroadcast.remove(shuffleId).foreach(v => removeBroadcast(v))// shuffleIdLocks是为了防止同一个shuffle的多次序列化,比如说:在shuffle开始时以及发生请求风暴时.shuffleIdLocks.remove(shuffleId)}
OneToOneDependency(窄依赖实现类)
仅一个对基类中的getParents的重写方法,不太理解getParents为何返回的是Int数,还有List(partitionId)这个”操作“。
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
RangeDependency(窄依赖实现类)
有朋友说RangeDependency仅在Union操作中用到,暂时未验证。
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = {if (partitionId >= outStart && partitionId < outStart + length) {List(partitionId - outStart + inStart)} else {Nil}}
}
窄依赖的函数有:
map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues等
宽依赖的函数有:
groupByKey, join(父RDD不是hash-partitioned ), partitionBy、sortByKey、reduceByKey、countByKey等
TODO
[1] ConcurrentHashMap和HashMap的进阶理解
[2] 不太理解getParents为何返回的是Int数,还有List(partitionId)这个”操作“
参考
[1]序列化
宽窄依赖以及shuffle的部分源码理解相关推荐
- Spring-bean的循环依赖以及解决方式___Spring源码初探--Bean的初始化-循环依赖的解决
本文主要是分析Spring bean的循环依赖,以及Spring的解决方式. 通过这种解决方式,我们可以应用在我们实际开发项目中. 什么是循环依赖? 怎么检测循环依赖 Spring怎么解决循环依赖 S ...
- TLD(Tracking-Learning-Detection)学习与源码理解之(六)
TLD(Tracking-Learning-Detection)学习与源码理解之(六) zouxy09@qq.com http://blog.csdn.net/zouxy09 下面是自己在看论文和这些 ...
- TLD(Tracking-Learning-Detection)学习与源码理解之(四)
TLD(Tracking-Learning-Detection)学习与源码理解之(四) zouxy09@qq.com http://blog.csdn.net/zouxy09 下面是自己在看论文和这些 ...
- 从hotspot底层对象结构理解锁膨胀升级过程||深入jdk源码理解longadder的分段cas优化机制——分段CAS优化
深入jdk源码理解longadder的分段cas优化机制 longadder
- faster rcnn源码理解(二)之AnchorTargetLayer(网络中的rpn_data)
转载自:faster rcnn源码理解(二)之AnchorTargetLayer(网络中的rpn_data) - 野孩子的专栏 - 博客频道 - CSDN.NET http://blog.csdn.n ...
- faster rcnn的源码理解(一)SmoothL1LossLayer论文与代码的结合理解
转载自:faster rcnn的源码理解(一)SmoothL1LossLayer论文与代码的结合理解 - 野孩子的专栏 - 博客频道 - CSDN.NET http://blog.csdn.net/u ...
- TLD(Tracking-Learning-Detection)学习与源码理解之(五)
TLD(Tracking-Learning-Detection)学习与源码理解之(五) zouxy09@qq.com http://blog.csdn.net/zouxy09 下面是自己在看论文和 ...
- TLD(Tracking-Learning-Detection)学习与源码理解之(三)
TLD(Tracking-Learning-Detection)学习与源码理解之(三) zouxy09@qq.com http://blog.csdn.net/zouxy09 下面是自己在看论文和这些 ...
- TLD(Tracking-Learning-Detection)学习与源码理解之(二)
TLD(Tracking-Learning-Detection)学习与源码理解之(二) zouxy09@qq.com http://blog.csdn.net/zouxy09 OpenTLD下载与编译 ...
最新文章
- 利用非递归方法实现二叉树的中序遍历
- Python剑指offer:分行从上到下打印二叉树
- LeetCode Pancake Sorting
- asp.net core权限模块的快速构建
- c语言程序 强制关机程序,怎样用C语言编写关机程序
- 收藏 | 让你纵横 GitHub 的五大神器
- PHP使用http_build_query()构造URL字符串的方法(可将POST参数组转换拼接成GET请求链接)...
- webpack 的 scope hoisting 是什么?
- override与final
- MySQL 死锁专题问题处理
- EOS官方钱包keosd
- 单片机STM8S测量电压电路_50个单片机晶振问题及解决方法小结
- Vue 中英文切换设置
- 夏普电视服务器维修,夏普液晶电视机通病维修方法
- c语言改变图片颜色,用c语言把bmp格式的彩色图片转换成黑白的
- GPS是如何定位的?
- 微信小游戏之飞机大战解析
- docker运行yyets_docker 使用教程1
- Stroke – 开源鼠标手势软件[Windows]
- 皕杰报表web应用服务器的差异
热门文章
- Vuex前端saas人力资源中台管理项目第四天 员工管理模块
- 「我是为“数据”去的京东」对话京东供应链首席科学家申作军
- GitOps | 一种云原生的持续交付模型
- linkerd 本地环境安装
- 基于中国剩余定理的秘密共享方案(miracl库)
- 什么蓝绿部署(BlueGreenDeployment) ?
- Linux 服务器配置使用密钥登录教程
- data mining - 实用机器学习工具与技术 - 读书笔记( 一 )
- Win11还原Win10开始菜单及任务栏工具[Win7勿入]
- 每日三省吾身- 持续改进-持续集成