1 Internally, this is implemented with ScheduledThreadPoolExecutor.
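
To make the point above concrete, here is a minimal sketch of periodic triggering on a `ScheduledThreadPoolExecutor`. It is not Flink's code: the class name `PeriodicTriggerSketch`, the method `runFor`, and the counter standing in for a checkpoint trigger are all hypothetical, and the fixed-rate schedule is an assumption made for illustration.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTriggerSketch {

    /** Fires a stand-in trigger at a fixed rate and returns how often it ran. */
    static int runFor(long intervalMillis, long durationMillis) {
        // A single timer thread, so triggers never run concurrently with each other.
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        AtomicInteger triggered = new AtomicInteger();

        // Hypothetical stand-in for a call such as
        // triggerCheckpoint(System.currentTimeMillis(), true).
        Runnable trigger = triggered::incrementAndGet;

        timer.scheduleAtFixedRate(trigger, 0, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(durationMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Stop the periodic task; in this sketch nothing else is queued.
        timer.shutdownNow();
        return triggered.get();
    }

    public static void main(String[] args) {
        System.out.println("triggers fired: " + runFor(50, 300));
    }
}
```

The exact count depends on thread scheduling, so the sketch prints it instead of assuming a fixed number.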

2

CheckpointCoordinator.java

    /**
     * Triggers a new standard checkpoint and uses the given timestamp as the checkpoint
     * timestamp.
     *
     * @param timestamp The timestamp for the checkpoint.
     * @param isPeriodic Flag indicating whether this triggered checkpoint is
     * periodic. If this flag is true, but the periodic scheduler is disabled,
     * the checkpoint will be declined.
     * @return <code>true</code> if triggering the checkpoint succeeded.
     */
    public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
        try {
            triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
            return true;
        } catch (CheckpointException e) {
            long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
            // here we can not get the failed pending checkpoint's id,
            // so we pass the negative latest generated checkpoint id as a special flag
            failureManager.handleJobLevelCheckpointException(e, -1 * latestGeneratedCheckpointId);
            return false;
        }
    }

    @VisibleForTesting
    public PendingCheckpoint triggerCheckpoint(
            long timestamp,
            CheckpointProperties props,
            @Nullable String externalSavepointLocation,
            boolean isPeriodic,
            boolean advanceToEndOfTime) throws CheckpointException {

        if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        // make some eager pre-checks
        synchronized (lock) {
            preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
        }

        // check if all tasks that we need to trigger are running.
        // if not, abort the checkpoint
        Execution[] executions = new Execution[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee == null) {
                LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job);
                throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            } else if (ee.getState() == ExecutionState.RUNNING) {
                executions[i] = ee;
            } else {
                LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job,
                    ExecutionState.RUNNING,
                    ee.getState());
                throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // next, check if all tasks that need to acknowledge the checkpoint are running.
        // if not, abort the checkpoint
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

        for (ExecutionVertex ev : tasksToWaitFor) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ackTasks.put(ee.getAttemptId(), ev);
            } else {
                LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    ev.getTaskNameWithSubtaskIndex(),
                    job);
                throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }

        // we will actually trigger this checkpoint!

        // we lock with a special lock to make sure that trigger requests do not overtake each other.
        // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
        // may issue blocking operations. Using a different lock than the coordinator-wide lock,
        // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
        synchronized (triggerLock) {

            final CheckpointStorageLocation checkpointStorageLocation;
            final long checkpointID;

            try {
                // this must happen outside the coordinator-wide lock, because it communicates
                // with external services (in HA mode) and may block for a while.
                checkpointID = checkpointIdCounter.getAndIncrement();

                checkpointStorageLocation = props.isSavepoint() ?
                    checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
                    checkpointStorage.initializeLocationForCheckpoint(checkpointID);
            }
            catch (Throwable t) {
                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
                    job,
                    numUnsuccessful,
                    t);
                throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
            }

            final PendingCheckpoint checkpoint = new PendingCheckpoint(
                job,
                checkpointID,
                timestamp,
                ackTasks,
                props,
                checkpointStorageLocation,
                executor);

            if (statsTracker != null) {
                PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                    checkpointID,
                    timestamp,
                    props);

                checkpoint.setStatsCallback(callback);
            }

            // schedule the timer that will clean up the expired checkpoints
            final Runnable canceller = () -> {
                synchronized (lock) {
                    // only do the work if the checkpoint is not discarded anyways
                    // note that checkpoint completion discards the pending checkpoint object
                    if (!checkpoint.isDiscarded()) {
                        LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                        failPendingCheckpoint(checkpoint, CheckpointFailureReason.CHECKPOINT_EXPIRED);
                        pendingCheckpoints.remove(checkpointID);
                        rememberRecentCheckpointId(checkpointID);

                        triggerQueuedRequests();
                    }
                }
            };

            try {
                // re-acquire the coordinator-wide lock
                synchronized (lock) {
                    // since we released the lock in the meantime, we need to re-check
                    // that the conditions still hold.
                    preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());

                    LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);

                    pendingCheckpoints.put(checkpointID, checkpoint);

                    ScheduledFuture<?> cancellerHandle = timer.schedule(
                        canceller,
                        checkpointTimeout, TimeUnit.MILLISECONDS);

                    if (!checkpoint.setCancellerHandle(cancellerHandle)) {
                        // checkpoint is already disposed!
                        cancellerHandle.cancel(false);
                    }

                    // trigger the master hooks for the checkpoint
                    final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
                        checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
                    for (MasterState s : masterStates) {
                        checkpoint.addMasterState(s);
                    }
                }
                // end of lock scope

                final CheckpointOptions checkpointOptions = new CheckpointOptions(
                    props.getCheckpointType(),
                    checkpointStorageLocation.getLocationReference());

                // send the messages to the tasks that trigger their checkpoint
                for (Execution execution : executions) {
                    if (props.isSynchronous()) {
                        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                    } else {
                        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                    }
                }

                numUnsuccessfulCheckpointsTriggers.set(0);
                return checkpoint;
            }
            catch (Throwable t) {
                // guard the map against concurrent modifications
                synchronized (lock) {
                    pendingCheckpoints.remove(checkpointID);
                }

                int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
                LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
                    checkpointID, job, numUnsuccessful, t);

                if (!checkpoint.isDiscarded()) {
                    failPendingCheckpoint(checkpoint, CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, t);
                }

                try {
                    checkpointStorageLocation.disposeOnFailure();
                } catch (Throwable t2) {
                    LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
                }

                // rethrow the CheckpointException directly.
                if (t instanceof CheckpointException) {
                    throw (CheckpointException) t;
                }
                throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
            }
        }
    }

    // Runs five checks. The checkpoint proceeds only if all of them pass;
    // if any one fails, an exception is thrown and the checkpoint cannot continue.
    private void preCheckBeforeTriggeringCheckpoint(boolean isPeriodic, boolean forceCheckpoint) throws CheckpointException {
        // abort if the coordinator has been shutdown in the meantime
        if (shutdown) {
            throw new CheckpointException(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
        }

        // Don't allow periodic checkpoint if scheduling has been disabled
        if (isPeriodic && !periodicScheduling) {
            throw new CheckpointException(CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
        }

        if (!forceCheckpoint) {
            if (triggerRequestQueued) {
                LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
                throw new CheckpointException(CheckpointFailureReason.ALREADY_QUEUED);
            }

            checkConcurrentCheckpoints();

            checkMinPauseBetweenCheckpoints();
        }
    }
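
The expiry handling in the listing above (a canceller scheduled with `timer.schedule(...)` and cancelled again once the checkpoint finishes) can be isolated into a small standalone sketch. Everything here is hypothetical and much simpler than Flink's code: `ExpirySketch`, `trigger`, `complete`, and `isExpired` are illustrative names, and a plain map of timer handles stands in for `PendingCheckpoint` objects.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExpirySketch {
    private final Object lock = new Object();
    private final ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
    // Pending checkpoint id -> handle of the timer task that would expire it.
    private final Map<Long, ScheduledFuture<?>> pending = new HashMap<>();
    private final Set<Long> expired = new HashSet<>();

    /** Registers a pending checkpoint and schedules its expiry. */
    void trigger(long id, long timeoutMillis) {
        Runnable canceller = () -> {
            synchronized (lock) {
                // Only expire the checkpoint if it has not completed in the meantime.
                if (pending.remove(id) != null) {
                    expired.add(id);
                }
            }
        };
        synchronized (lock) {
            // Scheduling under the lock means the canceller cannot observe
            // the map before the entry has been registered.
            pending.put(id, timer.schedule(canceller, timeoutMillis, TimeUnit.MILLISECONDS));
        }
    }

    /** Completes a checkpoint; returns false if it is unknown or already expired. */
    boolean complete(long id) {
        synchronized (lock) {
            ScheduledFuture<?> handle = pending.remove(id);
            if (handle == null) {
                return false;
            }
            handle.cancel(false);
            return true;
        }
    }

    boolean isExpired(long id) {
        synchronized (lock) {
            return expired.contains(id);
        }
    }

    void shutdown() {
        timer.shutdownNow();
    }

    /** Completes one checkpoint in time and lets a second one time out. */
    static String demo() {
        ExpirySketch coordinator = new ExpirySketch();
        coordinator.trigger(1L, 10_000L);
        boolean completed = coordinator.complete(1L);
        coordinator.trigger(2L, 20L);
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean timedOut = coordinator.isExpired(2L);
        boolean lateAck = coordinator.complete(2L);
        coordinator.shutdown();
        return "completed=" + completed + " timedOut=" + timedOut + " lateAck=" + lateAck;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

As in the listing, both the canceller and the completion path take the same lock, so a checkpoint is either completed or expired, never both.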

3

    package org.apache.flink.runtime.checkpoint;

    /**
     * Various reasons why a checkpoint was failure.
     */
    public enum CheckpointFailureReason {

        PERIODIC_SCHEDULER_SHUTDOWN(true, "Periodic checkpoint scheduler is shut down."),

        ALREADY_QUEUED(true, "Another checkpoint request has already been queued."),

        TOO_MANY_CONCURRENT_CHECKPOINTS(true, "The maximum number of concurrent checkpoints is exceeded"),

        MINIMUM_TIME_BETWEEN_CHECKPOINTS(true, "The minimum time between checkpoints is still pending. " +
                "Checkpoint will be triggered after the minimum time."),

        NOT_ALL_REQUIRED_TASKS_RUNNING(true, "Not all required tasks are currently running."),

        EXCEPTION(true, "An Exception occurred while triggering the checkpoint."),

        CHECKPOINT_EXPIRED(false, "Checkpoint expired before completing."),

        CHECKPOINT_SUBSUMED(false, "Checkpoint has been subsumed."),

        CHECKPOINT_DECLINED(false, "Checkpoint was declined."),

        CHECKPOINT_DECLINED_TASK_NOT_READY(false, "Checkpoint was declined (tasks not ready)"),

        CHECKPOINT_DECLINED_TASK_NOT_CHECKPOINTING(false, "Task does not support checkpointing"),

        CHECKPOINT_DECLINED_SUBSUMED(false, "Checkpoint was canceled because a barrier from newer checkpoint was received."),

        CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER(false, "Task received cancellation from one of its inputs"),

        CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED(false, "The checkpoint alignment phase needed to buffer more than the configured maximum bytes"),

        CHECKPOINT_DECLINED_INPUT_END_OF_STREAM(false, "Checkpoint was declined because one input stream is finished"),

        CHECKPOINT_COORDINATOR_SHUTDOWN(false, "CheckpointCoordinator shutdown."),

        CHECKPOINT_COORDINATOR_SUSPEND(false, "Checkpoint Coordinator is suspending."),

        JOB_FAILURE(false, "The job has failed."),

        JOB_FAILOVER_REGION(false, "FailoverRegion is restarting."),

        TASK_CHECKPOINT_FAILURE(false, "Task local checkpoint failure."),

        FINALIZE_CHECKPOINT_FAILURE(false, "Failure to finalize checkpoint."),

        TRIGGER_CHECKPOINT_FAILURE(false, "Trigger checkpoint failure.");

        // ------------------------------------------------------------------------

        private final boolean preFlight;
        private final String message;

        CheckpointFailureReason(boolean isPreFlight, String message) {
            this.preFlight = isPreFlight;
            this.message = message;
        }

        public String message() {
            return message;
        }

        /**
         * @return true if this value indicates a failure reason happening before a checkpoint is passed to a job's tasks.
         */
        public boolean isPreFlight() {
            return preFlight;
        }
    }
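
As a small illustration of what the `preFlight` flag conveys, the sketch below uses a cut-down copy of the enum with only four of its constants. The class `PreFlightSketch`, the nested `Reason` enum, and the `classify` helper are hypothetical and exist only for this example; whether and where Flink itself consults `isPreFlight()` is not shown in the source above.

```java
public class PreFlightSketch {

    /** Reduced, illustrative copy of CheckpointFailureReason (four constants only). */
    enum Reason {
        ALREADY_QUEUED(true),
        NOT_ALL_REQUIRED_TASKS_RUNNING(true),
        CHECKPOINT_EXPIRED(false),
        CHECKPOINT_DECLINED(false);

        private final boolean preFlight;

        Reason(boolean preFlight) {
            this.preFlight = preFlight;
        }

        boolean isPreFlight() {
            return preFlight;
        }
    }

    /** Hypothetical helper: says whether the failure happened before tasks were involved. */
    static String classify(Reason reason) {
        return reason.isPreFlight()
            ? "rejected before reaching the tasks"
            : "failed after being passed to the tasks";
    }

    public static void main(String[] args) {
        for (Reason reason : Reason.values()) {
            System.out.println(reason + ": " + classify(reason));
        }
    }
}
```

The split mirrors the two phases of `triggerCheckpoint`: the pre-flight reasons come from the checks made before any task is contacted, the rest from what happens afterwards.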

Flink 1.9.2
