1. shuffle 逻辑流程

2. shuffleDependency是什么

shuffleDependency是两个Stage中的连接属性。
shuffleDependency即是上游Stage的一个参数。有是下游Stage的一个属性。

3. shuffleDependency的Handler最优选择。

shuffleDependency在注册shuffleManager的时候会调用 registerShuffle 。
registerShuffle会选择最优的Handler。来确保性能优化。
当调用handler时候会 依次判断是否符合条件 。如果符合使用该handler。如果不符合前两
个那么使用BaseShuffleHandle。 shuffleHandler有三种:
1. BypassMergeSortShuffleHandle
条件1. 不能有map端的聚合
2. 下游分区数必须小于200
2. SerializedShuffleHandle
条件 1. 序列化器是否支持寻址
2. 不能有聚合操作
3. 下游分区数小于16777215
3. BaseShuffleHandle
不符合上面两个,那么就调用BaseShuffleHandle。

4. 根据不同的Handle选择不同的Writer

1. BypassMergeSortShuffleHandle -------> BypassMergeSortShuffleWriter
2. SerializedShuffleHandle -----> UnsafeShuffleWrite
3. BaseShuffleHandle -------> BypassMergeSortShuffleWriter

5. RDD,Task和stage的关系

1 . 一个stage中RDD都是窄依赖关联。stage和stage之间是shuffle关联
2. Task是一个载体。使用RunTask()方法来运行不同的RDD。而RDD必然是stage最后
一个RDD类型。
3. stage是抽象的,RDD有多少个分区就会有多少个Task。
4. 一个Stage的Task的并行度是由Stage的最后一个RDD的分区数来决定的。
5. Rdd和Stage都是抽象的,逻辑的。只有分区和Task是物理的。

6. 上下游任务task怎么完成最优的计算和拉取数据

7. Spark和MapReducec的落盘方式

a. BypassMergeSortShuffleHandle -------
>BypassMergeSortShuffleWriter 的文件落盘方式。(无内存缓冲区,无内
存开辟,直接动用IO来,直接合成一个文件,来形成文件)
1. 所有的Map计算完成后,根据hash取余来计算分区号。放入到对应的分区文件中。
2. 当计算完成之后。按照分区文件线性IO进行拼接。形成index的索引。来记录顺序。
这样可以避免随机读写。
3. groupByKey,sortByKey,combineByKey 会触发
BypassMergeSortShuffleWriter。(必须满足
BypassMergeSortShuffleHandle的两个条件)
b. MapReduce的文件落盘方式
1. map计算之后,计算分区号。Map输出的对象要进行序列化。会将序列化后的对象
放入Hash环中。(下图为环形缓冲区(Hash环))
(Hash环中分为数据段和索引段。排序的是索引段,当数据端数据过大超出范围会
占用索引段的空间所以叫做环形缓冲区)
2. 当Hash环的阈值达到百分80之后会另外开启一个线程,进行排序并形成内部有序外
部无序的文件。
a. (在进行排序时会把对象进行反序列化。)
b. (为什么要进行排序。因为如果不进行这次排序,后面就很难形成一个复杂度为1
的归并排序)
3. 数据再进行一次归并排序
4. 最后也会形成一个index的索引文件。
c. BypassMergeSortShuffleWriter 和 MapReduce比较。
1. 两者都进行了文件排序。都形成了索引index
2. 但是spark的 BypassMergeSortShuffleWriter要比MapReduce的速度
快。 因为在Map分离时直接将数据写入到文件中,减少了一次排序的过程。减少
了对象序列化和反序列化的过程
2. BaseShuffleHandle -------> BypassMergeSortShuffleWriter
的文件落盘方式。
(动用内存缓冲的handle。动用内存缓冲分为聚合和不聚合。有聚合器
使用Map,没有聚合器使用buffer)
a.UnsafeShuffleWrite写文件的两种方式:
判断是否有聚合器:
1. buffer
条件:不需要Map端聚合数据的。使用buffer(内存缓冲区的方式)。为了应
付小文件太多。因为 BypassMergeSortShuffleWriter只支持200个一下的,最
后通过溢写->排序->归并的到一个全排序文件。
过程:
1. 将数据放到堆中。生成一个数组的索引来记录k和v。
2. 索引的数据结构采用 K=2*n V=2*n+1记录,数组容量64*2,元素类型
4Byte。
3. 当数组容量满了的时候进行扩容。使用 System.arraycopy。扩容原数组
的2倍。
2. map
条件: 需要聚合数据。通过一个算法(线性探测在散列)来合并计算。(类似
HashMap底层但不一样。)最后还是要合成一个排序文件。
过程 1. 底层先会计算k的hash值。来确定放在那个数组。在数组里又有链表(比较像
hashMap)
2. 确定在那个数组,查看是否有数据。如果没有直接放入。K放在2n的位置。V放
在2N+1的位置
3. 如果有数据么么判断K是否相同,如果相同那么V进行合并,如果不相同,那么
计算K的Hash值并+1,如果还是有值,K不相同,那么再将取模后的值+2.进入死循环,直
到数据放入退出循环。
4.如果hash的值加到达容量的0.7还是没有加进去。那么会进行扩容。(系统会记
录+到的最大数据。默认最大时64)
5. 扩容会形成一个新的数据,循环将数据放入新的数组中。并重新计算hash。
3. SerializedShuffleHandle -----> UnsafeShuffleWrite 的文件落盘方式。
条件:必须是序列化的,不能是对象
下面10和11详细介绍

