State概念解析

State是一个接口,不同类型的分区状态必须实现的接口,State只能应用于KeyedStream的函数访问。key是由系统自动提供的,因此函数总是看到映射到当前元素的键的值。这样,系统可以一致地处理流和状态分区。

KeyedStream:KeyedStream继承了DataStream,是由datastream的keyBy()产生的。表示按key的分区过的流。在datastream的功能基础上,由添加了一些max,min等聚合的功能。

通过状态快照实现容错处理

Flink管理的keyed state是排序分片的key/value形式存储的,每个keyed state的工作副本都是保存在当前taskmanager中。另外Operator state也保存在本地机器节点。Flink定期获取所有的状态快照,并且将这些快照复制到持久化的位置,列如分布式文件系统。
如果发生故障,Flink可以恢复应用程序的完整状态继续处理。
Flink管理的状态存储在state backend中。Flink有两种state backend的实现,一种是基于RocksDB内嵌key/value存储将其工作状态保存在磁盘上的,另一种是基于堆的state backend,将其工作状态保存在Java堆内存中。
这种基于堆的state backend有两种类型:
FsStateBackend,将其状态快照持久化到分布式文件系统;
MemoryStateBackend,它是使用JobManager的堆保存状态快照;
当使用基于堆的state backend保存状态时,访问和更新涉及在堆上读写对象。对于保存在RocksDBStateBackend的中对象,访问和更新涉及到序列化和反序列化,所以开销较大。但是RocksDBStateBackend只受到本地磁盘大小的限制,还有一点RocksDBStateBackend支持增量快照,这对于大量变化缓慢的状态的应用程序非常友好。
所有的State backends都是异步的执行快照,所以不会影响流式程序的运行。

状态快照

定义

  • 快照:是Flink作业状态全局一致性镜像的通用属于。快照包括指向每个数据源的指针(例如:到文件或者Kafka分区的偏移量)以及每个作业有状态运算符的状态副本,该状态副本是处理了source偏移位置之前所有的事件而后产生的状态。
  • Checkpoint:一种有flink自动执行的快照,其目的是能够从故障中恢复。Checkpoints可以是增量的,并为快速恢复做了优化。
  • SavaPoint:用户出于某种操作目的(例如有状态的重新部署/升级/缩放操作)手动(或API调用)触发的快照。Savepints始终是完整的,并且已针对操作灵活性进行了优化。

状态快照如何工作
Flink 使用 Chandy-Lamport algorithm 算法的一种变体,称为异步 barrier 快照(asynchronous barrier snapshotting)。
当checkpoint coordinator(job manager的一部分)指示task manager开始checkpoint时,它会让所有的source记录他们的偏移量,并将编号的checkpoint barrier插入到数据流中。这些barrier流经job graph,标注每个checkpoint前后的流部分。

Checkpoint n将包含了每个operator的state,这些state是对应的operator消费了严格在checkpoint barrier n之前所有的事件,不包含之后任何时间生成的state。
当job graph中每个operator接收到barriers时,他就会记录下其状态。拥有输入流的Operator会执行barrier对齐,当前快照包含消费两个输入流barrier之前(但不超过)的所有events而产生的状态。

Flink的state backends利用写时复制(copy-on-write)机制允许当异步生成旧版本的状态快照时,能够不影响的继续流处理。只有当快照被永久保存后,才不会当作垃圾回收。

确保精确一次(exactly once)

当流处理应用程序发生错误的时候,结果可能产生丢失或者重复。
Flink根据你为应用程序和集群的配置,有以下几种结果:

  • Flink不会从快照中进行恢复(at most once)
  • 没有任何的丢失,但是你可能得到重复冗余的结果(at least once)
  • 没有丢失或冗余重复(exactly once)
    Flink通过回退和重新发送source的数据流中故障恢复,当理想情况被描述为精确一次时,这并不意味着每个事件都将被精确一次处理。相反,这意味着每一个事件都会影响Flink管理精确一次性语义。
    Barrier只有在需要提供精确一次性语义保证时需要进行对齐(Barrier alignment)。如果不需要这种语义,可以通过配置checkpointingMode.AT_LEAST_ONCE关闭barrier对齐来提高性能。

