介绍Spark分桶的原理以及如何在数据分析和数据准备中更好的应用分桶。

分桶概念

spark的bucketing分桶是一种组织存储系统中数据的方式。以便后续查询中用到这种机制,来提升计算效率。

如果分桶设计得比较合理,可以避免关联和聚合查询中的混洗(洗牌、打散、重分布)的操作,从而提升性计算性能。

一些查询(sort-merge join、shuffle-hash join、聚合、开窗函数)都需要输入数据按照关联键或聚合键被重分布。更具体地说,具有相同键值的行要被重分布到同一分区。为了满足这个要求,spark需要对数据进行重分布,物理上就是说spark要把数据从一个executor移动到另一个executor,这也就是常说的shuffle。

有了bucketing,我们相当于提前进行了shuffle,并把重分布的结果保存到了存储系统。当我们从存储系统读回这些分桶后的数据时,spark将会感知到这些分布并不会再次执行重分布。也就是常说的,空间换时间。

分桶代码

df.write.mode(saving_mode)  # append/overwrite.bucketBy(n, field1, field2, ...).sortBy(field1, field2, ...).option("path", output_path).saveAsTable(table_name)

注意点:

1、必须以saveAsTable的方式保存为表。因为分区信息需要保存到元数据中。spark在访问表时会得到关于这个表的分桶信息。

2、sortBy是个可选操作。分桶在没有排序下也可以工作。

3、bucketBy第一个参数是桶的个数。设置桶数量是个挑战,需要考虑数据集整体大小和将要被创建文件的个数。不恰当的设置,会导致文件数量过多,有时需要在写分桶之前,先对数据进行repartition。

数据到底去了哪个桶

粗略说就是对分桶字段取hash散列值,然后用散列值除以桶数取余数,并确保余数为正(positive modulo)。

h = hash(key)
b = h mod n
if b < 0:b = (b + n) mod n
from pyspark.sql.functions import hash, col, expr
spark.range(100) # this will create a DataFrame with one column id.withColumn("hash", hash(col("id"))).withColumn("bucket", expr("pmod(hash, 8)"))# pmod 内置sql函数,https://spark.apache.org/docs/latest/api/sql/index.html#pmod
# SELECT pmod(10, 3);
# output 1
# SELECT pmod(-10, 3);
# output 2

分桶的优缺点

优点

有点主要有:避免shuffle(Shuffle-free),和提升过滤效率(bucket pruning/桶裁剪)。

避免shuffle(Shuffle-free)

当没有分桶时,两个大表要join需要重分布。如下图所示,两表都要经过scan-exchange(重分布)-sort-smj

两方都免shuffle

当两表在相同的字段上按照一致的规则分桶后再进行join,如下图所示,只需经过scan-sort-smj,跳过了exchange重分布。

单方免shuffle

如果已经分桶的一方的分桶数大于默认分区数,那分桶方可以免shuffle。

默认shuffle分区数是200,可以手动指定shuffle分区数。

spark.conf.set("spark.sql.shuffle.partitions", n)

两方分桶数不同

如果分桶数小于默认的shuffle分区数,那两方都要shuffle。如果分桶数大于等于默认shuffle分区数,那就是单方免shuffle,分桶数多的一方免shuffle。

3.1版本之后,开启spark.sql.bucketing.coalesceBucketsInJoin.enabled这个配置,两方都可以免shuffle。

关于排序

在 Spark 3.0 之前,如果每个桶仅由一个文件组成,则可以从连接计划中消除排序运算符。在这种情况下,Spark 确信在集群上读取数据后对其进行了排序,并且确实最终的计划是无排序的。但是,如果每个存储桶有更多文件,Spark 不能保证数据是全局排序的,因此会将 Sort 运算符保留在计划中——数据必须在连接执行期间进行排序。

在 Spark 3.0 中,情况发生了变化,默认情况下,即使每个存储桶只有一个文件,也会出现排序。进行此更改的原因是列出所有文件以检查每个存储桶是否只有一个文件太昂贵(如果文件太多),因此决定关闭此检查并始终将排序放在计划中(用于排序合并连接)。如您所见,这是一种权衡,一种优化另一种。还引入了一个新的配置设置 spark.sql.legacy.bucketedTableScan.outputOrdering,您可以将其设置为 True 以强制执行 3.0 之前的行为,并且仍然利用一个文件的排序存储桶。

bucket pruning/桶裁剪

在分桶字段上进行过滤操作时,过滤操作不需要扫描全表,只需去对应桶里扫描,加快过滤性能。

缺点

