Flink ProcessFunction介绍及KeyedProcessFunction实例

  • 1. ProcessFunction简介
  • 2. KeyedProcessFunction简单使用
    • 2.1. [Java版本](https://github.com/fanjianhai/flink_project_maven_repository.git)
    • 2.2. [Scala版本](https://github.com/fanjianhai/flink_project_sbt_repository.git)(`代码非常简洁`)
  • 3. 参考链接
  • 4. 寄语:心静一切皆美,情深万象皆深

1. ProcessFunction简介

  • 转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如我们常用的MapFunction转换算子就无法访问时间戳或者当前事件的事件时间。
  • 基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
  • Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
  • Flink提供了8个Process Function:
    • ProcessFunction dataStream
    • KeyedProcessFunction 用于KeyedStream,keyBy之后的流处理
    • CoProcessFunction 用于connect连接的流
    • ProcessJoinFunction 用于join流操作
    • BroadcastProcessFunction 用于广播
    • KeyedBroadcastProcessFunction keyBy之后的广播
    • ProcessWindowFunction 窗口增量聚合
    • ProcessAllWindowFunction 全窗口聚合

2. KeyedProcessFunction简单使用

2.1. Java版本

  • CountWithTimestamp.java

    package com.xiaofan.flinkstudy.keyedprocessfunction;/***@author   xiaofan*@email    594042358@qq.com*@date     2020/7/7 16:34*@description  实体类,保存在key状态中*/
    public class CountWithTimestamp {public String key;public long count;public long lastModified;
    }
  • Splitter.java

    package com.xiaofan.flinkstudy;import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.StringUtils;/***@author   xiaofan*@email    594042358@qq.com*@date     2020/7/7 16:33*@description  通用的FlatMap操作Function*/
    public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {if(StringUtils.isNullOrWhitespaceOnly(s)) {System.out.println("invalid line");return;}for(String word : s.split(" ")) {collector.collect(new Tuple2<String, Integer>(word, 1));}}
    }
  • ProcessTime.java

    package com.xiaofan.flinkstudy.keyedprocessfunction;import com.xiaofan.flinkstudy.Splitter;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;
    import java.util.Date;/***@author   xiaofan*@email    594042358@qq.com*@date     2020/7/7 16:36*@description  体验KeyedProcessFunction类(时间类型是处理时间)*/
    public class ProcessTime {/*** KeyedProcessFunction的子类,作用是将每个单词最新出现时间记录到backend,并创建定时器,* 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子*/static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {// 自定义状态private ValueState<CountWithTimestamp> state;@Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态,name是myStatestate = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));}@Overridepublic void processElement(Tuple2<String, Integer> value,Context ctx,Collector<Tuple2<String, Long>> out) throws Exception {// 取得当前是哪个单词Tuple currentKey = ctx.getCurrentKey();// 从backend取得当前单词的myState状态CountWithTimestamp current = state.value();// 如果myState还从未没有赋值过,就在此初始化if (current == null) {current = new CountWithTimestamp();current.key = value.f0;}// 单词数量加一current.count++;// 取当前元素的时间戳,作为该单词最后一次出现的时间current.lastModified = ctx.timestamp();// 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间state.update(current);// 为当前单词创建定时器,十秒后后触发long timer = current.lastModified + 10000;ctx.timerService().registerProcessingTimeTimer(timer);// 打印所有信息,用于核对数据正确性System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n\n",currentKey.getField(0),current.count,current.lastModified,time(current.lastModified),timer,time(timer)));}/*** 定时器触发后执行的方法* @param timestamp 这个时间戳代表的是该定时器的触发时间* @param ctx* @param out* @throws Exception*/@Overridepublic void onTimer(long timestamp,OnTimerContext ctx,Collector<Tuple2<String, Long>> out) throws Exception {// 取得当前单词Tuple currentKey = ctx.getCurrentKey();// 取得该单词的myState状态CountWithTimestamp result = state.value();// 当前元素是否已经连续10秒未出现的标志boolean isTimeout = false;// timestamp是定时器触发时间,如果等于最后一次更新时间+10秒,就表示这十秒内已经收到过该单词了,// 这种连续十秒没有出现的元素,被发送到下游算子if (timestamp == result.lastModified + 10000) {// 发送out.collect(new Tuple2<String, Long>(result.key, result.count));isTimeout = true;}// 打印数据,用于核对是否符合预期System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",currentKey.getField(0),result.count,result.lastModified,time(result.lastModified),timestamp,time(timestamp),isTimeout));}}public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口,读取字符串DataStream<String> socketDataStream = env.socketTextStream("192.168.1.27", 9999);// 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream// 对收到的字符串用空格做分割,得到多个单词.flatMap(new Splitter())// 设置时间戳分配器,用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {@Overridepublic long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}@Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark,返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据,交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutFunction());// 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来timeOutWord.print();env.execute("ProcessFunction demo : KeyedProcessFunction");}public static String time(long timeStamp) {return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));}
    }
  • 测试结果

2.2. Scala版本(代码非常简洁

  • 测试结果

    package com.xiaofan.sbtimport org.apache.commons.lang3.time.FastDateFormat
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, KeyedProcessFunction}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.util.Collectorcase class CountWithTimestamp(key: String, count: Long, lastModified: Long)class CountWithTimeoutFunction extends KeyedProcessFunction[String, (String, Int), (String, Long)] {lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))override def processElement(value: (String, Int), ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#Context, out: Collector[(String, Long)]): Unit = {val current: CountWithTimestamp = state.value match {case null => CountWithTimestamp(value._1, 1, ctx.timestamp)case CountWithTimestamp(key, count, lastModified) => CountWithTimestamp(key, count + 1, ctx.timestamp)}state.update(current)val timer = current.lastModified + 10000ctx.timerService().registerProcessingTimeTimer(timer)println(s"process ==> key: ${ctx.getCurrentKey}, count: ${current.count}, lastModified: ${current.lastModified}(${ProcessTime.sdf.format(current.lastModified)}), timer: $timer(${ProcessTime.sdf.format(timer)})")}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#OnTimerContext, out: Collector[(String, Long)]): Unit = {var isTimeout: Boolean = falseval value: CountWithTimestamp = state.valuevalue match {case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 10000) =>out.collect((key, count))isTimeout = truecase _ =>}println(s"ontimer ==> key: ${ctx.getCurrentKey}, count: ${value.count}, lastModified: ${value.lastModified}(${ProcessTime.sdf.format(value.lastModified)}), stamp: $timestamp(${ProcessTime.sdf.format(timestamp)}) isTimeout: ${isTimeout}")}
    }object ProcessTime {// 线程安全的时间格式化对象val sdf: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)val socketDataStream: DataStream[String] = env.socketTextStream("192.168.1.27", 9999)val value: DataStream[(String, Int)] = socketDataStream.flatMap(_.split(" ")).map((_, 1))socketDataStream.flatMap(_.split(" ")).map((_, 1)).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Int)] {override def getCurrentWatermark: Watermark = nulloverride def extractTimestamp(element: (String, Int), previousElementTimestamp: Long): Long = System.currentTimeMillis()}).keyBy(_._1).process(new CountWithTimeoutFunction).print()env.execute("ProcessFunction demo : KeyedProcessFunction")}
    }
  • 测试结果

3. 参考链接

  • Flink处理函数实战之一:ProcessFunction类
  • flink底层函数与计时器
  • 从源码看项目中flink processfunction调用过程
  • flink onTimer定时器实现定时需求

4. 寄语:心静一切皆美,情深万象皆深

11.Flink ProcessFunction介绍及KeyedProcessFunction实例相关推荐

  1. Flink ProcessFunction 介绍使用

    目录 实现功能 代码 测试 问题 官网描述:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/stream/ope ...

  2. 1.18.3.Flink Catalog介绍、Catalog 定义、Catalog 的实现、Catalog 使用举例

    1.18.3.Flink Catalog介绍 1.18.3.1.引言 1.18.3.2.Catalog 定义 1.18.3.3.Catalog 的实现 1.18.3.4.Catalog 使用举例 1. ...

  3. (三)AJAX基本介绍和简单实例03

    (三)AJAX基本介绍和简单实例03-----Ajax与数据库的动态应用 前台显示界面: 选择所有客户之后: 选择其中一个客户---杜森: Demo03.html代码 <html> < ...

  4. 《从0到1学习Flink》—— 介绍Flink中的Stream Windows

    前言 目前有许多数据分析的场景从批处理到流处理的演变, 虽然可以将批处理作为流处理的特殊情况来处理,但是分析无穷集的流数据通常需要思维方式的转变并且具有其自己的术语(例如,"windowin ...

  5. 【Android数据存储】ContentProvider详细介绍(附实例源码)

    1.ContentProvider是什么? ContentProvider--内容提供者.它是一个类,这个类主要是对Android系统中进行共享的数据进行包装,并提供了一组统一的访问接口供其他程序调用 ...

  6. 【Visual C++】游戏开发笔记二十七 Direct3D 11入门级知识介绍

    游戏开发笔记二十七 Direct3D 11入门级知识介绍 作者:毛星云    邮箱: happylifemxy@163.com    期待着与志同道合的朋友们相互交流 上一节里我们介绍了在迈入Dire ...

  7. Java XML解析工具 dom4j介绍及使用实例

    Java XML解析工具 dom4j介绍及使用实例 dom4j介绍 dom4j的项目地址:http://sourceforge.net/projects/dom4j/?source=directory ...

  8. C++11 unordered_map详细介绍

    整理的算法模板合集: ACM模板 目录: 1.介绍 1.1 特性 2. 模版 2.1 迭代器 3. 功能函数 3.1 构造函数 3.2 容量操作 3.2.1 size 3.2.2 empty 3.3 ...

  9. php正则运用,php中常用的正则表达式的介绍及应用实例代码

    更全面的实例,可以参考 最常用的PHP正则表达式收集整理 //www.jb51.net/article/14049.htm php 正则表达式小结 //www.jb51.net/article/198 ...

最新文章

  1. iOS base64 MD5
  2. MATLAB实现数字识别系统,基于人工神经网络的MATLAB手写数字识别系统
  3. RocketMQ简介及核心概念说明
  4. 又跌!6月全国程序员工资新统计,太扎心
  5. 系统架构设计师 - 面向服务架构 SOA
  6. C#下载文件和将文件转换为数据流下载的示例
  7. Echarts横向的柱状图
  8. Java编写敏感词过滤程序
  9. 房地产大数据管理系统——房地产大数据融合平台
  10. 倍福触摸屏维修操作面板维修CP7032-1031-0010故障分析
  11. 让Win7系统下的硬盘不在狂闪的诀窍【mfxp】
  12. 数字签名、数字信封、数字证书
  13. [14] 胜利大逃亡
  14. TP、FP、TN、FN傻傻分不清楚
  15. win 通过 Distro 安装 linux 子系统
  16. (ROC-RK3568-PC) 裸机19_VOP2和IEP笔记
  17. 背景颜色渐变 css3 ---- 转自:至尊宝的BLOG http://blog.sina.com.cn/zzbnie
  18. 美格智能5G模组助力电力巡检之无人机产品智能化高效运作
  19. 事件循环EventLoop机制
  20. 基于python的大数据分析-数据处理(代码实战)

热门文章

  1. 【附源码】Java计算机毕业设计基于篮球云网站(程序+LW+部署)
  2. 华为鸿蒙第一批,正式确认了!华为鸿蒙系统第一批升级名单出炉,果粉:华为不厚道...
  3. 杀死linux进程的N种方法
  4. JavaScript基础学习之对象
  5. 指令周期、时钟周期、总线周期概念辨析
  6. 【友盟+】O2O报告:租用车类App最受上海网民追捧!
  7. 毕业论文查重重复率不达标这样做
  8. 安卓学习 Day24:ORMLite框架
  9. 百度云服务器有哪些优势?
  10. IDEA 编译 .jar (可能涉及问题)