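Preface

Earlier posts, 一文搞懂 Flink 处理 Barrier 全过程 (Understanding how Flink processes barriers) and 一文搞定 Flink Checkpoint Barrier 全流程 (The full Flink checkpoint barrier flow), were mostly about checkpoints. This time we look at how a checkpoint actually happens.

Main text

The starting point for everything checkpoint-related is buildGraph: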

@Deprecated
public static ExecutionGraph buildGraph(
        @Nullable ExecutionGraph prior,
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        SlotProvider slotProvider,
        ClassLoader classLoader,
        CheckpointRecoveryFactory recoveryFactory,
        Time rpcTimeout,
        RestartStrategy restartStrategy,
        MetricGroup metrics,
        int parallelismForAutoMax,
        BlobWriter blobWriter,
        Time allocationTimeout,
        Logger log)
        throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    final FailoverStrategy.Factory failoverStrategy =
            FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);

    final JobInformation jobInformation = new JobInformation(
            jobId,
            jobName,
            jobGraph.getSerializedExecutionConfig(),
            jobGraph.getJobConfiguration(),
            jobGraph.getUserJarBlobKeys(),
            jobGraph.getClasspaths());

    // create a new execution graph, if none exists so far
    final ExecutionGraph executionGraph;
    try {
        executionGraph = (prior != null) ? prior :
                new ExecutionGraph(
                        jobInformation,
                        futureExecutor,
                        ioExecutor,
                        rpcTimeout,
                        restartStrategy,
                        failoverStrategy,
                        slotProvider,
                        classLoader,
                        blobWriter,
                        allocationTimeout);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    ......

    // configure the state checkpointing
    JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
    if (snapshotSettings != null) {
        // Decide which operator chains trigger the checkpoint, which ack it, and which confirm it.
        // Vertices used to trigger the checkpoint
        List<ExecutionJobVertex> triggerVertices =
                idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

        // Vertices used to ack the checkpoint
        List<ExecutionJobVertex> ackVertices =
                idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

        // Vertices used to confirm the checkpoint
        List<ExecutionJobVertex> confirmVertices =
                idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

        CompletedCheckpointStore completedCheckpoints;
        CheckpointIDCounter checkpointIdCounter;
        try {
            int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);

            if (maxNumberOfCheckpointsToRetain <= 0) {
                // warning and use 1 as the default value if the setting in
                // state.checkpoints.max-retained-checkpoints is not greater than 0.
                log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
                        CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
                        maxNumberOfCheckpointsToRetain,
                        CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());

                maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
            }

            // With HA enabled this connects to ZooKeeper. maxNumberOfCheckpointsToRetain is the
            // number of checkpoints to retain; the default is one.
            completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
            checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
        }
        catch (Exception e) {
            throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
        }

        // Maximum number of remembered checkpoints; the default is 10
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        // Used by the web UI to show the checkpoint ack status
        CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
                historySize,
                ackVertices,
                snapshotSettings.getCheckpointCoordinatorConfiguration(),
                metrics);

        // The default directory for externalized checkpoints
        String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);

        ......

        final StateBackend rootBackend;
        try {
            // The state backend is determined while the ExecutionGraph is being built
            rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
                    applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        }
        catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
        }

        ......

        final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();

        // Combines the checkpoint config with triggerVertices, ackVertices, confirmVertices,
        // the state backend and checkpointStatsTracker to create the CheckpointCoordinator object
        executionGraph.enableCheckpointing(
                chkConfig.getCheckpointInterval(),
                chkConfig.getCheckpointTimeout(),
                chkConfig.getMinPauseBetweenCheckpoints(),
                chkConfig.getMaxConcurrentCheckpoints(),
                chkConfig.getCheckpointRetentionPolicy(),
                triggerVertices,
                ackVertices,
                confirmVertices,
                hooks,
                checkpointIdCounter,
                completedCheckpoints,
                rootBackend,
                checkpointStatsTracker);
    }

    ......

    return executionGraph;
}

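Building the graph fixes three sets of vertices: triggerVertices (used to trigger the checkpoint; these are the trigger tasks mentioned below, usually the source task operator chains), ackVertices (used to receive the reports that a task has finished its checkpoint; these are the ackTasks mentioned below, and every operator chain that has to checkpoint belongs to this set), and confirmVertices (used to confirm that the checkpoint has completed; every operator chain that checkpoints has to confirm, which makes this something like a two-phase commit for checkpoints).
When Flink submits a job, it calls CheckpointCoordinator.startCheckpointScheduler: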
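To make the three roles concrete, here is a minimal sketch of how such a split could be computed from a job topology. It assumes that a vertex with no upstream inputs is a source and therefore a trigger vertex, and that every vertex both acks and confirms. CheckpointRoles and classify are hypothetical names for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not a Flink class): splits job vertices into the three checkpoint roles.
final class CheckpointRoles {
    final List<String> trigger = new ArrayList<>();
    final List<String> ack = new ArrayList<>();
    final List<String> confirm = new ArrayList<>();

    // inputsByVertex maps each vertex name to the names of its upstream vertices.
    static CheckpointRoles classify(Map<String, List<String>> inputsByVertex) {
        CheckpointRoles roles = new CheckpointRoles();
        for (Map.Entry<String, List<String>> entry : inputsByVertex.entrySet()) {
            if (entry.getValue().isEmpty()) {
                // Assumption: no upstream input means a source chain, which triggers checkpoints.
                roles.trigger.add(entry.getKey());
            }
            // Every vertex that checkpoints must ack its snapshot and later confirm completion.
            roles.ack.add(entry.getKey());
            roles.confirm.add(entry.getKey());
        }
        return roles;
    }
}
```

For a source -> flatMap -> sink job, only source ends up in the trigger list, while all three vertices appear in both the ack and confirm lists.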

// Flink calls this method when it starts a job
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        long initialDelay = ThreadLocalRandom.current().nextLong(
                minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
        // the periodic task
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
    }
}

A periodic task runs ScheduledTrigger:

// Triggers the checkpoint
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
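The scheduling pattern used here, a random first delay followed by a fixed-rate timer, can be sketched with plain JDK classes. This is only an illustration: PeriodicTriggerSketch and its methods are hypothetical, and it assumes the minimum pause is not larger than the base interval (otherwise nextLong rejects the bounds).

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (not Flink code) of a randomized first delay plus fixed-rate triggering.
final class PeriodicTriggerSketch {

    // Picks a first delay in [minPauseMillis, baseIntervalMillis].
    // Assumes minPauseMillis <= baseIntervalMillis.
    static long initialDelayMillis(long minPauseMillis, long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    // Schedules the trigger at a fixed rate; the caller keeps the future so it can cancel it later.
    static ScheduledFuture<?> start(
            ScheduledExecutorService timer,
            Runnable trigger,
            long minPauseMillis,
            long baseIntervalMillis) {
        long initialDelay = initialDelayMillis(minPauseMillis, baseIntervalMillis);
        return timer.scheduleAtFixedRate(trigger, initialDelay, baseIntervalMillis, TimeUnit.MILLISECONDS);
    }
}
```

One plausible reason for randomizing the first delay is to keep many jobs started at the same moment from checkpointing in lockstep; the source does not state the motivation, so treat that as a guess.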

This starts the actual checkpoint trigger:

@VisibleForTesting
// Triggers the checkpoint
public CheckpointTriggerResult triggerCheckpoint(
        long timestamp,
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic) {

    ......

    // check if all tasks that we need to trigger are running.
    // if not, abort the checkpoint
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job);
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job,
                    ExecutionState.RUNNING,
                    ee.getState());
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // next, check if all tasks that need to acknowledge the checkpoint are running.
    // if not, abort the checkpoint
    Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

    for (ExecutionVertex ev : tasksToWaitFor) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ackTasks.put(ee.getAttemptId(), ev);
        } else {
            LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    ev.getTaskNameWithSubtaskIndex(),
                    job);
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    ......

    // Start a checkpoint that has not been confirmed yet. Once every task has acknowledged it,
    // this object is turned into a CompletedCheckpoint.
    final PendingCheckpoint checkpoint = new PendingCheckpoint(
            job,
            checkpointID,
            timestamp,
            ackTasks, // the tasks that have to ack the checkpoint
            props,
            checkpointStorageLocation,
            executor);

    if (statsTracker != null) {
        PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                checkpointID,
                timestamp,
                props);

        checkpoint.setStatsCallback(callback);
    }

    // schedule the timer that will clean up the expired checkpoints
    final Runnable canceller = () -> {
        synchronized (lock) {
            // only do the work if the checkpoint is not discarded anyways
            // note that checkpoint completion discards the pending checkpoint object
            if (!checkpoint.isDiscarded()) {
                LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                checkpoint.abortExpired();
                pendingCheckpoints.remove(checkpointID);
                rememberRecentCheckpointId(checkpointID);

                triggerQueuedRequests();
            }
        }
    };

    try {
        // re-acquire the coordinator-wide lock
        synchronized (lock) {
        ......
        // end of lock scope

        final CheckpointOptions checkpointOptions = new CheckpointOptions(
                props.getCheckpointType(),
                checkpointStorageLocation.getLocationReference());

        // send the messages to the tasks that trigger their checkpoint
        for (Execution execution: executions) {
            // The trigger tasks (operator chains, fixed when the ExecutionGraph was built).
            // Calls TaskExecutor.triggerCheckpoint, which ends up in task.triggerCheckpointBarrier.
            // source -> flatMap
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }

        numUnsuccessfulCheckpointsTriggers.set(0);
        return new CheckpointTriggerResult(checkpoint);
    }
    ......
    } // end trigger lock
}
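The two precondition loops above treat the two roles differently: a trigger task must have an execution attempt that is RUNNING, while an ack task only needs a current execution attempt. A minimal sketch of that check, with hypothetical stand-in types (Vertex, State, check) instead of Flink's ExecutionVertex and Execution:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch (not Flink code) of the checks that run before a checkpoint is triggered.
final class TriggerPreconditions {
    enum State { CREATED, RUNNING, FINISHED }

    // Stand-in for an ExecutionVertex; attemptId is null when no execution attempt exists.
    static final class Vertex {
        final String name;
        final String attemptId;
        final State state;

        Vertex(String name, String attemptId, State state) {
            this.name = name;
            this.attemptId = attemptId;
            this.state = state;
        }
    }

    // Returns the ack map if the checkpoint may start, or empty if it has to be declined.
    static Optional<Map<String, Vertex>> check(List<Vertex> toTrigger, List<Vertex> toAck) {
        for (Vertex v : toTrigger) {
            // Trigger tasks must exist and be RUNNING.
            if (v.attemptId == null || v.state != State.RUNNING) {
                return Optional.empty();
            }
        }
        Map<String, Vertex> ackTasks = new HashMap<>(toAck.size());
        for (Vertex v : toAck) {
            // Ack tasks only need a current execution attempt.
            if (v.attemptId == null) {
                return Optional.empty();
            }
            ackTasks.put(v.attemptId, v);
        }
        return Optional.of(ackTasks);
    }
}
```

The returned map plays the role of ackTasks: it is what a pending checkpoint would later tick off as acknowledgements arrive.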

Here the trigger tasks are asked to start the checkpoint. Following the call leads to TaskExecutor.triggerCheckpoint:

@Override
// Asks the task of a trigger operator chain to trigger the checkpoint
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message));
    }
}

Next comes the triggerCheckpointBarrier method:

// The trigger operator chain's checkpoint trigger ends up in triggerCheckpointBarrier
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {

    // invokable is in fact a StreamTask. The Task class delegates the checkpoint to a more
    // specific class, and StreamTask delegates further in turn, all the way down to user code.
    // source -> flatMap
    // invokable is effectively the operator chain
    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        // build a local closure
        final String taskName = taskNameWithSubtask;
        final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                try {
                    // invokable is the StreamTask; the checkpoint is delegated to it here
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                                getJobID(), getExecutionId(), checkpointID,
                                new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                    } else {
                        LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                    }
                } finally {
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                }
            }
        };
        executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
    } else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
    }
}

triggerCheckpoint is then called on invokable. Because the trigger tasks are all source operator chains, the call lands in SourceStreamTask:

@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
    if (!externallyInducedCheckpoints) {
        return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
    }
    else {
        // we do not trigger checkpoints here, we simply state whether we can trigger them
        synchronized (getCheckpointLock()) {
            return isRunning();
        }
    }
}

Following super.triggerCheckpoint into StreamTask eventually reaches performCheckpoint:

// The trigger operator chain calls all the way down to here, where the barrier first appears
// (it actually originates from the periodic checkpoint task)
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            // Note: from here on, barriers start to appear along the whole execution pipeline
            operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

            // Step (2): Send the checkpoint barrier downstream
            /*
             Under backpressure this call can block the source chain's checkpoint, because
             sending the barrier downstream requires requesting buffer memory. A downstream
             operator that receives this barrier triggers its own checkpoint.
             */
            operatorChain.broadcastCheckpointBarrier(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            // Runs the checkpoint. A source task chain (trigger task) triggers its checkpoint
            // directly through triggerCheckpoint, whereas a non-source task chain triggers it
            // through processBarrier.
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
        else {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
            // yet be created
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;

            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
                try {
                    // another kind of message, similar to a barrier
                    recordWriter.broadcastEvent(message);
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(
                            new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                            exception);
                }
            }

            if (exception != null) {
                throw exception;
            }

            return false;
        }
    }
}
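The ordering inside performCheckpoint is the important part: pre-barrier work, then the barrier broadcast, then the state snapshot, all under one lock so that no record can slip in between. The sketch below only records that order in a list. PerformCheckpointSketch is a hypothetical class, and the log strings are placeholders for the real calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Flink code) that records the order of the three checkpoint steps.
final class PerformCheckpointSketch {
    final List<String> log = new ArrayList<>();
    boolean running = true;

    boolean performCheckpoint(long checkpointId) {
        // One lock around all steps: records and barriers cannot interleave with them.
        synchronized (this) {
            if (!running) {
                // Tell downstream tasks not to wait for a barrier from this task.
                log.add("cancel-marker:" + checkpointId);
                return false;
            }
            log.add("pre-barrier:" + checkpointId);       // step 1: minimal pre-barrier work
            log.add("broadcast-barrier:" + checkpointId); // step 2: barrier goes downstream first
            log.add("snapshot-state:" + checkpointId);    // step 3: snapshot, largely asynchronous
            return true;
        }
    }
}
```

Sending the barrier before taking the snapshot lets downstream tasks begin their own alignment while this task is still writing its state.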

Besides being the place where the barrier first appears and is broadcast, the most important step here is checkpointState:

private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getTargetLocation());

    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);

    // run the checkpoint
    checkpointingOperation.executeCheckpointing();
}

public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        // Entry point that calls snapshotState on each StreamOperator.
        // First the source operators (flatMap -> source), then the sink operators (sink -> filter).
        for (StreamOperator<?> op : allOperators) {
            // Takes a snapshot-in-progress for each operator and stores it in operatorSnapshotsInProgress
            // (what is stored is a reference to the asynchronous checkpoint), then the local
            // checkpoint store and the JobManager ack follow.
            // Catching a barrier is really part of processing input data, which corresponds to
            // StreamInputProcessor.processInput().
            checkpointStreamOperator(op);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                    checkpointMetaData.getCheckpointId(), owner.getName());
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        // Once the operators have saved their checkpoint data, an asynchronous AsyncCheckpointRunnable
        // is started to report that the checkpoint is done; the logic is in reportCompletedSnapshotStates.
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);

        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        // A Runnable is registered here that sends the ack to the JobManager after the checkpoint
        // has been taken. This is also part of the fault-tolerance two-phase commit; it ends up
        // calling acknowledgeCheckpoint on the JobMaster.
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                    owner.getName(), checkpointMetaData.getCheckpointId(),
                    checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                    checkpointMetrics.getSyncDurationMillis());
        }
    } catch (Exception ex) {
        // Cleanup to release resources
        for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
            if (null != operatorSnapshotResult) {
                try {
                    operatorSnapshotResult.cancel();
                } catch (Exception e) {
                    LOG.warn("Could not properly cancel an operator snapshot result.", e);
                }
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                    owner.getName(), checkpointMetaData.getCheckpointId(),
                    checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                    checkpointMetrics.getSyncDurationMillis());
        }

        owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
    }
}

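The JobMaster's acknowledgeCheckpoint eventually calls CheckpointCoordinator.receiveAcknowledgeMessage. If the acknowledgement result is SUCCESS, the pending checkpoint is completed through completePendingCheckpoint: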

/**
 * Try to complete the given pending checkpoint.
 *
 * <p>Important: This method should only be called in the checkpoint lock scope.
 *
 * @param pendingCheckpoint to complete
 * @throws CheckpointException if the completion failed
 */
/*
 Converts the PendingCheckpoint into a CompletedCheckpoint,
 adds the CompletedCheckpoint to the set of completed checkpoints and removes it from the pending set,
 then sends another RPC to every operator to notify it that the checkpoint has completed.
 */
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // As a first step to complete the checkpoint, we register its state with the registry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                pendingCheckpoint.abortError(e1);
            }

            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
        }

        // the pending checkpoint must be discarded after the finalization
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
        }
    } finally {
        pendingCheckpoints.remove(checkpointId);

        triggerQueuedRequests();
    }

    rememberRecentCheckpointId(checkpointId);

    // drop those pending checkpoints that are at prior to the completed one
    dropSubsumedCheckpoints(checkpointId);

    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    lastCheckpointCompletionNanos = System.nanoTime();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);

        LOG.debug(builder.toString());
    }

    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();

    // these are the confirm tasks
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            // notify the corresponding operators, layer by layer, that the checkpoint has completed
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
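The ack bookkeeping that leads into completePendingCheckpoint can be sketched as follows: a pending checkpoint holds the set of tasks that have not acknowledged yet, and the last acknowledgement flips it to completed and notifies the confirm tasks. PendingCheckpointSketch and its members are hypothetical and leave out state handles, timeouts and the checkpoint store.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch (not Flink code) of ack counting followed by the completion notification.
final class PendingCheckpointSketch {
    private final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final List<String> tasksToCommitTo;
    private boolean completed;

    // Records which confirm tasks were notified, standing in for the notify RPC.
    final List<String> notified = new ArrayList<>();

    PendingCheckpointSketch(long checkpointId, Set<String> ackTasks, List<String> tasksToCommitTo) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(ackTasks);
        this.tasksToCommitTo = new ArrayList<>(tasksToCommitTo);
    }

    // Returns true only for the acknowledgement that completes the checkpoint.
    boolean acknowledge(String attemptId) {
        if (completed || !notYetAcknowledged.remove(attemptId)) {
            return false; // duplicate, unknown, or late acknowledgement
        }
        if (!notYetAcknowledged.isEmpty()) {
            return false; // still waiting for other tasks
        }
        completed = true;
        for (String task : tasksToCommitTo) {
            notified.add(task + "@" + checkpointId); // second phase: notify every confirm task
        }
        return true;
    }

    boolean isCompleted() {
        return completed;
    }
}
```

With ack tasks {a1, a2}, the first acknowledgement returns false, a duplicate of it is ignored, and the second one completes the checkpoint and fills the notified list.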

The confirm tasks confirm layer by layer, but how exactly? Following the call leads to task.notifyCheckpointComplete:

@Override
public void notifyCheckpointComplete(final long checkpointID) {
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    // the operator chain is notified that the checkpoint completed; calls StreamTask.notifyCheckpointComplete
                    invokable.notifyCheckpointComplete(checkpointID);
                    // after the operator chain has been notified, notify taskStateManager
                    taskStateManager.notifyCheckpointComplete(checkpointID);
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        // fail task if checkpoint confirmation failed.
                        failExternally(new RuntimeException(
                                "Error while confirming checkpoint",
                                t));
                    }
                }
            }
        };
        executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " +
                taskNameWithSubtask);
    } else {
        LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
    }
}

Going one level further down, take Kafka as an example; for details see
Flink如何保存Offset (How Flink saves offsets)
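The referenced article is not reproduced here, so the following is only a sketch of the general pattern a source can follow, not the Kafka connector's code: remember the offsets captured at each snapshot, and commit them only when notifyCheckpointComplete arrives for that checkpoint id. OffsetCommitSketch and all of its members are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not connector code): commit offsets only after the checkpoint is confirmed.
final class OffsetCommitSketch {
    // checkpointId -> (partition -> offset) captured when that checkpoint was snapshotted
    private final TreeMap<Long, Map<Integer, Long>> pendingOffsets = new TreeMap<>();

    // Stands in for the offsets committed to the external system.
    final Map<Integer, Long> committed = new HashMap<>();

    // Phase one: remember the offsets that belong to this checkpoint.
    void snapshotState(long checkpointId, Map<Integer, Long> currentOffsets) {
        pendingOffsets.put(checkpointId, new HashMap<>(currentOffsets));
    }

    // Phase two: the coordinator confirmed the checkpoint, so its offsets are safe to commit.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pendingOffsets.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already subsumed checkpoint
        }
        committed.putAll(offsets);
        // Drop this checkpoint and every older one; they are subsumed by the confirmed one.
        pendingOffsets.headMap(checkpointId, true).clear();
    }

    int pendingCount() {
        return pendingOffsets.size();
    }
}
```

Dropping older entries on confirmation mirrors how the coordinator drops subsumed pending checkpoints once a newer one completes.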

At this point the checkpoint of the source task (trigger task) is complete.

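As noted above, this flow is where the barrier first appears, and a barrier can be seen as a special kind of message. What happens after it is broadcast downstream? For details see
一文搞懂 Flink 处理 Barrier 全过程 (Understanding how Flink processes barriers)
From that post we know that once numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, notifyCheckpoint(receivedBarrier) is called, which in the end calls StreamTask.performCheckpoint again. At that point the non-source task operator chains have taken their checkpoint too, and the cycle repeats.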
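The alignment condition can be illustrated with a small counter. The field names are taken from the condition quoted above; BarrierAlignmentSketch and its methods are hypothetical, and the sketch ignores channel blocking, buffering, and overlapping checkpoints.

```java
// Hypothetical sketch (not Flink code) of the barrier-counting condition for one checkpoint.
final class BarrierAlignmentSketch {
    private final int totalNumberOfInputChannels;
    private int numBarriersReceived;
    private int numClosedChannels;

    BarrierAlignmentSketch(int totalNumberOfInputChannels) {
        this.totalNumberOfInputChannels = totalNumberOfInputChannels;
    }

    // Called when a barrier arrives on a channel; true means the checkpoint can be triggered.
    boolean onBarrier() {
        numBarriersReceived++;
        return aligned();
    }

    // Called when a channel ends; a closed channel will never deliver a barrier.
    boolean onChannelClosed() {
        numClosedChannels++;
        return numBarriersReceived > 0 && aligned();
    }

    // Called after the checkpoint was triggered, to get ready for the next barrier round.
    void reset() {
        numBarriersReceived = 0;
    }

    private boolean aligned() {
        return numBarriersReceived + numClosedChannels == totalNumberOfInputChannels;
    }
}
```

With three input channels, one barrier plus one closed channel is not enough; the barrier on the remaining channel satisfies the condition and the task can run its own performCheckpoint.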
