目录

事件时间窗口分析

时间概念

​​​​​​​event-time

​​​​​​​延迟数据处理

​​​​​​​延迟数据

​​​​​​​Watermarking 水位

​​​​​​​官方案例演示


事件时间窗口分析

在SparkStreaming中窗口统计分析:Window Operation(设置窗口大小WindowInterval和滑动大小SlideInterval),按照Streaming 流式应用接收数据的时间进行窗口设计的,其实是不符合实际应用场景的。

例如,在物联网数据平台中,每个设备产生的数据,其中包含数据产生的时间,然而数据需要经过一系列采集传输才能被流式计算框架处理:SparkStreaming,此过程需要时间的,再按照处理时间来统计业务的时候,准确性降低,存在不合理性。

在结构化流Structured Streaming中窗口数据统计时间是基于数据本身事件时间EventTime字段统计,更加合理性,官方文档:

http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time

​​​​​​​时间概念

在Streaming流式数据处理中,按照时间处理数据,其中时间有三种概念:

1)、事件时间EventTime,表示数据本身产生的时间,该字段在数据本身中;

2)、注入时间IngestionTime,表示数据到达流式系统时间,简而言之就是流式处理系统接收到数据的时间;

3)、处理时间ProcessingTime,表示数据被流式系统真正开始计算操作的时间。

不同流式计算框架支持时间不一样,

SparkStreaming框架仅仅支持处理时间ProcessTime,

StructuredStreaming支持事件时间和处理时间,

Flink框架支持三种时间数据操作,

实际项目中往往针对【事件时间EventTime】进行数据处理操作,更加合理化。

​​​​​​​event-time

基于事件时间窗口聚合操作:基于窗口的聚合(例如每分钟事件数)只是事件时间列上特殊类型的分组和聚合,其中每个时间窗口都是一个组,并且每一行可以属于多个窗口/组。

事件时间EventTime是嵌入到数据本身中的时间,数据实际真实产生的时间。例如,如果希望获得每分钟由物联网设备生成的事件数,那么可能希望使用生成数据的时间(即数据中的事件时间event time),而不是Spark接收数据的时间(receive time/archive time)。

这个事件时间很自然地用这个模型表示,设备中的每个事件(Event)都是表中的一行(Row),而事件时间(Event Time)是行中的一列值(Column Value)。

因此,这种基于事件时间窗口的聚合查询既可以在静态数据集(例如,从收集的设备事件日志中)上定义,也可以在数据流上定义,从而使用户的使用更加容易。

修改词频统计程序,数据流包含每行数据以及生成每行行的时间。希望在10分钟的窗口内对单词进行计数,每5分钟更新一次,如下图所示:

单词在10分钟窗口【12:00-12:10、12:05-12:15、12:10-12:20】等之间接收的单词中计数。注意,【12:00-12:10】表示处理数据的事件时间为12:00之后但12:10之前的数据。思考一下,12:07的一条数据,应该增加对应于两个窗口12:00-12:10和12:05-12:15的计数。

基于事件时间窗口统计有两个参数索引:分组键(如单词)和窗口(事件时间字段)。

  • event-time 窗口生成

Structured Streaming中如何依据EventTime事件时间生成窗口的呢?查看类TimeWindowing源码中生成窗口规则:

org.apache.spark.sql.catalyst.analysis.TimeWindowing// 窗口个数/* 最大的窗口数 = 向上取整(窗口长度/滑动步长) */maxNumOverlapping <- ceil(windowDuration / slideDuration)for (i <- 0 until maxNumOverlapping)/**timestamp是event-time 传进的时间戳startTime是window窗口参数,默认是0 second 从时间的0s含义:event-time从1970年...有多少个滑动步长,如果说浮点数会向上取整*/windowId <- ceil((timestamp - startTime) / slideDuration)/**windowId * slideDuration  向上取能整除滑动步长的时间(i - maxNumOverlapping) * slideDuration 每一个窗口开始时间相差一个步长*/windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTimewindowEnd <- windowStart + windowDurationreturn windowStart, windowEnd

