spark数据倾斜问题

Spark 中的数据倾斜问题主要指 shuffle 过程中出现的数据倾斜问题,是由于不同的 key对应的数据量不同导致的不同 task 所处理的数据量不同的问题。

例如,reduce 点一共要处理 100 万条数据,第一个和第二个 task 分别被分配到了 1 万条数据,计算 5 分钟内完成,第三个 task 分配到了 98 万数据,此时第三个 task 可能需要 10
个小时完成,这使得整个 Spark 作业需要 10 个小时才能运行完成,这就是数据倾斜所带来的后果。

数据倾斜的表现:
➢ Spark 作业的大部分 task 都执行迅速,只有有限的几个 task 执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;

➢ Spark 作业的大部分 task 都执行迅速,但是有的 task 在运行过程中会突然报出 OOM,反复执行几次都在某一个 task 报出 OOM 错误,此时可能出现了数据倾斜,
作业无法正常运行。

定位数据倾斜问题:
➢ 查阅代码中的 shuffle 算子,例如 reduceByKey、countByKey、groupByKey、join 等算子,根据代码逻辑判断此处是否会出现数据倾斜;

➢ 查看 Spark 作业的 log 文件,log 文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个 stage,对应的 shuffle 算子是哪一个;

方案1、过滤导致数据倾斜的key值

如果在 Spark 作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的 key 进行
过滤,滤除可能导致数据倾斜的 key 对应的数据,这样,在 Spark 作业中就不会发生数据倾斜了。

方案2、提高 shuffle 操作中的 reduce 并行度

可以考虑提高 shuffle 过程中的 reduce 端并行度,reduce 端并行度的提高就增加了 reduce 端 task 的数量,那么每个 task分配到的数据量就会相应减少,由此缓解数据倾斜问题。

(1)reduce端并行度的设置

在大部分的 shuffle 算子中,都可以传入一个并行度的设置参数,比如 reduceByKey(500),这个参数会决定 shuffle 过程中 reduce 端的并行度,
在进行 shuffle 操作的时候,就会对应着创建指定数量的 reduce task。

对于 Spark SQL 中的 shuffle 类语句,比如 group by、join 等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了 shuffle read task 的并行度,
该值默认是 200,对于很多场景来说都有点过小。

增加 shuffle read task 的数量,可以让原本分配给一个 task 的多个 key 分配给多个 task,从而让每个 task 处理比原来更少的数据。

(2) reduce端并行度设置存在的缺陷

提高 reduce 端并行度并没有从根本上改变数据倾斜的本质和问题,只是尽可能地去缓解和减轻 shuffle reduce task 的数据压力,
以及数据倾斜的问题,适用于有较多key对应的数据量都比较大的情况。

该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个 key 对应的数据量有 100 万,那么无论你的 task 数量增加到多少,这个对应着 100 万数据的 key 肯定还是会分配到一个 task 中去处理,因此注定还是会发生数据倾斜的。
所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

方案3、使用随机key实现双重聚合(重点,仅适用聚合类算子)

当使用了类似于 groupByKey、reduceByKey 这样的算子时,可以考虑使用随机 key 实现双重聚合

首先,通过 map 算子给每个数据的 key 添加随机数前缀,对 key 进行打散,将原先一样的 key 变成不一样的 key,然后进行第一次聚合,
这样就可以让原本被一个 task 处理的数据分散到多个 task 上去做局部聚合;

随后,去除掉每个 key 的前缀,再次进行聚合。此方法对于由 groupByKey、reduceByKey 这类算子造成的数据倾斜有比较好的效果,仅仅适用于聚合类的 shuffle 操作,适用范围相对较窄。

如果是 join 类的 shuffle 操作,还得用其他的解决方案。此方法也是前几种方案没有比较好的效果时要尝试的解决方案。

方案4、将reduce join转换为map端join(其中一个RDD数据量较小)

正常情况下,join 操作都会执行 shuffle 过程,并且执行的是 reduce join,也就是先将所有相同的 key 和对应的 value 汇聚到一个 reduce task 中,然后再进行 join。

普通的 join 是会走 shuffle 过程的,而一旦 shuffle,就相当于会将相同 key 的数据拉取到一个 shuffle read task 中再进行 join,此时就是 reduce join。
但是如果一个 RDD 是比较小的,则可以采用广播小 RDD 全量数据 + map 算子来实现与 join 同样的效果,也就是 map join,此时就不会发生 shuffle 操作,也就不会发生数据倾斜。
(注意,RDD 是并不能进行广播的,只能将 RDD 内部的数据通过 collect 拉取到 Driver 内存然后再进行广播)

