Spark Shuffle

Spark Shuffle是发生在宽依赖(Shuffle Dependency)的情况下,上游Stage和下游Stage之间传递数据的一种机制。Shuffle解决的问题是如何将数据重新组织,使其能够在上游和下游task之间进行传递和计算。如果是单纯的数据传递,则只需要将数据进行分区、通过网络传输即可,没有太大难度,但Shuffle机制还需要进行各种类型的计算(如聚合、排序),而且数据量一般会很大。如何支持这些不同类型的计算,如何提高Shuffle的性能都是Shuffle机制设计的难点问题。

从总体框架上来看,Spark Shuffle分为Shuffle Write和Shuffle Read两个部分,Shuffle Write解决上游Stage的输出数据的分区问题,Shuffle Read解决下游Stage从上游Stage获取数据、重新组织、并为后续操作提供数据的问题。

Shuffle Write

在Shuffle Write阶段,数据操作需要分区、聚合和排序三个功能,但是不同的数据操作所需要的功能不同,有些数据操作或许只需要一到两个功能。但是,Shuffle Write有一个总体的设计框架,即“map()输出->数据聚合(combine)->排序(sort)->分区”。

不同的数据操作算子有着不一样的实现,这里我讲一下不同情况下的Shuffle Write方式。

1. 不需要聚合(combine)和排序(sort)

这种方法只需要将数据分区即可。输出每条数据并通过hash取模(hashcode(key)%numPartitions)计算其分区id,然后按照分区id输入到不同的buffer中,每当buffer填满时就溢写到磁盘分区文件中。使用buffer是为了减少磁盘I/O次数,用缓冲提高效率。这种Shuffle Write方式叫做BypassMergeSortShuffleWriter。

这种方式的Shuffle Write优点就是速度快,不需要聚合和排序操作,直接按照分区输出,缺点就是资源消耗高,每个分区都需要一个buffer和分区文件,因此不适合过大的分区数。

该模式适用的操作类型:map()端不需要聚合(combine)、Key不需要排序且分区个数较少(〈=spark.Shuffle.sort.bypassMergeThreshold,默认值为200)。例如,groupByKey(100),partitionBy(100),sortByKey(100)等。

2. 不需要聚合(combine),但需要排序(sort)

这种方式跟上一种方式比起来多了一个排序的功能,在计算出分区id后,会把数据放到一个Array中,在这个Array中,我们会让Array的Key变成分区id+Key的形式。在Spark Shuffle中,这个Array名字叫PartitionedPairBuffer。

然后,我们按照分区id+Key来做排序,如果在接收数据过程中buffer满了,就会先扩容,如果还存放不下,那么就会将当前buffer排序后溢写到磁盘,清空buffer继续写。等待数据输出完后,再将Array和磁盘的数据做一个全局排序,得到最后一个大的排序的分区文件。这个Shuffle模式叫做SortShuffleWrite。

这个方式的Shuffle Write的优点就是可以按照分区id+Key排序,并且buffer有扩容和溢写的功能,最后会整合到一个分区文件中,减少了磁盘I/O。缺点就是排序本身提高了计算时延。

该Shuffle模式适用的操作:map()端不需要聚合(combine)、Key需要排序、分区个数无限制。目前,Spark本身没有提供这种排序类型的数据操作,但不排除用户会自定义,或者系统未来会提供这种类型的操作。sortByKey()操作虽然需要按Key进行排序,但这个排序过程在Shuffle Read端完成即可,不需要在Shuffle Write端进行排序。

另外,我们可以看看刚刚所说的BypassMergeSortShuffleWriter的最大问题就是分区过多(>200)会导致buffer过大、建立和打开文件数过多。在这种情况下,我们可以将SortShuffleWrite中的“按照分区id+Key来做排序”改为“只按分区id排序”的实现方式,就可以支持第一种情况中分区数过大时的问题。例如groupByKey(300)、partitionBy(300)、sortByKey(300)。

3. 需要聚合(combine),需要或者不需要按Key进行排序(sort)

