一:到底什么是Shuffle?

Shuffle中文翻译为“洗牌”,需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节点上进行计算。

二:Shuffle可能面临的问题?运行Task的时候才会产生Shuffle(Shuffle已经融化在Spark的算子中了)。

1, 数据量非常大;

2, 数据如何分类,即如何Partition,Hash、Sort、钨丝计算;

3, 负载均衡(数据倾斜);

4, 网络传输效率,需要在压缩和解压缩之间做出权衡,序列化和反序列也是要考虑的问题;

说明:具体的Task进行计算的时候尽一切最大可能使得数据具备Process Locality的特性;退而求次是增加数据分片,减少每个Task处理的数据量。

三:Hash Shuffle

1, key不能是Array;

2, Hash Shuffle不需要排序,此时从理论上讲就节省了Hadoop MapReduce中进行Shuffle需要排序时候的时间浪费,因为实际生产环境有大量的不需要排序的Shuffle类型;

思考:不需要排序的Hash Shuffle是否一定比需要排序的Sorted Shuffle速度更快?不一定!如果数据规模比较小的情形下,Hash Shuffle会比Sorted Shuffle速度快(很多)!但是如果数据量大,此时Sorted Shuffle一般都会比Hash Shuffle快(很多)

3,每个ShuffleMapTask会根据key的哈希值计算出当前的key需要写入的Partition,然后把决定后的结果写入当单独的文件,此时会导致每个Task产生R(指下一个Stage的并行度)个文件,如果当前的Stage中有M个ShuffleMapTask,则会M*R个文件!!!

注意:Shuffle操作绝大多数情况下都要通过网络,如果Mapper和Reducer在同一台机器上,此时只需要读取本地磁盘即可。

Hash Shuffle的两大死穴:第一:Shuffle前会产生海量的小文件于磁盘之上,此时会产生大量耗时低效的IO操作;第二:内存不共用!!!由于内存中需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较庞大的话,内存不可承受,出现OOM等问题!

三:Sorted Shuffle:

为了改善上述的问题(同时打开过多文件导致Writer Handler内存使用过大以及产生过度文件导致大量的随机读写带来的效率极为低下的磁盘IO操作),Spark后来推出了Consalidate机制,来把小文件合并,此时Shuffle时文件产生的数量为cores*R,对于ShuffleMapTask的数量明显多于同时可用的并行Cores的数量的情况下,Shuffle产生的文件会大幅度减少,会极大降低OOM的可能;

为此Spark推出了Shuffle Pluggable开放框架,方便系统升级的时候定制Shuffle功能模块,也方便第三方系统改造人员根据实际的业务场景来开放具体最佳的Shuffle模块;核心接口ShuffleManager,具体默认实现有HashShuffleManager、SortShuffleManager等,Spark 1.6.0中具体的配置如下:

一:为什么需要Sort-Based Shuffle?

1,Shuffle一般包含两阶段任务:第一部分,产生Shuffle数据的阶段(Map阶段,额外补充,需要实现ShuffleManager中getWriter来写数据(数据可以BlockManager写到Memory、Disk、Tachyon等,例如像非常快的Shuffle,此时可以考虑把数据写在内存中,但是内存不稳定,建议采用MEMORY_AND_DISK方式));第二部分,使用Shuffle数据的阶段(Reduce阶段,额外的补充,需要实现ShuffleManager的getReader,Reader会向Driver去获取上一下Stage产生的Shuffle数据);

2,Spark的Job会被划分成很多Stage:

如果只有一个Stage,则这个Job就相当于只有一个Mapper阶段,当然不会产生产生Shuffle,适合于简单的ETL;

如果不止一个Stage,则最后一个Stage就是最终的Reducer,最左侧的第一个Stage就仅仅是整个Job的Mapper,中间所有的任意一个Stage是其父Stage的Reducer且是其子Stage的Mapper;

3,Spark Shuffle在最开始的时候只支持Hash-based Shuffle:默认Mapper阶段会为Reducer阶段的每一个Task单独创建一个文件来保存该Task中要使用的数据,但是在一些情况下(例如数据量非常大的情况)会造成大量文件(M*R,其中M代表Mapper中的所有的并行任务数量,R代表Reducer中所有的并行任务数量)的随机磁盘I/O操作且会性能大量的Memory消耗(极易造成OOM),这是致命的问题,因为第一不能够处理大规模的数据,第二Spark不能够运行在大规模的分布式集群上!后来的改善方式是加入了Shuffle Consolidate机制来将Shuffle时候产生的文件数量减少到C*R个(C代表在Mapper端同时能够使用的Cores的数量,R代表Reducer中所有的并行任务数量),但是此时如果Reducer端的并行数据分片过多的话则C*R可能已经过大,此时依旧没有逃脱文件打开过多的厄运!!!

 

