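Understanding the Whole Checkpoint Process in One Article
Preface
The earlier articles 一文搞懂 Flink 处理 Barrier 全过程 (how Flink processes barriers) and 一文搞定 Flink Checkpoint Barrier 全流程 (the full checkpoint barrier flow) both dealt with checkpoint-related topics. This time we look at how a checkpoint actually happens.
Main text
The checkpoint-related starting point is ExecutionGraphBuilder.buildGraph: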
@Deprecated
public static ExecutionGraph buildGraph(
        @Nullable ExecutionGraph prior,
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        SlotProvider slotProvider,
        ClassLoader classLoader,
        CheckpointRecoveryFactory recoveryFactory,
        Time rpcTimeout,
        RestartStrategy restartStrategy,
        MetricGroup metrics,
        int parallelismForAutoMax,
        BlobWriter blobWriter,
        Time allocationTimeout,
        Logger log)
    throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    final FailoverStrategy.Factory failoverStrategy =
        FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);

    final JobInformation jobInformation = new JobInformation(
        jobId,
        jobName,
        jobGraph.getSerializedExecutionConfig(),
        jobGraph.getJobConfiguration(),
        jobGraph.getUserJarBlobKeys(),
        jobGraph.getClasspaths());

    // create a new execution graph, if none exists so far
    final ExecutionGraph executionGraph;
    try {
        executionGraph = (prior != null) ? prior :
            new ExecutionGraph(
                jobInformation,
                futureExecutor,
                ioExecutor,
                rpcTimeout,
                restartStrategy,
                failoverStrategy,
                slotProvider,
                classLoader,
                blobWriter,
                allocationTimeout);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    ......

    // configure the state checkpointing
    JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
    if (snapshotSettings != null) {
        // Decide which operator chains trigger the checkpoint, which acknowledge it,
        // and which confirm it.

        // vertices that trigger the checkpoint
        List<ExecutionJobVertex> triggerVertices =
            idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

        // vertices that acknowledge the checkpoint
        List<ExecutionJobVertex> ackVertices =
            idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

        // vertices that confirm the checkpoint
        List<ExecutionJobVertex> confirmVertices =
            idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

        CompletedCheckpointStore completedCheckpoints;
        CheckpointIDCounter checkpointIdCounter;
        try {
            int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
                CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);

            if (maxNumberOfCheckpointsToRetain <= 0) {
                // warning and use 1 as the default value if the setting in
                // state.checkpoints.max-retained-checkpoints is not greater than 0.
                log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
                    maxNumberOfCheckpointsToRetain,
                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());

                maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
            }

            // With HA enabled this connects to ZooKeeper. maxNumberOfCheckpointsToRetain is
            // the number of completed checkpoints to keep (1 by default).
            completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
            checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
        }
        catch (Exception e) {
            throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
        }

        // Maximum number of remembered checkpoints (10 by default)
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        // used by the web UI to show the checkpoint acknowledgement status
        CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
            historySize,
            ackVertices,
            snapshotSettings.getCheckpointCoordinatorConfiguration(),
            metrics);

        // The default directory for externalized checkpoints
        String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);

        ......

        final StateBackend rootBackend;
        try {
            // the state backend is determined while the ExecutionGraph is being built
            rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
                applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        }
        catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
        }

        ......

        final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();

        // Using the checkpoint config together with triggerVertices, ackVertices,
        // confirmVertices, the state backend and checkpointStatsTracker,
        // this call creates the CheckpointCoordinator object.
        executionGraph.enableCheckpointing(
            chkConfig.getCheckpointInterval(),
            chkConfig.getCheckpointTimeout(),
            chkConfig.getMinPauseBetweenCheckpoints(),
            chkConfig.getMaxConcurrentCheckpoints(),
            chkConfig.getCheckpointRetentionPolicy(),
            triggerVertices,
            ackVertices,
            confirmVertices,
            hooks,
            checkpointIdCounter,
            completedCheckpoints,
            rootBackend,
            checkpointStatsTracker);
    }

    ......

    return executionGraph;
}
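Building the graph fixes three sets of vertices: triggerVertices (used to trigger the checkpoint; these are the trigger tasks mentioned below and are usually the source task operator chains), ackVertices (used to collect the reports that a task has taken its checkpoint; these are the ackTasks mentioned below, and every operator chain that has to checkpoint belongs to this set), and confirmVertices (used to confirm that the checkpoint has completed; every operator chain that has to checkpoint must be told, which makes checkpointing a kind of two-phase commit).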
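The three roles can be illustrated on a toy job graph. This is a minimal sketch, not Flink code: `CheckpointRolesSketch`, its `Vertex` class and all method names are hypothetical, and it assumes the common case described above, where only vertices without inputs (the sources) trigger and every vertex acknowledges and confirms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model; it does not use Flink's JobVertex or ExecutionJobVertex.
class CheckpointRolesSketch {

    static final class Vertex {
        final String name;
        final List<String> inputs; // names of the upstream vertices

        Vertex(String name, String... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Assumption: only vertices without inputs (sources) trigger a checkpoint.
    static List<String> triggerVertices(List<Vertex> graph) {
        List<String> result = new ArrayList<>();
        for (Vertex v : graph) {
            if (v.inputs.isEmpty()) {
                result.add(v.name);
            }
        }
        return result;
    }

    // Assumption: every vertex both acknowledges and confirms a checkpoint.
    static List<String> ackAndConfirmVertices(List<Vertex> graph) {
        List<String> result = new ArrayList<>();
        for (Vertex v : graph) {
            result.add(v.name);
        }
        return result;
    }

    static List<Vertex> sampleGraph() {
        return Arrays.asList(
            new Vertex("source"),
            new Vertex("flatMap", "source"),
            new Vertex("sink", "flatMap"));
    }

    public static void main(String[] args) {
        List<Vertex> graph = sampleGraph();
        System.out.println("trigger:     " + triggerVertices(graph));
        System.out.println("ack/confirm: " + ackAndConfirmVertices(graph));
    }
}
```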
When Flink starts a submitted job, it calls CheckpointCoordinator.startCheckpointScheduler:
// Flink calls this method when it starts the job
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        long initialDelay = ThreadLocalRandom.current().nextLong(
            minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
        // the periodic task
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
            new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
    }
}
The periodic task runs a ScheduledTrigger:
// triggers the checkpoint
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
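The scheduling pattern can be tried in isolation. The following is a minimal sketch, not Flink code: `PeriodicTriggerSketch` and its method names are hypothetical, and a plain `ScheduledExecutorService` stands in for the coordinator's timer. It reuses the initial-delay formula from startCheckpointScheduler, which draws a random delay between the minimum pause and the base interval (both in milliseconds here).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the coordinator's periodic trigger.
class PeriodicTriggerSketch {

    // Uniformly random in [minPauseMillis, baseIntervalMillis];
    // requires minPauseMillis <= baseIntervalMillis.
    static long initialDelayMillis(long minPauseMillis, long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    // Schedules a counter at a fixed rate, lets it run for runMillis, returns the trigger count.
    static int countTriggers(long minPauseMillis, long baseIntervalMillis, long runMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger triggered = new AtomicInteger();
        try {
            timer.scheduleAtFixedRate(
                triggered::incrementAndGet, // stands in for new ScheduledTrigger()
                initialDelayMillis(minPauseMillis, baseIntervalMillis),
                baseIntervalMillis,
                TimeUnit.MILLISECONDS);
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow(); // plays the role of stopCheckpointScheduler()
        }
        return triggered.get();
    }

    public static void main(String[] args) {
        System.out.println("initial delay: " + initialDelayMillis(100L, 1000L) + " ms");
        System.out.println("triggers in 500 ms: " + countTriggers(0L, 20L, 500L));
    }
}
```

Because the upper bound passed to `nextLong` is exclusive, the `+ 1L` is what makes the base interval itself a possible delay.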
This starts triggerCheckpoint. The two-argument call above ends up in the four-argument version:
@VisibleForTesting
// triggers a checkpoint
public CheckpointTriggerResult triggerCheckpoint(
        long timestamp,
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic) {

    ......

    // check if all tasks that we need to trigger are running.
    // if not, abort the checkpoint
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                job);
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                job,
                ExecutionState.RUNNING,
                ee.getState());
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // next, check if all tasks that need to acknowledge the checkpoint are running.
    // if not, abort the checkpoint
    Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

    for (ExecutionVertex ev : tasksToWaitFor) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ackTasks.put(ee.getAttemptId(), ev);
        } else {
            LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                ev.getTaskNameWithSubtaskIndex(),
                job);
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    ......

    // Start a checkpoint that has not been acknowledged yet. Once every task has
    // acknowledged it, this object is converted into a CompletedCheckpoint.
    final PendingCheckpoint checkpoint = new PendingCheckpoint(
        job,
        checkpointID,
        timestamp,
        ackTasks, // the tasks that have to acknowledge the checkpoint
        props,
        checkpointStorageLocation,
        executor);

    if (statsTracker != null) {
        PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
            checkpointID,
            timestamp,
            props);

        checkpoint.setStatsCallback(callback);
    }

    // schedule the timer that will clean up the expired checkpoints
    final Runnable canceller = () -> {
        synchronized (lock) {
            // only do the work if the checkpoint is not discarded anyways
            // note that checkpoint completion discards the pending checkpoint object
            if (!checkpoint.isDiscarded()) {
                LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                checkpoint.abortExpired();
                pendingCheckpoints.remove(checkpointID);
                rememberRecentCheckpointId(checkpointID);

                triggerQueuedRequests();
            }
        }
    };

    try {
        // re-acquire the coordinator-wide lock
        synchronized (lock) {
            ......
        // end of lock scope

        final CheckpointOptions checkpointOptions = new CheckpointOptions(
            props.getCheckpointType(),
            checkpointStorageLocation.getLocationReference());

        // send the messages to the tasks that trigger their checkpoint
        for (Execution execution: executions) {
            // The trigger tasks (operator chains, fixed when the ExecutionGraph was built).
            // This calls TaskExecutor.triggerCheckpoint, which ends in task.triggerCheckpointBarrier.
            // Example chain: source -> flatMap
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }

        numUnsuccessfulCheckpointsTriggers.set(0);
        return new CheckpointTriggerResult(checkpoint);
    }
    ......
    } // end trigger lock
}
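The bookkeeping a pending checkpoint performs can be sketched independently of Flink. `PendingCheckpointSketch` below is a hypothetical, simplified model: tasks are identified by plain strings instead of ExecutionAttemptID, the result names are only loosely modelled on Flink's, and `abortExpired` stands for what the canceller above does when the timeout fires.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical simplified model of a checkpoint that waits for acknowledgements.
class PendingCheckpointSketch {

    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();
    private boolean discarded;

    PendingCheckpointSketch(Collection<String> tasksToAck) {
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    AckResult acknowledgeTask(String task) {
        if (discarded) {
            return AckResult.DISCARDED; // aborted or expired, late acks are ignored
        }
        if (notYetAcknowledged.remove(task)) {
            acknowledged.add(task);
            return AckResult.SUCCESS;
        }
        return acknowledged.contains(task) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledged.isEmpty();
    }

    // What the timeout canceller would call if the checkpoint takes too long.
    void abortExpired() {
        discarded = true;
    }

    // Walks through one completed and one expired checkpoint and records each outcome.
    static String demo() {
        PendingCheckpointSketch p = new PendingCheckpointSketch(Arrays.asList("a", "b"));
        StringBuilder out = new StringBuilder();
        out.append(p.acknowledgeTask("a")).append(',');
        out.append(p.acknowledgeTask("a")).append(',');
        out.append(p.acknowledgeTask("x")).append(',');
        out.append(p.isFullyAcknowledged()).append(',');
        out.append(p.acknowledgeTask("b")).append(',');
        out.append(p.isFullyAcknowledged()).append(',');

        PendingCheckpointSketch expired = new PendingCheckpointSketch(Arrays.asList("a"));
        expired.abortExpired();
        out.append(expired.acknowledgeTask("a"));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```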
Here the trigger tasks start the checkpoint. Follow the call to TaskExecutor.triggerCheckpoint:
@Override
// the task of a trigger operator chain triggers the checkpoint
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message));
    }
}
That leads to the triggerCheckpointBarrier method:
// the trigger operator chain's checkpoint request ends up in triggerCheckpointBarrier
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {

    // The invokable is in fact a StreamTask. The Task class delegates the checkpoint to a
    // more specific class, and StreamTask in turn delegates further, down to the user code.
    // Example chain: source -> flatMap
    // The invokable is effectively the operator chain.
    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        // build a local closure
        final String taskName = taskNameWithSubtask;
        final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
            FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                try {
                    // the invokable is a StreamTask (see the comment at the top of this method)
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                            getJobID(), getExecutionId(), checkpointID,
                            new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        failExternally(new Exception(
                            "Error while triggering checkpoint " + checkpointID + " for " +
                                taskNameWithSubtask, t));
                    } else {
                        LOG.debug("Encountered error while triggering checkpoint {} for " +
                            "{} ({}) while being not in state running.", checkpointID,
                            taskNameWithSubtask, executionId, t);
                    }
                } finally {
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                }
            }
        };
        executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
    } else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
            new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
    }
}
The invokable then calls triggerCheckpoint. Because the trigger tasks are source operator chains, the call lands in SourceStreamTask:
@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
    if (!externallyInducedCheckpoints) {
        return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
    }
    else {
        // we do not trigger checkpoints here, we simply state whether we can trigger them
        synchronized (getCheckpointLock()) {
            return isRunning();
        }
    }
}
Following super.triggerCheckpoint leads into StreamTask and from there to performCheckpoint:
// The trigger operator chain ends up here. This is where the barrier first appears
// (it originates from the periodic checkpoint task).
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
        checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            // Note: from here on, barriers start to appear along the whole execution pipeline.
            operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

            // Step (2): Send the checkpoint barrier downstream
            /*
             * Under backpressure this call can block the source chain's checkpoint, because it
             * has to request buffer memory to send the barrier downstream. A downstream operator
             * triggers its own checkpoint when it receives this barrier.
             */
            operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            // Source task chains (trigger tasks) start their checkpoint directly through
            // triggerCheckpoint, while non-source task chains start it through processBarrier.
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
        else {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
            // yet be created
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;

            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
                try {
                    // another kind of event, similar to a barrier
                    recordWriter.broadcastEvent(message);
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(
                        new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                        exception);
                }
            }

            if (exception != null) {
                throw exception;
            }

            return false;
        }
    }
}
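The ordering of the three steps is the point of performCheckpoint: the barrier goes out before the snapshot is taken, so that downstream alignment is not delayed by this task's snapshot. The sketch below only records that ordering; `PerformCheckpointSketch` and its event strings are hypothetical and nothing here calls Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the step order; no real barriers or state are involved.
class PerformCheckpointSketch {

    final List<String> events = new ArrayList<>();
    private final Object lock = new Object();
    private final boolean running;

    PerformCheckpointSketch(boolean running) {
        this.running = running;
    }

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) { // records and barriers must not interleave with these steps
            if (running) {
                events.add("pre-barrier work for " + checkpointId); // step 1
                events.add("broadcast barrier " + checkpointId);    // step 2
                events.add("snapshot state for " + checkpointId);   // step 3
                return true;
            }
            // Not running: tell downstream tasks not to wait for this checkpoint.
            events.add("broadcast cancel marker " + checkpointId);
            return false;
        }
    }

    static List<String> trace(boolean running, long checkpointId) {
        PerformCheckpointSketch task = new PerformCheckpointSketch(running);
        task.performCheckpoint(checkpointId);
        return task.events;
    }

    public static void main(String[] args) {
        System.out.println(trace(true, 7L));
        System.out.println(trace(false, 8L));
    }
}
```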
Apart from creating the barrier for the first time and broadcasting it, the most important step here is checkpointState:
private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
        checkpointMetaData.getCheckpointId(),
        checkpointOptions.getTargetLocation());

    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
        this,
        checkpointMetaData,
        checkpointOptions,
        storage,
        checkpointMetrics);

    // run the checkpoint
    checkpointingOperation.executeCheckpointing();
}
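Before reading executeCheckpointing, it may help to see the shape of its two phases. In the sketch below (hypothetical names throughout, with a `FutureTask` per operator standing in for the snapshot futures that Flink collects), the synchronous part only creates the not-yet-run snapshot tasks, and a separate thread later runs them and produces the value that stands in for the acknowledgement.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

// Hypothetical two-phase snapshot: cheap synchronous part, expensive asynchronous part.
class TwoPhaseSnapshotSketch {

    // Synchronous part: every operator hands back a task that will write its state later.
    static Map<String, FutureTask<Long>> startSnapshots(Map<String, Long> stateSizes) {
        Map<String, FutureTask<Long>> inProgress = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : stateSizes.entrySet()) {
            final long size = e.getValue();
            inProgress.put(e.getKey(), new FutureTask<>(() -> size)); // nothing runs yet
        }
        return inProgress;
    }

    // Asynchronous part: run the snapshot tasks off the main thread, then "acknowledge"
    // by returning the total state size (a stand-in for the message to the JobManager).
    static long runAsyncPartAndAck(Map<String, FutureTask<Long>> inProgress, ExecutorService pool) {
        CompletableFuture<Long> acked = CompletableFuture.supplyAsync(() -> {
            long total = 0L;
            for (FutureTask<Long> task : inProgress.values()) {
                task.run();
                try {
                    total += task.get();
                } catch (Exception e) {
                    throw new IllegalStateException("snapshot failed", e);
                }
            }
            return total;
        }, pool);
        return acked.join();
    }

    static long checkpoint(Map<String, Long> stateSizes) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return runAsyncPartAndAck(startSnapshots(stateSizes), pool);
        } finally {
            pool.shutdown();
        }
    }

    static Map<String, Long> sampleOperators() {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("source", 10L);
        sizes.put("flatMap", 32L);
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println("acknowledged bytes: " + checkpoint(sampleOperators()));
    }
}
```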
public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        // Entry point for calling snapshotState on every StreamOperator.
        // First the source operator chain (flatMap -> source), then the sink operator chain (sink -> filter).
        for (StreamOperator<?> op : allOperators) {
            // Start a snapshot for every operator and store the in-progress result in
            // operatorSnapshotsInProgress (what is stored is a reference to the asynchronous
            // checkpoint). Storing the checkpoint and acknowledging it to the JobManager follow.
            // Catching a barrier is simply part of processing input data; it corresponds to
            // StreamInputProcessor.processInput().
            checkpointStreamOperator(op);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                checkpointMetaData.getCheckpointId(), owner.getName());
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        // Once the operators have taken their snapshots, an AsyncCheckpointRunnable is created
        // to report that this checkpoint is done; the logic lives in reportCompletedSnapshotStates.
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
            owner,
            operatorSnapshotsInProgress,
            checkpointMetaData,
            checkpointMetrics,
            startAsyncPartNano);

        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        // This submits a Runnable that, after the checkpoint has been written, sends the
        // acknowledgement to the JobManager. It is also part of the fault-tolerance two-phase
        // commit and ends in JobMaster.acknowledgeCheckpoint.
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                owner.getName(), checkpointMetaData.getCheckpointId(),
                checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                checkpointMetrics.getSyncDurationMillis());
        }
    } catch (Exception ex) {
        // Cleanup to release resources
        for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
            if (null != operatorSnapshotResult) {
                try {
                    operatorSnapshotResult.cancel();
                } catch (Exception e) {
                    LOG.warn("Could not properly cancel an operator snapshot result.", e);
                }
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                owner.getName(), checkpointMetaData.getCheckpointId(),
                checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                checkpointMetrics.getSyncDurationMillis());
        }

        owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
    }
}
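JobMaster.acknowledgeCheckpoint eventually calls CheckpointCoordinator.receiveAcknowledgeMessage. If the acknowledgement is accepted with SUCCESS and the pending checkpoint has then been acknowledged by every task, the coordinator calls completePendingCheckpoint: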
/**
 * Try to complete the given pending checkpoint.
 *
 * <p>Important: This method should only be called in the checkpoint lock scope.
 *
 * @param pendingCheckpoint to complete
 * @throws CheckpointException if the completion failed
 */
/*
 * Converts the PendingCheckpoint into a CompletedCheckpoint.
 * Adds the CompletedCheckpoint to the set of completed checkpoints and removes it from
 * the set of pending checkpoints.
 * Then sends another RPC to the operators to notify them that the checkpoint has completed.
 */
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // As a first step to complete the checkpoint, we register its state with the registry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                pendingCheckpoint.abortError(e1);
            }

            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
        }

        // the pending checkpoint must be discarded after the finalization
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
        }
    } finally {
        pendingCheckpoints.remove(checkpointId);

        triggerQueuedRequests();
    }

    rememberRecentCheckpointId(checkpointId);

    // drop those pending checkpoints that are at prior to the completed one
    dropSubsumedCheckpoints(checkpointId);

    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    lastCheckpointCompletionNanos = System.nanoTime();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
        completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);

        LOG.debug(builder.toString());
    }

    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();

    // these are the confirm tasks
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            // notify the operators, layer by layer, so they can react to the completed checkpoint
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
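Two bookkeeping effects of completion are easy to miss in the long method: the completed-checkpoint store keeps only a bounded number of entries, and pending checkpoints older than the completed one are dropped. The sketch below models just those two effects with hypothetical names and plain checkpoint ids; it drops every older pending checkpoint, whereas the real coordinator decides per checkpoint whether it may be subsumed, and non-positive retention values simply fall back to 1 here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Hypothetical model of retention and subsumption; ids only, no state handles.
class CompletionSketch {

    private final int maxRetained;
    private final Deque<Long> completed = new ArrayDeque<>();
    private final TreeMap<Long, String> pending = new TreeMap<>();

    CompletionSketch(int maxRetained) {
        this.maxRetained = Math.max(1, maxRetained); // invalid settings fall back to 1
    }

    void trigger(long checkpointId) {
        pending.put(checkpointId, "pending");
    }

    void complete(long checkpointId) {
        if (pending.remove(checkpointId) == null) {
            throw new IllegalArgumentException("unknown checkpoint " + checkpointId);
        }
        completed.addLast(checkpointId);
        while (completed.size() > maxRetained) {
            completed.removeFirst(); // oldest retained checkpoint is discarded
        }
        // drop pending checkpoints that are older than the one just completed
        pending.headMap(checkpointId, false).clear();
    }

    // Triggers 1..5, completes 2 and then 4, and reports "<retained> <still pending>".
    static String demo(int maxRetained) {
        CompletionSketch c = new CompletionSketch(maxRetained);
        c.trigger(1L);
        c.trigger(2L);
        c.trigger(3L);
        c.complete(2L);
        c.trigger(4L);
        c.trigger(5L);
        c.complete(4L);
        return c.completed + " " + c.pending.keySet();
    }

    public static void main(String[] args) {
        System.out.println(demo(1));
        System.out.println(demo(2));
    }
}
```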
The confirm tasks pass the confirmation down layer by layer. How exactly is it confirmed? Follow the call to Task.notifyCheckpointComplete:
@Override
public void notifyCheckpointComplete(final long checkpointID) {
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    // the operator chain is notified: this calls StreamTask.notifyCheckpointComplete
                    invokable.notifyCheckpointComplete(checkpointID);
                    // the notification is also passed on to the taskStateManager
                    taskStateManager.notifyCheckpointComplete(checkpointID);
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        // fail task if checkpoint confirmation failed.
                        failExternally(new RuntimeException(
                            "Error while confirming checkpoint",
                            t));
                    }
                }
            }
        };
        executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " +
            taskNameWithSubtask);
    } else {
        LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
    }
}
Going one level further down, we take Kafka as an example. For details see
Flink如何保存Offset (how Flink saves offsets)
At this point the checkpoint of the source task (trigger task) is complete.
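The idea behind committing offsets on confirmation can be sketched without Kafka. `OffsetCommitSketch` below is a hypothetical model, not the connector's code: the offset is remembered per checkpoint id when the snapshot is taken, and it is only treated as committed once notifyCheckpointComplete arrives for that id, which is the second phase of the two-phase pattern described earlier.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;

// Hypothetical source that commits an offset only after the checkpoint is confirmed.
class OffsetCommitSketch {

    private long currentOffset;
    private long committedOffset = -1L;
    private final LinkedHashMap<Long, Long> pendingOffsetsToCommit = new LinkedHashMap<>();

    void advanceTo(long offset) {
        currentOffset = offset;
    }

    // Phase 1: remember the offset that belongs to this checkpoint.
    void snapshotState(long checkpointId) {
        pendingOffsetsToCommit.put(checkpointId, currentOffset);
    }

    // Phase 2: the checkpoint is globally complete, so its offset can be committed.
    void notifyCheckpointComplete(long checkpointId) {
        Long offset = pendingOffsetsToCommit.get(checkpointId);
        if (offset == null) {
            return; // unknown or already handled checkpoint
        }
        committedOffset = offset;
        // this checkpoint and all older ones no longer need to be remembered
        Iterator<Long> ids = pendingOffsetsToCommit.keySet().iterator();
        while (ids.hasNext()) {
            if (ids.next() <= checkpointId) {
                ids.remove();
            }
        }
    }

    long committedOffset() {
        return committedOffset;
    }

    // Takes snapshots 1 (offset 100) and 2 (offset 250), then confirms the given id.
    static long demo(long confirmedCheckpointId) {
        OffsetCommitSketch source = new OffsetCommitSketch();
        source.advanceTo(100L);
        source.snapshotState(1L);
        source.advanceTo(250L);
        source.snapshotState(2L);
        source.advanceTo(300L);
        source.notifyCheckpointComplete(confirmedCheckpointId);
        return source.committedOffset();
    }

    public static void main(String[] args) {
        System.out.println("committed after confirming 1: " + demo(1L));
        System.out.println("committed after confirming 2: " + demo(2L));
    }
}
```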
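As noted above, this is where the barrier first appears in the pipeline, and a barrier can be seen as a special kind of message. What happens once it has been broadcast downstream? For details see
一文搞懂 Flink 处理 Barrier 全过程 (how Flink processes barriers)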
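From that article we know that once numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, notifyCheckpoint(receivedBarrier) is called, which in the end calls StreamTask.performCheckpoint again. With that, the non-source task operator chains have taken their checkpoint as well, and the same cycle repeats for every checkpoint.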
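The counting condition can be sketched as a small state machine. `BarrierCounterSketch` is hypothetical and deliberately simplified: it only counts barriers and closed channels, ignores the buffering of blocked channels, and does not model the cases in which an alignment is aborted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical counter for barrier alignment on a task with several input channels.
class BarrierCounterSketch {

    private final int totalNumberOfInputChannels;
    private int numBarriersReceived;
    private int numClosedChannels;
    private long currentCheckpointId = -1L;

    BarrierCounterSketch(int totalNumberOfInputChannels) {
        this.totalNumberOfInputChannels = totalNumberOfInputChannels;
    }

    void onChannelClosed() {
        numClosedChannels++;
    }

    // Returns true when this barrier completes the alignment,
    // which is the moment the task would start its own checkpoint.
    boolean onBarrier(long checkpointId) {
        if (checkpointId < currentCheckpointId) {
            return false; // barrier of an older checkpoint, ignore it
        }
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId; // a new alignment starts
            numBarriersReceived = 0;
        }
        numBarriersReceived++;
        if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
            numBarriersReceived = 0; // ready for the next checkpoint
            return true;
        }
        return false;
    }

    // Three input channels, one already closed: the second barrier completes the alignment.
    static List<Boolean> demo() {
        BarrierCounterSketch counter = new BarrierCounterSketch(3);
        counter.onChannelClosed();
        List<Boolean> results = new ArrayList<>();
        results.add(counter.onBarrier(1L));
        results.add(counter.onBarrier(1L));
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```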