https://mp.weixin.qq.com/s/SOCAE-t25DPVlQMxuOT0jw

引言

在Flink的时间与watermarks详解这篇文章中,阐述了Flink的时间与水位线的相关内容。你可能不禁要发问,该如何访问时间戳和水位线呢?首先通过普通的DataStream API是无法访问的,需要借助Flink提供的一个底层的API——Process  Function。Process Function不仅能够访问时间戳与水位线,而且还可以注册在将来的某个特定时间触发的计时器(timers)。除此之外,还可以将数据通过Side Outputs发送到多个输出流中。这样以来,可以实现数据分流的功能,同时也是处理迟到数据的一种方式。下面我们将从源码入手,结合具体的使用案例来说明该如何使用Process  Function。

简介

Flink提供了很多Process Function,每种Process Function都有各自的功能,这些Process Function主要包括:

  • ProcessFunction

  • KeyedProcessFunction

  • CoProcessFunction

  • ProcessJoinFunction

  • ProcessWindowFunction

  • ProcessAllWindowFunction

  • BaseBroadcastProcessFunction

    • KeyedBroadcastProcessFunction

  • BroadcastProcessFunction

继承关系图如下:

从上面的继承关系中可以看出,都实现了RichFunction接口,所以支持使用open()close()getRuntimeContext()等方法的调用。从名字上可以看出,这些函数都有不同的适用场景,但是基本的功能是类似的,下面会以KeyedProcessFunction为例来讨论这些函数的通用功能。

源码

KeyedProcessFunction

/** * 处理KeyedStream流的低级API函数 * 对于输入流中的每个元素都会触发调用processElement方法.该方法会产生0个或多个输出. * 其实现类可以通过Context访问数据的时间戳和计时器(timers).当计时器(timers)触发时,会回调onTimer方法. * onTimer方法会产生0个或者多个输出,并且会注册一个未来的计时器. * * 注意:如果要访问keyed state和计时器(timers),必须在KeyedStream上使用KeyedProcessFunction. * 另外,KeyedProcessFunction的父类AbstractRichFunction实现了RichFunction接口,所以,可以使用 * open(),close()及getRuntimeContext()方法. * * @param <K> key的类型 * @param <I> 输入元素的数据类型 * @param <O> 输出元素的数据类型 */@PublicEvolvingpublic abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
  private static final long serialVersionUID = 1L;  /**   * 处理输入流中的每个元素   * 该方法会输出0个或者多个输出,类似于FlatMap的功能   * 除此之外,该方法还可以更新内部状态或者设置计时器(timer)   * @param value 输入元素   * @param ctx  Context,可以访问输入元素的时间戳,并其可以获取一个时间服务器(TimerService),用于注册计时器(timers)并查询时间   *  Context只有在processElement被调用期间有效.   * @param out  返回的结果值   * @throws Exception   */  public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
  /**   * 是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数   * @param timestamp 触发计时器(timers)的时间戳   * @param ctx  OnTimerContext,允许访问时间戳,TimeDomain枚举类提供了两种时间类型:   * EVENT_TIME与PROCESSING_TIME   * 并其可以获取一个时间服务器(TimerService),用于注册计时器(timers)并查询时间   * OnTimerContext只有在onTimer方法被调用期间有效   * @param out 结果输出   * @throws Exception   */  public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}  /**   * 仅仅在processElement()方法或者onTimer方法被调用期间有效   */  public abstract class Context {
    /**     * 当前被处理元素的时间戳,或者是触发计时器(timers)时的时间戳     * 该值可能为null,比如当程序中设置的时间语义为:TimeCharacteristic#ProcessingTime     * @return     */    public abstract Long timestamp();
    /**     * 访问时间和注册的计时器(timers)     * @return     */    public abstract TimerService timerService();
    /**     * 将元素输出到side output (侧输出)     * @param outputTag 侧输出的标记     * @param value 输出的记录     * @param <X>     */    public abstract <X> void output(OutputTag<X> outputTag, X value);    /**     * 获取被处理元素的key     * @return     */    public abstract K getCurrentKey();  }  /**   * 当onTimer方法被调用时,才可以使用OnTimerContext   */  public abstract class OnTimerContext extends Context {    /**     * 触发计时器(timers)的时间类型,包括两种:EVENT_TIME与PROCESSING_TIME     * @return     */    public abstract TimeDomain timeDomain();    /**     * 获取触发计时器(timer)元素的key     * @return     */    @Override    public abstract K getCurrentKey();  }}

上面的源码中,主要有两个方法,分析如下:

  • processElement(I value, Context ctx, Collector<O> out)

该方法会对流中的每条记录都调用一次,输出0个或者多个元素,类似于FlatMap的功能,通过Collector将结果发出。除此之外,该函数有一个Context 参数,用户可以通过Context 访问时间戳、当前记录的key值以及TimerService(关于TimerService,下面会详细解释)。另外还可以使用output方法将数据发送到side output,实现分流或者处理迟到数据的功能。

  • onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)

