简介

Flink相比其他流计算引擎,最大的优势就是号称是有状态的流计算。可见state在Flink中极其重要的位置。数据流是由一个个单独的事件按时间序列组合成的,虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器,即不关注状态,不需要过往信息),但有些操作会跨多个事件记住信息(例如窗口操作符)。这些操作称为有状态操作。

下面是一些有状态的操作的使用场景:

1)对一个时间窗口内的数据进行聚合分析
2)在线机器学习场景下,需要根据新流入数据不断更新机器学习的模型参数
3)数据流中的数据有重复,我们想对重复数据去重,需要记录哪些数据已经流入过应用,当新数据流入时,根据已流入过的数据来判断去重等

除了这些有用的应用场景外,state也是Flink使用checkpoints和savepoints实现容错的关键。

现在Flink正在慢慢实现让用户在运行时从Flink外部访问state,当然在发展中,可能api等都会改变,目前不是很稳定,不过未来应该是个不错的功能。

state的分类

state主要分为两类:Keyed State和Operator State

Keyed State

Keyed State只能用在KeyedStream上,所以要先形成KeyedStream(使用stream.keyBy(…))。

Flink的数据模型不是基于键值对的。因此,不需要将数据集类型物理地打包到键和值中。键是“虚拟的”:它们被定义为在实际数据上的函数,以指导分组操作符。

对于Keyed State,Flink提供了几种现成的数据结构供我们使用:ValueState<T>、ListState<T>、ReducingState<T>、AggregatingState<IN, OUT>、MapState<UK, UV>。要注意理解,上面的5种state类型都是表示stream keyBy 的 key的value的state类型。为了与 keyBy 的 key 进行区分,所以 Flink 中把 MapState 的 key、value 分别叫 UserKey、UserValue。

ValueState<T>:存储单一的值,即每个key只有一个值
ListState<T>:存储一个list,即每个key有一个list值
MapState<UK, UV>:存储一个map,即每个key有一个map值
ReducingState<T>和AggregatingState<IN, OUT>与ListState<T>同属于MergingState<T>。与ListState<T>不同的是,ReducingState<T>只有一个元素,而不是一个列表。它的原理是新元素通过add(T)加入后,与已有的状态元素使用ReduceFunction合并为一个元素,并更新到状态里。AggregatingState<IN, OUT>与ReducingState<T>类似,也只有一个元素,只不过AggregatingState<IN, OUT>的输入和输出类型可以不一样。ReducingState<T>和AggregatingState<IN, OUT>与窗口上进行ReduceFunction和AggregateFunction很像,都是将新元素与已有元素做聚合。

因为本身支持这么多类型的,所以不要用ValueState<T>去存list或者map这种数据类型,直接使用ListState和MapState效率会高很多。

State是通过RuntimeContext类获取的,所以使用State的地方就是rich functions,即实现RichFunction或其子接口,就可以获取State。在里面我们就可以通过StateTtlConfig设置State的TTL等。比如:

public class TTLCountMapFunction extends RichMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    private transient ValueState<Long> state;  
    ...
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.seconds(600))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("count-state", Long.class);
        stateDescriptor.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }
}

StateTtlConfig相关的设置可以查看官网链接。即datastream api中是通过StateTtlConfig设置,而在sql中是通过table.exec.state.ttl配置,默认值是0,表示状态永不过期。在table api中是StreamTableEnvironment.getConfig.setIdleStateRetention。

Flink 中 State 支持设置 TTL,TTL 只是将时间戳与 userValue 封装起来。
· MapState 的 TTL 是基于 UK 级别的
· ValueState 的 TTL 是基于整个 key 的

Operator State(non-keyed state)

Operator State是绑定到一个并行运算符实例(one parallel operator instance)的状态(即记录每个Task对应的状态值数据类型)。kafka connecttor是Flink中运算符状态使用的一个很好的示例。Kafka consumer的每个并行实例都维护一个主题分区和偏移的映射,作为其操作符状态。

在典型的有状态 Flink 应用程序中,你不需要Operator State。 它主要是一种特殊类型的状态,用于实现source/sink或你没有可以对状态进行分区的键的场景。

为了使用Operator State就得要实现CheckpointedFunction。请移步CheckpointedFunction说明。

Broadcast State是一种特殊的Operator State,有着特殊的应用场景,后续会说明如何使用,这里不再讲解。

state的存储

state的存储就是State Backends,在Flink1.13版本以前,老的分类是:MemoryStateBackend、FsStateBackend和RocksDBStateBackend。而在1.13版本以后分类就是:HashMapStateBackend和EmbeddedRocksDBStateBackend,再加上对应的storage。

下面列举新老对应关系:

MemoryStateBackend 相当于使用 HashMapStateBackend 和 JobManagerCheckpointStorage组合。存储位置:State: TaskManager 内存,Checkpoint: Jobmanager 内存。

#flink-conf.yaml配置

state.backend: hashmap

# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager

//java代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerStateBackend());

FsStateBackend 相当于使用 HashMapStateBackend 和 FileSystemCheckpointStorage。存储位置:State:Taskmanager 内存,Checkpoint: 外部文件系统( 本地或 HDFS )。

#flink-conf.yaml配置