在这种情况下,Shuffle就走完了“map()输出->数据聚合(combine)->排序(sort)->分区”的流程。

在数据聚合阶段,Spark Shuffle会创建一个Map的结构来聚合数据。Map的数据格式是<(PID, K), V>,每次来数据的时候会按照分区id+Key来给数据做聚合,每来一条新数据就以Map的旧数据去更新Map的值。例如聚合方式是sum(),那么每次从Map中取出旧值,与新数据相加,再put到Map中。

数据聚合后,会通过Array将数据进行排序。如果需要按照Key进行排序,那么就按照分区id+Key来排序;如果不需要按照Key进行排序,那么只按照分区id来排序。

如果Map放不下,则会先扩容为两倍大小,如果还存放不下,就把Map中的数据排序后溢写到磁盘上,并清空Map继续聚合,这个操作可以重复多次。当数据处理完后,会把Map数据和磁盘中的数据再次聚合(merge),最后得到一个聚合与排序后的分区文件。

该Shuffle模式的优缺点:

优点是只需要一个Map结构就可以支持map()端的combine功能,Map具有扩容和spill到磁盘上的功能,支持小规模到大规模数据的聚合,也适用于分区个数很大的情况。在聚合后使用Array排序,可以灵活支持不同的排序需求。

缺点是在内存中进行聚合,内存消耗较大,需要额外的数组进行排序,而且如果有数据spill到磁盘上,还需要再次进行聚合。Spark在Shuffle Write中,使用一个经过特殊设计和优化的Map,命名为PartitionedAppendOnlyMap,可以同时支持聚合和排序操作,相当于Map和Array的合体。

该Shuffle模式适用的操作:适合map()端聚合(combine)、需要或者不需要按Key进行排序、分区个数无限制的应用,如reduceByKey()、aggregateByKey()等。

总结:

Shuffle Write框架需要执行的3个步骤是“数据聚合→排序→分区”。

  • 如果应用中的数据操作不需要聚合,也不需要排序,而且分区个数很少,那么可以采用直接输出模式,即BypassMergeSortShuffleWriter。
  • 为了克服BypassMergeSortShuffleWriter打开文件过多、buffer分配过多的缺点,也为了支持需要按Key进行排序的操作,Spark提供了SortShuffleWriter,使用基于Array排序的方法,以分区id或分区id+Key进行排序,只输出单一的分区文件即可。
  • 最后,为了支持map()端combine操作,Spark提供了基于Map的SortShuffleWriter,将Array替换为类似HashMap的操作来支持聚合操作,在聚合后根据partitionId或分区id+Key对record进行排序,并输出分区文件。因为SortShuffleWriter按分区id进行了排序,所以被称为sort-based Shuffle Write。

Shuffle Read

在Shuffle Read中,也有一个总体框架:“跨节点数据获取->聚合->排序”。Reduce Task从各个Map Task端中获取属于该分区的数据,然后使用Map边获取数据边聚合。聚合完成后,放到Array根据Key进行排序,最后将结果输出或者传递给下一个操作。当然,如果不需要聚合或者排序的算子就可以省下这一两个功能。

1. 不需要聚合(combine)和排序(sort)

这种情况只需要把各个Map Task获取的数据输出到buffer即可,是最简单的情况。

该Shuffle模式的优缺点:优点是逻辑和实现简单,内存消耗很小。缺点是不支持聚合、排序等复杂功能。

该Shuffle模式适用的操作:适合既不需要聚合也不需要排序的应用,如partitionBy()等。

2. 不需要聚合(combine),需要按Key排序

这种情况下,把数据从Map Task端获取后,将buffer中的数据输出到一个Array中。这里仍然使用Shuffle Write的PartitionedPairBuffer进行排序,因此保留了分区id,即使一个Reduce Task中的分区都是相同的。

当内存无法存下数据时,PartitionedPairBuffer就会尝试扩容,若内存仍不够,就会在排序后将数据溢写到磁盘中,当所有数据都接收到后,再将buffer中的数据和磁盘中的数据做merge sort。