将【(event-time向上取 能整除 滑动步长的时间) - (最大窗口数×滑动步长)】作为"初始窗口"的开始时间,然后按照窗口滑动宽度逐渐向时间轴前方推进,直到某个窗口不再包含该event-time 为止,最终以"初始窗口"与"结束窗口"之间的若干个窗口作为最终生成的 event-time 的时间窗口。

每个窗口的起始时间start与结束时间end都是前闭后开(左闭右开)的区间,因此初始窗口和结束窗口都不会包含 event-time,最终不会被使用。假设数据为【2019-08-14 10:50:00, dog】,按照上述规则计算窗口示意图如下:

得到窗口如下:

​​​​​​​延迟数据处理

Structed Streaming与Spark Streaming相比一大特性就是支持基于数据中的时间戳的数据处理。也就是在处理数据时,可以对记录中的eventTime事件时间字段进行考虑。因为eventTime更好的代表数据本身的信息,且可以借助eventTime处理比预期晚到达的数据,但是需要有一个限度(阈值),不能一直等,应该要设定最多等多久。

​​​​​​​延迟数据

在很多流计算系统中,数据延迟到达(the events arrives late to the application)的情况很常见,并且很多时候是不可控的,因为很多时候是外围系统自身问题造成的。Structured Streaming可以保证一条旧的数据进入到流上时,依然可以基于这些“迟到”的数据重新计算并更新计算结果

上图中在12:04(即事件时间)生成的单词可能在12:11被应用程序接收,此时,应用程序应使用时间12:04而不是12:11更新窗口12:00-12:10的旧计数。但是会出现如下两个问题:

问题一:延迟数据计算是否有价值

  • 如果某些数据,延迟很长时间(如30分钟)才到达流式处理系统,数据还需要再次计算吗?计算的结果还有价值吗?原因在于流式处理系统处理数据关键核心在于实时性;
  • 实践表明,流计算关注的是近期数据,更新一个很早之前的状态往往已经不再具有很大的业务价值;

问题二:以前状态保存浪费资源

  • 实时统计来说,如果保存很久以前的数据状态,很多时候没有作用的,反而浪费大量资源;
  • Spark 2.1引入的watermarking允许用户指定延迟数据的阈值,也允许引擎清除掉旧的状态。即根据watermark机制来设置和判断消息的有效性,如可以获取消息本身的时间戳,然后根据该时间戳来判断消息的到达是否延迟(乱序)以及延迟的时间是否在容忍的范围内(延迟的数据是否处理)。

​​​​​​​Watermarking 水位

水位watermarking官方定义:


lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.

通过指定event-time列(上一批次数据中EventTime最大值)和预估事件的延迟时间上限(Threshold)来定义一个查询的水位线watermark。翻译:让Spark SQL引擎自动追踪数据中当前事件时间EventTime,依据规则清除旧的状态数据

Watermark = MaxEventTime - Threshod

1:执行第一批次数据时,Watermarker为0,所以此批次中所有数据都参与计算;

2:Watermarker值只能逐渐增加,不能减少;

3:Watermark机制主要解决处理聚合延迟数据和减少内存中维护的聚合状态;

4:设置Watermark以后,输出模式OutputMode只能是Append和Update;

如下方式设置阈值Threshold,计算每批次数据执行时的水位Watermark:

看一下官方案例:词频统计WordCount,设置阈值Threshold为10分钟,每5分钟触发执行一次。

  • 延迟到达但没有超过watermark:(12:08, dog)

在12:20触发执行窗口(12:10-12:20)数据中,(12:08, dog) 数据是延迟数据,阈值Threshold设定为10分钟,此时水位线【Watermark = 12:14 - 10m = 12:04】,因为12:14是上个窗口(12:05-12:15)中接收到的最大的事件时间,代表目标系统最后时刻的状态,由于12:08在12:04之后,因此被视为“虽然迟到但尚且可以接收”的数据而被更新到了结果表中,也就是(12:00 - 12:10, dog, 2)和(12:05 - 12:11, dog, 3)。

  • 超出watermark:(12:04, donkey)

