Contents

  • 1. Which operators have state
  • 2. Operator State
    • 2.1 Using Broadcast state
    • 2.2 Using List state and Union List state
    • 2.3 Stateful Source Functions (a special case of List state and Union List state)
  • 3. Keyed State
    • 3.1 Using Keyed state
    • 3.2 State Time-To-Live (TTL)
  • 4. Checkpoint configuration
  • 5. Savepoint
  • 6. State Backends

1. Which operators have state

  • Operators without state

    1. Functions like map process one element at a time within a slot and keep nothing between elements, so they are stateless
  • Operators with state

    1. A Source such as FlinkKafkaConsumer must remember the consumed offset of every Kafka partition; that offset is state
    2. A Window function holds multiple elements, so it must keep the window metadata and the buffered window elements; that is state
    3. A Sink function may need to keep elements that have entered the sink but have not yet been written to the target system; that is state
    4. State you define yourself (for example a ListState) in a CheckpointedFunction (operator state) or a RichFunction (keyed state) is also state
  • How state supports Exactly-Once semantics

    1. If the program is stopped with a savepoint and later restored from that savepoint, Exactly-Once is preserved
    2. If the program stops unexpectedly and is restored from a checkpoint or savepoint, Exactly-Once requires a Source that can rewind to an earlier consumption position (such as Kafka), and a downstream target where data belonging to an incomplete checkpoint is not yet visible (such as Kafka or FileSink). Alternatively, the Sink can deduplicate on write, which avoids duplicated records but can still produce incorrect results

2. Operator State

  • Non-keyed state, for example the state of FlinkKafkaConsumer or of a Sink
  • The available data types are broadcast state, list state, and union list state

2.1 Using Broadcast state

  1. Data sent to rule_input
[root@bigdata001 ~]#
[root@bigdata001 ~]# nc -lk 9998
1,2021-09-26 12:00:01
2,2021-09-26 12:00:02
3,2021-09-26 12:00:03
  2. Data sent to item_input
[root@bigdata001 ~]#
[root@bigdata001 ~]# nc -lk 9999
1,2021-09-26 12:10:01
3,2021-09-26 12:10:03
2,2021-09-26 12:10:02
4,2021-09-26 12:10:04
1,2021-09-26 12:10:01
  3. Example code
package datastreamApi

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime._
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

import scala.collection.JavaConversions.iterableAsScalaIterable

class RecordTimestampAssigner extends TimestampAssigner[(Int, String)] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  override def extractTimestamp(element: (Int, String), recordTimestamp: Long): Long = {
    fdf.parse(element._2).getTime
  }
}

class PeriodWatermarkGenerator extends WatermarkGenerator[(Int, String)] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  var maxTimestamp: Long = _
  val maxOutofOrderness = 0

  override def onEvent(event: (Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = {
    maxTimestamp = math.max(fdf.parse(event._2).getTime, maxTimestamp)
  }

  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  }
}

class MyWatermarkStrategy extends WatermarkStrategy[(Int, String)] {
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(Int, String)] = {
    new RecordTimestampAssigner()
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(Int, String)] = {
    new PeriodWatermarkGenerator()
  }
}

object BroadcastStateTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val rule_input = senv.socketTextStream("192.168.xxx.xxx", 9998)
      .map(line => {
        val contents = line.split(",")
        (contents(0).toInt, contents(1))
      }).assignTimestampsAndWatermarks(new MyWatermarkStrategy())

    val item_input = senv.socketTextStream("192.168.xxx.xxx", 9999)
      .map(line => {
        val contents = line.split(",")
        (contents(0).toInt, contents(1))
      }).assignTimestampsAndWatermarks(new MyWatermarkStrategy())

    // 1. Define a broadcast state descriptor
    val broadcast_descriptor = new MapStateDescriptor[String, (Int, String)]("my_broadcast", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(classOf[(Int, String)]))

    // 2. This example runs with 8 slots; every slot receives the 3 broadcast records and holds its own Broadcast state
    val rule_input_broadcast = rule_input.broadcast(broadcast_descriptor)

    val output = item_input.connect(rule_input_broadcast)
      .process(new BroadcastProcessFunction[(Int, String), (Int, String), String] {

        override def processBroadcastElement(in2: (Int, String), context: BroadcastProcessFunction[(Int, String), (Int, String), String]#Context, collector: Collector[String]): Unit = {
          // 3. Store the 3 broadcast records arriving at this slot into the Broadcast state
          context.getBroadcastState(broadcast_descriptor).put(in2._1.toString, in2)
        }

        override def processElement(in1: (Int, String), readOnlyContext: BroadcastProcessFunction[(Int, String), (Int, String), String]#ReadOnlyContext, collector: Collector[String]): Unit = {
          // 4. Read this slot's Broadcast state as an iterable; it contains the 3 broadcast records
          val rules = readOnlyContext.getBroadcastState(broadcast_descriptor).immutableEntries()

          // 5. Match the non-broadcast element on this slot against the iterable and emit a result on a match
          for (rule <- rules) {
            if (rule.getValue._1 == in1._1) {
              collector.collect(s"${in1._1.toString}------${rules.map(_.getValue._1).mkString(",")}")
            }
          }
        }
      })

    output.print("output")

    senv.execute("BroadcastTest")
  }
}
  4. Execution order

    1. Start ncat on ports 9998 and 9999
    2. Start the program
    3. Send the rule_input data to port 9998
    4. Send the item_input data to port 9999
  5. Execution result

output:8> 1------1,2,3
output:1> 3------1,2,3
output:2> 2------1,2,3
output:4> 1------1,2,3
  6. Notes
  • If rule_input and item_input were generated with senv.fromElements, then even though timestamps are assigned via assignTimestampsAndWatermarks, the process function does not wait for timestamps, so processElement might run before processBroadcastElement
  • That is why this example feeds the data through socketTextStream: send the rule_input data first, wait a few seconds, and then send the item_input data, so that processBroadcastElement runs before processElement

2.2 Using List state and Union List state

package datastreamApi

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

import scala.collection.JavaConversions.{bufferAsJavaList, iterableAsScalaIterable}
import scala.collection.mutable.ArrayBuffer

class My_sink_function extends RichSinkFunction[Int] with CheckpointedFunction {
  val threshold = 1000
  var checkpointed_list_state: ListState[Int] = _
  val buffered_elements: ArrayBuffer[Int] = ArrayBuffer()

  // CheckpointedFunction method
  override def initializeState(functionInitializationContext: FunctionInitializationContext): Unit = {
    val list_state_descriptor = new ListStateDescriptor[Int]("my_list_state", TypeInformation.of(classOf[Int]))
    checkpointed_list_state = functionInitializationContext.getOperatorStateStore.getListState(list_state_descriptor)

    if (functionInitializationContext.isRestored) {
      buffered_elements ++= checkpointed_list_state.get().toSeq
    }
  }

  override def invoke(value: Int, context: SinkFunction.Context): Unit = {
    buffered_elements += value
    if (buffered_elements.size == threshold) {
      // send buffered_elements to the external sink here, then release the buffer
      buffered_elements.clear()
    }
  }

  // CheckpointedFunction method
  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    checkpointed_list_state.clear()
    // the list state of one slot only stores the elements seen by that slot
    checkpointed_list_state.addAll(buffered_elements)
  }
}

object ListStateTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val input = senv.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9)
    input.addSink(new My_sink_function())

    senv.execute("ListStateTest")
  }
}

OperatorStateStore.java

package org.apache.flink.api.common.state;

import java.util.Set;
import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public interface OperatorStateStore {
    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> var1) throws Exception;

    <S> ListState<S> getListState(ListStateDescriptor<S> var1) throws Exception;

    <S> ListState<S> getUnionListState(ListStateDescriptor<S> var1) throws Exception;

    Set<String> getRegisteredStateNames();

    Set<String> getRegisteredBroadcastStateNames();
}
  • You must implement CheckpointedFunction to use operator list state
  • initializeState is called on the first run and whenever the job is restored from a checkpoint or savepoint
  • snapshotState is called every time a checkpoint or savepoint is taken
  • As the OperatorStateStore.java interface shows, List state and Union List state are declared identically; they differ only in how the state is redistributed when restoring from a checkpoint or savepoint (a sketch of the union-list restore pattern follows this list):
    • getListState merges the List state of all slots of the operator and then splits the merged list evenly across all slots
    • getUnionListState merges the List state of all slots and then ships the complete merged list to every slot
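
To make the difference concrete, below is a minimal sketch of the usual getUnionListState restore pattern: on restore every subtask receives the complete merged list and keeps only the entries it is responsible for. The class MyUnionListSink, its state name, and the modulo assignment rule are invented for illustration and stand in for a real assignment rule such as Kafka partition ownership.

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

import scala.collection.JavaConversions.{bufferAsJavaList, iterableAsScalaIterable}
import scala.collection.mutable.ArrayBuffer

class MyUnionListSink extends RichSinkFunction[Int] with CheckpointedFunction {
  private var union_list_state: ListState[Int] = _
  private val local_elements: ArrayBuffer[Int] = ArrayBuffer()

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Int]("my_union_list_state", TypeInformation.of(classOf[Int]))
    // getUnionListState: on restore every parallel subtask sees the complete merged list
    union_list_state = context.getOperatorStateStore.getUnionListState(descriptor)

    if (context.isRestored) {
      val subtask = getRuntimeContext.getIndexOfThisSubtask
      val parallelism = getRuntimeContext.getNumberOfParallelSubtasks
      // keep only the entries this subtask is responsible for (modulo rule, for illustration only)
      local_elements ++= union_list_state.get().toSeq.filter(_ % parallelism == subtask)
    }
  }

  override def invoke(value: Int, context: SinkFunction.Context): Unit = {
    local_elements += value
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    union_list_state.clear()
    union_list_state.addAll(local_elements)
  }
}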

2.3 Stateful Source Functions (a special case of List state and Union List state)

  • The checkpoint lock must be held so that emitting a record and updating the offset state happen atomically; this is what preserves Exactly-Once semantics
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.collection.JavaConversions.iterableAsScalaIterable

class CounterSource extends RichParallelSourceFunction[Long] with CheckpointedFunction {
  private var isRunning = true
  private var offset = 0L
  private var list_state: ListState[Long] = _

  override def initializeState(functionInitializationContext: FunctionInitializationContext): Unit = {
    val list_state_descriptor = new ListStateDescriptor[Long]("my_list_state", TypeInformation.of(classOf[Long]))
    list_state = functionInitializationContext.getOperatorStateStore.getListState(list_state_descriptor)

    if (functionInitializationContext.isRestored) {
      offset = list_state.get().toSeq.apply(0)
    }
  }

  override def run(sourceContext: SourceFunction.SourceContext[Long]): Unit = {
    val lock = sourceContext.getCheckpointLock

    while (isRunning) {
      // emit the record and advance the offset while holding the checkpoint lock
      lock.synchronized({
        sourceContext.collect(offset)
        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    list_state.clear()
    list_state.add(offset)
  }
}
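
A minimal driver for the source above might look like the following sketch (the object name CounterSourceTest is made up; checkpointing is enabled so that snapshotState actually runs):

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}

object CounterSourceTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // enable checkpointing so the offset kept in the list state is snapshotted periodically
    senv.enableCheckpointing(30L * 1000)

    senv.addSource(new CounterSource()).print("counter")

    senv.execute("CounterSourceTest")
  }
}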

3. Keyed State

  • After a keyBy, each key has its own state; the key and its state live in the same slot, which keeps state updates local and makes state redistribution straightforward
  • The keyed state types and their operations are listed below:
  • ValueState<T> — holds a single value
    • update the value: void update(T var1) throws IOException;
    • read the value: T value() throws IOException;
  • ListState<T> — holds a list of values
    • add one value: void add(IN var1) throws Exception;
    • add a list of values: void addAll(List<T> var1) throws Exception;
    • replace all elements of the list state with a list: void update(List<T> var1) throws Exception;
    • read all values as an iterable: Iterable<T> get() throws Exception;
  • MapState<K, V> — holds a set of key/value mappings
    • insert or update one mapping: void put(K var1, V var2) throws Exception;
    • insert or update several mappings: void putAll(Map<K, V> var1) throws Exception;
    • read the value of one key: V get(K var1) throws Exception;
    • read all mappings as an iterable: Iterable<Entry<K, V>> entries() throws Exception;
    • read all mappings as an iterator: Iterator<Entry<K, V>> iterator() throws Exception;
    • read all keys: Iterable<K> keys() throws Exception;
    • read all values: Iterable<V> values() throws Exception;
    • check whether a key exists: boolean contains(K var1) throws Exception;
    • check whether the state is empty: boolean isEmpty() throws Exception;
    • remove the value of one key: void remove(K var1) throws Exception;
  • ReducingState<T> — holds a single aggregated value; the state descriptor requires a ReduceFunction that aggregates the added elements (see the reducingStateDescriptor example below)
    • add a value: void add(T var1) throws Exception;
    • read the aggregated result: T get() throws Exception;
  • AggregatingState<IN, OUT> — holds a single aggregated value; the state descriptor requires an AggregateFunction that aggregates the added elements (see the aggregatingStateDescriptor example below)
    • add a value: void add(IN var1) throws Exception;
    • read the aggregated result: OUT get() throws Exception;
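
To make the list above concrete, here is a small sketch that keeps a per-key word count in a MapState inside a RichFlatMapFunction; it would be used after a keyBy on the first tuple field, analogous to the ValueState example in 3.1. The class name MyWordCountFunction and the state name my_map_state are made up for this example.

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// counts, per key, how often each word has been seen, using a MapState[word, count]
class MyWordCountFunction extends RichFlatMapFunction[(String, String), (String, String, Long)] {
  private var word_counts: MapState[String, Long] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new MapStateDescriptor[String, Long]("my_map_state",
      TypeInformation.of(classOf[String]), TypeInformation.of(classOf[Long]))
    word_counts = getRuntimeContext.getMapState(descriptor)
  }

  override def flatMap(in: (String, String), collector: Collector[(String, String, Long)]): Unit = {
    // contains/get/put are the MapState methods listed above
    val new_count = if (word_counts.contains(in._2)) word_counts.get(in._2) + 1 else 1L
    word_counts.put(in._2, new_count)
    collector.collect((in._1, in._2, new_count))
  }
}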

Creating a reducingStateDescriptor:

new ReducingStateDescriptor[T](
  "state_name",
  new ReduceFunction[T] {
    override def reduce(t1: T, t2: T): T = ??? // do something: combine t1 and t2
  },
  TypeInformation.of(classOf[T])
)

Creating an aggregatingStateDescriptor:

new AggregatingStateDescriptor[Long, (Long, Long), Double](
  "my_aggregate_state",
  new AggregateFunction[Long, (Long, Long), Double] {
    override def createAccumulator(): (Long, Long) = (0L, 0L)

    override def add(in: Long, acc: (Long, Long)): (Long, Long) = (acc._1 + 1, acc._2 + in)

    override def merge(acc1: (Long, Long), acc2: (Long, Long)): (Long, Long) = (acc1._1 + acc2._1, acc1._2 + acc2._2)

    override def getResult(acc: (Long, Long)): Double = acc._2.toDouble / acc._1
  },
  TypeInformation.of(classOf[(Long, Long)])
)

3.1 Using Keyed state

  • The example below uses ValueState<T>; the other state types are used in the same way
  • State is accessed through getRuntimeContext, which is only available in a RichFunction
package datastreamApi

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

// emits the average of every two numbers per key
class MyAverageFunction extends RichFlatMapFunction[(String, Long), (String, Double)] {
  private var value_state: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    val value_state_descriptor = new ValueStateDescriptor[(Long, Long)]("my_value_state", TypeInformation.of(classOf[(Long, Long)]))
    value_state = getRuntimeContext.getState(value_state_descriptor)
  }

  override def flatMap(in: (String, Long), collector: Collector[(String, Double)]): Unit = {
    // the state holds (count, sum) for the current key
    val tmp_value: (Long, Long) = value_state.value()
    val current_value = if (tmp_value != null) tmp_value else (0L, 0L)
    val new_value = (current_value._1 + 1, current_value._2 + in._2)
    value_state.update(new_value)

    if (new_value._1 == 2) {
      collector.collect((in._1, new_value._2.toDouble / new_value._1))
      value_state.clear()
    }
  }
}

object KeyStateTest {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment

    val input = senv.fromElements(
      ("A", 10L), ("A", 20L), ("A", 30L),
      ("B", 100L), ("B", 200L), ("B", 300L)
    )
    val output = input.keyBy(_._1).flatMap(new MyAverageFunction())
    output.print("output")

    senv.execute("KeyStateTest")
  }
}

Execution result (each key emits the average of its first two values; the third value of each key stays buffered in the state and produces no output):

output:2> (B,150.0)
output:7> (A,15.0)

3.2 State Time-To-Live (TTL)

  • TTL is tracked per entry, for example per element of a ListState or per mapping of a MapState
  • The TTL clock is based on processing time; the last-update timestamp is stored in the state backend together with the state value
  • The settings in StateTtlConfig are not stored in the state backend, but whether TTL is enabled in the code must match whether TTL was enabled in the checkpoint or savepoint you restore from

Usage:

import org.apache.flink.api.common.state.StateTtlConfig.{StateVisibility, UpdateType}
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.TypeInformation

val state_ttl_config = StateTtlConfig.newBuilder(Time.days(1L))
  .setUpdateType(UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateVisibility.NeverReturnExpired)
  // .cleanupFullSnapshot()
  // .cleanupInRocksdbCompactFilter(1000L)
  .build()

val value_state_descriptor = new ValueStateDescriptor[Long]("my_value_state", TypeInformation.of(classOf[Long]))
value_state_descriptor.enableTimeToLive(state_ttl_config)

Expiration parameters:

  • newBuilder(@Nonnull Time ttl): sets the state's time-to-live
  • setUpdateType(StateTtlConfig.UpdateType updateType):
    1. OnCreateAndWrite (default): the TTL timer is reset when the state is created or written
    2. OnReadAndWrite: the TTL timer is reset when the state is read or written
  • setStateVisibility(@Nonnull StateTtlConfig.StateVisibility stateVisibility):
    1. NeverReturnExpired (default): expired state is never returned, even if it has not been deleted yet
    2. ReturnExpiredIfNotCleanedUp: expired state is still returned as long as it has not been deleted yet

Cleanup that applies to all backends:

  • By default, an expired entry is removed when the state that contains it is read

Cleanup for full-snapshot state backends:

  • Applies to HashMapStateBackend + FileSystemCheckpointStorage and to EmbeddedRocksDBStateBackend in full-snapshot mode
  • cleanupFullSnapshot(): expired entries are dropped from the snapshot written to the state backend when a full snapshot is taken, but they are not removed from the local state

Cleanup for incremental-snapshot state backends:

  • Applies to EmbeddedRocksDBStateBackend in incremental mode
  • cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries): after the given number of state entries has been processed, expired entries are removed incrementally through a RocksDB compaction filter

4. Checkpoint configuration

  • Checkpoints are triggered by Flink itself and are mainly used to recover from failures when the job restarts
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment

// Checkpointing is disabled by default; here a checkpoint is triggered every 30 seconds.
// Choose the interval as a trade-off between cluster load and the amount of data reprocessed after a failure.
// CheckpointingMode is EXACTLY_ONCE (default) or AT_LEAST_ONCE; AT_LEAST_ONCE skips barrier alignment
senv.enableCheckpointing(30L * 1000, CheckpointingMode.EXACTLY_ONCE)

// A checkpoint that takes longer than 10 minutes (the default) is discarded (not counted as a failure)
senv.getCheckpointConfig.setCheckpointTimeout(600L * 1000)

// Leave at least 10 seconds (default 0) between the end of one checkpoint and the start of the next,
// to relieve pressure on the cluster; if this value is greater than 0, only one checkpoint can be in
// flight at a time, so maxConcurrentCheckpoints is effectively ignored
senv.getCheckpointConfig.setMinPauseBetweenCheckpoints(10L * 1000)

// Number of checkpoints allowed to run concurrently (default 1)
senv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// Number of checkpoint failures tolerated before the application is failed (default 0)
senv.getCheckpointConfig.setTolerableCheckpointFailureNumber(0)

// When the cluster is under heavy load, barrier alignment can make checkpoints very slow.
// Unaligned checkpoints also snapshot the in-flight data that has not yet been aligned (default false).
// Only effective with CheckpointingMode.EXACTLY_ONCE and maxConcurrentCheckpoints = 1
senv.getCheckpointConfig.enableUnalignedCheckpoints(true)

// Keep the most recent checkpoints in the state backend when the application is stopped,
// so they can be used for recovery just like a savepoint; the default is DELETE_ON_CANCELLATION
senv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

// FileSystemCheckpointStorage location for checkpoints
senv.getCheckpointConfig.setCheckpointStorage("hdfs://nnha/flink/ha/checkpoint/")
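
The same behaviour can also be configured cluster-wide in flink-conf.yaml instead of in code; the sketch below lists what I believe are the equivalent keys as of Flink 1.13 (values mirror the code above — verify the key names against the documentation of your Flink version):

execution.checkpointing.interval: 30s
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 10min
execution.checkpointing.min-pause: 10s
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.tolerable-failed-checkpoints: 0
execution.checkpointing.unaligned: true
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.checkpoints.dir: hdfs://nnha/flink/ha/checkpoint/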

5. Savepoint

  • Savepoints are triggered manually; the mechanism is the same as for checkpoints, and they are mainly used for changing parallelism, upgrading the program, or upgrading the cluster
  • Commands for triggering a savepoint (the command for resuming from one follows this list):
    1. Trigger a savepoint (standalone): bin/flink savepoint :jobId [:targetDirectory]
    2. Trigger a savepoint (YARN): bin/flink savepoint -yid :yarnAppId :jobId [:targetDirectory]
    3. Stop the application with a savepoint (standalone): bin/flink stop --savepointPath [:targetDirectory] :jobId
    4. Stop the application with a savepoint (YARN): bin/flink stop --savepointPath [:targetDirectory] -yid :yarnAppId :jobId
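
To resume a job from a savepoint (a sketch; the path is the savepoint directory printed when the savepoint was taken, and --allowNonRestoredState skips state that no longer maps to any operator, e.g. after the job graph changed):

bin/flink run -s :savepointPath [:runArgs]
bin/flink run -s :savepointPath --allowNonRestoredState [:runArgs]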

6. State Backends

Flink provides three combinations of state backend and checkpoint storage:

  1. HashMapStateBackend + JobManagerCheckpointStorage (the default)
  • The task manager keeps its local state in heap memory; snapshots are stored in the job manager's heap memory
  • flink-conf.yaml configuration:
state.backend: hashmap
state.checkpoint-storage: jobmanager
  • Configuration in code:
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStateBackend(new HashMapStateBackend())
senv.getCheckpointConfig.setCheckpointStorage(new JobManagerCheckpointStorage())
  2. HashMapStateBackend + FileSystemCheckpointStorage
  • The task manager keeps its local state in heap memory; snapshots are stored in HDFS
  • flink-conf.yaml configuration:
state.backend: hashmap
state.checkpoint-storage: filesystem
state.checkpoints.dir: hdfs://nnha/flink/ha/checkpoint/
state.savepoints.dir: hdfs://nnha/flink/ha/savepoint/
  • Configuration in code:
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStateBackend(new HashMapStateBackend())
// senv.getCheckpointConfig.setCheckpointStorage("hdfs://nnha/flink/ha/checkpoint/")
senv.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(
  "hdfs://nnha/flink/ha/checkpoint/",
  -1, // state smaller than this threshold is stored as metadata rather than as a file; -1 (default) means the value is taken from the runtime configuration
  -1  // write-buffer size used when serializing state; -1 (default) means the value is taken from the runtime configuration
))
  3. EmbeddedRocksDBStateBackend
  • The task manager keeps its local state in local RocksDB files; snapshots are stored in HDFS

  • Supports incremental snapshots

  • flink-conf.yaml configuration:

state.backend: rocksdb
state.checkpoint-storage: filesystem
state.checkpoints.dir: hdfs://nnha/flink/ha/checkpoint/
state.savepoints.dir: hdfs://nnha/flink/ha/savepoint/
  • Configuration in code:
/* The following dependency must be added to the project (pom.xml):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.13.2</version>
    <scope>provided</scope>
</dependency>
*/
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setStateBackend(new EmbeddedRocksDBStateBackend())
// senv.getCheckpointConfig.setCheckpointStorage("hdfs://nnha/flink/ha/checkpoint/")
senv.getCheckpointConfig.setCheckpointStorage(new FileSystemCheckpointStorage(
  "hdfs://nnha/flink/ha/checkpoint/",
  -1, // state smaller than this threshold is stored as metadata rather than as a file; -1 (default) means the value is taken from the runtime configuration
  -1  // write-buffer size used when serializing state; -1 (default) means the value is taken from the runtime configuration
))
