1 ProcessFunction

   ProcessFunction是一个低阶的流处理操作,可以访问事件(event)(流元素),状态(state)(容错性,一致性,仅在keyed stream中),定时器(timers)(event time和processing time, 仅在keyed stream中)。也就是说可以访问普通的转换算子无法访问事件的时间戳信息和Watermark的。

   ProcessFunction可以看作是一个具有keyed state 键控状态和 timers定时器访问权的FlatMapFunction,通过对输入流中接收的每个事件调用来处理事件。①通过RuntimeContext访问keyed state②计时器允许应用程序对处理时间和事件时间中的更改作出响应。对processElement(…)函数的每次调用都获得一个Context对象,该对象可以访问元素的event time timestamp和TimerService;③TimerService可用于为将来的event/process time瞬间注册回调。当到达计时器的特定时间时,将调用onTimer(…)方法。在该调用期间,所有状态都再次限定在创建计时器时使用的键的范围内,从而允许计时器操作键控状态。总之ProcessFunction可以访问时间戳、watermark以及注册定时事件,输出特定的一些事件等。Flink SQL就是使用Process Function实现的。

   如果要访问键控状态和计时器,则必须应用在keyedStream上

stream.keyBy(...).process(new MyProcessFunction())

   Flink提供了8个Process Function:ProcessFunction,KeyedProcessFunction,CoProcessFunction,ProcessJoinFunction,BroadcastProcessFunction,KeyedBroadcastProcessFunction,ProcessWindowFunction,ProcessAllWindowFunction。

   所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法,还额外提供了两个方法processElement和onTimer

   processElement:每来一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。获得的Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。Context还可以将结果输出到别的流(side outputs)。

   onTimer:是一个回调函数,当之前注册的定时器到达触发时间调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。

2 低阶join

   要实现对两个输入的低级操作,应用程序可以使用CoProcessFunction或KeyedCoProcessFunction。

   CoProcessFunction实现对两个输入的低阶操作,它绑定到两个不同的输入流,分别调用processElement1(…)和processElement2(…)对两个输入流的数据进行处理

   实现低阶join通常遵循以下模式:①为一个(或两个)输入创建一个状态对象②当从输入源收到元素时,更新状态③从另一个输入接收元素后,检索状态并生成连接的结果

3 KeyedProcessFunction

   KeyedProcessFunction作为ProcessFunction的扩展,在其onTimer(…)方法中提供对定时器对应key的访问。

   KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。

override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {var key = ctx.getCurrentKey// ...
}

4 Timers

   processing-time/event-time timer都由TimerService在内部维护并排队等待执行,仅在keyed stream中有效。

   由于Flink对(每个key+timestamp)只维护一个计时器。如果为相同的timestamp注册了多个timer ,则只调用onTimer()方法一次。

   Flink保证同步调用onTimer()和processElement() 。因此用户不必担心状态的并发修改。

   容错:Timer具有容错和checkpoint能力(基于flink app的状态)。从故障恢复或从savepoint启动应用程序时,Timer将被恢复。大量计时器会增加检查点时间,因为计时器是检查点状态的一部分。

   定时器合并:由于Flink对每个键和时间戳只维护一个计时器,因此可以通过降低计时器频率来合并计时器,从而减少计时器的数量。 event-time timer只会在watermarks到来时触发。

//对于1秒的定时器分辨率(事件或处理时间),可以将目标时间舍入整秒。计时器的发射时间最多提前1秒,但不迟于要求的毫秒精度。因此,每键最多有一个定时器和第二个定时器。
val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)//事件时间计时器只在水印进入的情况下触发,您还可以使用当前Watermark调度这些计时器并将其与下一个Watermark合并:
val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)//停止处理时间计时器:
val timestampOfTimerToStop = ...
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)//停止事件时间计时器:
val timestampOfTimerToStop = ...
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

5 官方案例

   KeyedProcessFunction维护每个键的计数,并在没有对该键进行更新的情况下,在一分钟内(在事件发生时)发出一个键/计数对:

  • 计数、键和最后修改时间戳存储在ValueState,这是由Key隐式限定范围的。
  • 对于每个记录,KeyedProcessFunction增加计数器并设置最后修改的时间戳。
  • 该函数还会在以后的一分钟内安排一个回调(在事件发生时)。
  • 在每次回调时,它会检查回调的事件时间戳和存储计数的最后修改时间,如果它们匹配,则发出键/计数(也就是说,在这一分钟内没有发生进一步的更新)。
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector// the source data stream
val stream: DataStream[Tuple2[String, String]] = ...// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream.keyBy(0).process(new CountWithTimeoutFunction())/*** The data type stored in the state*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)/*** The implementation of the ProcessFunction that maintains the count and timeouts*/
class CountWithTimeoutFunction extends KeyedProcessFunction[Tuple, (String, String), (String, Long)] {/** The state that is maintained by this process function */lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {// initialize or retrieve/update the stateval current: CountWithTimestamp = state.value match {case null =>CountWithTimestamp(value._1, 1, ctx.timestamp)case CountWithTimestamp(key, count, lastModified) =>CountWithTimestamp(key, count + 1, ctx.timestamp)}// write the state backstate.update(current)// schedule the next timer 60 seconds from the current event timectx.timerService.registerEventTimeTimer(current.lastModified + 60000)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {state.value match {case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>out.collect((key, count))case _ =>}}
}

Flink的ProcessFunction API相关推荐

