This article takes a look at Flink's OperatorStateBackend.

OperatorStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateBackend.java

```java
/**
 * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface
 * {@link Snapshotable}
 *
 */
public interface OperatorStateBackend extends
    OperatorStateStore,
    Snapshotable<SnapshotResult<OperatorStateHandle>, Collection<OperatorStateHandle>>,
    Closeable,
    Disposable {

    @Override
    void dispose();
}
```
  • The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable and Disposable interfaces.

OperatorStateStore

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/OperatorStateStore.java

```java
/**
 * This interface contains methods for registering operator state with a managed store.
 */
@PublicEvolving
public interface OperatorStateStore {

    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;

    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    Set<String> getRegisteredStateNames();

    Set<String> getRegisteredBroadcastStateNames();

    // -------------------------------------------------------------------------------------------
    //  Deprecated methods
    // -------------------------------------------------------------------------------------------

    @Deprecated
    <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    @Deprecated
    <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
}
```
  • OperatorStateStore defines getBroadcastState, getListState and getUnionListState for creating or restoring a BroadcastState or a ListState. It also defines getRegisteredStateNames and getRegisteredBroadcastStateNames, which return the names of the currently registered states.

Snapshotable

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Snapshotable.java

```java
/**
 * Interface for operators that can perform snapshots of their state.
 *
 * @param <S> Generic type of the state object that is created as handle to snapshots.
 * @param <R> Generic type of the state object that used in restore.
 */
@Internal
public interface Snapshotable<S extends StateObject, R> extends SnapshotStrategy<S> {

    /**
     * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
     * handles from which the old state is read.
     *
     * @param state the old state to restore.
     */
    void restore(@Nullable R state) throws Exception;
}
```
  • Snapshotable extends the SnapshotStrategy interface and additionally defines a restore method for restoring previously snapshotted state.

SnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotStrategy.java

```java
/**
 * Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at
 * least threadsafe, i.e. this is a functional interface and is can be called in parallel by multiple checkpoints.
 *
 * @param <S> type of the returned state object that represents the result of the snapshot operation.
 */
@Internal
public interface SnapshotStrategy<S extends StateObject> {

    /**
     * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
     * returns a @{@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation if
     * the operation is performed synchronous or asynchronous. In the later case, the returned Runnable must be executed
     * first before obtaining the handle.
     *
     * @param checkpointId      The ID of the checkpoint.
     * @param timestamp         The timestamp of the checkpoint.
     * @param streamFactory     The factory that we can use for writing our state to streams.
     * @param checkpointOptions Options for how to perform this checkpoint.
     * @return A runnable future that will yield a {@link StateObject}.
     */
    @Nonnull
    RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception;
}
```
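  • SnapshotStrategy defines the snapshot method, which the different snapshot strategies implement. The method returns a RunnableFuture whose result type S must be a StateObject. Whether the work is done synchronously or asynchronously is left to the implementation; in the asynchronous case the returned Runnable must be run before the handle can be obtained.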

AbstractSnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractSnapshotStrategy.java

```java
/**
 * Abstract base class for implementing {@link SnapshotStrategy}, that gives a consistent logging across state backends.
 *
 * @param <T> type of the snapshot result.
 */
public abstract class AbstractSnapshotStrategy<T extends StateObject> implements SnapshotStrategy<SnapshotResult<T>> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotStrategy.class);

    private static final String LOG_SYNC_COMPLETED_TEMPLATE = "{} ({}, synchronous part) in thread {} took {} ms.";
    private static final String LOG_ASYNC_COMPLETED_TEMPLATE = "{} ({}, asynchronous part) in thread {} took {} ms.";

    /** Descriptive name of the snapshot strategy that will appear in the log outputs and {@link #toString()}. */
    @Nonnull
    protected final String description;

    protected AbstractSnapshotStrategy(@Nonnull String description) {
        this.description = description;
    }

    /**
     * Logs the duration of the synchronous snapshot part from the given start time.
     */
    public void logSyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
        logCompletedInternal(LOG_SYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
    }

    /**
     * Logs the duration of the asynchronous snapshot part from the given start time.
     */
    public void logAsyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
        logCompletedInternal(LOG_ASYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
    }

    private void logCompletedInternal(
        @Nonnull String template,
        @Nonnull Object checkpointOutDescription,
        long startTime) {

        long duration = (System.currentTimeMillis() - startTime);

        LOG.debug(
            template,
            description,
            checkpointOutDescription,
            Thread.currentThread(),
            duration);
    }

    @Override
    public String toString() {
        return "SnapshotStrategy {" + description + "}";
    }
}
```
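  • AbstractSnapshotStrategy is an abstract class that does not implement the snapshot method defined by SnapshotStrategy. It only holds a description and provides logSyncCompleted and logAsyncCompleted, which log the duration of the synchronous and asynchronous parts at debug level.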
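To make the split between SnapshotStrategy, AbstractSnapshotStrategy and a concrete strategy more tangible, here is a minimal, dependency-free sketch. All names (SimpleStateObject, SimpleSnapshotStrategy, SimpleAbstractSnapshotStrategy, BytesHandle, InMemorySnapshotStrategy) are hypothetical stand-ins, not Flink classes. What it mirrors is the shape: snapshot() returns a RunnableFuture that does no work until run() is called, and the abstract base only contributes the description, a duration message in the style of LOG_SYNC_COMPLETED_TEMPLATE, and toString().

```java
import java.io.Serializable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical stand-in for Flink's StateObject (not the real interface).
interface SimpleStateObject extends Serializable {
    void discardState() throws Exception;
    long getStateSize();
}

// Same shape as SnapshotStrategy: snapshot() hands back a RunnableFuture<S>.
interface SimpleSnapshotStrategy<S extends SimpleStateObject> {
    RunnableFuture<S> snapshot(long checkpointId, long timestamp) throws Exception;
}

// Like AbstractSnapshotStrategy: keeps a description and a logging helper, leaves snapshot() abstract.
abstract class SimpleAbstractSnapshotStrategy<S extends SimpleStateObject> implements SimpleSnapshotStrategy<S> {
    protected final String description;

    protected SimpleAbstractSnapshotStrategy(String description) {
        this.description = description;
    }

    // Builds the message that would be logged; returned instead of logged to avoid a logging dependency.
    String syncCompletedMessage(Object checkpointOutDescription, long startTime) {
        long duration = System.currentTimeMillis() - startTime;
        return String.format("%s (%s, synchronous part) in thread %s took %d ms.",
            description, checkpointOutDescription, Thread.currentThread(), duration);
    }

    @Override
    public String toString() {
        return "SnapshotStrategy {" + description + "}";
    }
}

// Hypothetical handle keeping the snapshot bytes in memory.
final class BytesHandle implements SimpleStateObject {
    private static final long serialVersionUID = 1L;
    private final byte[] data;

    BytesHandle(byte[] data) {
        this.data = data;
    }

    @Override
    public void discardState() {
        // nothing external to release for in-memory bytes
    }

    @Override
    public long getStateSize() {
        return data.length;
    }
}

// Hypothetical concrete strategy: copy synchronously, build the handle only when the future is run.
final class InMemorySnapshotStrategy extends SimpleAbstractSnapshotStrategy<BytesHandle> {
    private final byte[] state;

    InMemorySnapshotStrategy(byte[] state) {
        super("InMemory snapshot");
        this.state = state;
    }

    @Override
    public RunnableFuture<BytesHandle> snapshot(long checkpointId, long timestamp) {
        final byte[] copy = state.clone();
        return new FutureTask<BytesHandle>(() -> new BytesHandle(copy));
    }

    // Runs the future on the calling thread and unwraps the checked exceptions of get().
    static <T> T runAndGet(RunnableFuture<T> future) {
        future.run();
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

Returning a RunnableFuture lets the caller decide whether to run it inline or hand it to another thread, which is the choice the SnapshotStrategy javadoc leaves to the implementation.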

StateObject

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateObject.java

```java
/**
 * Base of all handles that represent checkpointed state in some form. The object may hold
 * the (small) state directly, or contain a file path (state is in the file), or contain the
 * metadata to access the state stored in some external database.
 *
 * <p>State objects define how to {@link #discardState() discard state} and how to access the
 * {@link #getStateSize() size of the state}.
 *
 * <p>State Objects are transported via RPC between <i>JobManager</i> and
 * <i>TaskManager</i> and must be {@link java.io.Serializable serializable} to support that.
 *
 * <p>Some State Objects are stored in the checkpoint/savepoint metadata. For long-term
 * compatibility, they are not stored via {@link java.io.Serializable Java Serialization},
 * but through custom serializers.
 */
public interface StateObject extends Serializable {

    void discardState() throws Exception;

    long getStateSize();
}
```
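  • StateObject extends Serializable because state objects are transported via RPC between the JobManager and the TaskManager. The interface defines discardState, which discards the state and releases its resources, and getStateSize, which returns the size of the state.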
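Because StateObject extends Serializable, a handle can travel between JobManager and TaskManager as an ordinary Java object. The sketch below imitates that with an in-process Java serialization round trip. StateObjectSketch, ByteArrayHandleSketch and RpcSimulation are hypothetical names; Flink's actual RPC transport is not modelled here, and, as the javadoc notes, handles stored in checkpoint metadata use custom serializers rather than Java serialization.

```java
import java.io.*;

// Hypothetical, simplified counterpart of Flink's StateObject (not the real interface).
interface StateObjectSketch extends Serializable {
    void discardState() throws Exception;
    long getStateSize();
}

// Hypothetical handle that keeps small state directly inside the object.
final class ByteArrayHandleSketch implements StateObjectSketch {
    private static final long serialVersionUID = 1L;
    private byte[] data;

    ByteArrayHandleSketch(byte[] data) {
        this.data = data;
    }

    @Override
    public void discardState() {
        // Nothing external to delete for in-memory bytes; just drop the reference.
        data = null;
    }

    @Override
    public long getStateSize() {
        return data == null ? 0L : data.length;
    }
}

// Simulates shipping a handle "over RPC" with a serialize/deserialize round trip.
final class RpcSimulation {
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The receiver gets an independent copy; discarding it does not affect the sender's object, which is why discardState on a real handle typically targets the external storage (files, DB entries) it points to.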

StreamStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StreamStateHandle.java

```java
/**
 * A {@link StateObject} that represents state that was written to a stream. The data can be read
 * back via {@link #openInputStream()}.
 */
public interface StreamStateHandle extends StateObject {

    /**
     * Returns an {@link FSDataInputStream} that can be used to read back the data that
     * was previously written to the stream.
     */
    FSDataInputStream openInputStream() throws IOException;
}
```
  • StreamStateHandle extends StateObject and adds an openInputStream method for reading back the data previously written to the stream.

OperatorStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateHandle.java

```java
/**
 * Interface of a state handle for operator state.
 */
public interface OperatorStateHandle extends StreamStateHandle {

    /**
     * Returns a map of meta data for all contained states by their name.
     */
    Map<String, StateMetaInfo> getStateNameToPartitionOffsets();

    /**
     * Returns an input stream to read the operator state information.
     */
    @Override
    FSDataInputStream openInputStream() throws IOException;

    /**
     * Returns the underlying stream state handle that points to the state data.
     */
    StreamStateHandle getDelegateStateHandle();

    // ......
}
```
  • OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle. getStateNameToPartitionOffsets maps each state name to the metadata (StateMetaInfo) describing the offsets of its partitions in the stream.

OperatorStreamStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStreamStateHandle.java

```java
/**
 * State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a
 * map that contains the offsets to the partitions of named states in the stream.
 */
public class OperatorStreamStateHandle implements OperatorStateHandle {

    private static final long serialVersionUID = 35876522969227335L;

    /**
     * unique state name -> offsets for available partitions in the handle stream
     */
    private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
    private final StreamStateHandle delegateStateHandle;

    public OperatorStreamStateHandle(
        Map<String, StateMetaInfo> stateNameToPartitionOffsets,
        StreamStateHandle delegateStateHandle) {

        this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle);
        this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets);
    }

    @Override
    public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
        return stateNameToPartitionOffsets;
    }

    @Override
    public void discardState() throws Exception {
        delegateStateHandle.discardState();
    }

    @Override
    public long getStateSize() {
        return delegateStateHandle.getStateSize();
    }

    @Override
    public FSDataInputStream openInputStream() throws IOException {
        return delegateStateHandle.openInputStream();
    }

    @Override
    public StreamStateHandle getDelegateStateHandle() {
        return delegateStateHandle;
    }

    // ......
}
```
  • OperatorStreamStateHandle implements OperatorStateHandle. It holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which getStateNameToPartitionOffsets simply returns; discardState, getStateSize and openInputStream all delegate to delegateStateHandle.
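The offsets are what allow individual pieces of several named states to be read back from one shared stream. A hedged sketch of the idea with plain java.io follows, assuming that each list element counts as one partition whose start position is recorded while writing. OffsetWriterSketch is a hypothetical class, and DataOutputStream.writeUTF is an assumption standing in for the state's real TypeSerializer.

```java
import java.io.*;
import java.util.*;

// Hypothetical sketch: several named list states share one stream, and each state remembers
// the start offset of every element (treated here as one "partition" per element).
final class OffsetWriterSketch {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(buffer);
    final Map<String, long[]> stateNameToOffsets = new HashMap<>();

    void writeListState(String name, List<String> elements) {
        long[] offsets = new long[elements.size()];
        try {
            for (int i = 0; i < elements.size(); i++) {
                offsets[i] = buffer.size();    // position where element i starts
                out.writeUTF(elements.get(i)); // assumption: stands in for the state's serializer
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        stateNameToOffsets.put(name, offsets);
    }

    // Reads one element back by skipping straight to its recorded offset.
    String readElement(String name, int index) {
        long offset = stateNameToOffsets.get(name)[index];
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            in.skipBytes((int) offset);
            return in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, writing "a", "bb", "ccc" under one name records offsets 0, 3 and 7 (writeUTF prefixes each ASCII string with a 2-byte length), so a reader can jump to any element without deserializing the ones before it.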

SnapshotResult

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotResult.java

```java
/**
 * This class contains the combined results from the snapshot of a state backend:
 * <ul>
 *   <li>A state object representing the state that will be reported to the Job Manager to acknowledge the checkpoint.</li>
 *   <li>A state object that represents the state for the {@link TaskLocalStateStoreImpl}.</li>
 * </ul>
 *
 * Both state objects are optional and can be null, e.g. if there was no state to snapshot in the backend. A local
 * state object that is not null also requires a state to report to the job manager that is not null, because the
 * Job Manager always owns the ground truth about the checkpointed state.
 */
public class SnapshotResult<T extends StateObject> implements StateObject {

    private static final long serialVersionUID = 1L;

    /** An singleton instance to represent an empty snapshot result. */
    private static final SnapshotResult<?> EMPTY = new SnapshotResult<>(null, null);

    /** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
    private final T jobManagerOwnedSnapshot;

    /** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
    private final T taskLocalSnapshot;

    /**
     * Creates a {@link SnapshotResult} for the given jobManagerOwnedSnapshot and taskLocalSnapshot. If the
     * jobManagerOwnedSnapshot is null, taskLocalSnapshot must also be null.
     *
     * @param jobManagerOwnedSnapshot Snapshot for report to job manager. Can be null.
     * @param taskLocalSnapshot Snapshot for report to local state manager. This is optional and requires
     *                             jobManagerOwnedSnapshot to be not null if this is not also null.
     */
    private SnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {

        if (jobManagerOwnedSnapshot == null && taskLocalSnapshot != null) {
            throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
        }

        this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
        this.taskLocalSnapshot = taskLocalSnapshot;
    }

    public T getJobManagerOwnedSnapshot() {
        return jobManagerOwnedSnapshot;
    }

    public T getTaskLocalSnapshot() {
        return taskLocalSnapshot;
    }

    @Override
    public void discardState() throws Exception {

        Exception aggregatedExceptions = null;

        if (jobManagerOwnedSnapshot != null) {
            try {
                jobManagerOwnedSnapshot.discardState();
            } catch (Exception remoteDiscardEx) {
                aggregatedExceptions = remoteDiscardEx;
            }
        }

        if (taskLocalSnapshot != null) {
            try {
                taskLocalSnapshot.discardState();
            } catch (Exception localDiscardEx) {
                aggregatedExceptions = ExceptionUtils.firstOrSuppressed(localDiscardEx, aggregatedExceptions);
            }
        }

        if (aggregatedExceptions != null) {
            throw aggregatedExceptions;
        }
    }

    @Override
    public long getStateSize() {
        return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
    }

    @SuppressWarnings("unchecked")
    public static <T extends StateObject> SnapshotResult<T> empty() {
        return (SnapshotResult<T>) EMPTY;
    }

    public static <T extends StateObject> SnapshotResult<T> of(@Nullable T jobManagerState) {
        return jobManagerState != null ? new SnapshotResult<>(jobManagerState, null) : empty();
    }

    public static <T extends StateObject> SnapshotResult<T> withLocalState(
        @Nonnull T jobManagerState,
        @Nonnull T localState) {
        return new SnapshotResult<>(jobManagerState, localState);
    }
}
```
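  • SnapshotResult implements StateObject and wraps the result of a snapshot, consisting of jobManagerOwnedSnapshot and taskLocalSnapshot; a non-null taskLocalSnapshot requires a non-null jobManagerOwnedSnapshot. Its discardState calls discardState on both snapshots and rethrows the first failure, with any later one attached via ExceptionUtils.firstOrSuppressed. getStateSize returns only the size of jobManagerOwnedSnapshot (0 if it is null).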
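The constructor invariant and the "try both, report the first failure" discard logic can be reproduced in isolation. This is a simplified sketch, not Flink's class: DiscardableHandle, FlakyHandle and SnapshotResultSketch are hypothetical, and the private firstOrSuppressed helper only assumes the same contract as Flink's ExceptionUtils.firstOrSuppressed (keep the first exception, add later ones as suppressed).

```java
// Hypothetical stand-in for a discardable state handle (not Flink's StateObject).
interface DiscardableHandle {
    void discardState() throws Exception;
    long getStateSize();
}

// Hypothetical test handle: records that discard was attempted and optionally fails.
final class FlakyHandle implements DiscardableHandle {
    final long size;
    final String failureMessage; // null means discard succeeds
    boolean discardAttempted;

    FlakyHandle(long size, String failureMessage) {
        this.size = size;
        this.failureMessage = failureMessage;
    }

    @Override
    public void discardState() throws Exception {
        discardAttempted = true;
        if (failureMessage != null) {
            throw new Exception(failureMessage);
        }
    }

    @Override
    public long getStateSize() {
        return size;
    }
}

// Simplified sketch of SnapshotResult's invariant and discard logic.
final class SnapshotResultSketch<T extends DiscardableHandle> {
    private final T jobManagerOwnedSnapshot;
    private final T taskLocalSnapshot;

    SnapshotResultSketch(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {
        // A local copy without a JobManager-owned copy is rejected.
        if (jobManagerOwnedSnapshot == null && taskLocalSnapshot != null) {
            throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
        }
        this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
        this.taskLocalSnapshot = taskLocalSnapshot;
    }

    // Only the JobManager-owned part counts towards the reported size.
    long getStateSize() {
        return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
    }

    // Attempts both discards; the first failure is thrown, a later one is attached as suppressed.
    void discardState() throws Exception {
        Exception aggregated = null;
        if (jobManagerOwnedSnapshot != null) {
            try { jobManagerOwnedSnapshot.discardState(); } catch (Exception e) { aggregated = e; }
        }
        if (taskLocalSnapshot != null) {
            try { taskLocalSnapshot.discardState(); } catch (Exception e) { aggregated = firstOrSuppressed(e, aggregated); }
        }
        if (aggregated != null) {
            throw aggregated;
        }
    }

    // Returns the exception thrown by discardState(), or null if it succeeded.
    Exception tryDiscard() {
        try {
            discardState();
            return null;
        } catch (Exception e) {
            return e;
        }
    }

    // Assumed to match the contract of ExceptionUtils.firstOrSuppressed.
    private static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }
}
```

The point of the design is that a failing remote discard never prevents the local copy from being cleaned up, while the caller still sees every failure.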

DefaultOperatorStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

```java
/**
 * Default implementation of OperatorStateStore that provides the ability to make snapshots.
 */
@Internal
public class DefaultOperatorStateBackend implements OperatorStateBackend {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);

    /**
     * The default namespace for state in cases where no state name is provided
     */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /**
     * Map for all registered operator states. Maps state name -> state
     */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    /**
     * Map for all registered operator broadcast states. Maps state name -> state
     */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    /**
     * CloseableRegistry to participate in the tasks lifecycle.
     */
    private final CloseableRegistry closeStreamOnCancelRegistry;

    /**
     * Default serializer. Only used for the default operator state.
     */
    private final JavaSerializer<Serializable> javaSerializer;

    /**
     * The user code classloader.
     */
    private final ClassLoader userClassloader;

    /**
     * The execution configuration.
     */
    private final ExecutionConfig executionConfig;

    /**
     * Flag to de/activate asynchronous snapshots.
     */
    private final boolean asynchronousSnapshots;

    /**
     * Map of state names to their corresponding restored state meta info.
     *
     * <p>TODO this map can be removed when eager-state registration is in place.
     * TODO we currently need this cached to check state migration strategies when new serializers are registered.
     */
    private final Map<String, StateMetaInfoSnapshot> restoredOperatorStateMetaInfos;

    /**
     * Map of state names to their corresponding restored broadcast state meta info.
     */
    private final Map<String, StateMetaInfoSnapshot> restoredBroadcastStateMetaInfos;

    /**
     * Cache of already accessed states.
     *
     * <p>In contrast to {@link #registeredOperatorStates} and {@link #restoredOperatorStateMetaInfos} which may be repopulated
     * with restored state, this map is always empty at the beginning.
     *
     * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
     *
     * @see <a href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
     */
    private final HashMap<String, PartitionableListState<?>> accessedStatesByName;

    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

    private final AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy;

    public DefaultOperatorStateBackend(
        ClassLoader userClassLoader,
        ExecutionConfig executionConfig,
        boolean asynchronousSnapshots) {

        this.closeStreamOnCancelRegistry = new CloseableRegistry();
        this.userClassloader = Preconditions.checkNotNull(userClassLoader);
        this.executionConfig = executionConfig;
        this.javaSerializer = new JavaSerializer<>();
        this.registeredOperatorStates = new HashMap<>();
        this.registeredBroadcastStates = new HashMap<>();
        this.asynchronousSnapshots = asynchronousSnapshots;
        this.accessedStatesByName = new HashMap<>();
        this.accessedBroadcastStatesByName = new HashMap<>();
        this.restoredOperatorStateMetaInfos = new HashMap<>();
        this.restoredBroadcastStateMetaInfos = new HashMap<>();
        this.snapshotStrategy = new DefaultOperatorStateBackendSnapshotStrategy();
    }

    @Override
    public Set<String> getRegisteredStateNames() {
        return registeredOperatorStates.keySet();
    }

    @Override
    public Set<String> getRegisteredBroadcastStateNames() {
        return registeredBroadcastStates.keySet();
    }

    @Override
    public void close() throws IOException {
        closeStreamOnCancelRegistry.close();
    }

    @Override
    public void dispose() {
        IOUtils.closeQuietly(closeStreamOnCancelRegistry);
        registeredOperatorStates.clear();
        registeredBroadcastStates.clear();
    }

    // -------------------------------------------------------------------------------------------
    //  State access methods
    // -------------------------------------------------------------------------------------------

    @SuppressWarnings("unchecked")
    @Override
    public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
        // ......
    }

    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    @Override
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

        long syncStartTime = System.currentTimeMillis();

        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

        snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
        return snapshotRunner;
    }

    // ......
}
```
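  • DefaultOperatorStateBackend implements the OperatorStateBackend interface.
  • getRegisteredStateNames returns registeredOperatorStates.keySet() and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet(); both are backed by in-memory HashMaps.
  • close mainly calls closeStreamOnCancelRegistry.close(). dispose also closes closeStreamOnCancelRegistry (quietly, via IOUtils.closeQuietly) and additionally clears registeredOperatorStates and registeredBroadcastStates.
  • getListState and getUnionListState both delegate to getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode), passing Mode.SPLIT_DISTRIBUTE and Mode.UNION respectively.
  • snapshot records the start time, delegates to snapshotStrategy (a DefaultOperatorStateBackendSnapshotStrategy), and logs the duration of the synchronous part via logSyncCompleted.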
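The two modes matter when operator state is redistributed on restore, for example after a parallelism change. The sketch below models the semantics as described in the Flink documentation: with SPLIT_DISTRIBUTE (getListState) the lists of all old subtasks are concatenated and split evenly across the new subtasks, while with UNION (getUnionListState) every new subtask receives the complete list. ListMode and RedistributionSketch are hypothetical; this is not Flink's repartitioner, and the exact element-to-subtask assignment there may differ.

```java
import java.util.*;

// Hypothetical mirror of the two list modes of OperatorStateHandle.Mode discussed above.
enum ListMode { SPLIT_DISTRIBUTE, UNION }

final class RedistributionSketch {
    // Concatenates all old per-subtask lists, then splits them evenly or gives everyone everything.
    static <T> List<List<T>> redistribute(List<List<T>> oldSubtaskStates, int newParallelism, ListMode mode) {
        List<T> all = new ArrayList<>();
        for (List<T> subtaskState : oldSubtaskStates) {
            all.addAll(subtaskState);
        }

        List<List<T>> result = new ArrayList<>(newParallelism);
        if (mode == ListMode.UNION) {
            // every new subtask sees the full union of all elements
            for (int i = 0; i < newParallelism; i++) {
                result.add(new ArrayList<>(all));
            }
            return result;
        }

        // even split: the first (size % parallelism) subtasks get one extra element
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }
}
```

For instance, old lists [1, 2, 3] and [4, 5] restored with parallelism 3 become [1, 2], [3, 4], [5] under SPLIT_DISTRIBUTE, whereas under UNION each subtask gets [1, 2, 3, 4, 5] and must pick what it needs itself.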

DefaultOperatorStateBackend.getListState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

```java
    private <S> ListState<S> getListState(
        ListStateDescriptor<S> stateDescriptor,
        OperatorStateHandle.Mode mode) throws StateMigrationException {

        Preconditions.checkNotNull(stateDescriptor);
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
        if (previous != null) {
            checkStateNameAndMode(
                previous.getStateMetaInfo().getName(),
                name,
                previous.getStateMetaInfo().getAssignmentMode(),
                mode);
            return previous;
        }

        // end up here if its the first time access after execution for the
        // provided state name; check compatibility of restored state, if any
        // TODO with eager registration in place, these checks should be moved to restore()

        stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name);

        if (null == partitionableListState) {
            // no restored state for the state name; simply create new state holder

            partitionableListState = new PartitionableListState<>(
                new RegisteredOperatorStateBackendMetaInfo<>(
                    name,
                    partitionStateSerializer,
                    mode));

            registeredOperatorStates.put(name, partitionableListState);
        } else {
            // has restored state; check compatibility of new state access

            checkStateNameAndMode(
                partitionableListState.getStateMetaInfo().getName(),
                name,
                partitionableListState.getStateMetaInfo().getAssignmentMode(),
                mode);

            StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
            RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
                new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

            // check compatibility to determine if state migration is required
            TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

            @SuppressWarnings("unchecked")
            TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
                (TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));

            TypeSerializerSchemaCompatibility<S> stateCompatibility =
                stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
            }

            partitionableListState.setStateMetaInfo(
                new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
        }

        accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }
```
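  • getListState first looks in accessedStatesByName; if the state was already accessed, it checks the name and assignment mode and returns it. Otherwise it looks up the PartitionableListState in registeredOperatorStates: if there is none (no restored state), it creates a new one and registers it; if there is one (restored state), it checks the name and mode, checks the compatibility of the new serializer against the restored serializer snapshot (throwing StateMigrationException if incompatible), and then sets a new stateMetaInfo on the partitionableListState. In both cases the state is then put into accessedStatesByName.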
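The lookup order of getListState can be modelled without Flink. In this sketch AssignmentMode, ListStateHolder and ListStateRegistrySketch are hypothetical; the mode mismatch raises IllegalStateException, which is an assumption about how checkStateNameAndMode fails, and the serializer compatibility check is only indicated by a comment.

```java
import java.util.*;

// Hypothetical mirror of OperatorStateHandle.Mode for list states.
enum AssignmentMode { SPLIT_DISTRIBUTE, UNION }

// Hypothetical state holder; element storage is omitted.
final class ListStateHolder {
    final String name;
    final AssignmentMode mode;

    ListStateHolder(String name, AssignmentMode mode) {
        this.name = name;
        this.mode = mode;
    }
}

// Simplified model of the lookup order in DefaultOperatorStateBackend#getListState.
final class ListStateRegistrySketch {
    // May already be populated by restore() before the first access.
    final Map<String, ListStateHolder> registered = new HashMap<>();
    // Cache of states accessed since execution started; empty at the beginning.
    final Map<String, ListStateHolder> accessed = new HashMap<>();

    ListStateHolder getListState(String name, AssignmentMode mode) {
        ListStateHolder previous = accessed.get(name);
        if (previous != null) {
            checkMode(previous, mode);
            return previous; // fast path: already accessed once
        }
        ListStateHolder state = registered.get(name);
        if (state == null) {
            state = new ListStateHolder(name, mode); // no restored state: create a fresh holder
            registered.put(name, state);
        } else {
            checkMode(state, mode); // restored state: the requested mode must match
            // Flink additionally checks serializer compatibility here and replaces the meta info.
        }
        accessed.put(name, state);
        return state;
    }

    private static void checkMode(ListStateHolder state, AssignmentMode requested) {
        if (state.mode != requested) {
            throw new IllegalStateException(
                "Incompatible assignment mode for state " + state.name + ": " + state.mode + " vs " + requested);
        }
    }
}
```

One practical consequence: a state registered through getUnionListState cannot later be fetched with getListState under the same name, because the stored mode no longer matches.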

DefaultOperatorStateBackendSnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

```java
    /**
     * Snapshot strategy for this backend.
     */
    private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {

        protected DefaultOperatorStateBackendSnapshotStrategy() {
            super("DefaultOperatorStateBackend snapshot");
        }

        @Nonnull
        @Override
        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            final long checkpointId,
            final long timestamp,
            @Nonnull final CheckpointStreamFactory streamFactory,
            @Nonnull final CheckpointOptions checkpointOptions) throws IOException {

            if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
                return DoneFuture.of(SnapshotResult.empty());
            }

            final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
                new HashMap<>(registeredOperatorStates.size());
            final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
                new HashMap<>(registeredBroadcastStates.size());

            ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(userClassloader);
            try {
                // eagerly create deep copies of the list and the broadcast states (if any)
                // in the synchronous phase, so that we can use them in the async writing.

                if (!registeredOperatorStates.isEmpty()) {
                    for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                        PartitionableListState<?> listState = entry.getValue();
                        if (null != listState) {
                            listState = listState.deepCopy();
                        }
                        registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                    }
                }

                if (!registeredBroadcastStates.isEmpty()) {
                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                        BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                        if (null != broadcastState) {
                            broadcastState = broadcastState.deepCopy();
                        }
                        registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                    }
                }
            } finally {
                Thread.currentThread().setContextClassLoader(snapshotClassLoader);
            }

            AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
                new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

                    @Override
                    protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {

                        CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                            streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                        registerCloseableForCancellation(localOut);

                        // get the registered operator state infos ...
                        List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                            new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                        for (Map.Entry<String, PartitionableListState<?>> entry :
                            registeredOperatorStatesDeepCopies.entrySet()) {
                            operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                        }

                        // ... get the registered broadcast operator state infos ...
                        List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
                            new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

                        for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                            registeredBroadcastStatesDeepCopies.entrySet()) {
                            broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                        }

                        // ... write them all in the checkpoint stream ...
                        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                        OperatorBackendSerializationProxy backendSerializationProxy =
                            new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                        backendSerializationProxy.write(dov);

                        // ... and then go for the states ...

                        // we put BOTH normal and broadcast state metadata here
                        int initialMapCapacity =
                            registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                        final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                            new HashMap<>(initialMapCapacity);

                        for (Map.Entry<String, PartitionableListState<?>> entry :
                            registeredOperatorStatesDeepCopies.entrySet()) {

                            PartitionableListState<?> value = entry.getValue();
                            long[] partitionOffsets = value.write(localOut);
                            OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                            writtenStatesMetaData.put(
                                entry.getKey(),
                                new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                        }

                        // ... and the broadcast states themselves ...
                        for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                            registeredBroadcastStatesDeepCopies.entrySet()) {

                            BackendWritableBroadcastState<?, ?> value = entry.getValue();
                            long[] partitionOffsets = {value.write(localOut)};
                            OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                            writtenStatesMetaData.put(
                                entry.getKey(),
                                new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                        }

                        // ... and, finally, create the state handle.
                        OperatorStateHandle retValue = null;

                        if (unregisterCloseableFromCancellation(localOut)) {

                            StreamStateHandle stateHandle = localOut.closeAndGetHandle();

                            if (stateHandle != null) {
                                retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                            }

                            return SnapshotResult.of(retValue);
                        } else {
                            throw new IOException("Stream was already unregistered.");
                        }
                    }

                    @Override
                    protected void cleanupProvidedResources() {
                        // nothing to do
                    }

                    @Override
                    protected void logAsyncSnapshotComplete(long startTime) {
                        if (asynchronousSnapshots) {
                            logAsyncCompleted(streamFactory, startTime);
                        }
                    }
                };

            final FutureTask<SnapshotResult<OperatorStateHandle>> task =
                snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);

            if (!asynchronousSnapshots) {
                task.run();
            }

            return task;
        }
    }
```
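  • DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy. Its snapshot method returns an already completed empty result if no states are registered. Otherwise, in the synchronous phase (with the user classloader set as context classloader), it builds registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies, and then wraps the actual writing in an AsyncSnapshotCallable. The resulting FutureTask is run immediately when asynchronousSnapshots is false; otherwise it is returned for the caller to run later.
  • The abstract class AsyncSnapshotCallable implements the call method of the Callable interface; call invokes callInternal and then logAsyncSnapshotComplete.
  • callInternal returns a SnapshotResult<OperatorStateHandle>. It writes the meta info of all states through OperatorBackendSerializationProxy, then writes the data of registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies into the CheckpointStateOutputStream created by the CheckpointStreamFactory (e.g. MemCheckpointStreamFactory), recording each state's partition offsets and assignment mode in writtenStatesMetaData. Finally, it creates an OperatorStreamStateHandle from writtenStatesMetaData and the stateHandle returned by the stream's closeAndGetHandle, and returns it wrapped via SnapshotResult.of.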
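The synchronous-copy / deferred-write split is what keeps a checkpoint consistent while the operator keeps mutating its state. The following is a minimal sketch of that pattern using only java.util.concurrent. TwoPhaseSnapshotSketch and runAndGet are hypothetical names, plain string lists stand in for PartitionableListState, and the "write" step just returns the copies instead of writing to a checkpoint stream.

```java
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical sketch of the two phases in DefaultOperatorStateBackendSnapshotStrategy:
// a synchronous deep copy, then a write step packaged as a FutureTask.
final class TwoPhaseSnapshotSketch {
    final Map<String, List<String>> registeredStates = new HashMap<>();
    private final boolean asynchronousSnapshots;

    TwoPhaseSnapshotSketch(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    RunnableFuture<Map<String, List<String>>> snapshot() {
        if (registeredStates.isEmpty()) {
            // Flink returns DoneFuture.of(SnapshotResult.empty()); an already-completed task plays that role here.
            FutureTask<Map<String, List<String>>> done =
                new FutureTask<Map<String, List<String>>>(Collections::emptyMap);
            done.run();
            return done;
        }

        // Synchronous phase: copy every list so later updates cannot leak into this checkpoint.
        final Map<String, List<String>> deepCopies = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : registeredStates.entrySet()) {
            deepCopies.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }

        // Write phase: stands in for writing the copies to a checkpoint stream; here it just returns them.
        FutureTask<Map<String, List<String>>> task = new FutureTask<Map<String, List<String>>>(() -> deepCopies);
        if (!asynchronousSnapshots) {
            task.run(); // synchronous mode: finish the write before returning
        }
        return task;
    }

    // Runs the task on the calling thread (a no-op if it already ran) and unwraps checked exceptions.
    static <T> T runAndGet(RunnableFuture<T> future) {
        future.run();
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

In asynchronous mode, an update made after snapshot() returns but before the task runs does not show up in the result, because the task only sees the deep copies taken in the synchronous phase.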

Summary

  • The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable and Disposable interfaces.
  • OperatorStateStore defines getBroadcastState, getListState and getUnionListState for creating or restoring a BroadcastState or ListState, plus getRegisteredStateNames and getRegisteredBroadcastStateNames for returning the names of the currently registered states. DefaultOperatorStateBackend implements OperatorStateStore: getRegisteredStateNames returns registeredOperatorStates.keySet() and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet() (both are in-memory Maps); getListState and getUnionListState both delegate to getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode), with Mode.SPLIT_DISTRIBUTE and Mode.UNION respectively.
  • Snapshotable extends SnapshotStrategy and adds a restore method for restoring state. SnapshotStrategy defines the snapshot method implemented by the different snapshot strategies, and requires the snapshot result type to be a StateObject. AbstractSnapshotStrategy is an abstract class that leaves snapshot unimplemented and only provides logSyncCompleted and logAsyncCompleted for debug logging.
  • DefaultOperatorStateBackend implements Snapshotable; its snapshot method delegates to DefaultOperatorStateBackendSnapshotStrategy, which extends AbstractSnapshotStrategy. That strategy deep-copies the registered states into registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies in the synchronous phase, then, through an AsyncSnapshotCallable, writes them into the CheckpointStateOutputStream of the CheckpointStreamFactory (e.g. MemCheckpointStreamFactory) and records the partition offsets in writtenStatesMetaData.
  • Snapshotable requires its snapshot type parameter S to be a StateObject; StateObject extends Serializable because state objects are transported via RPC between the JobManager and TaskManager. When OperatorStateBackend extends Snapshotable, it sets S to SnapshotResult<OperatorStateHandle> and the restore type R to Collection<OperatorStateHandle>.
  • StreamStateHandle extends StateObject and adds openInputStream. OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle, where getStateNameToPartitionOffsets maps each state name to the offsets of its partitions in the stream. OperatorStreamStateHandle implements OperatorStateHandle; it holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which getStateNameToPartitionOffsets returns.
  • SnapshotResult implements StateObject and wraps the snapshot result, consisting of jobManagerOwnedSnapshot and taskLocalSnapshot. Its discardState calls discardState on both, and its getStateSize returns the size of jobManagerOwnedSnapshot only.

doc

  • State Backends