object Spark_BoardCast {def main(args: Array[String]): Unit = {//1.创建SparkConf并设置App名称val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest01").setMaster("local[*]")//2.创建SparkContext,该对象是提交Spark App的入口val sc: SparkContext = new SparkContext(conf)val rdd1 = sc.makeRDD(List( ("a",1), ("b", 2), ("c", 3), ("d", 4) ),4)val list = List( ("a",4), ("b", 5), ("c", 6), ("d", 7) )// 声明广播变量val broadcast: Broadcast[List[(String, Int)]] = sc.broadcast(list)val resultRDD: RDD[(String, (Int, Int))] = rdd1.map {case (key, num) => {var num2 = 0//for((k,v) <- list)// 使用广播变量for ((k, v) <- broadcast.value) {if (k == key) {num2 = v}}(key, (num, num2))}}resultRDD.foreach(println)}
}

核心思路:
不使用 join 算子进行连接操作,而使用 Broadcast 变量与 map 类算子实现 join 操作,进而完全规避掉 shuffle 类的操作,彻底避免数据倾斜的发生和出现。

将较小 RDD 中的数据直接通过 collect 算子拉取到 Driver 端的内存中来,然后对其创建一个 Broadcast 变量;接着对另外一个 RDD 执行 map 类算子,在算子函数内,
从 Broadcast 变量中获取较小 RDD 的全量数据,与当前 RDD 的每一条数据按照连接 key 进行比对,如果连接 key 相同的话,那么就将两个 RDD 的数据用你需要的方式连接起来。

根据上述思路,根本不会发生 shuffle 操作,从根本上杜绝了 join 操作可能导致的数据倾斜问题。
当 join 操作有数据倾斜问题并且其中一个 RDD 的数据量较小时,可以优先考虑这种方式,效果非常好。

不适用场景分析:
由于Spark 的广播变量是在每个Executor中保存一个副本,如果两个 RDD数据量都比较大,那么如果将一个数据量比较大的 RDD 做成广播变量,那么很有可能会造成内存溢出。

方案5、sample采样对倾斜key单独进行join

在 Spark 中,如果某个 RDD 只有一个 key,那么在 shuffle 过程中会默认将此 key 对应的数据打散,由不同的 reduce 端 task 进行处理。

当由单个 key 导致数据倾斜时,可将发生数据倾斜的 key 单独提取出来,组成一个RDD,然后用这个原本会导致倾斜的 key 组成的 RDD 和其他 RDD 单独 join,此时,根据
Spark 的运行机制,此 RDD 中的数据会在 shuffle 阶段被分散到多个 task 中去进行 join 操作。
倾斜 key 单独 join 的流程如图所示

1、适用场景分析:
对于 RDD 中的数据,可以将其转换为一个中间表,或者是直接使用 countByKey()的方式,看一个这个 RDD 中各个 key 对应的数据量,此时如果你发现整个 RDD 就一个 key 的
数据量特别多,那么就可以考虑使用这种方法。

当数据量非常大时,可以考虑使用 sample 采样获取 10%的数据,然后分析这 10%的数据中哪个 key 可能会导致数据倾斜,然后将这个 key 对应的数据单独提取出来。

2、不适用场景分析:

如果一个 RDD 中导致数据倾斜的 key 很多,那么此方案不适用。

方案6、使用随机数扩容进行join(大量key导致数据倾斜)

如果在进行 join 操作时,RDD 中有大量的 key 导致数据倾斜,那么进行分拆 key 也没什么意义,此时就只能使用最后一种方案来解决问题了,对于 join 操作,我们可以考虑对其
中一个 RDD 数据进行扩容,另一个 RDD 进行稀释后再 join。

我们会将原先一样的 key 通过附加随机前缀变成不一样的 key,然后就可以将这些处理后的“不同 key”分散到多个 task 中去处理,而不是让一个 task 处理大量的相同 key。
这一种方案是针对有大量倾斜 key 的情况,没法将部分 key 拆分出来进行单独处理,需要对整个RDD 进行数据扩容,对内存资源要求很高。

1.核心思想
选择一个 RDD,使用 flatMap 进行扩容,对每条数据的 key 添加数值前缀(1~N 的数值),将一条数据映射为多条数据;(扩容)
选择另外一个 RDD,进行 map 映射操作,每条数据的 key 都打上一个随机数作为前缀(1~N 的随机数);(稀释)
将两个处理后的 RDD,进行 join 操作。

2. 局限性:
如果两个 RDD 都很大,那么将 RDD 进行 N 倍的扩容显然行不通;使用扩容的方式只能缓解数据倾斜,不能彻底解决数据倾斜问题。

当 RDD 中有几个 key 导致数据倾斜时,方案5不再适用,而方案6又非常消耗资源,此时可以引入方案6的思想完善方案5

➢ 对包含少数几个数据量过大的 key 的那个 RDD,通过 sample 算子采样出一份样本来,然后统计一下每个 key 的数量,计算出来数据量最大的是哪几个 key。

➢ 然后将这几个 key 对应的数据从原来的 RDD 中拆分出来,形成一个单独的 RDD,并给每个 key 都打上 n 以内的随机数作为前缀,而不会导致倾斜的大部分 key
形成另外一个RDD。

➢ 接着将需要 join 的另一个 RDD,也过滤出来那几个倾斜 key 对应的数据并形成一个单独的 RDD,将每条数据膨胀成 n 条数据,这 n 条数据都按顺序附加一个 0~n 的前缀,
不会导致倾斜的大部分 key 也形成另外一个 RDD。

➢ 再将附加了随机前缀的独立 RDD 与另一个膨胀 n 倍的独立 RDD 进行 join,此时就可以将原先相同的 key 打散成 n 份,分散到多个 task 中去进行 join 了。

➢ 而另外两个普通的 RDD 就照常 join 即可。

➢ 最后将两次 join 的结果使用 union 算子合并起来即可,就是最终的 join 结果。

spark性能优化(二)数据倾斜问题相关推荐

