引言

join是SQL中的常用操作,良好的表结构能够将数据分散到不同的表中,使其符合某种规范(mysql三大范式),可以最大程度的减少数据冗余,更新容错等,而建立表和表之间关系的最佳方式就是join操作。

对于Spark来说有3种Join的实现,每种Join对应的不同的应用场景(SparkSQL自动决策使用哪种实现范式):

  1.Broadcast Hash Join:适合一张很小的表和一张大表进行Join;

  2.Shuffle Hash Join:适合一张小表(比上一个大一点)和一张大表进行Join;

  2.Sort Merge Join:适合两张大表进行Join;

前两者都是基于Hash Join的,只不过Hash Join之前需要先shuffle还是先brocadcast。下面详细解释一下这三种Join的具体原理。

Hash Join

先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,参与join的两张表是order和item,join key分别是item.id以及order.i_id。现在假设Join采用的是hash join算法,整个过程会经历三步:

  1.确定Build Table以及Probe Table:这个概念比较重要,Build Table会被构建成以join key为key的hash table,而Probe Table使用join key在这张hash table表中寻找符合条件的行,然后进行join链接。Build表和Probe表是Spark决定的。通常情况下,小表会被作为Build Table,较大的表会被作为Probe Table。

  2.构建Hash Table:依次读取Build Table(item)的数据,对于每一条数据根据Join Key(item.id)进行hash,hash到对应的bucket中(类似于HashMap的原理),最后会生成一张HashTable,HashTable会缓存在内存中,如果内存放不下会dump到磁盘中。

  3.匹配:生成Hash Table后,在依次扫描Probe Table(order)的数据,使用相同的hash函数(在spark中,实际上就是要使用相同的partitioner)在Hash Table中寻找hash(join key)相同的值,如果匹配成功就将两者join在一起。

基础流程可以参考上图,这里有两个问题需要关注:

  1.hash join性能如何?很显然,hash join基本都只扫描两表一次,可以认为O(a+b),较之最极端的是笛卡尔积运算O(a*b);

  2.为什么Build Table选择小表?道理很简单,因为构建Hash Table时,最好可以把数据全部加载到内存中,因为这样效率才最高,这也决定了hash join只适合于较小的表,如果是两个较大的表的场景就不适用了。

上文说,hash join是传统数据库中的单机join算法,在分布式环境在需要经过一定的分布式改造,说到底就是尽可能利用分布式计算资源进行并行计算,提高总体效率,hash join分布式改造一般有以下两种方案:

  1.broadcast hash join:将其中一张较小的表通过广播的方式,由driver发送到各个executor,大表正常被分成多个区,每个分区的数据和本地的广播变量进行join(相当于每个executor上都有一份小表的数据,并且这份数据是在内存中的,过来的分区中的数据和这份数据进行join)。broadcast适用于表很小,可以直接被广播的场景;

  2.shuffle hash join:一旦小表比较大,此时就不适合使用broadcast hash join了。这种情况下,可以对两张表分别进行shuffle,将相同key的数据分到一个分区中,然后分区和分区之间进行join。相当于将两张表都分成了若干小份,小份和小份之间进行hash join,充分利用集群资源。

Broadcast Hash Join

大家都知道,在数据库的常见模型中(比如星型模型或者雪花模型),表一般分为两种:事实表和维度表,维度表一般指固定的、变动较少的表,例如联系人、物品种类,一般数据有限;而事实表一遍记录流水,比如销售清单等,通过随着时间的增长不断增长。

因为join操作是对两个表中key相同的记录进行连接,在SparkSQL中,对两个表做join的最直接的方式就是先根据key进行分区,再在每个分区中把key相同的记录拿出来做连接操作,但这样不可避免的涉及到shuffle,而shuffle是spark中比较耗时的操作,我们应该尽可能的设计spark应用使其避免大量的shuffle操作。

Broadcast Hash Join的条件有以下几个:

  1.被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的信息,默认是10M;

  2.基表不能被广播,比如left outer join时,只能广播右表。

