A Look at Flink's FsStateBackend
Preface
This article takes a look at Flink's FsStateBackend.
StateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateBackend.java
@PublicEvolving
public interface StateBackend extends java.io.Serializable {

    // ------------------------------------------------------------------------
    //  Checkpoint storage - the durable persistence of checkpoint data
    // ------------------------------------------------------------------------

    CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) throws IOException;

    CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;

    // ------------------------------------------------------------------------
    //  Structure Backends
    // ------------------------------------------------------------------------

    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry) throws Exception {
        return createKeyedStateBackend(
                env,
                jobID,
                operatorIdentifier,
                keySerializer,
                numberOfKeyGroups,
                keyGroupRange,
                kvStateRegistry,
                TtlTimeProvider.DEFAULT);
    }

    default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider) throws Exception {
        return createKeyedStateBackend(
                env,
                jobID,
                operatorIdentifier,
                keySerializer,
                numberOfKeyGroups,
                keyGroupRange,
                kvStateRegistry,
                ttlTimeProvider,
                new UnregisteredMetricsGroup());
    }

    <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup) throws Exception;

    OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception;
}
- The StateBackend interface defines how the state of a stateful streaming application is stored and checkpointed
- It declares the resolveCheckpoint, createCheckpointStorage, createKeyedStateBackend and createOperatorStateBackend methods and extends Serializable; implementations of StateBackend are required to be thread-safe
- AbstractStateBackend is the abstract class that implements StateBackend directly; AbstractFileStateBackend and RocksDBStateBackend extend AbstractStateBackend, and MemoryStateBackend and FsStateBackend in turn extend AbstractFileStateBackend
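The three createKeyedStateBackend overloads follow a common pattern: the shorter default methods fill in a default argument and delegate, so an implementation only has to supply the full-argument method. Below is a minimal standalone sketch of that pattern. It does not use Flink; the type names, the String stand-ins for the real parameter types and the default values are all hypothetical.

```java
// Standalone sketch of the overload-chaining pattern; no Flink dependency.
// All names and the String stand-ins are hypothetical.
interface BackendFactorySketch {

    String DEFAULT_TTL_PROVIDER = "default-ttl-provider";   // stands in for TtlTimeProvider.DEFAULT
    String DEFAULT_METRIC_GROUP = "unregistered-metrics";   // stands in for new UnregisteredMetricsGroup()

    // Shortest overload: fills in the TTL provider and delegates.
    default String createKeyedStateBackend(String operatorId) {
        return createKeyedStateBackend(operatorId, DEFAULT_TTL_PROVIDER);
    }

    // Middle overload: fills in the metric group and delegates.
    default String createKeyedStateBackend(String operatorId, String ttlProvider) {
        return createKeyedStateBackend(operatorId, ttlProvider, DEFAULT_METRIC_GROUP);
    }

    // The only method an implementation has to provide.
    String createKeyedStateBackend(String operatorId, String ttlProvider, String metricGroup);
}

class HeapBackendFactorySketch implements BackendFactorySketch {
    @Override
    public String createKeyedStateBackend(String operatorId, String ttlProvider, String metricGroup) {
        // Returns a description of the arguments it received, to make the chaining visible.
        return operatorId + "|" + ttlProvider + "|" + metricGroup;
    }
}
```

Calling the one-argument overload on HeapBackendFactorySketch ends up in the three-argument method with both defaults filled in, which mirrors how callers of the shorter StateBackend overloads reach the abstract one.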
AbstractStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractStateBackend.java
/**
 * An abstract base implementation of the {@link StateBackend} interface.
 *
 * <p>This class has currently no contents and only kept to not break the prior class hierarchy for users.
 */
@PublicEvolving
public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {

    private static final long serialVersionUID = 4620415814639230247L;

    // ------------------------------------------------------------------------
    //  State Backend - State-Holding Backends
    // ------------------------------------------------------------------------

    @Override
    public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup) throws IOException;

    @Override
    public abstract OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) throws Exception;
}
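- AbstractStateBackend declares that it implements StateBackend and Serializable, and re-declares createKeyedStateBackend and createOperatorStateBackend as abstract methods; the createKeyedStateBackend override narrows the throws clause from Exception to IOException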
AbstractFileStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {

    private static final long serialVersionUID = 1L;

    // ------------------------------------------------------------------------
    //  State Backend Properties
    // ------------------------------------------------------------------------

    /** The path where checkpoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseCheckpointPath;

    /** The path where savepoints will be stored, or null, if none has been configured. */
    @Nullable
    private final Path baseSavepointPath;

    /**
     * Creates a backend with the given optional checkpoint- and savepoint base directories.
     *
     * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
     * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
     */
    protected AbstractFileStateBackend(
            @Nullable URI baseCheckpointPath,
            @Nullable URI baseSavepointPath) {

        this(baseCheckpointPath == null ? null : new Path(baseCheckpointPath),
                baseSavepointPath == null ? null : new Path(baseSavepointPath));
    }

    /**
     * Creates a backend with the given optional checkpoint- and savepoint base directories.
     *
     * @param baseCheckpointPath The base directory for checkpoints, or null, if none is configured.
     * @param baseSavepointPath The default directory for savepoints, or null, if none is set.
     */
    protected AbstractFileStateBackend(
            @Nullable Path baseCheckpointPath,
            @Nullable Path baseSavepointPath) {

        this.baseCheckpointPath = baseCheckpointPath == null ? null : validatePath(baseCheckpointPath);
        this.baseSavepointPath = baseSavepointPath == null ? null : validatePath(baseSavepointPath);
    }

    /**
     * Creates a new backend using the given checkpoint-/savepoint directories, or the values defined in
     * the given configuration. If a checkpoint-/savepoint parameter is not null, that value takes precedence
     * over the value in the configuration. If the configuration does not specify a value, it is possible
     * that the checkpoint-/savepoint directories in the backend will be null.
     *
     * <p>This constructor can be used to create a backend that is based partially on a given backend
     * and partially on a configuration.
     *
     * @param baseCheckpointPath The checkpoint base directory to use (or null).
     * @param baseSavepointPath The default savepoint directory to use (or null).
     * @param configuration The configuration to read values from.
     */
    protected AbstractFileStateBackend(
            @Nullable Path baseCheckpointPath,
            @Nullable Path baseSavepointPath,
            Configuration configuration) {

        this(parameterOrConfigured(baseCheckpointPath, configuration, CheckpointingOptions.CHECKPOINTS_DIRECTORY),
                parameterOrConfigured(baseSavepointPath, configuration, CheckpointingOptions.SAVEPOINT_DIRECTORY));
    }

    // ------------------------------------------------------------------------

    /**
     * Gets the checkpoint base directory. Jobs will create job-specific subdirectories
     * for checkpoints within this directory. May be null, if not configured.
     *
     * @return The checkpoint base directory
     */
    @Nullable
    public Path getCheckpointPath() {
        return baseCheckpointPath;
    }

    /**
     * Gets the directory where savepoints are stored by default (when no custom path is given
     * to the savepoint trigger command).
     *
     * @return The default directory for savepoints, or null, if no default directory has been configured.
     */
    @Nullable
    public Path getSavepointPath() {
        return baseSavepointPath;
    }

    // ------------------------------------------------------------------------
    //  Initialization and metadata storage
    // ------------------------------------------------------------------------

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    /**
     * Checks the validity of the path's scheme and path.
     *
     * @param path The path to check.
     * @return The URI as a Path.
     *
     * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
     */
    private static Path validatePath(Path path) {
        final URI uri = path.toUri();
        final String scheme = uri.getScheme();
        final String pathPart = uri.getPath();

        // some validity checks
        if (scheme == null) {
            throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
                    "Please specify the file system scheme explicitly in the URI.");
        }
        if (pathPart == null) {
            throw new IllegalArgumentException("The path to store the checkpoint data in is null. " +
                    "Please specify a directory path for the checkpoint data.");
        }
        if (pathPart.length() == 0 || pathPart.equals("/")) {
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }

        return path;
    }

    @Nullable
    private static Path parameterOrConfigured(@Nullable Path path, Configuration config, ConfigOption<String> option) {
        if (path != null) {
            return path;
        }
        else {
            String configValue = config.getString(option);
            try {
                return configValue == null ? null : new Path(configValue);
            }
            catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException("Cannot parse value for " + option.key() +
                        " : " + configValue + " . Not a valid path.");
            }
        }
    }
}
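- AbstractFileStateBackend extends AbstractStateBackend. It has two fields, baseCheckpointPath and baseSavepointPath, both of which may be null; a non-null path must carry an explicit scheme (hdfs://, file://, etc.) and must not be the root directory. The resolveCheckpoint method resolves the location of a checkpoint or savepoint by delegating to AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer)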
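To make the three validatePath checks concrete, here is a minimal standalone sketch that applies the same rules to a plain java.net.URI instead of Flink's Path. The class and method names are hypothetical, and the host name in the usage note is made up.

```java
import java.net.URI;

// Standalone sketch of the checks in validatePath, applied to java.net.URI.
// The class and method names are hypothetical; no Flink dependency.
final class CheckpointPathCheckSketch {

    private CheckpointPathCheckSketch() {}

    /** Returns the URI unchanged if it is acceptable, otherwise throws IllegalArgumentException. */
    static URI validate(URI uri) {
        final String scheme = uri.getScheme();
        final String pathPart = uri.getPath();

        if (scheme == null) {
            // e.g. "/tmp/checkpoints" has no scheme
            throw new IllegalArgumentException("Missing file system scheme (hdfs://, file://, etc).");
        }
        if (pathPart == null) {
            // opaque URIs such as "mailto:someone" have no path component
            throw new IllegalArgumentException("Missing directory path for the checkpoint data.");
        }
        if (pathPart.isEmpty() || pathPart.equals("/")) {
            // e.g. "file:///" points at the root directory
            throw new IllegalArgumentException("Cannot use the root directory for checkpoints.");
        }
        return uri;
    }
}
```

With this sketch, validate(URI.create("hdfs://namenode:8020/flink/checkpoints")) returns the URI, while "/tmp/checkpoints" (no scheme) and "file:///" (root directory) are both rejected.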
FsStateBackend
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@PublicEvolving
public class FsStateBackend extends AbstractFileStateBackend implements ConfigurableStateBackend {

    private static final long serialVersionUID = -8191916350224044011L;

    /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
    private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    // ------------------------------------------------------------------------

    /** State below this size will be stored as part of the metadata, rather than in files.
     * A value of '-1' means not yet configured, in which case the default will be used. */
    private final int fileStateThreshold;

    /** Switch to chose between synchronous and asynchronous snapshots.
     * A value of 'undefined' means not yet configured, in which case the default will be used. */
    private final TernaryBoolean asynchronousSnapshots;

    //......

    public FsStateBackend(
            URI checkpointDirectory,
            @Nullable URI defaultSavepointDirectory,
            int fileStateSizeThreshold,
            TernaryBoolean asynchronousSnapshots) {

        super(checkNotNull(checkpointDirectory, "checkpoint directory is null"), defaultSavepointDirectory);

        checkNotNull(asynchronousSnapshots, "asynchronousSnapshots");
        checkArgument(fileStateSizeThreshold >= -1 && fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
                "The threshold for file state size must be in [-1, %s], where '-1' means to use " +
                        "the value from the deployment's configuration.", MAX_FILE_STATE_THRESHOLD);

        this.fileStateThreshold = fileStateSizeThreshold;
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /**
     * Private constructor that creates a re-configured copy of the state backend.
     *
     * @param original The state backend to re-configure
     * @param configuration The configuration
     */
    private FsStateBackend(FsStateBackend original, Configuration configuration) {
        super(original.getCheckpointPath(), original.getSavepointPath(), configuration);

        // if asynchronous snapshots were configured, use that setting,
        // else check the configuration
        this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
                configuration.getBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS));

        final int sizeThreshold = original.fileStateThreshold >= 0 ?
                original.fileStateThreshold :
                configuration.getInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);

        if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
            this.fileStateThreshold = sizeThreshold;
        }
        else {
            this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();

            // because this is the only place we (unlikely) ever log, we lazily
            // create the logger here
            LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
                    "Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
                    CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
        }
    }

    /**
     * Gets the base directory where all the checkpoints are stored.
     * The job-specific checkpoint directory is created inside this directory.
     *
     * @return The base directory for checkpoints.
     *
     * @deprecated Deprecated in favor of {@link #getCheckpointPath()}.
     */
    @Deprecated
    public Path getBasePath() {
        return getCheckpointPath();
    }

    /**
     * Gets the base directory where all the checkpoints are stored.
     * The job-specific checkpoint directory is created inside this directory.
     *
     * @return The base directory for checkpoints.
     */
    @Nonnull
    @Override
    public Path getCheckpointPath() {
        // we know that this can never be null by the way of constructor checks
        //noinspection ConstantConditions
        return super.getCheckpointPath();
    }

    /**
     * Gets the threshold below which state is stored as part of the metadata, rather than in files.
     * This threshold ensures that the backend does not create a large amount of very small files,
     * where potentially the file pointers are larger than the state itself.
     *
     * <p>If not explicitly configured, this is the default value of
     * {@link CheckpointingOptions#FS_SMALL_FILE_THRESHOLD}.
     *
     * @return The file size threshold, in bytes.
     */
    public int getMinFileSizeThreshold() {
        return fileStateThreshold >= 0 ?
                fileStateThreshold :
                CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
    }

    /**
     * Gets whether the key/value data structures are asynchronously snapshotted.
     *
     * <p>If not explicitly configured, this is the default value of
     * {@link CheckpointingOptions#ASYNC_SNAPSHOTS}.
     */
    public boolean isUsingAsynchronousSnapshots() {
        return asynchronousSnapshots.getOrDefault(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
    }

    // ------------------------------------------------------------------------
    //  Reconfiguration
    // ------------------------------------------------------------------------

    /**
     * Creates a copy of this state backend that uses the values defined in the configuration
     * for fields where that were not specified in this state backend.
     *
     * @param config the configuration
     * @return The re-configured variant of the state backend
     */
    @Override
    public FsStateBackend configure(Configuration config) {
        return new FsStateBackend(this, config);
    }

    // ------------------------------------------------------------------------
    //  initialization and cleanup
    // ------------------------------------------------------------------------

    @Override
    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        checkNotNull(jobId, "jobId");
        return new FsCheckpointStorage(getCheckpointPath(), getSavepointPath(), jobId, getMinFileSizeThreshold());
    }

    // ------------------------------------------------------------------------
    //  state holding structures
    // ------------------------------------------------------------------------

    @Override
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
            Environment env,
            JobID jobID,
            String operatorIdentifier,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            TaskKvStateRegistry kvStateRegistry,
            TtlTimeProvider ttlTimeProvider,
            MetricGroup metricGroup) {

        TaskStateManager taskStateManager = env.getTaskStateManager();
        LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
        HeapPriorityQueueSetFactory priorityQueueSetFactory =
                new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);

        return new HeapKeyedStateBackend<>(
                kvStateRegistry,
                keySerializer,
                env.getUserClassLoader(),
                numberOfKeyGroups,
                keyGroupRange,
                isUsingAsynchronousSnapshots(),
                env.getExecutionConfig(),
                localRecoveryConfig,
                priorityQueueSetFactory,
                ttlTimeProvider);
    }

    @Override
    public OperatorStateBackend createOperatorStateBackend(
            Environment env,
            String operatorIdentifier) {

        return new DefaultOperatorStateBackend(
                env.getUserClassLoader(),
                env.getExecutionConfig(),
                isUsingAsynchronousSnapshots());
    }

    // ------------------------------------------------------------------------
    //  utilities
    // ------------------------------------------------------------------------

    @Override
    public String toString() {
        return "File State Backend (" +
                "checkpoints: '" + getCheckpointPath() +
                "', savepoints: '" + getSavepointPath() +
                "', asynchronous: " + asynchronousSnapshots +
                ", fileStateThreshold: " + fileStateThreshold + ")";
    }
}
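- FsStateBackend extends AbstractFileStateBackend and implements the ConfigurableStateBackend interface. Its public constructor takes checkpointDirectory, defaultSavepointDirectory, fileStateSizeThreshold and asynchronousSnapshots; checkpointDirectory and asynchronousSnapshots must not be null, and fileStateSizeThreshold must be at least -1 and at most MAX_FILE_STATE_THRESHOLD
- The configure method calls the private constructor, which builds a re-configured copy of the current instance from the given Configuration. asynchronousSnapshots is taken from the configuration only if the original value is undefined. If the original fileStateThreshold is negative, the value configured for CheckpointingOptions.FS_SMALL_FILE_THRESHOLD is used instead; the result is then range-checked: a value between 0 and MAX_FILE_STATE_THRESHOLD (inclusive) is kept, anything else is replaced by CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() and a warning is logged
- createCheckpointStorage creates an FsCheckpointStorage, createKeyedStateBackend creates a HeapKeyedStateBackend, and createOperatorStateBackend creates a DefaultOperatorStateBackend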
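The resolution rules used by the private constructor can be sketched without Flink as two small pure functions. This is only an illustration: the class and method names are hypothetical, a null Boolean stands in for TernaryBoolean.UNDEFINED, and the default of 1024 bytes is an assumption standing in for CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().

```java
// Standalone sketch of how the re-configured copy picks its settings.
// Names are hypothetical; no Flink dependency.
final class FsBackendSettingsSketch {

    static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;

    // Assumption: stands in for CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().
    static final int DEFAULT_THRESHOLD = 1024;

    private FsBackendSettingsSketch() {}

    /**
     * An explicitly set threshold (>= 0) wins over the configured one; the chosen
     * value is then range-checked and replaced by the default if it is out of range.
     */
    static int resolveThreshold(int explicitThreshold, int configuredThreshold) {
        final int candidate = explicitThreshold >= 0 ? explicitThreshold : configuredThreshold;
        if (candidate >= 0 && candidate <= MAX_FILE_STATE_THRESHOLD) {
            return candidate;
        }
        return DEFAULT_THRESHOLD;
    }

    /** A null explicit value models "undefined" and falls back to the configured flag. */
    static boolean resolveAsync(Boolean explicitAsync, boolean configuredAsync) {
        return explicitAsync != null ? explicitAsync : configuredAsync;
    }
}
```

For example, resolveThreshold(-1, 2048) yields the configured 2048, resolveThreshold(4096, 2048) keeps the explicit 4096, and resolveThreshold(-1, 2 * 1024 * 1024) falls back to the default because the configured value exceeds the maximum.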
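Summary
- FsStateBackend extends AbstractFileStateBackend and implements the configure method of ConfigurableStateBackend; its public constructor requires fileStateSizeThreshold to be at least -1 and at most MAX_FILE_STATE_THRESHOLD, while configure falls back to the default value when the resolved threshold is out of range
- FsStateBackend keeps working state in TaskManager memory and writes it to the configured file system when a checkpoint is taken, while the metadata is kept in JobManager memory; by default it uses asynchronous snapshots so that writing a checkpoint does not block processing; to avoid writing too many small files it has a fileStateThreshold, and state smaller than that value is stored in the metadata rather than in a file
- createCheckpointStorage creates an FsCheckpointStorage, createKeyedStateBackend creates a HeapKeyedStateBackend, and createOperatorStateBackend creates a DefaultOperatorStateBackend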
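The effect of fileStateThreshold can be illustrated with a small standalone sketch: state below the threshold is returned as inline bytes (to be carried with the metadata), anything else is written to a file under the checkpoint directory. This is not Flink's implementation; the class, the Handle type and the file naming are hypothetical, and the "below the threshold" comparison follows the wording of the field's Javadoc.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Standalone sketch of "small state goes inline, large state goes to a file".
// All names are hypothetical; no Flink dependency.
final class SmallStateInliningSketch {

    /** Where a piece of state ended up; exactly one of the two fields is non-null. */
    static final class Handle {
        final byte[] inlineBytes; // set when the state is carried with the metadata
        final Path file;          // set when the state was written to its own file

        Handle(byte[] inlineBytes, Path file) {
            this.inlineBytes = inlineBytes;
            this.file = file;
        }

        boolean isInline() {
            return inlineBytes != null;
        }
    }

    private SmallStateInliningSketch() {}

    static Handle persist(byte[] state, int threshold, Path checkpointDir, String fileName) {
        if (state.length < threshold) {
            // Small state: no file is created, the bytes travel with the metadata.
            return new Handle(state.clone(), null);
        }
        try {
            Files.createDirectories(checkpointDir);
            Path target = checkpointDir.resolve(fileName);
            Files.write(target, state);
            return new Handle(null, target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a threshold of 1024 bytes, a 16-byte state comes back inline and a 4096-byte state comes back as a file handle, which is the trade-off the threshold is meant to control: fewer tiny files at the cost of slightly larger metadata.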