本文主要介绍spark join相关操作。

讲述spark连接相关的三个方法join,left-outer-join,right-outer-join,在这之前,我们用hiveSQL先跑出了结果以方便进行对比。

我们以实例来进行说明。我的实现步骤记录如下。

1、数据准备

2、HSQL描述

3、Spark描述

1、数据准备

我们准备两张Hive表,分别是orders(订单表)和drivers(司机表),通过driver_id字段进行关联。数据如下:

orders

orders表有两个字段,订单id:order_id和司机id:driver_id。司机id将作为连接键。

通过select可以看到三条数据。

hive (gulfstream_test)> select * from orders;
OK
orders.order_id orders.driver_id
1000    5000
1001    5001
1002    5002
Time taken: 0.387 seconds, Fetched: 3 row(s)

drivers

drivers表由两个字段,司机id:driver_id和车辆id:car_id。司机id将作为连接键。

通过select可以看到两条数据。

hive (gulfstream_test)> select * from drivers;
OK
drivers.driver_id       drivers.car_id
5000    100
5003    103
Time taken: 0.036 seconds, Fetched: 2 row(s)

2、HSQL描述

JOIN

自然连接,输出连接键匹配的记录。

可以看到,通过driver_id匹配的数据只有一条。

hive (gulfstream_test)> select * from orders t1 join drivers t2 on (t1.driver_id = t2.driver_id) ;
OK
t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
1000    5000    5000    100
Time taken: 36.079 seconds, Fetched: 1 row(s)

LEFT OUTER JOIN

左外链接,输出连接键匹配的记录,左侧的表无论匹配与否都输出。

可以看到,通过driver_id匹配的数据只有一条,不过所有orders表中的记录都被输出了,drivers中未能匹配的字段被置为空。

hive (gulfstream_test)> select * from orders t1 left outer join drivers t2 on (t1.driver_id = t2.driver_id) ;
OK
t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
1000    5000    5000    100
1001    5001    NULL    NULL
1002    5002    NULL    NULL
Time taken: 36.063 seconds, Fetched: 3 row(s)

RIGHT OUTER JOIN

右外连接,输出连接键匹配的记录,右侧的表无论匹配与否都输出。

可以看到,通过driver_id匹配的数据只有一条,不过所有drivers表中的记录都被输出了,orders中未能匹配的字段被置为空。

hive (gulfstream_test)> select * from orders t1 right outer join drivers t2 on (t1.driver_id = t2.driver_id) ;
OK
t1.order_id     t1.driver_id    t2.driver_id    t2.car_id
1000    5000    5000    100
NULL    NULL    5003    103
Time taken: 30.089 seconds, Fetched: 2 row(s)

3、Spark描述

spark实现join的方式也是通过RDD的算子,spark同样提供了三个算子join,leftOuterJoin,rightOuterJoin。

在下面给出的例子中,我们通过spark-hive读取了Hive中orders表和drivers表中的数据,这时候数据的表现形式是DataFrame,如果要使用Join操作:

1)首先需要先将DataFrame转化成了JavaRDD。

2)不过,JavaRDD其实是没有join算子的,下面还需要通过mapToPair算子将JavaRDD转换成JavaPairRDD,这样就可以使用Join了。

下面例子中给出了三种join操作的实现方式,在join之后,通过collect()函数把数据拉到Driver端本地,并通过标准输出打印。

需要指出的是

1)join算子(join,leftOuterJoin,rightOuterJoin)只能通过PairRDD使用;

2)join算子操作的Tuple2<Object1, Object2>类型中,Object1是连接键,我只试过Integer和String,Object2比较灵活,甚至可以是整个Row。

这里我们使用driver_id作为连接键。 所以在输出Tuple2的时候,我们将driver_id放在了前面。

Join.java

