Spark中的Join操作及问题解决
目录
- 一.数据准备
- 二.Spark Core中的Join
- 三.Spark SQL中的Join
- 1.常用案例
- 2.Spark SQL中的5种Join
- 四.解决数据倾斜问题
- 1.大表Join小表(100MB级别),直接广播
- 2.大表Join小表(GB级别),大表打散,小表扩容
- 3.大表(少量倾斜)Join其他表,倾斜的部分广播
一.数据准备
grade.txt:(id, grade)
1,75
1,86
1,64
2,76address.txt:(id, address)
1,shanghai
1,beijing
二.Spark Core中的Join
需要把每行转换成键值对的形式。
val grade = sc.textFile("D:\\study\\ideaProject\\first_maven\\input\\grade.txt").map(_.split(",")).map(x => (x(0), x(1)))
val address = sc.textFile("D:\\study\\ideaProject\\first_maven\\input\\address.txt").map(_.split(",")).map(x => (x(0), x(1)))
innerJoin:
val joinRDD = grade.join(address).foreach(println)(1,(75,shanghai))
(1,(75,beijing))
(1,(86,shanghai))
(1,(86,beijing))
(1,(64,shanghai))
(1,(64,beijing))
leftOuterJoin:
val joinRDD2 = grade.leftOuterJoin(address).foreach(println)(2,(76,None))
(1,(75,Some(shanghai)))
(1,(75,Some(beijing)))
(1,(86,Some(shanghai)))
(1,(86,Some(beijing)))
(1,(64,Some(shanghai)))
(1,(64,Some(beijing)))
三.Spark SQL中的Join
1.常用案例
import spark.implicits._
val grade = sc.textFile("D:\\study\\ideaProject\\first_maven\\input\\grade.txt").map(_.split(",")).map(x => (x(0), x(1))).toDF("id", "grade")val address = sc.textFile("D:\\study\\ideaProject\\first_maven\\input\\address.txt").map(_.split(",")).map(x => (x(0), x(1))).toDF("id", "address")
如果只是单纯的Join,是求的笛卡尔积操作。感觉和Cross Join一样。
grade.join(address).show()+---+-----+---+--------+
| id|grade| id| address|
+---+-----+---+--------+
| 1| 75| 1|shanghai|
| 1| 86| 1|shanghai|
| 1| 75| 1| beijing|
| 1| 86| 1| beijing|
| 1| 64| 1|shanghai|
| 2| 76| 1|shanghai|
| 1| 64| 1| beijing|
| 2| 76| 1| beijing|
+---+-----+---+--------+
需要指定grade(“id”) === address(“id”),不过也会出现重复列 id。
grade.join(address, grade("id") === address("id")).show()+---+-----+---+--------+
| id|grade| id| address|
+---+-----+---+--------+
| 1| 75| 1|shanghai|
| 1| 75| 1| beijing|
| 1| 86| 1|shanghai|
| 1| 86| 1| beijing|
| 1| 64| 1|shanghai|
| 1| 64| 1| beijing|
+---+-----+---+--------+
left_outer操作,左表会完整显示,右表部分为null。
grade.join(address, grade("id") === address("id"), "left_outer").show()+---+-----+----+--------+
| id|grade| id| address|
+---+-----+----+--------+
| 1| 75| 1|shanghai|
| 1| 75| 1| beijing|
| 1| 86| 1|shanghai|
| 1| 86| 1| beijing|
| 1| 64| 1|shanghai|
| 1| 64| 1| beijing|
| 2| 76|null| null|
+---+-----+----+--------+
使用Seq(“id”),可以去掉重复列。
grade.join(address, Seq("id"), "left_outer").show()+---+-----+--------+
| id|grade| address|
+---+-----+--------+
| 1| 75|shanghai|
| 1| 75| beijing|
| 1| 86|shanghai|
| 1| 86| beijing|
| 1| 64|shanghai|
| 1| 64| beijing|
| 2| 76| null|
+---+-----+--------+
2.Spark SQL中的5种Join
参考文章:Spark的五种JOIN策略解析
四.解决数据倾斜问题
1.大表Join小表(100MB级别),直接广播
spark.sql("""|select /*+ BROADCAST (address) */ * from grade, address where grade.id = address.id|""".stripMargin).show()
运行结果:
+---+-----+---+--------+
| id|grade| id| address|
+---+-----+---+--------+
| 1| 75| 1| beijing|
| 1| 75| 1|shanghai|
| 1| 86| 1| beijing|
| 1| 86| 1|shanghai|
| 1| 64| 1| beijing|
| 1| 64| 1|shanghai|
| 2| 76| 2| wuhan|
+---+-----+---+--------+
2.大表Join小表(GB级别),大表打散,小表扩容
spark.sql("""|select id, grade, concat(id, ceiling(rand() * 100) % 3) as new_id from grade|""".stripMargin).createOrReplaceTempView("grade_new")spark.sql("""|select id, address, concat(id, suffix) as new_id|from (| select id, address, suffix from address Lateral View explode(array(0, 1, 2)) tmp as suffix|)|""".stripMargin).createOrReplaceTempView("address_new")spark.sql("""|select SUBSTRING(grade_new.new_id, 0, 1) as id, grade, address from grade_new, address_new where grade_new.new_id = address_new.new_id|""".stripMargin).show()
运行结果:
+---+-----+--------+
| id|grade| address|
+---+-----+--------+
| 1| 75|shanghai|
| 1| 75| beijing|
| 1| 86|shanghai|
| 1| 86| beijing|
| 1| 64|shanghai|
| 1| 64| beijing|
| 2| 76| wuhan|
+---+-----+--------+
3.大表(少量倾斜)Join其他表,倾斜的部分广播
spark.sql("""|select * from grade where id <> 1|""".stripMargin).createOrReplaceTempView("grade_noskew")spark.sql("""|select * from grade where id = 1|""".stripMargin).createOrReplaceTempView("grade_skew")spark.sql("""|select * from (| select grade_noskew.id, grade, address| from grade_noskew, address where grade_noskew.id = address.id| union all| select /*+ BROADCAST (grade_skew) */ grade_skew.id, grade, address| from grade_skew, address where grade_skew.id = address.id|)|""".stripMargin).show()
运行结果:
+---+-----+--------+
| id|grade| address|
+---+-----+--------+
| 2| 76| wuhan|
| 1| 64|shanghai|
| 1| 86|shanghai|
| 1| 75|shanghai|
| 1| 64| beijing|
| 1| 86| beijing|
| 1| 75| beijing|
+---+-----+--------+
Spark中的Join操作及问题解决相关推荐
- (转载)如何理解RxJava中的join操作
转载:http://avenwu.net/2016/05/10/understand-the-join-operation-in-rx/ 前言 先前写过一篇文章,介绍Rx中不容易理解的概念(Rx那些不 ...
- SQL中的join操作总结(非常好)
1.1.1 摘要 Join是关系型数据库系统的重要操作之一,SQL Server中包含的常用Join:内联接.外联接和交叉联接等.如果我们想在两个或以上的表获取其中从一个表中的行与另一个表中的行匹配的 ...
- spark大小表join操作
spark大小表做join时为了避免数据倾斜并提高效率可以吧小表的rdd放到广播变量中,这样每个executor都会保存一份小文件的数据,避免shuffle 例子: def main(args: Ar ...
- Spark中的键值对操作-scala
1.PairRDD介绍 Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRD ...
- spark中local模式与cluster模式使用场景_Spark 知识点 ( 架构 RDD Task )
1. Spark介绍 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一 ...
- Spark中Data skew(数据倾斜)Java+Python+Scala三种接口完整代码
起因 代码中shuffle的算子存在的地方,groupByKey.countByKey.reduceByKey.join等 判断一个算子是shuffle算子可以通过[20] 出现的问题有两种 ①大部分 ...
- Spark SQL JOIN操作代码示例
title: Spark SQL JOIN操作 date: 2021-05-08 15:53:21 tags: Spark 本文主要介绍 Spark SQL 的多表连接,需要预先准备测试数据.分别创建 ...
- spark做两张大表的join操作,mapPartition和重分区算子的使用策略
Spark中做两个大hive表的join操作,先读取过来处理成两个数据量很大的RDD,如果两个RDD直接进行join操作,势必会造成shuffle等导致运行非常缓慢,那么怎么优化呢?方法如下: 首先, ...
- shell中join链接多个域_shell 如何实现两个表的join操作
shell 如何实现两个表的join操作 今天研究的一个问题是:在Shell 脚本中如何实现两个表的 join 操作,这里说的两个表示的其实是 两个文件,但是文件是列表的形式,有固定的分割符号,即就相 ...
最新文章
- windows端自动化遇到的问题
- 揭秘Kaggle神器xgboost
- 学计算机大四找不到工作怎么办,大四学生延迟毕业找不到工作,我不能被原谅吗?...
- python没有库_Python开发者必备6个基本库,这个都没有怎么做开发呢
- 全球五大顶级域名一周统计:7月第三周新增超9万个
- link2001错误无法解析外部符号metaObject
- 她说程序员不懂浪漫,生日宴上惨变单身狗,其实,程序员的浪漫你不懂!
- mysql判断存在返回布尔_MySqlClient访问tinyint字段返回布尔值篇
- 开奖啦!CSDN 程序员节 1024 中奖名单揭晓!
- 获取字符串的真实长度
- scare机器人如何手眼标定_SCARA机器人手眼标定之目标抓取
- [4.6]-AutoSAR零基础学习-CAN通信协议
- sklearn学习之LR算法实践
- 如何设置DiffMerge不进行Class文件的比较
- 英文字母间隔很大怎么解决?全角半角的概念
- Python 实现按键精灵的功能,超简单详细(Windows版)
- opencv(c++)几何变换------图像平移、旋转、缩放、翻转、剪贴
- 2023海南大学计算机考研信息汇总
- JAVA:事件监听器之Button类中的addActionListener(ActionListener l)方法
- c++ 求int数组的长度
热门文章
- C语言浮点数比较大小
- Linux命令英文对照表
- nagios监控安装配置文档+139邮箱报警
- AirPods Pro 2出现随机断连问题,充电仓支持Find My成最大亮点
- 程序员:不是所有的程序员称为程序员!
- 大一新生的pta错题归纳
- python共享单车数据分析_数据分析_共享单车骑行时间分析-zeropython
- Android6.0 源码修改之屏蔽导航栏虚拟按键(Home和RecentAPP)/动态显示和隐藏NavigationBar...
- Android8.1 MTK平台 修改 Volte 视频通话我方视角为矩形
- 头皮发麻之win10宽带拨号错误797