该Shuffle模式的优缺点:优点是只需要一个Array结构就可以支持按照Key进行排序,Array大小可控,而且具有扩容和spill到磁盘上的功能,不受数据规模限制。缺点是排序增加计算时延。

该Shuffle模式适用的操作:适合reduce端不需要聚合,但需要按Key进行排序的操作,如sortByKey()、sortBy()等。

3. 需要聚合(combine)不需要或者需要按Key进行排序(sort)

这种情况下,获取数据后会建立一个Map来对数据做聚合(ExternalAppendOnlyMap)。聚合操作和Shuffle Write基本一致,用旧值和新数据来更新新值。

做完聚合操作后,如果需要进行排序,那么就建立一个Array并排序,排序后将结果输出或者传递给下一步操作。

如果Map放不下,那么会先扩容到两倍大小。如果还不够,那么会在排序后溢写到磁盘,数据都处理完后再将内存和磁盘的数据做一次聚合、排序,再将数据交给下一次操作。

该Shuffle模式的优缺点:

优点是只需要一个Map和一个Array结构就可以支持reduce端的聚合和排序功能,Map 具有扩容和spill到磁盘上的功能,支持小规模到大规模数据的聚合。边获取数据边聚合,效率较高。

缺点是需要在内存中进行聚合,内存消耗较大,如果有数据spill到磁盘上,还需要进行再次聚合。另外,经过HashMap聚合后的数据仍然需要拷贝到Array中进行排序,内存消耗较大。在实现中,Spark使用的HashMap是一个经过特殊优化的HashMap,命名为ExternalAppendOnlyMap,可以同时支持聚合和排序操作,相当于HashMap和Array的合体。

该Shuffle模式适用的操作:适合reduce端需要聚合、不需要或需要按Key进行排序的操作,如reduceByKey()、aggregateByKey()等。

总结:

  • Shuffle Read框架需要执行的3个步骤是“数据获取→聚合→排序输出”。如果应用中的数据操作不需要聚合,也不需要排序,那么获取数据后直接输出。
  • 对于需要按Key进行排序的操作,Spark 使用基于Array的方法来对Key进行排序。
  • 对于需要聚合的操作,Spark提供了基于HashMap的聚合方法,同时可以再次使用Array来支持按照Key进行排序。
  • 总体来讲,Shuffle Read框架使用的技术和数据结构与Shuffle Write过程类似,而且由于不需要分区,过程比Shuffle Write更为简单。

为高效聚合和排序所设计的数据结构

为了提高Shuffle的聚合与排序的性能,Spark Shuffle特别设计了三种数据结构,如下图所示。这几种数据结构的基本思想是在内存中对record进行聚合和排序,如果存放不下,则进行扩容,如果还存放不下,就将数据排序后spill到磁盘上,最后将磁盘和内存中的数据进行聚合、排序,得到最终结果。

仔细观察Shuffle Write/Read过程,我们会发现Shuffle机制中使用的数据结构的两个特征:一是只需要支持record的插入和更新操作,不需要支持删除操作,这样我们可以对数据结构进行优化,减少内存消耗;二是只有内存放不下时才需要spill到磁盘上,因此数据结构设计以内存为主,磁盘为辅。Spark中的PartitionedAppendOnlyMap和ExternalAppendOnlyMap都基于AppendOnlyMap实现。因此,我们先介绍AppendOnlyMap的原理。

AppendOnlyMap

AppendOnlyMap实际上是一个只支持record添加和对Value进行更新的HashMap。与Java HashMap采用“数组+链表”实现不同,AppendOnlyMap只使用数组来存储元素,根据元素的Hash值确定存储位置,如果存储元素时发生Hash值冲突,则使用二次地址探测法(Quadratic probing)来解决Hash值冲突。

