背景

join是SQL中最常见的操作,写SQL最经常的场景就是几张表各种join,join操作也是各种操作中最耗时的操作之一。

作为一个Spark SQL Boy,有必要详细了解一下Spark的join策略。

MR中的join

介绍Spark的join策略之前,先介绍一下MR中是如何实现join操作的。

MR中的join分为Map端join和Reduce端join。

数据准备如下:

订单表

1001  01  11002  02  21003  03  31004  01  41005  02  51006  03  6

商品表

01  小米02  华为03  格力

Map join:

1. 加载阶段:把小表加载到内存里面;

2. Hash join:构建Hash表做查询。

只在Map端处理数据,没有Reduce,优点是并行度非常高,没有Shuffle,不会出现数据倾斜。

Mapper类

public class JoinMapper extends Mapper {    HashMap<String,String> pdMap=new HashMap<>();    TableBean k = new TableBean();    @Override    protected void setup(Context context) throws IOException, InterruptedException {        URI[] cacheFiles = context.getCacheFiles();        String path = cacheFiles[0].getPath().toString();        InputStreamReader inputStream = new InputStreamReader(new FileInputStream(path));        BufferedReader bis = new BufferedReader(inputStream);        String line;        while((line=bis.readLine())!=null){            String[] fields = line.split("\t");            pdMap.put(fields[0],fields[1]);        }        bis.close();        inputStream.close();    }    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        String line = value.toString();        String[] fields = line.split("\t");        String pId = fields[1];        String pdName = pdMap.get(pId);        k.setAmount(Integer.parseInt(fields[2]));        k.setId(fields[0]);        k.setPid(fields[1]);        k.setPname(pdName);        k.setFlag("order");        context.write(k, NullWritable.get());    }}

Reduce join:

1. Map阶段:分片做循环,join的key为进入Reduce Task的key,value是一个对象;

2. Reduce阶段:同一个key进入同一个Reduce Task,封装输出对象。

Mapper类

public class TableBeanMapper extends Mapper<LongWritable, Text, Text, TableBean> {    FileSplit filesplit;    String name;    TableBean v = new TableBean();    Text k = new Text();    @Override    protected void setup(Context context) throws IOException, InterruptedException {        InputSplit inputSplit = context.getInputSplit();        filesplit = (FileSplit) inputSplit;        Path path = filesplit.getPath();        name = path.getName();    }    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        String s = value.toString();        if (name.startsWith("order")) {            String[] split = s.split("\t");            v.setId(split[0]);            v.setPid(split[1]);            v.setAmount(Integer.parseInt(split[2]));            v.setPname("");            v.setFlag("order");            k.set(split[1]);        } else {            String[] split = s.split("\t");            v.setId("");            v.setPid(split[0]);            v.setAmount(0);            v.setPname(split[1]);            v.setFlag("pd");            k.set(split[0]);        }        context.write(k, v);    }}

Reducer类

import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.lang.reflect.InvocationTargetException;import java.util.ArrayList;public class TableBeanReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {    @Override    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {        ArrayList tableBeanArrayList = new ArrayList();        TableBean tableBean = new TableBean();        for (TableBean value : values) {            if (value.getFlag().equals("order")) {                TableBean tableBean1 = new TableBean();                try {                    BeanUtils.copyProperties(tableBean1, value);                    tableBeanArrayList.add(tableBean1);                } catch (IllegalAccessException e) {                    e.printStackTrace();                } catch (InvocationTargetException e) {                    e.printStackTrace();                }            } else {                try {                    BeanUtils.copyProperties(tableBean, value);                } catch (IllegalAccessException e) {                    e.printStackTrace();                } catch (InvocationTargetException e) {                    e.printStackTrace();                }            }        }        for (TableBean bean : tableBeanArrayList) {            bean.setPname(tableBean.getPname());            context.write(bean, NullWritable.get());        }    }}

Spark join策略

Spark SQL执行引擎最常使用的3种策略,分别是Broadcast Hash join,Shuffle Hash join和Sort merge join,剩下两种分别是Nested loop join和Cartesian product join。

Broadcast Hash join

相当于MR中的Map join,分为两个阶段。

1. 广播阶段:把小表广播到大表数据所在的executors上