在12:25触发执行窗口(12:15-12:25)数据中,(12:04, donkey)数据是延迟数据,上个窗口中接收到最大的事件时间为12:21,此时水位线【Watermark = 12:21 - 10m = 12:11】,而(12:04,  donkey)比这个值还要早,说明它”太旧了”,所以不会被更新到结果表中了。

设置水位线Watermark以后,不同输出模式OutputMode,结果输出不一样:

  • Update模式:总是倾向于“尽可能早”的将处理结果更新到sink,当出现迟到数据时,早期的某个计算结果将会被更新;
  • Append模式:推迟计算结果的输出到一个相对较晚的时刻,确保结果是稳定的,不会再被更新,比如:12:00 - 12:10窗口的处理结果会等到watermark更新到12:11之后才会写入到sink。

如果用于接收处理结果的sink不支持更新操作,则只能选择Append模式。

​​​​​​​官方案例演示

编写代码,演示官方案例,如下几点注意:

1、该outputMode为update模式,即只会输出那些有更新的数据!!

2、官网案例该开窗窗口长度为10 min,步长5 min,水印为eventtime-10 min,但是测试的时候用秒

3、官网案例trigger(Trigger.ProcessingTime("5 minutes")),但是测试的时候用秒

测试数据:

2019-10-10 12:00:07,dog2019-10-10 12:00:08,owl2019-10-10 12:00:14,dog2019-10-10 12:00:09,cat2019-10-10 12:00:15,cat2019-10-10 12:00:08,dog2019-10-10 12:00:13,owl2019-10-10 12:00:21,owl2019-10-10 12:00:04,donkey  --丢失2019-10-10 12:00:17,owl     --不丢失

具体案例代码如下:

package cn.itcast.structedstreamingimport java.sql.Timestampimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 基于Structured Streaming 读取TCP Socket读取数据,事件时间窗口统计词频,将结果打印到控制台* 每5秒钟统计最近10秒内的数据(词频:WordCount),设置水位Watermark时间为10秒* 2019-10-10 12:00:07,dog* 2019-10-10 12:00:08,owl** 2019-10-10 12:00:14,dog* 2019-10-10 12:00:09,cat** 2019-10-10 12:00:15,cat* 2019-10-10 12:00:08,dog* 2019-10-10 12:00:13,owl* 2019-10-10 12:00:21,owl** 2019-10-10 12:00:04,donkey  --丢失* 2019-10-10 12:00:17,owl     --不丢失*/
object StructuredWindow {def main(args: Array[String]): Unit = {// 1. 构建SparkSession实例对象,传递sparkConf参数val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "3").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import org.apache.spark.sql.functions._import spark.implicits._// 2. 使用SparkSession从TCP Socket读取流式数据val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()// 3. 针对获取流式DStream进行词频统计val resultStreamDF = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_))// 将每行数据进行分割单词: 2019-10-12 09:00:02,cat dog.flatMap(line => {val arr = line.trim.split(",")val timestampStr: String = arr(0)val wordsStr: String = arr(1)wordsStr.split("\\s+")//(时间戳,单词).map((Timestamp.valueOf(timestampStr), _))})// 设置列的名称.toDF("timestamp", "word")// TODO:设置水位Watermark.withWatermark("timestamp", "10 seconds")// TODO:设置基于事件时间(event time)窗口 -> time, 每5秒统计最近10秒内数据.groupBy(window($"timestamp", "10 seconds", "5 seconds"),$"word").count()// 按照窗口字段降序排序//.orderBy($"window")/*root|-- window: struct (nullable = true)|    |-- start: timestamp (nullable = true)|    |-- end: timestamp (nullable = true)|-- word: string (nullable = true)|-- count: long (nullable = false)*///resultStreamDF.printSchema()// 4. 将计算的结果输出,打印到控制台val query: StreamingQuery = resultStreamDF.writeStream.outputMode(OutputMode.Update()).format("console").option("numRows", "100").option("truncate", "false").trigger(Trigger.ProcessingTime("5 seconds")).start()query.awaitTermination()query.stop()}
}

2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析相关推荐

  1. 2021年大数据Spark(十二):Spark Core的RDD详解

    目录 RDD详解 为什么需要RDD? 什么是RDD? RDD的5大特性 第一个:A list of partitions 第二个:A function for computing each split ...

