目录

一、时间语义

1.1 三种时间概念

1.1.1 ProcessTime 在代码中的使用

1.1.2 EventTime 在代码中的使用

1.1.3 关于窗口起始时间的计算值

二、对事件的处理

2.1 有序事件

2.2 乱序事件

2.3 指定 Timestamps 与生成 Watermarks

2.4 使用 WatermarkStrategy 工具类指定时间戳和Watermark

2.5 自定义指定 Timestamps 和 Watermarks

2.6 对迟到数据的处理


一、时间语义

1.1 三种时间概念

Flink 根据时间产生的位置不同,可以将时间分为三种:

  • 事件时间:EventTime,数据产生的时间
  • 接入时间:IngestionTime,数据进入Flink的时间
  • 处理时间:ProcessTime,数据被算子处理的时间

1.1.1 ProcessTime 在代码中的使用

ProcessTime 使用的是系统时间,直接使用对应的API即可,参考:大数据——Flink dataStream 中 窗口函数的使用

  • TumblingProcessingTimeWindows
  • SlidingProcessingTimeWindows
  • ProcessingTimeSessionWindows

1.1.2 EventTime 在代码中的使用

EventTime在数据中,需要在代码中指定

  • TumblingEventTimeWindows
  • SlidingEventTimeWindows
  • EventTimeSessionWindows

1.1.3 关于窗口起始时间的计算值

  • 左闭右开
  • timestamp -  (timestamp - offset + windowSize) % windowSize

二、对事件的处理

2.1 有序事件

如果读取到的数据是有序并且是升序的,可以使用assignAscendingTimestamps

代码如下

package cn.kgc.windowimport cn.kgc.bean.TrainAlarm
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time/*** 基于EventTime实现的窗口计算*/
object WindowEventTime {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val inputStream = env.socketTextStream("master", 666).map(line => {val ps = line.split(",")TrainAlarm(ps(0), ps(1).toLong, ps(2).toDouble)})//如果读取到的数据是有序的并且升序,那么就使用assignAscendingTimestamps//指定EventTime,如果数据中的时间戳为10位数(到秒),需要乘以1000转换为到毫秒.assignAscendingTimestamps(_.ts * 1000L).keyBy(_.id)//基于EventTime实现滚动窗口//起始时间为窗口大小的整数倍,比如:203就是[200,205)窗口内inputStream.window(TumblingEventTimeWindows.of(Time.seconds(5))).maxBy("temp")//基于EventTime实现滑动窗口inputStream.window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2))).maxBy("temp")//基于EventTime实现会话窗口//会话窗口触发机制:同一字段,一条数据超过会话时间时触发inputStream.window(EventTimeSessionWindows.withGap(Time.seconds(5))).maxBy("temp").print()env.execute()}
}
//列车温度传感器数据的样例类id表示列车id,ts表示事件戳,temp表示温度
case class TrainAlarm(id:String,ts:Long,temp:Double)

2.2 乱序事件

由于网络或者系统等外部因素的影响,数据被传输到 Flink 的时间往往不是按照事件产生的顺序传输进来的,因而会造成乱序或者延迟等问题。比如基于事件时间的 Window 创建后,如何判断 Window 中的数据是否已经全部到达,全部到达则可以执行,未全部到达则需要继等待。比如在理想状态下,数据产生和到达的顺序都是一致的,而实际却是乱序的。在此情况下,就出现了 Watermark 机制,它能够衡量数据到达的进度和完整性。
        比如在理想状态下,数据产生和到达的顺序都是一致的,而实际却是乱序的。

对于上图中实际状态的数据到达情况,如果我们有一个 5s 的窗口算子,当 4 一直未到达时,这个窗口就得一直等待 4 的到来,如果 4 长时间未到达就会影 响整个窗口的计算,而 Watermark 就用来解决此问题。

Watermark:Flink 将最新读取数据的最大的 EventTime 减去固定的时间间隔 作为 Watermark。固定的时间间隔其实就是指最大延迟时间。每条数据都会伴随 着一个 Watermark。如果有一条数据的 Watermark 大于了某个窗口的 EndTime, 就会默认该窗口内的数据已经全部到达,并触发执行。

虽然 4 未到达,但是它的窗口已经被执行了,所以 4 默认不会被处理,因为它在延时时长内还未到达。

2.3 指定 Timestamps 与生成 Watermarks

WatermarkStrategy 可以在 Flink 应用程序中的两处使用,第一种是直接在 数据源上使用,第二种是直接在非数据源的操作之后使用。

第一种方式相比会更好,因为数据源可以利用 watermark 生成逻辑中有关 分片/分区(shards/partitions/splits)的信息。使用这种方式,数据源通常可以更 精准地跟踪 watermark,整体 watermark 生成将更精确。直接在源上指定 WatermarkStrategy 意味着你必须使用特定数据源接口。

