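1. Overview

An earlier article of mine was based on Flink 1.9; this one is based on Flink 1.13.

Reference: 95-230-028 - Source Code - WordCount Walkthrough - Obtaining the ExecutionGraph

This article is reposted from: Flink Source Code Analysis Series, Table of Contents

2. From JobGraph to ExecutionGraph

A JobGraph is submitted through Dispatcher.submitJob, the entry point for everything that follows. That method calls Dispatcher.internalSubmitJob, which in turn calls Dispatcher.persistAndRunJob.

Dispatcher.persistAndRunJob persists the job and then runs it: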

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    jobGraphWriter.putJobGraph(jobGraph);
    runJob(jobGraph, ExecutionType.SUBMISSION);
}

Dispatcher.runJob takes two parameters: the JobGraph and the execution type. There are two execution types: submitting a job (SUBMISSION) and recovering a job (RECOVERY).

private void runJob(JobGraph jobGraph, ExecutionType executionType) {
    // Make sure the job with this JobID is not already running, to avoid duplicate submission
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    // Record the timestamp at which initialization starts
    long initializationTimestamp = System.currentTimeMillis();
    // Create the JobManagerRunner here
    // The JobManagerRunner will go on to construct the JobManager
    CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
            createJobManagerRunner(jobGraph, initializationTimestamp);

    // Wrap the JobGraph-related information for use by the Dispatcher
    DispatcherJob dispatcherJob =
            DispatcherJob.createFor(
                    jobManagerRunnerFuture,
                    jobGraph.getJobID(),
                    jobGraph.getName(),
                    initializationTimestamp);

    // Add the job to the runningJobs map
    // to mark it as running
    runningJobs.put(jobGraph.getJobID(), dispatcherJob);

    final JobID jobId = jobGraph.getJobID();

    // Handle the result of dispatching the job
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
            dispatcherJob
                    .getResultFuture()
                    .handleAsync(
                            (dispatcherJobResult, throwable) -> {
                                Preconditions.checkState(
                                        runningJobs.get(jobId) == dispatcherJob,
                                        "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");

                                if (dispatcherJobResult != null) {
                                    return handleDispatcherJobResult(
                                            jobId, dispatcherJobResult, executionType);
                                } else {
                                    return dispatcherJobFailed(jobId, throwable);
                                }
                            },
                            getMainThreadExecutor());

    // When the job terminates, remove its JobID from runningJobs
    final CompletableFuture<Void> jobTerminationFuture =
            cleanupJobStateFuture
                    .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
                    .thenCompose(Function.identity());

    // Track the job ID and its termination future in dispatcherJobTerminationFutures
    FutureUtils.assertNoException(jobTerminationFuture);
    registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
}
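The bookkeeping in runJob (reject a duplicate JobID, register the job, clean up when its result future completes) can be illustrated with plain JDK classes. The following is only a minimal sketch: RunningJobsSketch and its methods are hypothetical names, job IDs are plain strings, and none of Flink's own types are used.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Toy registry (hypothetical, not a Flink class) mirroring the shape of Dispatcher.runJob. */
class RunningJobsSketch {
    private final Map<String, CompletableFuture<String>> runningJobs = new ConcurrentHashMap<>();

    /** Registers the job, rejecting duplicates, and removes it again once its result arrives. */
    CompletableFuture<Void> runJob(String jobId, CompletableFuture<String> resultFuture) {
        if (runningJobs.putIfAbsent(jobId, resultFuture) != null) {
            throw new IllegalStateException("Job " + jobId + " is already running");
        }
        // handle() is invoked for both success and failure, like handleAsync in runJob
        return resultFuture
                .handle((result, failure) -> failure == null ? "FINISHED" : "FAILED")
                .thenAccept(finalState -> runningJobs.remove(jobId));
    }

    boolean isRunning(String jobId) {
        return runningJobs.containsKey(jobId);
    }

    public static void main(String[] args) {
        RunningJobsSketch dispatcher = new RunningJobsSketch();
        CompletableFuture<String> result = new CompletableFuture<>();
        CompletableFuture<Void> termination = dispatcher.runJob("job-1", result);
        System.out.println("running before completion: " + dispatcher.isRunning("job-1"));
        result.complete("ok");
        termination.join();
        System.out.println("running after completion: " + dispatcher.isRunning("job-1"));
    }
}
```

The returned future plays the role of jobTerminationFuture: it completes only after the entry has been removed, so callers can wait on it before resubmitting the same ID.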

Next comes Dispatcher.createJobManagerRunner.

The JobManager is created in the Dispatcher and then started. The creation logic lives in createJobManagerRunner:

CompletableFuture<JobManagerRunner> createJobManagerRunner(
        JobGraph jobGraph, long initializationTimestamp) {
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
            () -> {
                try {
                    // Create the JobManager through the factory,
                    // passing in the JobGraph and the high-availability services
                    JobManagerRunner runner =
                            jobManagerRunnerFactory.createJobManagerRunner(
                                    jobGraph,
                                    configuration,
                                    rpcService,
                                    highAvailabilityServices,
                                    heartbeatServices,
                                    jobManagerSharedServices,
                                    new DefaultJobManagerJobMetricGroupFactory(
                                            jobManagerMetricGroup),
                                    fatalErrorHandler,
                                    initializationTimestamp);
                    // Start the JobManager
                    // This actually starts the JobManager's leader election service,
                    // which elects the leader JM
                    runner.start();
                    return runner;
                } catch (Exception e) {
                    throw new CompletionException(
                            new JobInitializationException(
                                    jobGraph.getJobID(),
                                    "Could not instantiate JobManager.",
                                    e));
                }
            },
            ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on
    // JobManager creation
}
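The comment at the end of createJobManagerRunner explains why the work runs on ioExecutor rather than on the main thread. A minimal sketch of that pattern with JDK classes only is shown here; OffMainThreadCreationSketch and createRunner are hypothetical names, and the "runner" is just a string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: run slow creation logic on a separate executor. */
class OffMainThreadCreationSketch {

    /** Creates a "runner" on ioExecutor; checked failures surface as CompletionException. */
    static CompletableFuture<String> createRunner(String jobName, ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(
                () -> {
                    try {
                        if (jobName.isEmpty()) {
                            throw new Exception("job name must not be empty");
                        }
                        return "runner-for-" + jobName;
                    } catch (Exception e) {
                        // supplyAsync cannot throw checked exceptions, so wrap them
                        throw new CompletionException("Could not instantiate runner.", e);
                    }
                },
                ioExecutor);
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(createRunner("wordcount", ioExecutor).join());
            createRunner("", ioExecutor)
                    .exceptionally(t -> "failed: " + t.getCause().getMessage())
                    .thenAccept(System.out::println)
                    .join();
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

The caller gets a future back immediately, so the calling thread stays free while the creation runs elsewhere.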

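At this point the JobManager starts taking part in leader election. To keep the JobManager from being a single point of failure, Flink provides JobManager high availability, which allows several JobManager instances to run at the same time. In Standalone deployments, leader election is implemented with Zookeeper. In Yarn cluster mode, high availability instead relies on Yarn automatically restarting the ApplicationMaster after it fails. For more on leader election, see the article "Flink 源码之leader选举(Zookeeper方式)" (Flink source code: leader election, Zookeeper mode).

Once a leader JM has been elected, the election service calls that JM's grantLeadership method: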

@Override
public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            log.debug(
                    "JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        }

        leadershipOperation =
                leadershipOperation.thenRun(
                        ThrowingRunnable.unchecked(
                                () -> {
                                    synchronized (lock) {
                                        // This is the main logic:
                                        // verify the job scheduling status and start the JobManager
                                        verifyJobSchedulingStatusAndStartJobManager(
                                                leaderSessionID);
                                    }
                                }));

        handleException(leadershipOperation, "Could not start the job manager.");
    }
}

Next we follow the call into verifyJobSchedulingStatusAndStartJobManager.

@GuardedBy("lock")
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId)
        throws FlinkException {
    // If the JobManagerRunner has already been shut down, return immediately
    if (shutdown) {
        log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down.");
        return;
    }

    // Get the job scheduling status from the running jobs registry
    final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
            getJobSchedulingStatus();

    // If the job is already done, run the "job already done" logic
    // (in effect, the path for a job that was not run to completion by this JobManager)
    if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
        jobAlreadyDone();
    } else {
        // Start the JobMaster
        startJobMaster(leaderSessionId);
    }
}

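The flow now reaches JobManagerRunnerImpl.startJobMaster.

This method starts the JobMaster: it marks the job as running in the running jobs registry, starts the JobMaster service, and confirms that this JobMaster is the leader.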

@GuardedBy("lock")
private void startJobMaster(UUID leaderSessionId) throws FlinkException {
    log.info(
            "JobManager runner for job {} ({}) was granted leadership with session id {}.",
            jobGraph.getName(),
            jobGraph.getJobID(),
            leaderSessionId);

    try {
        // Register the job as running
        // Depending on the deployment (Standalone, Zookeeper or K8s), the JobID is stored
        // in a different way
        runningJobsRegistry.setJobRunning(jobGraph.getJobID());
    } catch (IOException e) {
        throw new FlinkException(
                String.format(
                        "Failed to set the job %s to running in the running jobs registry.",
                        jobGraph.getJobID()),
                e);
    }

    // Start the JobMaster service
    startJobMasterServiceSafely(leaderSessionId);

    // Confirm that this JobMaster is the leader
    if (jobMasterService != null) {
        confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
    }
}

JobManagerRunnerImpl.startJobMasterServiceSafely then calls DefaultJobMasterServiceFactory.createJobMasterService, which creates the JobMaster and starts its RPC service.

Next, the JobMaster constructor contains the logic that builds the scheduler for the Flink job. JobMaster.createScheduler calls DefaultSlotPoolServiceSchedulerFactory.createScheduler to create Flink's scheduler, and that method in turn calls DefaultSchedulerFactory.createInstance, the scheduler factory's method for creating a scheduler instance.

The flow then moves into DefaultScheduler, the default implementation of Flink's job scheduler. It extends SchedulerBase, which in turn implements the SchedulerNG interface.

The SchedulerBase constructor calls createAndRestoreExecutionGraph.

The code of SchedulerBase.createAndRestoreExecutionGraph is as follows:

private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        JobStatusListener jobStatusListener)
        throws Exception {

    // Create the ExecutionGraph
    ExecutionGraph newExecutionGraph =
            createExecutionGraph(
                    currentJobManagerJobMetricGroup,
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIdCounter,
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentTracker,
                    initializationTimestamp);

    // Get the CheckpointCoordinator created inside the ExecutionGraph
    // How the CheckpointCoordinator is created is explained in a later section
    final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        // (try to restore from the latest checkpoint, if one exists)
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

            // check whether we can restore from a savepoint
            // (no checkpoint was restored, so fall back to the configured savepoint)
            tryRestoreExecutionGraphFromSavepoint(
                    newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    // Set the listener for internal task failures
    newExecutionGraph.setInternalTaskFailuresListener(
            new UpdateSchedulerNgOnInternalFailuresListener(this));
    // Register the job status listener
    newExecutionGraph.registerJobStatusListener(jobStatusListener);
    // Set the JobMaster's main thread executor
    newExecutionGraph.start(mainThreadExecutor);

    return newExecutionGraph;
}

SchedulerBase.createExecutionGraph calls DefaultExecutionGraphBuilder to create the ExecutionGraph:

private ExecutionGraph createExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws JobExecutionException, JobException {

    // Create the Execution deployment listener
    ExecutionDeploymentListener executionDeploymentListener =
            new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
    // Create the Execution state update listener
    ExecutionStateUpdateListener executionStateUpdateListener =
            (execution, newState) -> {
                if (newState.isTerminal()) {
                    executionDeploymentTracker.stopTrackingDeploymentOf(execution);
                }
            };

    // Create the ExecutionGraph
    return DefaultExecutionGraphBuilder.buildGraph(
            jobGraph,
            jobMasterConfiguration,
            futureExecutor,
            ioExecutor,
            userCodeLoader,
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            rpcTimeout,
            currentJobManagerJobMetricGroup,
            blobWriter,
            log,
            shuffleMaster,
            partitionTracker,
            TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
                    jobGraph.getJobType()),
            executionDeploymentListener,
            executionStateUpdateListener,
            initializationTimestamp,
            new DefaultVertexAttemptNumberStore());
}

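ExecutionGraph concepts
The ExecutionGraph is the physical execution plan of a Flink job. It coordinates the distributed execution of the data flow.

Unlike the StreamGraph and the JobGraph, the ExecutionGraph is generated in the JobManager.

The ExecutionGraph also has vertices. A vertex in the ExecutionGraph is an ExecutionJobVertex, which corresponds to a JobVertex in the JobGraph. The step from JobGraph to ExecutionGraph introduces parallelism: an ExecutionJobVertex contains all parallel tasks of its JobVertex, and each parallel task is represented by an ExecutionVertex. In other words, an ExecutionJobVertex contains as many ExecutionVertex instances as its parallelism.

An ExecutionVertex can be executed once or several times (because of recovery, recomputation, or reconfiguration). Every execution of an ExecutionVertex produces an Execution object, which tracks the state changes and resource usage of that attempt.

IntermediateResult corresponds to the IntermediateDataSet of a JobVertex in the JobGraph. It represents the temporary holding point for data passed between two adjacent ExecutionJobVertex instances. IntermediateResults are built when the ExecutionJobVertex is created, one per IntermediateDataSet produced by the JobVertex, and each IntermediateResult holds as many IntermediateResultPartitions as the vertex's parallelism.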
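The relationship between a job vertex, its parallel subtasks, and their execution attempts can be pictured with a toy model. This is only a sketch under simplifying assumptions: ExpansionSketch and SubtaskSketch are hypothetical stand-ins, not Flink classes, and an attempt is reduced to a counter.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical names) of how one job vertex expands into parallel subtasks. */
class ExpansionSketch {

    /** Stands in for an ExecutionVertex: one parallel subtask of a job vertex. */
    static final class SubtaskSketch {
        final String vertexName;
        final int subtaskIndex;
        private int attempts; // every attempt would be one Execution in Flink's terms

        SubtaskSketch(String vertexName, int subtaskIndex) {
            this.vertexName = vertexName;
            this.subtaskIndex = subtaskIndex;
        }

        /** Starts a new attempt (first run, or a re-run after a failure) and returns its number. */
        int startAttempt() {
            return ++attempts;
        }
    }

    /** Stands in for building an ExecutionJobVertex: one subtask per unit of parallelism. */
    static List<SubtaskSketch> expand(String vertexName, int parallelism) {
        List<SubtaskSketch> subtasks = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new SubtaskSketch(vertexName, i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<SubtaskSketch> subtasks = expand("map", 3);
        System.out.println("subtasks: " + subtasks.size());
        SubtaskSketch first = subtasks.get(0);
        first.startAttempt(); // initial execution
        System.out.println("attempts of subtask 0 after a restart: " + first.startAttempt());
    }
}
```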

The buildGraph method of DefaultExecutionGraphBuilder

public static ExecutionGraph buildGraph(
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        ClassLoader classLoader,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        Time rpcTimeout,
        MetricGroup metrics,
        BlobWriter blobWriter,
        Logger log,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        ExecutionDeploymentListener executionDeploymentListener,
        ExecutionStateUpdateListener executionStateUpdateListener,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore)
        throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    // Get the job name and job ID
    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    // Create the JobInformation
    // JobInformation wraps the job-related configuration used by the ExecutionGraph
    final JobInformation jobInformation =
            new JobInformation(
                    jobId,
                    jobName,
                    jobGraph.getSerializedExecutionConfig(),
                    jobGraph.getJobConfiguration(),
                    jobGraph.getUserJarBlobKeys(),
                    jobGraph.getClasspaths());

    // Maximum number of prior attempts kept in the history
    final int maxPriorAttemptsHistoryLength =
            jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

    // Get the factory for the IntermediateResultPartition release strategy
    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
            PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
                    jobManagerConfig);

    // create a new execution graph, if none exists so far
    // (the ExecutionGraph constructor is analyzed in a later section)
    final DefaultExecutionGraph executionGraph;
    try {
        executionGraph =
                new DefaultExecutionGraph(
                        jobInformation,
                        futureExecutor,
                        ioExecutor,
                        rpcTimeout,
                        maxPriorAttemptsHistoryLength,
                        classLoader,
                        blobWriter,
                        partitionReleaseStrategyFactory,
                        shuffleMaster,
                        partitionTracker,
                        partitionLocationConstraint,
                        executionDeploymentListener,
                        executionStateUpdateListener,
                        initializationTimestamp,
                        vertexAttemptNumberStore);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    // set the basic properties

    try {
        // Set the execution plan in JSON format
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    } catch (Throwable t) {
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        // (used when generating the JSON plan from the jobGraph fails)
        executionGraph.setJsonPlan("{}");
    }

    // initialize the vertices that have a master initialization hook
    // file output formats create directories here, input formats create splits

    final long initMasterStart = System.nanoTime();
    log.info("Running initialization on master for job {} ({}).", jobName, jobId);

    for (JobVertex vertex : jobGraph.getVertices()) {
        // Get the vertex's invokable class name, i.e. the task class of the vertex
        String executableClass = vertex.getInvokableClassName();
        // Every vertex must have an invokable class
        if (executableClass == null || executableClass.isEmpty()) {
            throw new JobSubmissionException(
                    jobId,
                    "The vertex "
                            + vertex.getID()
                            + " ("
                            + vertex.getName()
                            + ") has no invokable class.");
        }

        try {
            // Run the vertex-specific initialization logic that happens on the master
            // when the job starts
            vertex.initializeOnMaster(classLoader);
        } catch (Throwable t) {
            throw new JobExecutionException(
                    jobId,
                    "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
                    t);
        }
    }

    log.info(
            "Successfully ran initialization on master in {} ms.",
            (System.nanoTime() - initMasterStart) / 1_000_000);

    // topologically sort the job vertices and attach the graph to the existing one
    // (get all job vertices, sorted in data-flow order)
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    if (log.isDebugEnabled()) {
        log.debug(
                "Adding {} vertices from job graph {} ({}).",
                sortedTopology.size(),
                jobName,
                jobId);
    }
    // Attach all job vertices to the executionGraph
    executionGraph.attachJobGraph(sortedTopology);

    if (log.isDebugEnabled()) {
        log.debug(
                "Successfully created execution graph from job graph {} ({}).", jobName, jobId);
    }

    // configure the state checkpointing
    // (only if checkpointing is enabled)
    if (isCheckpointingEnabled(jobGraph)) {
        // Get the checkpoint settings from the JobGraph
        // snapshotSettings is populated in StreamingJobGraphGenerator
        JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();

        // Vertices that trigger checkpoints, i.e. all source vertices
        List<ExecutionJobVertex> triggerVertices =
                idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

        // Vertices that must acknowledge checkpoints, i.e. all vertices
        List<ExecutionJobVertex> ackVertices =
                idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

        // Vertices that must be notified when a checkpoint is committed, i.e. all vertices
        List<ExecutionJobVertex> confirmVertices =
                idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

        // Maximum number of remembered checkpoints
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        // Create the checkpoint stats tracker, which works together with the
        // CheckpointCoordinator
        CheckpointStatsTracker checkpointStatsTracker =
                new CheckpointStatsTracker(
                        historySize,
                        ackVertices,
                        snapshotSettings.getCheckpointCoordinatorConfiguration(),
                        metrics);

        // load the state backend from the application settings
        final StateBackend applicationConfiguredBackend;
        final SerializedValue<StateBackend> serializedAppConfigured =
                snapshotSettings.getDefaultStateBackend();

        if (serializedAppConfigured == null) {
            applicationConfiguredBackend = null;
        } else {
            try {
                // Deserialize the state backend configured by the application
                applicationConfiguredBackend =
                        serializedAppConfigured.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId, "Could not deserialize application-defined state backend.", e);
            }
        }

        final StateBackend rootBackend;
        try {
            // Resolve the state backend:
            // if the application did not configure one, use the one from the configuration file;
            // if the configuration file has none either, use the default
            rootBackend =
                    StateBackendLoader.fromApplicationOrConfigOrDefault(
                            applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                    jobId, "Could not instantiate configured state backend", e);
        }

        // load the checkpoint storage from the application settings
        final CheckpointStorage applicationConfiguredStorage;
        final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
                snapshotSettings.getDefaultCheckpointStorage();

        if (serializedAppConfiguredStorage == null) {
            applicationConfiguredStorage = null;
        } else {
            try {
                applicationConfiguredStorage =
                        serializedAppConfiguredStorage.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId,
                        "Could not deserialize application-defined checkpoint storage.",
                        e);
            }
        }

        // As with the state backend, load the checkpoint storage from the application
        // settings and the Flink configuration file
        final CheckpointStorage rootStorage;
        try {
            rootStorage =
                    CheckpointStorageLoader.load(
                            applicationConfiguredStorage,
                            null,
                            rootBackend,
                            jobManagerConfig,
                            classLoader,
                            log);
        } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                    jobId, "Could not instantiate configured checkpoint storage", e);
        }

        // instantiate the user-defined checkpoint hooks
        // (these hooks can run when a snapshot is triggered or restored)
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
                snapshotSettings.getMasterHooks();
        final List<MasterTriggerRestoreHook<?>> hooks;

        if (serializedHooks == null) {
            hooks = Collections.emptyList();
        } else {
            final MasterTriggerRestoreHook.Factory[] hookFactories;
            try {
                hookFactories = serializedHooks.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId, "Could not instantiate user-defined checkpoint hooks", e);
            }

            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(classLoader);

            try {
                hooks = new ArrayList<>(hookFactories.length);
                for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                    hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                }
            } finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        // Get the configuration of the checkpoint coordinator
        final CheckpointCoordinatorConfiguration chkConfig =
                snapshotSettings.getCheckpointCoordinatorConfiguration();

        // Apply the checkpoint configuration to the executionGraph
        executionGraph.enableCheckpointing(
                chkConfig,
                triggerVertices,
                ackVertices,
                confirmVertices,
                hooks,
                checkpointIdCounter,
                completedCheckpointStore,
                rootBackend,
                checkpointStatsTracker,
                checkpointsCleaner);
    }

    // create all the metrics for the Execution Graph
    // (gauges for the job's restart time, downtime and uptime)
    metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
    metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
    metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));

    return executionGraph;
}

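The main steps of creating the ExecutionGraph are roughly:

Obtain the job information
Create the ExecutionGraph itself
Attach the JobGraph vertices
Set up the checkpoint configuration
Set up the state backend configuration
Set up the checkpoint storage
Set up the checkpoint hooks
Set up the job metrics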
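Before the vertices are attached, buildGraph obtains them in topological order starting from the sources. One common way to compute such an order is Kahn's algorithm, sketched below. This is an illustration only: TopologicalSortSketch is a hypothetical class operating on vertex names, and it is not claimed to match how JobGraph.getVerticesSortedTopologicallyFromSources is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Kahn's algorithm on a tiny DAG keyed by vertex name (hypothetical sketch). */
class TopologicalSortSketch {

    /** downstream maps each vertex to the vertices that consume its output. */
    static List<String> sortFromSources(Map<String, List<String>> downstream) {
        // Count incoming edges for every vertex that appears anywhere in the graph
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String vertex : downstream.keySet()) {
            inDegree.putIfAbsent(vertex, 0);
        }
        for (List<String> targets : downstream.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Sources are the vertices without inputs
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            for (String target : downstream.getOrDefault(vertex, Collections.emptyList())) {
                // A vertex becomes ready once all of its inputs have been emitted
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The graph contains a cycle");
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("source", List.of("map", "sink"));
        graph.put("map", List.of("sink"));
        System.out.println(sortFromSources(graph));
    }
}
```

Processing vertices in this order guarantees that every producer is handled before its consumers, which is what attachJobGraph relies on when it connects each new vertex to the intermediate results of its predecessors.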

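ExecutionGraph constructor

jobInformation: a wrapper around the job's information, including the job ID, name, configuration, user code and classpath.
futureExecutor: the thread pool for asynchronous execution.
ioExecutor: the thread pool for IO operations.
rpcTimeout: the timeout for RPC calls.
maxPriorAttemptsHistoryLength: the maximum number of prior attempts kept in the history.
classLoader: the user code class loader.
blobWriter: writes data to the blob server.
partitionReleaseStrategyFactory: the factory for the IntermediateResultPartition release strategy.
shuffleMaster: registers IntermediateResultPartitions (intermediate result partitions), that is, registers data partitions and their producers with the JobMaster.
partitionTracker: tracks and releases partitions.
partitionLocationConstraint: controls whether a partition's location may be unknown at deployment time. In batch mode the partition location may be unknown, but in streaming mode it must be known.
executionDeploymentListener: the listener for Execution deployments.
executionStateUpdateListener: the listener for Execution state updates.
initializationTimestamp: the initialization timestamp.
vertexAttemptNumberStore: stores the attempt count of each job vertex.

attachJobGraph method
This method attaches the JobGraph to the ExecutionGraph.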

@Override
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    // Check that we are running in the JobMaster main thread
    assertRunningInJobMasterMainThread();

    LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} "
                    + "vertices and {} intermediate results.",
            topologiallySorted.size(),
            tasks.size(),
            intermediateResults.size());

    // Collection that holds the newly created ExecutionJobVertex instances
    final ArrayList<ExecutionJobVertex> newExecJobVertices =
            new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    for (JobVertex jobVertex : topologiallySorted) {

        // If a vertex is an input (source) vertex and cannot be stopped,
        // mark the whole ExecutionGraph as not stoppable
        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv =
                new ExecutionJobVertex(
                        this,
                        jobVertex,
                        maxPriorAttemptsHistoryLength,
                        rpcTimeout,
                        createTimestamp,
                        this.initialAttemptCounts.getAttemptCounts(jobVertex.getID()));

        // Hand the IntermediateResults of the preceding vertices to this ejv,
        // which connects it to its predecessors, as the method name says
        ejv.connectToPredecessors(this.intermediateResults);

        // Put the job vertex ID and the ejv into the ExecutionGraph as a key-value pair
        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        // A non-null previousTask means two JobGraph vertices share the same ID, which is an error
        if (previousTask != null) {
            throw new JobException(
                    String.format(
                            "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                            jobVertex.getID(), ejv, previousTask));
        }

        // Iterate over the IntermediateResults created by the ejv
        // They are created in the ExecutionJobVertex constructor
        // from the IntermediateDataSets of the corresponding JobVertex
        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet =
                    this.intermediateResults.putIfAbsent(res.getId(), res);
            // Likewise, result IDs must not be duplicated
            if (previousDataSet != null) {
                throw new JobException(
                        String.format(
                                "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                                res.getId(), res, previousDataSet));
            }
        }

        // This list keeps the ejvs in creation order
        this.verticesInCreationOrder.add(ejv);
        // Count the total number of vertices. When the job actually runs, one vertex is
        // deployed per unit of parallelism to run a task,
        // so the parallelism of every ejv is added up
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    // Register all ExecutionVertex instances and the IntermediateResultPartitions they produce
    // An ExecutionVertex is a physical execution vertex; an ExecutionJobVertex contains
    // as many ExecutionVertex instances as its parallelism
    registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    // Create the execution topology, which contains all ExecutionVertex instances,
    // ResultPartitions and PipelinedRegions
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    // Create the partition release strategy
    partitionReleaseStrategy =
            partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}

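Pipelined Region
Creating the executionTopology brings up the concept of a pipelined region. Before explaining it, we need to understand the difference between a pipelined result and a blocking result.

A pipelined result streams data out continuously, and downstream tasks can consume it as it is produced: as soon as the upstream produces data, the downstream can start consuming it. The production of a pipelined result can go on without end. This type corresponds to streaming jobs.

A blocking result can only be consumed once the upstream has finished producing it. A blocking result is always finite. The typical scenario is a batch job.

A pipelined region is a part of the ExecutionGraph. It consists of consecutive tasks that exchange data in pipelined mode (producing pipelined results). An ExecutionGraph can therefore be split into several pipelined regions, connected to one another by blocking data exchanges.

The point of a pipelined region is that all consumers inside it must keep consuming what their upstream producers emit, so that they neither block the upstream nor cause backpressure. This is why all tasks in the same pipelined region must be scheduled together on startup and restarted together on failure.

For the official explanation of pipelined region scheduling, see: https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html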
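The idea of splitting a graph into pipelined regions can be sketched with a union-find structure: vertices joined by a pipelined edge end up in the same region, while blocking edges leave regions separate. PipelinedRegionSketch is a hypothetical class written for illustration; Flink's actual region computation may handle additional cases that this sketch ignores.

```java
/** Union-find sketch (hypothetical class) that groups vertices linked by pipelined edges. */
class PipelinedRegionSketch {
    private final int[] parent;

    PipelinedRegionSketch(int vertexCount) {
        parent = new int[vertexCount];
        for (int i = 0; i < vertexCount; i++) {
            parent[i] = i; // every vertex starts as its own region
        }
    }

    private int find(int vertex) {
        while (parent[vertex] != vertex) {
            parent[vertex] = parent[parent[vertex]]; // path halving
            vertex = parent[vertex];
        }
        return vertex;
    }

    /** Adds a producer -> consumer edge; only pipelined edges merge the two regions. */
    void addEdge(int producer, int consumer, boolean pipelined) {
        if (pipelined) {
            parent[find(producer)] = find(consumer);
        }
    }

    boolean sameRegion(int a, int b) {
        return find(a) == find(b);
    }

    int regionCount() {
        int count = 0;
        for (int vertex = 0; vertex < parent.length; vertex++) {
            if (find(vertex) == vertex) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // 0 -> 1 is pipelined, 1 -> 2 is blocking, 2 -> 3 is pipelined
        PipelinedRegionSketch regions = new PipelinedRegionSketch(4);
        regions.addEdge(0, 1, true);
        regions.addEdge(1, 2, false);
        regions.addEdge(2, 3, true);
        System.out.println("regions: " + regions.regionCount());
        System.out.println("0 and 1 together: " + regions.sameRegion(0, 1));
        System.out.println("1 and 2 together: " + regions.sameRegion(1, 2));
    }
}
```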

enableCheckpointing method
This method initializes the checkpoint-related configuration of the ExecutionGraph. Its main job is to create and configure the CheckpointCoordinator:

@Override
public void enableCheckpointing(
        CheckpointCoordinatorConfiguration chkConfig,
        List<MasterTriggerRestoreHook<?>> masterHooks,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore checkpointStore,
        StateBackend checkpointStateBackend,
        CheckpointStorage checkpointStorage,
        CheckpointStatsTracker statsTracker,
        CheckpointsCleaner checkpointsCleaner) {

    // The job must be in the CREATED state
    checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
    // The CheckpointCoordinator must not exist yet, to avoid enabling checkpointing twice
    checkState(checkpointCoordinator == null, "checkpointing already enabled");

    // Collect the OperatorCoordinators of all ExecutionJobVertex instances
    // An OperatorCoordinator runs in the JobManager, is associated with an operator
    // in a job vertex, and is used to interact with that operator
    final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators =
            buildOpCoordinatorCheckpointContexts();

    checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");

    // Create the checkpoint failure manager, which runs the handling logic when a checkpoint fails
    CheckpointFailureManager failureManager =
            new CheckpointFailureManager(
                    chkConfig.getTolerableCheckpointFailureNumber(),
                    new CheckpointFailureManager.FailJobCallback() {
                        @Override
                        public void failJob(Throwable cause) {
                            getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
                        }

                        @Override
                        public void failJobDueToTaskFailure(
                                Throwable cause, ExecutionAttemptID failingTask) {
                            getJobMasterMainThreadExecutor()
                                    .execute(
                                            () ->
                                                    failGlobalIfExecutionIsStillRunning(
                                                            cause, failingTask));
                        }
                    });

    // Create the timer the CheckpointCoordinator uses to trigger checkpoints periodically
    checkState(checkpointCoordinatorTimer == null);

    checkpointCoordinatorTimer =
            Executors.newSingleThreadScheduledExecutor(
                    new DispatcherThreadFactory(
                            Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));

    // Create the CheckpointCoordinator, which coordinates the checkpoints of all operators
    // across the cluster, triggering and committing them
    // create the coordinator that triggers and commits checkpoints and holds the state
    checkpointCoordinator =
            new CheckpointCoordinator(
                    jobInformation.getJobId(),
                    chkConfig,
                    operatorCoordinators,
                    checkpointIDCounter,
                    checkpointStore,
                    checkpointStorage,
                    ioExecutor,
                    checkpointsCleaner,
                    new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                    SharedStateRegistry.DEFAULT_FACTORY,
                    failureManager,
                    createCheckpointPlanCalculator(),
                    new ExecutionAttemptMappingProvider(getAllExecutionVertices()));

    // register the master hooks on the checkpoint coordinator
    // (they are called back when a checkpoint is created or restored)
    for (MasterTriggerRestoreHook<?> hook : masterHooks) {
        if (!checkpointCoordinator.addMasterHook(hook)) {
            LOG.warn(
                    "Trying to register multiple checkpoint hooks with the name: {}",
                    hook.getIdentifier());
        }
    }

    // Set the checkpoint stats tracker
    checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

    // interval of max long value indicates disable periodic checkpoint,
    // the CheckpointActivatorDeactivator should be created only if the interval is not max
    // value
    // If periodic checkpointing is not disabled, register a job status listener
    // The CheckpointCoordinator uses this listener to follow job status changes
    if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
        // the periodic checkpoint scheduler is activated and deactivated as a result of
        // job status changes (running -> on, all other states -> off)
        registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    }

    // Record the names of the state backend and the checkpoint storage
    this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
    this.checkpointStorageName = checkpointStorage.getClass().getSimpleName();
}
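The last part of enableCheckpointing registers a job status listener that switches periodic checkpointing on while the job is running and off in every other state, and skips the listener altogether when the interval is Long.MAX_VALUE. The sketch below shows that behavior in isolation. All names in it (CheckpointActivatorSketch, PeriodicSchedulerSketch, listenerFor, and the reduced JobStatus enum) are hypothetical, and a boolean flag stands in for the real timer.

```java
import java.util.Optional;

/** Hypothetical sketch of a listener that toggles periodic checkpointing with the job status. */
class CheckpointActivatorSketch {

    enum JobStatus { CREATED, RUNNING, FAILING, FINISHED }

    interface JobStatusListener {
        void jobStatusChanges(JobStatus newStatus);
    }

    /** Stands in for the periodic checkpoint scheduler inside the coordinator. */
    static final class PeriodicSchedulerSketch {
        private boolean active;

        boolean isActive() {
            return active;
        }

        JobStatusListener createActivatorDeactivator() {
            // running -> on, all other states -> off
            return newStatus -> active = (newStatus == JobStatus.RUNNING);
        }
    }

    /** Returns a listener only when periodic checkpointing is enabled. */
    static Optional<JobStatusListener> listenerFor(
            long checkpointInterval, PeriodicSchedulerSketch scheduler) {
        if (checkpointInterval == Long.MAX_VALUE) {
            return Optional.empty(); // periodic checkpoints are disabled
        }
        return Optional.of(scheduler.createActivatorDeactivator());
    }

    public static void main(String[] args) {
        PeriodicSchedulerSketch scheduler = new PeriodicSchedulerSketch();
        JobStatusListener listener = listenerFor(10_000L, scheduler).orElseThrow();
        listener.jobStatusChanges(JobStatus.RUNNING);
        System.out.println("active while RUNNING: " + scheduler.isActive());
        listener.jobStatusChanges(JobStatus.FAILING);
        System.out.println("active while FAILING: " + scheduler.isActive());
    }
}
```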

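From ExecutionGraph to deploying Tasks
Each execution of an ExecutionVertex creates an Execution object.

Once the JobManager has started, it tells the scheduler to start scheduling (by calling SchedulerBase's startScheduling method). After a chain of intermediate calls (fairly involved, omitted here), execution finally reaches Execution.deploy.

Execution.deploy contains the actual deployment logic. From the task's assigned resource and the ExecutionVertex, it builds a task deployment descriptor. The descriptor carries the task's execution configuration and is sent to the TaskManager over RPC, which then creates a matching task.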

public void deploy() throws JobException {
    // Make sure we are running in the JobMaster main thread
    assertRunningInJobMasterMainThread();

    // Get the assigned resource
    final LogicalSlot slot = assignedResource;

    checkNotNull(
            slot,
            "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

    // Check if the TaskManager died in the meantime
    // This only speeds up the response to TaskManagers failing concurrently to deployments.
    // The more general check is the rpcTimeout of the deployment call
    if (!slot.isAlive()) {
        throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
    }

    // make sure exactly one deployment call happens from the correct state
    // note: the transition from CREATED to DEPLOYING is for testing purposes only
    // (read the previous state and switch to DEPLOYING)
    ExecutionState previous = this.state;
    if (previous == SCHEDULED || previous == CREATED) {
        if (!transitionState(previous, DEPLOYING)) {
            // race condition, someone else beat us to the deploying call.
            // this should actually not happen and indicates a race somewhere else
            throw new IllegalStateException(
                    "Cannot deploy task: Concurrent deployment call race.");
        }
    } else {
        // vertex may have been cancelled, or it was already scheduled
        throw new IllegalStateException(
                "The vertex must be in CREATED or SCHEDULED state to be deployed. Found state "
                        + previous);
    }

    // Check that this slot was actually assigned to the current execution
    if (this != slot.getPayload()) {
        throw new IllegalStateException(
                String.format(
                        "The execution %s has not been assigned to the assigned slot.", this));
    }

    try {

        // race double check, did we fail/cancel and do we need to release the slot?
        // (check again that the execution state is DEPLOYING)
        if (this.state != DEPLOYING) {
            slot.releaseSlot(
                    new FlinkException(
                            "Actual state of execution "
                                    + this
                                    + " ("
                                    + state
                                    + ") does not match expected state DEPLOYING."));
            return;
        }

        LOG.info(
                "Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}",
                vertex.getTaskNameWithSubtaskIndex(),
                attemptNumber,
                vertex.getCurrentExecutionAttempt().getAttemptId(),
                getAssignedResourceLocation(),
                slot.getAllocationId());

        // Create a task deployment descriptor
        // It carries the ExecutionVertex, its assigned resource and related information
        final TaskDeploymentDescriptor deployment =
                TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, attemptNumber)
                        .createDeploymentDescriptor(
                                slot.getAllocationId(),
                                taskRestore,
                                producedPartitions.values());

        // null taskRestore to let it be GC'ed
        taskRestore = null;

        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();

        getVertex().notifyPendingDeployment(this);
        // We run the submission in the future executor so that the serialization of large TDDs
        // does not block
        // the main thread and sync back to the main thread once submission is completed.
        // Here the taskManagerGateway is called asynchronously: the task deployment
        // descriptor is sent to the TaskManager over RPC,
        // and the TaskManager starts deploying the task once it receives it
        CompletableFuture.supplyAsync(
                        () -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                        (ack, failure) -> {
                            if (failure == null) {
                                vertex.notifyCompletedDeployment(this);
                            } else {
                                if (failure instanceof TimeoutException) {
                                    String taskname =
                                            vertex.getTaskNameWithSubtaskIndex()
                                                    + " ("
                                                    + attemptId
                                                    + ')';

                                    markFailed(
                                            new Exception(
                                                    "Cannot deploy task "
                                                            + taskname
                                                            + " - TaskManager ("
                                                            + getAssignedResourceLocation()
                                                            + ") not responding after a rpcTimeout of "
                                                            + rpcTimeout,
                                                    failure));
                                } else {
                                    markFailed(failure);
                                }
                            }
                        },
                        jobMasterMainThreadExecutor);

    } catch (Throwable t) {
        markFailed(t);
    }
}
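The guard at the top of deploy ensures that exactly one deployment call succeeds, and only from the CREATED or SCHEDULED state. The following sketch reproduces that guard in isolation. DeployStateSketch is a hypothetical class, and storing the state in an AtomicReference is this sketch's own choice, not necessarily how Execution keeps its state.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a guarded state transition into DEPLOYING. */
class DeployStateSketch {

    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, CANCELED }

    // Assumption of this sketch: the state lives in an AtomicReference
    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    /** Switches to target only if the current state is still the expected one. */
    boolean transitionState(State expected, State target) {
        return state.compareAndSet(expected, target);
    }

    State getState() {
        return state.get();
    }

    void deploy() {
        State previous = state.get();
        if (previous == State.SCHEDULED || previous == State.CREATED) {
            if (!transitionState(previous, State.DEPLOYING)) {
                // someone else changed the state between the read and the transition
                throw new IllegalStateException("Concurrent deployment call race.");
            }
        } else {
            throw new IllegalStateException(
                    "Must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
        }
    }

    public static void main(String[] args) {
        DeployStateSketch execution = new DeployStateSketch();
        execution.deploy();
        System.out.println("state after deploy: " + execution.getState());
        try {
            execution.deploy();
        } catch (IllegalStateException e) {
            System.out.println("second deploy rejected: " + e.getMessage());
        }
    }
}
```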

The method above calls TaskExecutor's submitTask method through the TaskManagerGateway.

@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
    // ...
    // Create the Task
    Task task =
            new Task(
                    jobInformation,
                    taskInformation,
                    tdd.getExecutionAttemptId(),
                    tdd.getAllocationId(),
                    tdd.getSubtaskIndex(),
                    tdd.getAttemptNumber(),
                    tdd.getProducedPartitions(),
                    tdd.getInputGates(),
                    memoryManager,
                    taskExecutorServices.getIOManager(),
                    taskExecutorServices.getShuffleEnvironment(),
                    taskExecutorServices.getKvStateService(),
                    taskExecutorServices.getBroadcastVariableManager(),
                    taskExecutorServices.getTaskEventDispatcher(),
                    externalResourceInfoProvider,
                    taskStateManager,
                    taskManagerActions,
                    inputSplitProvider,
                    checkpointResponder,
                    taskOperatorEventGateway,
                    aggregateManager,
                    classLoaderHandle,
                    fileCache,
                    taskManagerConfiguration,
                    taskMetricGroup,
                    resultPartitionConsumableNotifier,
                    partitionStateChecker,
                    getRpcService().getExecutor());
    // ...
}

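At this point the concrete Task object has been created in the TaskManager. This concludes the analysis of how an ExecutionGraph is generated from a JobGraph and finally deployed as Tasks.

Author: AlienPaul
Link: https://www.jianshu.com/p/571d0510d5e9
Source: Jianshu
Copyright belongs to the author. For commercial reposts, contact the author for authorization; for non-commercial reposts, credit the source.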