端到端精确一次

为了实现端到端的精确一次,以便source中的每个事件都仅精确一次对sinks生效,必须满足以下条件:
1.你的source必须是可重放的
2.你的sinks必须是事务性的(或幂等的)

使用Keyed State

1.ValueState<>:继承自state接口,用于分区单值状态的接口。可以检索或更新该值。

状态由用户函数访问和修改,并作为分布式快照的一部分由系统一致地检查。
state只能由应用于KeyedStream的函数访问。键是由系统自动提供的,因此函数总是看到映射当前元素键的值。这样,系统可以一致地同时处理流和状态分区。

ValueState中的两个方法,value和update

T value() throws IOException

解析:返回当前状态的值。当state没有分区时,对于给定操作符中的所有输入,返回的值都是相同的。如果应用了state分区,则返回值取决于当前state输入的操作符,因为操作符为每个分区维护一个独立的state。
如果在创建valueState时没有指定默认值,那么当前使用update()方法设置值时,返回null。

void update(T value) throws IOException

解析:将value()可访问的运算符状态更新为给定值。下次在调用相同分区value()方法时,返回的时更新后的值。当分区状态更新为null时,将删除当前键的状态,并在下次访问时返回默认值。

2.ListState<>:保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

ListState的两个方法,update和addAll

state由用户函数访问和修改,并作为分布式快照的一部分由系统一致地检查。
该state可以是键列表state或操作符state。
当它是键控列表state时,它由应用于KeyedStream的函数访问。键是由系统自动提供的,因此函数总是看到映射到当前元素键的值。这样,系统可以一致地同时处理流和state分区。
当他是操作符state列表时,该列表是一个state项的结合,这些state项相互独立,并且在操作符并行性发生变化时,可以在操作符实例之间重新分布。

void update(List values) throws Exception

解析:将现有的值更新到给定的值列表中,下次调用get()方法时,返回的是更新后的值。
如果传入努力了或者空列表,则状态值为null。

void addAll(List values) throws Exception

解析:通过将给定的值添加到现有的值列表,get()方法访问的运算符状态。下次调用get()方法(对于相同的状态分区)时,返回的状态将表示更新的列表。
如果传入null或空列表,则状态值保持不变。

3.ReducingState<>:保存一个单值,表示添加到state的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

ReducingState是用于还原state的接口。元素可以添加到状态中,它们使用reduce函数进行聚合,可以检查当前的state。
ReducingState只能由应用于KeyedStream的函数访问。键是由系统自动提供的,因此函数总是看到映射到当前元素键的值。这样,系统可以一致地同时处理流和状态分区

4.AggregatingState<IN, OUT>: 保留一个单值,表示添加到state所有值得集合。和AggregatingState相反的是,聚合类型可能与添加到状态的元素的类型不同。接口与ListState类似,但使用add(IN)添加的元素会用指定的AggregateFunction进行聚合。

解析:AggregatingState接口用于聚合state,添加到到这个state中将使用给定的聚合函数预先聚合。
state内部始终保存累加器的类型,当访问状态结果时,会调用AggregateFunction的getResult()方法。

5.MapState<UK, UV>:维护了一个映射列表。你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map<UK,UV>) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

分区键值的state接口。可以添加、更新和检索键值对。

提供的方法

UV get(UK key) throws Exception

返回与给定键关联的当前值。

void put(UK key, UV value) throws Exception

将新值与给定键相关联。

void putAll(Map<UK, UV> map) throws Exception

将给定的Map集合添加到state中

void remove(UK key) throws Exception

删除给定key的键值

boolean contains(UK key) throws Exception

返回给定的key的键值是否存在

Iterable<Map.Entry<UK, UV>> entries() throws Exception

返回state中所有Map

Iterable keys() throws Exception

返回状态中所有的key