Spark在引入Sort-based Shuffle(Spark 1.1版本以前)以前比较适用于中小规模的大数据处理!

4,为了让Spark在更大规模的集群上更高性能处理更大规模的数据,于是就引入了Sort-based Shuffle!从此以后(Spark 1.1版本开始),Spark可以胜任任意规模(包含PB级别及PB以上的级别)的大数据的处理,尤其是随着钨丝计划的引入和优化,把Spark更快速的在更大规模的集群处理更海量的数据的能力推向了一个新的巅峰!

5,Spark 1.6版本支持至少三种类型Shuffle:

// Let the user specify short names for shuffle managers
val shortShuffleMgrNames =Map(
  "hash"-> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort"-> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort"-> "org.apache.spark.shuffle.sort.SortShuffleManager")
val shuffleMgrName = conf.get("spark.shuffle.manager","sort")
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase,shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

实现ShuffleManager接口可以更具自己的业务实际需要最优化的使用自定义的Shuffle实现;

6,Spark 1.6默认采用的就是Sort-based Shuffle的方式:

val shuffleMgrName =conf.get("spark.shuffle.manager","sort")

上述的源码说明,你可以在Spark的配置文件中配置Spark框架运行时要使用的具体的ShuffleManager的实现

修改conf/spark-default.conf, 加入如下内容:

spark.shuffle.manager SORT

Sort-based Shuffle不会为每个Reducer中的Task生成一个单独的文件,相反,Sort-based Shuffle会把Mapper中每个ShuffleMapTask所有的输出数据Data只写到一个文件中,因为每个ShuffleMapTask的中的数据会被分类,所以Sort-based Shuffle使用了index文件存储具体ShuffleMapTask输出数据在同一个Data文件中是如何分类的信息!!!所以说基于Sort-base的Shuffle会在Mapper中的每一个ShuffleMapTask中产生两个文件:Data文件和Index文件,其中Data文件是存储当前Task的Shuffle输出的,而Index文件中则存储数据了Data文件中的数据通过Partitioner的分类信息,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所要抓取的上一个Stage中的ShuffleMapTask产生的数据的;

Sort-based Shuffle会产生2M(M代表Mapper阶段中并行的Partition的总数量,其实就是Mapper端ShuffleMapTask的总数量)个Shuffle临时文件!!!

回顾整个Shuffle的历史,Shuffle产生的临时文件的数量的变化一次为:

Basic Hash Shuffle:M*R;

Consalidate方式的Hash Shuffle:C*R;

Sort-based Shuffle:2M;

二:在集群中动手实战Sort-based Shuffle

通过动手实践确实证明了Sort-based Shuffle产生了2M个文件!!!

shuffle_0_0_0.data

shuffle_0_3_0.index

在Sort-based Shuffle的中Reducer是如何获取自己需要的数据的呢?具体而言,Reducer首先找Driver去获取父Stage中每个ShuffleMapTask输出的位置信息,根据位置信息获取index文件,解析index文件,从解析的index文件中获取Data文件中属于自己的那部分内容;

三:默认Sort-based Shuffle的几个缺陷:

1, 如果Mapper中Task的数量过大,依旧会产生很多小文件;此时在Shuffle传递数据的过程中到Reducer端,reduce会需要同时大量的记录来进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃!

2, 如果需要在分片内也进行排序的话,此时需要进行Mapper端和Reducer端的两次排序!!!

valshortShuffleMgrNames =Map(
  "hash"-> "org.apache.spark.shuffle.hash.HashShuffleManager",
  "sort"-> "org.apache.spark.shuffle.sort.SortShuffleManager",
  "tungsten-sort"-> "org.apache.spark.shuffle.sort.SortShuffleManager")

