关注交流微信公众号:小满锅

场景

前几天遇到一个任务,从前也没太注意过这个任务,但是经常破9点了,执行时长正常也就2个小时。
看逻辑并不复杂,基本是几段SQL的JOIN操作,其中一个最耗时间的就是要根据底表数据Lateral view explode(array(字段, ‘all’)),一共lateral了4个字段,相当于数据量要扩大16倍。并且可怕的场景,lateral view之后还对11个字段进行了去重。


selecta22 as a,b22 as b,c22 as c,d22 as d,e,f,,count(distinct g) as g    ,count(distinct h) as h  ,count(distinct i) as i    ,count(distinct j) as j  ,count(distinct k) as k,count(distinct l) as l,count(distinct m) as m,count(distinct n) as n,count(distinct o) as o,count(distinct p) as p,count(distinct q) as q,count(distinct r) as r,count(distinct s) as r
from
(select a,b,c,d,e,f,g,h,i,j,k,l,...,r,sfrom tablewhere dt='${etl_date}') tlateral view explode(array(a, 'all')) a as a22lateral view explode(array(b, 'all')) b as b22lateral view explode(array(c, 'all')) c as c22lateral view explode(array(d, 'all')) d as d22group by 1,2,3,4,5,6,7,8,9,10,11,12,13
) t
group by a,b,c,d,e,f

定位问题

查看了执行计划,发现这个任务从Input:4亿多条数据 竟然expand到了700多亿,太夸张了,还以为Spark3.1.1能对这种多维分析场景有很好的优化呢。

但是我又百思不得其解了,从这个数据量来看,应该是一共扩大了176倍,其中Lateral View扩大了16倍我可以理解,剩下的11倍哪里来的?
突然觉得11这个数字很熟悉,就是那11个count(distinct)字段。仔细查看了执行计划,发现除了lateral view 有expand之外,Spark在对Count(distinct)处理时,根据11个字段的去重字段,对于每条记录会都会expand出11条记录,假设为A1-A11,那么每一条记录都对应唯一一个A字段有值,其他10个A字段为NULL,而count(distinct A)只需要计算count(1)即可,看似应该是为了结束数据倾斜的一个优化。这样就解释的通了。
相当于从Read->filter->expand->shuffle 原来的一条记录,变成了176条,也就是扩大了176倍。

解决问题

这种现象第一时间想到的是从两个方面入手,尽量减少这个任务在这个Stage消耗的时间。

  • 第一种就是增加partition,这样减少每个partition的运行时间。
  • 第二种就是减少expand的数据量,从根本上取解决问题。

第一种方案

设置参数
conf.spark.sql.shuffle.partitions = 4096
conf.spark.sql.files.maxPartitionBytes = 8m
conf.spark.shuffle.minNumPartitionsToHighlyCompress = 5000
conf.spark.sql.finalStage.adaptive.skewJoin.skewedPartitionFactor = 3

可以看到partition确实增加了,但是留有的风险比较多,这种就是要通过加资源保证任务快速运行,实际上这个任务不是那种P0任务,这个21min至少得保持1k多的task在并行跑才行,而实际任务在调度的时候,不可能抢到这么多资源,并且不能影响线上重要任务运行。因此这个方案暂时不去做深入研究了。

第二种方案

第二种方案是通过减少expand的量,这样下游处理的数据就会更少一些了。

尝试一

我原本想通过减少expand的量去解决这种问题,比如在子查询中聚合到最细的粒度,然后再去lateral view,但是在本地自己查询发现,即便是去重到最细粒度,仍然有4亿多条,经过那176倍之后,仍然会有700多亿条数据。这个想法失败。

尝试二

因为16个聚合多维分析这个必须存在,所以说扩大16倍是避免不了的(或者有啥其它办法也说不定呢)。

想办法从11个去重字段去入手,可以尝试分别分为两个stage各计算一半的字段,然后结果join起来,但是看起来会需要资源的支撑。

也可以看可不可以关掉count(distinct)的这个数据倾斜优化,找了半天没找到。

尝试三

改用bitmap对字段进行去重 大致就是自己写了一个UDAF,对于整个stage的每个阶段,实现UDAF抽象类的一些方法,比如处理原始数据,输出给下游采用byte[]形式,shuffle时merge,最终聚合输出。这里中间存储的数据结构为RoaringBitMap,听说这个存储和性能比较好,并且shuffle合并聚合时,可以直接利用它的OR操作进行位去重即可,快速且方便。