Iterable values() throws Exception

返回状态中所有的value

Iterator<Map.Entry<UK, UV>> iterator() throws Exception

迭代state中的Map

boolean isEmpty() throws Exception

如果此state不包含键值映射,则返回true,否则返回false。

6.所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

public class CountWindowAverage extends RichMapFunction<Tuple2<Long,Long>,Tuple2<Long,Long>> {/*** The ValueState handle. The first field is the count, the second field a running sum.*/private transient ValueState<Tuple2<Long,Long>> sum;@Overridepublic void flatmap (Tuple2<Long,Long> input,Collector<Tuple2<Long,Long>> out) throws Exception{// access the state valueTuple2<Long, Long> currentSum = sum.vlaue;// update the countcurrentSum.f0 += 1;// add the second field of the input valuecurrentSum.f1 += input.f1;// update the statesum.update(currentSum);// if the count reaches 2, emit the average and clear the stateif(currentSum.f0 >= 2) {out.collect(new Tuple2<>(input.f0,currentSum.f1 / currentSum.f0);sum.clear;}}@Overridepublic void open(Configuration config){ValueStateDescriptor<Tuple2<Long,Long>> descriptor = new ValueStateDescriptor<>("average", // the state nameTypeInformation.of(new TypeHint<Tuple2<Long,Long>>(){}), // type informationTuple2.of(0L,0L));sum = getRuntimeContext().getState(descriptor);}
}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)).keyBy(value -> value.f0).flatMap(new CountWindowAverage()).print();

State的有效期

任何类型的keyed state都可以有有效期(TTL)。如果设置的有效期状态值已过,则会尽最大可能删除该值。所有状态类型都支持单元素的TTL。这意味着列表元素和映射元素将独立到期。
在使用TTL前,需要先构建StateTtlConfig配置对象,然后把配置传到state descriptor中启用TTL功能

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

TTL 配置有以下几个选项:
newBuilder 的第一个参数表示数据的有效期,是必选项。
TTL 的更新策略(默认是 OnCreateAndWrite):

  • StateTtlConfig.UpdateType.onCreateAndWrite - 仅在创建和写入时更新。
  • StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新。

数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据。

NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。

注意:

  • 状态上次修改时间会和数据一起保存在state backend中,因此开启该特性会增加状态数据的存储。Heap state backend会额外存储一个包括用户状态和时间戳的Java对象,RocksDB state backend会在每个状态值(list或map的每个元素)序列化后增加8个字节。
  • 暂时只支持基于 processing time 的 TTL。
  • 尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
  • TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
  • 当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果用户值序列化器不支持 null, 可以用 NullableSerializer 包装一层。

过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();

可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

全量快照时进行清理

另外,你可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。 该策略可以通过 StateTtlConfig 配置进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot().build();

这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。

注意:这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。

增量数据清理

另外可以选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build();

该策略有两个参数。 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。第二个参数表示是否在处理每条记录时触发清理。 Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

注意:

  • 如果没有 state 访问,也没有处理数据,则不会清理过期数据。
  • 增量清理会增加数据处理的耗时。
  • 现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
  • 如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。

在 RocksDB 压缩时清理
如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。 Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。
该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfig;StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000).build();

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。 时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。 RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

你还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。 对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
  • 对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。
  • 对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。

DataStream 状态相关的 Scala API

除了上面描述的接口之外,Scala API 还在 KeyedStream 上对 map() 和 flatMap() 访问 ValueState 提供了一个更便捷的接口。 用户函数能够通过 Option 获取当前 ValueState 的值,并且返回即将保存到状态的值。

val stream: DataStream[(String, Int)] = ...val counts: DataStream[(String, Int)] = stream.keyBy(_._1).mapWithState((in: (String, Int), count: Option[Int]) =>count match {case Some(c) => ( (in._1, c), Some(c + in._2) )case None => ( (in._1, 0), Some(in._2) )})

