相关文章:Hadoop/Spark的shuffle理论知识
注:此文不谈宽窄依赖的概念以及计算失败时恢复的区别【关于这方面的博文,书籍都有不少但几乎雷同的讲解,搜索即可,比如】,仅从源码的角度学习下实现区别,并验证一些说法。

搜索Dependency,有以下类:

Dependency.scala

Dependency(总的基类)

/*** :: DeveloperApi ::* Base class for dependencies.* 所有依赖的父类(基类),抽象类*/
@DeveloperApi
abstract class Dependency[T] extends Serializable {def rdd: RDD[T]
}
  • 可以看到,这个类就仅仅继承了Serializable接口,按照Java的思路的话,按理是implements实现接口 。但是Scala中无论是继承还是实现trait都用关键字extends。
  • java.io.Serializable :
  1. 序列化:对象的寿命通常随着生成该对象的程序的终止而终止,有时候需要把在内存中的各种对象的状态(也就是实例变量,不是方法)保存下来,并且可以在需要时再将对象恢复。
  2. Java 序列化技术可以使你将一个对象的状态写入一个Byte 流里(系列化),并且可以从其它地方把该Byte 流里的数据读出来(反序列化)。
  3. 用途:想把的内存中的对象状态保存到一个文件中或者数据库中时候;想把对象通过网络进行传播的时候
  4. 如果是通过网络传输的话,如果serialVersionUID不一致,那么反序列化就不能正常进行。
  5. 序列化保存的是对象的状态,静态变量属于类的状态,因此 序列化并不保存静态变量。
  6. 当某些变量不想被序列化,同是又不适合使用static关键字声明,那么此时就需要用transient关键字来声明该变量。
  7. 当一个父类实现序列化,子类自动实现序列化,不需要显式实现Serializable接口。
    参考[1]
  • 序列化与持久化的区别:
  1. 简单的说持久化就是save/load;序列化就是把结构化的对象转化成无结构的字节流。
  2. 序列化是为了解决对象的传输问题,传输可以在线程之间、进程之间、内存外存之间、主机之间进行。
  3. 你也可以说将一个对象序列化后存储在数据库中,但是你也不能说是对象持久化

NarrowDependency(窄依赖基类)

/*** 窄依赖的基类,窄依赖允许流水线执行.*/
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {/*** Get the parent partitions for a child partition.* 通过partitionId得到子RDD的某个partition依赖父RDD的所有partitions.* @param partitionId a partition of the child RDD* @return the partitions of the parent RDD that the child partition depends upon(子RDD的每个分区依赖少量(一个或多个)父RDD分区)*/// 为什么返回的是Seq[Int]?def getParents(partitionId: Int): Seq[Int]override def rdd: RDD[T] = _rdd
}

这里的getParents也是抽象方法

ShuffleDependency(宽依赖实现类)

该类主要实现创建新的shuffleId,以及新的shuffle,并把该shuffle相关的信息捆绑于shuffleHandle,shuffleManager用shuffleHandle传递信息给Tasks,并进行一些旧shuffle的清理工作。比如shuffleId,状态,广播变量等。

@DeveloperApi
// ClassTag参考:https://docs.scala-lang.org/overviews/reflection/typetags-manifests.html,https://blog.csdn.net/dax1n/article/details/77036447.
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]] {override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getNameprivate[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName// Note: It's possible that the combiner class tag is null, if the combineByKey// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.private[spark] val combinerClassName: Option[String] =Option(reflect.classTag[C]).map(_.runtimeClass.getName)// 产生新的shuffleId,表明要进行新的Shuffle操作.val shuffleId: Int = _rdd.context.newShuffleId()// ShuffleManager使用一个shuffle句柄将关于它的信息传递给Task,// 获取注册该RDD的SparkContext,再获取构建SparkContext时创建的SparkEnv,向SparkEnv中创建的shuffleManager注册shuffle,注册的信息捆绑于shuffleHandle,shuffleManager用shuffleHandle传递信息给Tasksval shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, _rdd.partitions.length, this)
// 清除该Shuffle的一些信息,比如状态,与之相关的广播变量等_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
  • 关于ClassTag,参考:https://docs.scala-lang.org/overviews/reflection/typetags-manifests.html,https://blog.csdn.net/dax1n/article/details/77036447.
  • 关于Product2:
  1. Product2 is a Cartesian product of 2 components.
  2. Product, Equals, Any的子类,Tuple2的父类
  3. MORE:https://www.scala-lang.org/api/current/scala/Product2.html
  • shuffle垃圾清理
/** Register a ShuffleDependency for cleanup when it is garbage collected. */def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))}
/** Register a ShuffleDependency for cleanup when it is garbage collected. */def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))}
case CleanShuffle(shuffleId) =>doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)

真正执行清理工作的方法是:ContextCleaner.scala中的doCleanupShuffle,unregisterShuffle等

mapStatuses、cachedSerializedStatuses、shuffleIdLocks都是线程安全的ConcurrentHashMap,cachedSerializedBroadcast是HashMap