*   BitmapDistinctUDAFBuffer 约定map和reduce的每个处理过程*   BitmapAggrBuffer 作为中间结果处理器。**   COMPLETE阶段:只要map阶段才调用*   PARTIAL1阶段:轻度汇总传给下游*      init方法         : 约定原始输入数据类型为Text 下游输出为byte[]*      iterate方法      : 将原始输入数据都拿过来 将其写入聚合去重中间结果处理器BitmapAggrBuffer*      terminatePartial: 将处理过后的BitmapAggrBuffer里面的RoaringBitmap处理成byte[]输出给下游*   PARTIAL2阶段:shuffle之后聚合*      init方法         : 约定下游输出为byte[]*      merge           : 将iterate处理后的BitmapAggrBuffer稍微按照Group Key轻度聚合,即对不同的BitmapAggrBuffer里面的RoaringBitMap进行OR操作*      terminatePartial: 将处理过后的BitmapAggrBuffer里面的RoaringBitmap处理成byte[]输出给下游*   FINAL阶段   :最终结果聚合*      init方法         : 约定下游输出为byte[]*      merge           : 将iterate处理后的BitmapAggrBuffer稍微按照Group Key轻度聚合,即对不同的BitmapAggrBuffer里面的RoaringBitMap进行OR操作*      terminate: 将处理过后的BitmapAggrBuffer里面的RoaringBitmap处理成byte[]输出*/

资源非常充足的情况下,跑了很多次,基本30-40min跑完,申请不到资源仍然有50多min,这几次问题的共同点就是卡在了最后一个shuffle的时候,多维分析存在的问题就是,一旦分析维度存在较大差异,那么最细粒度和最粗粒度的数据一定会存在倾斜,可想而知,聚合粒度最粗的数据,要收集上游expand之后的所有数据,必定会导致这个分区长尾,执行时间也自然最长,并且这里打散还不能这样,多维分析时,得对最细的数据打散,一旦聚合到最细粒度之上了,最细粒度之下的数据无法计算,所以聚合到userid粒度时,再去打散,和上面Spark对其的优化一样,打散了多少呗,本身四亿多条数据,扩大16倍,再打散又扩大几倍,效果也不明显。

尝试四

第四种方式来源于第三种开发UDAF的时候看到RoaringBitMap可以进行一个OR操作,那么其实我读取数据的时候,上面一种方法是只能聚合到用户粒度,那聚合到用户粒度是为了多维分析时能够进行去重,那如果是基于这样的需求的话,我仍然可以聚合到多维分析的最细粒度,将用户信息存储到这个bitmap里面,就像下面这个图,很多用户id,标志0或者1,存储到RoaringBitMap数据结构中,那么下游多维分析时,只需要对轻度聚合之后的数据进行一个OR操作就可以了。

基于上面的方案,我又写了一个BitMapMerge的UDAF,主要是对BitMapDistinct出来的中间聚合结果(最细粒度的bitmap)进行合并操作(OR)。

心急如焚的跑了下,效果非常明显,在多维分析Expand之前,轻度聚合4亿多条到多维分析的最细粒度之后,发现只有3000多条数据了,效果非常明显,心想着如果执行lateral view expand之后应该也就是48000左右的数据,然后再聚合到多维分析的各个粒度,这样即便是数据倾斜了,应该也没事,处理的数据量比较小。这样从700亿数据到48000w左右的expand量,数据量上优化了145w倍

。。。完了,内存爆了,失败了,加到Excutor 12G都不行,而且发生在了Input Read的阶段,这里困扰了我,看了很久,发现一个问题就是,轻度聚合的数据只有3000左右了但是足足有11G?我直接计算出每条记录每个bitmap的数值,发现有的去重结果中,有个别千万级别有亿级别的去重数,所以想会不会就是这些个别的大数导致那一部分task失败了呢(因为我即便加了partition也没用,所以我猜测光一条数据就很大)。这个级别的去重数并不可怕,算一算即便是3亿的去重结果,按照bitmap来算的话,也就是300000000/8/1024/1024=35M左右的数据大小,但是越想越不对,就是这里有11个字段,每个都这么大,在expand,351116=6g,一条记录就要expand这么大的数据,那一个task读几条再加expand还不得起飞,这下我知道了,我遇到的不是数据量的倾斜,而是数据大小的倾斜,极个别数据太大了,导致input阶段在expand的时候超内存了。