9. 内存管理器MemoryManager

1. MemoryManager 内存管理器分为 on heap(堆内)和off heap(堆外)。
2. MemoryManager又有execution(计算)和storage(存储)两部分。两
部分都是可长可短。可以相互占用。
3. BlockManager包括两个模块disk和memory。 当task结束时要写磁盘中。通过
blockmanager的disk写入磁盘。当task调用cache(缓存)的时候调用memory,将缓存放
入到memoryManager的storage中。
4. execution在任务计算的过程当中使用

10 内存的Page页理解

内存页有索引在指定内存数据的位置 1. 每个task在要写入的是否需要先将对象转换成字节数组。在根据字节数组申请page。
2. page只能存放byte[]。
3. page有堆外和堆内两种。根据page的不同。inMemSorter(排序索引)也分为堆内堆外两
1. 一个大内存里有很多小的page页。一个page页中又有很多小的Frame。

11. 内存分配

1.在 MemoryBlock 管理内存分配器。内存分配分为两种:
堆上分配和堆外分配
1.堆上分配(HeapMemoryAllocator):(只要是堆上存储就有对
象头)
1 new MemoryBlock ( array , Platform . LONG_ARRAY_OFFSET , size );
a. 会通过unsafe的 arrayBaseOffset(long[].class) 获取 long[]第一个元素面向long[]的偏移量:16,
(因为有头信息,所以第一个位置从16开始) 。获取到元素偏移量,元素的大小,在pageNum(第几页)位
置。(
总结:1. 申请page页面。
2. 如果是第一次来存放数据那么申请页面。
3. 如果不是第一次。那么 判断 “游标”加上“新数据”的长度大于page的大小。那
么申请页面。小于page的大小。那么不申请页面。使用原page。)
b. 然后通过unsafe的putInt方法,将pageCursor(游标)跳过对象头(16b),对象头后面添加四
个字节(该对象的元素大小,后面要查询的时候可以精确知道元素的位置信息。)
c. 最后在经过unsafe的coptMemory方法,将原数据的recordBase拷贝到page的
recordBase中。对游标进行更新(记录到这条元素最后的位置)。这样就将数据放入到了page中。
2.堆外分配
1 new MemoryBlock ( null , address , size );
a. 通过unsafe的 allocateMemory进行堆外内存的申请。
b. 然后通过unsafe的putInt方法,将游标放入内存中(游标记录的是数据的长度)。没有对象头所以从
0开始。
c. 最后在经过unsafe的coptMemory方法,将原数据的recordBase拷贝到内存的
recordBase中。对游标进行更新(记录到这条元素最后的位置)。这样就将数据放入到了
page中。
3.比较 a. 堆内存储比堆外存储要慢些。因为堆内存进行page分页和对象头还要计算内存位置
等一系列操作。而堆外没有对象头和page分页,不需要计算可以直接寻址,直接在一块内
存上直接操作。
b. 堆内都得经过JVM进行翻译,而堆外不需要JVM翻译。
c. 使用堆内和堆外取决于 MemoryBlock是否有 array long[]。
d. 堆内内存是先向jvm申请一块内存空间。然后在通过unsafe方法对内存空间的数据进行修
改。
堆外内存是直接通过unsafe开辟一块内存空间,不经过jvm。直接用unsafe方法堆外存
空间进行修改。jvm不会堆外内存有影响。
e. 堆内内存GC可以回收。而堆外内存只能通过unsafe的方法进行回收

12. 内存读取

1.线性读取
a.每次读取直接读取前4b,获取数据的长度,读取第二个元素。直接使用游标
0+4b+数据长度。获取第二个元素的长度进行读取。
2. 环形读取(MapReduce)
a. 每次查找的是索引列。根据索引列的数据长度来获取数据。
13.溢写操作(spillSize)
1. 当堆内或者堆外内存满了的时候进行溢写。
2. 溢写对文件的索引进行排序。排序成分区有序的文件。