需要牢记分桶的一个结果是执行期间的并行化。如果一个表被分桶到 n 个桶中,在对其进行查询时,则生成的作业的第一阶段将恰好有 n 个任务。如果表没有分桶或分桶关闭,许多任务可能会非常不同,因为 Spark 会尝试将数据拆分为分区,每个分区大约有 128 MB(这由配置设置控制spark.sql.files.maxPartitionBytes),文件大于128M会被拆到多个task,因此任务具有合理的大小并且不会遇到内存问题。

如果一个表被分桶并且随着时间的推移它的大小增加并且桶变大,那么关闭分桶以允许 Spark 创建更多分区并避免数据溢出问题可能会更有效。这很有用,特别是如果查询并不直接用到分桶提供的预分布便利时。

在 Spark 3.1.1 中实现了一个新功能,它可以根据查询计划(没有连接或聚合)识别分桶无用的情况,并将关闭分桶,因为它将丢弃分布并扫描数据的方式与未分桶相同。此功能默认是开启的,可通过 spark.sql.sources.bucketing.autoBucketedScan.enabled 配置。

如何应用桶

数据分析师

对于分析师来说,了解一些关于分桶表的信息,有助于提升我们的查询分析效率。

查看表是否已分桶

spark.sql("DESCRIBE EXTENDED table_name").show(n=100)

这条命令可以查看分桶字段和分桶数量。

分桶优化是否开启

spark.conf.get("spark.sql.sources.bucketing.enabled")

true就是已开启。

注意访问方式

需要使用读表的方式访问分桶数据,这样spark才能从元数据中获取表的分桶信息。

# Spark will use the information about bucketing from metastore:
df = spark.table(table_name)

这里需要注意的是:如果直接通过路径获取分桶数据,spark是不会得到分桶信息的,分桶优化是不会被执行的。

触发分桶优化

通常情况下,如果表已经分桶,且两方分桶字段和策略一致,分桶优化是开箱即用的。但有时,我们需要做些额外措施,以便帮助spark触发分桶优化。

关于重命名

关联查询时,如果两表分桶字段含义相同但是名字不同,在join前不要对关联字段重命名。保持原字段名就行。

tableA.join(tableB, tableA['user_id'] == tableB['userId'])

spark3.0之后的版本,重命名后再join,分桶也会起作用。但之前版本,需要注意。

关于不同类型

如果两表分桶字段含义相同,策略相同,但遗憾的是当初数据类型不同。此时,需要将其中一方进行类型转化,这样可以变成单方免shuffle。如果不做类型转化,spark将会对双方都进行重分布。

# 假如:tableA user_id是int类型;tableB user_id是long类型;都分了N个桶。
# 那需要把其中一个表进行类型转化,并重分布为N个分桶。
# 如下操作之后,在join时,tableB是可以避免重分布的。
tableA.withColumn('user_id', col('user_id').cast('long')).repartition(50, 'user_id').join(tableB, 'user_id')

关于udf​​​​​​​

如果计算过程用到了UDF,那建议先join再用UDF。因为UDF 将丢弃有关分桶的信息。

tableA.join(tableB, 'user_id').withColumn('x', my_udf('some_col'))

数据工程师

数据湖、数据仓库的里表通常由数据工程师设计。分桶和分区一样是经常被考虑的技术,以便在文件系统更好组织数据,方便数据分析师和科学家使用。

防止大量小文件

假设我们处理一个 20 GB 的数据集,我们在最后一个阶段将数据分布到 200 个任务中(每个任务处理大约 100 MB),我们想要创建一个包含 200 个桶的表。 如果集群上的数据是随机分布的(这是一般情况),对于这 200 个任务中随机分布的数据块,每一个都会携带这200 个桶中的一部分数据,所以每个任务将创建 200 个文件,导致 200 x 200 = 40 000 个文件,而且每个文件非常小。 结果文件的数量是任务数量与存储桶数量的乘积。

所以,这里要做的是将随机分布提前散列重分布。

df.repartition(expr("pmod(hash(user_id), 200)")).write.mode(saving_mode)  # append/overwrite.bucketBy(200, 'user_id').option("path", output_path).saveAsTable(table_name)

这里我们重分布时所用的表达式,其实就是spark分桶时底层进行桶分配的表达式。

设置合适的桶数

主要参考最终桶的大小,当读回数据时,一个桶会被一个任务处理,如果桶很大,任务会出现内存问题,Spark 会在执行期间必须将数据溢出到磁盘上,这将导致性能下降。根据运行的查询,每个存储桶大小在150-200 MB 之间,可能是一个合理的选择,如果知道数据集的总大小,则可以从中计算要创建多少个存储桶。

