本文主要研究一下flink的CheckpointScheduler

CheckpointCoordinatorDeActivator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java

/*** This actor listens to changes in the JobStatus and activates or deactivates the periodic* checkpoint scheduler.*/
public class CheckpointCoordinatorDeActivator implements JobStatusListener {private final CheckpointCoordinator coordinator;public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {this.coordinator = checkNotNull(coordinator);}@Overridepublic void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {if (newJobStatus == JobStatus.RUNNING) {// start the checkpoint schedulercoordinator.startCheckpointScheduler();} else {// anything else should stop the trigger for nowcoordinator.stopCheckpointScheduler();}}
}
复制代码
  • CheckpointCoordinatorDeActivator实现了JobStatusListener接口,在jobStatusChanges的时候,根据状态来调用coordinator.startCheckpointScheduler或者coordinator.stopCheckpointScheduler

CheckpointCoordinator.ScheduledTrigger

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

/*** The checkpoint coordinator coordinates the distributed snapshots of operators and state.* It triggers the checkpoint by sending the messages to the relevant tasks and collects the* checkpoint acknowledgements. It also collects and maintains the overview of the state handles* reported by the tasks that acknowledge the checkpoint.*/
public class CheckpointCoordinator {/** Map from checkpoint ID to the pending checkpoint */private final Map<Long, PendingCheckpoint> pendingCheckpoints;/** The number of consecutive failed trigger attempts */private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);//......public void startCheckpointScheduler() {synchronized (lock) {if (shutdown) {throw new IllegalArgumentException("Checkpoint coordinator is shut down");}// make sure all prior timers are cancelledstopCheckpointScheduler();periodicScheduling = true;long initialDelay = ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);currentPeriodicTrigger = timer.scheduleAtFixedRate(new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);}}public void stopCheckpointScheduler() {synchronized (lock) {triggerRequestQueued = false;periodicScheduling = false;if (currentPeriodicTrigger != null) {currentPeriodicTrigger.cancel(false);currentPeriodicTrigger = null;}for (PendingCheckpoint p : pendingCheckpoints.values()) {p.abortError(new Exception("Checkpoint Coordinator is suspending."));}pendingCheckpoints.clear();numUnsuccessfulCheckpointsTriggers.set(0);}}private final class ScheduledTrigger implements Runnable {@Overridepublic void run() {try {triggerCheckpoint(System.currentTimeMillis(), true);}catch (Exception e) {LOG.error("Exception while triggering checkpoint for job {}.", job, e);}}}//......
}
复制代码
  • CheckpointCoordinator的startCheckpointScheduler方法首先调用stopCheckpointScheduler取消PendingCheckpoint,之后使用timer.scheduleAtFixedRate重新调度ScheduledTrigger
  • stopCheckpointScheduler会调用PendingCheckpoint.abortError来取消pendingCheckpoints,然后清空pendingCheckpoints(Map<Long, PendingCheckpoint>)以及numUnsuccessfulCheckpointsTriggers(AtomicInteger)
  • ScheduledTrigger实现了Runnable接口,其run方法主要是调用triggerCheckpoint,传递的isPeriodic参数为true

CheckpointCoordinator.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

/*** The checkpoint coordinator coordinates the distributed snapshots of operators and state.* It triggers the checkpoint by sending the messages to the relevant tasks and collects the* checkpoint acknowledgements. It also collects and maintains the overview of the state handles* reported by the tasks that acknowledge the checkpoint.*/
public class CheckpointCoordinator {/** Tasks who need to be sent a message when a checkpoint is started */private final ExecutionVertex[] tasksToTrigger;/** Tasks who need to acknowledge a checkpoint before it succeeds */private final ExecutionVertex[] tasksToWaitFor;/** Map from checkpoint ID to the pending checkpoint */private final Map<Long, PendingCheckpoint> pendingCheckpoints;/** The maximum number of checkpoints that may be in progress at the same time */private final int maxConcurrentCheckpointAttempts;/** The min time(in ns) to delay after a checkpoint could be triggered. Allows to* enforce minimum processing time between checkpoint attempts */private final long minPauseBetweenCheckpointsNanos;/*** Triggers a new standard checkpoint and uses the given timestamp as the checkpoint* timestamp.** @param timestamp The timestamp for the checkpoint.* @param isPeriodic Flag indicating whether this triggered checkpoint is* periodic. If this flag is true, but the periodic scheduler is disabled,* the checkpoint will be declined.* @return <code>true</code> if triggering the checkpoint succeeded.*/public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();}@VisibleForTestingpublic CheckpointTriggerResult triggerCheckpoint(long timestamp,CheckpointProperties props,@Nullable String externalSavepointLocation,boolean isPeriodic) {// make some eager pre-checkssynchronized (lock) {// abort if the coordinator has been shutdown in the meantimeif (shutdown) {return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);}// Don't allow periodic checkpoint if scheduling has been disabledif (isPeriodic && !periodicScheduling) {return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);}// validate whether the checkpoint can be triggered, with respect to the limit of// concurrent checkpoints, and the minimum time between checkpoints.// these checks are not relevant for savepointsif (!props.forceCheckpoint()) {// sanity check: there should never be more than one trigger request queuedif (triggerRequestQueued) {LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);}// if too many checkpoints are currently in progress, we need to mark that a request is queuedif (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {triggerRequestQueued = true;if (currentPeriodicTrigger != null) {currentPeriodicTrigger.cancel(false);currentPeriodicTrigger = null;}return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);}// make sure the minimum interval between checkpoints has passedfinal long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;if (durationTillNextMillis > 0) {if (currentPeriodicTrigger != null) {currentPeriodicTrigger.cancel(false);currentPeriodicTrigger = null;}// Reassign the new trigger to the currentPeriodicTriggercurrentPeriodicTrigger = timer.scheduleAtFixedRate(new ScheduledTrigger(),durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);}}}// check if all tasks that we need to trigger are running.// if not, abort the checkpointExecution[] executions = new Execution[tasksToTrigger.length];for (int i = 0; i < tasksToTrigger.length; i++) {Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();if (ee == null) {LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",tasksToTrigger[i].getTaskNameWithSubtaskIndex(),job);return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);} else if (ee.getState() == ExecutionState.RUNNING) {executions[i] = ee;} else {LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",tasksToTrigger[i].getTaskNameWithSubtaskIndex(),job,ExecutionState.RUNNING,ee.getState());return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}// next, check if all tasks that need to acknowledge the checkpoint are running.// if not, abort the checkpointMap<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);for (ExecutionVertex ev : tasksToWaitFor) {Execution ee = ev.getCurrentExecutionAttempt();if (ee != null) {ackTasks.put(ee.getAttemptId(), ev);} else {LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",ev.getTaskNameWithSubtaskIndex(),job);return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}// we will actually trigger this checkpoint!// we lock with a special lock to make sure that trigger requests do not overtake each other.// this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'// may issue blocking operations. Using a different lock than the coordinator-wide lock,// we avoid blocking the processing of 'acknowledge/decline' messages during that time.synchronized (triggerLock) {final CheckpointStorageLocation checkpointStorageLocation;final long checkpointID;try {// this must happen outside the coordinator-wide lock, because it communicates// with external services (in HA mode) and may block for a while.checkpointID = checkpointIdCounter.getAndIncrement();checkpointStorageLocation = props.isSavepoint() ?checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :checkpointStorage.initializeLocationForCheckpoint(checkpointID);}catch (Throwable t) {int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",job,numUnsuccessful,t);return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);}final PendingCheckpoint checkpoint = new PendingCheckpoint(job,checkpointID,timestamp,ackTasks,props,checkpointStorageLocation,executor);if (statsTracker != null) {PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(checkpointID,timestamp,props);checkpoint.setStatsCallback(callback);}// schedule the timer that will clean up the expired checkpointsfinal Runnable canceller = () -> {synchronized (lock) {// only do the work if the checkpoint is not discarded anyways// note that checkpoint completion discards the pending checkpoint objectif (!checkpoint.isDiscarded()) {LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);checkpoint.abortExpired();pendingCheckpoints.remove(checkpointID);rememberRecentCheckpointId(checkpointID);triggerQueuedRequests();}}};try {// re-acquire the coordinator-wide locksynchronized (lock) {// since we released the lock in the meantime, we need to re-check// that the conditions still hold.if (shutdown) {return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);}else if (!props.forceCheckpoint()) {if (triggerRequestQueued) {LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);}if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {triggerRequestQueued = true;if (currentPeriodicTrigger != null) {currentPeriodicTrigger.cancel(false);currentPeriodicTrigger = null;}return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);}// make sure the minimum interval between checkpoints has passedfinal long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;if (durationTillNextMillis > 0) {if (currentPeriodicTrigger != null) {currentPeriodicTrigger.cancel(false);currentPeriodicTrigger = null;}// Reassign the new trigger to the currentPeriodicTriggercurrentPeriodicTrigger = timer.scheduleAtFixedRate(new ScheduledTrigger(),durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);}}LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);pendingCheckpoints.put(checkpointID, checkpoint);ScheduledFuture<?> cancellerHandle = timer.schedule(canceller,checkpointTimeout, TimeUnit.MILLISECONDS);if (!checkpoint.setCancellerHandle(cancellerHandle)) {// checkpoint is already disposed!cancellerHandle.cancel(false);}// trigger the master hooks for the checkpointfinal List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));for (MasterState s : masterStates) {checkpoint.addMasterState(s);}}// end of lock scopefinal CheckpointOptions checkpointOptions = new CheckpointOptions(props.getCheckpointType(),checkpointStorageLocation.getLocationReference());// send the messages to the tasks that trigger their checkpointfor (Execution execution: executions) {execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);}numUnsuccessfulCheckpointsTriggers.set(0);return new CheckpointTriggerResult(checkpoint);}catch (Throwable t) {// guard the map against concurrent modificationssynchronized (lock) {pendingCheckpoints.remove(checkpointID);}int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",checkpointID, job, numUnsuccessful, t);if (!checkpoint.isDiscarded()) {checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));}try {checkpointStorageLocation.disposeOnFailure();}catch (Throwable t2) {LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);}return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);}} // end trigger lock}//......
}
复制代码
  • 首先判断如果不是forceCheckpoint的话,则判断当前的pendingCheckpoints值是否超过maxConcurrentCheckpointAttempts,超过的话,立刻fail fast,返回CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);之后判断距离lastCheckpointCompletionNanos的时间是否大于等于minPauseBetweenCheckpointsNanos,否则fail fast,返回CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS),确保checkpoint不被频繁触发
  • 之后检查tasksToTrigger的任务(触发checkpoint的时候需要通知到的task)是否都处于RUNNING状态,不是的话则立刻fail fast,返回CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING)
  • 之后检查tasksToWaitFor的任务(需要在执行成功的时候ack checkpoint的任务)是否都处于RUNNING状态,不是的话立刻fail fast,返回CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING)
  • 前面几步检查通过了之后才开始真正的checkpoint的触发,它首先分配一个checkpointID,然后初始化checkpointStorageLocation,如果异常则返回CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);之后创建PendingCheckpoint,同时准备canceller(用于在失效的时候执行abort操作);之后对于不是forceCheckpoint的,再重新来一轮TOO_MANY_CONCURRENT_CHECKPOINTS、MINIMUM_TIME_BETWEEN_CHECKPOINTS校验
  • 最后就是针对Execution,挨个触发execution的triggerCheckpoint操作,成功返回CheckpointTriggerResult(checkpoint),异常则返回CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION)

