1.美图

2.概述

之前提到的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限。如果想获取数据流中Watermark的时间戳,或者在时间上前后穿梭,需要使用ProcessFunction系列函数,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。Flink SQL是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。

目前,这个系列函数主要包括KeyedProcessFunction、ProcessFunction、CoProcessFunction、KeyedCoProcessFunction、ProcessJoinFunction和ProcessWindowFunction等多种函数,这些函数各有侧重,但核心功能比较相似,主要包括两点:

状态:我们可以在这些函数中访问和更新Keyed State 。
定时器(Timer):像定闹钟一样设置定时器,我们可以在时间维度上设计更复杂的业务逻辑。
状态的介绍可以参考我的文章:Flink状态管理详解,这里我们重点讲解一下的使用ProcessFunction其他几个特色功能。本文所有代码都上传到了我的github:https://github.com/luweizheng/flink-tutorials

3.Timer的使用方法

我们可以把Timer理解成一个闹钟,使用前先在Timer中注册一个未来的时间,当这个时间到达,闹钟会“响起”,程序会执行一个回调函数,回调函数中执行一定的业务逻辑。这里以KeyedProcessFunction为例,来介绍Timer的注册和使用。

ProcessFunction有两个重要的接口processElement和onTimer,其中processElement函数在源码中的Java签名如下:

// 处理数据流中的一条元素
public abstract void processElement(I value, Context ctx, Collector<O> out)

processElement方法处理数据流中的一条元素,并通过Collector输出出来。Context是它的区别于FlatMapFunction等普通函数的特色,开发者可以通过Context来获取时间戳,访问TimerService,设置Timer。

另外一个接口是onTimer:

// 时间到达后的回调函数
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

这是一个回调函数,当到了“闹钟”时间,Flink会调用onTimer,并执行一些业务逻辑。这里也有一个参数OnTimerContext,它实际上是继承了前面的Context,与Context几乎相同。

使用Timer的方法主要逻辑为:

  1. 在processElement方法中通过Context注册一个未来的时间戳t。这个时间戳的语义可以是Processing Time,也可以是Event Time,根据业务需求来选择。
  2. 在onTimer方法中实现一些逻辑,到达t时刻,onTimer方法被自动调用。

从Context中,我们可以获取一个TimerService,这是一个访问时间戳和Timer的接口。我们可以通过Context.timerService.registerProcessingTimeTimer或Context.timerService.registerEventTimeTimer这两个方法来注册Timer,只需要传入一个时间戳即可。我们可以通过Context.timerService.deleteProcessingTimeTimerContext.timerService.deleteEventTimeTimer来删除之前注册的Timer。此外,还可以从中获取当前的时间戳:Context.timerService.currentProcessingTimeContext.timerService.currentWatermark。从函数名看出,这里都是两两出现的函数,两个方法分别对应两种时间语义。

注意,我们只能在KeyedStream上注册Timer。每个Key下可以使用不同的时间戳注册不同的Timer,但是每个Key的每个时间戳只能注册一个Timer。如果想在一个DataStream上应用Timer,可以将所有数据映射到一个伪造的Key上,但这样所有数据会流入一个算子子任务。

我们再次以股票股票交易场景来解释如何使用Timer。一次股票交易包括:股票代号、时间戳、股票价格、成交量。我们现在想看一支股票10秒内是否一直连续上涨,如果一直上涨,则发送出一个提示。

case class StockPrice(symbol: String, ts: Long, price: Double, volume: Int)class IncreaseAlertFunction(intervalMills: Long)
extends KeyedProcessFunction[String, StockPrice, String] {// 状态:保存某支股票上次交易价格lazy val lastPrice: ValueState[Double] =getRuntimeContext.getState(new ValueStateDescriptor[Double]("lastPrice", Types.of[Double]))// 状态:保存某支股票的定时器时间戳lazy val currentTimer: ValueState[Long] =getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer", Types.of[Long]))override def processElement(stock: StockPrice,context: KeyedProcessFunction[String, StockPrice, String]#Context,out: Collector[String]): Unit = {// 获取lastPrice状态中的数据,第一次使用时会被初始化为0val prevPrice = lastPrice.value()// 更新lastPricelastPrice.update(stock.price)val curTimerTimestamp = currentTimer.value()if (prevPrice == 0.0) {// 第一次使用,不做任何处理} else if (stock.price < prevPrice) {// 如果新流入的股票价格降低,删除Timer,否则该Timer一直保留context.timerService().deleteEventTimeTimer(curTimerTimestamp)currentTimer.clear()} else if (stock.price >= prevPrice && curTimerTimestamp == 0) {// 如果新流入的股票价格升高// curTimerTimestamp为0表示currentTimer状态中是空的,还没有对应的Timer// 新Timer = 当前时间 + intervalval timerTs = context.timestamp() + intervalMillsval formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")context.timerService().registerEventTimeTimer(timerTs)// 更新currentTimer状态,后续数据会读取currentTimer,做相关判断currentTimer.update(timerTs)}}override def onTimer(ts: Long,ctx: KeyedProcessFunction[String, StockPrice, String]#OnTimerContext,out: Collector[String]): Unit = {val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")out.collect("time: " + formatter.format(ts) + ", symbol: '" + ctx.getCurrentKey +" monotonically increased for " + intervalMills + " millisecond.")// 清空currentTimer状态currentTimer.clear()}
}

