**

Flink对迟到数据的处理

**

水位线可以用来平衡计算的完整性和延迟两方面。除非我们选择一种非常保守的水位线策略(最大延时设置的非常大,以至于包含了所有的元素,但结果是非常大的延迟),否则我们总需要处理迟到的元素。

迟到的元素是指当这个元素来到时,这个元素所对应的窗口已经计算完毕了(也就是说水位线已经没过窗口结束时间了)。这说明迟到这个特性只针对事件时间。

DataStream API提供了三种策略来处理迟到元素

直接抛弃迟到的元素

将迟到的元素发送到另一条流中去

可以更新窗口已经计算完的结果,并发出计算结果。

  1. 抛弃迟到元素
    抛弃迟到的元素是event time window operator的默认行为。也就是说一个迟到的元素不会创建一个新的窗口。
    process function可以通过比较迟到元素的时间戳和当前水位线的大小来很轻易的过滤掉迟到元素。

  2. 重定向迟到元素
    迟到的元素也可以使用侧输出(side output)特性被重定向到另外的一条流中去。迟到元素所组成的侧输出流可以继续处理或者sink到持久化设施中去。

例子 :直接使用算子指定迟到数据输出到测输出流

package com.late
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector//重定向的处理
object LateTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//设置并行度env.setParallelism(1)//设置时间为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//val stream = env.socketTextStream("linux102", 9999, '\n')//数据来源 "a 100 12"   "a 99 14"val value = stream.map(x => {val str = x.split(" ")(str(0), str(1).toInt, str(2).toLong * 1000)//未设置延迟的水位线}).assignAscendingTimestamps(x => x._3).keyBy(_._1).timeWindow(Time.seconds(5))//直接将迟到数据重定向到”late_date"的数据流中.sideOutputLateData(new OutputTag[(String, Int, Long)]("late_date"))//.process(new MaxFunction).aggregate(new MaxFunction)//获取“late_date"的测输出流value.getSideOutput(new OutputTag[(String,Int,Long)]("late_date")).print()env.execute()/* stream.map(x=>{val str = x.split(" ")(str(0),str(1).toLong*1000)*/}//采用全窗口函数的形式/*class MaxFunction extends ProcessWindowFunction[(String,Int,Long),(String,Int),String,TimeWindow]{override def process(key: String, context: Context, elements: Iterable[(String, Int, Long)], out: Collector[(String, Int)]): Unit = {out.collect((key,elements.map(_._2).toIterator.max))}}*/// in out key windom
//采取质量函数的形式class MaxFunction extends AggregateFunction[(String,Int,Long),(String,Int),(String,Int)]{// 累加逻辑override def add(in: (String, Int, Long), acc: (String, Int)): (String, Int) = {(in._1,in._2.max(acc._2))}//初始化累加器override def createAccumulator(): (String, Int) =("",0)//返回结果override def getResult(acc: (String, Int)): (String, Int) =acc//累加器聚合override def merge(acc: (String, Int), acc1: (String, Int)): (String, Int) =(acc._1,acc._2.max(acc1._2))}
}

通过自定义ProcessFunction来实现对迟到数据的处理 输出到侧输出流

object LateElement {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = env.socketTextStream("linux1", 9999, '\n')val s = stream.map(line => {val arr = line.split(" ")(arr(0), arr(1).toLong * 1000)})  //设值获取时间的方式 ,水位线的延迟为5秒钟.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {override def extractTimestamp(element: (String, Long)): Long = element._2}).process(new MyLateProcess)s.getSideOutput(new OutputTag[String]("late")).print()env.execute()}class MyLateProcess extends ProcessFunction[(String, Long), (String, Long)] {val late = new OutputTag[String]("late")override def processElement(value: (String, Long),ctx: ProcessFunction[(String, Long), (String, Long)]#Context,out: Collector[(String, Long)]): Unit = {if (value._2 < ctx.timerService().currentWatermark()) {//将低于水位线的迟到数据输出到侧输出流ctx.output(late, "这个元素迟到了!")} else {out.collect(value)}}}
}
  1. 使用迟到元素更新窗口计算结果(Updating Results by Including Late Events)
    由于存在迟到的元素,所以已经计算出的窗口结果是不准确和不完全的。我们可以使用迟到元素更新已经计算完的窗口结果。

如果我们要求一个operator支持重新计算和更新已经发出的结果,就需要在第一次发出结果以后也要保存之前所有的状态。但显然我们不能一直保存所有的状态,肯定会在某一个时间点将状态清空,而一旦状态被清空,结果就再也不能重新计算或者更新了。而迟到的元素只能被抛弃或者发送到侧输出流。

window operator API提供了方法来明确声明我们要等待迟到元素。当使用event-time window,我们可以指定一个时间段叫做allowed lateness。window operator如果设置了allowed lateness,这个window operator在水位线没过窗口结束时间时也将不会删除窗口和窗口中的状态。窗口会在一段时间内(allowed lateness设置的)保留所有的元素。

当迟到元素在allowed lateness时间内到达时,这个迟到元素会被实时处理并发送到触发器(trigger)。当水位线没过了窗口结束时间+allowed lateness时间时,窗口会被删除,并且所有后来的迟到的元素都会被丢弃。