Execution.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {/*** Trigger a new checkpoint on the task of this execution.** @param checkpointId of th checkpoint to trigger* @param timestamp of the checkpoint to trigger* @param checkpointOptions of the checkpoint to trigger*/public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {final LogicalSlot slot = assignedResource;if (slot != null) {final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);} else {LOG.debug("The execution has no slot assigned. This indicates that the execution is " +"no longer running.");}}//......
}
复制代码
  • triggerCheckpoint主要是调用taskManagerGateway.triggerCheckpoint,这里的taskManagerGateway为RpcTaskManagerGateway

RpcTaskManagerGateway

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java

/*** Implementation of the {@link TaskManagerGateway} for Flink's RPC system.*/
public class RpcTaskManagerGateway implements TaskManagerGateway {private final TaskExecutorGateway taskExecutorGateway;public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {taskExecutorGateway.triggerCheckpoint(executionAttemptID,checkpointId,timestamp,checkpointOptions);}//......
}
复制代码
  • RpcTaskManagerGateway的triggerCheckpoint方法调用taskExecutorGateway.triggerCheckpoint,这里的taskExecutorGateway为AkkaInvocationHandler,通过rpc通知TaskExecutor

TaskExecutor.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

/*** TaskExecutor implementation. The task executor is responsible for the execution of multiple* {@link Task}.*/
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID,long checkpointId,long checkpointTimestamp,CheckpointOptions checkpointOptions) {log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);final Task task = taskSlotTable.getTask(executionAttemptID);if (task != null) {task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);return CompletableFuture.completedFuture(Acknowledge.get());} else {final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';log.debug(message);return FutureUtils.completedExceptionally(new CheckpointException(message));}}//......
}
复制代码
  • TaskExecutor的triggerCheckpoint方法这里调用task.triggerCheckpointBarrier

