.

  • 一 .概念
    • 1.1. 什么是有状态的计算?
    • 1.2. 传统的流计算系统缺少对于程序状态的有效支持
    • 1.3. Flink丰富的状态访问和高效的容错机制
  • 二 .Keyed State
    • 2.1.保存state的数据结构
    • 2.2. 状态有效期 (TTL)
      • 2.2.1.过期数据的清理
      • 2.2.全量快照时进行清理
      • 2.3.增量数据清理
      • 2.4.在 RocksDB 压缩时清理
    • 2.3. DataStream 状态相关的 Scala API [mapWithState/flatMapWithState]
      • 2.3.1. mapWithState :
      • 2.3.2.flatMapWithState :
  • 三 .Operator State
    • 1.5.1. CheckpointedFunction
    • 1.5.2. 带状态的 Source Function
  • 四. Broadcast State

一 .概念

1.1. 什么是有状态的计算?

计算任务的结果不仅仅依赖于输入,还依赖于它的当前状态,其实大多数的计算都是有状态的计算。

比如wordcount,计算单词的count,这是一个很常见的业务场景。

count做为输出,在计算的过程中要不断的把输入累加到count上去,那么count就是一个state。

1.2. 传统的流计算系统缺少对于程序状态的有效支持

  • 状态数据的存储和访问;
  • 状态数据的备份和恢复;
  • 状态数据的划分和动态扩容。

在传统的批处理中,数据是划分为块分片去完成的,然后每一个Task去处理一个分片。
当分片执行完成后,把输出聚合起来就是最终的结果。
在这个过程当中,对于state的需求还是比较小的。

对于流计算而言,对State有非常高的要求,因为在流系统中输入是一个无限制的流,会运行很长一段时间,甚至运行几天或者几个月都不会停机。在这个过程当中,就需要将状态数据很好的管理起来。很不幸的是,在传统的流计算系统中,对状态管理支持并不是很完善。比如storm,没有任何程序状态的支持,一种可选的方案是storm+hbase这样的方式去实现,把这状态数据存放在Hbase中,计算的时候再次从Hbase读取状态数据,做更新在写入进去。这样就会有如下几个问题

  • 流计算系统的任务和Hbase的数据存储有可能不在同一台机器上,导致性能会很差。这样经常会做远端的访问,走网络和存储;

  • 备份和恢复是比较困难,因为Hbase是没有回滚的,要做到Exactly once 很困难。在分布式环境下,如果程序出现故障,只能重启Storm,那么Hbase的数据也就无法回滚到之前的状态。
    比如广告计费的这种场景,Storm+Hbase是是行不通的,出现的问题是钱可能就会多算,解决以上的办法是Storm+mysql,通过mysql的回滚解决一致性的问题。但是架构会变得非常复杂。性能也会很差,要commit确保数据的一致性。

  • 对于storm而言状态数据的划分和动态扩容也是非常难做。
    一个很严重的问题是所有用户都会在strom上重复的做这些工作,比如搜索,广告都要在做一遍,由此限制了部门的业务发展。

1.3. Flink丰富的状态访问和高效的容错机制

我们前面写的WordCount的例子,没有包含状态管理。如果一个Task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。

从容错和消息处理的语义上(at least once, exactly once),Flink引入了StateCheckpoint

首先区分一下两个概念:

State:

​ 一般指一个具体的Task/Operator的状态,

​ State数据默认保存在Java的堆内存中

Checkpoint:

​ 表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有Task/Operator的状态

​ 可以理解为Checkpoint是把State数据定时持久化存储了,

State可以被记录,在失败的情况下数据还可以恢复

Flink中有两种基本类型的State

  • Keyed State
  • Operator State

可以以两种形式存在:原始状态托管状态

托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。

而原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。

二 .Keyed State

顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。

stream.keyBy(…)

2.1.保存state的数据结构

  • ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值

  • ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值

  • ReducingState :这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值

  • MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素