2. Hash join:在每个executor上做Hash join

Broadcast Hash join计算发生在本地的executors上,不需要shuffle,并行度也非常的高。

Spark通过spark.sql.autoBroadcastJoinThreshold参数(默认10MB),将小表广播出去。

以下是某个Spark SQL的执行计划,在该SQL中Spark选择logdate_xxx_t2表作为广播表。

== Physical Plan ==Execute CreateHiveTableAsSelectCommand CreateHiveTableAsSelectCommand [Database:default}, TableName: logdate_xxx_dwd, InsertIntoHiveTable]+- *(1) Project [x3#29, x4#30, x5#31, x6#33, CASE WHEN isnotnull(x7#35) THEN 1 ELSE 0 END AS flag#26]   +- *(1) BroadcastHashJoin [x4#30, x3#29], [x4#35, x3#34],    LeftOuter, BuildRight      :- HiveTableScan [x3#29, x4#30, x5#31, x6#33], HiveTableRelation `default`.`logdate_xxx_ods`,       org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [x3#29, x4#30, x5#31, x6#33], [x6#33]      +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true], input[0, string, true]))         +- HiveTableScan [x1#34, x2#35], HiveTableRelation          `default`.`logdate_xxx_t2`,          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [x1#34, x2#35]

Shuffle Hash join

1. shuffle阶段:根据join的key,把参与join的表的数据分为若干个区(默认使用的是hash partitonner),使得两个表相同的key进入同一个partition;

2. Hash join阶段:每个分区做基本的Hash join。

Shuffle Hash join存在数据倾斜的问题。数据倾斜是在实践中碰到最多的问题之一。

如何使得Shuffle Hash join更高效?首先要明白Shuffle Hash join的瓶颈在哪里:

1. 数据分布,准确来说应该是参与join的key的分布,对Shuffle Hash join性能有极大的影响,具体表现在大部分Task执行的速度非常快,而某个Task执行地非常慢,Spark UI中可以观察到该任务Shuffle的数据量非常的大

2. Shuffle Hash join存在并行度不够的问题。

databricks的工程师在演讲中提到如下几种方法:

Shuffle Hash join有非常多优化的手段,比如在数据储存方面,Spark 推荐使用parquet文件格式,parquet文件格式能够让Spark非常高效自动地识别用Shuffle join还是Broadcast join,而且parquet格式对重复数据的encoding非常地友好,输出的数据量会小一点;Spark默认的并行度为200,把key的分区重新打乱,增大并行度;Spark的新特性adaptive shuffling,动态地调整执行计划和分区数。

Sort Merge join

跟MR中Reduce join非常相似,回顾MR的执行过程。

1. shuffle阶段:跟Shuffle Hash join一样;

2. sort阶段:每个分区对key进行排序(MR框架中,默认会在溢写阶段对分区内的数据进行排序,这是为了方便后续同一个key进入一个分区);

3. merge阶段:两个sorted 的数据进行查找,线性复杂度。

Spark SQL 默认的join策略为Sort Merge join。该策略要求key要可排序,基本数据类型都是可排序的。

当Hash表无法放入内存或者构建Hash表的时间比排序时间长的时候,使用Sort Merge join是一种好的选择。

Nested Loop Join & Cartesian join

对参与join的所有表做Cartesian的遍历,适用范围最广,复杂度最高,速度最慢。

感悟

刚学Spark的时候,总是能搜到databricks的博客,最近懵懵懂懂看了一两篇,写得真的不错,尽管大部分内容还是不太懂。

老祖宗说的好呀,知其然,知其所以然,深入之后才发现,掌握的知识水平测度为零。

PS:放上代码,就是为了凑篇幅。

left join最多几张表_Spark中的join策略相关推荐

  1. MySQL的两张表的七种Join查询

    SQL的语法格式如下 SELECT DISTINCT< select_list > FROM< left_table > < join_type > JOIN &l ...

  2. oracle两张表数据匹配,Oracle-left join两表关联只取B表匹配到的第一条记录

    背景: A表.B表两表关联,关联出来的结果里B表有不止一条,需求是只要B表结果中的某一条(按某字段排序) 经过百度,发现 row_number() over(partition by a order ...

  3. mysql的join语句使用_在MySQL中使用JOIN语句进行连接操作的详细教程

    到目前,我们已经学习了从一个表中获取数据.这是简单的需要,但在大多数现实MySQL的使用,经常需要将数据从多个表中的一个单一的查询. 可以使用多个表中的单一SQL查询.在MySQL中联接(join)行 ...

  4. left join左表百万数据查询慢_Spark SQL 之 Join 实现

    正好最近跑一些spark sql ,重新温习了遍有关联合查询的一些底层实现,参考这位博主的分享Spark SQL 之 Join 实现 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分 ...

  5. mysql比对表中数据是否相同_如何用sql比较两张表数据是否一致?

    在批量程序的测试中,经常会涉及到对数据库表的测试,今天我们来介绍一下用sql比较两张表结构相同的表数据是否完全一致的方法. 1.inner join 浅尝 提到比对两张表的数据是否完全相同,很容易想到 ...

  6. MySQL和Oracle中如何update一张表中的字段赋值给另一张表的字段

    MySQL 中实现将 一张表table1 中的字段name 等于table2 中的字段 name 通过相同字段no相连,实现如下: update table1 a1,table2 a2 set a1. ...

  7. mysql不同服务器数据库查询_不同服务器不同数据库两张表连接查询使用经验

    使用SQL语句连接查询位于两个不同的服务器不同的数据库中的两张表,最初将SQL语句写成以下形式select*fromProduct pinnerjoin opendatasource('SQLOLED ...

  8. mysql 多张表公用一个序列_Mysql--序列3--分库分表策略

    分库分表是存储层设计中一个普遍而重大的问题,什么时候分?怎么分?分完之后引发的新问题,比如不能Join.分布式事务? 本篇将从最基本的策略出发,逐步深入讲解这里面涉及的一序列策略. 分库-业务分拆 & ...

  9. SQL中cross join,left join,right join ,full join,inner join 的区别

    http://blog.csdn.net/sgivee/article/details/5081350 SQL中cross join,left join,right join ,full join,i ...

最新文章

  1. 信号状态关_路由器要不要关?难怪信号越来越差
  2. 关于Juniper ScreenOS MIP/VIP地址说明
  3. 阿里雷卷:RSocket从入门到落地,RSocket让AJP换发青春
  4. 如何安装python3.7.6_CentOS7安装Python3.7.6,配置pip,安装配置virtualenv和virtualenvwrapper...
  5. php异步传输,php 异步处理-上传文件
  6. easyswoole数据库连接池_如何在 Swoole 中优雅的实现 MySQL 连接池
  7. Spring Http Invoke 请求过程图
  8. SAP License:SAP中的成本核算模型(调侃版)
  9. 「代码随想录」474.一和零【动态规划】力扣详解!
  10. JQuery源码分析 --- 运动animate 基本原理
  11. 常见404与500错误及含义
  12. python gui测试工具_GitHub - Github-Benjamin/LeChu: Python GUI工具 二次开发
  13. 澳洲网:澳高考生扎堆申请专业 热衷艺术及护理学位
  14. matlab PTB 学习笔记03——精确时间控制
  15. 好心情:吃精神科药物药不见效?你可能忽视了血药浓度
  16. 什么是迭代器(Iterator)
  17. 毒液蛋白质相互作用分析
  18. Estimate in progress using
  19. ZCMU1860: zbj的电梯间
  20. 宇视摄像机接存储卡是否支持热插拔?

热门文章

  1. SQL Server经典sql语句大全
  2. 整合区域医疗资源,共享社区病源,实现三级就诊
  3. python示波器 波形数据_Python在嵌入式开发中的应用——数据示波器
  4. 济南大学计算机考研资料汇总
  5. postgresql Mybatis 序列自增
  6. 【建站指南】解决个人网站图片加载缓慢的问题
  7. vivo X21的Usb调试模式在哪里,打开vivo X21Usb调试模式的步骤
  8. 再谈批量下载Modis数据之Google earth engine
  9. 【转】蛋糕尺寸(寸)、尺寸(CM)、重量(磅)、食用人数对照换算参考表...
  10. dede 获取当前栏目的上一级栏目名称,和链接