Spark中使用Dataset的groupBy/agg/join/broadcast hasjoin/sql broadcast hashjoin示例(java api)
Dataset的groupBy agg示例
Dataset<Row> resultDs = dsParsed .groupBy("enodeb_id", "ecell_id") .agg(functions.first("scan_start_time").alias("scan_start_time1"),functions.first("insert_time").alias("insert_time1"),functions.first("mr_type").alias("mr_type1"),functions.first("mr_ltescphr").alias("mr_ltescphr1"),functions.first("mr_ltescpuschprbnum").alias("mr_ltescpuschprbnum1"),functions.count("enodeb_id").alias("rows1")) .selectExpr("ecell_id", "enodeb_id","scan_start_time1 as scan_start_time","insert_time1 as insert_time","mr_type1 as mr_type","mr_ltescphr1 as mr_ltescphr","mr_ltescpuschprbnum1 as mr_ltescpuschprbnum","rows1 as rows");
Dataset Join示例:
Dataset<Row> ncRes = sparkSession.read().option("delimiter", "|").option("header", true).csv("/user/csv");Dataset<Row> mro=sparkSession.sql("。。。");Dataset<Row> ncJoinMro = ncRes.join(mro, mro.col("id").equalTo(ncRes.col("id")).and(mro.col("calid").equalTo(ncRes.col("calid"))), "left_outer").select(ncRes.col("id").as("int_id"), mro.col("vendor_id"),。。。 );
join condition另外一种方式:
leftDfWithWatermark.join(rightDfWithWatermark, expr(""" leftDfId = rightDfId AND leftDfTime >= rightDfTime AND leftDfTime <= rightDfTime + interval 1 hour"""), joinType = "leftOuter" )
BroadcastHashJoin示例:
package com.dx.testbroadcast;import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.functions;import java.io.*;public class Test {public static void main(String[] args) {String personPath = "E:\\person.csv";String personOrderPath = "E:\\personOrder.csv";//writeToPersion(personPath);//writeToPersionOrder(personOrderPath); SparkConf conf = new SparkConf();SparkSession sparkSession = SparkSession.builder().config(conf).appName("test-broadcast-app").master("local[*]").getOrCreate();Dataset<Row> person = sparkSession.read().option("header", "true").option("inferSchema", "true") //是否自动推到内容的类型.option("delimiter", ",").csv(personPath).as("person");person.printSchema();Dataset<Row> personOrder = sparkSession.read().option("header", "true").option("inferSchema", "true") //是否自动推到内容的类型.option("delimiter", ",").csv(personOrderPath).as("personOrder");personOrder.printSchema();// Default `inner`. Must be one of:`inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`,`right`, `right_outer`, `left_semi`, `left_anti`.Dataset<Row> resultDs = personOrder.join(functions.broadcast(person), personOrder.col("personid").equalTo(person.col("id")),"left");resultDs.explain(); resultDs.show(10);}private static void writeToPersion(String personPath) {BufferedWriter personWriter = null;try {personWriter = new BufferedWriter(new FileWriter(personPath));personWriter.write("id,name,age,address\r\n");for (int i = 0; i < 10000; i++) {personWriter.write("" + i + ",person-" + i + "," + i + ",address-address-address-address-address-address-address" + i + "\r\n");}} catch (Exception e) {e.printStackTrace();} finally {if (personWriter != null) {try {personWriter.close();} catch (IOException e) {e.printStackTrace();}}}}private static void writeToPersionOrder(String personOrderPath) {BufferedWriter personWriter = null;try {personWriter = new BufferedWriter(new FileWriter(personOrderPath));personWriter.write("personid,name,age,address\r\n");for (int i = 0; i < 1000000; i++) {personWriter.write("" + i + ",person-" + i + "," + i + ",address-address-address-address-address-address-address" + i + "\r\n");}} catch (Exception e) {e.printStackTrace();} finally {if (personWriter != null) {try {personWriter.close();} catch (IOException e) {e.printStackTrace();}}}} }
打印结果:
== Physical Plan == *(2) BroadcastHashJoin [personid#28], [id#10], LeftOuter, BuildRight :- *(2) FileScan csv [personid#28,name#29,age#30,address#31] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/personOrder.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<personid:int,name:string,age:int,address:string> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))+- *(1) Project [id#10, name#11, age#12, address#13]+- *(1) Filter isnotnull(id#10)+- *(1) FileScan csv [id#10,name#11,age#12,address#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/person.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,age:int,address:string>+--------+--------+---+--------------------+---+--------+---+--------------------+ |personid| name|age| address| id| name|age| address| +--------+--------+---+--------------------+---+--------+---+--------------------+ | 0|person-0| 0|address-address-a...| 0|person-0| 0|address-address-a...| | 1|person-1| 1|address-address-a...| 1|person-1| 1|address-address-a...| | 2|person-2| 2|address-address-a...| 2|person-2| 2|address-address-a...| | 3|person-3| 3|address-address-a...| 3|person-3| 3|address-address-a...| | 4|person-4| 4|address-address-a...| 4|person-4| 4|address-address-a...| | 5|person-5| 5|address-address-a...| 5|person-5| 5|address-address-a...| | 6|person-6| 6|address-address-a...| 6|person-6| 6|address-address-a...| | 7|person-7| 7|address-address-a...| 7|person-7| 7|address-address-a...| | 8|person-8| 8|address-address-a...| 8|person-8| 8|address-address-a...| | 9|person-9| 9|address-address-a...| 9|person-9| 9|address-address-a...| +--------+--------+---+--------------------+---+--------+---+--------------------+ only showing top 10 rows
SparkSQL Broadcast HashJoin
person.createOrReplaceTempView("temp_person");personOrder.createOrReplaceTempView("temp_person_order");Dataset<Row> sqlResult = sparkSession.sql(" SELECT /*+ BROADCAST (t11) */" +" t11.id,t11.name,t11.age,t11.address," +" t10.personid as person_id,t10.name as persion_order_name" +" FROM temp_person_order as t10 " +" inner join temp_person as t11" +" on t11.id = t10.personid ");sqlResult.show(10);sqlResult.explain();
打印日志
+---+--------+---+--------------------+---------+------------------+ | id| name|age| address|person_id|persion_order_name| +---+--------+---+--------------------+---------+------------------+ | 0|person-0| 0|address-address-a...| 0| person-0| | 1|person-1| 1|address-address-a...| 1| person-1| | 2|person-2| 2|address-address-a...| 2| person-2| | 3|person-3| 3|address-address-a...| 3| person-3| | 4|person-4| 4|address-address-a...| 4| person-4| | 5|person-5| 5|address-address-a...| 5| person-5| | 6|person-6| 6|address-address-a...| 6| person-6| | 7|person-7| 7|address-address-a...| 7| person-7| | 8|person-8| 8|address-address-a...| 8| person-8| | 9|person-9| 9|address-address-a...| 9| person-9| +---+--------+---+--------------------+---------+------------------+ only showing top 10 rows19/06/24 09:35:50 INFO FileSourceStrategy: Pruning directories with: 19/06/24 09:35:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(personid#28) 19/06/24 09:35:50 INFO FileSourceStrategy: Output Data Schema: struct<personid: int, name: string> 19/06/24 09:35:50 INFO FileSourceScanExec: Pushed Filters: IsNotNull(personid) 19/06/24 09:35:50 INFO FileSourceStrategy: Pruning directories with: 19/06/24 09:35:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(id#10) 19/06/24 09:35:50 INFO FileSourceStrategy: Output Data Schema: struct<id: int, name: string, age: int, address: string ... 2 more fields> 19/06/24 09:35:50 INFO FileSourceScanExec: Pushed Filters: IsNotNull(id) == Physical Plan == *(2) Project [id#10, name#11, age#12, address#13, personid#28 AS person_id#94, name#29 AS persion_order_name#95] +- *(2) BroadcastHashJoin [personid#28], [id#10], Inner, BuildRight:- *(2) Project [personid#28, name#29]: +- *(2) Filter isnotnull(personid#28): +- *(2) FileScan csv [personid#28,name#29] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/personOrder.csv], PartitionFilters: [], PushedFilters: [IsNotNull(personid)], ReadSchema: struct<personid:int,name:string>+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))+- *(1) Project [id#10, name#11, age#12, address#13]+- *(1) Filter isnotnull(id#10)+- *(1) FileScan csv [id#10,name#11,age#12,address#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/person.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,age:int,address:string> 19/06/24 09:35:50 INFO SparkContext: Invoking stop() from shutdown hook
转载于:https://www.cnblogs.com/yy3b2007com/p/9776521.html
Spark中使用Dataset的groupBy/agg/join/broadcast hasjoin/sql broadcast hashjoin示例(java api)相关推荐
- Spark中RDD、DataFrame和DataSet的区别与联系
一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...
- 在Spark中自定义Kryo序列化输入输出API(转)
原文链接:在Spark中自定义Kryo序列化输入输出API 在Spark中内置支持两种系列化格式:(1).Java serialization:(2).Kryo serialization.在默认情况 ...
- spark中dataframe解析_SparkSql 中 JOIN的实现
Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者, ...
- ML之FE:pandas库中数据分析利器之groupby分组函数、agg聚合函数、同时使用groupby与agg函数组合案例之详细攻略
ML之FE:pandas库中数据分析利器之groupby分组函数.agg聚合函数.同时使用groupby与agg函数组合案例之详细攻略 目录 pandas库中数据分析利器之groupby分组函数.ag ...
- Spark 浅谈Spark中的各种join
众所周知,Join的种类丰富: 按照**关联形式(**Join type)划分: 有内关联,外关联,左关联,右关联,半关联,逆关联等,由业务逻辑决定的关联形式决定了Spark任务的运行结果; 按照关联 ...
- Spark中RDD与DataFrame与DataSet的区别与联系
1.概述 这是一个面试题 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似传统数据库中的二维表格 DataFrame与RDD的主要区别在于,前者带有schema元数据信息,既 ...
- Spark中,RDD概述(五大属性,弹性介绍,5个特性)
1 什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark 中最基本的数据抽象,它代表一个不可变.可分区.里面的元素可并行计算的集合.在Spa ...
- spark中local模式与cluster模式使用场景_Spark 知识点 ( 架构 RDD Task )
1. Spark介绍 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一 ...
- Apache Spark中实现的MapReduce设计模式
该博客是该系列文章的第一篇,讨论了MapReduce设计模式一书中的一些设计模式,并展示了如何在Apache Spark(R)中实现这些模式. 在编写MapReduce或Spark程序时,考虑执行作业 ...
- Spark中DataFrame 基本操作函数
DataFrame的基本操作函数 原文链接 https://blog.csdn.net/u010003835/article/details/106436091?utm_medium=distribu ...
最新文章
- SAP WM中阶Storage Type的Capacity Check – Check based on maximum quantity per bin in storage type.
- dubbo学习之dubbo管理控制台装配及集成zookeeper集群部署(1)【转】
- hdu4081 最小树+DFS或者次小树的变形
- 计算机网络-IP地址的分类
- 牛客练习赛36 F-Rabbit的蛋糕 (叉积求面积, 记录前缀)
- 推荐六款帮助你实现惊艳视差滚动效果的 jQuery 插件
- Java EE重新审视设计模式:异步
- 《又见一帘幽梦》高清视频
- DateTime字段控件值显示短格式的做法
- 两个栈实现队列,两个队列实现栈
- C++ std::condition_variable 是什么 有什么用 条件变量 线程同步 wait wait_for notify_one notify_all
- 在EditPlus中配置java快捷键
- 服务器安全防护措施有哪些?
- 超全面UI基础设计规范
- 关于VS.NET中多个项目的工程相互引用和多个dll引用的问题! - antony--异域空间 - 博客园
- 201809-4 再卖菜 ccf
- 例题 - 最近公共祖先 - 离线算法
- Android Native 代码 Release 编译 - 隐藏符号表
- 用VC实现Html编辑器
- 【转】完美解决iphone连电脑蓝牙出现bluetooth外围设备无法正确安装