/*
*   spark-submit --queue=root.zhiliangbu_prod_datamonitor spark-join-1.0-SNAPSHOT-jar-with-dependencies.jar
* */
public class Join implements Serializable {private transient JavaSparkContext javaSparkContext;private transient HiveContext hiveContext;/**   初始化Load*   创建sparkContext, sqlContext, hiveContext* */public Join() {initSparckContext();initHiveContext();}/**   创建sparkContext* */private void initSparckContext() {String warehouseLocation = System.getProperty("user.dir");SparkConf sparkConf = new SparkConf().setAppName("spark-join").set("spark.sql.warehouse.dir", warehouseLocation).setMaster("yarn-client");javaSparkContext = new JavaSparkContext(sparkConf);}/**   创建hiveContext*   用于读取Hive中的数据* */private void initHiveContext() {hiveContext = new HiveContext(javaSparkContext);}public void join() {/**   生成rdd1* */String query1 = "select * from gulfstream_test.orders";DataFrame rows1 = hiveContext.sql(query1).select("order_id", "driver_id");JavaPairRDD<String, String> rdd1 = rows1.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {@Overridepublic Tuple2<String, String> call(Row row) throws Exception {String orderId = (String)row.get(0);String driverId = (String)row.get(1);return new Tuple2<String, String>(driverId, orderId);}});/**   生成rdd2* */String query2 = "select * from gulfstream_test.drivers";DataFrame rows2 = hiveContext.sql(query2).select("driver_id", "car_id");JavaPairRDD<String, String> rdd2 = rows2.toJavaRDD().mapToPair(new PairFunction<Row, String, String>() {@Overridepublic Tuple2<String, String> call(Row row) throws Exception {String driverId = (String)row.get(0);String carId = (String)row.get(1);return new Tuple2<String, String>(driverId, carId);}});/**   join* */System.out.println(" ****************** join *******************");JavaPairRDD<String, Tuple2<String, String>> joinRdd = rdd1.join(rdd2);Iterator<Tuple2<String, Tuple2<String, String>>> it1 = joinRdd.collect().iterator();while (it1.hasNext()) {Tuple2<String, Tuple2<String, String>> item = it1.next();System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );}/**   leftOuterJoin* */System.out.println(" ****************** leftOuterJoin *******************");JavaPairRDD<String, Tuple2<String, Optional<String>>> leftOuterJoinRdd = rdd1.leftOuterJoin(rdd2);Iterator<Tuple2<String, Tuple2<String, Optional<String>>>> it2 = leftOuterJoinRdd.collect().iterator();while (it2.hasNext()) {Tuple2<String, Tuple2<String, Optional<String>>> item = it2.next();System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );}/**   rightOuterJoin* */System.out.println(" ****************** rightOuterJoin *******************");JavaPairRDD<String, Tuple2<Optional<String>, String>> rightOuterJoinRdd = rdd1.rightOuterJoin(rdd2);Iterator<Tuple2<String, Tuple2<Optional<String>, String>>> it3 = rightOuterJoinRdd.collect().iterator();while (it3.hasNext()) {Tuple2<String, Tuple2<Optional<String>, String>> item = it3.next();System.out.println("driver_id:" + item._1 + ", order_id:" + item._2._1 + ", car_id:" + item._2._2 );}}public static void main(String[] args) {Join sj = new Join();sj.join();}}

执行结果

其中Optional.absent()表示的就是null,可以看到和HSQL是一致的。

Application ID is application_1508228032068_2746260, trackingURL: http://10.93.21.21:4040****************** join *******************
driver_id:5000, order_id:1000, car_id:100                                       ****************** leftOuterJoin *******************
driver_id:5001, order_id:1001, car_id:Optional.absent()
driver_id:5002, order_id:1002, car_id:Optional.absent()
driver_id:5000, order_id:1000, car_id:Optional.of(100)****************** rightOuterJoin *******************
driver_id:5003, order_id:Optional.absent(), car_id:103
driver_id:5000, order_id:Optional.of(1000), car_id:100

由于数据量不大,我没有从执行效率上进行考量。

根据经验,一般在数据量较大的情况下,HSQL的执行效率会高一些,如果数据量较小,Spark会快。