所有类型的状态还有一个clear() 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄 .

你必须创建一个 StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称(正如我们稍后将看到的,你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction。 根据不同的状态类型,可以创建ValueStateDescriptor,ListStateDescriptor, ReducingStateDescriptor 或 MapStateDescriptor。

状态通过 RuntimeContext 进行访问,因此只能在 rich functions 中使用。请参阅这里获取相关信息, 但是我们很快也会看到一个例子。RichFunction 中 RuntimeContext 提供如下方法:

RichFunction 中 RuntimeContext 提供如下方法:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

示例 :


import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector/**** 实现了一个简单的计数窗口。 我们把元组的第一个元素当作 key(在示例中都 key 都是 “1”)。* 该函数将出现的次数以及总和存储在 “ValueState” 中。* 一旦出现次数达到 2,则将平均值发送到下游,并清除状态重新开始。** 请注意,我们会为每个不同的 key(元组中第一个元素)保存一个单独的值。**/
class CountWindowAverage extends RichFlatMapFunction[(Long,Long),(Long,Long)] {private var sum : ValueState[(Long,Long)] = _override def flatMap(input: (Long, Long), out : Collector[(Long, Long)]): Unit = {// 获取当前的状态值val tmpCurrentSum = sum.value()val currentSum = if(tmpCurrentSum != null) {tmpCurrentSum}else{(0L,0L)}// 获取当前值val newSum = (currentSum._1 + 1, currentSum._2 + input._2)// 更新statesum.update(newSum)// 如果计算达到2, 计算平均值,并清除stateif (newSum._1 >= 2) {out.collect((input._1,newSum._2/newSum._1))sum.clear()}}override def open(parameters: Configuration): Unit = {sum = getRuntimeContext.getState(new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]))}}object ExampleCountWindowAverage extends  App {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3L),(1L, 5L),(1L, 7L),(1L, 4L),(1L, 2L))).keyBy(_._1).flatMap(new CountWindowAverage()).print()// the printed output will be (1,4) and (1,5)env.execute("ExampleKeyedState")
}

2.2. 状态有效期 (TTL)

任何类型的 keyed state 都可以有 有效期 (TTL)。如果配置了 TTL 且状态值已过期,则会尽最大可能清除对应的值.
所有状态类型都支持单元素的 TTL。 这意味着列表元素和映射元素将独立到期。

在使用状态 TTL 前,需要先构建一个StateTtlConfig 配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).buildval stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

TTL 配置有以下几个选项: newBuilder 的第一个参数表示数据的有效期,是必选项。

TTL 的更新策略(默认是 OnCreateAndWrite):

  • StateTtlConfig.UpdateType.OnCreateAndWrite - 仅在创建和写入时更新
  • StateTtlConfig.UpdateType.OnReadAndWrite - 读取时也更新

数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired):

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 不返回过期数据
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 会返回过期但未清理的数据

NeverReturnExpired 情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。
ReturnExpiredIfNotCleanedUp 在数据被物理删除前都会返回。

注意:
状态上次的修改时间会和数据一起保存在 state backend 中,因此开启该特性会增加状态数据的存储。 Heap state backend 会额外存储一个包括用户状态以及时间戳的 Java 对象,RocksDB state backend 会在每个状态值(list 或者 map 的每个元素)序列化后增加 8 个字节。
暂时只支持基于 processing time 的 TTL。
尝试从 checkpoint/savepoint 进行恢复时,TTL 的状态(是否开启)必须和之前保持一致,否则会遇到 “StateMigrationException”。
TTL 的配置并不会保存在 checkpoint/savepoint 中,仅对当前 Job 有效。
当前开启 TTL 的 map state 仅在用户值序列化器支持 null 的情况下,才支持用户值为 null。如果用户值序列化器不支持 null, 可以用 NullableSerializer 包装一层。

2.2.1.过期数据的清理

默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig 配置关闭后台清理:

import org.apache.flink.api.common.state.StateTtlConfigval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground.build

可以按照如下所示配置更细粒度的后台清理策略。当前的实现中 HeapStateBackend 依赖增量数据清理,RocksDBStateBackend 利用压缩过滤器进行后台清理。

2.2.全量快照时进行清理

可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。
该策略可以通过 StateTtlConfig 配置进行配置:

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Timeval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupFullSnapshot.build

这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。

注意: 这种清理方式可以在任何时候通过 StateTtlConfig 启用或者关闭,比如在从 savepoint 恢复时。

2.3.增量数据清理

选择增量式清理状态数据,在状态访问或/和处理时进行。如果某个状态开启了该清理策略,则会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupIncrementally(10, true).build

该策略有两个参数。
第一个是每次清理时检查状态的条目数,在每个状态访问时触发。
第二个参数表示是否在处理每条记录时触发清理。
Heap backend 默认会检查 5 条状态,并且关闭在每条记录时触发清理。

注意:
1.如果没有 state 访问,也没有处理数据,则不会清理过期数据。
2.增量清理会增加数据处理的耗时。
3.现在仅 Heap state backend 支持增量清除机制。在 RocksDB state backend 上启用该特性无效。
4.如果 Heap state backend 使用同步快照方式,则会保存一份所有 key 的拷贝,从而防止并发修改问题,因此会增加内存的使用。但异步快照则没有这个问题。
5.对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。

2.4.在 RocksDB 压缩时清理

如果使用 RocksDB state backend,则会启用 Flink 为 RocksDB 定制的压缩过滤器。
RocksDB 会周期性的对数据进行合并压缩从而减少存储空间。
Flink 提供的 RocksDB 压缩过滤器会在压缩时过滤掉已经过期的状态数据。

该特性可以通过 StateTtlConfig 进行配置:

import org.apache.flink.api.common.state.StateTtlConfigval ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).cleanupInRocksdbCompactFilter(1000).build

Flink 处理一定条数的状态数据后,会使用当前时间戳来检测 RocksDB 中的状态是否已经过期, 你可以通过 StateTtlConfig.newBuilder(…).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定处理状态的条数。

时间戳更新的越频繁,状态的清理越及时,但由于压缩会有调用 JNI 的开销,因此会影响整体的压缩性能。
RocksDB backend 的默认后台清理策略会每处理 1000 条数据进行一次。

你还可以通过配置开启 RocksDB 过滤器的 debug 日志: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:
压缩时调用 TTL 过滤器会降低速度。TTL 过滤器需要解析上次访问的时间戳,并对每个将参与压缩的状态进行是否过期检查。 对于集合型状态类型(比如 list 和 map),会对集合中每个元素进行检查。
对于元素序列化后长度不固定的列表状态,TTL 过滤器需要在每次 JNI 调用过程中,额外调用 Flink 的 java 序列化器, 从而确定下一个未过期数据的位置。
对已有的作业,这个清理方式可以在任何时候通过 StateTtlConfig 启用或禁用该特性,比如从 savepoint 重启后。

2.3. DataStream 状态相关的 Scala API [mapWithState/flatMapWithState]

除了上面描述的接口之外,Scala API 还在 KeyedStream 上对 map() 和 flatMap() 访问 ValueState 提供了一个更便捷的接口。
用户函数能够通过 Option 获取当前 ValueState 的值,并且返回即将保存到状态的值。

2.3.1. mapWithState :