  1. Spark性能优化之-数据倾斜

    文章目录 概述 现象和判定方式 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 某个task执行特别慢的情况 某个task莫名其妙内存溢出的情况 查看导致数据倾斜的key的数据分 ...

  2. Spark中Data skew(数据倾斜)Java+Python+Scala三种接口完整代码

    起因 代码中shuffle的算子存在的地方,groupByKey.countByKey.reduceByKey.join等 判断一个算子是shuffle算子可以通过[20] 出现的问题有两种 ①大部分 ...

  3. Spark性能优化--如何解决数据倾斜

    1 Data Skew 数据倾斜 1.1 数据倾斜概念 对Hive.Spark.Flink等大数据计算框架来讲,数据量大并不可怕,可怕的是数据倾斜. 数据倾斜是指并行处理的数据集中某一部分的数据显著多 ...

  4. 大数据技术之_19_Spark学习_07_Spark 性能调优 + 数据倾斜调优 + 运行资源调优 + 程序开发调优 + Shuffle 调优 + GC 调优 + Spark 企业应用案例

    大数据技术之_19_Spark学习_07 第1章 Spark 性能优化 1.1 调优基本原则 1.1.1 基本概念和原则 1.1.2 性能监控方式 1.1.3 调优要点 1.2 数据倾斜优化 1.2. ...

  5. 大数据IMF传奇行动绝密课程第48课:Spark性能优化第四季

    Spark性能优化第四季 1.序列化 2.JVM性能调优 一.Spark性能调优之序列化 1.之所以进行序列化,最重要的原因是内存空间有限(减少GC的压力,最大化的避免Full GC的产生,因为一旦产 ...

  6. 大数据IMF传奇行动绝密课程第45课:Spark性能优化第一季

    Spark性能优化第一季 1.Spark性能优化需要思考的基本问题 2.CPU和Memory 3.并行度和Task 4.网络 一.Spark性能优化核心基石 1.Spark是采用Master-Slav ...

  7. Spark性能优化指南:基础篇

    前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作 ...

  8. Spark性能优化指南:高级篇

    前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为<Spark性能优化指南>的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问 ...

  9. 转载 Spark性能优化指南——基础篇

    前言 在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作 ...

  10. Spark性能优化指南——基础篇

    在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用 ...

最新文章

  1. 第二阶段_第三小节_C#基础
  2. 输入法画面_仙剑奇侠传X百度输入法联名纪念版皮肤今日上线
  3. virtualbox 虚拟化问题
  4. Garden Planner中使用3D视图的技巧
  5. 儒豹手机搜索发布2008年各频道关键词排行榜
  6. C++模板技术实现 泛型编程,通俗易懂的泛型编程教程//。
  7. iphone11屏比例_iPhone每一代的屏幕尺寸比例是多少
  8. H指数和G指数的Java实现
  9. Vue中设置浏览器标签栏图标以及title
  10. 为何学习大数据,要先学Java
  11. 用知识图谱打开梁山好汉一百单八将
  12. 自然摄影指南——第一章:曝光:使用相机的测光表
  13. 西雅图Oracle公寓租赁,在西雅图租房必须知道的那些事
  14. mysql并列查询_MYSQL实现排名及查询指定用户排名功能(并列排名功能)实例代码...
  15. 财经数据----同花顺技术选股,附代码
  16. 梯度类算法原理:最速下降法、牛顿法和拟牛顿法
  17. 【环信IM集成指南】iOS端常见问题整理(2)
  18. seo文章重复率高不利于收录
  19. Unity【DoTween】- 如何使Transform Tween动画序列可编辑
  20. 方向不对,努力白费,人生三大“陷阱”,你踩了几个?

热门文章

  1. yarn打包报错:error during build: Error: Assigning to rvalue (Note that you need plugins to import files
  2. [读书总结]大数据时代
  3. 额头、太阳穴、两鬓长痘痘怎么回事?
  4. iOS开发者知识普及,Swift 挑战 Objective-C,谁会笑到最后?
  5. 数组元素两两组合 php,js多个数组元素两两组合三三组合
  6. 不讨老婆之“不亦快哉”(三十三则)(李敖)
  7. Three.js 基础之灯光
  8. 考公 | 粉笔网课笔记——数量 刘凯
  9. 第二章、Tiny4412 U-BOOT移植二 启动分析【转】
  10. 收藏随身查,光纤光缆60条必备知识!