该方法是一个回调函数,当在TimerService中注册的计时器(timers)被触发时,会回调该函数。其中@param timestamp参数表示触发计时器(timers)的时间戳,Collector可以将记录发出。细心的你可能会发现,这两个方法都有一个上下文参数,上面的方法传递的是Context 参数,onTimer方法传递的是OnTimerContext参数,这两个参数对象可以实现相似的功能。OnTimerContext还可以返回触发计时器的时间域(EVENT_TIME与PROCESSING_TIME)。

TimerService

在KeyedProcessFunction源码中,使用TimerService来访问时间和计时器,下面来看一下源码:

@PublicEvolvingpublic interface TimerService {  String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";  String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";  // 返回当前的处理时间  long currentProcessingTime();  // 返回当前event-time水位线(watermark)  long currentWatermark();
  /**   * 注册一个计时器(timers),当processing time的时间等于该计时器时钟时会被调用   * @param time   */  void registerProcessingTimeTimer(long time);
  /**   * 注册一个计时器(timers),当event time的水位线(watermark)到达该时间时会被触发   * @param time   */  void registerEventTimeTimer(long time);
  /**   * 根据给定的触发时间(trigger time)来删除processing-time计时器   * 如果这个timer不存在,那么该方法不会起作用,   * 即该计时器(timer)之前已经被注册了,并且没有过时   *   * @param time   */  void deleteProcessingTimeTimer(long time);      /**   * 根据给定的触发时间(trigger time)来删除event-time 计时器   * 如果这个timer不存在,那么该方法不会起作用,   *   即该计时器(timer)之前已经被注册了,并且没有过时   * @param time   */  void deleteEventTimeTimer(long time);}

TimerService提供了以下几种方法:

  • currentProcessingTime()

返回当前的处理时间

  • currentWatermark()

返回当前event-time水位线(watermark)时间戳

  • registerProcessingTimeTimer(long time)

针对当前key,注册一个processing time计时器(timers),当processing time的时间等于该计时器时钟时会被调用

  • registerEventTimeTimer(long time)

针对当前key,注册一个event time计时器(timers),当水位线时间戳大于等于该计时器时钟时会被调用

  • deleteProcessingTimeTimer(long time)

针对当前key,删除一个之前注册过的processing time计时器(timers),如果这个timer不存在,那么该方法不会起作用

  • deleteEventTimeTimer(long time)

针对当前key,删除一个之前注册过的event time计时器(timers),如果这个timer不存在,那么该方法不会起作用

当计时器触发时,会回调onTimer()函数,系统对于ProcessElement()方法和onTimer()方法的调用是同步的

注意:上面的源码中有两个Error 信息,这就说明计时器只能在keyed streams上使用,常见的用途是在某些key值不在使用后清除keyed state,或者实现一些基于时间的自定义窗口逻辑。如果要在一个非KeyedStream上使用计时器,可以使用KeySelector返回一个固定的分区值(比如返回一个常数),这样所有的数据只会发送到一个分区。

使用案例

下面将使用Process Function的side output功能进行分流处理,具体代码如下:

public class ProcessFunctionExample {
    // 定义side output标签    static final OutputTag<UserBehaviors> buyTags = new OutputTag<UserBehaviors>("buy") {    };    static final OutputTag<UserBehaviors> cartTags = new OutputTag<UserBehaviors>("cart") {    };    static final OutputTag<UserBehaviors> favTags = new OutputTag<UserBehaviors>("fav") {    };    static class SplitStreamFunction extends ProcessFunction<UserBehaviors, UserBehaviors> {
        @Override        public void processElement(UserBehaviors value, Context ctx, Collector<UserBehaviors> out) throws Exception {            switch (value.behavior) {                case "buy":                    ctx.output(buyTags, value);                    break;                case "cart":                    ctx.output(cartTags, value);                    break;                case "fav":                    ctx.output(favTags, value);                    break;                default:                    out.collect(value);           }        }    }    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        // 模拟数据源[userId,behavior,product]        SingleOutputStreamOperator<UserBehaviors> splitStream = env.fromElements(                new UserBehaviors(1L, "buy", "iphone"),                new UserBehaviors(1L, "cart", "huawei"),                new UserBehaviors(1L, "buy", "logi"),                new UserBehaviors(1L, "fav", "oppo"),                new UserBehaviors(2L, "buy", "huawei"),                new UserBehaviors(2L, "buy", "onemore"),                new UserBehaviors(2L, "fav", "iphone")).process(new SplitStreamFunction());
        //获取分流之后购买行为的数据        splitStream.getSideOutput(buyTags).print("data_buy");        //获取分流之后加购行为的数据        splitStream.getSideOutput(cartTags).print("data_cart");        //获取分流之后收藏行为的数据        splitStream.getSideOutput(favTags).print("data_fav");
        env.execute("ProcessFunctionExample");    }}