package com.boyi.stateimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._/***** mapWithState 函数说明 :** mapWithState[R, S]*  (fun : scala.Function2[T, scala.Option[S], scala.Tuple2[ R, scala.Option[S] ]])* R: TypeInformation (return返回类型)* S: TypeInformation (stateful状态类型)* T (input 输入类型)* fun: (T, Option[S]) => (R, Option[S]) 函数将输入泛型转化了R,状态泛型没有变化**** 案例 : 计算 动态计算 每个key的平均数*/
object MapWithStateOp extends  App {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3L),(1L, 5L),(1L, 7L),(1L, 4L),(1L, 2L),(2L,10L),(2L,2L))).keyBy(_._1).mapWithState[(Long,Long),(Long,Long)]((input: (Long,Long), out: Option[(Long,Long)]) =>out match {case Some(state) => {// return  : (key,平均数) , ()// state   : (value个数,value总和)((input._1,((state._2+input._2)/(state._1+1))),Some((state._1+1, state._2+input._2)))}case None => ((input._1,input._2),Some((1L,input._2)))}).map(x => {"key : "+x._1+" ===> avg : "+x._2}).print()env.execute("MapWithStateOp")
}

2.3.2.flatMapWithState :


package com.boyi.stateimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._/***** flatMapWithState 函数说明 :** flatMapWithState[R, S]*  (fun : scala.Function2[T, scala.Option[S], scala.Tuple2[ scala.TraversableOnce[R], scala.Option[S] ]])* R: TypeInformation (return返回类型)* S: TypeInformation (stateful状态类型)* T (input 输入类型)* fun: (T, Option[S]) => (R, Option[S]) 函数将输入泛型转化了R,状态泛型没有变化**** 案例 : 计算 动态计算 每个key的平均数*/
object FlatMapWithStateOp extends  App {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.fromCollection(List((1L, 3L),(1L, 5L),(1L, 7L),(1L, 4L),(1L, 2L),(2L,10L),(2L,2L))).keyBy(_._1)//.flatMapWithState[(Long,Long),(Long,Long)]((input: (Long,Long), state: Option[(Long,Long)]) =>state match {case Some(state) => {// return  : (key,平均数) , ()// state   : (value个数,value总和)( List((input._1,((state._2+input._2)/(state._1+1)))),Some((state._1+1,state._2+input._2)) )}case None => ( List((input._1,input._2)),Some((1L,input._2)) )}).map(x=> "key : "+x._1 + "  ====> value : " + x._2).print()env.execute("FlatMapWithStateOp")
}

三 .Operator State

  • 与Key无关的State,与Operator绑定的state,整个operator只对应一个state

  • 保存state的数据结构

    • ListState
  • 举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射

1.5.1. CheckpointedFunction

用户可以通过实现 CheckpointedFunction 接口来使用 operator state。
CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

进行 checkpoint 时会调用 snapshotState()。 用户自定义函数初始化时会调用 initializeState(),初始化包括第一次自定义函数初始化和从之前的 checkpoint 恢复。 因此 initializeState() 不仅是定义不同状态类型初始化的地方,也需要包括状态恢复的逻辑。

当前 operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:

  • Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发读为 1,包含两个元素 element1 和 element2,当并发读增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。

  • Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.

下面的例子中的 SinkFunction 在 CheckpointedFunction 中进行数据缓存,然后统一发送到下游,这个例子演示了列表状态数据的 event-split redistribution。

package com.boyi.stateimport org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Contextimport scala.collection.mutable.ListBufferclass BufferingSink(threshold: Int = 0)extends SinkFunction[(String, Int)]with CheckpointedFunction {@transientprivate var checkpointedState: ListState[(String, Int)] = _private val bufferedElements = ListBuffer[(String, Int)]()override def invoke(value: (String, Int), context: Context): Unit = {bufferedElements += valueif (bufferedElements.size == threshold) {for (element <- bufferedElements) {// send it to the sink}bufferedElements.clear()}}override def snapshotState(context: FunctionSnapshotContext): Unit = {checkpointedState.clear()bufferedElements.foreach(element => {checkpointedState.add(element)})}override def initializeState(context: FunctionInitializationContext): Unit = {val descriptor = new ListStateDescriptor[(String, Int)]("buffered-elements",TypeInformation.of(new TypeHint[(String, Int)]() {}))checkpointedState = context.getOperatorStateStore.getListState(descriptor)if(context.isRestored) {val iterate  = checkpointedState.get()iterate.forEach(element => bufferedElements += element )}}}

initializeState 方法接收一个 FunctionInitializationContext 参数,会用来初始化 non-keyed state 的 “容器”。这些容器是一个 ListState 用于在 checkpoint 时保存 non-keyed state 对象。

注意这些状态是如何初始化的,和 keyed state 类系,StateDescriptor 会包括状态名字、以及状态类型相关信息。

val descriptor = new ListStateDescriptor[(String, Long)]("buffered-elements",TypeInformation.of(new TypeHint[(String, Long)]() {})
)// 调用不同的获取状态对象的接口,会使用不同的状态分配算法。
checkpointedState = context.getOperatorStateStore.getListState(descriptor)

调用不同的获取状态对象的接口,会使用不同的状态分配算法。
比如 getUnionListState(descriptor) 会使用 union redistribution 算法,
而 getListState(descriptor) 则简单的使用 even-split redistribution 算法。

当初始化好状态对象后,我们通过 isRestored() 方法判断是否从之前的故障中恢复回来,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑。

正如代码所示,BufferingSink 中初始化时,恢复回来的 ListState 的所有元素会添加到一个局部变量中,供下次 snapshotState() 时使用。 然后清空 ListState,再把当前局部变量中的所有元素写入到 checkpoint 中。

另外,我们同样可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。

1.5.2. 带状态的 Source Function

带状态的数据源比其他的算子需要注意更多东西。
为了保证更新状态以及输出的原子性(用于支持 exactly-once 语义),用户需要在发送数据前获取数据源的全局锁。


import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}import scala.collection.JavaConverters.iterableAsScalaIterableConverter/*** 带状态的 Source Function* 带状态的数据源比其他的算子需要注意更多东西。** 为了保证更新状态以及输出的原子性(用于支持 exactly-once 语义),用户需要在发送数据前获取数据源的全局锁。**/
class CounterSourceextends RichParallelSourceFunction[Long]with CheckpointedFunction {@volatileprivate var isRunning = trueprivate var offset = 0Lprivate var state: ListState[Long] = _override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {val lock = ctx.getCheckpointLockwhile (isRunning) {// output and state update are atomiclock.synchronized({ctx.collect(offset)offset += 1})}}override def cancel(): Unit = isRunning = falseoverride def initializeState(context: FunctionInitializationContext): Unit = {state = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long]("state", classOf[Long]))for (l <- state.get().asScala) {offset = l}}override def snapshotState(context: FunctionSnapshotContext): Unit = {state.clear()state.add(offset)}
}

四. Broadcast State

广播状态是一种特殊的操作状态。

Broadcast State is a special type of Operator State. It was introduced to support use cases where records of one stream need to be broadcasted to all downstream tasks, where they are used to maintain the same state among all subtasks. This state can then be accessed while processing records of a second stream. As an example where broadcast state can emerge as a natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest of operator states in that:

  1. it has a map format,
  2. it is only available to specific operators that have as inputs a broadcasted stream and a non-broadcasted one, andsuch an operator can have multiple broadcast states with different names.

官方文档 :

https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/stream/state/state.html

Flink实操 : 状态管理相关推荐

  1. 详解Flink中的状态管理

    流式计算分为无状态和有状态两种情况.无状态的计算观察每个独立事件,并根据最后一个事件输出结果.例如:流处理应用程序从传感器接收温度读数,并在温度超过90度时发出警告.有状态的计算则会基于多个事件输出结 ...

  2. Flink中的状态管理

    1 Flink中的状态   当数据流中的许多操作只查看一个每次事件(如事件解析器),一些操作会跨多个事件的信息(如窗口操作).这些操作称为有状态.状态由一个任务维护,并且用来计算某个结果的所有数据,都 ...

  3. Flink实操 : 广播变量/累加器/分布式缓存

    . 一 .前言 二 .广播变量使用 2.1.前言 2.2. 使用 三 .累加器 3.1. 前言 3.2. 使用 四 .分布式缓存 4.1. 前言 4.2.使用 一 .前言 二 .广播变量使用 2.1. ...

  4. Flink实操 : DataSource操作

    . 一 .前言 二 .四种读取类型 2.1. 基于本地集合的source(Collection-based-source) 2.2. 基于文件的source(File-based-source) 2. ...

  5. Flink实操 : 算子操作

    . 一 .前言 二 .算子操作 2.1. map 2.2. flatMap 2.3. mapPartition 2.4. filter 2.5. reduce/groupBy 2.6. reduceG ...

  6. Flink实操 : Sink操作

    . 一 .前言 二 .类型 2.1. 基于本地集合的sink 2.2. 基于文件的sink 2.2.1.将数据写入本地文件 2.2.2.将数据写入HDFS 2.3. Kafka Sink 2.4. M ...

  7. Flink状态管理和容错机制介绍

    作者: 施晓罡 本文来自2018年8月11日在北京举行的 Flink Meetup会议,分享来自于施晓罡,目前在阿里大数据团队部从事Blink方面的研发,现在主要负责Blink状态管理和容错相关技术的 ...

  8. supervisor 守护多个进程_supervisor守护进程管理实操笔记

    2020年年后工作中需开发一支持多数据源自动上报业务数据的程序,程序开发完部署上线时需要对其进程进行自动管理,不然哪天程序down了还不知,可就麻烦了,所以这里选用了强大的supervisor,以下文 ...

  9. 2021年危险化学品生产单位安全生产管理人员考试试卷及危险化学品生产单位安全生产管理人员实操考试视频

    题库来源:安全生产模拟考试一点通公众号小程序 安全生产模拟考试一点通:危险化学品生产单位安全生产管理人员考试试卷是安全生产模拟考试一点通生成的,危险化学品生产单位安全生产管理人员证模拟考试题库是根据危 ...

最新文章

  1. cdialog创建后马上隐藏_隐藏你的小秘密,这款神器就是玩的这么6!
  2. 云桌面可附加桌面文件的程序_给我几分钟,还你一个小清新的电脑桌面
  3. python把桢写入txt_ffmpeg 常用参数一览表及python 使用示例
  4. python re模块下载_python re模块详解
  5. 6、mybatis中的sql映射文件详解(1)
  6. java 封闭实例_不能访问类型…的封闭实例
  7. criteria函数_干货铺 | 二级MS office考试中一些常考函数(2)
  8. java thread lambda_Java8新特性--Lambda表达式
  9. while(0)循环还执行吗_for循环
  10. php定义浏览器编码,从php脚本到浏览器,编码方式浅析
  11. 两个选择框 ajax如何根据另一个选择框的内容获取_Python数据结构:数据框
  12. 数据结构与算法(二):线性表、栈、树(二叉树,AVL树)、图
  13. QPainter绘图基本使用
  14. 3dsMax---二维图形[描图]
  15. 语义web一些简单示例
  16. Gitlab集成Sonarqube实现自动检测代码并发送报告给提交者
  17. xml充当数据库实现电影院购票管理系统
  18. USB过压保护芯片,高输入电压充电器(OVP)
  19. Flume编写拦截器
  20. 并行计算机概述--性能和评估标准

热门文章

  1. 简易的web全栈开发——服务器部分
  2. 自己写的ajax通用 脚本
  3. flex:1是什么?
  4. CCVP 642-446认证介绍
  5. 弹幕插件easyDanmaku.js使用详解
  6. 关于Topic设计的思考
  7. rebar3简单使用
  8. 太神了!世界上最著名的菲尔人格测试!
  9. MTK9652和Mstar938的区别
  10. 忆享聚焦|全球云计算市场份额、数字虚拟人、“元宇宙”实体店……近期行业热点速览