为了验证自己的想法没错,我将11个字段减少几个比较大的bitmap字段,结果5min就跑完了,并且内存保持的相对不错这也太好了吧。于是就打算将多个结果拆分出来计算再JOIN了。

其它方案

其实为了解决上面expand的数据大小太大的问题,我们终究就是去一个数据量的倾斜和数据大小倾斜之间的一个平衡。加入我不进行轻度聚合,那么就是数据量的倾斜,假如进行轻度聚合,那么就是数据大小的倾斜。那其实就是为了这个平衡的话,可以在轻度聚合的时候,采用一个比较传统的方式,打散聚合,这样就可以平衡掉某条记录因为读取记录大小太大,导致expand出去的数据太大,假设打散100倍,那么最终expand出去的数据也就480w条,优化效果仍然比以前的700亿的数据量要好很多。

但是最终的数据还是一个task在处理,这样的话最粗粒度的key,将会聚集最大bitmap,处理时间会更久,导致长尾。

总结
对于多维分析切去重计数场景,需要尽可能减少expand的量,同时使用bitmap也有坑,一般来说,像这种多维分析场景中,粒度越粗的key总会是聚集用户数最多的,造成数据大小的倾斜,那么处理的时间也是最长的,直观上从平时打散的角度并不会有太多效果,最终处理的结果还是一样。

比较好的方式应该是拆分字段计算,最终再JOIN。最终加起来在资源不足的情况下差不多20min左右,比起资源不足的源代码至少得运行2小时-2个半小时,效果还是非常明显的,并且expand的数据量也从700亿降到了48000数据量。不过单条数据的数据大小确实变大了。

反思

这里利用BitMap分析多维场景,存在一个问题,就是说更粗粒度的计算过程中,其实已经经历了比它更细粒度的bitmap合并。这就是为什么要在内部先聚合到多维分析的最细粒度,来减少重复合并的这个过程。
那其实对于多维分析,其实每个维度本身也是嵌套的父子关系

比如分析维度
os, appver, source
那假如要分析os,appver的话其实就可以基于上面这个维度去进一步merge
加入要分析os维度,就可以基于os, appver再进一步分析。

那我优化的方式使用分JOIN计算Bitmap字段其实对于每个字段而言,还是存在数据大小的倾斜,就是说即便是一个字段,那么它的最粗粒度的分析一定是倾斜的,因为Group sets或者lateral view是将原始数据直接expand16倍,再进行聚合,那么最粗粒度一定是数据量最多(这里虽然不足48000),自然处理数据大小也最大,可能多大几个G,数据大小太大的原因就是因为BitMap存储还是占用太多空间了,其中一个更好的优化,要是可以基于最细粒度自由上卷下钻就好了,就好比一些外部存储一样,存储最细粒度,自动帮你上卷到最粗粒度,一层层叠加,不重复计算。
那Spark暂时没想到避免重复计算的方式,后面就从改善Bitmap存储大小入手,考虑用HLL优化试试,期待能够优化到10min以内。