spark shuffle 内幕彻底解密相关推荐

  1. Hash-based Shuffle内幕彻底解密

    Hash-based Shuffle内幕彻底解密 视频学习来源:DT-大数据梦工厂 IMF传奇行动视频 本期内容: 1 Hash Shuffle彻底解密 2 Shuffle Pluggable解密 3 ...

  2. spark Hash Shuffle内幕彻底解密

    本博文的主要内容: 1.Hash Shuffle彻底解密 2.Shuffle Pluggable解密 3.Sorted Shuffle解密 4.Shuffle性能优化 一:到底什么是Shuffle? ...

  3. 第42课: Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践

    第42课:  Spark Broadcast内幕解密:Broadcast运行机制彻底解密.Broadcast源码解析.Broadcast最佳实践 Broadcast在机器学习.图计算.构建日常的各种算 ...

  4. 《Spark商业案例与性能调优实战100课》第27课:彻底解密Spark Shuffle令人费解的6大经典问题

    <Spark商业案例与性能调优实战100课>第27课:彻底解密Spark Shuffle令人费解的6大经典问题

  5. Spark技术内幕:Shuffle Read的整体流程

    回忆一下,每个Stage的上边界,要么需要从外部存储读取数据,要么需要读取上一个Stage的输出:而下边界,要么是需要写入本地文件系统(需要Shuffle),以供childStage读取,要么是最后一 ...

  6. Spark技术内幕: Task向Executor提交的源代码解析

    在上文<Spark技术内幕:Stage划分及提交源代码分析>中,我们分析了Stage的生成和提交.可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓 ...

  7. Spark Shuffle源码分析系列之UnsafeShuffleWriter

    前面我们介绍了BypassMergeSortShuffleWriter和SortShuffleWriter,知道了它们的应用场景和实现方式,本节我们来看下UnsafeShuffleWriter,它使用 ...

  8. Spark Shuffle原理解析

    Spark Shuffle原理解析 一:到底什么是Shuffle? Shuffle中文翻译为"洗牌",需要Shuffle的关键性原因是某种具有共同特征的数据需要最终汇聚到一个计算节 ...

  9. Spark shuffle调优

    Spark shuffle是什么 Shuffle在Spark中即是把父RDD中的KV对按照Key重新分区,从而得到一个新的RDD.也就是说原本同属于父RDD同一个分区的数据需要进入到子RDD的不同的分 ...

最新文章

  1. android 后台服务拍照,Android实现后台开启服务默默拍照功能
  2. Le Chapitre VI
  3. golang辟谣之模板编程
  4. summerDao-比mybatis更强大无需映射配置的dao工具
  5. 通用无线设备对码软件_电动车上的无线电池管理系统wBMS
  6. 2-zookeeper、ha
  7. 数据结构与算法——栈、队列、堆汇总整理
  8. PROFINET工业以太网教程(10)——GSD文件
  9. 友基s400手写板怎么安装_电脑怎么安装手写板 手写板安装方法【教程】
  10. 多线程+反爬:爬取阿里巴巴国际站物流表现数据到mysql
  11. 5G+智能电网应用项目开建,将带来何种“活力”?
  12. 记一次计算机课作文,记一次有趣的作文课作文800字
  13. vue3+vite assets动态引入图片的几种方式,解决打包后图片路径错误不显示的问题
  14. vue 调起浏览器打印
  15. 三足鼎立写博赚钱之道--献给2010年初互联网扫黄运动被错杀的兄弟
  16. 致远SPM解决方案之库存管理
  17. 全球及中国多普勒导航仪行业投资分析及前景预测报告2022-2028年
  18. 杰理6905A芯片引脚的设置
  19. 安卓wifi调试助手(单片机wifi上位机)
  20. Java 搭建高级画图板

热门文章

  1. K-空间 在图像变换及重建
  2. 激光SLAM保存pcd点云地图
  3. 爬取QQ音乐——新手不知道的那些坑 之 中英文(全/半角)冒号坑
  4. 科学数据库(Pandas)——第二节 pandas之DataFrame
  5. 如果取消Windows Ink后ps画笔没有压感
  6. 任务管理器计算机快捷键,win10任务管理快捷键是什么_教你用快捷键打开任务管理器的方法...
  7. 正则验证邮箱、8到16位数字字母特殊符号组合
  8. ANSYS Workbench中多场耦合下不同模块间的信息共享设置
  9. 《Java从入门到项目实战(全程视频版)》(李兴华 著)【配套资源及赠送资源】
  10. 项目资源管理-日历图