The JobManager is an actor; job submission arrives as a SubmitJob message:

    case SubmitJob(jobGraph, listeningBehaviour) =>
      val client = sender()
      val jobInfo = new JobInfo(client, listeningBehaviour, System.currentTimeMillis(),
        jobGraph.getSessionTimeout)
      submitJob(jobGraph, jobInfo)

submitJob does three things (a condensed sketch follows the list):

Build the ExecutionGraph from the JobGraph
Restore state from CheckpointedState, or from a Savepoint
Submit the ExecutionGraph to the Scheduler for scheduling
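Condensed into a hedged sketch (toy types and names, not the real Scala implementation, which also covers error handling and HA paths), the flow looks like this:

    // Hypothetical, simplified model of the submitJob control flow (not Flink's API).
    public class SubmitJobSketch {

        interface SlotScheduler { void scheduleForExecution(String executionGraph); }

        static void submitJob(String jobGraph, boolean isRecovery, SlotScheduler scheduler) {
            // 1. build the ExecutionGraph from the JobGraph
            //    (stand-in for ExecutionGraphBuilder.buildGraph(...))
            String executionGraph = "ExecutionGraph[" + jobGraph + "]";

            // 2. on recovery, first restore checkpointed state or a savepoint
            if (isRecovery) {
                System.out.println("restoring latest checkpointed state / savepoint");
            }

            // 3. hand the ExecutionGraph to the scheduler
            //    (stand-in for executionGraph.scheduleForExecution(scheduler))
            scheduler.scheduleForExecution(executionGraph);
        }

        public static void main(String[] args) {
            submitJob("WordCount", false, eg -> System.out.println("scheduling " + eg));
        }
    }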

ExecutionGraph

    executionGraph = ExecutionGraphBuilder.buildGraph(
        executionGraph,     // currentJobs.get(jobGraph.getJobID): the existing ExecutionGraph for this job id, if any
        jobGraph,
        flinkConfiguration, // the configuration
        futureExecutor,     // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-future-", "-thread-")), a pool sized by the number of CPU cores
        ioExecutor,         // Executors.newFixedThreadPool(numberProcessors, new NamedThreadFactory("jobmanager-io-", "-thread-"))
        userCodeLoader,     // libraryCacheManager.getClassLoader(jobGraph.getJobID), loads user code from the jars
        checkpointRecoveryFactory, // used for createCheckpointStore and createCheckpointIDCounter; comes in standalone and ZooKeeper flavors
        Time.of(timeout.length, timeout.unit),
        restartStrategy,    // the job restart strategy
        jobMetrics,
        numSlots,           // scheduler.getTotalNumberOfSlots(), the total number of slots across all instances registered with this JobManager
        log.logger)

ExecutionGraphBuilder.buildGraph

Create a new ExecutionGraph, if none exists so far

    // create a new execution graph, if none exists so far
    final ExecutionGraph executionGraph;
    try {
        executionGraph = (prior != null) ? prior :
            new ExecutionGraph(
                futureExecutor,
                ioExecutor,
                jobId,
                jobName,
                jobGraph.getJobConfiguration(),
                jobGraph.getSerializedExecutionConfig(),
                timeout,
                restartStrategy,
                jobGraph.getUserJarBlobKeys(),
                jobGraph.getClasspaths(),
                classLoader,
                metrics);
    } catch (IOException e) {
        throw new JobException("Could not create the execution graph.", e);
    }

attachJobGraph, which creates the graph's vertices and edges

    // topologically sort the job vertices and attach the graph to the existing one
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    executionGraph.attachJobGraph(sortedTopology);

ExecutionGraph.attachJobGraph

    for (JobVertex jobVertex : topologiallySorted) {
        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv =
            new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
        ejv.connectToPredecessors(this.intermediateResults);

        // all job vertices that are part of this graph:
        // ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks
        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            // all intermediate results that are part of this graph:
            // ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
        }

        // all vertices, in the order in which they were created:
        // List<ExecutionJobVertex> verticesInCreationOrder
        this.verticesInCreationOrder.add(ejv);
    }

Each JobVertex is wrapped into an ExecutionJobVertex.

This creates, in turn, the ExecutionJobVertex, its ExecutionVertex and Execution objects, and the IntermediateResult and IntermediateResultPartition objects; a toy sketch of the fan-out follows.
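To make the cardinalities concrete, here is a hedged toy illustration (hypothetical classes, not Flink's; only the object counts mirror the constructors shown below): one ExecutionJobVertex holds parallelism-many ExecutionVertex instances, each ExecutionVertex owns one current Execution attempt, and each IntermediateResult gets one IntermediateResultPartition per subtask.

    // Toy illustration of the ExecutionGraph fan-out; these are NOT Flink's classes,
    // only the counts follow the real constructors.
    public class FanOutSketch {
        public static void main(String[] args) {
            int parallelism = 3;      // jobVertex.getParallelism()
            int producedDataSets = 2; // jobVertex.getNumberOfProducedIntermediateDataSets()

            // one ExecutionJobVertex -> 'parallelism' ExecutionVertex, each with one current Execution
            for (int i = 0; i < parallelism; i++) {
                System.out.println("ExecutionVertex(subTaskIndex=" + i + ") -> Execution(attempt=0)");
            }

            // each IntermediateResult -> one IntermediateResultPartition per subtask
            for (int ds = 0; ds < producedDataSets; ds++) {
                for (int i = 0; i < parallelism; i++) {
                    System.out.println("IntermediateResult#" + ds
                        + " -> IntermediateResultPartition(subTaskIndex=" + i + ")");
                }
            }
        }
    }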

ExecutionJobVertex

    public ExecutionJobVertex(
            ExecutionGraph graph,
            JobVertex jobVertex,
            int defaultParallelism,
            Time timeout,
            long createTimestamp) throws JobException {

        if (graph == null || jobVertex == null) {
            throw new NullPointerException();
        }

        // the parallelism determines how many ExecutionVertex instances are created
        int vertexParallelism = jobVertex.getParallelism();
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;

        // the ExecutionVertex array
        this.taskVertices = new ExecutionVertex[numTaskVertices];
        this.inputs = new ArrayList<>(jobVertex.getInputs().size());

        // take the sharing group
        this.slotSharingGroup = jobVertex.getSlotSharingGroup();
        this.coLocationGroup = jobVertex.getCoLocationGroup();

        // create the IntermediateResults that will hold the intermediate data
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
        for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
            final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
            // wrap each IntermediateDataSet of the JobGraph into an IntermediateResult
            this.producedDataSets[i] = new IntermediateResult(
                result.getId(), this, numTaskVertices, result.getResultType());
        }

        // create all task vertices
        for (int i = 0; i < numTaskVertices; i++) {
            // initialize the ExecutionVertex
            ExecutionVertex vertex = new ExecutionVertex(
                this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
            this.taskVertices[i] = vertex;
        }

        finishedSubtasks = new boolean[parallelism];
    }

ExecutionVertex

    public ExecutionVertex(
            ExecutionJobVertex jobVertex,
            int subTaskIndex, // the subtask index; each task corresponds to one ExecutionVertex
            IntermediateResult[] producedDataSets,
            Time timeout,
            long createTimestamp,
            int maxPriorExecutionHistoryLength) {

        this.jobVertex = jobVertex;
        this.subTaskIndex = subTaskIndex;
        this.taskNameWithSubtask = String.format("%s (%d/%d)",
            jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());

        // records the IntermediateResultPartitions produced by this subtask
        this.resultPartitions = new LinkedHashMap<IntermediateResultPartitionID, IntermediateResultPartition>(producedDataSets.length, 1);
        for (IntermediateResult result : producedDataSets) {
            // initialize the IntermediateResultPartition
            IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
            result.setPartition(subTaskIndex, irp);
            resultPartitions.put(irp.getPartitionId(), irp);
        }

        this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
        this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);

        // create the Execution
        this.currentExecution = new Execution(
            getExecutionGraph().getFutureExecutor(), this, 0, createTimestamp, timeout);

        this.timeout = timeout;
    }

connectToPredecessors, which wires the vertices together with edges

    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
        List<JobEdge> inputs = jobVertex.getInputs(); // the inputs of the JobVertex

        for (int num = 0; num < inputs.size(); num++) {
            JobEdge edge = inputs.get(num); // the corresponding JobEdge

            // fetch the source IntermediateResult of this JobEdge
            IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
            this.inputs.add(ires); // List<IntermediateResult> inputs

            // register the current vertex as a consumer with the IntermediateResult
            int consumerIndex = ires.registerConsumer();

            for (int i = 0; i < parallelism; i++) {
                ExecutionVertex ev = taskVertices[i];
                // create ExecutionEdges from each ExecutionVertex to the concrete IntermediateResultPartitions
                ev.connectSource(num, ires, edge, consumerIndex);
            }
        }
    }

connectSource

    public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
        final DistributionPattern pattern = edge.getDistributionPattern(); // the edge's distribution pattern
        final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); // the source's partitions

        ExecutionEdge[] edges;
        switch (pattern) {
            case POINTWISE:
                edges = connectPointwise(sourcePartitions, inputNumber);
                break;
            case ALL_TO_ALL:
                edges = connectAllToAll(sourcePartitions, inputNumber);
                break;
            default:
                throw new RuntimeException("Unrecognized distribution pattern.");
        }

        this.inputEdges[inputNumber] = edges;

        // add the consumers to the source
        // for now (until the receiver initiated handshake is in place), we need to register the
        // edges as the execution graph
        for (ExecutionEdge ee : edges) {
            ee.getSource().addConsumer(ee, consumerNumber);
        }
    }

Let's look at connectPointwise:

    private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
        final int numSources = sourcePartitions.length;             // number of source partitions
        final int parallelism = getTotalNumberOfParallelSubtasks(); // parallelism of the subtasks

        // simple case: same number of sources as targets
        if (numSources == parallelism) {
            // 1:1, trivial; take the source partition whose index matches subTaskIndex
            return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) };
        }
        else if (numSources < parallelism) {
            // the subtasks have the higher parallelism, so one source feeds several tasks
            int sourcePartition;

            // check if the pattern is regular or irregular
            // we use int arithmetics for regular, and floating point with rounding for irregular
            if (parallelism % numSources == 0) {
                // evenly divisible, e.g. with 2 sources and 6 tasks the 3rd task maps to the 1st source
                // same number of targets per source
                int factor = parallelism / numSources;
                sourcePartition = subTaskIndex / factor;
            }
            else {
                // different number of targets per source
                float factor = ((float) parallelism) / numSources;
                sourcePartition = (int) (subTaskIndex / factor);
            }

            return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) };
        }
        else {
            //......
        }
    }
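The index arithmetic is easy to verify in isolation. A standalone sketch (same arithmetic, toy harness of my own, not Flink code) reproduces the comment's example: with 2 sources and 6 tasks, the 3rd task (subTaskIndex 2) maps to the first source partition.

    // Standalone check of the connectPointwise index math for the numSources < parallelism case.
    public class PointwiseMappingSketch {

        static int sourcePartitionFor(int subTaskIndex, int numSources, int parallelism) {
            if (parallelism % numSources == 0) {
                // same number of targets per source: integer arithmetic
                int factor = parallelism / numSources;
                return subTaskIndex / factor;
            } else {
                // different number of targets per source: float with truncation
                float factor = ((float) parallelism) / numSources;
                return (int) (subTaskIndex / factor);
            }
        }

        public static void main(String[] args) {
            // regular: 2 sources, 6 tasks -> tasks 0..2 read partition 0, tasks 3..5 read partition 1
            for (int t = 0; t < 6; t++) {
                System.out.println("task " + t + " -> source partition " + sourcePartitionFor(t, 2, 6));
            }
            // irregular: 2 sources, 5 tasks -> tasks 0..2 read partition 0, tasks 3..4 read partition 1
            for (int t = 0; t < 5; t++) {
                System.out.println("task " + t + " -> source partition " + sourcePartitionFor(t, 2, 5));
            }
        }
    }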

Configuring checkpoints

    executionGraph.enableSnapshotCheckpointing(
        snapshotSettings.getCheckpointInterval(),
        snapshotSettings.getCheckpointTimeout(),
        snapshotSettings.getMinPauseBetweenCheckpoints(),
        snapshotSettings.getMaxConcurrentCheckpoints(),
        snapshotSettings.getExternalizedCheckpointSettings(),
        triggerVertices,
        ackVertices,
        confirmVertices,
        checkpointIdCounter,
        completedCheckpoints,
        externalizedCheckpointsDir,
        checkpointStatsTracker);

This starts the CheckpointCoordinator; see the blog post dedicated to the Checkpoint mechanism.

Scheduler

Now let's look at how the built ExecutionGraph is scheduled.

    future { // asynchronously
      try {
        try {
          // store it in submittedJobGraphs
          submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
        } catch {
          //
        }

        // notify the client that the submission succeeded
        jobInfo.notifyClients(decorateMessage(JobSubmitSuccess(jobGraph.getJobID)))

        if (leaderElectionService.hasLeadership) {
          executionGraph.scheduleForExecution(scheduler) // schedule
        }
      } catch {
        //
      }
    }(context.dispatcher)

executionGraph.scheduleForExecution

    public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
        switch (scheduleMode) {

            case LAZY_FROM_SOURCES:
                // simply take the vertices without inputs.
                // (iterates ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks; an unfortunate field name)
                for (ExecutionJobVertex ejv : this.tasks.values()) {
                    if (ejv.getJobVertex().isInputVertex()) {
                        ejv.scheduleAll(slotProvider, allowQueuedScheduling);
                    }
                }
                break;

            case EAGER:
                for (ExecutionJobVertex ejv : getVerticesTopologically()) {
                    ejv.scheduleAll(slotProvider, allowQueuedScheduling);
                }
                break;

            default:
                throw new JobException("Schedule mode is invalid.");
        }
    }

For streaming jobs the default is EAGER (a toy comparison of the two modes follows the snippet):

    public JobGraph createJobGraph() {
        jobGraph = new JobGraph(streamGraph.getJobName());

        // make sure that all vertices start immediately
        jobGraph.setScheduleMode(ScheduleMode.EAGER);
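A hedged toy comparison of the two modes (hypothetical vertex model, not Flink's classes): LAZY_FROM_SOURCES only kicks off the vertices without inputs, leaving downstream vertices to be scheduled once their input data appears, while EAGER schedules every vertex up front in topological order, which is what an always-running streaming job needs.

    // Toy comparison of LAZY_FROM_SOURCES vs EAGER scheduling (hypothetical model).
    import java.util.List;

    public class ScheduleModeSketch {
        record Vertex(String name, boolean isInputVertex) {}

        public static void main(String[] args) {
            List<Vertex> topological = List.of(
                new Vertex("Source", true),
                new Vertex("Map", false),
                new Vertex("Sink", false));

            // LAZY_FROM_SOURCES: only the vertices without inputs start immediately
            topological.stream()
                .filter(Vertex::isInputVertex)
                .forEach(v -> System.out.println("LAZY schedules " + v.name()));

            // EAGER (streaming default): every vertex, in topological order
            topological.forEach(v -> System.out.println("EAGER schedules " + v.name()));
        }
    }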

ExecutionJobVertex.scheduleAll

    public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        ExecutionVertex[] vertices = this.taskVertices;

        // kick off the tasks
        for (ExecutionVertex ev : vertices) {
            ev.scheduleForExecution(slotProvider, queued);
        }
    }

ExecutionVertex.scheduleForExecution

    // delegates to the current or latest execution attempt of this vertex's task
    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        return this.currentExecution.scheduleForExecution(slotProvider, queued);
    }

Execution.scheduleForExecution

    public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
        final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
        final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

        if (transitionState(CREATED, SCHEDULED)) {
            // build the ScheduledUnit
            ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, sharingGroup) :
                new ScheduledUnit(this, sharingGroup, locationConstraint);

            // ask the slotProvider for a slot
            final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued);

            final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
                @Override
                public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                    if (simpleSlot != null) { // slot allocation succeeded
                        try {
                            deployToSlot(simpleSlot); // deploy
                        } catch (Throwable t) {
                            try {
                                simpleSlot.releaseSlot();
                            } finally {
                                markFailed(t);
                            }
                        }
                    }
                    else {
                        markFailed(throwable);
                    }
                    return null;
                }
            });
        }

For the slotProvider, see the Flink - Scheduler post.

deployToSlot: at its core, this submits a submitTask request to the TaskManager

    public void deployToSlot(final SimpleSlot slot) throws JobException {
        ExecutionState previous = this.state;
        if (previous == SCHEDULED || previous == CREATED) {
            // transition the state to DEPLOYING
            if (!transitionState(previous, DEPLOYING)) {
                throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
            }
        }

        try {
            // good, we are allowed to deploy
            // bind the slot to this ExecutionVertex
            if (!slot.setExecutedVertex(this)) {
                throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
            }
            this.assignedResource = slot;

            // create the TaskDeploymentDescriptor
            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId, slot, taskState, attemptNumber);

            // register this execution at the execution graph, to receive call backs
            vertex.getExecutionGraph().registerExecution(this);

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            // send the submitTask request to the TaskManager's actor
            final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

            submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
                ......
            }
    }
