Flink state使用
简介
Flink相比其他流计算引擎,最大的优势就是号称是有状态的流计算。可见state在Flink中极其重要的位置。数据流是由一个个单独的事件按时间序列组合成的,虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器,即不关注状态,不需要过往信息),但有些操作会跨多个事件记住信息(例如窗口操作符)。这些操作称为有状态操作。
下面是一些有状态的操作的使用场景:
1)对一个时间窗口内的数据进行聚合分析
2)在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数
3)数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重等
除了这些有用的应用场景外,state也是Flink使用checkpoints和savepoints实现容错的关键。
现在Flink正在慢慢实现让用户在运行时从Flink外部访问state,当然在发展中,可能api等都会改变,目前不是很稳定,不过未来应该是个不错的功能。
state的分类
state主要分为两类:Keyed State和Operator State
Keyed State
Keyed State只能用在KeyedStream上,所以要先形成KeyedStream(使用stream.keyBy(…))。
Flink的数据模型不是基于键值对的。因此,不需要将数据集类型物理地打包到键和值中。键是“虚拟的”:它们被定义为在实际数据上的函数,以指导分组操作符。
对于Keyed State,Flink提供了几种现成的数据结构供我们使用:ValueState<T>、ListState<T>、ReducingState<T>、AggregatingState<IN, OUT>、MapState<UK, UV>。要注意理解,上面的5种state类型都是表示stream keyBy 的 key的value的state类型。为了与 keyBy 的 key 进行区分,所以 Flink 中把 MapState 的 key、value 分别叫 UserKey、UserValue。
ValueState<T>:存储单一的值,即每个key只有一个值
ListState<T>:存储一个list,即每个key有一个list值
MapState<UK, UV>:存储一个map,即每个key有一个map值
ReducingState<T>和AggregatingState<IN, OUT>与ListState<T>同属于MergingState<T>。与ListState<T>不同的是,ReducingState<T>只有一个元素,而不是一个列表。它的原理是新元素通过add(T)加入后,与已有的状态元素使用ReduceFunction合并为一个元素,并更新到状态里。AggregatingState<IN, OUT>与ReducingState<T>类似,也只有一个元素,只不过AggregatingState<IN, OUT>的输入和输出类型可以不一样。ReducingState<T>和AggregatingState<IN, OUT>与窗口上进行ReduceFunction和AggregateFunction很像,都是将新元素与已有元素做聚合。
因为本身支持这么多类型的,所以不要用ValueState<T>去存list或者map这种数据类型,直接使用ListState和MapState效率会高很多。
State是通过RuntimeContext类获取的,所以使用State的地方就是rich functions,即实现RichFunction或其子接口,就可以获取State。在里面我们就可以通过StateTtlConfig设置State的TTL等。比如:
public class TTLCountMapFunction extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
private transient ValueState<Long> state;
...
public void open(Configuration parameters) throws Exception {
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(600))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("count-state", Long.class);
stateDescriptor.enableTimeToLive(ttlConfig);
state = getRuntimeContext().getState(stateDescriptor);
}
}
StateTtlConfig相关的设置可以查看官网链接。即datastream api中是通过StateTtlConfig设置,而在sql中是通过table.exec.state.ttl配置,默认值是0,表示状态永不过期。在table api中是StreamTableEnvironment.getConfig.setIdleStateRetention。
Flink 中 State 支持设置 TTL,TTL 只是将时间戳与 userValue 封装起来。
· MapState 的 TTL 是基于 UK 级别的
· ValueState 的 TTL 是基于整个 key 的
Operator State(non-keyed state)
Operator State是绑定到一个并行运算符实例(one parallel operator instance)的状态(即记录每个Task对应的状态值数据类型)。kafka connecttor是Flink中运算符状态使用的一个很好的示例。Kafka consumer的每个并行实例都维护一个主题分区和偏移的映射,作为其操作符状态。
在典型的有状态 Flink 应用程序中,你不需要Operator State。 它主要是一种特殊类型的状态,用于实现source/sink或你没有可以对状态进行分区的键的场景。
为了使用Operator State就得要实现CheckpointedFunction。请移步CheckpointedFunction说明。
Broadcast State是一种特殊的Operator State,有着特殊的应用场景,后续会说明如何使用,这里不再讲解。
state的存储
state的存储就是State Backends,在Flink1.13版本以前,老的分类是:MemoryStateBackend、FsStateBackend和RocksDBStateBackend。而在1.13版本以后分类就是:HashMapStateBackend和EmbeddedRocksDBStateBackend,再加上对应的storage。
下面列举新老对应关系:
MemoryStateBackend 相当于使用 HashMapStateBackend 和 JobManagerCheckpointStorage组合。存储位置:State: TaskManager 内存,Checkpoint: Jobmanager 内存。
#flink-conf.yaml配置
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
//java代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerStateBackend());
FsStateBackend 相当于使用 HashMapStateBackend 和 FileSystemCheckpointStorage。存储位置:State:Taskmanager 内存,Checkpoint: 外部文件系统( 本地或 HDFS )。
#flink-conf.yaml配置
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
//java代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");// Advanced FsStateBackend configurations, such as write buffer size
// can be set by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
RocksDBStateBackend 相当于使用 EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage。存储位置:State:rocksdb,Checkpoint: 外部文件系统(本地或 HDFS )。
#flink-conf.yaml配置
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
//java代码设置
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
使用建议
1. Keyed State如何清空state,state.clear() 实际上只能清理当前 key 对应的 value 值,如果想要清空整个 state,需要借助于 applyToAllKeys 方法。
2. Operator State慎重使用长list
参考官方文档中对state的介绍和使用的页面:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/
Flink state使用相关推荐
- 1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等
1.15.Flink state(状态)管理与恢复 1.15.1.什么是state 1.15.2.状态(State) 1.15.3.Keyed State 1.15.4.Operator State ...
- Flink State 有可能代替数据库吗?
有状态的计算作为容错以及数据一致性的保证,是当今实时计算必不可少的特性之一,流行的实时计算引擎包括 Google Dataflow.Flink.Spark (Structure) Streaming. ...
- Flink state缓存测试
Flink state 在实际生产中的应用 一.FlinkState的概念 1.state分类 2.state backend 类型 二.实际应用如下 1.应用场景介绍 2.FsStateBacken ...
- Flink State和容错机制
为什么80%的码农都做不了架构师?>>> 1. Flink Barriers Flink分布式快照的核心元素是流barriers. 这些barriers被注入数据流并与记录一起 ...
- flink state ttl
flink ttl类型 本文用的flink1.7,一般来说ttl用在keyedStream中,官网只介绍了keyed flink ttl用法 import org.apache.flink.api.c ...
- Flink State 误用之痛,你中招了吗?
简介:本文主要讨论一个问题:ValueState 中存 Map 与 MapState 有什么区别?如果不懂这两者的区别,而且使用 ValueState 中存大对象,生产环境很可能会出现以下问题:CPU ...
- Flink State 最佳实践
本文主要分享与交流 Flink 状态使用过程中的一些经验与心得,当然标题取了"最佳实践"之名,希望文章内容能给读者带去一些干货.本文内容首先是回顾 state 相关概念,并认识和区 ...
- Flink State
键控状态数据结构 1.值状态( Value State ) new ValueStateDescriptor("last-temp", Types.of[T]) • 将状态表示为单 ...
- 3种Flink State Backed| 你该用哪个?
简介 01 .不同 State backend 吞吐量对比 02 .不同 State backend 延迟对比 03. State backend 的选择 04. MemoryStateBackend ...
- Flink State 深度讲解
在基础篇中的 1.2 节中介绍了 Flink 是一款有状态的流处理框架.那么大家可能有点疑问,这个状态是什么意思?拿 Flink 最简单的 Word Count 程序来说,它需要不断的对 word 出 ...
最新文章
- 一个IO的传奇一生(8) -- elevator子系统
- js 正则判断字符串是否为字母或数字
- Java面试题2019简书_2019最新Spring面试题大全含答案之Spring Beans(2019最全Spring超级葵花宝典)...
- “约见”面试官系列之常见面试题第四十一篇之VUE生命周期(建议收藏)
- Python nonlocal 与 global 关键字解析
- HTML:常用代码(自用)
- 拓端tecdat|Python用稀疏、高斯随机投影和主成分分析PCA对MNIST手写数字数据进行降维可视化
- JS 表单submit() 提交无效的问题
- oracle -varchar ,varchar2
- keyshot渲染很慢_提高Keyshot逼真渲染的小技巧!
- C++中模板的特化与偏特化
- Leaving fence domain… found dlm lockspace /sys/kernel/dlm/rgmanager
- 账户结构,推广计划,推广单元
- 简单的图标移入效果(css缩放)
- 国民阅读经典:谈修养读后感
- Pytorch入门实战 | 第P2周:彩色图片识别
- 微信小程序 给数字或文字加横线,比如原价、已完成任务
- 数字化转型思考的延伸问题
- 大容量充电宝或成乘机“累赘”
- 如何部署超级签名分发平台系统?
热门文章
- Protein Cell:心血管疾病中的肠道微生物及其潜在的治疗应用
- 刷题小程序【程序猿面试宝典】开发(二)| 页面创建、页面配置、全局配置
- android 6.0.1 充电提示音
- venv虚拟环境中的pip更新失败问题
- [摘录]第11章 造就优势谈判的驱动力
- 视频会议系统/在线教育系统/企业远程视频办公通话会议系统EasyRTC,如何替换域名数字证书?
- 太原服务器系统租用,太原市弹性云服务器租赁
- 线性代数笔记3.2线性相关定理
- windows查看dll导出函数名
- 画数轴的步骤_如何用几何画板画数轴?