This article takes a look at Flink's FsStateBackend.

StateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBackend.java

@PublicEvolving
public interface StateBackend extends java.io.Serializable {

    // ------------------------------------------------------------------------
    //  Checkpoint storage - the durable persistence of checkpoint data
    // ------------------------------------------------------------------------

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

    // ------------------------------------------------------------------------
    //  Structure Backends
    // ------------------------------------------------------------------------

    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry) throws Exception {
        return createKeyedStateBackend(
            env,
            jobID,
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            kvStateRegistry,
            TtlTimeProvider.DEFAULT);
    }

    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider) throws Exception {
        return createKeyedStateBackend(
            env,
            jobID,
            operatorIdentifier,
            keySerializer,
            numberOfKeyGroups,
            keyGroupRange,
            kvStateRegistry,
            ttlTimeProvider,
            new UnregisteredMetricsGroup());
    }

    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup) throws Exception;

    OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
}
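  • The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed.
  • It declares the resolveCheckpoint, createCheckpointStorage, createKeyedStateBackend, and createOperatorStateBackend methods and extends Serializable; implementations of StateBackend are required to be thread-safe.
  • AbstractStateBackend is the abstract class that implements StateBackend directly. AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, and MemoryStateBackend and FsStateBackend in turn extend AbstractFileStateBackend.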
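
The shorter createKeyedStateBackend overloads in the interface are default methods that fill in a missing argument and delegate to the next longer overload, so an implementation only has to supply the full one. The following is a toy sketch of that pattern in plain Java, not Flink code: ToyBackend, ToyBackendDemo, and the String placeholders standing in for TtlTimeProvider.DEFAULT and UnregisteredMetricsGroup are all hypothetical names introduced for illustration.

```java
/** Toy interface (hypothetical) mirroring the overload chaining of StateBackend.createKeyedStateBackend. */
interface ToyBackend {

    // Placeholders standing in for TtlTimeProvider.DEFAULT and new UnregisteredMetricsGroup().
    String DEFAULT_TTL_PROVIDER = "default-ttl-provider";
    String UNREGISTERED_METRICS = "unregistered-metrics";

    /** Shortest overload: supplies the default TTL provider and delegates. */
    default String createKeyedStateBackend(String operatorIdentifier) {
        return createKeyedStateBackend(operatorIdentifier, DEFAULT_TTL_PROVIDER);
    }

    /** Middle overload: supplies the unregistered metric group and delegates. */
    default String createKeyedStateBackend(String operatorIdentifier, String ttlTimeProvider) {
        return createKeyedStateBackend(operatorIdentifier, ttlTimeProvider, UNREGISTERED_METRICS);
    }

    /** The only abstract overload; concrete backends implement just this one. */
    String createKeyedStateBackend(String operatorIdentifier, String ttlTimeProvider, String metricGroup);
}

final class ToyBackendDemo {

    /** A backend that simply reports the arguments it finally received. */
    static ToyBackend describing() {
        return (operatorIdentifier, ttlTimeProvider, metricGroup) ->
            operatorIdentifier + "|" + ttlTimeProvider + "|" + metricGroup;
    }

    public static void main(String[] args) {
        // prints "map-operator|default-ttl-provider|unregistered-metrics"
        System.out.println(describing().createKeyedStateBackend("map-operator"));
    }
}
```

Calling the one-argument overload walks through both defaults before reaching the implementation, which is the same path a caller of the seven-argument Flink method takes.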

AbstractStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractStateBackend.java

/**
 * An abstract base implementation of the {@link StateBackend} interface.
 *
 * <p>This class has currently no contents and only kept to not break the prior class hierarchy for users.
 */
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {

    private static final long serialVersionUID = 4620415814639230247L;

    // ------------------------------------------------------------------------
    //  State Backend - State-Holding Backends
    // ------------------------------------------------------------------------

    @Override
    public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) throws IOException;

    @Override
    public abstract OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception;
}
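  • AbstractStateBackend declares that it implements both StateBackend and Serializable, and it redeclares createKeyedStateBackend (the overload that takes ttlTimeProvider and metricGroup) and createOperatorStateBackend as abstract methods.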

AbstractFileStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java

@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------
    //  State Backend Properties
    // ------------------------------------------------------------------------

    /** The path where checkpoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseCheckpointPath;

    /** The path where savepoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseSavepointPath;

    /**
     * Creates a backend with the given optional checkpoint- and savepoint base directories.
     *
     * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
     * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
     */
    protected AbstractFileStateBackend(
            @Nullable URI baseCheckpointPath,
            @Nullable URI baseSavepointPath) {

        this(baseCheckpointPath == null ? null : new Path(baseCheckpointPath),
                baseSavepointPath == null ? null : new Path(baseSavepointPath));
    }

    /**
     * Creates a backend with the given optional checkpoint- and savepoint base directories.
     *
     * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
     * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
     */
    protected AbstractFileStateBackend(
            @Nullable Path baseCheckpointPath,
            @Nullable Path baseSavepointPath) {

        this.baseCheckpointPath = baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
        this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
    }

    /**
     * Creates a new backend using the given checkpoint-/savepoint directories, or the values defined in
     * the given configuration. If a checkpoint-/savepoint parameter is not null, that value takes precedence
     * over the value in the configuration. If the configuration does not specify a value, it is possible
     * that the checkpoint-/savepoint directories in the backend will be null.
     *
     * <p>This constructor can be used to create a backend that is based partially on a given backend
     * and partially on a configuration.
     *
     * @param baseCheckpointPath The checkpoint base directory to use (or null).
     * @param baseSavepointPath The default savepoint directory to use (or null).
     * @param configuration The configuration to read values from.
     */
    protected AbstractFileStateBackend(
            @Nullable Path baseCheckpointPath,
            @Nullable Path baseSavepointPath,
            Configuration configuration) {

        this(parameterOrConfigured(baseCheckpointPath, configuration, CheckpointingOptions.CHECKPOINTS_DIRECTORY),
                parameterOrConfigured(baseSavepointPath, configuration, CheckpointingOptions.SAVEPOINT_DIRECTORY));
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the checkpoint base directory. Jobs will create job-specific subdirectories
     * for checkpoints within this directory. May be null, if not configured.
     *
     * @return The checkpoint base directory
     */
    @Nullable
    public Path getCheckpointPath() {
        return baseCheckpointPath;
    }

    /**
     * Gets the directory where savepoints are stored by default (when no custom path is given
     * to the savepoint trigger command).
     *
     * @return The default directory for savepoints, or null, if no default directory has been configured.
     */
    @Nullable
    public Path getSavepointPath() {
        return baseSavepointPath;
    }

    // ------------------------------------------------------------------------
    //  Initialization and metadata storage
    // ------------------------------------------------------------------------

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Checks the validity of the path's scheme and path.
     *
     * @param path The path to check.
     * @return The URI as a Path.
     *
     * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
     */
    private static Path validatePath(Path path) {
        final URI uri = path.toUri();
        final String scheme = uri.getScheme();
        final String pathPart = uri.getPath();

        // some validity checks
        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
                    "Please specify the file system scheme explicitly in the URI.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
                    "Please specify a directory path for the checkpoint data.");
        }
        if (pathPart.length() == 0 || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }

        return path;
    }

    @Nullable
    private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
        if (path != null) {
            return path;
        }
        else {
            String configValue = config.getString(option);
            try {
                return configValue == null ? null : new Path(configValue);
            }
            catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException("Cannot parse value for " + option.key() +
                        " : " + configValue + " . Not a valid path.");
            }
        }
    }
}
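  • AbstractFileStateBackend extends AbstractStateBackend and has two properties, baseCheckpointPath and baseSavepointPath, both of which may be null. When a path is given, validatePath requires an explicit scheme (for example hdfs:// or file://) and rejects an empty path or the root directory. The resolveCheckpoint method resolves the location of a checkpoint or savepoint by delegating to AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer).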
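
To make the path rules concrete, here is a minimal plain-Java sketch of the same three checks. It is a simplified stand-in rather than Flink code: it works on java.net.URI directly instead of Flink's Path, and the class name CheckpointPathCheck and the helper isValid are hypothetical names introduced for illustration.

```java
import java.net.URI;

/** Simplified stand-in (hypothetical) for the checks in AbstractFileStateBackend.validatePath. */
final class CheckpointPathCheck {

    /** Returns the URI unchanged if it has a scheme and a non-root path; otherwise throws. */
    static URI validate(URI uri) {
        String scheme = uri.getScheme();
        String pathPart = uri.getPath();

        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null.");
        }
        if (pathPart.isEmpty() || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
        return uri;
    }

    /** Convenience wrapper that reports the outcome as a boolean instead of throwing. */
    static boolean isValid(String location) {
        try {
            validate(URI.create(location));
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("hdfs://namenode:8020/flink/checkpoints")); // true
        System.out.println(isValid("/tmp/checkpoints"));                       // false: no scheme
        System.out.println(isValid("file:///"));                               // false: root directory
    }
}
```

The host name and directories in main are made-up sample values; only the shape of the URI matters for the checks.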

FsStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsStateBackend.java

@PublicEvolving
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {

    private static final long serialVersionUID = -8191916350224044011L;

    /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
    private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    // ------------------------------------------------------------------------

    /** State below this size will be stored as part of the metadata, rather than in files.
     * A value of '-1' means not yet configured, in which case the default will be used. */
    private final int fileStateThreshold;

    /** Switch to chose between synchronous and asynchronous snapshots.
     * A value of 'undefined' means not yet configured, in which case the default will be used. */
    private final TernaryBoolean asynchronousSnapshots;

    //......

    public FsStateBackend(
            URI checkpointDirectory,
            @Nullable URI defaultSavepointDirectory,
            int fileStateSizeThreshold,
            TernaryBoolean asynchronousSnapshots) {

        super(checkNotNull(checkpointDirectory, "checkpoint directory is null"), defaultSavepointDirectory);

        checkNotNull(asynchronousSnapshots, "asynchronousSnapshots");
        checkArgument(fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
                "The threshold for file state size must be in [-1, %s], where '-1' means to use " +
                        "the value from the deployment's configuration.", MAX_FILE_STATE_THRESHOLD);

        this.fileStateThreshold = fileStateSizeThreshold;
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /**
     * Private constructor that creates a re-configured copy of the state backend.
     *
     * @param original The state backend to re-configure
     * @param configuration The configuration
     */
    private FsStateBackend(FsStateBackend original, Configuration configuration) {
        super(original.getCheckpointPath(), original.getSavepointPath(), configuration);

        // if asynchronous snapshots were configured, use that setting,
        // else check the configuration
        this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
                configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));

        final int sizeThreshold = original.fileStateThreshold >= 0 ?
                original.fileStateThreshold :
                configuration.getInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);

        if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
            this.fileStateThreshold = sizeThreshold;
        }
        else {
            this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();

            // because this is the only place we (unlikely) ever log, we lazily
            // create the logger here
            LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
                    "Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
        }
    }

    /**
     * Gets the base directory where all the checkpoints are stored.
     * The job-specific checkpoint directory is created inside this directory.
     *
     * @return The base directory for checkpoints.
     *
     * @deprecated Deprecated in favor of {@link #getCheckpointPath()}.
     */
    @Deprecated
    public Path getBasePath() {
        return getCheckpointPath();
    }

    /**
     * Gets the base directory where all the checkpoints are stored.
     * The job-specific checkpoint directory is created inside this directory.
     *
     * @return The base directory for checkpoints.
     */
    @Nonnull
    @Override
    public Path getCheckpointPath() {
        // we know that this can never be null by the way of constructor checks
        //noinspection ConstantConditions
        return super.getCheckpointPath();
    }

    /**
     * Gets the threshold below which state is stored as part of the metadata, rather than in files.
     * This threshold ensures that the backend does not create a large amount of very small files,
     * where potentially the file pointers are larger than the state itself.
     *
     * <p>If not explicitly configured, this is the default value of
     * {@link CheckpointingOptions#FS_SMALL_FILE_THRESHOLD}.
     *
     * @return The file size threshold, in bytes.
     */
    public int getMinFileSizeThreshold() {
        return fileStateThreshold >= 0 ?
                fileStateThreshold :
                CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
    }

    /**
     * Gets whether the key/value data structures are asynchronously snapshotted.
     *
     * <p>If not explicitly configured, this is the default value of
     * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
     */
    public boolean isUsingAsynchronousSnapshots() {
        return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
    }

    // ------------------------------------------------------------------------
    //  Reconfiguration
    // ------------------------------------------------------------------------

    /**
     * Creates a copy of this state backend that uses the values defined in the configuration
     * for fields where that were not specified in this state backend.
     *
     * @param config the configuration
     * @return The re-configured variant of the state backend
     */
    @Override
    public FsStateBackend configure(Configuration config) {
        return new FsStateBackend(this, config);
    }

    // ------------------------------------------------------------------------
    //  initialization and cleanup
    // ------------------------------------------------------------------------

    @Override
    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        checkNotNull(jobId, "jobId");
        return new FsCheckpointStorage(getCheckpointPath(), getSavepointPath(), jobId, getMinFileSizeThreshold());
    }

    // ------------------------------------------------------------------------
    //  state holding structures
    // ------------------------------------------------------------------------

    @Override
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
        Environment env,
        JobID jobID,
        String operatorIdentifier,
        TypeSerializer<K> keySerializer,
        int numberOfKeyGroups,
        KeyGroupRange keyGroupRange,
        TaskKvStateRegistry kvStateRegistry,
        TtlTimeProvider ttlTimeProvider,
        MetricGroup metricGroup) {

        TaskStateManager taskStateManager = env.getTaskStateManager();
        LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
        HeapPriorityQueueSetFactory priorityQueueSetFactory =
            new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

        return new HeapKeyedStateBackend<>(
            kvStateRegistry,
            keySerializer,
            env.getUserClassLoader(),
            numberOfKeyGroups,
            keyGroupRange,
            isUsingAsynchronousSnapshots(),
            env.getExecutionConfig(),
            localRecoveryConfig,
            priorityQueueSetFactory,
            ttlTimeProvider);
    }

    @Override
    public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) {

        return new DefaultOperatorStateBackend(
            env.getUserClassLoader(),
            env.getExecutionConfig(),
            isUsingAsynchronousSnapshots());
    }

    // ------------------------------------------------------------------------
    //  utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "File State Backend (" +
                "checkpoints: '" + getCheckpointPath() +
                "', savepoints: '" + getSavepointPath() +
                "', asynchronous: " + asynchronousSnapshots +
                ", fileStateThreshold: " + fileStateThreshold + ")";
    }
}
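  • FsStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface. Its public constructor takes checkpointDirectory, defaultSavepointDirectory, fileStateSizeThreshold, and asynchronousSnapshots; it requires checkpointDirectory and asynchronousSnapshots to be non-null and fileStateSizeThreshold to lie between -1 and MAX_FILE_STATE_THRESHOLD (inclusive).
  • The configure method calls the private constructor, which builds a reconfigured copy of the current instance from the given Configuration. asynchronousSnapshots is taken from the configuration only if it was left undefined. If the original fileStateThreshold is negative, the value of CheckpointingOptions.FS_SMALL_FILE_THRESHOLD is read from the configuration instead; the resulting value is then checked, and if it is not between 0 and MAX_FILE_STATE_THRESHOLD (inclusive), CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() is used and a warning is logged.
  • createCheckpointStorage creates an FsCheckpointStorage, createKeyedStateBackend creates a HeapKeyedStateBackend, and createOperatorStateBackend creates a DefaultOperatorStateBackend.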
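
The resolution rules applied by the private constructor can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Flink code: FsBackendReconfigureSketch and its Ternary enum (a stand-in for Flink's TernaryBoolean) are hypothetical names, and DEFAULT_THRESHOLD = 1024 is an assumed value for FS_SMALL_FILE_THRESHOLD.defaultValue(), which the listing above does not show.

```java
/** Plain-Java sketch (hypothetical) of how the private FsStateBackend constructor resolves its settings. */
final class FsBackendReconfigureSketch {

    /** 1 MiB upper bound, as in FsStateBackend.MAX_FILE_STATE_THRESHOLD. */
    static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    /** Assumption: stands in for CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(). */
    static final int DEFAULT_THRESHOLD = 1024;

    /** Stand-in for Flink's TernaryBoolean. */
    enum Ternary {
        TRUE, FALSE, UNDEFINED;

        /** Keeps an explicit TRUE or FALSE; an UNDEFINED value adopts the configured one. */
        Ternary resolveUndefined(boolean configured) {
            if (this != UNDEFINED) {
                return this;
            }
            return configured ? TRUE : FALSE;
        }
    }

    /**
     * An explicitly set threshold (>= 0) wins over the configured one; the chosen value
     * must lie in [0, MAX_FILE_STATE_THRESHOLD], otherwise the default is used.
     */
    static int resolveThreshold(int original, int configured) {
        int candidate = original >= 0 ? original : configured;
        if (candidate >= 0 && candidate <= MAX_FILE_STATE_THRESHOLD) {
            return candidate;
        }
        return DEFAULT_THRESHOLD;
    }

    public static void main(String[] args) {
        System.out.println(resolveThreshold(-1, 2048));            // 2048: taken from the configuration
        System.out.println(resolveThreshold(4096, 2048));          // 4096: explicit value wins
        System.out.println(resolveThreshold(-1, 2 * 1024 * 1024)); // 1024: out of range, default used
        System.out.println(Ternary.UNDEFINED.resolveUndefined(true)); // TRUE
        System.out.println(Ternary.FALSE.resolveUndefined(true));     // FALSE
    }
}
```

Note the asymmetry with the public constructor: it accepts -1 as "not yet configured", whereas after reconfiguration the stored threshold is always a concrete value of at least 0.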

Summary

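  • FsStateBackend extends AbstractFileStateBackend and implements the configure method of the ConfigurableStateBackend interface. The public constructor requires fileStateSizeThreshold to be between -1 and MAX_FILE_STATE_THRESHOLD, while the reconfigured copy produced by configure always ends up with a threshold between 0 and MAX_FILE_STATE_THRESHOLD.
  • FsStateBackend keeps in-flight state in the TaskManager's memory and writes it to the configured file system when a checkpoint is taken, while a small amount of metadata is kept in the JobManager's memory. It uses asynchronous snapshots by default so that checkpointing does not block the processing thread. To avoid writing too many small files, it has a fileStateThreshold: state smaller than this threshold is stored in the metadata rather than in a file.
  • createCheckpointStorage creates an FsCheckpointStorage, createKeyedStateBackend creates a HeapKeyedStateBackend, and createOperatorStateBackend creates a DefaultOperatorStateBackend.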
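
The effect of fileStateThreshold can be pictured with the following small sketch. It only restates the rule from the field's Javadoc ("state below this size will be stored as part of the metadata") and is not how Flink's checkpoint streams are implemented; SmallStateThresholdSketch, Placement, and placementFor are hypothetical names, and the treatment of a size exactly equal to the threshold is an assumption.

```java
/** Hypothetical sketch of the small-state rule described by fileStateThreshold. */
final class SmallStateThresholdSketch {

    /** Where a piece of checkpointed state ends up. */
    enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    /**
     * State strictly below the threshold travels with the checkpoint metadata;
     * anything else is written to its own file. The boundary case is an assumption.
     */
    static Placement placementFor(long stateSizeBytes, int fileStateThreshold) {
        return stateSizeBytes < fileStateThreshold
            ? Placement.INLINE_IN_METADATA
            : Placement.SEPARATE_FILE;
    }

    public static void main(String[] args) {
        int threshold = 1024; // sample threshold in bytes
        System.out.println(placementFor(200, threshold));       // INLINE_IN_METADATA
        System.out.println(placementFor(64 * 1024, threshold)); // SEPARATE_FILE
    }
}
```

With many operators holding only a few hundred bytes each, inlining avoids creating one tiny file per operator per checkpoint, which is the motivation given in the getMinFileSizeThreshold Javadoc.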