Flink的State详解(1)相关推荐

  1. [Trident] Storm Trident 教程,state详解、trident api详解及实例

    英文原址:https://github.com/nathanmarz/storm/wiki/Trident-tutorial ---------------- Trident是在storm基础上,一个 ...

  2. Flink checkpoint操作流程详解与报错调试方法汇总,增量checkpoint原理及版本更新变化,作业恢复和扩缩容原理与优化

    这里写目录标题 flink checkpint出错类型 flink 重启策略 Checkpint 流程简介 增量Checkpoint实现原理 MemoryStateBackend 原理 FsState ...

  3. 【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战025--DateSet实用API详解025

    一.Flink DateSet定制API详解(JAVA版) -002 flatMap 以element为粒度,对element进行1:n的转化. 执行程序: package code.book.bat ...

  4. Flink Window机制详解

    Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理.而窗口(window)就是从 Streaming 到 Batch ...

  5. flutter中state详解

    State 在说到StatefulWidget之前,先说下State.State的作用有两点: 在widget构建的时候可以被同步读取: 在widget的生命周期中可能会被改变. State生命周期 ...

  6. flink自定义trigger详解

    适用的场景解释: [1]中有句话是这样的: "其实,我们要实现基于事件时间的窗口随意输出,比如1000个元素触发一次输出,那么我们就可以通过修改这个触发器来实现." 这句话的意思是 ...

  7. react中的state详解

    state 理解:state是组件对象最重要的属性,值是对象(可以包含多个key-value组合) state中的值可以修改,修改的唯一方法是调用this.setState,每次修改以后,自动调用 t ...

  8. QML state详解

    1.state简介 changes(list<Change>):保存当前State下的多个Change对象,比如PropertyChanges.StateChangeScript.Pare ...

  9. Flink中Window详解之Window的聚合函数AggregateFunction

    和 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算 函数,但 AggregateFunction 在窗口计算上更加通用.AggregateF ...

最新文章

  1. opencv转pytorch
  2. shell脚本——实现简单的功能
  3. 在python中 函数赋值给变量时,需要注意的几个事项
  4. java实例摘要(四)
  5. 字节跳动2021内推启动啦
  6. Python 从入门到精通 全程最佳实现梳理
  7. 手机图片怎么免费转换成PDF格式?教程来了
  8. Android投屏,屏幕共享,两个设备互相投屏
  9. 学NTFS格式磁盘解析及atapi磁盘读写
  10. Verilog编写FSM有限状态机来检测序列11011,则输出1;可对序列进行重复检测
  11. 数据科学 IPython 笔记本 7.2 数据整理
  12. 腾讯人均月薪 8 万,恍恍惚惚,又被平均了?
  13. 【我的Android进阶之旅】Configuration 'compile' is obsolete and has been replaced with 'implementation' and
  14. 2021机动车检测站签字授权人考试专业基础知识部分题库与答案
  15. 举例说明 频分多址FDMA、时分多址TDMA、码分多址CDMA、空分多址SDMA的异同
  16. Python——下载音乐(干货)
  17. 道一云与畅捷通T+对接集成获取报销信息列表=>凭证创建
  18. Springboot集成springFox-Swagger3并通过Yapi做接口管理
  19. 虚拟机无法启动提示give root password for maintenance的多种解决方法
  20. oracle 中文导入 乱码 ZHS16GBK AL32UTF8

热门文章

  1. 跳槽对个人发展的利与弊是什么?
  2. C语言系列:2、数据类型、运算符和表达式
  3. 计算机编程数学英语不好怎么办,英语和数学不好的人是不是学不会编程?
  4. 虚拟机ping外网连接失败解决方法
  5. 好博医疗冲刺科创板:年营收2.6亿 万永钢和沈智群为实控人
  6. python项目练习——外星人入侵游戏(一)——武装飞船
  7. 网站服务器记录登录,怎样查看远程登录过服务器的记录
  8. 使用Python进行ADSL宽带拨号连接等操作
  9. Ta-lib学习笔记01--成交量指标
  10. win 7计算机图标变了,win7系统图标变黑的三种解决方法(图文)