Task.triggerCheckpointBarrier

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

public class Task implements Runnable, TaskActions, CheckpointListener {/** The invokable of this task, if initialized. All accesses must copy the reference and* check for null, as this field is cleared as part of the disposal logic. */@Nullableprivate volatile AbstractInvokable invokable;/*** Calls the invokable to trigger a checkpoint.** @param checkpointID The ID identifying the checkpoint.* @param checkpointTimestamp The timestamp associated with the checkpoint.* @param checkpointOptions Options for performing this checkpoint.*/public void triggerCheckpointBarrier(final long checkpointID,long checkpointTimestamp,final CheckpointOptions checkpointOptions) {final AbstractInvokable invokable = this.invokable;final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);if (executionState == ExecutionState.RUNNING && invokable != null) {// build a local closurefinal String taskName = taskNameWithSubtask;final SafetyNetCloseableRegistry safetyNetCloseableRegistry =FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();Runnable runnable = new Runnable() {@Overridepublic void run() {// set safety net from the task's context for checkpointing threadLOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);try {boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);if (!success) {checkpointResponder.declineCheckpoint(getJobID(), getExecutionId(), checkpointID,new CheckpointDeclineTaskNotReadyException(taskName));}}catch (Throwable t) {if (getExecutionState() == ExecutionState.RUNNING) {failExternally(new Exception("Error while triggering checkpoint " + checkpointID + " for " +taskNameWithSubtask, t));} else {LOG.debug("Encountered error while triggering checkpoint {} for " +"{} ({}) while being not in state running.", checkpointID,taskNameWithSubtask, executionId, t);}} finally {FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);}}};executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));}else {LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);// send back a message that we did not do the checkpointcheckpointResponder.declineCheckpoint(jobId, executionId, checkpointID,new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));}}//......
}
复制代码
  • Task的triggerCheckpointBarrier方法首先判断executionState是否RUNNING以及invokable是否不为null,不满足条件则执行checkpointResponder.declineCheckpoint
  • 满足条件则执行executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId))
  • 这个runnable方法里头会执行invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions),这里的invokable为SourceStreamTask