state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

//java代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");

// Advanced FsStateBackend configurations, such as write buffer size
// can be set by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

RocksDBStateBackend 相当于使用 EmbeddedRocksDBStateBackend 和 FileSystemCheckpointStorage。存储位置:State:rocksdb,Checkpoint: 外部文件系统(本地或 HDFS )。

#flink-conf.yaml配置

state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

//java代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");

// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

使用建议

1. Keyed State如何清空state,state.clear() 实际上只能清理当前 key 对应的 value 值,如果想要清空整个 state,需要借助于 applyToAllKeys 方法。

2. Operator State慎重使用长list

参考官方文档中对state的介绍和使用的页面:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/

Flink state使用相关推荐

  1. 1.15.Flink state(状态)管理与恢复、什么是state、Keyed State、Operator State、状态容错(生成快照,恢复快照),checkPoint简介,重启策略等

    1.15.Flink state(状态)管理与恢复 1.15.1.什么是state 1.15.2.状态(State) 1.15.3.Keyed State 1.15.4.Operator State ...

  2. Flink State 有可能代替数据库吗?

    有状态的计算作为容错以及数据一致性的保证,是当今实时计算必不可少的特性之一,流行的实时计算引擎包括 Google Dataflow.Flink.Spark (Structure) Streaming. ...

  3. Flink state缓存测试

    Flink state 在实际生产中的应用 一.FlinkState的概念 1.state分类 2.state backend 类型 二.实际应用如下 1.应用场景介绍 2.FsStateBacken ...

  4. Flink State和容错机制

    为什么80%的码农都做不了架构师?>>>    1. Flink Barriers Flink分布式快照的核心元素是流barriers. 这些barriers被注入数据流并与记录一起 ...

  5. flink state ttl

    flink ttl类型 本文用的flink1.7,一般来说ttl用在keyedStream中,官网只介绍了keyed flink ttl用法 import org.apache.flink.api.c ...

  6. Flink State 误用之痛,你中招了吗?

    简介:本文主要讨论一个问题:ValueState 中存 Map 与 MapState 有什么区别?如果不懂这两者的区别,而且使用 ValueState 中存大对象,生产环境很可能会出现以下问题:CPU ...

  7. Flink State 最佳实践

    本文主要分享与交流 Flink 状态使用过程中的一些经验与心得,当然标题取了"最佳实践"之名,希望文章内容能给读者带去一些干货.本文内容首先是回顾 state 相关概念,并认识和区 ...

  8. Flink State

    键控状态数据结构 1.值状态( Value State ) new ValueStateDescriptor("last-temp", Types.of[T]) • 将状态表示为单 ...

  9. 3种Flink State Backed| 你该用哪个?

    简介 01 .不同 State backend 吞吐量对比 02 .不同 State backend 延迟对比 03. State backend 的选择 04. MemoryStateBackend ...

  10. Flink State 深度讲解

    在基础篇中的 1.2 节中介绍了 Flink 是一款有状态的流处理框架.那么大家可能有点疑问,这个状态是什么意思?拿 Flink 最简单的 Word Count 程序来说,它需要不断的对 word 出 ...

最新文章

  1. 一个IO的传奇一生(8) -- elevator子系统
  2. js 正则判断字符串是否为字母或数字
  3. Java面试题2019简书_2019最新Spring面试题大全含答案之Spring Beans(2019最全Spring超级葵花宝典)...
  4. “约见”面试官系列之常见面试题第四十一篇之VUE生命周期(建议收藏)
  5. Python nonlocal 与 global 关键字解析
  6. HTML:常用代码(自用)
  7. 拓端tecdat|Python用稀疏、高斯随机投影和主成分分析PCA对MNIST手写数字数据进行降维可视化
  8. JS 表单submit() 提交无效的问题
  9. oracle -varchar ,varchar2
  10. keyshot渲染很慢_提高Keyshot逼真渲染的小技巧!
  11. C++中模板的特化与偏特化
  12. Leaving fence domain… found dlm lockspace /sys/kernel/dlm/rgmanager
  13. 账户结构,推广计划,推广单元
  14. 简单的图标移入效果(css缩放)
  15. 国民阅读经典:谈修养读后感
  16. Pytorch入门实战 | 第P2周:彩色图片识别
  17. 微信小程序 给数字或文字加横线,比如原价、已完成任务
  18. 数字化转型思考的延伸问题
  19. 大容量充电宝或成乘机“累赘”
  20. 如何部署超级签名分发平台系统?

热门文章

  1. Protein Cell:心血管疾病中的肠道微生物及其潜在的治疗应用
  2. 刷题小程序【程序猿面试宝典】开发(二)| 页面创建、页面配置、全局配置
  3. android 6.0.1 充电提示音
  4. venv虚拟环境中的pip更新失败问题
  5. [摘录]第11章 造就优势谈判的驱动力
  6. 视频会议系统/在线教育系统/企业远程视频办公通话会议系统EasyRTC,如何替换域名数字证书?
  7. 太原服务器系统租用,太原市弹性云服务器租赁
  8. 线性代数笔记3.2线性相关定理
  9. windows查看dll导出函数名
  10. 画数轴的步骤_如何用几何画板画数轴?