/** Unregister shuffle data */override def unregisterShuffle(shuffleId: Int) {// 移除多个HashMap中的shuffleIdmapStatuses.remove(shuffleId)cachedSerializedStatuses.remove(shuffleId)// 还要移除该shuffle上的所有广播变量.cachedSerializedBroadcast.remove(shuffleId).foreach(v => removeBroadcast(v))// shuffleIdLocks是为了防止同一个shuffle的多次序列化,比如说:在shuffle开始时以及发生请求风暴时.shuffleIdLocks.remove(shuffleId)}

OneToOneDependency(窄依赖实现类)

仅一个对基类中的getParents的重写方法,不太理解getParents为何返回的是Int数,还有List(partitionId)这个”操作“。

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

RangeDependency(窄依赖实现类)

有朋友说RangeDependency仅在Union操作中用到,暂时未验证。

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = {if (partitionId >= outStart && partitionId < outStart + length) {List(partitionId - outStart + inStart)} else {Nil}}
}

窄依赖的函数有:
map, filter, union, join(父RDD是hash-partitioned ), mapPartitions, mapValues等

宽依赖的函数有:
groupByKey, join(父RDD不是hash-partitioned ), partitionBy、sortByKey、reduceByKey、countByKey等

TODO

[1] ConcurrentHashMap和HashMap的进阶理解
[2] 不太理解getParents为何返回的是Int数,还有List(partitionId)这个”操作“

参考

[1]序列化

宽窄依赖以及shuffle的部分源码理解相关推荐

  1. Spring-bean的循环依赖以及解决方式___Spring源码初探--Bean的初始化-循环依赖的解决

    本文主要是分析Spring bean的循环依赖,以及Spring的解决方式. 通过这种解决方式,我们可以应用在我们实际开发项目中. 什么是循环依赖? 怎么检测循环依赖 Spring怎么解决循环依赖 S ...

  2. TLD(Tracking-Learning-Detection)学习与源码理解之(六)

    TLD(Tracking-Learning-Detection)学习与源码理解之(六) zouxy09@qq.com http://blog.csdn.net/zouxy09 下面是自己在看论文和这些 ...

  3. TLD(Tracking-Learning-Detection)学习与源码理解之(四)

    TLD(Tracking-Learning-Detection)学习与源码理解之(四) zouxy09@qq.com http://blog.csdn.net/zouxy09 下面是自己在看论文和这些 ...

  4. 从hotspot底层对象结构理解锁膨胀升级过程||深入jdk源码理解longadder的分段cas优化机制——分段CAS优化

    深入jdk源码理解longadder的分段cas优化机制 longadder

  5. faster rcnn源码理解(二)之AnchorTargetLayer(网络中的rpn_data)

    转载自:faster rcnn源码理解(二)之AnchorTargetLayer(网络中的rpn_data) - 野孩子的专栏 - 博客频道 - CSDN.NET http://blog.csdn.n ...

  6. faster rcnn的源码理解(一)SmoothL1LossLayer论文与代码的结合理解

    转载自:faster rcnn的源码理解(一)SmoothL1LossLayer论文与代码的结合理解 - 野孩子的专栏 - 博客频道 - CSDN.NET http://blog.csdn.net/u ...

  7. TLD(Tracking-Learning-Detection)学习与源码理解之(五)

    TLD(Tracking-Learning-Detection)学习与源码理解之(五)   zouxy09@qq.com http://blog.csdn.net/zouxy09 下面是自己在看论文和 ...

  8. TLD(Tracking-Learning-Detection)学习与源码理解之(三)

    TLD(Tracking-Learning-Detection)学习与源码理解之(三) zouxy09@qq.com http://blog.csdn.net/zouxy09 下面是自己在看论文和这些 ...

  9. TLD(Tracking-Learning-Detection)学习与源码理解之(二)

    TLD(Tracking-Learning-Detection)学习与源码理解之(二) zouxy09@qq.com http://blog.csdn.net/zouxy09 OpenTLD下载与编译 ...

最新文章

  1. 利用非递归方法实现二叉树的中序遍历
  2. Python剑指offer:分行从上到下打印二叉树
  3. LeetCode Pancake Sorting
  4. asp.net core权限模块的快速构建
  5. c语言程序 强制关机程序,怎样用C语言编写关机程序
  6. 收藏 | 让你纵横 GitHub 的五大神器
  7. PHP使用http_build_query()构造URL字符串的方法(可将POST参数组转换拼接成GET请求链接)...
  8. webpack 的 scope hoisting 是什么?
  9. override与final
  10. MySQL 死锁专题问题处理
  11. EOS官方钱包keosd
  12. 单片机STM8S测量电压电路_50个单片机晶振问题及解决方法小结
  13. Vue 中英文切换设置
  14. 夏普电视服务器维修,夏普液晶电视机通病维修方法
  15. c语言改变图片颜色,用c语言把bmp格式的彩色图片转换成黑白的
  16. GPS是如何定位的?
  17. 微信小游戏之飞机大战解析
  18. docker运行yyets_docker 使用教程1
  19. Stroke – 开源鼠标手势软件[Windows]
  20. 皕杰报表web应用服务器的差异

热门文章

  1. Vuex前端saas人力资源中台管理项目第四天 员工管理模块
  2. 「我是为“数据”去的京东」对话京东供应链首席科学家申作军
  3. GitOps | 一种云原生的持续交付模型
  4. linkerd 本地环境安装
  5. 基于中国剩余定理的秘密共享方案(miracl库)
  6. 什么蓝绿部署(BlueGreenDeployment) ?
  7. Linux 服务器配置使用密钥登录教程
  8. data mining - 实用机器学习工具与技术 - 读书笔记( 一 )
  9. Win11还原Win10开始菜单及任务栏工具[Win7勿入]
  10. 每日三省吾身- 持续改进-持续集成