SourceStreamTask.triggerCheckpoint

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>extends StreamTask<OUT, OP> {private volatile boolean externallyInducedCheckpoints;@Overridepublic boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {if (!externallyInducedCheckpoints) {return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);}else {// we do not trigger checkpoints here, we simply state whether we can trigger themsynchronized (getCheckpointLock()) {return isRunning();}}}//......
}
复制代码
  • SourceStreamTask的triggerCheckpoint先判断,如果externallyInducedCheckpoints为false,则调用父类StreamTask的triggerCheckpoint

StreamTask.triggerCheckpoint

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>extends AbstractInvokableimplements AsyncExceptionHandler {@Overridepublic boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {try {// No alignment if we inject a checkpointCheckpointMetrics checkpointMetrics = new CheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L);return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);}catch (Exception e) {// propagate exceptions only if the task is still in "running" stateif (isRunning) {throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +" for operator " + getName() + '.', e);} else {LOG.debug("Could not perform checkpoint {} for operator {} while the " +"invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);return false;}}}private boolean performCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics) throws Exception {LOG.debug("Starting checkpoint ({}) {} on task {}",checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());synchronized (lock) {if (isRunning) {// we can do a checkpoint// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignments// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.//           The pre-barrier work should be nothing or minimal in the common case.operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());// Step (2): Send the checkpoint barrier downstreamoperatorChain.broadcastCheckpointBarrier(checkpointMetaData.getCheckpointId(),checkpointMetaData.getTimestamp(),checkpointOptions);// Step (3): Take the state snapshot. This should be largely asynchronous, to not//           impact progress of the streaming topologycheckpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);return true;}else {// we cannot perform our checkpoint - let the downstream operators know that they// should not wait for any input from this operator// we cannot broadcast the cancellation markers on the 'operator chain', because it may not// yet be createdfinal CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());Exception exception = null;for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {try {streamRecordWriter.broadcastEvent(message);} catch (Exception e) {exception = ExceptionUtils.firstOrSuppressed(new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),exception);}}if (exception != null) {throw exception;}return false;}}}private void checkpointState(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics) throws Exception {CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointMetaData.getCheckpointId(),checkpointOptions.getTargetLocation());CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this,checkpointMetaData,checkpointOptions,storage,checkpointMetrics);checkpointingOperation.executeCheckpointing();}//......
}
复制代码
  • StreamTask的triggerCheckpoint方法的主要处理逻辑在performCheckpoint方法上,该方法针对task的isRunning分别进行不同处理
  • isRunning为true的时候,这里头分了三步来处理,第一步执行operatorChain.prepareSnapshotPreBarrier,第二步执行operatorChain.broadcastCheckpointBarrier,第三步执行checkpointState方法,checkpointState里头创建CheckpointingOperation,然后调用checkpointingOperation.executeCheckpointing()
  • 如果isRunning为false,则这里streamRecordWriter.broadcastEvent(message),这里的message为CancelCheckpointMarker

OperatorChain.prepareSnapshotPreBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {// go forward through the operator chain and tell each operator// to prepare the checkpointfinal StreamOperator<?>[] operators = this.allOperators;for (int i = operators.length - 1; i >= 0; --i) {final StreamOperator<?> op = operators[i];if (op != null) {op.prepareSnapshotPreBarrier(checkpointId);}}}//......
}
复制代码
  • OperatorChain的prepareSnapshotPreBarrier会遍历allOperators挨个调用StreamOperator的prepareSnapshotPreBarrier方法

OperatorChain.broadcastCheckpointBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);for (RecordWriterOutput<?> streamOutput : streamOutputs) {streamOutput.broadcastEvent(barrier);}}//......
}
复制代码
  • OperatorChain的broadcastCheckpointBarrier方法则会遍历streamOutputs挨个调用streamOutput的broadcastEvent方法