spark的shuffle的write原理逻辑相关推荐

  1. 详细探究Spark的shuffle实现

    Background 在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了 ...

  2. [Spark][翻译]Spark 架构: Shuffle过程分析

    为什么80%的码农都做不了架构师?>>>    Spark Architecture: Shuffle Spark架构:Shuffle This is my second artic ...

  3. 大数据开发:剖析Hadoop和Spark的Shuffle过程差异

    一.前言 对于基于MapReduce编程范式的分布式计算来说,本质上而言,就是在计算数据的交.并.差.聚合.排序等过程.而分布式计算分而治之的思想,让每个节点只计算部分数据,也就是只处理一个分片,那么 ...

  4. Spark的shuffle机制

    Spark的shuffle机制 最近在面试大数据开发工程师,面某B公司的时候问到了Spark的shuffle机制,并且问和MR的shuffle有什么样的区别,当时答得不太好,决定好好研究这个玩意儿,网 ...

  5. Spark bucketing bucket分桶原理和应用

    介绍Spark分桶的原理以及如何在数据分析和数据准备中更好的应用分桶. 分桶概念 spark的bucketing分桶是一种组织存储系统中数据的方式.以便后续查询中用到这种机制,来提升计算效率. 如果分 ...

  6. Spark 之 Shuffle

    Shuffle 在Spark中Shuffle的目的是为了保证每一个key所对应的value都会汇聚到同一个分区上去聚合和处理. Spark 中的 Shuffle 操作的特点 只有 Key-Value ...

  7. shuffle机制和原理分析

    Shuffle简介 Shuffle描述着数据从map task输出到reduce task输入的这段过程.shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过s ...

  8. 剖析Hadoop和Spark的Shuffle过程差异(一)

    一.前言 对于基于MapReduce编程范式的分布式计算来说,本质上而言,就是在计算数据的交.并.差.聚合.排序等过程.而分布式计算分而治之的思想,让每个节点只计算部分数据,也就是只处理一个分片,那么 ...

  9. 谈谈spark.sql.shuffle.partitions和 spark.default.parallelism 的区别及spark并行度的理解

    谈谈spark.sql.shuffle.partitions和 spark.default.parallelism 的区别及spark并行度的理解 spark.sql.shuffle.partitio ...

最新文章

  1. .Net Core 简洁架构事件(这个不完整,待仔细补充)
  2. java 不执行构造函数_函数作为构造函数执行,但不作为函数执行
  3. android 本地日历,Android日历提供商:如何删除自己的本地日历?
  4. 淘宝直播应关注哪些方面?
  5. vue中的组件重定向
  6. 【三维路径规划】基于matlab麻雀算法求解无人机三维路径规划问题【含Matlab源码 212期】
  7. matlab-m文件常用积分函数-ode45含有时变参数用法/菜鸟理解4
  8. 【解决方案】Excel条形图顺序与源数据相反怎么办
  9. c语言编写排列组合程序,C语言实现排列组合
  10. 迷宫小游戏Java实现
  11. 【读 深入浅出数据分析】 一日一章 读书第三天 第三章-最优化:寻找最大值
  12. win2003控制面板不见了,打开“控制面板”的方法,安全策略
  13. 如何修改DOSBOX的窗口大小
  14. 【算法思考】双目视差求像素深度公式理解
  15. 逻辑运算符,与,或,非,以及优先级。
  16. 转:使用memc-nginx和srcache-nginx模块构建高效透明的缓存机制
  17. ffmpeg音频解码重采样(立体声转成单声道)
  18. 把握数字中国建设重大契机,实在智能携手山东商业职业技术学院共建“现代金融数字化实训中心”
  19. 如果你已经过了20岁但还不到25岁
  20. SPARK 2.2.1 SQL处理各种数据源的案例与解读

热门文章

  1. PPTV的走向:“卖身”求生或成唯一选择
  2. 项目管理 之五 版本控制系统(CVS、SVN、BitKeeper、Git )概念、分类
  3. 苹果iPad Pro新品:7.9寸+手写笔+3D Touch
  4. 完美解决Error:java: 无效的源发行版: 12
  5. yum [Errno 256] No more mirrors to try
  6. realmeq2鸿蒙系统,realme UI 2.0官宣:十月随新机一起发布!
  7. 【Linux】vim的复制粘贴
  8. 「清华集训 2017」某位歌姬的故事
  9. 数据结构与算法的思考与历程
  10. 如何查看服务器CPU核心数和线程数