2019独角兽企业重金招聘Python工程师标准>>>

![TOC]

一、 背景

目前实时数仓需要对多张表进行关联聚合等复杂操作, 为了深度理解 Spark Structured Streaming 中聚合、输出模式(complete、append、update)、窗口操作(window)以及水印(watermark)相关操作;并能让团队熟练在实时数仓中使用聚合和窗口相关的操作,现做出以下总结。文中以基本的 wordcount 为例。以聚合为基准,与输出模式、窗口以及水印结合交叉。streaming 数据源为 socket。

关于Structured Streaming 详细介绍,可以查看Spark Structured Streaming官网,这里只针对聚合、窗口以及水印相关操作举例介绍。

文中纯属个人理解,不免有不当之处,万望给予指正,不胜感激。

二、 聚合

聚合只是纯粹的聚合操作,无窗口和水印操作。

1. complete 模式

核心代码如下:

 import spark.implicits._val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 数据行转 wordval words = lines.as[String].flatMap(_.split(" ")).selectExpr("value as word")val wordCounts = words.groupBy($"word").agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")// 开启 query,将数据输出到控制台val query = wordCounts.writeStream.outputMode("complete").format("console").option("truncate", false).start()query.awaitTermination()

运行结果

可以看出,complete 模式下,会把所有的数据都输出,也就意味着,streaming 会保存所有 batch 的状态数据,并根据后续 batch 旧的状态数据做更新。

2. update 模式

核心代码

val query = wordCounts.writeStream.outputMode("update").format("console").option("truncate", false).start()

运行结果

可以看出,update 模式下, 只会将新增以及状态改变的数据输出,同样streaming 会保存所有 batch 的状态数据,并根据后续 batch 旧的状态数据做更新。

3. append 模式

聚合状态下,不使用水印操作,无法使用该模式。

三、聚合+窗口

窗口大小统一为5分钟,每三分钟滑动一次。

1. complete 模式

核心代码

import spark.implicits._val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()// 数据行转 wordval words = lines.as[String].flatMap(_.split(" ")).selectExpr("value as word").withColumn("timestamp", current_timestamp())//时间戳.withColumn("cn", lit(1))val wordCounts = words.groupBy(window($"timestamp", "5 minutes", "3 minutes"),$"word").agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")// 开启 query,将数据输出到控制台val query = wordCounts.writeStream.outputMode("append").format("console").option("truncate", false).start()query.awaitTermination()

运行结果

complete 模式下,会对所有窗口数据输出,只会对自己窗口内的事件做聚合,如果某个窗口没有到达事件,输出忽略该窗口。

2. update 模式

update 模式下,只会输出当前事件落到的窗口,并且是新增或者更新的 key 才会被输出,其余窗口不被输出。

运行结果

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|1  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|world|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|hello|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|world|1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|3  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|cat  |1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|4  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|sprk |1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|sprk |1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|5  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|2  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|spark|1  |
+------------------------------------------+-----+---+

3. append 模式

没有添加watermark,不支持。

三、聚合+窗口+水印

首先说下 watermark 水印是什么,watermark 是为了针对处理迟到数据的机制,所谓迟到,举个例子,12:00这个时刻该到的数据,在12:30到达,该如何处理?watermark 提供了一个时间阈值,简单的理解可以说是等待一个 watermark 时间,在"此时间"前的将被抛弃不处理,在"此时间"后的将被正常处理。在 window 操作中,是以当前最大事件时间为基础,减去 watermark 阈值得到"此时间",即***每次窗口滑动***都会重新计算。这里不做详细介绍,详细介绍请查看 Spark 官网,有详细的图表介绍:Handle late data and watermark

窗口大小统一为5分钟,每三分钟滑动一次,水印长度三分钟。

具体代码,不同模式修改 outputMode 即可。

         import spark.implicits._val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()val words = lines.as[String].selectExpr("value").map(line => {val arr = line.mkString.split(",")WordBean(arr(0), DateUtils.parseSqlTimestamp(arr(1)))}).withColumn("cn", lit(1)).withWatermark("timestamp", "3 minutes").groupBy(window($"timestamp", "5 minutes", "3 minutes"),$"word").agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")val query = words.writeStream.outputMode("update").format("console").option("truncate", false).start()query.awaitTermination()case class WordBean(word: String, timestamp: Timestamp)

1. complete 模式

complete 模式下输出所有窗口,watermark 无效。省略演示。

2. update 模式

在update 模式下,过期数据将被清除。

运行结果

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|2  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|2  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 6
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|1  |
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|spark|1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 9
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:12:00, 2019-04-19 09:17:00]|spark|1  |
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|2  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 10
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+
-------------------------------------------
Batch: 11
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|3  |
+------------------------------------------+-----+---+

update 模式下,基本和 window 下输出结果一样,值得注意的是水印产生的效果,观察结果,batch 6时我们试图更新[2019-04-19 09:00:00, 2019-04-19 09:05:00]窗口的值,但是得到确实空的输出,这是因为我们在这之前的 batch 将事件最大值更新到了2019-04-19 09:08:00,水印长度三分钟,此时[2019-04-19 09:00:00, 2019-04-19 09:05:00]窗口的状态已经被清除,将得不到更新;同样在 batch10时,我们想更新[2019-04-19 09:03:00, 2019-04-19 09:08:00]的值,也无法实现;尤其观察batch 11 最后一条数据"world,2019-04-19 09:07:00",数据可以落在[2019-04-19 09:06:00, 2019-04-19 09:11:00]和[2019-04-19 09:03:00, 2019-04-19 09:08:00],但是结果只输出了[2019-04-19 09:06:00, 2019-04-19 09:11:00]窗口的数据。

