Spark SQL的聚合函数中有first, last函数,从字面意思就是根据分组获取第一条和最后一条记录的值,实际上,只在local模式下,你可以得到满意的答案,但是在生产环境(分布式)时,这个是不能保证的。看源码的解释:

/*** Returns the first value of `child` for a group of rows. If the first value of `child`* is `null`, it returns `null` (respecting nulls). Even if [[First]] is used on an already* sorted column, if we do partial aggregation and final aggregation (when mergeExpression* is used) its result will not be deterministic (unless the input table is sorted and has* a single partition, and we use a single reducer to do the aggregation.).*/

如何保证first, last是有效呢?表要排好序的,同时只能用一个分区处理,再用一个reducer来聚合。。。

所以,在多分区场景不能用first, last函数求得聚合的第一条和最后一条数据。

解决方案:利用Window。

val spark = SparkSession.builder().master("local").appName("Demo").getOrCreate()import spark.implicits._
val df = Seq(("a", 10, 12345), ("a", 12, 34567), ("a", 11, 23456), ("b", 10, 55555), ("b", 8, 12348)).toDF("name", "value", "event_time")// 定义window
val asc = Window.partitionBy("name").orderBy($"event_time")
val desc = Window.partitionBy("name").orderBy($"event_time".desc)// 根据window生成row_number,根据row_number获取对应的数据
val firstValue = df.withColumn("rn", row_number().over(asc)).where($"rn" === 1).drop("rn")
val lastValue = df.withColumn("rn", row_number().over(desc)).where($"rn" === 1).drop("rn")// 利用join把数据聚合一起
df.groupBy("name").count().as("t1").join(firstValue.as("fv"), "name").join(lastValue.as("lv"), "name").select($"t1.name", $"fv.value".as("first_value"), $"lv.value".as("last_value"), $"t1.count").show()

输出:

+----+-----------+----------+-----+
|name|first_value|last_value|count|
+----+-----------+----------+-----+
|   b|          8|        10|    2|
|   a|         10|        12|    3|
+----+-----------+----------+-----+

Spark first, last函数的坑相关推荐

  1. 自定义实现spark的分区函数

    有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写 ...

  2. spark中flatMap函数用法

    说明 在spark中map函数和flatMap函数是两个比较常用的函数.其中  map:对集合中每个元素进行操作.  flatMap:对集合中每个元素进行操作然后再扁平化.  理解扁平化可以举个简单例 ...

  3. 封包时发现的关于QIODevice类write函数的坑

    关于QIODevice类write函数的坑 问题概述 问题部分代码 问题解决 结论 问题概述 这两天在做TCP通信的封包解包协议操作时,不经意间被write函数坑了好久.通过内存复制进行数据封包,在写 ...

  4. Spark编写UDF函数案例

    Spark编写UDF函数案例 一.前述 二.UDF函数 需求:将orders表中 order_dow和order_number进行求和 一.前述 SparkSql中自定义函数包括UDF和UDAF UD ...

  5. [转载] python 列表List中index函数的坑

    参考链接: Python列表list sort() python 列表List中index函数的坑 例如 a = [1, 2, 1] 如果使用 a.index(1), 输出的只是列表中第一个出现的 1 ...

  6. spark SQL自定义函数:

    spark SQL 自定义函数: 自定义函数: 第一种:  U D F  (用户自定义函数)函数 特点:  一对一的关系,输入一个值以后输出一个值  (一进一出) 大部分的内置函数都是U D F函数 ...

  7. Spark SQL 开窗函数row_number的使用

    Spark SQL 开窗函数row_number的使用 窗口函数 row_number即为分组取topN 参考文本:                   型号                      ...

  8. Spark UDF用户自定义函数

    自定义一个函数实现查询字符串长度.首先创建测试的DataFrame: val spark = SparkSession.builder().master("local").appN ...

  9. Spark _27_自定义函数UDF和UDAF

    UDF:用户自定义函数. 可以自定义类实现UDFX接口. javaAPI: package com.udf;import javafx.scene.chart.PieChart; import org ...

最新文章

  1. 李开复对话彭特兰:AI 不是单打独斗,应避免 AI 冷战!
  2. plt.xlabel 'str' object is not callable
  3. 【CoppeliaSim】远程 API 之 Python 控制,对比 V-rep 有些不同
  4. VTK:PolyData之IterateOverLines
  5. python编程神器下载_Python编程神器 -程序员必备开发手册
  6. openmpi安装_Intel Parallel Studio XE 2019安装设置
  7. linux qt应用程序全屏,QT中MDI应用程序中更改子窗口大小或是全屏显示子窗口的方法...
  8. 怎样追求一个你喜欢的人?
  9. 【学习总结】Git学习-参考廖雪峰老师教程三-创建版本库
  10. OCP 12c最新考试原题及答案(071-7)
  11. SQL中inner join、outer join和cross join的区别
  12. vue中使用echarts地图
  13. Thinkpad笔记本电池保养
  14. linux命令行 jdb,什么使用加多宝(jdb)在linux下调试Java程序
  15. 前端面试技巧和注意事项_前端HR的面试套路,你懂几个?
  16. HTML5期末大作业:电影网站设计——电影资讯博客(5页) HTML+CSS+JavaScript 学生DW网页设计作业成品 web课程设计网页规划与设计 web学生网页设计作业源码
  17. 2 snippets vue 修改配置_VSCode 自定义Vue snippets, 快速生成Vue模板
  18. Flink集群部署OnYarn模式
  19. 阿里云服务器搭建和宝塔面板连接
  20. 数学建模算法学习笔记

热门文章

  1. 广州哪个大学大一计算机学ps,紧急!广东12所“野鸡大学”名单曝光!广州人千万别上当!...
  2. 【统一身份认证】——概念扫盲
  3. Visio Viewer 无法打开 VSD文件
  4. 一种客户端即时通信数据的加密和解密方法
  5. 高新技术产业的股权设计原则
  6. 天使玩偶(CDQ分治+最小曼哈顿距离)
  7. PDK工艺库安装总结
  8. 21世纪世界国土面积排名
  9. python3模拟抓狐狸小游戏
  10. Transaction rollback