看起来广播是一个比较理想的方案,但它有没有缺点呢?缺点也是很明显的,这个方案只能广播较小的表,否则数据的冗余传输就是远大于shuffle的开销;另外,广播时需要被广播的表collect到driver端,当频繁的广播出现时,对driver端的内存也是一个考验。

如下图所示,broadcast hash join可以分为两步:

  1.broadcast阶段:将小表广播到所有的executor上,广播的算法有很多,最简单的是先发给driver,driver再统一分发给所有的executor,要不就是基于bittorrete的p2p思路;

  2.hash join阶段:在每个executor上执行 hash join,小表构建为hash table,大表的分区数据匹配hash table中的数据;

Shuffle Hash Join

当一侧的表比较小时,我们可以选择将其广播出去以避免shuffle,提高性能。但因为被广播的表首先被collect到driver端,然后被冗余的发送给各个executor上,所以当表比较大是,采用broadcast join会对driver端和executor端造成较大的压力。

我们可以通过将大表和小表都进行shuffle分区,然后对相同节点上的数据的分区应用hash join,即先将较小的表构建为hash table,然后遍历较大的表,在hash table中寻找可以匹配的hash值,匹配成功进行join连接。这样既在一定程度上减少了driver广播表的压力,也减少了executor端读取整张广播表的内存消耗。

Sshuffle Hash Join分为两步:

  1.对两张表分别按照join key进行重分区(分区函数相同的时候,相同的相同分区中的key一定是相同的),即shuffle,目的是为了让相同join key的记录分到对应的分区中;

  2.对对应分区中的数据进行join,此处先将小表分区构建为一个hash表,然后根据大表中记录的join key的hash值拿来进行匹配,即每个节点山单独执行hash算法。

Shuffle Hash Join的条件有以下几个:

1. 分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold所配置的值,默认是10M

2. 基表不能被广播,比如left outer join时,只能广播右表

3. 一侧的表要明显小于另外一侧,小的一侧将被广播(明显小于的定义为3倍小,此处为经验值)

看到这里,可以初步总结出来如果两张小表join可以直接使用单机版hash join;如果一张大表join一张极小表,可以选择broadcast hash join算法;而如果是一张大表join一张小表,则可以选择shuffle hash join算法;那如果是两张大表进行join呢?

Sort Merge Join

上面介绍的方式只对于两张表有一张是小表的情况适用,而对于两张大表,但当两个表都非常大时,显然无论哪种都会对计算内存造成很大的压力。这是因为join时两者采取都是hash join,是将一侧的数据完全加载到内存中,使用hash code取join key相等的记录进行连接。

当两个表都非常大时,SparkSQL采用了一种全新的方案来对表进行Join,即Sort Merge Join。这种方式不用将一侧数据全部加载后再进行hash join,但需要在join前将数据进行排序。

首先将两张表按照join key进行重新shuffle,保证join key值相同的记录会被分在相应的分区,分区后对每个分区内的数据进行排序,排序后再对相应的分区内的记录进行连接。可以看出,无论分区有多大,Sort Merge Join都不用把一侧的数据全部加载到内存中,而是即用即丢;因为两个序列都有有序的,从头遍历,碰到key相同的就输出,如果不同,左边小就继续取左边,反之取右边。从而大大提高了大数据量下sql join的稳定性。

SparkSQL对两张大表join采用了全新的算法-sort-merge join,如下图所示,整个过程分为三个步骤:

. shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;

2. sort阶段:对单个分区节点的两表数据,分别进行排序;

3. merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则取更小一边,见下图示意:

参考:

https://www.cnblogs.com/0xcafedaddy/p/7614299.html

转载于:https://www.cnblogs.com/duodushuduokanbao/p/9911256.html