CheckpointingOperation.executeCheckpointing

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

    private static final class CheckpointingOperation {private final StreamTask<?, ?> owner;private final CheckpointMetaData checkpointMetaData;private final CheckpointOptions checkpointOptions;private final CheckpointMetrics checkpointMetrics;private final CheckpointStreamFactory storageLocation;private final StreamOperator<?>[] allOperators;private long startSyncPartNano;private long startAsyncPartNano;// ------------------------private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;public CheckpointingOperation(StreamTask<?, ?> owner,CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointStreamFactory checkpointStorageLocation,CheckpointMetrics checkpointMetrics) {this.owner = Preconditions.checkNotNull(owner);this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation);this.allOperators = owner.operatorChain.getAllOperators();this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);}public void executeCheckpointing() throws Exception {startSyncPartNano = System.nanoTime();try {for (StreamOperator<?> op : allOperators) {checkpointStreamOperator(op);}if (LOG.isDebugEnabled()) {LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",checkpointMetaData.getCheckpointId(), owner.getName());}startAsyncPartNano = System.nanoTime();checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submitAsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(owner,operatorSnapshotsInProgress,checkpointMetaData,checkpointMetrics,startAsyncPartNano);owner.cancelables.registerCloseable(asyncCheckpointRunnable);owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);if (LOG.isDebugEnabled()) {LOG.debug("{} - finished synchronous part of checkpoint {}. " +"Alignment duration: {} ms, snapshot duration {} ms",owner.getName(), checkpointMetaData.getCheckpointId(),checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,checkpointMetrics.getSyncDurationMillis());}} catch (Exception ex) {// Cleanup to release resourcesfor (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {if (null != operatorSnapshotResult) {try {operatorSnapshotResult.cancel();} catch (Exception e) {LOG.warn("Could not properly cancel an operator snapshot result.", e);}}}if (LOG.isDebugEnabled()) {LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +"Alignment duration: {} ms, snapshot duration {} ms",owner.getName(), checkpointMetaData.getCheckpointId(),checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,checkpointMetrics.getSyncDurationMillis());}owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);}}@SuppressWarnings("deprecation")private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {if (null != op) {OperatorSnapshotFutures snapshotInProgress = op.snapshotState(checkpointMetaData.getCheckpointId(),checkpointMetaData.getTimestamp(),checkpointOptions,storageLocation);operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);}}private enum AsyncCheckpointState {RUNNING,DISCARDED,COMPLETED}}
复制代码
  • CheckpointingOperation定义在StreamTask类里头,executeCheckpointing方法先对所有的StreamOperator执行checkpointStreamOperator操作,checkpointStreamOperator方法会调用StreamOperator的snapshotState方法,之后创建AsyncCheckpointRunnable任务并提交异步运行

AbstractStreamOperator.snapshotState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>implements StreamOperator<OUT>, Serializable {@Overridepublic final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,CheckpointStreamFactory factory) throws Exception {KeyGroupRange keyGroupRange = null != keyedStateBackend ?keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(checkpointId,timestamp,factory,keyGroupRange,getContainingTask().getCancelables())) {snapshotState(snapshotContext);snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());if (null != operatorStateBackend) {snapshotInProgress.setOperatorStateManagedFuture(operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}if (null != keyedStateBackend) {snapshotInProgress.setKeyedStateManagedFuture(keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}} catch (Exception snapshotException) {try {snapshotInProgress.cancel();} catch (Exception e) {snapshotException.addSuppressed(e);}String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +getOperatorName() + ".";if (!getContainingTask().isCanceled()) {LOG.info(snapshotFailMessage, snapshotException);}throw new Exception(snapshotFailMessage, snapshotException);}return snapshotInProgress;}/*** Stream operators with state, which want to participate in a snapshot need to override this hook method.** @param context context that provides information and means required for taking a snapshot*/public void snapshotState(StateSnapshotContext context) throws Exception {final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();//TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshotsif (keyedStateBackend instanceof AbstractKeyedStateBackend &&((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {KeyedStateCheckpointOutputStream out;try {out = context.getRawKeyedOperatorStateOutput();} catch (Exception exception) {throw new Exception("Could not open raw keyed operator state stream for " +getOperatorName() + '.', exception);}try {KeyGroupsList allKeyGroups = out.getKeyGroupList();for (int keyGroupIdx : allKeyGroups) {out.startNewKeyGroup(keyGroupIdx);timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx);}} catch (Exception exception) {throw new Exception("Could not write timer service of " + getOperatorName() +" to checkpoint state stream.", exception);} finally {try {out.close();} catch (Exception closeException) {LOG.warn("Could not close raw keyed operator state stream for {}. This " +"might have prevented deleting some state data.", getOperatorName(), closeException);}}}}//......
}
复制代码
  • AbstractStreamOperator的snapshotState方法只有在keyedStateBackend是AbstractKeyedStateBackend类型,而且requiresLegacySynchronousTimerSnapshots为true的条件下才会操作,具体是触发timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx);不过它有不同的子类可能覆盖了snapshotState方法,比如AbstractUdfStreamOperator

AbstractUdfStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>extends AbstractStreamOperator<OUT>implements OutputTypeConfigurable<OUT> {@Overridepublic void snapshotState(StateSnapshotContext context) throws Exception {super.snapshotState(context);StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);}//......
}
复制代码
  • AbstractUdfStreamOperator覆盖了父类AbstractStreamOperator的snapshotState方法,新增了StreamingFunctionUtils.snapshotFunctionState操作

StreamingFunctionUtils.snapshotFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

@Internal
public final class StreamingFunctionUtils {public static void snapshotFunctionState(StateSnapshotContext context,OperatorStateBackend backend,Function userFunction) throws Exception {Preconditions.checkNotNull(context);Preconditions.checkNotNull(backend);while (true) {if (trySnapshotFunctionState(context, backend, userFunction)) {break;}// inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner functionif (userFunction instanceof WrappingFunction) {userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();} else {break;}}}private static boolean trySnapshotFunctionState(StateSnapshotContext context,OperatorStateBackend backend,Function userFunction) throws Exception {if (userFunction instanceof CheckpointedFunction) {((CheckpointedFunction) userFunction).snapshotState(context);return true;}if (userFunction instanceof ListCheckpointed) {@SuppressWarnings("unchecked")List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());ListState<Serializable> listState = backend.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);listState.clear();if (null != partitionableState) {try {for (Serializable statePartition : partitionableState) {listState.add(statePartition);}} catch (Exception e) {listState.clear();throw new Exception("Could not write partitionable state to operator " +"state backend.", e);}}return true;}return false;}//......
}
复制代码
  • snapshotFunctionState方法,这里执行了trySnapshotFunctionState操作,这里userFunction的类型,如果实现了CheckpointedFunction接口,则调用CheckpointedFunction.snapshotState,如果实现了ListCheckpointed接口,则调用ListCheckpointed.snapshotState方法,注意这里先clear了ListState,然后调用ListState.add方法将返回的List添加到ListState中

小结

