Dataset groupBy/agg example:

Dataset<Row> resultDs = dsParsed
        .groupBy("enodeb_id", "ecell_id")
        .agg(
                functions.first("scan_start_time").alias("scan_start_time1"),
                functions.first("insert_time").alias("insert_time1"),
                functions.first("mr_type").alias("mr_type1"),
                functions.first("mr_ltescphr").alias("mr_ltescphr1"),
                functions.first("mr_ltescpuschprbnum").alias("mr_ltescpuschprbnum1"),
                functions.count("enodeb_id").alias("rows1"))
        .selectExpr(
                "ecell_id", "enodeb_id",
                "scan_start_time1 as scan_start_time",
                "insert_time1 as insert_time",
                "mr_type1 as mr_type",
                "mr_ltescphr1 as mr_ltescphr",
                "mr_ltescpuschprbnum1 as mr_ltescpuschprbnum",
                "rows1 as rows");
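
The "1"-suffixed intermediate aliases and the selectExpr rename step can be avoided by aliasing each aggregate directly to its final name. A minimal sketch of that variant, assuming the same dsParsed Dataset as above:

// Same aggregation, but each aggregate is aliased straight to its final column name,
// so the second selectExpr rename pass is no longer needed.
Dataset<Row> resultDsDirect = dsParsed
        .groupBy("enodeb_id", "ecell_id")
        .agg(
                functions.first("scan_start_time").alias("scan_start_time"),
                functions.first("insert_time").alias("insert_time"),
                functions.first("mr_type").alias("mr_type"),
                functions.first("mr_ltescphr").alias("mr_ltescphr"),
                functions.first("mr_ltescpuschprbnum").alias("mr_ltescpuschprbnum"),
                functions.count("enodeb_id").alias("rows"));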

Dataset join example:

Dataset<Row> ncRes = sparkSession.read()
        .option("delimiter", "|")
        .option("header", true)
        .csv("/user/csv");

Dataset<Row> mro = sparkSession.sql("...");

Dataset<Row> ncJoinMro = ncRes.join(
                mro,
                mro.col("id").equalTo(ncRes.col("id"))
                        .and(mro.col("calid").equalTo(ncRes.col("calid"))),
                "left_outer")
        .select(
                ncRes.col("id").as("int_id"),
                mro.col("vendor_id"),
                ...);
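
After a join like this, both inputs still carry their own id and calid columns, so downstream references either stay qualified through the original Datasets (ncRes.col(...) / mro.col(...)) as in the select above, or one side's copies get dropped. A minimal sketch of the drop variant, assuming the same ncRes and mro Datasets:

// Keep all joined columns, then remove mro's duplicate join keys so that
// unqualified references to "id" and "calid" are no longer ambiguous.
Dataset<Row> ncJoinMroDeduped = ncRes.join(
                mro,
                mro.col("id").equalTo(ncRes.col("id"))
                        .and(mro.col("calid").equalTo(ncRes.col("calid"))),
                "left_outer")
        .drop(mro.col("id"))
        .drop(mro.col("calid"));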

Another way to express the join condition:

leftDfWithWatermark.join(
  rightDfWithWatermark,
  expr("""
    leftDfId = rightDfId AND
    leftDfTime >= rightDfTime AND
    leftDfTime <= rightDfTime + interval 1 hour
  """),
  joinType = "leftOuter")
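
The snippet above is Scala; with the Java API the same condition can be passed through functions.expr. A minimal sketch, assuming hypothetical Datasets leftDf and rightDf that contain the columns named in the expression:

// Hypothetical leftDf/rightDf with columns leftDfId, rightDfId, leftDfTime, rightDfTime.
Dataset<Row> joined = leftDf.join(
        rightDf,
        functions.expr(
                "leftDfId = rightDfId AND " +
                "leftDfTime >= rightDfTime AND " +
                "leftDfTime <= rightDfTime + interval 1 hour"),
        "left_outer");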

BroadcastHashJoin example:

package com.dx.testbroadcast;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

import java.io.*;

public class Test {
    public static void main(String[] args) {
        String personPath = "E:\\person.csv";
        String personOrderPath = "E:\\personOrder.csv";
        //writeToPersion(personPath);
        //writeToPersionOrder(personOrderPath);

        SparkConf conf = new SparkConf();
        SparkSession sparkSession = SparkSession.builder()
                .config(conf)
                .appName("test-broadcast-app")
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> person = sparkSession.read()
                .option("header", "true")
                .option("inferSchema", "true") // whether to automatically infer column types
                .option("delimiter", ",")
                .csv(personPath)
                .as("person");
        person.printSchema();

        Dataset<Row> personOrder = sparkSession.read()
                .option("header", "true")
                .option("inferSchema", "true") // whether to automatically infer column types
                .option("delimiter", ",")
                .csv(personOrderPath)
                .as("personOrder");
        personOrder.printSchema();

        // Default `inner`. Must be one of: `inner`, `cross`, `outer`, `full`, `full_outer`,
        // `left`, `left_outer`, `right`, `right_outer`, `left_semi`, `left_anti`.
        Dataset<Row> resultDs = personOrder.join(
                functions.broadcast(person),
                personOrder.col("personid").equalTo(person.col("id")),
                "left");
        resultDs.explain();
        resultDs.show(10);
    }

    private static void writeToPersion(String personPath) {
        BufferedWriter personWriter = null;
        try {
            personWriter = new BufferedWriter(new FileWriter(personPath));
            personWriter.write("id,name,age,address\r\n");
            for (int i = 0; i < 10000; i++) {
                personWriter.write("" + i + ",person-" + i + "," + i
                        + ",address-address-address-address-address-address-address" + i + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (personWriter != null) {
                try {
                    personWriter.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static void writeToPersionOrder(String personOrderPath) {
        BufferedWriter personWriter = null;
        try {
            personWriter = new BufferedWriter(new FileWriter(personOrderPath));
            personWriter.write("personid,name,age,address\r\n");
            for (int i = 0; i < 1000000; i++) {
                personWriter.write("" + i + ",person-" + i + "," + i
                        + ",address-address-address-address-address-address-address" + i + "\r\n");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (personWriter != null) {
                try {
                    personWriter.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Output:

== Physical Plan ==
*(2) BroadcastHashJoin [personid#28], [id#10], LeftOuter, BuildRight
:- *(2) FileScan csv [personid#28,name#29,age#30,address#31] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/personOrder.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<personid:int,name:string,age:int,address:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
   +- *(1) Project [id#10, name#11, age#12, address#13]
      +- *(1) Filter isnotnull(id#10)
         +- *(1) FileScan csv [id#10,name#11,age#12,address#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/person.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,age:int,address:string>

+--------+--------+---+--------------------+---+--------+---+--------------------+
|personid|    name|age|             address| id|    name|age|             address|
+--------+--------+---+--------------------+---+--------+---+--------------------+
|       0|person-0|  0|address-address-a...|  0|person-0|  0|address-address-a...|
|       1|person-1|  1|address-address-a...|  1|person-1|  1|address-address-a...|
|       2|person-2|  2|address-address-a...|  2|person-2|  2|address-address-a...|
|       3|person-3|  3|address-address-a...|  3|person-3|  3|address-address-a...|
|       4|person-4|  4|address-address-a...|  4|person-4|  4|address-address-a...|
|       5|person-5|  5|address-address-a...|  5|person-5|  5|address-address-a...|
|       6|person-6|  6|address-address-a...|  6|person-6|  6|address-address-a...|
|       7|person-7|  7|address-address-a...|  7|person-7|  7|address-address-a...|
|       8|person-8|  8|address-address-a...|  8|person-8|  8|address-address-a...|
|       9|person-9|  9|address-address-a...|  9|person-9|  9|address-address-a...|
+--------+--------+---+--------------------+---+--------+---+--------------------+
only showing top 10 rows
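
Beyond the explicit functions.broadcast hint used above, Spark also broadcasts a join side automatically when that side's estimated size is below spark.sql.autoBroadcastJoinThreshold (10 MB by default). A minimal sketch of tuning that threshold on the same sparkSession; the value chosen here is only illustrative:

// Raise the automatic broadcast threshold to roughly 100 MB;
// setting it to -1 disables automatic broadcasting entirely.
sparkSession.conf().set("spark.sql.autoBroadcastJoinThreshold", String.valueOf(100 * 1024 * 1024));

// An explicit broadcast hint still forces the broadcast regardless of the threshold.
Dataset<Row> hinted = personOrder.join(
        functions.broadcast(person),
        personOrder.col("personid").equalTo(person.col("id")),
        "left");
hinted.explain();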

SparkSQL Broadcast HashJoin example:

person.createOrReplaceTempView("temp_person");
personOrder.createOrReplaceTempView("temp_person_order");

Dataset<Row> sqlResult = sparkSession.sql(
        " SELECT /*+ BROADCAST (t11) */" +
        " t11.id,t11.name,t11.age,t11.address," +
        " t10.personid as person_id,t10.name as persion_order_name" +
        " FROM temp_person_order as t10 " +
        " inner join temp_person as t11" +
        " on t11.id = t10.personid ");
sqlResult.show(10);
sqlResult.explain();

Log output:

+---+--------+---+--------------------+---------+------------------+
| id|    name|age|             address|person_id|persion_order_name|
+---+--------+---+--------------------+---------+------------------+
|  0|person-0|  0|address-address-a...|        0|          person-0|
|  1|person-1|  1|address-address-a...|        1|          person-1|
|  2|person-2|  2|address-address-a...|        2|          person-2|
|  3|person-3|  3|address-address-a...|        3|          person-3|
|  4|person-4|  4|address-address-a...|        4|          person-4|
|  5|person-5|  5|address-address-a...|        5|          person-5|
|  6|person-6|  6|address-address-a...|        6|          person-6|
|  7|person-7|  7|address-address-a...|        7|          person-7|
|  8|person-8|  8|address-address-a...|        8|          person-8|
|  9|person-9|  9|address-address-a...|        9|          person-9|
+---+--------+---+--------------------+---------+------------------+
only showing top 10 rows

19/06/24 09:35:50 INFO FileSourceStrategy: Pruning directories with:
19/06/24 09:35:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(personid#28)
19/06/24 09:35:50 INFO FileSourceStrategy: Output Data Schema: struct<personid: int, name: string>
19/06/24 09:35:50 INFO FileSourceScanExec: Pushed Filters: IsNotNull(personid)
19/06/24 09:35:50 INFO FileSourceStrategy: Pruning directories with:
19/06/24 09:35:50 INFO FileSourceStrategy: Post-Scan Filters: isnotnull(id#10)
19/06/24 09:35:50 INFO FileSourceStrategy: Output Data Schema: struct<id: int, name: string, age: int, address: string ... 2 more fields>
19/06/24 09:35:50 INFO FileSourceScanExec: Pushed Filters: IsNotNull(id)
== Physical Plan ==
*(2) Project [id#10, name#11, age#12, address#13, personid#28 AS person_id#94, name#29 AS persion_order_name#95]
+- *(2) BroadcastHashJoin [personid#28], [id#10], Inner, BuildRight
   :- *(2) Project [personid#28, name#29]
   :  +- *(2) Filter isnotnull(personid#28)
   :     +- *(2) FileScan csv [personid#28,name#29] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/personOrder.csv], PartitionFilters: [], PushedFilters: [IsNotNull(personid)], ReadSchema: struct<personid:int,name:string>
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *(1) Project [id#10, name#11, age#12, address#13]
         +- *(1) Filter isnotnull(id#10)
            +- *(1) FileScan csv [id#10,name#11,age#12,address#13] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/E:/person.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string,age:int,address:string>
19/06/24 09:35:50 INFO SparkContext: Invoking stop() from shutdown hook
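
The same broadcast hash join can also be requested from the Dataset API without writing SQL, via Dataset.hint. A minimal sketch, assuming the person and personOrder Datasets from the example above:

// Dataset-side counterpart of the /*+ BROADCAST(t11) */ SQL hint:
// mark `person` as the broadcast (build) side of the join.
Dataset<Row> hintedResult = personOrder.join(
        person.hint("broadcast"),
        personOrder.col("personid").equalTo(person.col("id")),
        "inner");
hintedResult.explain(); // expected to show BroadcastHashJoin ... BuildRight, as in the plan above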

Reposted from: https://www.cnblogs.com/yy3b2007com/p/9776521.html
