Spark first, last函数的坑
Spark SQL的聚合函数中有first, last函数,从字面意思就是根据分组获取第一条和最后一条记录的值,实际上,只在local模式下,你可以得到满意的答案,但是在生产环境(分布式)时,这个是不能保证的。看源码的解释:
/*** Returns the first value of `child` for a group of rows. If the first value of `child`* is `null`, it returns `null` (respecting nulls). Even if [[First]] is used on an already* sorted column, if we do partial aggregation and final aggregation (when mergeExpression* is used) its result will not be deterministic (unless the input table is sorted and has* a single partition, and we use a single reducer to do the aggregation.).*/
如何保证first, last是有效呢?表要排好序的,同时只能用一个分区处理,再用一个reducer来聚合。。。
所以,在多分区场景不能用first, last函数求得聚合的第一条和最后一条数据。
解决方案:利用Window。
val spark = SparkSession.builder().master("local").appName("Demo").getOrCreate()import spark.implicits._
val df = Seq(("a", 10, 12345), ("a", 12, 34567), ("a", 11, 23456), ("b", 10, 55555), ("b", 8, 12348)).toDF("name", "value", "event_time")// 定义window
val asc = Window.partitionBy("name").orderBy($"event_time")
val desc = Window.partitionBy("name").orderBy($"event_time".desc)// 根据window生成row_number,根据row_number获取对应的数据
val firstValue = df.withColumn("rn", row_number().over(asc)).where($"rn" === 1).drop("rn")
val lastValue = df.withColumn("rn", row_number().over(desc)).where($"rn" === 1).drop("rn")// 利用join把数据聚合一起
df.groupBy("name").count().as("t1").join(firstValue.as("fv"), "name").join(lastValue.as("lv"), "name").select($"t1.name", $"fv.value".as("first_value"), $"lv.value".as("last_value"), $"t1.count").show()
输出:
+----+-----------+----------+-----+
|name|first_value|last_value|count|
+----+-----------+----------+-----+
| b| 8| 10| 2|
| a| 10| 12| 3|
+----+-----------+----------+-----+
Spark first, last函数的坑相关推荐
- 自定义实现spark的分区函数
有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写 ...
- spark中flatMap函数用法
说明 在spark中map函数和flatMap函数是两个比较常用的函数.其中 map:对集合中每个元素进行操作. flatMap:对集合中每个元素进行操作然后再扁平化. 理解扁平化可以举个简单例 ...
- 封包时发现的关于QIODevice类write函数的坑
关于QIODevice类write函数的坑 问题概述 问题部分代码 问题解决 结论 问题概述 这两天在做TCP通信的封包解包协议操作时,不经意间被write函数坑了好久.通过内存复制进行数据封包,在写 ...
- Spark编写UDF函数案例
Spark编写UDF函数案例 一.前述 二.UDF函数 需求:将orders表中 order_dow和order_number进行求和 一.前述 SparkSql中自定义函数包括UDF和UDAF UD ...
- [转载] python 列表List中index函数的坑
参考链接: Python列表list sort() python 列表List中index函数的坑 例如 a = [1, 2, 1] 如果使用 a.index(1), 输出的只是列表中第一个出现的 1 ...
- spark SQL自定义函数:
spark SQL 自定义函数: 自定义函数: 第一种: U D F (用户自定义函数)函数 特点: 一对一的关系,输入一个值以后输出一个值 (一进一出) 大部分的内置函数都是U D F函数 ...
- Spark SQL 开窗函数row_number的使用
Spark SQL 开窗函数row_number的使用 窗口函数 row_number即为分组取topN 参考文本: 型号 ...
- Spark UDF用户自定义函数
自定义一个函数实现查询字符串长度.首先创建测试的DataFrame: val spark = SparkSession.builder().master("local").appN ...
- Spark _27_自定义函数UDF和UDAF
UDF:用户自定义函数. 可以自定义类实现UDFX接口. javaAPI: package com.udf;import javafx.scene.chart.PieChart; import org ...
最新文章
- 李开复对话彭特兰:AI 不是单打独斗,应避免 AI 冷战!
- plt.xlabel 'str' object is not callable
- 【CoppeliaSim】远程 API 之 Python 控制,对比 V-rep 有些不同
- VTK:PolyData之IterateOverLines
- python编程神器下载_Python编程神器 -程序员必备开发手册
- openmpi安装_Intel Parallel Studio XE 2019安装设置
- linux qt应用程序全屏,QT中MDI应用程序中更改子窗口大小或是全屏显示子窗口的方法...
- 怎样追求一个你喜欢的人?
- 【学习总结】Git学习-参考廖雪峰老师教程三-创建版本库
- OCP 12c最新考试原题及答案(071-7)
- SQL中inner join、outer join和cross join的区别
- vue中使用echarts地图
- Thinkpad笔记本电池保养
- linux命令行 jdb,什么使用加多宝(jdb)在linux下调试Java程序
- 前端面试技巧和注意事项_前端HR的面试套路,你懂几个?
- HTML5期末大作业:电影网站设计——电影资讯博客(5页) HTML+CSS+JavaScript 学生DW网页设计作业成品 web课程设计网页规划与设计 web学生网页设计作业源码
- 2 snippets vue 修改配置_VSCode 自定义Vue snippets, 快速生成Vue模板
- Flink集群部署OnYarn模式
- 阿里云服务器搭建和宝塔面板连接
- 数学建模算法学习笔记