JobManager的submitJob逻辑如下:

/**
 * Submits a job to the job manager. The job is registered at the libraryCacheManager which
 * creates the job's class loader. The job graph is appended to the corresponding execution
 * graph and the execution vertices are queued for scheduling.
 *
 * The method works in two phases:
 *  1. Synchronously: register the user libraries, resolve the restart strategy and metric
 *     group, build (or extend) the ExecutionGraph, store it in `currentJobs` and register the
 *     JobManager and interested clients as listeners. Any failure here unregisters the job,
 *     fails the graph (if built), sends `JobResultFailure` to the clients and returns.
 *  2. Asynchronously (in a `future` on `context.dispatcher`): restore the latest checkpoint
 *     (recovery) or the configured savepoint and persist the JobGraph (initial submission).
 *     It then sends `JobSubmitSuccess` and, only if this JobManager holds leadership, schedules
 *     the ExecutionGraph. Note that clients are told the submission succeeded before scheduling
 *     starts.
 *
 * @param jobGraph representing the Flink job
 * @param jobInfo the job info
 * @param isRecovery Flag indicating whether this is a recovery or initial submission
 */
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
  if (jobGraph == null) {
    jobInfo.notifyClients(
      decorateMessage(JobResultFailure(
        new SerializedThrowable(
          new JobSubmissionException(null, "JobGraph must not be null.")))))
  }
  else {
    val jobId = jobGraph.getJobID
    val jobName = jobGraph.getName
    var executionGraph: ExecutionGraph = null

    try {
      // Important: We need to make sure that the library registration is the first action,
      // because this makes sure that the uploaded jar files are removed in case of an
      // unsuccessful submission.
      // NOTE(review): this `try` has no `catch`/`finally`, so it only groups the call;
      // any exception propagates to the outer `catch` below.
      try {
        libraryCacheManager.registerJob(jobGraph.getJobID, jobGraph.getUserJarBlobKeys,
          jobGraph.getClasspaths)
      }
      // Class loader that holds the job's user jars.
      var userCodeLoader = libraryCacheManager.getClassLoader(jobGraph.getJobID)

      // Restart strategy: the one configured in the job's ExecutionConfig if present,
      // otherwise the JobManager-wide default from restartStrategyFactory.
      val restartStrategy =
        Option(jobGraph.getSerializedExecutionConfig()
          .deserializeValue(userCodeLoader)
          .getRestartStrategy())
          .map(RestartStrategyFactory.createRestartStrategy)
          .filter(p => p != null) match {
          case Some(strategy) => strategy
          case None => restartStrategyFactory.createRestartStrategy()
        }

      // Metric group for this job; falls back to an UnregisteredMetricsGroup when there is
      // no JobManager metric group or addJob returns null.
      val jobMetrics = jobManagerMetricGroup match {
        case Some(group) =>
          group.addJob(jobGraph) match {
            case (jobGroup: Any) => jobGroup
            case null => new UnregisteredMetricsGroup()
          }
        case None =>
          new UnregisteredMetricsGroup()
      }

      // Number of slots the scheduler currently knows about.
      val numSlots = scheduler.getTotalNumberOfSlots()

      // see if there already exists an ExecutionGraph for the corresponding job ID
      val registerNewGraph = currentJobs.get(jobGraph.getJobID) match {
        case Some((graph, currentJobInfo)) =>
          executionGraph = graph
          currentJobInfo.setLastActive()
          false
        case None =>
          true
      }

      // Build the ExecutionGraph; an existing graph (if any) is passed in to be reused.
      executionGraph = ExecutionGraphBuilder.buildGraph(
        executionGraph,
        jobGraph,
        flinkConfiguration,
        futureExecutor,
        ioExecutor,
        userCodeLoader,
        checkpointRecoveryFactory,
        Time.of(timeout.length, timeout.unit),
        restartStrategy,
        jobMetrics,
        numSlots,
        log.logger)

      // Only a newly created graph needs to be registered in currentJobs.
      if (registerNewGraph) {
        currentJobs.put(jobGraph.getJobID, (executionGraph, jobInfo))
      }

      // get notified about job status changes (the JobManager itself listens)
      executionGraph.registerJobStatusListener(
        new StatusListenerMessenger(self, leaderSessionID.orNull))

      // Register clients that asked for state-change notifications as listeners.
      jobInfo.clients foreach {
        // the sender wants to be notified about state changes
        case (client, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) =>
          val listener = new StatusListenerMessenger(client, leaderSessionID.orNull)
          executionGraph.registerExecutionListener(listener)
          executionGraph.registerJobStatusListener(listener)
        case _ => // do nothing
      }
    } catch {
      // Submission failed: roll back registration, fail the graph, notify the clients.
      case t: Throwable =>
        log.error(s"Failed to submit job $jobId ($jobName)", t)

        libraryCacheManager.unregisterJob(jobId)
        currentJobs.remove(jobId)

        if (executionGraph != null) {
          executionGraph.fail(t)
        }

        val rt: Throwable = if (t.isInstanceOf[JobExecutionException]) {
          t
        } else {
          new JobExecutionException(jobId, s"Failed to submit job $jobId ($jobName)", t)
        }

        jobInfo.notifyClients(
          decorateMessage(JobResultFailure(new SerializedThrowable(rt))))
        return
    }

    // Everything above prepared the ExecutionGraph; the rest runs asynchronously.
    // execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
    // because it is a blocking operation
    future {
      try {
        if (isRecovery) {
          // this is a recovery of a master failure (this master takes over)
          executionGraph.restoreLatestCheckpointedState(false, false)
        }
        else {
          // load a savepoint only if this is not starting from a newer checkpoint
          // as part of a master failure recovery
          val savepointSettings = jobGraph.getSavepointRestoreSettings
          if (savepointSettings.restoreSavepoint()) {
            try {
              val savepointPath = savepointSettings.getRestorePath()
              val allowNonRestored = savepointSettings.allowNonRestoredState()

              log.info(s"Starting job from savepoint '$savepointPath'" +
                (if (allowNonRestored) " (allowing non restored state)" else "") + ".")

              // load the savepoint as a checkpoint into the system
              val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint(
                jobId,
                executionGraph.getAllVertices,
                savepointPath,
                executionGraph.getUserClassLoader,
                allowNonRestored)

              executionGraph.getCheckpointCoordinator.getCheckpointStore
                .addCheckpoint(savepoint)

              // Reset the checkpoint ID counter
              val nextCheckpointId: Long = savepoint.getCheckpointID + 1
              log.info(s"Reset the checkpoint ID to $nextCheckpointId")

              executionGraph.getCheckpointCoordinator.getCheckpointIdCounter
                .setCount(nextCheckpointId)

              executionGraph.restoreLatestCheckpointedState(true, allowNonRestored)
            } catch {
              case e: Exception =>
                jobInfo.notifyClients(
                  decorateMessage(JobResultFailure(new SerializedThrowable(e))))
                throw new SuppressRestartsException(e)
            }
          }

          try {
            // Persist the JobGraph in the SubmittedJobGraphStore
            // (ZooKeeperSubmittedJobGraphStore in HA mode).
            submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
          } catch {
            case t: Throwable =>
              // Don't restart the execution if this fails. Otherwise, the
              // job graph will skip ZooKeeper in case of HA.
              jobInfo.notifyClients(
                decorateMessage(JobResultFailure(new SerializedThrowable(t))))
              throw new SuppressRestartsException(t)
          }
        }

        // Tell the clients the submission succeeded (before scheduling starts).
        jobInfo.notifyClients(
          decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))

        if (leaderElectionService.hasLeadership) {
          // There is a small chance that multiple job managers schedule the same job if
          // they try to recover at the same time. This will eventually be noticed, but can not be
          // ruled out from the beginning.

          // NOTE: Scheduling the job for execution is a separate action from the job submission.
          // The success of submitting the job must be independent from the success of scheduling
          // the job.
          log.info(s"Scheduling job $jobId ($jobName).")

          executionGraph.scheduleForExecution(scheduler)
        } else {
          // Remove the job graph. Otherwise it will be lingering around and possibly removed from
          // ZooKeeper by this JM.
          self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))

          log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
            "this. I am not scheduling the job for execution.")
        }
      } catch {
        case t: Throwable => try {
          executionGraph.fail(t)
        } catch {
          case tt: Throwable =>
            log.error("Error while marking ExecutionGraph as failed.", tt)
        }
      }
    }(context.dispatcher)
  }
}