对于每个新来的〈K,V〉record,先使用Hash(K)计算其存放位置,如果存放位置为空,就把record存放到该位置。如果该位置已经被占用,就使用二次探测法来找下一个空闲位置。对于图6.12中新来的〈K6,V6〉record来说,第1次找到的位置Hash(K6)已被K2占用。按照二次探测法向后递增1个record位置,也就是Hash(K6)+1×2,发现位置已被K3占用,然后向后递增4个record位置(指数递增,Hash(K6)+2×2),发现位置没有被占用,放进去即可。

扩容:AppendOnlyMap使用数组来实现的问题是,如果插入的record太多,则很快会被填满。Spark的解决方案是,如果AppendOnlyMap的利用率达到70%,那么就扩张一倍,扩张意味着原来的Hash()失效,因此对所有Key进行rehash,重新排列每个Key的位置。

排序:由于AppendOnlyMap采用了数组作为底层存储结构,可以支持快速排序等排序算法。实现方法,如图6.13所示,先将数组中所有的〈K,V〉record转移到数组的前端,用begin和end来标示起始位置,然后调用排序算法对[begin,end]中的record进行排序。对于需要按Key进行排序的操作,如sortByKey(),可以按照Key值进行排序;对于其他操作,只按照Key的Hash值进行排序即可。

ExternalAppendOnlyMap

AppendOnlyMap的优点是能够将聚合和排序功能很好地结合在一起,缺点是只能使用内存,难以适用于内存空间不足的情况。为了解决这个问题,Spark基于AppendOnlyMap设计实现了基于内存+磁盘的ExternalAppendOnlyMap,用于Shuffle Read端大规模数据聚合。

ExternalAppendOnlyMap的工作原理是,先持有一个AppendOnlyMap来不断接收和聚合新来的record,AppendOnlyMap快被装满时检查一下内存剩余空间是否可以扩展,可直接在内存中扩展,不可对AppendOnlyMap中的record进行排序,然后将record都spill到磁盘上。因为record不断到来,可能会多次填满AppendOnlyMap,所以这个spill过程可以出现多次,最终形成多个spill文件。等record都处理完,此时AppendOnlyMap中可能还留存一些聚合后的record,磁盘上也有多个spill文件。因为这些数据都经过了部分聚合,还需要进行全局聚合(merge)。因此,ExternalAppendOnlyMap的最后一步是将内存中AppendOnlyMap的数据与磁盘上spill文件中的数据进行全局聚合,得到最终结果。

上述过程中涉及3个核心问题:(1)如何获知当前AppendOnlyMap的大小?因为AppendOnlyMap中不断添加和更新record,其大小是动态变化的,什么时候会超过内存界限是难以确定的。(2)如何设计spill的文件结构,使得可以支持高效的全局聚合?(3)怎样进行全局聚合?

1.AppendOnlyMap的大小估计

虽然我们知道AppendOnlyMap中持有的数组的长度和大小,但数组里面存放的是Key和Value的引用,并不是它们的实际对象(object)大小,而且Value会不断被更新,实际大小不断变化。因此,想准确得到AppendOnlyMap的大小比较困难。一种简单的解决方法是在每次插入record或对现有record的Value进行更新后,都扫描一下AppendOnlyMap中存放的record,计算每个record的实际对象大小并相加,但这样会非常耗时。而且一般AppendOnlyMap会插入几万甚至几百万个record,如果每个record进入AppendOnlyMap都计算一遍,则开销会很大。Spark设计了一个增量式的高效估算算法,在每个record插入或更新时根据历史统计值和当前变化量直接估算当前AppendOnlyMap的大小,算法的复杂度是O(1),开销很小。在record插入和聚合过程中会定期对当前AppendOnlyMap中的record进行抽样,然后精确计算这些record的总大小、总个数、更新个数及平均值等,并作为历史统计值。进行抽样是因为AppendOnlyMap中的record可能有上万个,难以对每个都精确计算。之后,每当有record插入或更新时,会根据历史统计值和历史平均的变化值,增量估算AppendOnlyMap的总大小,详见Spark源码中的SizeTracker.estimateSize()方法。抽样也会定期进行,更新统计值以获得更高的精度。