Spark bucketing bucket分桶原理和应用相关推荐

  1. Hive 的概念、应用场景、安装部署及使用、数据存储 、table(内部表)和external table(外部表)、partition(分区表)和bucket(分桶表)

    目录 1.Hive 的概念 2.Hive 的特点 3.Hive 和 RDBMS(关系型数据库) 的对比 4.Hive 和 HBase 的差别 5.Hive 架构 6.Hive安装与使用方法介绍 7.H ...

  2. 大数据学习笔记42:Hive - 分桶表

    文章目录 一.Hive分桶表 1.分桶操作 2.分桶原理 3.注意事项 二.分桶表案例演示 1.创建数据文件courses.txt 2.将数据文件上传到HDFS的/bucket目录 3.基于/buck ...

  3. 流量为王:ABTest流量分层分桶机制

    在互联网行业,无论是构建搜索推荐系统,还是智能营销等场景,都是围绕用户进行不同的实验,从各项指标上观察用户对不同交互.流程.策略.算法等反馈,进而对产品.营销策略.搜索推荐算法等进行迭代改进. 在之前 ...

  4. 大数据之hive:hive分桶表

    目录 一.回顾分区表 二.为什么分桶? 三.分桶表的使用 1.创建一个带分桶定义的表(分桶表) 2.加载数据: 3.对分桶表的查询 分桶总结: 实例 一.回顾分区表 为什么有分区? 随着系统运行时间增 ...

  5. Hive分桶(bucket)

    一 什么是桶的概念,和分区有啥区别? 对于每一个表或者分区,可以进一步细分成桶,桶是对数据进行更细粒度的划分.默认时对某一列进行hash,使用hashcode对 桶的个数求模取余,确定哪一条记录进入哪 ...

  6. Hive分桶之BUCKET详解

    Bucket 1.对于每一个表(table)或者分区(partition), Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分.Hive也是 针对某一列进行桶的组织.Hive采用对列值 ...

  7. Hive Sampling 抽样函数:Random随机抽样、Block 基于数据块抽样、Bucket table 基于分桶表抽样

    Hive Sampling 抽样函数 文章目录 Hive Sampling 抽样函数 Random随机抽样 Block 基于数据块抽样 Bucket table 基于分桶表抽样 语法 在HQL中,可以 ...

  8. Bucket Join:分桶Join

    Bucket Join:分桶Join 场景:大表join大表,多次join 实现 step1:将两张大表的数据构建分桶 数据按照分桶的规则拆分到不同的文件中 分桶规则=MapReduce分区的规则=k ...

  9. Hive 分桶表原理及优化大表 join 实战

    一.什么是分桶表 分桶表,比普通表或者分区表有着更为细粒度的数据划分. 举个例子,每天产生的日志可以建立分区表,每个分区在 hdfs 上就是一个目录,这个目录下包含了当天的所有日志记录. 而分桶表,可 ...

最新文章

  1. 数据类型(Python)
  2. TensorFlow 笔记6--迁移学习
  3. php mysql 查询 区分大小写_MySQL查询字符串时区分大小写
  4. LRU原理及其实现(C++)
  5. 快速获取csv数量_【数量技术宅|数据爬虫系列分享】如何获取免费的数字货币历史数据...
  6. 上下div高度动态自适应--另类处理方案
  7. 20200316:H指数(leetcode274)
  8. 1057 字符转数字,判断
  9. MFC 线程创建方式
  10. ppc+安装编译mysql_redhat7ppc安装
  11. 用口诀背英语单词绿色版简介
  12. aut0cad2010卸载工具_autocad2010官方版
  13. Ibeacon一维小项目
  14. VUE(11) : 图片点击全屏展示
  15. API Design for ios 译文
  16. 极路由php插件,极路由SDK
  17. TinyOS总体介绍
  18. 解决IE11审查元素面板空白问题
  19. 数据分析如何避免客户流失
  20. 测量用计算机软件管理办法,《计算机应用基础》测试题(一)

热门文章

  1. SINS工具箱介绍—各种初始对准方法
  2. 直通车拼多多7天均价比价
  3. linux安装IBM Webshere MQ以及配置
  4. 2022 年欧美上市公司价值/数量暴跌 90%
  5. win10 ubuntu双系统进入系统的时候recovering journal的解决办法
  6. java高级程序员(Java高级程序员招聘)
  7. 不可逆加密:说一下你常用的加密算法
  8. markdown使用方法大全
  9. OkHttp,蚂蚁金服Android架构面试题
  10. 信息系统工程的总体规划