Flink的State详解(1)
State概念解析
State是一个接口,不同类型的分区状态必须实现的接口,State只能应用于KeyedStream的函数访问。key是由系统自动提供的,因此函数总是看到映射到当前元素的键的值。这样,系统可以一致地处理流和状态分区。
KeyedStream:KeyedStream继承了DataStream,是由datastream的keyBy()产生的。表示按key的分区过的流。在datastream的功能基础上,由添加了一些max,min等聚合的功能。
通过状态快照实现容错处理
Flink管理的keyed state是排序分片的key/value形式存储的,每个keyed state的工作副本都是保存在当前taskmanager中。另外Operator state也保存在本地机器节点。Flink定期获取所有的状态快照,并且将这些快照复制到持久化的位置,列如分布式文件系统。
如果发生故障,Flink可以恢复应用程序的完整状态继续处理。
Flink管理的状态存储在state backend中。Flink有两种state backend的实现,一种是基于RocksDB内嵌key/value存储将其工作状态保存在磁盘上的,另一种是基于堆的state backend,将其工作状态保存在Java堆内存中。
这种基于堆的state backend有两种类型:
FsStateBackend,将其状态快照持久化到分布式文件系统;
MemoryStateBackend,它是使用JobManager的堆保存状态快照;
当使用基于堆的state backend保存状态时,访问和更新涉及在堆上读写对象。对于保存在RocksDBStateBackend的中对象,访问和更新涉及到序列化和反序列化,所以开销较大。但是RocksDBStateBackend只受到本地磁盘大小的限制,还有一点RocksDBStateBackend支持增量快照,这对于大量变化缓慢的状态的应用程序非常友好。
所有的State backends都是异步的执行快照,所以不会影响流式程序的运行。
状态快照
定义
- 快照:是Flink作业状态全局一致性镜像的通用属于。快照包括指向每个数据源的指针(例如:到文件或者Kafka分区的偏移量)以及每个作业有状态运算符的状态副本,该状态副本是处理了source偏移位置之前所有的事件而后产生的状态。
- Checkpoint:一种有flink自动执行的快照,其目的是能够从故障中恢复。Checkpoints可以是增量的,并为快速恢复做了优化。
- SavaPoint:用户出于某种操作目的(例如有状态的重新部署/升级/缩放操作)手动(或API调用)触发的快照。Savepints始终是完整的,并且已针对操作灵活性进行了优化。
状态快照如何工作
Flink 使用 Chandy-Lamport algorithm 算法的一种变体,称为异步 barrier 快照(asynchronous barrier snapshotting)。
当checkpoint coordinator(job manager的一部分)指示task manager开始checkpoint时,它会让所有的source记录他们的偏移量,并将编号的checkpoint barrier插入到数据流中。这些barrier流经job graph,标注每个checkpoint前后的流部分。
Checkpoint n将包含了每个operator的state,这些state是对应的operator消费了严格在checkpoint barrier n之前所有的事件,不包含之后任何时间生成的state。
当job graph中每个operator接收到barriers时,他就会记录下其状态。拥有输入流的Operator会执行barrier对齐,当前快照包含消费两个输入流barrier之前(但不超过)的所有events而产生的状态。
Flink的state backends利用写时复制(copy-on-write)机制允许当异步生成旧版本的状态快照时,能够不影响的继续流处理。只有当快照被永久保存后,才不会当作垃圾回收。
确保精确一次(exactly once)
当流处理应用程序发生错误的时候,结果可能产生丢失或者重复。
Flink根据你为应用程序和集群的配置,有以下几种结果:
- Flink不会从快照中进行恢复(at most once)
- 没有任何的丢失,但是你可能得到重复冗余的结果(at least once)
- 没有丢失或冗余重复(exactly once)
Flink通过回退和重新发送source的数据流中故障恢复,当理想情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着每一个事件都会影响Flink管理精确一次性语义。
Barrier只有在需要提供精确一次性语义保证时需要进行对齐(Barrier alignment)。如果不需要这种语义,可以通过配置checkpointingMode.AT_LEAST_ONCE关闭barrier对齐来提高性能。
端到端精确一次
为了实现端到端的精确一次,以便source中的每个事件都仅精确一次对sinks生效,必须满足以下条件:
1.你的source必须是可重放的
2.你的sinks必须是事务性的(或幂等的)
使用Keyed State
1.ValueState<>:继承自state接口,用于分区单值状态的接口。可以检索或更新该值。
状态由用户函数访问和修改,并作为分布式快照的一部分由系统一致地检查。
state只能由应用于KeyedStream的函数访问。键是由系统自动提供的,因此函数总是看到映射当前元素键的值。这样,系统可以一致地同时处理流和状态分区。
ValueState中的两个方法,value和update
T value() throws IOException
解析:返回当前状态的值。当state没有分区时,对于给定操作符中的所有输入,返回的值都是相同的。如果应用了state分区,则返回值取决于当前state输入的操作符,因为操作符为每个分区维护一个独立的state。
如果在创建valueState时没有指定默认值,那么当前使用update()方法设置值时,返回null。
void update(T value) throws IOException
解析:将value()可访问的运算符状态更新为给定值。下次在调用相同分区value()方法时,返回的时更新后的值。当分区状态更新为null时,将删除当前键的状态,并在下次访问时返回默认值。
2.ListState<>:保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。
ListState的两个方法,update和addAll
state由用户函数访问和修改,并作为分布式快照的一部分由系统一致地检查。
该state可以是键列表state或操作符state。
当它是键控列表state时,它由应用于KeyedStream的函数访问。键是由系统自动提供的,因此函数总是看到映射到当前元素键的值。这样,系统可以一致地同时处理流和state分区。
当他是操作符state列表时,该列表是一个state项的结合,这些state项相互独立,并且在操作符并行性发生变化时,可以在操作符实例之间重新分布。
void update(List values) throws Exception
解析:将现有的值更新到给定的值列表中,下次调用get()方法时,返回的是更新后的值。
如果传入努力了或者空列表,则状态值为null。
void addAll(List values) throws Exception
解析:通过将给定的值添加到现有的值列表,get()方法访问的运算符状态。下次调用get()方法(对于相同的状态分区)时,返回的状态将表示更新的列表。
如果传入null或空列表,则状态值保持不变。
3.ReducingState<>:保存一个单值,表示添加到state的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
ReducingState是用于还原state的接口。元素可以添加到状态中,它们使用reduce函数进行聚合,可以检查当前的state。
ReducingState只能由应用于KeyedStream的函数访问。键是由系统自动提供的,因此函数总是看到映射到当前元素键的值。这样,系统可以一致地同时处理流和状态分区
4.AggregatingState<IN, OUT>: 保留一个单值,表示添加到state所有值得集合。和AggregatingState相反的是,聚合类型可能与添加到状态的元素的类型不同。接口与ListState类似,但使用add(IN)添加的元素会用指定的AggregateFunction进行聚合。
解析:AggregatingState接口用于聚合state,添加到到这个state中将使用给定的聚合函数预先聚合。
state内部始终保存累加器的类型,当访问状态结果时,会调用AggregateFunction的getResult()方法。
5.MapState<UK, UV>:维护了一个映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。
分区键值的state接口。可以添加、更新和检索键值对。
提供的方法
UV get(UK key) throws Exception
返回与给定键关联的当前值。
void put(UK key, UV value) throws Exception
将新值与给定键相关联。
void putAll(Map<UK, UV> map) throws Exception
将给定的Map集合添加到state中
void remove(UK key) throws Exception
删除给定key的键值
boolean contains(UK key) throws Exception
返回给定的key的键值是否存在
Iterable<Map.Entry<UK, UV>> entries() throws Exception
返回state中所有Map
Iterable keys() throws Exception
返回状态中所有的key
Iterable values() throws Exception
返回状态中所有的value
Iterator<Map.Entry<UK, UV>> iterator() throws Exception
迭代state中的Map
boolean isEmpty() throws Exception
如果此state不包含键值映射,则返回true,否则返回false。
6.所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。
public class CountWindowAverage extends RichMapFunction<Tuple2<Long,Long>,Tuple2<Long,Long>> {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long,Long>> sum;@Overridepublic void flatmap (Tuple2<Long,Long> input,Collector<Tuple2<Long,Long>> out) throws Exception{// access the state valueTuple2<Long, Long> currentSum = sum.vlaue;// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif(currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0,currentSum.f1 / currentSum.f0);sum.clear;}}@Overridepublic void open(Configuration config){ValueStateDescriptor<Tuple2<Long,Long>> descriptor = new ValueStateDescriptor<>("average", // the state nameTypeInformation.of(new TypeHint<Tuple2<Long,Long>>(){}), // type informationTuple2.of(0L,0L));sum = getRuntimeContext().getState(descriptor);}
}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value -> value.f0).flatMap(new CountWindowAverage()).print();
State的有效期
任何类型的keyed state都可以有有效期(TTL)。如果设置的有效期状态值已过,则会尽最大可能删除该值。所有状态类型都支持单元素的TTL。这意味着列表元素和映射元素将独立到期。
在使用TTL前,需要先构建StateTtlConfig配置对象,然后把配置传到state descriptor中启用TTL功能
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
TTL 配置有以下几个选项:
newBuilder 的第一个参数表示数据的有效期,是必选项。
TTL 的更新策略(默认是 OnCreateAndWrite):
- StateTtlConfig.UpdateType.onCreateAndWrite - 仅在创建和写入时更新。
- StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新。
数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):
- StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据。
NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。
注意:
- 状态上次修改时间会和数据一起保存在state backend中,因此开启该特性会增加状态数据的存储。Heap state backend会额外存储一个包括用户状态和时间戳的Java对象,RocksDB state backend会在每个状态值(list或map的每个元素)序列化后增加8个字节。
- 暂时只支持基于 processing time 的 TTL。
- 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
- TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
- 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果用户值序列化器不支持 null, 可以用 NullableSerializer 包装一层。
过期数据的清理
默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();
可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。
全量快照时进行清理
另外,你可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot().build();
这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。
注意:这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。
增量数据清理
另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。
该特性可以通过 StateTtlConfig 进行配置:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build();
该策略有两个参数。 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。
注意:
- 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
- 增量清理会增加数据处理的耗时。
- 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
- 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
- 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
在 RocksDB 压缩时清理
如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。
该特性可以通过 StateTtlConfig 进行配置:
import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000).build();
Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。 RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。
你还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
注意:
- 压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。 对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
- 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。
- 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。
DataStream 状态相关的 Scala API
除了上面描述的接口之外,Scala API 还在 KeyedStream 上对 map() 和 flatMap() 访问 ValueState 提供了一个更便捷的接口。 用户函数能够通过 Option 获取当前 ValueState 的值,并且返回即将保存到状态的值。
val stream: DataStream[(String, Int)] = ...val counts: DataStream[(String, Int)] = stream.keyBy(_._1).mapWithState((in: (String, Int), count: Option[Int]) =>count match {case Some(c) => ( (in._1, c), Some(c + in._2) )case None => ( (in._1, 0), Some(in._2) )})
Flink的State详解(1)相关推荐
- [Trident] Storm Trident 教程,state详解、trident api详解及实例
英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...
- Flink checkpoint操作流程详解与报错调试方法汇总,增量checkpoint原理及版本更新变化,作业恢复和扩缩容原理与优化
这里写目录标题 flink checkpint出错类型 flink 重启策略 Checkpint 流程简介 增量Checkpoint实现原理 MemoryStateBackend 原理 FsState ...
- 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战025--DateSet实用API详解025
一.Flink DateSet定制API详解(JAVA版) -002 flatMap 以element为粒度,对element进行1:n的转化. 执行程序: package code.book.bat ...
- Flink Window机制详解
Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.而窗口(window)就是从 Streaming 到 Batch ...
- flutter中state详解
State 在说到StatefulWidget之前,先说下State.State的作用有两点: 在widget构建的时候可以被同步读取: 在widget的生命周期中可能会被改变. State生命周期 ...
- flink自定义trigger详解
适用的场景解释: [1]中有句话是这样的: "其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现." 这句话的意思是 ...
- react中的state详解
state 理解:state是组件对象最重要的属性,值是对象(可以包含多个key-value组合) state中的值可以修改,修改的唯一方法是调用this.setState,每次修改以后,自动调用 t ...
- QML state详解
1.state简介 changes(list<Change>):保存当前State下的多个Change对象,比如PropertyChanges.StateChangeScript.Pare ...
- Flink中Window详解之Window的聚合函数AggregateFunction
和 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算 函数,但 AggregateFunction 在窗口计算上更加通用.AggregateF ...
最新文章
- opencv转pytorch
- shell脚本——实现简单的功能
- 在python中 函数赋值给变量时,需要注意的几个事项
- java实例摘要(四)
- 字节跳动2021内推启动啦
- Python 从入门到精通 全程最佳实现梳理
- 手机图片怎么免费转换成PDF格式?教程来了
- Android投屏,屏幕共享,两个设备互相投屏
- 学NTFS格式磁盘解析及atapi磁盘读写
- Verilog编写FSM有限状态机来检测序列11011,则输出1;可对序列进行重复检测
- 数据科学 IPython 笔记本 7.2 数据整理
- 腾讯人均月薪 8 万,恍恍惚惚,又被平均了?
- 【我的Android进阶之旅】Configuration 'compile' is obsolete and has been replaced with 'implementation' and
- 2021机动车检测站签字授权人考试专业基础知识部分题库与答案
- 举例说明 频分多址FDMA、时分多址TDMA、码分多址CDMA、空分多址SDMA的异同
- Python——下载音乐(干货)
- 道一云与畅捷通T+对接集成获取报销信息列表=>凭证创建
- Springboot集成springFox-Swagger3并通过Yapi做接口管理
- 虚拟机无法启动提示give root password for maintenance的多种解决方法
- oracle 中文导入 乱码 ZHS16GBK AL32UTF8