  1. 【Flink】ProcessFunction:Flink最底层API使用教程

    1.美图 2.概述 之前提到的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限.如果想获取数据流中 ...

  2. Flink算子(ProcessFunction,map和Flatmap)

    Flink提供三层API,每个API在简洁性和表达之间提供不同的权衡,并针对不同的用例 SQL/Table API(dynamic tables) DataStream API(streams,win ...

  3. 使用flink Table Sql api来构建批量和流式应用(3)Flink Sql 使用

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  4. 使用flink Table Sql api来构建批量和流式应用(2)Table API概述

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  5. 使用flink Table Sql api来构建批量和流式应用(1)Table的基本概念

    从flink的官方文档,我们知道flink的编程模型分为四层,sql层是最高层的api,Table api是中间层,DataStream/DataSet Api 是核心,stateful Stream ...

  6. Flink的Table API 与SQL的流处理

    1 流处理与SQL的区别   Table API和SQL,本质上还是基于关系型表的操作方式:而关系型表.SQL本身,一般是有界的,更适合批处理的场景.所以在流处理的过程中,有一些特殊概念. SQL 流 ...

  7. Flink 使用Table Api 读取文件数据并写出到文件中

    前言 在上一篇我们演示了如何使用Flink 的Table Api 读取文件数据,并过滤特定字段的数据,本篇在上一篇的基础上,将从CSV文件中读取的数据重新输出到一个新的CSV文件中: 在实际业务场景下 ...

  8. Flink入门——DataSet Api编程指南

    简介: Flink入门--DataSet Api编程指南 Apache Flink 是一个兼顾高吞吐.低延迟.高性能的分布式处理框架.在实时计算崛起的今天,Flink正在飞速发展.由于性能的优势和兼顾 ...

  9. Flink基于 DataStream API 实现欺诈检测

    目录 系列文章目录 文章目录 前言 一.Flink基于 DataStream API 实现欺诈检测 二.使用步骤 1.引入pom.xml 2.主类 3.欺诈逻辑判断类 4.运行结果: 总结 前言 在当 ...

最新文章

  1. 计算机应用基础模拟试题一,计算机应用基础模拟试题1
  2. Linux lvs 多端口组成
  3. 在哪开启oracle服务器,开启企业殿堂的钥匙 Oracle服务器的安装
  4. [转]一个老工程师给年轻工程师的忠告
  5. 从报表到数据可视化,我用这五步,成功搭建银行大数据架构
  6. java基础英语---第二十四天
  7. (笔记)Linux内核学习(三)之进程调度
  8. usb 测试软件,usb端口测试(USB端口测试工具)
  9. [济南考勤机专题]考勤机类型(五)打卡考勤机
  10. Unity游戏教程初步(一):开始之前
  11. 交换机与路由器的配置
  12. arcgis html图像标记,图片标记
  13. 静下心来学习MVC之基本概念
  14. UE4 开发从入门到入土
  15. 数电课程设计之7段显示器8421BCD码转换器
  16. android动画知乎,GitHub - ryanhoo/Zhihu-Parallax-Animation: 知乎 Android 客户端启动页的视差动画效果实现...
  17. 打印出js对象里面的内容
  18. Python科研数据分析专题之正态性检验
  19. 谢少荣到计算机学院,我院获批国家自然科学基金委人工智能代码(F06)首个重大项目...
  20. 2016 HCTF web writeup

热门文章

  1. redis -- 学习
  2. JS高级——变量提升
  3. 【收集】Python 微优化
  4. C++ 0x 使用可变参数模板类 实现 C# 的委托机制
  5. 转一篇写的比较好的camera文档[Camera 图像处理原理分析]
  6. nyoj187 快速查找素数
  7. java技术_2020年最流行的Java开发技术
  8. 昆明大学津桥学院计算机科学与技术,昆明理工大学津桥学院计算机科学与技术专业2015年在河南理科高考录取最低分数线...
  9. python print 输出到txt_(Python基础教程之七)Python字符串操作
  10. 二进制包如何知道go 版本_gops 是怎么和 Go 的运行时进行交互的?