Spark Structured Steaming 聚合、watermark 以及 window操作,结合输出模式的研究总结
2019独角兽企业重金招聘Python工程师标准>>>
![TOC]
一、 背景
目前实时数仓需要对多张表进行关联聚合等复杂操作, 为了深度理解 Spark Structured Streaming 中聚合、输出模式(complete、append、update)、窗口操作(window)以及水印(watermark)相关操作;并能让团队熟练在实时数仓中使用聚合和窗口相关的操作,现做出以下总结。文中以基本的 wordcount 为例。以聚合为基准,与输出模式、窗口以及水印结合交叉。streaming 数据源为 socket。
关于Structured Streaming 详细介绍,可以查看Spark Structured Streaming官网,这里只针对聚合、窗口以及水印相关操作举例介绍。
文中纯属个人理解,不免有不当之处,万望给予指正,不胜感激。
二、 聚合
聚合只是纯粹的聚合操作,无窗口和水印操作。
1. complete 模式
核心代码如下:
import spark.implicits._val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 数据行转 wordval words = lines.as[String].flatMap(_.split(" ")).selectExpr("value as word")val wordCounts = words.groupBy($"word").agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")// 开启 query,将数据输出到控制台val query = wordCounts.writeStream.outputMode("complete").format("console").option("truncate", false).start()query.awaitTermination()
运行结果
可以看出,complete 模式下,会把所有的数据都输出,也就意味着,streaming 会保存所有 batch 的状态数据,并根据后续 batch 旧的状态数据做更新。
2. update 模式
核心代码
val query = wordCounts.writeStream.outputMode("update").format("console").option("truncate", false).start()
运行结果
可以看出,update 模式下, 只会将新增以及状态改变的数据输出,同样streaming 会保存所有 batch 的状态数据,并根据后续 batch 旧的状态数据做更新。
3. append 模式
聚合状态下,不使用水印操作,无法使用该模式。
三、聚合+窗口
窗口大小统一为5分钟,每三分钟滑动一次。
1. complete 模式
核心代码
import spark.implicits._val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 数据行转 wordval words = lines.as[String].flatMap(_.split(" ")).selectExpr("value as word").withColumn("timestamp", current_timestamp())//时间戳.withColumn("cn", lit(1))val wordCounts = words.groupBy(window($"timestamp", "5 minutes", "3 minutes"),$"word").agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")// 开启 query,将数据输出到控制台val query = wordCounts.writeStream.outputMode("append").format("console").option("truncate", false).start()query.awaitTermination()
运行结果
complete 模式下,会对所有窗口数据输出,只会对自己窗口内的事件做聚合,如果某个窗口没有到达事件,输出忽略该窗口。
2. update 模式
update 模式下,只会输出当前事件落到的窗口,并且是新增或者更新的 key 才会被输出,其余窗口不被输出。
运行结果
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|1 |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|world|1 |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|hello|1 |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|world|1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|2 |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|3 |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|cat |1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|4 |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|sprk |1 |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|1 |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|sprk |1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|5 |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|2 |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|2 |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|spark|1 |
+------------------------------------------+-----+---+
3. append 模式
没有添加watermark,不支持。
三、聚合+窗口+水印
首先说下 watermark 水印是什么,watermark 是为了针对处理迟到数据的机制,所谓迟到,举个例子,12:00这个时刻该到的数据,在12:30到达,该如何处理?watermark 提供了一个时间阈值,简单的理解可以说是等待一个 watermark 时间,在"此时间"前的将被抛弃不处理,在"此时间"后的将被正常处理。在 window 操作中,是以当前最大事件时间为基础,减去 watermark 阈值得到"此时间",即***每次窗口滑动***都会重新计算。这里不做详细介绍,详细介绍请查看 Spark 官网,有详细的图表介绍:Handle late data and watermark
窗口大小统一为5分钟,每三分钟滑动一次,水印长度三分钟。
具体代码,不同模式修改 outputMode 即可。
import spark.implicits._val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val words = lines.as[String].selectExpr("value").map(line => {val arr = line.mkString.split(",")WordBean(arr(0), DateUtils.parseSqlTimestamp(arr(1)))}).withColumn("cn", lit(1)).withWatermark("timestamp", "3 minutes").groupBy(window($"timestamp", "5 minutes", "3 minutes"),$"word").agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")val query = words.writeStream.outputMode("update").format("console").option("truncate", false).start()query.awaitTermination()case class WordBean(word: String, timestamp: Timestamp)
1. complete 模式
complete 模式下输出所有窗口,watermark 无效。省略演示。
2. update 模式
在update 模式下,过期数据将被清除。
运行结果
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|1 |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|2 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|2 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 6
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|1 |
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|spark|1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 9
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:12:00, 2019-04-19 09:17:00]|spark|1 |
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|2 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 10
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+
-------------------------------------------
Batch: 11
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|3 |
+------------------------------------------+-----+---+
update 模式下,基本和 window 下输出结果一样,值得注意的是水印产生的效果,观察结果,batch 6时我们试图更新[2019-04-19 09:00:00, 2019-04-19 09:05:00]窗口的值,但是得到确实空的输出,这是因为我们在这之前的 batch 将事件最大值更新到了2019-04-19 09:08:00,水印长度三分钟,此时[2019-04-19 09:00:00, 2019-04-19 09:05:00]窗口的状态已经被清除,将得不到更新;同样在 batch10时,我们想更新[2019-04-19 09:03:00, 2019-04-19 09:08:00]的值,也无法实现;尤其观察batch 11 最后一条数据"world,2019-04-19 09:07:00",数据可以落在[2019-04-19 09:06:00, 2019-04-19 09:11:00]和[2019-04-19 09:03:00, 2019-04-19 09:08:00],但是结果只输出了[2019-04-19 09:06:00, 2019-04-19 09:11:00]窗口的数据。
在update 模式下,过期数据将被清除。
3. append 模式
append 模式应该最值得考究的模式了,初用时可能会觉得神奇,为什么没有数据?明明 source 端发了很多批次,就是看不到数据,尤其是程序中窗口和水印长度都设置很长的话,可能不得不怀疑是不是自己代码写错了?官网案例写错了?
实则不然,update 模式下的机制是:必须在确认某个窗口不在更新时才会输出该窗口,即过了水印长度时间。所以设置了数小时窗口的就耐心等待结果吧,如果后续没有新的事件,那么你可以关机睡觉觉了,因为你永远也看不到你想看到的结果。下面做演示。
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 1
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 2
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 08:57:00, 2019-04-19 09:02:00]|hello|1 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 4
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 5
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2 |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 7
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2 |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1 |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|spark|1 |
+------------------------------------------+-----+---+
注意,在滑动窗口跟新水印时,即便是确定前面窗口不再更新也不会立即输出,需要等到下一个 batch 触发,才会安全的输出,上述输出中[2019-04-19 08:57:00, 2019-04-19 09:02:00]|,输入的前三个 batch,窗口滑动了两次,在第二个窗口输入两个 batch,[2019-04-19 08:57:00, 2019-04-19 09:02:00]|的数据才被输出;
另外,窗口一旦被输出,将会被清理,后续不会被更新。
四、总结
以上,即是聚合、窗口、水印以及各个输出模式结合过程。
转载于:https://my.oschina.net/u/780876/blog/3046823
Spark Structured Steaming 聚合、watermark 以及 window操作,结合输出模式的研究总结相关推荐
- Spark Structured Steaming实战
Spark Structured Steaming Spark Structured Streaming 简介 什么是 Spark Structured Streaming Structured St ...
- Spark2.3(三十五)Spark Structured Streaming源代码剖析(从CSDN和Github中看到别人分析的源代码的文章值得收藏)...
从CSDN中读取到关于spark structured streaming源代码分析不错的几篇文章 spark源码分析--事件总线LiveListenerBus spark事件总线的核心是LiveLi ...
- 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理
文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...
- Spark Structured Streaming概述
Spark Structured Streaming概述 结构化流(Structured Streaming)是基于Spark SQL引擎的流处理引擎,它具有可扩展和容错性.可以使用类似批数据处理的表 ...
- spark业务开发-聚合(agg)
spark业务开发-聚合(agg) 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 name,profession,enroll,s ...
- Spark SQL(二)之DataSet操作
一.创建DataSet 使用SparkSession,应用程序可以从现有的RDD,Hive表的或Spark数据源创建DataFrame . (1)基于JSON的内容创建一个DataFrame //hd ...
- Spark Structured : HIve jdbc方式访问待下划线的表,找不到表的错误
1.背景 Spark Structured : HIve jdbc方式访问待下划线的表,找不到表的错误 > select * from default._xd_after limit 1; &g ...
- Spark: Structured JDBC 方式访问远程的高可用HA的HIVE
1. 背景 Spark: Structured JDBC 方式访问远程的高可用HA的HIVE,hive是看高可用的,连接信息如下 jdbc:hive2://xx.cdh1.test.dtwave.in ...
- Spark: Structured + hive(Jdbc方式)卡死
1.背景 Spark: Structured + hive(Jdbc方式)卡死,运行入下 19/11/04 15:50:48 INFO metadata.Hive: Registering funct ...
- Spark Structured : java.lang.RuntimeException: Offsets committed out of order: 2 followed by 0
文章目录 1. 背景 2.定位问题 1. 背景 测试一个Spark Structured Streaming 写入到RocketMQ,报错. 首先采用socket写入控制台是没问题的. case cl ...
最新文章
- 跑yolo3模型出的效果图_效果图和效果图设计到底有什么区别?区别大着呢,亲……...
- Dotnet core基于ML.net的销售数据预测实践
- powershell 查看系统设备\device status
- python抓有趣的东西_Python 五个有趣的彩蛋,你都知道吗?
- 红米K20 Pro拍照样张出炉:后置4800万像素高清三摄
- word上怎么把图片拼接到一起_word怎么将图片合并
- 戴尔_从偶像派到实力派【我身边的戴尔企业级解决方案】
- IOS版应用商店应用源码
- 小程序如何自定义组件
- mybatis与spring整合详解
- Qt 实现PC端网易云音乐界面
- Cuda相关的函数数
- 大话设计模式 第二章 策略模式购物车价格查询
- 关于java的毕业设计_我的java毕业设计之路回顾
- vscode编辑器搜索小技巧
- 使用管程法解决生产者消费者问题
- mysql 退出数据库_mysql怎样退出使用数据库
- 通过Kali Linux暴力破解WiFi密码
- WiFI 扫描时选择固定频率(只扫2.4G或者5G的AP)
- java我的世界w_Minecraft我的世界Java版19w02a已发布
热门文章
- 使用GIT来管理代码的心得
- error:“Unexpected namespace prefix xmlns found for tag LinearLayout”
- 页面报错webform_postbackoptions未定义
- 讨论String与string的区别.
- 【学习笔记】算法导论基础知识1.1
- 汇编程序16位带符号变量计算
- EO CAT软件下载数据
- Sentinel-1 影像与精轨数据下载(经常更新中)
- Nginx+PHP+MySQL+Ubuntu14.04 64位环境搭建
- c++中new和delete