总结

本文首先介绍了Flink提供的几种底层Process Function API,这些API可以访问时间戳和水位线,同时支持注册一个计时器,进行调用回调函数onTimer()。接着从源码的角度解读了这些API的共同部分,详细解释了每个方法的具体含义和使用方式。最后,给出了一个Process Function常见使用场景案例,使用其实现分流处理。除此之外,用户还可以使用这些函数,通过注册计时器,在回调函数中定义处理逻辑,使用非常的灵活。

Flink DataStream API 中的多面手——Process Function详解相关推荐

  1. 机器学习(十九)——PageRank算法, KNN, loss function详解

    http://antkillerfarm.github.io/ PageRank算法 概述 在PageRank提出之前,已经有研究者提出利用网页的入链数量来进行链接分析计算,这种入链方法假设一个网页的 ...

  2. flink DataStream API使用及原理

    传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...

  3. Flink DataStream API 介绍

    Flink DataStream API 介绍 StreamExecutionEnvironment #mermaid-svg-JKeWa22W2vWA4zBS {font-family:" ...

  4. 【基础】Flink -- DataStream API

    Flink -- DataStream API 执行环境 Execution Environment 创建执行环境 设置执行模式 触发程序执行 源算子 Source 从集合中读取数据 从文件读取数据 ...

  5. jQuery中getJSON跨域原理详解

    详见:http://blog.yemou.net/article/query/info/tytfjhfascvhzxcytp28 jQuery中getJSON跨域原理详解 前几天我再开发一个叫 河蟹工 ...

  6. c语言二级指针有什么作用,C语言中二级指针的实例详解

    C语言中二级指针的实例详解 C语言中二级指针的实例详解 用图说明 示例代码: #include int main(int argc, const char * argv[]) { // int a = ...

  7. iOS中的HotFix方案总结详解

    iOS中的HotFix方案总结详解 相信HotFix大家应该都很熟悉了,今天主要对于最近调研的一些方案做一些总结.iOS中的HotFix方案大致可以分为四种: WaxPatch(Alibaba) Dy ...

  8. python中的subprocess.Popen()使用详解---以及注意的问题(死锁)

    从python2.4版本开始,可以用subprocess这个模块来产生子进程,并连接到子进程的标准输入/输出/错误中去,还可以得到子进程的返回值. subprocess意在替代其他几个老的模块或者函数 ...

  9. 前端后台以及游戏中使用Google Protocol Buffer详解

    前端后台以及游戏中使用Google Protocol Buffer详解 前端后台以及游戏中使用Google Protocol Buffer详解 0.什么是protoBuf 1.下载protobuf的编 ...

最新文章

  1. 为什么大公司还在采用过时的技术?
  2. 如何使用pip升级所有Python软件包?
  3. 我是如何阅读编程书的
  4. vue-cli eslint 规则
  5. 逻辑回归模型_联邦学习体系下——逻辑回归模型
  6. sync不生效 vue_Vue实战项目-记账器-重要知识点汇总
  7. 诺基亚AirScale支持低频段和高频段5G服务 确保运营商投资收入
  8. 定时任务执行利器Timer和ScheduledThreadPoolExecutor使用
  9. 分布式搜索引擎ElasticSearch(四) -- 插件使用
  10. 一张图学会python-一张图 python
  11. AI智能人脸识别很难?30行Python代码完美打造
  12. Linux中的cp命令老九门
  13. 系统集成项目管理工程师目录
  14. Android(安卓)手机变砖复活的三种恢复方法
  15. 深入Guerrilla Games解密次世代开山大作《杀戮地带暗影坠落》(The technology of Killzone Shadow Fall)
  16. spring data jpa 使用@Query 不确定参数查询
  17. 政府采购和招标投标常见问题:评标的依据是什么?
  18. Linux常用指令(5)——20.4.25
  19. Microsoft server2008的sql server身份验证出现18456错误
  20. 统计学知识大梳理(三)

热门文章

  1. linux下nginx安装与配置说明
  2. 【贼好理解!!】C++ list链表常用成员函数讲解
  3. Vue3 之 Pinia - 状态管理
  4. 在Linux服务器部署Halo博客系统及配置HTTPS
  5. 列表查询java_查询订单列表示例代码
  6. 谷歌三大论文之the Google File System
  7. python实现图片文字提取,准确率高达99%,强无敌!!!
  8. 网易严选风控实践(上)-打造现代化的风控体系
  9. Android Studio 连接网易MuMu模拟器教程
  10. [chat-GPT]解决OpenAI‘s services are not available in your country问题