Spark多维分析去重计数场景优化案例【BitMap精确去重的应用与踩坑】相关推荐

  1. Spark的性能优化案例分析(下)

    前言 Spark的性能优化案例分析(上),介绍了软件性能优化必须经过进行性能测试,并在了解软件架构和技术的基础上进行.今天,我们通过几个 Spark 性能优化的案例,看一看所讲的性能优化原则如何落地. ...

  2. 60-320-040-使用-去重-HyperLogLog 去重计数

    文章目录 1.视界 2.概述 3.依赖 4.使用 5.案例 5.1 普通使用 5.2 实现 5.2.1 优化 1.视界 2.概述 在需要对数据进行去重计数的场景里,实现方式是将数据明细存储在集合的数据 ...

  3. Android 性能优化案例

    2019独角兽企业重金招聘Python工程师标准>>>         之前看到一篇关于优化Android性能的文章,写的很不错.但由于一直没有使用过,最近恰好优化Performan ...

  4. sql的不等于条件优化_SQL优化案例(2):OR条件优化

    随后上一篇文章< SQL优化案例(1):隐式转换>的介绍,此处内容围绕OR的优化展开. 在MySQL中,同样的查询条件,如果变换OR在SQL语句中的位置,那么查询的结果也会有差异,在多个复 ...

  5. 19_clickhouse,数据查询与写入优化,分布式子查询优化,外部聚合/排序优化,基于JOIN引擎的优化,SQL优化案例,物化视图提速,查询优化常用经验法则,选择和主键不一样的排序键,数据入库优化

    25.数据查询与写入优化 25.1.分布式子查询优化 25.1.1.分布式表的IN查询示例1(普通IN子查询.IN子查询为本地表) 25.1.2.分布式表的IN查询示例2(普通IN子查询.IN子查询为 ...

  6. 优化案例 | CASE WHEN进行SQL改写优化

    导读 今天给大家分享一个通过SQL改写而独辟蹊径的SQL优化案例 待优化场景 发现SLOW QUERY LOG中有下面这样一条记录: ... # Query_time: 59.503827 Lock_ ...

  7. 字节跳动在 Spark SQL 上的核心优化实践

    作者 | 郭俊 封图 | BanburyTang 字节跳动数据仓库架构团队负责数据仓库领域架构设计,支持字节跳动几乎所有产品线(包含但不限于抖音.今日头条.西瓜视频.火山视频)数据仓库方向的需求,如 ...

  8. 产品读书《AI进化论:解码人工智能商业场景与案例》

    读后总结 公众对AI的认知大多起源于2016年初AlphaGo击败李世石的人机大战.但稍对人工智能有所了解的人士都知道,在这场机器取得胜利的大战以前,人工智能已经走过了长达60多年的"进化& ...

  9. 突破性能瓶颈!ElasticSearch百亿级数据检索优化案例

    一.前言 本文中的数据平台已迭代三个版本,从头开始遇到很多常见的难题,终于有片段时间整理一些已完善的文档,在此分享以供所需朋友的.实现参考,少走些弯路,在此篇幅中偏重于ES的优化,目前生产已存储百亿数 ...

最新文章

  1. SpringBoot(二)——JPA
  2. xml信息管理系统_WPF信息管理系统项目实战教程二:使用XAML实现页面布局
  3. 【PC工具】文件压缩解压工具winrar解压缩装机必备软件,winRAR5.70免费无广告
  4. AngularJS $http 异步后台无法获取请求参数
  5. 反射_Class对象功能_获取Field
  6. 2019 年 8 月编程语言排行榜,C#重回增长之路
  7. linux模块创建proc,[Linux 运维]/proc/modules 以及内核模块工具
  8. 虚拟机网络桥接-NAT-HOST的理解
  9. 第八届蓝桥杯第六题最大公共子串
  10. 通过AOP引入Log4j
  11. P5154 数列游戏(区间dp)
  12. linux下磁盘分区方法详解
  13. Hadoop 集群 傻瓜式搭建手记 (一) 软件准备
  14. java数据结构与算法pdf下载
  15. vue 打印 某块内容成pdf
  16. 约翰诺依曼在1940年发明了计算机英语,冯诺依曼元胞自动机
  17. 计算机中没有汉字输入,电脑没有了输入法无法输入汉字,是为什么??
  18. LInux常用的60个命令,小白必须掌握的命令
  19. Linux定时任务与开机自启动脚本(cron与crontab)
  20. 第五讲-Docker 镜像(image)

热门文章

  1. 基于python的pyshp库读取.shp数据来获取中国城市边界的经纬度数据,并生成hdf文件
  2. c语言中 cd 什么意思,cd是什么意思|cd的中文翻译 - 医学词典
  3. 【IDEA】IDEA中出现下划标红问题应该怎样解决(一种简单方法)
  4. UNiSONSHIFT・ACCENT经典作品推荐 Chu×Chuアイドる和Chu×Chuぱらだいす~Encore Live~介绍(含下载和攻略)...
  5. C# 封装对第三方接口的调用
  6. 统计学相关概念及机器学习中样本相似性度量之马氏距离
  7. Pygame实现小笨鸟,到小飞鸟
  8. Python的初步认知与安装步骤 (小白必备)
  9. 百度云网盘超级会员账号共享2023.3.10最新可用账号(每日更新
  10. Typecho设置伪静态