Flink DataStream: Operator State, Keyed State, Checkpoints, Savepoints, and State Backends — usage and explanation
Contents
- 1. Which operators have state
- 2. Operator State
- 2.1 Using broadcast state
- 2.2 Using list state and union list state
- 2.3 Stateful Source Functions (a special case of list state and union list state)
- 3. Keyed State
- 3.1 Using keyed state
- 3.2 State Time-To-Live (TTL)
- 4. Checkpoint configuration
- 5. Savepoint
- 6. State Backends
1. Which operators have state
Which operators are stateless
- Operators like a map function, which process one element at a time within a slot, have no state
Which operators are stateful
- A Source like FlinkKafkaConsumer must remember the consumed offset of each Kafka partition; that is state
- A Window function holds multiple elements, so it must keep the window metadata and the buffered elements; that is state
- A Sink function must keep the elements that have entered the Sink but have not yet been written to the target system; that is state
- State (e.g. a ListState) that you define yourself in a CheckpointedFunction (operator state) or a RichFunction (keyed state) is also state
How state guarantees Exactly-Once semantics
- If the program is stopped with a savepoint and later restored from that savepoint, Exactly-Once is guaranteed
- If the program stops abnormally, then to guarantee Exactly-Once when restoring from a checkpoint or savepoint, the Source must be able to rewind to an earlier consumption position (as Kafka can), and the downstream target must keep the data of uncompleted checkpoints invisible (as Kafka and FileSink do). Alternatively the Sink can deduplicate the data, which avoids duplicated records but may still produce incorrect results
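For the sink side, one concrete way to keep the data of uncompleted checkpoints invisible downstream is a transactional Kafka producer. A minimal sketch, assuming Flink 1.13 with the flink-connector-kafka dependency on the classpath; the broker address and topic name are made-up placeholders:

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

val props = new Properties()
props.setProperty("bootstrap.servers", "192.168.xxx.xxx:9092") // placeholder broker address

// With Semantic.EXACTLY_ONCE the producer writes inside Kafka transactions that are
// committed only when a checkpoint completes; consumers reading with
// isolation.level=read_committed therefore never see data of uncompleted checkpoints
val kafka_sink = new FlinkKafkaProducer[String](
  "output_topic", // hypothetical topic
  new SimpleStringSchema(),
  props,
  java.util.Optional.empty[FlinkKafkaPartitioner[String]](),
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
  FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE
)
```

Note that the broker's transaction.max.timeout.ms must be at least as large as the producer's transaction.timeout.ms, otherwise the transactional sink fails at runtime.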
2. Operator State
- Non-keyed state, used for example by FlinkKafkaConsumer and Sinks
- Available state types: broadcast state, list state, union list state
2.1 Using broadcast state
- Data sent to rule_input
[root@bigdata001 ~]#
[root@bigdata001 ~]# nc -lk 9998
1,2021-09-26 12:00:01
2,2021-09-26 12:00:02
3,2021-09-26 12:00:03
- Data sent to item_input
[root@bigdata001 ~]#
[root@bigdata001 ~]# nc -lk 9999
1,2021-09-26 12:10:01
3,2021-09-26 12:10:03
2,2021-09-26 12:10:02
4,2021-09-26 12:10:04
1,2021-09-26 12:10:01
- Example code
package datastreamApi

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime._
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

import scala.collection.JavaConversions.iterableAsScalaIterable

class RecordTimestampAssigner extends TimestampAssigner[(Int, String)] {

  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  override def extractTimestamp(element: (Int, String), recordTimestamp: Long): Long = {
    fdf.parse(element._2).getTime
  }
}

class PeriodWatermarkGenerator extends WatermarkGenerator[(Int, String)] {

  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  var maxTimestamp: Long = _
  val maxOutofOrderness = 0

  override def onEvent(event: (Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = {
    maxTimestamp = math.max(fdf.parse(event._2).getTime, maxTimestamp)
  }

  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  }
}

class MyWatermarkStrategy extends WatermarkStrategy[(Int, String)] {

  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(Int, String)] = {
    new RecordTimestampAssigner()
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(Int, String)] = {
    new PeriodWatermarkGenerator()
  }
}

object BroadcastStateTest {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val rule_input = senv.socketTextStream("192.168.xxx.xxx", 9998)
      .map(line => {
        val contents = line.split(",")
        (contents(0).toInt, contents(1))
      }).assignTimestampsAndWatermarks(new MyWatermarkStrategy())

    val item_input = senv.socketTextStream("192.168.xxx.xxx", 9999)
      .map(line => {
        val contents = line.split(",")
        (contents(0).toInt, contents(1))
      }).assignTimestampsAndWatermarks(new MyWatermarkStrategy())

    // 1. Define a broadcast state descriptor
    val broadcast_descriptor = new MapStateDescriptor[String, (Int, String)]("my_broadcast",
      BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(classOf[(Int, String)]))

    // 2. This example runs with 8 slots; every slot receives all 3 records and keeps its own broadcast state
    val rule_input_broadcast = rule_input.broadcast(broadcast_descriptor)

    val output = item_input.connect(rule_input_broadcast)
      .process(new BroadcastProcessFunction[(Int, String), (Int, String), String] {

        override def processBroadcastElement(in2: (Int, String), context: BroadcastProcessFunction[(Int, String), (Int, String), String]#Context, collector: Collector[String]): Unit = {
          // 3. Save the 3 records arriving at this slot into the broadcast state
          context.getBroadcastState(broadcast_descriptor).put(in2._1.toString, in2)
        }

        override def processElement(in1: (Int, String), readOnlyContext: BroadcastProcessFunction[(Int, String), (Int, String), String]#ReadOnlyContext, collector: Collector[String]): Unit = {
          // 4. Read this slot's broadcast state as an iterable; it holds the 3 records
          val rules = readOnlyContext.getBroadcastState(broadcast_descriptor).immutableEntries()
          // 5. Match this slot's non-broadcast element against the iterable; emit on a match
          for (rule <- rules) {
            if (rule.getValue._1 == in1._1) {
              collector.collect(s"${in1._1.toString}------${rules.map(_.getValue._1).mkString(",")}")
            }
          }
        }
      })

    output.print("output")

    senv.execute("BroadcastTest")
  }
}
Execution order
- Start ncat on ports 9998 and 9999
- Start the program
- Send the rule_input data to port 9998
- Send the item_input data to port 9999
Execution result
output:8> 1------1,2,3
output:1> 3------1,2,3
output:2> 2------1,2,3
output:4> 1------1,2,3
- Notes
- If rule_input and item_input are generated with senv.fromElements, then even though timestamps are assigned via assignTimestampsAndWatermarks, the process function does not depend on those timestamps, so processElement may run before processBroadcastElement
- That is why this example uses socketTextStream: send the rule_input data first, wait a few seconds, then send the item_input data, so that processBroadcastElement runs before processElement
2.2 Using list state and union list state
package datastreamApi

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

import scala.collection.JavaConversions.{bufferAsJavaList, iterableAsScalaIterable}
import scala.collection.mutable.ArrayBuffer

class My_sink_function extends RichSinkFunction[Int] with CheckpointedFunction {

  val threshold = 1000
  var checkpointed_list_state: ListState[Int] = _
  val buffered_elements: ArrayBuffer[Int] = ArrayBuffer()

  // CheckpointedFunction method
  override def initializeState(functionInitializationContext: FunctionInitializationContext): Unit = {
    val list_state_descriptor = new ListStateDescriptor[Int]("my_list_state", TypeInformation.of(classOf[Int]))
    checkpointed_list_state = functionInitializationContext.getOperatorStateStore.getListState(list_state_descriptor)

    if (functionInitializationContext.isRestored) {
      buffered_elements ++= checkpointed_list_state.get().toSeq
    }
  }

  override def invoke(value: Int, context: SinkFunction.Context): Unit = {
    buffered_elements += value
    if (buffered_elements.size == threshold) {
      // flush buffered_elements to the target system, then clear the buffer
      buffered_elements.clear()
    }
  }

  // CheckpointedFunction method
  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    checkpointed_list_state.clear()
    // a slot's list state holds only that slot's elements
    checkpointed_list_state.addAll(buffered_elements)
  }
}

object ListStateTest {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val input = senv.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9)
    input.addSink(new My_sink_function())
    senv.execute("ListStateTest")
  }
}
OperatorStateStore.java (decompiled)

package org.apache.flink.api.common.state;

import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface OperatorStateStore {

    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> var1) throws Exception;

    <S> ListState<S> getListState(ListStateDescriptor<S> var1) throws Exception;

    <S> ListState<S> getUnionListState(ListStateDescriptor<S> var1) throws Exception;

    Set<String> getRegisteredStateNames();

    Set<String> getRegisteredBroadcastStateNames();
}
- A function must implement CheckpointedFunction to use these
- On the first run, or when restoring from a checkpoint or savepoint, initializeState is called
- When a checkpoint or savepoint is taken, snapshotState is called
- As the OperatorStateStore interface shows, list state and union list state have the same type; they differ only in how state is redistributed when restoring from a checkpoint or savepoint:
- getListState merges the list states of all the operator's slots, then splits the merged list evenly across the slots
- getUnionListState merges the list states of all the operator's slots, then sends the entire merged list to every slot
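The two restore behaviors can be sketched with plain Scala collections (this is an illustration, not Flink API; assume an operator that had 2 slots with list states Seq(1, 2) and Seq(3, 4), restored to 2 new slots):

```scala
val old_states = Seq(Seq(1, 2), Seq(3, 4)) // list state of each old slot
val merged = old_states.flatten            // on restore, all slots' lists are first merged

// getListState: the merged list is split evenly across the new slots
val even_split = merged.grouped(merged.size / 2).toSeq
// even_split == Seq(Seq(1, 2), Seq(3, 4))

// getUnionListState: every new slot receives the complete merged list
val union = Seq.fill(2)(merged)
// union == Seq(Seq(1, 2, 3, 4), Seq(1, 2, 3, 4))
```

This is why union list state suits cases like Kafka offsets, where each restored slot filters the full list for the partitions it now owns.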
2.3 Stateful Source Functions (a special case of list state and union list state)
- The checkpoint lock must be held so that emitting records and updating state are atomic; this is required for Exactly-Once semantics
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.collection.JavaConversions.iterableAsScalaIterable

class CounterSource extends RichParallelSourceFunction[Long] with CheckpointedFunction {

  private var isRunning = true
  private var offset = 0L
  private var list_state: ListState[Long] = _

  override def initializeState(functionInitializationContext: FunctionInitializationContext): Unit = {
    val list_state_descriptor = new ListStateDescriptor[Long]("my_list_state", TypeInformation.of(classOf[Long]))
    list_state = functionInitializationContext.getOperatorStateStore.getListState(list_state_descriptor)

    if (functionInitializationContext.isRestored) {
      offset = list_state.get().toSeq.apply(0)
    }
  }

  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    val lock = sourceContext.getCheckpointLock

    while (isRunning) {
      // hold the checkpoint lock while emitting data and updating the offset
      lock.synchronized({
        sourceContext.collect(offset)
        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    list_state.clear()
    list_state.add(offset)
  }
}
3. Keyed State
- After keyBy, each key has its own state; the key and its state live in the same slot, which makes state updates local operations and makes the state easy to redistribute
- The available state types are listed in the table below:
Type | Purpose | Methods
---|---|---
ValueState&lt;T&gt; | Holds a single value | Update the value: void update(T var1) throws IOException; get the value: T value() throws IOException
ListState&lt;T&gt; | Holds a list of values | Add one value: void add(IN var1) throws Exception; add a list: void addAll(List&lt;T&gt; var1) throws Exception; replace all elements with a list: void update(List&lt;T&gt; var1) throws Exception; get all values as an iterable: Iterable get() throws Exception
MapState&lt;K, V&gt; | Holds a set of mappings | Insert or update one mapping: void put(K var1, V var2) throws Exception; insert or update multiple mappings: void putAll(Map&lt;K, V&gt; var1) throws Exception; get the value of a key: V get(K var1) throws Exception; get all mappings as an iterable: Iterable&lt;Entry&lt;K, V&gt;&gt; entries() throws Exception; get all mappings as an iterator: Iterator&lt;Entry&lt;K, V&gt;&gt; iterator() throws Exception; get all keys: Iterable&lt;K&gt; keys() throws Exception; get all values: Iterable&lt;V&gt; values() throws Exception; check whether a key exists: boolean contains(K var1) throws Exception; check whether the state is empty: boolean isEmpty() throws Exception; remove the value of a key: void remove(K var1) throws Exception
ReducingState&lt;T&gt; | Holds a single value; the state descriptor must supply a ReduceFunction that aggregates the added elements (see the ReducingStateDescriptor example below) | Add a value: void add(T var1) throws Exception; get the aggregated result: T get() throws Exception
AggregatingState&lt;IN, OUT&gt; | Holds a single value; the state descriptor must supply an AggregateFunction that aggregates the added elements (see the AggregatingStateDescriptor example below) | Add a value: void add(IN var1) throws Exception; get the aggregated result: OUT get() throws Exception
Creating a ReducingStateDescriptor:

new ReducingStateDescriptor[T]("state_name",
  new ReduceFunction[T] {
    override def reduce(t1: T, t2: T): T = ??? // do something
  },
  TypeInformation.of(classOf[T])
)

Creating an AggregatingStateDescriptor:

new AggregatingStateDescriptor[Long, (Long, Long), Double]("my_aggregate_state",
  new AggregateFunction[Long, (Long, Long), Double] {
    override def createAccumulator(): (Long, Long) = (0L, 0L)
    override def add(in: Long, acc: (Long, Long)): (Long, Long) = (acc._1 + 1, acc._2 + in)
    override def merge(acc1: (Long, Long), acc2: (Long, Long)): (Long, Long) = (acc1._1 + acc2._1, acc1._2 + acc2._2)
    override def getResult(acc: (Long, Long)): Double = acc._2.toDouble / acc._1
  },
  TypeInformation.of(classOf[(Long, Long)])
)
3.1 Using keyed state
- We use ValueState&lt;T&gt; here as the illustration; the other state types work analogously
- State can only be accessed through getRuntimeContext, which is available inside a RichFunction
package datastreamApi

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

// Computes the average of every two numbers per key
class MyAverageFunction extends RichFlatMapFunction[(String, Long), (String, Double)] {

  private var value_state: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    val value_state_descriptor = new ValueStateDescriptor[(Long, Long)]("my_value_state", TypeInformation.of(classOf[(Long, Long)]))
    value_state = getRuntimeContext.getState(value_state_descriptor)
  }

  override def flatMap(in: (String, Long), collector: Collector[(String, Double)]): Unit = {
    val tmp_value: (Long, Long) = value_state.value()
    val current_value = if (tmp_value != null) tmp_value else (0L, 0L)

    val new_value = (current_value._1 + 1, current_value._2 + in._2)
    value_state.update(new_value)

    if (new_value._1 == 2) {
      collector.collect((in._1, new_value._2.toDouble / new_value._1))
      value_state.clear()
    }
  }
}

object KeyStateTest {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val input = senv.fromElements(
      ("A", 10L), ("A", 20L), ("A", 30L),
      ("B", 100L), ("B", 200L), ("B", 300L)
    )

    val output = input.keyBy(_._1).flatMap(new MyAverageFunction())
    output.print("output")

    senv.execute("KeyStateTest")
  }
}
Execution result:
output:2> (B,150.0)
output:7> (A,15.0)
3.2 State Time-To-Live (TTL)
- TTL is applied per element; e.g. each individual entry of a ListState or MapState expires on its own
- TTL is based on processing time; the state's last-modified timestamp is stored in the state backend together with the state itself
- The StateTtlConfig settings are not stored in the state backend; when restoring from a checkpoint or savepoint, whether TTL is enabled in the code must match whether it was enabled when the checkpoint or savepoint was taken
Usage:
import org.apache.flink.api.common.state.StateTtlConfig.{StateVisibility, UpdateType}
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.TypeInformation

val state_ttl_config = StateTtlConfig.newBuilder(Time.days(1L))
  .setUpdateType(UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateVisibility.NeverReturnExpired)
  // .cleanupFullSnapshot()
  // .cleanupInRocksdbCompactFilter(1000L)
  .build()

val value_state_descriptor = new ValueStateDescriptor[Long]("my_value_state", TypeInformation.of(classOf[Long]))
value_state_descriptor.enableTimeToLive(state_ttl_config)
State expiration parameters:
- newBuilder(@Nonnull Time ttl): sets the state's time-to-live
- setUpdateType(StateTtlConfig.UpdateType updateType):
- OnCreateAndWrite (default): the TTL timer resets when the state is created or written
- OnReadAndWrite: the TTL timer resets when the state is read or written
- setStateVisibility(@Nonnull StateTtlConfig.StateVisibility stateVisibility):
- NeverReturnExpired (default): never return state that has expired but has not been deleted yet
- ReturnExpiredIfNotCleanedUp: return state that has expired but has not been deleted yet
General cleanup behavior:
- By default, expired entries of a state are removed when that state is read
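Besides this read-time cleanup, StateTtlConfig also offers incremental cleanup for heap state backends; a minimal sketch (the parameter values are arbitrary examples):

```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

val ttl_config = StateTtlConfig.newBuilder(Time.days(1L))
  // On every state access, check up to 10 entries for expiration; the second argument
  // controls whether an additional cleanup check also runs for every processed record
  .cleanupIncrementally(10, false)
  .build()
```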
Cleanup for full-snapshot state backends:
- Applies to HashMapStateBackend + FileSystemCheckpointStorage, and to EmbeddedRocksDBStateBackend in full-snapshot mode
- cleanupFullSnapshot(): when a full snapshot is taken, expired entries are excluded from the snapshot written to the state backend, but the entries in the local state are not deleted
Cleanup for incremental-snapshot state backends:
- Applies to EmbeddedRocksDBStateBackend in incremental mode
- cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries): after the given number of state entries has been processed, expired entries are removed incrementally by a RocksDB compaction filter
4. Checkpoint configuration
- Checkpoints are triggered by Flink itself and are mainly used for failure recovery when the program restarts after a failure
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment

// Checkpointing is disabled by default; here a checkpoint is taken every 30 seconds.
// Choose the interval as a trade-off between server load and the amount of data reprocessed on recovery.
// CheckpointingMode is EXACTLY_ONCE (default) or AT_LEAST_ONCE; AT_LEAST_ONCE does not need barrier alignment
senv.enableCheckpointing(30L * 1000, CheckpointingMode.EXACTLY_ONCE)

// If a checkpoint takes longer than 10 minutes (the default), it is discarded (not counted as a failure)
senv.getCheckpointConfig.setCheckpointTimeout(600L * 1000)

// Leave at least 10 seconds (default 0) between the end of one checkpoint and the start of the next,
// to reduce server load. When this is greater than 0, maxConcurrentCheckpoints is effectively limited to 1
senv.getCheckpointConfig.setMinPauseBetweenCheckpoints(10L * 1000)

// Number of checkpoints allowed to run at the same time (default 1)
senv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// Number of checkpoint failures tolerated before the application fails (default 0)
senv.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)

// Under heavy load, checkpoints can take very long because of barrier alignment.
// Unaligned checkpoints (default false) also snapshot the in-flight data that has not been aligned yet.
// Only effective with CheckpointingMode.EXACTLY_ONCE and maxConcurrentCheckpoints = 1
senv.getCheckpointConfig.enableUnalignedCheckpoints(true)

// Keep the most recent checkpoints in the state backend when the application is stopped,
// so the job can be restored from them just like from a savepoint (default: DELETE_ON_CANCELLATION)
senv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// FileSystemCheckpointStorage location for checkpoints
senv.getCheckpointConfig.setCheckpointStorage("hdfs://nnha/flink/ha/checkpoint/")
5. Savepoint
- Savepoints must be triggered manually. The mechanism is the same as for checkpoints; they are mainly used for changing parallelism, program upgrades, cluster upgrades, and similar scenarios
- Commands for triggering a savepoint:
- standalone mode:
bin/flink savepoint :jobId [:targetDirectory]
- yarn mode:
bin/flink savepoint -yid :yarnAppId :jobId [:targetDirectory]
- standalone mode (stop the application with a savepoint):
bin/flink stop --savepointPath [:targetDirectory] :jobId
- yarn mode (stop the application with a savepoint):
bin/flink stop --savepointPath [:targetDirectory] -yid :yarnAppId :jobId
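To resume a job from a savepoint (or from a retained externalized checkpoint), pass its path to the run command; the jar name below is a placeholder:

```shell
bin/flink run -s :savepointPath your-application.jar
# Add -n (--allowNonRestoredState) to skip savepoint state that belongs to
# operators no longer present in the program
```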
6. State Backends
Three state backends are provided
- The default is HashMapStateBackend + JobManagerCheckpointStorage
- The task managers' local state is kept in heap memory; snapshots are stored in the job manager's heap memory
- flink-conf.yaml configuration
state.backend: hashmap
state.checkpoint-storage: jobmanager
- Code configuration
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStateBackend(new HashMapStateBackend())
senv.getCheckpointConfig.setCheckpointStorage(new JobManagerCheckpointStorage())
- HashMapStateBackend + FileSystemCheckpointStorage
- The task managers' local state is kept in heap memory; snapshots are stored in HDFS
- flink-conf.yaml configuration
state.backend: hashmap
state.checkpoint-storage: filesystem
state.checkpoints.dir: hdfs://nnha/flink/ha/checkpoint/
state.savepoints.dir: hdfs://nnha/flink/ha/savepoint/
- Code configuration
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStateBackend(new HashMapStateBackend())
// senv.getCheckpointConfig.setCheckpointStorage("hdfs://nnha/flink/ha/checkpoint/")
senv.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(
  "hdfs://nnha/flink/ha/checkpoint/",
  -1, // state smaller than this threshold (in bytes) is stored inline in the checkpoint metadata instead of in files; -1 (default) means the value is taken from the runtime configuration
  -1  // write-buffer size used when serializing state; -1 (default) means the value is taken from the runtime configuration
))
- EmbeddedRocksDBStateBackend
- The task managers' local state is kept in local files; snapshots are stored in HDFS
- Supports incremental snapshots
- flink-conf.yaml configuration
state.backend: rocksdb
state.checkpoint-storage: filesystem
state.checkpoints.dir: hdfs://nnha/flink/ha/checkpoint/
state.savepoints.dir: hdfs://nnha/flink/ha/savepoint/
- Code configuration
/* Dependency needed in the IDE:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>
*/

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStateBackend(new EmbeddedRocksDBStateBackend())
// senv.getCheckpointConfig.setCheckpointStorage("hdfs://nnha/flink/ha/checkpoint/")
senv.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(
  "hdfs://nnha/flink/ha/checkpoint/",
  -1, // state smaller than this threshold (in bytes) is stored inline in the checkpoint metadata instead of in files; -1 (default) means the value is taken from the runtime configuration
  -1  // write-buffer size used when serializing state; -1 (default) means the value is taken from the runtime configuration
))
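The incremental snapshot mode mentioned above is off by default; assuming the same setup as the code above, it can be enabled either in flink-conf.yaml or in code:

```scala
// flink-conf.yaml equivalent: state.backend.incremental: true
senv.setStateBackend(new EmbeddedRocksDBStateBackend(true))
```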