spark shuffle再补充
shuffle概览
一个spark的RDD有一组固定的分区组成,每个分区有一系列的记录组成。对于由窄依赖变换(例如map和filter)返回的RDD,会延续父RDD的分区信息,以pipeline的形式计算。每个对象仅依赖于父RDD中的单个对象。诸如coalesce之类的操作可能导致任务处理多个输入分区,但转换仍然被认为是窄依赖的,因为一个父RDD的分区只会被一个子RDD分区继承。
Spark还支持宽依赖的转换,例如groupByKey和reduceByKey。在这些依赖项中,计算单个分区中的记录所需的数据可以来自于父数据集的许多分区中。要执行这些转换,具有相同key的所有元组必须最终位于同一分区中,由同一任务处理。为了满足这一要求,Spark产生一个shuffle,它在集群内部传输数据,并产生一个带有一组新分区的新stage。
可以看下面的代码片段:
sc.textFile("someFile.txt").map(mapFunc).flatMap(flatMapFunc).filter(filterFunc).count()
上面的代码片段只有一个action操作,count,从输入textfile到action经过了三个转换操作。这段代码只会在一个stage中运行,因为,三个转换操作没有shuffle,也即是三个转换操作的每个分区都是只依赖于它的父RDD的单个分区。
但是,下面的单词统计就跟上面有很大区别:
val tokenized = sc.textFile(args(0)).flatMap(_.split(' '))
val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)
val filtered = wordCounts.filter(_._2 >= 1000)
val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)
charCounts.collect()
这段代码里有两个reducebykey操作,三个stage。
下面图更复杂,因为有一个join操作:
粉框圈住的就是整个DAG的stage划分。
在每个stage的边界,父stage的task会将数据写入磁盘,子stage的task会将数据通过网络读取。由于它们会导致很高的磁盘和网络IO,所以shuffle代价相当高,应该尽量避免。父stage的数据分区往往和子stage的分区数不同。触发shuffle的操作算子往往可以指定分区数的,也即是numPartitions代表下个stage会有多少个分区。就像mr任务中reducer的数据是非常重要的一个参数一样,shuffle的时候指定分区数也将在很大程度上决定一个应用程序的性能。
优化shuffle
通常情况可以选择使用产生相同结果的action和transform相互替换。但是并不是产生相同结果的算子就会有相同的性能。通常避免常见的陷阱并选择正确的算子可以显著提高应用程序的性能。
当选择转换操作的时候,应最小化shuffle次数和shuffle的数据量。shuffle是非常消耗性能的操作。所有的shuffle数据都会被写入磁盘,然后通过网络传输。repartition , join, cogroup, 和 *By 或者 *ByKey 类型的操作都会产生shuffle。我们可以对一下几个操作算子进行优化:
1. groupByKey某些情况下可以被reducebykey代替。
2. reduceByKey某些情况下可以被 aggregatebykey代替。
3. flatMap-join-groupBy某些情况下可以被cgroup代替。
no shuffle
在某些情况下,前面描述的转换操作不会导致shuffle。当先前的转换操作已经使用了和shuffle相同的分区器分区数据的时候,spark就不会产生shuffle。
举个例子:
rdd1 = someRdd.reduceByKey(...)rdd2 = someOtherRdd.reduceByKey(...)rdd3 = rdd1.join(rdd2)
由于使用redcuebykey的时候没有指定分区器,所以都是使用的默认分区器,会导致rdd1和rdd2都采用的是hash分区器。两个reducebykey操作会产生两个shuffle过程。如果,数据集有相同的分区数,执行join操作的时候就不需要进行额外的shuffle。由于数据集的分区相同,因此rdd1的任何单个分区中的key集合只能出现在rdd2的单个分区中。因此,rdd3的任何单个输出分区的内容仅取决于rdd1中单个分区的内容和rdd2中的单个分区,并且不需要第三个shuffle。
例如,如果someRdd有四个分区,someOtherRdd有两个分区,而reduceByKeys都使用三个分区,运行的任务集如下所示:
如果rdd1和rdd2使用不同的分区器或者相同的分区器不同的分区数,仅仅一个数据集在join的过程中需要重新shuffle
在join的过程中为了避免shuffle,可以使用广播变量。当executor内存可以存储数据集,在driver端可以将其加载到一个hash表中,然后广播到executor。然后,map转换可以引用哈希表来执行查找。
增加shuffle
有时候需要打破最小化shuffle次数的规则。
当增加并行度的时候,额外的shuffle是有利的。例如,数据中有一些文件是不可分割的,那么该大文件对应的分区就会有大量的记录,而不是说将数据分散到尽可能多的分区内部来使用所有已经申请cpu。在这种情况下,使用reparition重新产生更多的分区数,以满足后面转换算子所需的并行度,这会提升很大性能。
使用reduce和aggregate操作将数据聚合到driver端,也是修改区数的很好的例子。
在对大量分区执行聚合的时候,在driver的单线程中聚合会成为瓶颈。要减driver的负载,可以首先使用reducebykey或者aggregatebykey执行一轮分布式聚合,同时将结果数据集分区数减少。实际思路是首先在每个分区内部进行初步聚合,同时减少分区数,然后再将聚合的结果发到driver端实现最终聚合。典型的操作是treeReduce 和 treeAggregate。
当聚合已经按照key进行分组时,此方法特别适用。例如,假如一个程序计算语料库中每个单词出现的次数,并将结果使用map返回到driver。一种方法是可以使用聚合操作完成在每个分区计算局部map,然后在driver中合并map。可以用aggregateByKey以完全分布的方式进行统计,然后简单的用collectAsMap将结果返回到driver。
转自:https://mp.weixin.qq.com/mp/profile_ext?action=home&__biz=MzA3MDY0NTMxOQ==&scene=124#wechat_redirect
spark shuffle再补充相关推荐
- Spark Shuffle两种Manager
文章目录 前言 hashShuffleManager 1.普通机制 缺陷 2.合并机制-优化 sortShuffleManager 1.普通机制 2.byPass机制 总结: 前言 reduceByK ...
- 022 Spark shuffle过程
1.官网 http://spark.apache.org/docs/1.6.1/configuration.html#shuffle-behavior Spark数据进行重新分区的操作就叫做shuf ...
- Spark Shuffle 解析
5.Spark Shuffle 解析 5.1 Shuffle 的核心要点 5.1.1 ShuffleMapStage 与 FinalStage 在划分 stage 时,最后一个 stage 称为 Fi ...
- spark shuffle的写操作之准备工作
前言 在前三篇文章中,spark 源码分析之十九 -- DAG的生成和Stage的划分 剖析了DAG的构建和Stage的划分,spark 源码分析之二十 -- Stage的提交 剖析了TaskSet任 ...
- Spark Shuffle Write阶段磁盘文件分析
流程分析 入口处: org.apache.spark.scheduler.ShuffleMapTask.runTask override def runTask(context: TaskContex ...
- 一文搞清楚 Spark shuffle 调优
Spark shuffle 调优 Spark 基于内存进行计算,擅长迭代计算,流式处理,但也会发生shuffle 过程.shuffle 的优化,以及避免产生 shuffle 会给程序提高更好的性能.因 ...
- Spark(Shuffle)
2019独角兽企业重金招聘Python工程师标准>>> Shuffle Shuffle是Spark对各分区的数据进行重新分布的机制,是一个复杂而且代价较高的操作, 因为一般需要在执行 ...
- Spark Shuffle详解剖析
HashShuffle 一:概述 所谓Shuffle就是将不同节点上相同的Key拉取到一个节点的过程.这之中涉及到各种IO,所以执行时间势必会较长,Spark的Shuffle在1.2之前默认的计算引擎 ...
- Spark内核解析之五:Spark Shuffle解析
Shuffle的核心要点 1. ShuffleMapStage与ResultStage 在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultStage对象,前面的 ...
最新文章
- soapui工具_python接口自动化(四)--接口测试工具介绍(详解)
- 计算机系统—CPU结构和内部工作
- python手机版代码-Python手机号码归属地查询代码
- Browser控制台分析
- go语言实现斐波那契
- cocos2dx遇到的坑1
- sendmsg返回值_[求助]怎么处理sendmessage的返回值
- 2016河北省职称计算机考试试题及答案,2016年最新河北省职称计算机考试试题及答案..doc...
- SN3FAP反激式开关电源12V2A输出电路
- JAVA 通过POI实现Excel从单元格选择下拉选项
- 高跟鞋,五角星与黄金分割比
- android虚拟机获取root权限,Android虚拟机获取root权限
- 开博尔智能android播放器,高端安卓播放器的选择——开博尔Q10Plus 二代 4K高清播放器...
- 爬虫技术原来可以做这么多牛逼哄哄的事情!
- Django学习笔记
- Mybatis深层理解之mybatis到底为我们做了什么?
- 【Seedlabs】ARP Cache Poisoning Attack Lab
- PHPstudy 数据库基本操作
- 在Centos/Linux系统下使用Phalcon开发PHP项目
- TensorFlow报错:AttributeError: module 'tensorflow._api.v1.train' has no attribute 'SummaryWriter'等
热门文章
- 牛客多校2 - Greater and Greater(bitset优化暴力)
- UVA - 10480 Sabotage(最小割-最大流+输出割边)
- CH - 0104 起床困难综合症(位运算+贪心)
- 中石油训练赛 - Faulhaber’s Triangle(打表)
- python pcm 分贝_语音文件 pcm 静默(静音)判断
- 给网游写一个挂吧(二) – 启动外挂上
- cocos2d-x游戏开发(八)各类构造器
- C++线程池原理及创建(转)
- 假笨说-谨防JDK8重复类定义造成的内存泄漏
- 面试官让我说出2种@Transactional注解的失效场景,我一口气给他说了六种