在主逻辑里,通过下面的process算子调用KeyedProcessFunction:

val inputStream: DataStream[StockPrice] = ...
val warnings = inputStream.keyBy(stock => stock.symbol)// 调用process函数.process(new IncreaseAlertFunction(10000))

Checkpoint时,Timer也会随其他状态数据一起保存起来。如果使用Processing Time语义设置一些Timer,重启时这个时间戳已经过期,那些回调函数会立刻被调用执行。

4. 侧输出SideOutput

ProcessFunction的另一大特色功能是可以将一部分数据发送到另外一个流中,而且输出到的两个流数据类型可以不一样,我们通过OutputTag[T]来标记另外一个数据流。在ProcessFunction中这样将某类数据过滤出来:

class IncreaseAlertFunction(intervalMills: Long) extends KeyedProcessFunction[String, Stock, String] {override def processElement(stock: Stock,context: KeyedProcessFunction[String, Stock, String]#Context,out: Collector[String]): Unit = {// 其他业务逻辑...// 定义一个OutputTag,Stock为这个SideOutput流的数据类型val highVolumeOutput: OutputTag[Stock] = new OutputTag[Stock]("high-volume-trade")if (stock.volume > 1000) {// 将Stock筛选出来发送到该OutputTag下context.output(highVolumeOutput, stock)}}
}

在主逻辑中,通过下面的方法获取侧输出:

// 收集SideOutput
val outputTag: OutputTag[Stock] = OutputTag[Stock]("high-volume-trade")
val sideOutputStream: DataStream[Stock] = mainStream.getSideOutput(outputTag)

从这个例子中可以看到,KeyedProcessFunction的输出类型是String,而SideOutput的输出类型是Stock,两者可以不同。

5.使用ProcessFunction实现Join

如果想从更细的粒度上实现两个数据流的Join,可以使用CoProcessFunction或KeyedCoProcessFunction。这两个函数都有processElement1和processElement2方法,分别对第一个数据流和第二个数据流的每个元素进行处理。两个数据流的数据类型以及输出类型可以互不相同。尽管数据来自两个不同的流,但是他们可以共享同样的状态,所以可以参考下面的逻辑来实现Join:

创建一到多个状态,两个数据流都能访问到这些状态,这里以状态a为例。

processElement1方法处理第一个数据流,更新状态a。
processElement2方法处理第二个数据流,根据状态a中的数据,生成相应的输出。

我们这次将股票价格结合媒体评价两个数据流一起讨论,假设对于某支股票有一个媒体评价数据流,这个数据流包含了对该支股票的正负评价。两支数据流一起流入KeyedCoProcessFunction,processElement2方法处理流入的媒体数据,将媒体评价更新到状态mediaState上,processElement1方法处理流入的股票交易数据,获取mediaState`状态,生成到新的数据流。两个方法分别处理两个数据流,共享一个状态,通过状态来通信。

在主逻辑中,我们将两个数据流connect,然后按照股票代号进行keyBy,进而使用process算子:

val stockPriceRawStream: DataStream[StockPrice] = ...
val mediaStatusStream: DataStream[Media] = ...
val warnings = stockStream.connect(mediaStream).keyBy(0, 0)// 调用process函数.process(new AlertProcessFunction())
KeyedCoProcessFunction的具体实现:class JoinStockMediaProcessFunction extends KeyedCoProcessFunction[String, StockPrice, Media, StockPrice] {// mediaStateprivate var mediaState: ValueState[String] = _override def open(parameters: Configuration): Unit = {// 从RuntimeContext中获取状态mediaState = getRuntimeContext.getState(new ValueStateDescriptor[String]("mediaStatusState", classOf[String]))}override def processElement1(stock: StockPrice,context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,collector: Collector[StockPrice]): Unit = {val mediaStatus = mediaState.value()if (null != mediaStatus) {val newStock = stock.copy(mediaStatus = mediaStatus)collector.collect(newStock)}}override def processElement2(media: Media,context: KeyedCoProcessFunction[String, StockPrice, Media, StockPrice]#Context,collector: Collector[StockPrice]): Unit = {// 第二个流更新mediaStatemediaState.update(media.status)}}

这个例子比较简单,没有使用Timer,实际的业务场景中状态一般用到Timer将过期的状态清除。很多互联网APP的机器学习样本拼接都可能依赖这个函数来实现:服务端的机器学习特征是实时生成的,用户在APP上的行为是交互后产生的,两者属于两个不同的数据流,可以按照这个逻辑来将两个数据流拼接起来,通过拼接更快得到下一轮机器学习的样本数据。两个数据流的中间数据放在状态中,为避免状态的无限增长,需要使用Timer将过期的状态清除。

注意,使用Event Time时,两个数据流必须都设置好Watermark,只设置一个流的Event Time和Watermark,无法在CoProcessFunction和KeyedCoProcessFunction中使用Timer功能,因为process算子无法确定自己应该以怎样的时间来处理数据。

扩展阅读

Flink系列 11. 介绍Flink中 ProcessFunction 的使用

【Flink】ProcessFunction:Flink最底层API使用教程相关推荐

  1. 【Flink】Flink 如何 保证 同一个key对应的state是一个呢?KeyedProcessFunction

    1.概述 本文转载:Flink处理函数实战之一:深入了解ProcessFunction的状态(Flink-1.10) 作者:git 许多博客 并且做出自己的补充说明 相关其他博客:95-170-046 ...

  2. Flink教程(07)- Flink批流一体API(Transformation示例)

    文章目录 01 引言 02 Transformation 2.1 基本操作 2.1.1 API 解析 2.1.2 示例代码 2.2 合并 2.2.1 union 2.2.2 connect 2.2.3 ...

  3. Flink教程(10)- Flink批流一体API(其它)

    文章目录 01 引言 02 累加器 2.1 相关API 2.2 示例代码 03 广播变量 3.1 原理 3.2 示例代码 04 分布式缓存 4.1 原理 4.2 示例代码 05 文末 01 引言 在前 ...

  4. Flink教程(06)- Flink批流一体API(Source示例)

    文章目录 01 引言 02 Source 2.1 基于集合的Source 2.2 基于文件的Source 2.3 基于Socket的Source 2.4 自定义Source 2.4.1 案例 - 随机 ...

  5. Flink教程(09)- Flink批流一体API(Connectors示例)

    文章目录 01 引言 02 Connectors 2.1 Flink目前支持的Connectors 2.2 JDBC案例 2.3 Kafa案例 2.3.1 Kafa相关命令 2.3.2 Kafka C ...

  6. 11.Flink ProcessFunction介绍及KeyedProcessFunction实例

    Flink ProcessFunction介绍及KeyedProcessFunction实例 1. ProcessFunction简介 2. KeyedProcessFunction简单使用 2.1. ...

  7. python flink_如何在 Apache Flink 中使用 Python API?

    原标题:如何在 Apache Flink 中使用 Python API? 导读:本文重点为大家介绍 Flink Python API 的现状及未来规划,主要内容包括:Apache Flink Pyth ...

  8. 大话Flink之十一Table API 和 Flink SQL

    目录 Table API 和 Flink SQL 1 Table API 和 Flink SQL 是什么 2 基本程序结构 3 创建 TableEnvironment 4 表(Table) 4.1 创 ...

  9. Flink SQL Gateway REST Endpoint 使用教程

    介绍 SQL Gateway 是一种支持远程多个客户机并发执行 SQL 的服务.它提供了一种提交 Flink  Job.查找元数据和在线分析数据的简单方法.SQL Gateway 由可插拔 Endpo ...

最新文章

  1. VTK:可视化算法之HeadSlice
  2. OpenCV转换PyTorch分类模型并使用OpenCV Python启动
  3. reddit_如何将多个子Reddit与多个Reddit合并
  4. SSH:WARNING: REMOTE HOST IDENTIFICATION HAS CHANGED!
  5. 电子围栏判断_脉冲电子围栏和张力围栏之间的区别
  6. 软件项目管理 2.1.项目立项
  7. adb的环境搭建,下载和卸载app
  8. CVE-2017-0143(远程溢出)漏洞复现
  9. C:L1-061 新胖子公式 (10分)
  10. 如何修改docker容器的hostname
  11. JeecgBoot集成DataV组件库
  12. Office软件Android无法登录,office mobile(安卓版)无法使用
  13. 浏览器页面前端自适应方案
  14. 自监督学习(Self-Supervised Learning)
  15. 电脑安装android系统 锤子,锤子系统手机桌面
  16. 【Scratch-侦测模块】Scratch-碰到
  17. Qt发布版本退出时错误处理“The inferior stopped because it received a signal from the operating system.”
  18. 微信ios接入-Objc -all_load的坑
  19. 机器人视觉分析算法_机器视觉处理:目标检测和跟踪
  20. git push错误(fatal: The upstream branch of your current branch does not match)解决方案

热门文章

  1. BOSS直聘上线春雷行动 首周吸引上万名应届生参与活动
  2. 前置仓没活路?盒马mini要做行业终极目标还为时尚早
  3. 美团和滴滴会合并吗?投资人回应吃瓜群众的期待
  4. 腾讯安全发布十大产业互联网安全议题:聚焦5G、数据加密等
  5. 小米“小仙女”来了:强大美颜 女性专属的定制手机
  6. 5G牌照提前发放 将对整个产业界带来哪些影响?
  7. 是兄弟就来砍我!“贪玩蓝月”母公司实控人被捕:曾是中国最年轻富豪
  8. 《权力的游戏》第八季剧情翻水:超40万粉丝请愿重拍
  9. 人缘还挺好!史玉柱:被人网上恶搞 马云等十几人打电话来慰问我
  10. 串口的输出设置【原创】