PyFlink 教程:PyFlink DataStream API - state timer
简介:介绍如何在 Python DataStream API 中使用 state & timer 功能。
一、背景
Flink 1.13 已于近期正式发布,超过 200 名贡献者参与了 Flink 1.13 的开发,提交了超过 1000 个 commits,完成了若干重要功能。其中,PyFlink 模块在该版本中也新增了若干重要功能,比如支持了 state、自定义 window、row-based operation 等。随着这些功能的引入,PyFlink 功能已经日趋完善,用户可以使用 Python 语言完成绝大多数类型Flink作业的开发。接下来,我们详细介绍如何在 Python DataStream API 中使用 state & timer 功能。
二、state 功能介绍
作为流计算引擎,state 是 Flink 中最核心的功能之一。
- 在 1.12 中,Python DataStream API 尚不支持 state,用户使用 Python DataStream API 只能实现一些简单的、不需要使用 state 的应用;
- 而在 1.13 中,Python DataStream API 支持了此项重要功能。
state 使用示例
如下是一个简单的示例,说明如何在 Python DataStream API 作业中使用 state:
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptorclass MyMapFunction(MapFunction):def open(self, runtime_context: RuntimeContext):state_desc = ValueStateDescriptor('cnt', Types.LONG())# 定义value stateself.cnt_state = runtime_context.get_state(state_desc)def map(self, value):cnt = self.cnt_state.value()if cnt is None:cnt = 0new_cnt = cnt + 1self.cnt_state.update(new_cnt)return value[0], new_cntdef state_access_demo():# 1. 创建 StreamExecutionEnvironmentenv = StreamExecutionEnvironment.get_execution_environment()# 2. 创建数据源seq_num_source = NumberSequenceSource(1, 100)ds = env.from_source(source=seq_num_source,watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name='seq_num_source',type_info=Types.LONG())# 3. 定义执行逻辑ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \.key_by(lambda a: a[0]) \.map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))# 4. 将打印结果数据ds.print()# 5. 执行作业env.execute()if __name__ == '__main__':state_access_demo()
在上面的例子中,我们定义了一个 MapFunction,该 MapFunction 中定义了一个名字为 “cnt_state” 的 ValueState,用于记录每一个 key 出现的次数。
说明:
- 除了 ValueState 之外,Python DataStream API 还支持 ListState、MapState、ReducingState,以及 AggregatingState;
- 定义 state 的 StateDescriptor 时,需要声明 state 中所存储的数据的类型(TypeInformation)。另外需要注意的是,当前 TypeInformation 字段并未被使用,默认使用 pickle 进行序列化,因此建议将 TypeInformation 字段定义为 Types.PICKLED_BYTE_ARRAY() 类型,与实际所使用的序列化器相匹配。这样的话,当后续版本支持使用 TypeInformation 之后,可以保持后向兼容性;
- state 除了可以在 KeyedStream 的 map 操作中使用,还可以在其它操作中使用;除此之外,还可以在连接流中使用 state,比如:
ds1 = ... # type DataStream
ds2 = ... # type DataStream
ds1.connect(ds2) \.key_by(key_selector1=lambda a: a[0], key_selector2=lambda a: a[0]) \.map(MyCoMapFunction()) # 可以在MyCoMapFunction中使用state
可以使用 state 的 API 列表如下:
操作 | 自定义函数 | |
---|---|---|
KeyedStream | map | MapFunction |
flat_map | FlatMapFunction | |
reduce | ReduceFunction | |
filter | FilterFunction | |
process | KeyedProcessFunction | |
ConnectedStreams | map | CoMapFunction |
flat_map | CoFlatMapFunction | |
process | KeyedCoProcessFunction | |
WindowedStream | apply | WindowFunction |
process | ProcessWindowFunction |
state 工作原理
上图是 PyFlink 中,state 工作原理的架构图。从图中我们可以看出,Python 自定义函数运行在 Python worker 进程中,而 state backend 运行在 JVM 进程中(由 Java 算子来管理)。当 Python 自定义函数需要访问 state 时,会通过远程调用的方式,访问 state backend。
我们知道,远程调用的开销是非常大的,为了提升 state 读写的性能,PyFlink 针对 state 读写做了以下几个方面的优化工作:
- Lazy Read:
对于包含多个 entry 的 state,比如 MapState,当遍历 state 时,state 数据并不会一次性全部读取到 Python worker 中,只有当真正需要访问时,才从 state backend 读取。
- Async Write:
当更新 state 时,更新后的 state,会先存储在 LRU cache 中,并不会同步地更新到远端的 state backend,这样做可以避免每次 state 更新操作都访问远端的 state backend;同时,针对同一个 key 的多次更新操作,可以合并执行,尽量避免无效的 state 更新。
- LRU cache:
在 Python worker 进程中维护了 state 读写的 cache。当读取某个 key 时,会先查看其是否已经被加载到读 cache 中;当更新某个 key 时,会先将其存放到写 cache 中。针对频繁读写的 key,LRU cache 可以避免每次读写操作,都访问远端的 state backend,对于有热点 key 的场景,可以极大提升 state 读写性能。
- Flush on Checkpoint:
为了保证 checkpoint 语义的正确性,当 Java 算子需要执行 checkpoint时,会将 Python worker中的写 cache 都 flush 回 state backend。
其中 LRU cache 可以细分为二级,如下图所示:
说明:
- 二级 cache 为 global cache,二级 cache 中的读 cache 中存储着当前 Python worker 进程中所有缓存的原始 state 数据(未反序列化);二级 cache 中的写 cache 中存储着当前 Python worker 进程中所有创建的 state 对象。
- 一级 cache 位于每一个 state 对象内,在 state 对象中缓存着该 state 对象已经从远端的 state backend 读取的 state 数据以及待更新回远端的 state backend 的 state 数据。
工作流程:
- 当在 Python UDF 中,创建一个 state 对象时,首先会查看当前 key 所对应的 state 对象是否已经存在(在二级 cache 中的 “Global Write Cache” 中查找),如果存在,则返回对应的 state 对象;如果不存在,则创建新的 state 对象,并存入 “Global Write Cache”;
- state 读取:当在 Python UDF 中,读取 state 对象时,如果待读取的 state 数据已经存在(一级 cache),比如对于 MapState,待读取的 map key/map value 已经存在,则直接返回对应的 map key/map value;否则,访问二级 cache,如果二级 cache 中也不存在待读取的 state 数据,则从远端的 state backend 读取;
- state 写入:当在 Python UDF 中,更新 state 对象时,先写到 state 对象内部的写 cache 中(一级 cache);当 state 对象中待写回 state backend 的 state 数据的大小超过指定阈值或者当遇到 checkpoint 时,将待写回的 state 数据写回远端的 state backend。
state 性能调优
通过前一节的介绍,我们知道 PyFlink 使用了多种优化手段,用于提升 state 读写的性能,这些优化行为可以通过以下参数配置:
配置 | 说明 |
---|---|
python.state.cache-size | Python worker 中读 cache 以及写 cache 的大小。(二级 cache)需要注意的是:读 cache、写 cache是独立的,当前不支持分别配置读 cache 以及写 cache 的大小。 |
python.map-state.iterate-response-batch-size | 当遍历 MapState 时,每次从 state backend 读取并返回给 Python worker 的 entry 的最大个数。 |
python.map-state.read-cache-size | 一个 MapState 的读 cache 中最大允许的 entry 个数(一级 cache)。当一个 MapState 中,读 cache 中的 entry 个数超过该阈值时,会通过 LRU 策略从读 cache 中删除最近最少访问过的 entry。 |
python.map-state.write-cache-size | 一个 MapState 的写 cache 中最大允许的待更新 entry 的个数(一级 cache)。当一个 MapState 中,写 cache 中待更新的 entry 的个数超过该阈值时,会将该 MapState 下所有待更新 state 数据写回远端的 state backend。 |
需要注意的是,state 读写的性能不仅取决于以上参数,还受其它因素的影响,比如:
- 输入数据中 key 的分布:
输入数据的 key 越分散,读 cache 命中的概率越低,则性能越差。
- Python UDF 中 state 读写次数:
state 读写可能涉及到读写远端的 state backend,应该尽量优化 Python UDF 的实现,减少不必要的 state 读写。
- checkpoint interval:
为了保证 checkpoint 语义的正确性,当遇到 checkpoint 时,Python worker 会将所有缓存的待更新 state 数据,写回 state backend。如果配置的 checkpoint interval 过小,则可能并不能有效减少 Python worker 写回 state backend 的数据量。
- bundle size / bundle time:
当前 Python 算子会将输入数据划分成多个批次,发送给 Python worker 执行。当一个批次的数据处理完之后,会强制将 Python worker 进程中的待更新 state 写回 state backend。与 checkpoint interval 类似,该行为也可能会影响 state 写性能。批次的大小可以通过 python.fn-execution.bundle.size 和 python.fn-execution.bundle.time 参数控制。
三、timer 功能介绍
timer 使用示例
除了 state 之外,用户还可以在 Python DataStream API 中使用定时器 timer。
import datetimefrom pyflink.common import Row, WatermarkStrategy
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironmentclass CountWithTimeoutFunction(KeyedProcessFunction):def __init__(self):self.state = Nonedef open(self, runtime_context: RuntimeContext):self.state = runtime_context.get_state(ValueStateDescriptor("my_state", Types.ROW([Types.STRING(), Types.LONG(), Types.LONG()])))def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):# retrieve the current countcurrent = self.state.value()if current is None:current = Row(value.f1, 0, 0)# update the state's countcurrent[1] += 1# set the state's timestamp to the record's assigned event time timestampcurrent[2] = ctx.timestamp()# write the state backself.state.update(current)# schedule the next timer 60 seconds from the current event timectx.timer_service().register_event_time_timer(current[2] + 60000)def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):# get the state for the key that scheduled the timerresult = self.state.value()# check if this is an outdated timer or the latest timerif timestamp == result[2] + 60000:# emit the state on timeoutyield result[0], result[1]class MyTimestampAssigner(TimestampAssigner):def __init__(self):self.epoch = datetime.datetime.utcfromtimestamp(0)def extract_timestamp(self, value, record_timestamp) -> int:return int((value[0] - self.epoch).total_seconds() * 1000)if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)t_env.execute_sql("""CREATE TABLE my_source (a TIMESTAMP(3),b VARCHAR,c VARCHAR) WITH ('connector' = 'datagen','rows-per-second' = '10')""")stream = t_env.to_append_stream(t_env.from_path('my_source'),Types.ROW([Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING()]))watermarked_stream = stream.assign_timestamps_and_watermarks(WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(MyTimestampAssigner()))# apply the process function onto a keyed streamwatermarked_stream.key_by(lambda value: value[1])\.process(CountWithTimeoutFunction()) \.print()env.execute()
在上述示例中,我们定义了一个 KeyedProcessFunction,该 KeyedProcessFunction 记录每一个 key 出现的次数,当一个 key 超过 60 秒没有更新时,会将该 key 以及其出现次数,发送到下游节点。
除了 event time timer 之外,用户还可以使用 processing time timer。
timer 工作原理
timer 的工作流程是这样的:
- 与 state 访问使用单独的通信信道不同,当用户注册 timer 之后,注册消息通过数据通道发送到 Java 算子;
- Java 算子收到 timer 注册消息之后,首先检查待注册 timer 的触发时间,如果已经超过当前时间,则直接触发;否则的话,将 timer 注册到 Java 算子的 timer service 中;
- 当 timer 触发之后,触发消息通过数据通道发送到 Python worker,Python worker 回调用户 Python UDF 中的的 on_timer 方法。
需要注意的是:由于 timer 注册消息以及触发消息通过数据通道异步地在 Java 算子以及 Python worker 之间传输,这会造成在某些场景下,timer 的触发可能没有那么及时。比如当用户注册了一个 processing time timer,当 timer 触发之后,触发消息通过数据通道传输到 Python UDF 时,可能已经是几秒中之后了。
四、总结
在这篇文章中,我们主要介绍了如何在 Python DataStream API 作业中使用 state & timer,state & timer 的工作原理以及如何进行性能调优。接下来,我们会继续推出 PyFlink 系列文章,帮助 PyFlink 用户深入了解 PyFlink 中各种功能、应用场景以及最佳实践等。
另外,阿里云实时计算生态团队长期招聘优秀大数据人才(包括实习 + 社招),我们的工作包括:
- 实时机器学习:支持机器学习场景下实时特征工程和 AI 引擎配合,基于 Apache Flink 及其生态打造实时机器学习的标准,推动例如搜索、推荐、广告、风控等场景的全面实时化;
- 大数据 + AI 一体化:包括编程语言一体化 (PyFlink 相关工作),执行引擎集成化 (TF on Flink),工作流及管理一体化(Flink AI Flow)。
如果你对开源、大数据或者 AI 感兴趣,请发简历到:fudian.fd@alibaba-inc.com
此外,也欢迎大家加入 “PyFlink交流群”,交流 PyFlink 相关的问题。
原文链接:https://developer.aliyun.com/article/784551?
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
PyFlink 教程:PyFlink DataStream API - state timer相关推荐
- PyFlink 教程(三):PyFlink DataStream API - state timer
一.背景 Flink 1.13 已于近期正式发布,超过 200 名贡献者参与了 Flink 1.13 的开发,提交了超过 1000 个 commits,完成了若干重要功能.其中,PyFlink 模块在 ...
- 大数据系列教程(4)Flink 使用 DataStream API 进行欺诈检测
目录 使用 DataStream API 进行欺诈检测 **版本1** 版本2 版本3 使用 DataStream API 进行欺诈检测 Apache Flink 提供了一个 DataStream A ...
- Flink基于 DataStream API 实现欺诈检测
目录 系列文章目录 文章目录 前言 一.Flink基于 DataStream API 实现欺诈检测 二.使用步骤 1.引入pom.xml 2.主类 3.欺诈逻辑判断类 4.运行结果: 总结 前言 在当 ...
- Flink DataStream API 中的多面手——Process Function详解
https://mp.weixin.qq.com/s/SOCAE-t25DPVlQMxuOT0jw 引言 在Flink的时间与watermarks详解这篇文章中,阐述了Flink的时间与水位线的相关内 ...
- flink DataStream API使用及原理
传统的大数据处理方式一般是批处理式的,也就是说,今天所收集的数据,我们明天再把今天收集到的数据算出来,以供大家使用,但是在很多情况下,数据的时效性对于业务的成败是非常关键的. Spark 和 Flin ...
- Flink 1.13,面向流批一体的运行时与 DataStream API 优化
简介:在 1.13 中,针对流批一体的目标,Flink 优化了大规模作业调度以及批执行模式下网络 Shuffle 的性能,以及在 DataStream API 方面完善有限流作业的退出语义. 本文由社 ...
- flink fi java_Flink DataStream API编程指南
Flink中的DataStream程序是实现数据流转换的常规程序(例如:filtering, updating state, defining windows, aggregating).数据流最初是 ...
- (十八)Flink Table API SQL 编程指南 Table API 和Datastream API 集成
文章目录 DataStream 和 Table 之间的转换 依赖项和导入 配置 执行行为 datastream API table API 批处理运行时模式 Changelog统一 处理(仅插入)流 ...
- Flink DataStream API(基础版)
概述 DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的 Flink 代码其实就是基于这种数据类型的处理,所以这套核心API 就以DataStr ...
最新文章
- java线程池拒绝策略_Java核心知识 多线程并发 线程池原理(二十三)
- 【网寻】mui - 点击事件
- 今晚解决了ActiveSync不能连接到手机的问题,电脑软件没有问题的情况
- 谷歌guava_Google Guava:您永远不会知道的5件事
- mysql 并行复制原理_MySQL 5.7 并行复制实现原理与调优
- 国产激光雷达厂商禾赛科技B轮融资2.5亿元,光速中国和百度领投
- watch解放你的双手
- iZotope系列音频软件如何卸载操作指南
- Xshel和Xftp免费版
- xmind服务器维护,如何使用XMind组织您的待办事项?
- 实况足球2015pc版
- 制作一个美团饿了么外卖的cps小程序
- 泊松分布、二项分布与正态分布
- 微分几何为何必然兴起?
- ESP8266+1.3“ or 0.96“ OLED两个太空人动画(胖子和瘦子)
- 【LeetCode】1056. Confusing Number 解题报告(C++)
- visio画图复制粘贴到word_VISO复制到WORD中全是空白框
- 手机录像出现arn无响应
- MongoDB中shell基本使用
- PPT图片别再直接插入,这样处理一下,让你的PPT秒变高逼格
热门文章
- Unknown database ‘xxx‘
- 详谈4大主流CPU处理器技术架构
- win7-64+usb安装
- 电感RDC、IDC值是什么意思?或DCR DCI
- 【Go资料】go语言学习资料书籍
- 臀部大的美女最令男人着迷
- 狂神说Spring讲解第19动态代理中错误java: 不兼容的类型: com.Orac.kuang.Host无法转换为com.kuang.demo3.Rent
- 千亿级平台技术架构:为了支撑高并发,我把身份证存到了JS里
- kettle 常用输出(插入更新、表输出、执行 SQL 脚本)
- 2010年度CSDN十大资讯 (编辑推荐)