  2. 2021年大数据Kafka(十二):❤️Kafka配额限速机制❤️

    全网最详细的大数据Kafka文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Kafka配额限速机制 限制producer端的速率 限制c ...

  3. 2021年大数据HBase(十二):Apache Phoenix 二级索引

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Apache Phoenix 二级索引 一.索引分类 ...

  4. 2021年大数据Hive(十二):Hive综合案例!!!

    全网最详细的大数据Hive文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 前言 Hive综合案例 一.需求描述 二.项目表的字段 三.进 ...

  5. 2021年大数据Hadoop(十二):HDFS的API操作

    2021大数据领域优质创作博客,带你从入门到精通,该博客每天更新,逐渐完善大数据各个知识体系的文章,帮助大家更高效学习. 有对大数据感兴趣的可以关注微信公众号:三帮大数据 目录 HDFS的API操作 ...

  6. 2021年大数据ELK(十二):Elasticsearch编程(环境准备)

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 Elasticsearch编程 一.环境准备 1.准备IDEA项目结构 2.准 ...

  7. 2021年大数据Spark(十五):Spark Core的RDD常用算子

    目录 常用算子 基本算子 分区操作函数算子 重分区函数算子 1).增加分区函数 2).减少分区函数 3).调整分区函数 ​​​​​​​聚合函数算子 ​​​​​​​Scala集合中的聚合函数 ​​​​​ ...

  8. 2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量

    目录 共享变量 广播变量 累加器 ​​​​​​​案例演示 共享变量 在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副 ...

  9. 2021年大数据Spark(十四):Spark Core的RDD操作

    目录 RDD的操作 函数(算子)分类 Transformation函数 ​​​​​​​Action函数 RDD的操作 有一定开发经验的读者应该都使用过多线程,利用多核 CPU 的并行能力来加快运算速率 ...

最新文章

  1. codevs1316 文化之旅
  2. hexo的yelee主题使用自定义字体并用字蛛进行字体压缩的sed脚本
  3. TypeError: unhashable type: 'numpy.ndarray'
  4. android开发适配深色模式,手机不支持深色模式,如何用软件解决深色模式的问题?(附有系统全局深色模式实现方法...
  5. clob存base64文件存不进去_工行信用卡这几个提额方法,你知道吗?一般人我不告诉他!...
  6. bash编程(一)之运算及比较
  7. Soul网关源码阅读(九)插件配置加载初探
  8. Python的类与类型
  9. Java接口的实现源代码_Comparable接口的实现:源代码
  10. js对文字的修饰 big() small() fixed() toUpperCase() toLowerCase()
  11. smartpdf双击无法返回latex如何解决
  12. 最新时空观测结果证实爱因斯坦相对论合理性
  13. 帕累托法则/20:80法则/犹太法则
  14. 房子千万不要这样装修啊 别说我没告诉你
  15. html条件查询,高级查询条件设置- 通用查询-报表设计初级教程
  16. 联通沃云发布全新战略:强大基座,就在身边
  17. 数学建模系列:历年优秀论文+入门+进阶+国赛+美赛+其他
  18. IIS服务器部署php项目
  19. 简单的指针二叉查找树和数组二叉查找树
  20. html form提交前md5,javascript实现MD5加密-JavaScript获取HTML元素的三种方...-兼容IE与firefox的js回车提交表单_169IT.COM...

热门文章

  1. RPC 笔记(01)— RPC概念、调用流程、RPC 与 Restful API 区别
  2. Docker 入门系列(5)- Docker 端口映射(映射所有IP地址、映射到指定地址和指定端口、映射指定地址任意端口、查看映射端口配置)
  3. 知名高校共享课程资源GitHub地址
  4. 一本读懂BERT(实践篇)重点
  5. LeetCode简单题之二叉搜索树的范围和
  6. Android中Service生命周期、启动、绑定、混合使用
  7. Python数据挖掘:数据探索,数据清洗,异常值处理
  8. C++ 字符串字母大小写转换
  9. 使用码云给同事地址地址注意事项
  10. FLINK源代码调试方式