比如我们使用 Flink 读取 Kafka 中的数据的时候,Kafka 中的分区个数与 Flink 中的分区个数是一致的。多个分区常常并行使用,因此交错来自各个分区的事件 数据就会破坏每个分区的事件时间模式。在这种情况下,你可以使用 Flink 中可 识别 Kafka 分区的 watermark 生成机制。使用此特性,将在 Kafka 消费端内部 针对每个 Kafka 分区生成 watermark。

2.4 使用 WatermarkStrategy 工具类指定时间戳和Watermark

package cn.kgc.windowimport java.time.Durationimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Timeobject EventTimeAndWaterMark {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val keyedStream = env.socketTextStream("master", 1234).map(line => {val ps = line.split(",")TrainerAlarm(ps(0), ps(1).toLong, ps(2).toDouble)})//指定时间戳和注入水印.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner[TrainerAlarm] {override def extractTimestamp(element: TrainerAlarm, timestamps: Long): Long = {if (element.timestamp.toString.length == 10){element.timestamp*1000L}else{element.timestamp}}})).keyBy(_.id)//指定窗口类型为滚动窗口keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))).maxBy("temp").print()env.execute()}
}case class TrainerAlarm(id:String,timestamp:Long,temperature:Double)

2.5 自定义指定 Timestamps 和 Watermarks