  • flink的CheckpointCoordinatorDeActivator在job的status为RUNNING的时候会触发CheckpointCoordinator的startCheckpointScheduler,非RUNNING的时候调用CheckpointCoordinator的stopCheckpointScheduler方法
  • CheckpointCoordinator的startCheckpointScheduler主要是注册了ScheduledTrigger任务,其run方法执行triggerCheckpoint操作,triggerCheckpoint方法在真正触发checkpoint之前会进行一系列的校验,不满足则立刻fail fast,其中可能的原因有(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS、CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS、NOT_ALL_REQUIRED_TASKS_RUNNING);满足条件的话,就是挨个遍历executions,调用Execution.triggerCheckpoint,它借助taskManagerGateway.triggerCheckpoint来通过rpc调用TaskExecutor的triggerCheckpoint方法
  • TaskExecutor的triggerCheckpoint主要是调用Task的triggerCheckpointBarrier方法,后者主要是异步执行一个runnable,里头的run方法是调用invokable.triggerCheckpoint,这里的invokable为SourceStreamTask,而它主要是调用父类StreamTask的triggerCheckpoint方法,该方法的主要逻辑在performCheckpoint操作上;performCheckpoint在isRunning为true的时候,分了三步来处理,第一步执行operatorChain.prepareSnapshotPreBarrier,第二步执行operatorChain.broadcastCheckpointBarrier,第三步执行checkpointState方法,checkpointState里头创建CheckpointingOperation,然后调用checkpointingOperation.executeCheckpointing()
  • CheckpointingOperation的executeCheckpointing方法会对所有的StreamOperator执行checkpointStreamOperator操作,而checkpointStreamOperator方法会调用StreamOperator的snapshotState方法;AbstractStreamOperator的snapshotState方法只有在keyedStateBackend是AbstractKeyedStateBackend类型,而且requiresLegacySynchronousTimerSnapshots为true的条件下才会操作
  • AbstractUdfStreamOperator覆盖了父类AbstractStreamOperator的snapshotState方法,新增了StreamingFunctionUtils.snapshotFunctionState操作,该操作会根据userFunction的类型调用相应的方法(如果实现了CheckpointedFunction接口,则调用CheckpointedFunction.snapshotState,如果实现了ListCheckpointed接口,则调用ListCheckpointed.snapshotState方法)

doc

