数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:

distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等

出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。

某个task执行特别慢的情况

首先要看的,就是数据倾斜发生在第几个stage中。
可以通过Spark Web UI来查看当前运行到了第几个stage,看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生了数据倾斜。

知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。精准推算stage与代码的对应关系,这里介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

这里我们就以Spark最基础的入门程序——单词计数来举例,如何用最简单的方法大致推算出一个stage对应的代码。如下示例,在整个代码中,只有一个reduceByKey是会发生shuffle的算子,因此就可以认为,以这个算子为界限,会划分出前后两个stage。

stage0,主要是执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,我们可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内。

stage1,主要是执行从reduceByKey到collect操作,stage1的各个task一开始运行,就会首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供我们遍历和打印输出。

val conf = new SparkConf()
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println(_))

通过对单词计数程序的分析,希望能够让大家了解最基本的stage划分的原理,以及stage划分后shuffle操作是如何在两个stage的边界处执行的。然后我们就知道如何快速定位出发生数据倾斜的stage对应代码的哪一个部分了。

比如我们在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包括了reduceByKey这个shuffle类算子,此时基本就可以确定是由reduceByKey算子导致的数据倾斜问题。比如某个单词出现了100万次,其他单词才出现10次,那么stage1的某个task就要处理100万数据,整个stage的速度就会被这个task拖慢。

某个task莫名其妙内存溢出的情况

这种情况下去定位出问题的代码就比较容易了。我们建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。一般来说,通过异常栈信息就可以定位到你的代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。
但是大家要注意的是,不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因为自己编写的代码的bug,以及偶然出现的数据异常,也可能会导致内存溢出。因此还是要按照上面所讲的方法,通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。
查看导致数据倾斜的key的数据分布情况

知道了数据倾斜发生在哪里之后,通常需要分析一下那个执行了shuffle操作并且导致了数据倾斜的RDD/Hive表,查看一下其中key的分布情况。这主要是为之后选择哪一种技术方案提供依据。针对不同的key分布与不同的shuffle算子组合起来的各种情况,可能需要选择不同的技术方案来解决。

此时根据你执行操作的情况不同,可以有很多种查看key分布的方式:
如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下SQL中使用的表的key分布情况

如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。

举例来说,对于上面所说的单词计数程序,如果确定了是stage1的reduceByKey算子导致了数据倾斜,那么就应该看看进行reduceByKey操作的RDD中的key分布情况,在这个例子中指的就是pairs RDD。如下示例,我们可以先对pairs采样10%的样本数据,然后使用countByKey算子统计出每个key出现的次数,最后在客户端遍历和打印样本数据中各个key的出现次数。

val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))

Spark如何定位导致数据倾斜的代码相关推荐

  1. Spark性能优化之-数据倾斜

    文章目录 概述 现象和判定方式 数据倾斜发生时的现象 数据倾斜发生的原理 如何定位导致数据倾斜的代码 某个task执行特别慢的情况 某个task莫名其妙内存溢出的情况 查看导致数据倾斜的key的数据分 ...

  2. Spark中Data skew(数据倾斜)Java+Python+Scala三种接口完整代码

    起因 代码中shuffle的算子存在的地方,groupByKey.countByKey.reduceByKey.join等 判断一个算子是shuffle算子可以通过[20] 出现的问题有两种 ①大部分 ...

  3. 【Spark调优】大表join大表,少数key导致数据倾斜解决方案

    [Spark调优]大表join大表,少数key导致数据倾斜解决方案 参考文章: (1)[Spark调优]大表join大表,少数key导致数据倾斜解决方案 (2)https://www.cnblogs. ...

  4. Spark 调优之数据倾斜

    什么是数据倾斜? Spark 的计算抽象如下 数据倾斜指的是:并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度 ...

  5. 解决spark中遇到的数据倾斜问题

    一. 数据倾斜的现象 多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败. 二. 数据倾斜的原因 常见于各种shuffle操作,例如reduceByKey ...

  6. 关于(我们流量表优化),分区表数据块过多,聚合又导致数据倾斜问题

    前提:上个文章记录了我流量表的开发过程,成型后每个分区会有4000文件,不用hive分发+rand()函数会有6万个细碎文件.虽然已经大量减少了细碎文件的产生,但是每天产生4000个,月报4万个文件对 ...

  7. java程序cpu突然飚高_Java 定位导致CPU飙升的代码过程

    线上的一个日志实时输出的程序曾经出过这样一个问题,刚开始上线java程序占用的CPU的资源很少,但是到了整点的时候,CPU直线飙高,直接到达100%根本没有要下降的趋势,唯一的方法只能杀掉它了,后面在 ...

  8. spark学习之处理数据倾斜

  9. hive解决数据倾斜问题_八种解决 Spark 数据倾斜的方法

    有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能. 数 ...

最新文章

  1. 解决MYSQL大表问题-实战篇(二)
  2. C++实现用堆求最小的k个数
  3. java - 线程1打印1-10,当线程打印到5后,线程2打印“hello”,然后线程1继续打印...
  4. 通电后第一次开机黑屏_电脑无法开机怎么办,8 种情况的修复方法
  5. ThinkJs笔记琐碎
  6. 用 C# 写一个 Redis 数据同步小工具
  7. c语言24点游戏代码回法,C语言解24点游戏程序
  8. 电信5g网络apn接入点_华为就5G网络设备禁令起诉瑞典邮政和电信管理局
  9. 为Vue2集成UIkit
  10. python:安装pycaret2.2.3(pytorch版本为1.7.1)
  11. p2p服务器系统,先锋P2P高清点播服务器(XfServer)
  12. 西方哲学史的主要发展阶段
  13. 第五章:3ds max UV展开和BP贴图绘制(上)
  14. Writing a Cause and Effect Essay
  15. 云计算的认识和看法_我的关于云计算的看法和认识
  16. Android AOA协议Android端 流程总结
  17. android 打开系统键盘的方法
  18. EMQX(emqtt)安装错误:Required dependencies: openssl-1.1.1 (libcrypto), libncurses and libatomic1安装openssl
  19. 【MySQL】如何使用SQL语句获取表结构和获取全部表名
  20. 企业办公必备:千里眼VSir视频会议系统软件

热门文章

  1. vue 打包css路径不对_vue项目打包后css背景图路径不对的问题
  2. java不同项目加token访问_利用JWT实现前后端分离的Token验证
  3. 彻底解决VS中找不到 Windows SDK 版本 8.1的错误
  4. spring学习--JdbcTemplate-查询返回-批量操作
  5. 用anaconda配置深度学习的环境,从配置环境到下载各种包,绝对学会,还是没学会留下评论,我看到会回答
  6. python通过什么对象连接数据库步骤_python如何连接数据库
  7. 吃了核辐射食物怎么办_尿酸过高怎么办?这几种食物,平时可多吃
  8. 小腹右侧突然疼了一下_腰椎间盘膨出,为什么不是脊椎柱中间疼,而是左侧疼?...
  9. @RequestParam注解用法
  10. layer 一些理解