spark和HSQL的连接join方式相关推荐

  1. Spark源码阅读(五) --- Spark的支持的join方式以及join策略

    版本变动 2021-08-30 增加了对Broadcast Hash Join小表大小的评估内容 增加了对Sort Merge Join优于Shuffle Hash Join调用的解释 目录 Spar ...

  2. SQL的连接(join)有哪些常见形式?解释一下?解释 SQL 的 left join 和 right join?

    SQL的连接(join)有哪些常见形式?解释一下?解释 SQL 的 left join 和 right join? SQL的连接(join)有哪些常见形式? sql连接查询:把多张表的列组合在一起,产 ...

  3. MySql 内连接,外连接查询方式区别

    MySql 内连接,外连接查询方式 CREATE TABLE `question_test` (`q_id` int(11) DEFAULT NULL,`q_name` varchar(10) DEF ...

  4. mysql+join的原理,Mysql连接join查询原理知识点

    Mysql连接join查询原理知识点 Mysql连接(join)查询 1.基本概念 将两个表的每一行,以"两两横向对接"的方式,所得到的所有行的结果. 假设: 表A有n1行,m1列 ...

  5. 达梦中的连接查询方式

    达梦中的连接查询方式,全部参照官方文档. 查询语句中FROM子句包含多个表时,称为连接查询.生成连接查询的执行计划,需要考虑三方面因素:访问路径.连接方式.连接顺序. 访问路径指对于每张表采用何种方式 ...

  6. ORACLE中的全连接(Full Join)、内连接(JOIN/INNER JOIN)、左连接(Left Join)、右连接(Left Join)、(+)符号以及Theta连接

    测试表: A表:LS_TEMP1 B表:LS_TEMP2 左连接(左外连接)A LEFT JOIN B / (+)放在B表后边:左表为基础,显示所有左表数据,右表只显示能与左表关联上的数据 右连接(右 ...

  7. android string.join java8_Java8 - 更优雅的字符串连接(join)收集器 Collectors.joining

    Java8 - 更优雅的字符串连接(join)收集器 Collectors.joining Zebe 2018-10-15 38 0 Java,Java8 StringBuilder,Collecto ...

  8. 数据库学习-连接/join

    数据库的连接在我看来就是通过连接将两个表的合成一个新的表 连接/join有四种方式,内连接-inner join,左连接-left join,右连接-right join,全连接-full join. ...

  9. mysql版本链图解_图解 SQL 中各种连接 JOIN

    先用文字来捋一下思路,数据库操作中无非就是「增删查改」,其中「 查」用得最多且最复杂,变化多端.查询的时候,我们可以只是单表查询,也可以是多表连接查询,单表查询中的学问也很大,但限于篇幅,本次主要分享 ...

最新文章

  1. mysqldump和xtrabackup备份原理实现说明
  2. 如何用Python和深度神经网络识别图像?
  3. SparkSQL在IDEA中的使用
  4. Postman教程大全
  5. 设计模式C++实现(9)——享元模式
  6. db2 oracle mysql sqlserver_mysql、sqlserver、db2、oracle、hsql数据库获取数据库连接方法及分页函数...
  7. 管理表空间和数据文件——维护表空间——设置默认表空间和删除表空间和删除数据文件盒临时文件...
  8. win10没有windows无线服务器,win10系统电脑没windows无线服务的解决方法
  9. 苹果又出新专利?全包围屏幕iPhone
  10. Spring(二十二):Spring 事务
  11. 家政O2O要在寒冬下为用户创造体验春天
  12. SAP Brazil J1BTAX 为税收例外创建税收组(翻译)
  13. [2018.03.13 T1] 比赛(contest)
  14. 西电网络攻防大赛--渗透测试第五题
  15. 不可不知的设计师接活报价公式
  16. 新版标准日本语中级_第二十六课
  17. 《FMEA潜在失效模式及效应分析实务》课程大纲--台湾李文棕老师
  18. 令人不寒而栗的黄蓉(转)
  19. python居然能语音控制电脑壁纸切换,只需60行代码
  20. Elasticsearch 数据迁移方案

热门文章

  1. mysql 定时器停止_mysql事件【定时器】
  2. centos mysql 安装 yum源_Linux - CentOS 7 通过Yum源安装 MySql 5.7
  3. ctypes python_Python使用Ctypes与C/C++
  4. javascript--弹出对话框 四种对话框 获得用户输入值 .
  5. activity5.1初始密码
  6. linux删除目录是显示非空,Linux删除非空目录
  7. xp中mysql优化_XP加速技巧大盘点 看看那个最实用
  8. input python 验证int_全国计算机二级Python真题解析-1
  9. python求n的阶乘_python求n的阶乘
  10. python安装成功第三方库但import出问题_解析pip安装第三方库但PyCharm中却无法识别的问题及PyCharm安装第三方库的方法教程...