2.Spill过程与排序

当AppendOnlyMap达到内存限制时,会将record排序后写入磁盘中。排序是为了方便下一步全局聚合(聚合内存和磁盘上的record)时可以采用更高效的merge-sort(外部排序+聚合)。那么,问题是根据什么对record进行排序的?自然想到的是根据record的Key进行排序的,但是这就要求操作定义Key的排序方法,如sortByKey()等操作定义了按照Key进行的排序。大部分操作,如groupByKey(),并没有定义Key的排序方法,也不需要输出结果是按照Key进行排序的。在这种情况下,Spark采用按照Key的Hash值进行排序的方法,这样既可以进行merge-sort,又不要求操作定义Key排序的方法。然而,这种方法的问题是会出现Hash值冲突,也就是不同的Key具有相同的Hash值。为了解决这个问题,Spark在merge-sort的同时会比较Key的Hash值是否相等,以及Key的实际值是否相等。

前面提到过,由于最终的spill文件和内存中的AppendOnlyMap都是经过部分聚合后的结果,其中可能存在相同Key的record,因此还需要一个全局聚合阶段将AppendOnlyMap中的record与spill文件中的record进行聚合,得到最终聚合后的结果。全局聚合的方法就是建立一个最小堆或最大堆,每次从各个spill文件中读取前几个具有相同Key(或者相同Key的Hash值)的record,然后与AppendOnlyMap中的record进行聚合,并输出聚合后的结果。在图6.14中,在全局聚合时,Spark分别从4个spill文件中提取第1个〈K,V〉record,与还留在AppendOnlyMap中的第1个record组成最小堆,然后不断从最小堆中提取具有相同Key的record进行聚合(merge)。然后,Spark继续读取spill文件及AppendOnlyMap中的record填充最小堆,直到所有record处理完成。由于每个spill文件中的record是经过排序的,按顺序读取和聚合可以保证能够对每个record得到全局聚合的结果。

总结:ExternalAppendOnlyMap是一个高性能的HashMap,只支持数据插入和更新,但可以同时利用内存和磁盘对大规模数据进行聚合和排序,满足了Shuffle Read阶段数据聚合、排序的需求。

PartitionedAppendOnlyMap

PartitionedAppendOnlyMap用于在Shuffle Write端对record进行聚合(combine)。PartitionedAppendOnlyMap的功能和实现与ExternalAppendOnlyMap的功能和实现基本一样,唯一区别是PartitionedAppendOnlyMap中的Key是“PartitionId+Key”,这样既可以根据partitionId进行排序(面向不需要按Key进行排序的操作),也可以根据partitionId+Key进行排序(面向需要按Key进行排序的操作),从而在Shuffle Write阶段可以进行聚合、排序和分区。

PartitionedPairBuffer

PartitionedPairBuffer本质上是一个基于内存+磁盘的Array,随着数据添加,不断地扩容,当到达内存限制时,就将Array中的数据按照partitionId或partitionId+Key进行排序,然后spill到磁盘上,该过程可以进行多次,最后对内存中和磁盘上的数据进行全局排序,输出或者提供给下一个操作。

参考:许利杰 方亚芬《大数据处理框架Apache Spark设计与实现》