在update 模式下,过期数据将被清除。

3. append 模式

append 模式应该最值得考究的模式了,初用时可能会觉得神奇,为什么没有数据?明明 source 端发了很多批次,就是看不到数据,尤其是程序中窗口和水印长度都设置很长的话,可能不得不怀疑是不是自己代码写错了?官网案例写错了?

实则不然,update 模式下的机制是:必须在确认某个窗口不在更新时才会输出该窗口,即过了水印长度时间。所以设置了数小时窗口的就耐心等待结果吧,如果后续没有新的事件,那么你可以关机睡觉觉了,因为你永远也看不到你想看到的结果。下面做演示。

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 1
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 2
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 08:57:00, 2019-04-19 09:02:00]|hello|1  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 4
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 5
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+-------------------------------------------
Batch: 7
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|spark|1  |
+------------------------------------------+-----+---+

注意,在滑动窗口跟新水印时,即便是确定前面窗口不再更新也不会立即输出,需要等到下一个 batch 触发,才会安全的输出,上述输出中[2019-04-19 08:57:00, 2019-04-19 09:02:00]|,输入的前三个 batch,窗口滑动了两次,在第二个窗口输入两个 batch,[2019-04-19 08:57:00, 2019-04-19 09:02:00]|的数据才被输出;

另外,窗口一旦被输出,将会被清理,后续不会被更新。

四、总结

以上,即是聚合、窗口、水印以及各个输出模式结合过程。

转载于:https://my.oschina.net/u/780876/blog/3046823

Spark Structured Steaming 聚合、watermark 以及 window操作,结合输出模式的研究总结相关推荐

  1. Spark Structured Steaming实战

    Spark Structured Steaming Spark Structured Streaming 简介 什么是 Spark Structured Streaming Structured St ...

  2. Spark2.3(三十五)Spark Structured Streaming源代码剖析(从CSDN和Github中看到别人分析的源代码的文章值得收藏)...

    从CSDN中读取到关于spark structured streaming源代码分析不错的几篇文章 spark源码分析--事件总线LiveListenerBus spark事件总线的核心是LiveLi ...

  3. 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理

    文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...

  4. Spark Structured Streaming概述

    Spark Structured Streaming概述 结构化流(Structured Streaming)是基于Spark SQL引擎的流处理引擎,它具有可扩展和容错性.可以使用类似批数据处理的表 ...

  5. spark业务开发-聚合(agg)

    spark业务开发-聚合(agg) 项目地址:https://gitee.com/cch-bigdata/spark-process.git 输入数据 name,profession,enroll,s ...

  6. Spark SQL(二)之DataSet操作

    一.创建DataSet 使用SparkSession,应用程序可以从现有的RDD,Hive表的或Spark数据源创建DataFrame . (1)基于JSON的内容创建一个DataFrame //hd ...

  7. Spark Structured : HIve jdbc方式访问待下划线的表,找不到表的错误

    1.背景 Spark Structured : HIve jdbc方式访问待下划线的表,找不到表的错误 > select * from default._xd_after limit 1; &g ...

  8. Spark: Structured JDBC 方式访问远程的高可用HA的HIVE

    1. 背景 Spark: Structured JDBC 方式访问远程的高可用HA的HIVE,hive是看高可用的,连接信息如下 jdbc:hive2://xx.cdh1.test.dtwave.in ...

  9. Spark: Structured + hive(Jdbc方式)卡死

    1.背景 Spark: Structured + hive(Jdbc方式)卡死,运行入下 19/11/04 15:50:48 INFO metadata.Hive: Registering funct ...

  10. Spark Structured : java.lang.RuntimeException: Offsets committed out of order: 2 followed by 0

    文章目录 1. 背景 2.定位问题 1. 背景 测试一个Spark Structured Streaming 写入到RocketMQ,报错. 首先采用socket写入控制台是没问题的. case cl ...

最新文章

  1. 跑yolo3模型出的效果图_效果图和效果图设计到底有什么区别?区别大着呢,亲……...
  2. Dotnet core基于ML.net的销售数据预测实践
  3. powershell 查看系统设备\device status
  4. python抓有趣的东西_Python 五个有趣的彩蛋,你都知道吗?
  5. 红米K20 Pro拍照样张出炉:后置4800万像素高清三摄
  6. word上怎么把图片拼接到一起_word怎么将图片合并
  7. 戴尔_从偶像派到实力派【我身边的戴尔企业级解决方案】
  8. IOS版应用商店应用源码
  9. 小程序如何自定义组件
  10. mybatis与spring整合详解
  11. Qt 实现PC端网易云音乐界面
  12. Cuda相关的函数数
  13. 大话设计模式 第二章 策略模式购物车价格查询
  14. 关于java的毕业设计_我的java毕业设计之路回顾
  15. vscode编辑器搜索小技巧
  16. 使用管程法解决生产者消费者问题
  17. mysql 退出数据库_mysql怎样退出使用数据库
  18. 通过Kali Linux暴力破解WiFi密码
  19. WiFI 扫描时选择固定频率(只扫2.4G或者5G的AP)
  20. java我的世界w_Minecraft我的世界Java版19w02a已发布

热门文章

  1. 使用GIT来管理代码的心得
  2. error:“Unexpected namespace prefix xmlns found for tag LinearLayout”
  3. 页面报错webform_postbackoptions未定义
  4. 讨论String与string的区别.
  5. 【学习笔记】算法导论基础知识1.1
  6. 汇编程序16位带符号变量计算
  7. EO CAT软件下载数据
  8. Sentinel-1 影像与精轨数据下载(经常更新中)
  9. Nginx+PHP+MySQL+Ubuntu14.04 64位环境搭建
  10. c++中new和delete