  • Working with State

聊聊flink的CheckpointScheduler 1相关推荐

  1. 聊聊flink的CheckpointScheduler

    序 本文主要研究一下flink的CheckpointScheduler CheckpointCoordinatorDeActivator flink-runtime_2.11-1.7.0-source ...

  2. 聊聊flink的FsStateBackend

    序 本文主要研究一下flink的FsStateBackend StateBackend flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/r ...

  3. 聊聊flink的HistoryServer

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下flink的HistoryServer HistoryServer flink-1.7.2/flink-runti ...

  4. 聊聊flink的TimeCharacteristic

    为什么80%的码农都做不了架构师?>>>    序 本文主要研究一下flink的TimeCharacteristic TimeCharacteristic flink-streami ...

  5. 聊聊flink JobManager的heap大小设置

    序 本文主要研究一下flink JobManager的heap大小设置 JobManagerOptions flink-core-1.7.1-sources.jar!/org/apache/flink ...

  6. 聊聊flink的InternalTimeServiceManager

    序 本文主要研究一下flink的InternalTimeServiceManager InternalTimeServiceManager flink-streaming-java_2.11-1.7. ...

  7. 聊聊flink的StateTtlConfig

    序 本文主要研究一下flink的StateTtlConfig 实例 import org.apache.flink.api.common.state.StateTtlConfig; import or ...

  8. 聊聊flink的AscendingTimestampExtractor

    序 本文主要研究一下flink的AscendingTimestampExtractor AscendingTimestampExtractor flink-streaming-java_2.11-1. ...

  9. 聊聊flink的NetworkEnvironmentConfiguration

    序 本文主要研究一下flink的NetworkEnvironmentConfiguration NetworkEnvironmentConfiguration flink-1.7.2/flink-ru ...

最新文章

  1. 深入浅出 Java Concurrency (29): 线程池 part 2 Executor 以及Executors[转]
  2. Python词频对比并导入CSV文件
  3. s插件——SlimScroll滚动美化插件
  4. python调用exe程序 传入参数_关于使用c#调用python脚本文件,脚本文件需要传递参数...
  5. 初识ES-es与mysql的概念对比
  6. Anaconda 安装 ml_metrics package
  7. Mysql权限控制 - 允许用户远程连接
  8. c语言 465串口编程,用C语言编写串口程序
  9. 3D 空间音效+空气衰减+人声模糊
  10. LaTex 论文排版(1): Win10 下 LaTex所需软件安装 (Tex live 2018 + Tex studio)
  11. 中国企业云计算应用现状及需求调研报告
  12. 怎么用软件测试相似相似度,文档相似性检测工具
  13. HDFS副本存放机制
  14. 个人申请阿里云ICP备案流程
  15. 如何制作千千静听个性皮肤
  16. DST与Neural Belief Tracker
  17. nodejs常用模块async(waterfall,each,eachSeries,whilst)
  18. 使用谷歌Colab(Colaboratory)免费GPU训练自己的模型及谷歌网盘无限容量(Google drive)申请教程
  19. oracle去掉0x00,Oracle O001 / O00n 进程 100% CPU资源耗用
  20. [VC] 冒号(:)与C/C++

热门文章

  1. mysql查询大量数据报错_mysql 查询大量数据报错
  2. html图片旋转代码_HTML设计一个小程序
  3. doc转docx文件会乱吗_Word文档doc与docx的区别
  4. linux安装tightvnc_tightvnc安装配置,在Linux系统中进行tightvnc安装配置
  5. python 多进程中锁的使用方法
  6. 理解注意力机制的好文二
  7. fastText原理和文本分类实战
  8. AutoML Vision教程:训练模型解决计算机视觉问题,准确率达94.5%
  9. 不同坐标系下角速度_坐标系统及常见坐标系
  10. 【考前必知】软考考前注意事项