package com.atguiguimport org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorobject AllowedLateTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = env.socketTextStream("Linux1", 9999, '\n')val s = stream.map(line => {val arr = line.split(" ")(arr(0), arr(1).toLong * 1000)})
//      .assignAscendingTimestamps(_._2).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {override def extractTimestamp(element: (String, Long)): Long = element._2}).keyBy(_._1)// [0,5),....timeWindow(Time.seconds(5))// 水位线超过 窗口结束时间 窗口闭合计算,但不销毁// 水位线超过 窗口结束时间 + allowed lateness,窗口更新结果并销毁.allowedLateness(Time.seconds(5)).process(new MyAllowedLateProcess)s.print()env.execute()}class MyAllowedLateProcess extends ProcessWindowFunction[(String, Long),String, String,TimeWindow] {override def process(key: String,context: Context,elements: Iterable[(String, Long)],out: Collector[String]): Unit = {lazy val isUpdate = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("update", Types.of[Boolean]))if (!isUpdate.value()) {out.collect("在水位线超过窗口结束时间的时候,窗口第一次闭合计算")isUpdate.update(true)} else {out.collect("迟到元素来了以后,更新窗口闭合计算的结果")}}}
}

Flink对迟到数据的处理的三种方式相关推荐

  1. python 爬虫 数据抓取的三种方式

    python 爬虫   数据抓取的三种方式 常用抽取网页数据的方式有三种:正则表达式.Beautiful Soup.lxml 1.正则表达式 正则表达式有个很大的缺点是难以构造.可读性差.不易适用未来 ...

  2. html 数据双向绑定,javascript实现数据双向绑定的三种方式小结

    前端数据的双向绑定方法 前端的视图层和数据层有时需要实现双向绑定(two-way-binding),例如mvvm框架,数据驱动视图,视图状态机等,研究了几个目前主流的数据双向绑定框架,总结了下.目前实 ...

  3. 数据转List的三种方式

    一.通过 Arrays.asList(strArray) 方式,将数组转换List后,只能查改 关键代码:List list = Arrays.asList(strArray); 从上图可以看到,添加 ...

  4. 金融科技公司采用大数据领先银行的三种方式

    近几年来,金融行业已成为大数据的最大的消费领域之一.根据调研机构Gartner公司的说法,2013年,64%的金融服务公司都使用了大数据.过去四年来,这一数字稳步上升.大数据正在以前所未有的方式改变行 ...

  5. ajax上传多文件和数据,Ajax上传数据和上传文件(三种方式)

    Ajax向后端发送数据可以有三种方式:原生Ajax方式,jQuery Ajax方式,iframe+form 方式(伪造Ajax方式) Title .btn { background-color: co ...

  6. python解析json数据的三种方式

    目录 1.运用re.json.jsonpath包解析json思路 2.三种方式的json解析案例 (1)运用re正则表达式解析json (2)运用字典的数据结构性质解析json (3)运用jsonpa ...

  7. 【学习笔记 — Flink 处理迟到数据(★)】

    Flink 处理迟到数据(★) 处理迟到数据之前首先了解Lambda架构 Lambda架构的实现是:一个批处理器.一个流处理器.流处理器首先实时输出近似正确的结果(因为乱序流,可能导致流处理结果不准确 ...

  8. Flink 对于迟到数据的处理

    WaterMark 和 Window 机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理. Event Time语义下我们使用Watermark来判断数 ...

  9. Flink中迟到数据的处理

    目录 设置水位线延迟时间 允许窗口处理迟到数据 将迟到数据放入窗口侧输出流 总结:         我们知道,所谓的"迟到数据"(late data),是指某个水位线之后到来的数据 ...

最新文章

  1. wps 2016 个人版 重新开始编号
  2. python爬虫能干什么-Python爬虫能做什么
  3. InfoPath开发经验小节
  4. 20应用统计考研复试要点(part11)--应用多元分析
  5. HDU1225 字符串
  6. 线段树、优先队列、单调队列小结
  7. 计算机的英语对话,英语口语对话:谈论电脑
  8. 数据结构与算法笔记总结
  9. [数据仓库]Bill Inmon和Ralph Kimball方法论
  10. LU分解的矩阵逆运算
  11. JSP教程第9讲笔记
  12. libiec61850探究【1】-第一个MMS通讯实例
  13. 华软计算机网络课程设计任务,计算机网络网络课程设计任务.doc
  14. HttpClient4模拟表单提交
  15. 电商运营小白,如何快速入门学习数据分析?
  16. 关于BTA12-600B双向可控硅应用中遇到问题的思考
  17. python输出中文加数字_Python实现阿拉伯数字加上中文数字
  18. Linux - 可视化菜单界面设计
  19. 打包时出现的异常。XXXXXXX-1.0-SNAPSHOT.jar中没有主清单属性的解决办法
  20. javascript字符串匹配正则表达式方法

热门文章

  1. 1.11 查找最接近的元素
  2. DevOps自动化之Jenkins
  3. 癌症分类预测-良/恶性乳腺癌肿瘤预测
  4. tpm linux,Linux内核再次升级 支持TPM芯片
  5. tableau-当工作表没有数据时显示无
  6. .netCore 的开源项目
  7. 8 .数据库-查-高级查询
  8. Excel函数VLOOKUP常用方法
  9. Sa-Token源码简单阅读
  10. Apriori算法(矩阵)