可以看到,在executionGraph.scheduleForExecution真正开始调度之前,JobManager就已经通过JobSubmitSuccess通知client提交成功了(提交成功与调度成功是相互独立的)。

当job出现故障时,会调用tryRestartOrFail来决定是重启job还是让它失败:

private boolean tryRestartOrFail() {JobStatus currentState = state;if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {synchronized (progressLock) { //锁final boolean isFailureCauseAllowingRestart = !(failureCause instanceof SuppressRestartsException);final boolean isRestartStrategyAllowingRestart = restartStrategy.canRestart(); //重启策略是否允许重启boolean isRestartable = isFailureCauseAllowingRestart && isRestartStrategyAllowingRestart;if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {restartStrategy.restart(this);return true;} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) { //如果不允许重启,就failedfinal List<String> reasonsForNoRestart = new ArrayList<>(2);if (!isFailureCauseAllowingRestart) {reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");}if (!isRestartStrategyAllowingRestart) {reasonsForNoRestart.add("the restart strategy prevented it");}LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),StringUtils.join(reasonsForNoRestart, " and "), failureCause);postRunCleanup();return true;} else {// we must have changed the state concurrently, thus we cannot complete this operationreturn false;}}} else {// this operation is only allowed in the state FAILING or RESTARTINGreturn false;}}

有两处会调用到tryRestartOrFail

1. ExecutionGraph.jobVertexInFinalState

void jobVertexInFinalState() {synchronized (progressLock) {if (numFinishedJobVertices >= verticesInCreationOrder.size()) {throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");}numFinishedJobVertices++;if (numFinishedJobVertices == verticesInCreationOrder.size()) { //当所有的vertices都已经finished// we are done, transition to the final state
            JobStatus current;while (true) {current = this.state;if (current == JobStatus.RUNNING) {if (transitionState(current, JobStatus.FINISHED)) {postRunCleanup();break;}}else if (current == JobStatus.CANCELLING) {if (transitionState(current, JobStatus.CANCELED)) {postRunCleanup();break;}}else if (current == JobStatus.FAILING) {if (tryRestartOrFail()) { //如果failing,调用tryRestartOrFailbreak;}// concurrent job status change, let's check again}

2. 显式的调用到ExecutionGraph.fail

} else if (current == JobStatus.RESTARTING) {this.failureCause = t;if (tryRestartOrFail()) {return;}// concurrent job status change, let's check again
}

上面调用到restartStrategy.restart(this);

restartStrategy有多种实现,我们先看看其中的

FixedDelayRestartStrategy
/**
 * Records one more restart attempt and restarts the given ExecutionGraph asynchronously.
 *
 * <p>The restart runs as {@code ExecutionGraphRestarter.restartWithDelay(...)} on the graph's
 * future executor, with {@code delayBetweenRestartAttempts} as the delay argument. The method
 * itself returns without waiting for it.
 *
 * @param executionGraph the graph to restart
 */
@Override
public void restart(final ExecutionGraph executionGraph) {
    currentRestartAttempt++;
    FlinkFuture.supplyAsync(
        ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts),
        executionGraph.getFutureExecutor());
}

这里异步调用了ExecutionGraphRestarter.restartWithDelay,

最终调用到

executionGraph.restart();
public void restart() {try {synchronized (progressLock) {this.currentExecutions.clear();Collection<CoLocationGroup> colGroups = new HashSet<>();for (ExecutionJobVertex jv : this.verticesInCreationOrder) {CoLocationGroup cgroup = jv.getCoLocationGroup();if(cgroup != null && !colGroups.contains(cgroup)){cgroup.resetConstraints();colGroups.add(cgroup);}jv.resetForNewExecution();}for (int i = 0; i < stateTimestamps.length; i++) {if (i != JobStatus.RESTARTING.ordinal()) {// Only clear the non restarting state in order to preserve when the job was// restarted. This is needed for the restarting time gaugestateTimestamps[i] = 0;}}numFinishedJobVertices = 0;transitionState(JobStatus.RESTARTING, JobStatus.CREATED);// if we have checkpointed state, reload it into the executionsif (checkpointCoordinator != null) {checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);}}scheduleForExecution(slotProvider); //加入schedule
        }catch (Throwable t) {LOG.warn("Failed to restart the job.", t);fail(t);}}

关于重启策略,

参考https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/restart_strategies.html

If checkpointing is not enabled, the “no restart” strategy is used. If checkpointing is activated and the restart strategy has not been configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts.

StreamingJobGraphGenerator
private void configureCheckpointing() {CheckpointConfig cfg = streamGraph.getCheckpointConfig();long interval = cfg.getCheckpointInterval();if (interval > 0) {// check if a restart strategy has been set, if not then set the FixedDelayRestartStrategyif (streamGraph.getExecutionConfig().getRestartStrategy() == null) {// if the user enabled checkpointing, the default number of exec retries is infinite.
                streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, DEFAULT_RESTART_DELAY));}}

也就是说,开启checkpoint且用户没有配置重启策略时,默认使用fixedDelayRestart,最多重启Integer.MAX_VALUE次(即实际上无限重启)。

转载于:https://www.cnblogs.com/fxjwind/p/6386201.html

Flink – submitJob相关推荐

  1. Flink – JobManager.submitJob

    JobManager作为actor, case SubmitJob(jobGraph, listeningBehaviour) =>val client = sender()val jobInf ...

  2. 从flink-example分析flink组件(3)WordCount 流式实战及源码分析

    前面介绍了批量处理的WorkCount是如何执行的 <从flink-example分析flink组件(1)WordCount batch实战及源码分析> <从flink-exampl ...

  3. 追源索骥:透过源码看懂Flink核心框架的执行流程

    https://www.cnblogs.com/bethunebtj/p/9168274.html 追源索骥:透过源码看懂Flink核心框架的执行流程 前言 1.从 Hello,World WordC ...

  4. Flink JAR包上传和运行逻辑

    https://blog.csdn.net/xianzhen376/article/details/86774348 文章目录 说明 启动ResetServer 注册Handler Upload JA ...

  5. flink 运行一段时间 内存溢出_Flink之运行时环境

    Flink 运行时环境由两种类型进程组成,JobManager和TaskManager JobManager,也称为 master,用于协调分布式执行.负责调度任务,检查点,失败恢复等. TaskMa ...

  6. flink sql client读取hive时卡住

    问题复现如下: 查看$FLINK_HOME/log/flink-appleyuchi-sql-client-Desktop.log 2020-12-23 11:48:56,811 INFO  org. ...

  7. flink中akka的使用 以jobClient提交任务为例子

    在flink中,集群内部的组件之间通过akka来互相通信,其中采用了akka中的actor模型. 当需要提交一个可用的任务交由jobManager来处理并分配资源时,将会在ClusterClinet中 ...

  8. 【Flink】Flink 源码之ExecutionGraph

    1.概述 以前的一个老文章基于 Flink 1.9版本的,现在是基于flink 1.13版本的. 参考:95-230-028-源码-WordCount走读-获取ExecutionGraph 本文转载: ...

  9. flink运行job任务时报错 Could not retrieve the execution result

    flink运行job任务时报错 org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the e ...

  10. 追源索骥:透过源码看懂Flink核心框架的执行流程--来自GitHub

    追源索骥:透过源码看懂Flink核心框架的执行流程 联系qq2499496272可进行删除,需要文件版本的私聊!!~ 文章目录 追源索骥:透过源码看懂Flink核心框架的执行流程 前言 1.从 ~~H ...

最新文章

  1. 使用两个ThreadPool
  2. WebMisDeveloper4.2.0面世
  3. 寒假每日一题(提高组)【Week 3 完结】
  4. python图形设置_python学习笔记——基本图形绘制
  5. 回归素材(part8)--python机器学习算法
  6. cfg桩设备型号_什么是CFG桩?带您看下CFG桩施工工艺及流程,检测项目
  7. 牛客练习赛70 重新排列
  8. 超详细CookieSession的原理与用法
  9. java调用tuxedo中间件,BEA-TUXEDO中间件介绍.ppt
  10. android gridview 选择变色 再点击还原 并支持多选。记录贴 01
  11. 吊销 BTChina 营业执照”后元旦之前可能相继落马的“影视下载”网站名单
  12. 某辆汽车有一个里程表,该里程表可以显示一个整数,为该车走过的公里数。然而这个里程表有个毛病:它总是从3变到5,而跳过数字4,里程表所有位(个位、 十位、百位等)上的数字都是如此
  13. android的app图标大全,安卓app图标
  14. 《炬丰科技-半导体工艺》半导体封装中金丝键合技术
  15. 数据结构-图、二叉树、B(+)树
  16. qq浏览器HTML5在哪,qq浏览器wifi助手功能在哪里?
  17. java全栈系列之JavaSE-面向对象(方法重写)037
  18. JQuery对CheckBox的一些相关操作
  19. 三年级信息技术用计算机娱乐,“第5课 用计算机娱乐”教学设计
  20. 计算机桌面上的照片转pdf免费,如何把图片转化为pdf,图片转换pdf工具推荐

热门文章

  1. 使用人脸客户端库快速实现对面部的分析---C#
  2. pytest源码_pytest文档60pytest.main()的使用
  3. java se 6 mac_Mac OS X “打开xx软件, 你需要一个Java SE 6运行环境”问题解决
  4. CentOS / RHEL Cachefiles 加速网络文件系统NFS访问速度
  5. 201671010128 2017-10-08《Java程序设计》之接口与内部类
  6. TimesTen 应用层数据库缓存学习:4. 仅仅读缓存
  7. 阿里云推出企业级智能协同办公方案 云桌面、云AP、云客服一应俱全
  8. 如果在文档已完成加载后执行 document.write,整个 HTML 页面将被覆盖
  9. 《指针的编程艺术(第二版)》一3.10 程序实战
  10. 指数有限的子群存在一个右陪集代表元系,同时也是左陪集代表元系