package cn.kgc.windowimport org.apache.flink.api.common.eventtime.{TimestampAssigner, TimestampAssignerSupplier, Watermark, WatermarkGenerator, WatermarkGeneratorSupplier, WatermarkOutput, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Timeobject EventTimeAndWaterMark {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val keyedStream = env.socketTextStream("master", 1234).map(line => {val ps = line.split(",")TrainerAlarm(ps(0), ps(1).toLong, ps(2).toDouble)})//使用自定义指定的timestamp和水印.assignTimestampsAndWatermarks(new MyWatermarkStrategy).keyBy(_.id)//指定窗口类型为滚动窗口keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5))).maxBy("temp").print()env.execute()}
}case class TrainerAlarm(id:String,timestamps:Long,temperature:Double)//继承WatermarkStrategy来自定义指定时间戳和创建水印
class MyWatermarkStrategy extends  WatermarkStrategy[TrainerAlarm]{override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[TrainerAlarm] = {new TimestampAssigner[TrainerAlarm] {override def extractTimestamp(element: TrainerAlarm, timestamps: Long): Long = {if (element.timestamps.toString.length == 10){element.timestamps*1000L}else{element.timestamps}}}}override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[TrainerAlarm] = {new WatermarkGenerator[TrainerAlarm] {val maxOutOfOrderness = 2000Lvar maxTimestamps = 0Loverride def onEvent(element: TrainerAlarm, timestamp: Long, watermarkOutput: WatermarkOutput): Unit = {//提取已经来的数据中最大的EventTimemaxTimestamps = Math.max(maxTimestamps,element.timestamps)}override def onPeriodicEmit(watermarkOutput: WatermarkOutput): Unit = {//创建水印watermarkOutput.emitWatermark(new Watermark({if (maxTimestamps.toString.length == 10){maxTimestamps*1000L-maxTimestamps}else{maxTimestamps-maxTimestamps}}))}}}
}

2.6 对迟到数据的处理

如果 Watermark 设置的延迟太久,收到结果的速度可能就会很慢,解决办 法是在 Watermark 到达之前输出一个近似的结果。

如果 Watermark 到达的太早,则可能收到错误的结果,不过 Flink 处理延迟 到的数据机制可能解决这个问题。

实际开发常见处理,一般都是设置比较少的延迟时间(可以解决大部分的乱序 数据的一个时间),然后使用延迟数据处理机制和侧输出流。

代码

package cn.kgc.windowimport java.time.Durationimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Timeobject EventTimeAndWaterMark {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)val keyedStream = env.socketTextStream("master", 1234).map(line => {val ps = line.split(",")TrainerAlarm(ps(0), ps(1).toLong, ps(2).toDouble)})//指定时间戳和注入水印.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner[TrainerAlarm] {override def extractTimestamp(element: TrainerAlarm, timestamps: Long): Long = {if (element.timestamp.toString.length == 10){element.timestamp*1000L}else{element.timestamp}}})).keyBy(_.id)val late = new OutputTag[TrainerAlarm]("late")//指定窗口类型为滚动窗口val result = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)))//设置数据允许迟到时间:窗口触发后不会关闭,直到超过允许迟到时间时才会关闭窗口,若属于该窗口的数据在允许迟到时间内到达,仍会参与窗口计算.allowedLateness(Time.minutes(1))//超过允许迟到时间到达的数据会被记录到侧输出流.sideOutputLateData(late).maxBy("temp")result.print()result.getSideOutput(late).print("late===")env.execute()}
}case class TrainerAlarm(id:String,timestamp:Long,temperature:Double)

大数据——Flink 时间语义相关推荐

  1. 大数据——Flink 知识点整理

    目录 1. Flink 的特点 2. Flink 和 SparkStreaming 的对比 3. Flink 和 Blink.Alink之间的关系 4. JobManager 和 TaskManage ...

  2. 大数据Flink最强手册

    大家好,我是脚丫先生 (o^^o) 近日持续高强度的研究实时流,终究还是把Flink强撸. 强撸Flink容易飞灰湮灭,古人诚不欺我. 于是想给小伙伴们,推荐Flink学习文档(当然还是希望多支持正版 ...

  3. 黄智生教授:大数据时代的语义技术(公号回复“黄智生语义技术”下载彩标PDF论文)

    黄智生教授:大数据时代的语义技术(公号回复"黄智生语义技术"下载彩标PDF论文) 原创: 黄智生 数据简化DataSimp 今天 数据简化DataSimp导读:介绍面向大数据环境的 ...

  4. 大数据 Flink 教程之使用 Apache Flink 进行无服务器复杂事件处理

    大数据 Flink 教程之使用 Apache Flink 进行无服务器复杂事件处理 什么是 Apache Flink? Flink 是一个分布式处理引擎,能够对数据流进行大规模的内存计算.数据流是一系 ...

  5. 【大数据Flink系列】Flink教程:详细全部

    [大数据Flink系列]Flink 核心概念综述 [大数据Flink系列]Flink单机模式和集群搭建 [大数据Flink系列] Flink 开发环境搭建 [大数据Flink系列]Flink Data ...

  6. 2021年大数据Flink(四十八):扩展阅读  Streaming File Sink

    目录 扩展阅读  Streaming File Sink 介绍 场景描述 Bucket和SubTask.PartFile 案例演示 扩展阅读  配置详解 PartFile PartFile序列化编码 ...

  7. 2021年大数据Flink(一):乘风破浪的Flink-Flink概述

    目录 乘风破浪的Flink-Flink概述 实时即未来 一切从Apache开始 富二代Flink Flink官方介绍 官网地址: Flink组件栈 ​​​​​​​Flink基石 Checkpoint ...

  8. 大数据Flink概述

    目录 1 Flink概述 1.1 框架版本 1.2 编程语言 2 实时即未来 3 富二代Flink 4 Flink官方介绍 5 Flink组件栈 6 Flink基石 7 Flink用武之地 7.1 E ...

  9. 2021年大数据Flink(四十):​​​​​​​Flink模拟双十一实时大屏统计

    目录 Flink模拟双十一实时大屏统计 需求 数据 编码步骤: 1.env 2.source 3.transformation 4.使用上面聚合的结果,实现业务需求: 5.execute 参考代码 实 ...

最新文章

  1. SQL Server 表分区实战系列(文章索引)
  2. 让手机站点像原生应用的四大途径
  3. LeetCode Maximum Depth of Binary Tree
  4. HDU1257 最少拦截系统(下降自序列个数)
  5. Python中*args和**kwargs
  6. Understanding Java class loading - part 2
  7. python科学计算笔记(十二)pandas的resample采样
  8. log4j2 无日志记录_在Log4j2中更好地执行非日志记录器调用
  9. 【iOS - 周总结】开发中遇到的小知识点(2018.12.10-2018.12.15)
  10. php和windows对应,哪个.so文件可以用于windows系统中与.dll文件相对应的linux系统,以便将php连接到ms sql server...
  11. kafka(一)—基本概念
  12. 基于Cocos2d-x的手机游戏性能监控
  13. 16年4月20号 个人总结
  14. AD09之与AD6版本使用不同对比
  15. 银行数据仓库体系实践(14)--数据应用之内部报表及数据分析
  16. 周记录学习总结<大杂烩>
  17. 学计算机r7000和y7000哪个好,联想拯救者r7000p和y7000p哪个好-联想拯救者r7000p和y7000p评测对比...
  18. 数据预处理和特征工程1--无量纲化:数据归一化、标准化
  19. BIOS视频中断 10号中断详解
  20. 万马股份旗下万马爱充遭通报下架:违规收集个人信息,未及时整改

热门文章

  1. esxi error 1962
  2. asp.net中GridView排序的手动实现
  3. 三创数据分析题库及个人作答
  4. 【翻译】RUST无锁编程
  5. 计算机二级刷题先EXCEL,备考篇丨迎接计算机二级考试
  6. *.ftl文件中文乱码的问题
  7. 申请计算机专业有关个人陈述吗,计算机专业个人陈述
  8. JavaScript 可视化框架汇总
  9. 电脑显卡4种接口类型
  10. Mac安装Drozer apk安全测试框架踩坑记录, ‘openssl/opensslv.h‘ file not found 和implicit declaration of function‘xx‘