Spark SQL join的三种实现方式相关推荐

  1. oracle Hash Join及三种连接方式

    在Oracle中,确定连接操作类型是执行计划生成的重要方面.各种连接操作类型代表着不同的连接操作算法,不同的连接操作类型也适应于不同的数据量和数据分布情况. 无论是Nest Loop Join(嵌套循 ...

  2. Spark三种部署方式

    目前Apache Spark支持三种分布式部署方式,分别是standalone.spark on mesos和 spark on YARN,其中,第一种类似于MapReduce 1.0所采用的模式,内 ...

  3. Spark SQL JOIN操作代码示例

    title: Spark SQL JOIN操作 date: 2021-05-08 15:53:21 tags: Spark 本文主要介绍 Spark SQL 的多表连接,需要预先准备测试数据.分别创建 ...

  4. mysql几种安装方法_mysql的三种安装方式(详细)

    安装MySQL的方式常见的有三种: rpm包形式 通用二进制形式 源码编译 1,rpm包形式 (1) 操作系统发行商提供的 (2) MySQL官方提供的(版本更新,修复了更多常见BUG)www.mys ...

  5. powerdesigner 怎么关联两张表_【PL/SQL数据库】 三种关联机制 - 执行计划

    看完这章你会学习到以下内容: 1. 三种关联分别是什么? 2. 什么时候那个适合用哪个?(总结) 一共有三种关联机制: Nest Loop 嵌套循环 (大小表) Sort Merge 排序合并 (添加 ...

  6. Hive metastore三种配置方式

    Hive的meta数据支持以下三种存储方式,其中两种属于本地存储,一种为远端存储.远端存储比较适合生产环境.Hive官方wiki详细介绍了这三种方式,链接为:Hive Metastore. 一.本地d ...

  7. oracle if=,oracle中if/else的三种实现方式详解

    1.标准sql规范 1.单个IF IF v=... THEN END IF; 2.IF ... ELSE IF v=... THEN ELSE t....; END IF; 3.多个IF IF v=. ...

  8. pdo_fetch执行mysql_PDO中执行SQL语句的三种方法

    在PDO中,我们可以使用三种方式来执行SQL语句,分别是 exec()方法,query方法,以及预处理语句prepare()和execute()方法~大理石构件来图加工 在上一篇文章<使用PDO ...

  9. java jndi tomcat_tomcat下jndi的三种配置方式

    Java命名和目录接口(the Java naming and directory interface,JNDI)是一组在Java应用中访问命名和目录服务的API.命名服务将名称和对象联系起来,使得读 ...

最新文章

  1. 控制输入框只能输入数字
  2. python中calendar怎么用_Python时间模块datetime、time、calendar的使用方法
  3. vs2010中添加项目中找不到EntityFramework实体框架解决办法
  4. C# 获取USB设备信息
  5. 人活系列Streetlights (秩)
  6. 数字信号处理学习笔记(五)|有限脉冲响应数字滤波器的设计
  7. jquery获取当前的节点
  8. 【随笔】4.上海上港
  9. GloVe损失函数的理解
  10. 基于SSM的电影购票系统
  11. 如何方便的下载csdn博客正文
  12. xp系统怎么开启usb服务器,windowsxp系统设置usb手机网络分享的方法
  13. Java编程题修院子_2020大学moocJava程序设计题目答案
  14. python 微信自动回复小程序
  15. 100天成就卓越领导力:新晋领导者的First100训练法
  16. table表格自动换行
  17. termux目录_Termux 入门与实践
  18. 华尔街和散户「权力的游戏」,留下一地怎样的「鸡毛」
  19. javaftp读取服务器文件,java读取ftp服务器文件
  20. poi导出复杂的excle,简单易懂一看既会

热门文章

  1. 使用 Github 作为专用 Nuget 包服务器并共享您的包
  2. 开源的方舟编译器将进入深圳大学课堂
  3. 台达vfd一ⅴe变频说明书_PLC运动控制实例解析:PLC与变频器系统
  4. wireshark linux版本_Wireshark大法-WiFi6无线抓包
  5. python导入模块介绍_详解Python模块导入方法
  6. eslint是什么_为什么eslint没有 no-magic-string?
  7. div 隐藏_SEO优化,隐藏文本与隐藏链接对SEO的影响!
  8. python函数的内涵_python内涵段子文章爬取
  9. mysql根据月份查询订单销售额
  10. 长短期记忆网络_科研成果快报第181期:改进的长短期记忆网络用于长江上游干支流径流预测...