Spark Shuffle相关推荐

  1. Spark Shuffle原理解析

    Spark Shuffle原理解析 一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节 ...

  2. Spark shuffle调优

    Spark shuffle是什么 Shuffle在Spark中即是把父RDD中的KV对按照Key重新分区,从而得到一个新的RDD.也就是说原本同属于父RDD同一个分区的数据需要进入到子RDD的不同的分 ...

  3. Spark Shuffle两种Manager

    文章目录 前言 hashShuffleManager 1.普通机制 缺陷 2.合并机制-优化 sortShuffleManager 1.普通机制 2.byPass机制 总结: 前言 reduceByK ...

  4. spark shuffle内在原理说明

    在MapReduce框架中,shuffle是连接Map和Reduce之间的桥梁,Map的输出要用到Reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量 ...

  5. 022 Spark shuffle过程

    1.官网  http://spark.apache.org/docs/1.6.1/configuration.html#shuffle-behavior Spark数据进行重新分区的操作就叫做shuf ...

  6. Spark Shuffle 解析

    5.Spark Shuffle 解析 5.1 Shuffle 的核心要点 5.1.1 ShuffleMapStage 与 FinalStage 在划分 stage 时,最后一个 stage 称为 Fi ...

  7. spark shuffle的写操作之准备工作

    前言 在前三篇文章中,spark 源码分析之十九 -- DAG的生成和Stage的划分 剖析了DAG的构建和Stage的划分,spark 源码分析之二十 -- Stage的提交 剖析了TaskSet任 ...

  8. 阿里云Spark Shuffle的优化

    转自:大数据技术与架构 本次分享者:辰石,来自阿里巴巴计算平台事业部EMR团队技术专家,目前从事大数据存储以及Spark相关方面的工作. Spark Shuffle介绍 Smart Shuffle设计 ...

  9. Spark Shuffle Write阶段磁盘文件分析

    流程分析 入口处: org.apache.spark.scheduler.ShuffleMapTask.runTask override def runTask(context: TaskContex ...

  10. 一文搞清楚 Spark shuffle 调优

    Spark shuffle 调优 Spark 基于内存进行计算,擅长迭代计算,流式处理,但也会发生shuffle 过程.shuffle 的优化,以及避免产生 shuffle 会给程序提高更好的性能.因 ...

最新文章

  1. HDU1217(Bellman-ford和Floyd两种算法)
  2. Git/码云上多人协作,创建分支,上传修改
  3. 需求实在太旺盛,三星电子考虑扩大在华芯片产能
  4. 巨一自动化工业机器人_2021第11届深圳国际工业自动化及机器人展览会
  5. python计算机视觉2:图像边缘检测
  6. E/Trace: error opening trace file: No such file or directory
  7. 计算机网络工程教程:基于cisco路由器和交换机,计算机网络工程教程—基于cisco路由器和交换机教学课件作者陆魁军chap2交换机VLAN设置v2.ppt...
  8. ZooKeeper学习第七期--ZooKeeper一致性原理(转)
  9. 白日门手游luac文件加密怎么解密_浅析android手游lua脚本的加密与解密
  10. search_web_resources
  11. 专属资料包——Ps基础、788款设计师字体、Ps品牌样机、《20款顶级vi品牌手册》、100款以上世界级ui设计、《世界级插画集》
  12. UIUC计算机科学系博士,伊利诺伊大学香槟分校计算机系统博士排名
  13. 用mac系统怎么连宽带连接服务器吗,苹果电脑怎么连宽带_MAC系统怎么连接有线宽带-win7之家...
  14. android 视频插件下载,轻视频动态壁纸插件
  15. 解决-Dmaven.multiModuleProjectDirectory system property is not set. Check $M2_HO问题
  16. 【Python学习笔记】第一章基础知识:格式化输出,转义字符,变量类型转换,算术运算符,运算符优先级和赋值运算符,逻辑运算符,世界杯案例题目,条件判断if语句,猜拳游戏与三目运算符
  17. xhtml转xsl-fo
  18. 女工程师独家揭秘:双11秘密武器阿里云数据库团队故事
  19. Unity常用API详解--初学必备
  20. 36套模具3D+2D原图+结构图

热门文章

  1. USB等高速差分信号是否需要包地?
  2. linux一键电影网站脚本,Linux下HTML5播放器一键生成脚本
  3. 旅游行业网站怎么搭建?
  4. 不能联网的服务器上安装R包
  5. 黑马程序员_配置环境变量
  6. 潮人篮球怎么在电脑上玩 潮人篮球电脑版玩法教程
  7. 迭代计划和故事点真正的阻力在哪里?——敏捷是人猿相揖别
  8. 七种方法,教你培养持续学习的习惯
  9. 人工智能会取代人类,会毁灭人类吗?
  10. less与sass的区别是什么?