Client向RM提交任务的过程大致分为七步,如下图:

1. Client向RM发出请求
2. RM返回一个ApplicationID作为回应
3. Client向RM回应Application Submission Context(ASC)。ASC包括ApplicationID、user、queue,以及其他一些启动AM相关的信息,除此之外,还有一个Container Launch Context(CLC),CLC包含了资源请求数(内存与CPU),job files,安全token,以及其他一些用以在一个node上启动AM的信息。任务一旦提交以后,client可以请求RM去杀死应用或查询应用的运行状态
4. 当RM接受到ASC后,它会调度一个合适的container来启动AM,这个container经常被称作为container 0。AM需要请求其他的container来运行任务,如果没有合适的container,AM就不能启动。当有合适的container时,RM发请求到合适的NM上,来启动AM。这时候,AM的PRC与监控的URL就已经建立了。
5. 当AM启动起来后,RM回应给AM集群的最小与最大资源等信息。这时AM必须决定如何使用那么当前可用的资源。YARN不像那些请求固定资源的scheduler,它能够根据集群的当前状态动态调整。
6. AM根据从RM那里得知的可使用的资源,它会请求一些一定数目的container。This request can be very specific,including containers with multiples of the resource minimum values (e.g., extra memory)。

7. RM将会根据调度策略,尽可能的满足AM申请的container。也就是会分配container给AM,然后这些container的node manager会与AM进行通信,AM会向这些container的node manager发送启动容器的必要配置。

同时,在一个job运行时,AM会向RM汇报心跳与进度信息,在这些心跳过程中,AM可能去申请或释放container。会当任务完成时,AM向RM发送一条任务结束信息然后退出。如下图所示:

接下来,用一个简单的例子来说明整个过程。

代码参考:https://github.com/trumanz/hadoop_the_definitive_guide/tree/master/ch04-yarn/yarnExample

YARN client编写

1.创建一个Application

YarnClientApplication app = yarnClient.createApplication();

createApplication()方法在hadoop源码中yarn project中的org.apache.hadoop.yarn.client.api.impl中YARNClientImpl.java实现。

2.设置Application的名字

app.getApplicationSubmissionContext().setApplicationName( "truman.ApplicationMaster");

getApplicationSubmissionContext()方法位于org.apache.hadoop.yarn.client.api中的YarnClientApplication.java中

setApplicationName()方法位于org.apache.hadoop.yarn.client.api中的ApplicationSubmissionContextPBImpl.java中

3.设置Application的内存和CPU需求以及优先级和queue信息,YARN中RM将根据这些信息来选择合适的container来启动APP master,这个container经常被称作为container 0。

 app.getApplicationSubmissionContext().setResource(Resource.newInstance(100, 1));app.getApplicationSubmissionContext().setPriority(Priority.newInstance(0));app.getApplicationSubmissionContext().setQueue("default");

setResource(),setResource(),setQueue()这些方法都在org.apache.hadoop.yarn.client.api中的ApplicationSubmissionContextPBImpl.java中

4.设置ContainerLaunchContext,这一步,amContainer中包含了App Master执行所需要的资源文件,环境变量和启动命令,这里将资源文件上传到了HDFS,这样在Node Manager就可以通过HDFS取得这些文件。

app.getApplicationSubmissionContext().setAMContainerSpec(amContainer);

setAMContainerSpec()方法位于org.apache.hadoop.yarn.client.api中的ApplicationSubmissionContextPBImpl.java中。

5.提交应用给RM

    ApplicationId appId = yarnClient.submitApplication(app.getApplicationSubmissionContext());

submitApplication()方法位于org.apache.hadoop.yarn.client.api.impl中YARNClientImpl.java中。

对于client的编写还是比较简单的,不需要维护状态,只需要提交相应的消息给RM就行。

YARN APP Master编写

这部分编写比较复杂,AM需要与RM和NM通信,交互。

通过RM,申请container,并接受RM的一些信息,如可用的container资源,结束container等。

通过NM,启动container,并接收NM的信息,如container的状态变化以及Node状态变化等。

1.创建一个AMRMClientAsync对象,负责与RM交互通信

amRMClient = AMRMClientAsync.createAMRMClientAsync( 1000, new RMCallbackHandler());

这里的RMCallbackHandler 是我们编写的继承自AMRMClientAsync.CallbackHandler 的一个类,其功能是处理由Resource Manager收到的消息,
       其需要实现的方法由如下
       public void onContainersCompleted(List<ContainerStatus> statuses);
          public void onContainersAllocated(List<Container> containers) ;
          public void onShutdownRequest() ;
          public void onNodesUpdated(List<NodeReport> updatedNodes) ;
          public void onError(Throwable e) ;

这里不考虑异常的情况下,只写onContainersAllocated, onContainersCompleted 这两个既可以, 一个是当有新的Container 可以使用, 一个是Container 运行结束。
       在onContainersAllocated 我们需要编写 启动container 的代码,amNMClient.startContainerAsync(container, ctx); 这里的ctx 同Yarn Client 中第4步中的amContainer 是同一个类型, 即这个container 执行的一些资源,环境变量与命令等, 因为这是在回调函数中,为了保证时效性,这个操作最好放在线程池中异步操作。

在onContainersCompleted 中,如果是失败的Container,我们需要重新申请并启动Container,(这一点有可能是YARN的 Fair Schedule 中会强制退出某些Container 以释放资源) 成功的将做记录既可以。

2.创建一个NMClientAsyncImpl对象,负责与NM交互通信

 amNMClient = new NMClientAsyncImpl(new NMCallbackHandler());

这里NMCallbackHandler 使我们需要编写的继承自NMClientAsync.CallbackHandler 的对象,其功能是处理由Node Manager 收到的消息
       public void onContainerStarted(ContainerId containerId,  Map<String, ByteBuffer> allServiceResponse);
          public void onContainerStatusReceived(ContainerId containerId,  ContainerStatus containerStatus);
          public void onContainerStopped(ContainerId containerId) ;
          public void onStartContainerError(ContainerId containerId, Throwable t);
          public void onGetContainerStatusError(ContainerId containerId,  Throwable t) ;
          public void onStopContainerError(ContainerId containerId, Throwable t);

这里简单的不考虑异常的情况下,这些函数可以写一个空函数体,忽略掉处理。

3.将自己(AM)注册到RM上

 RegisterApplicationMasterResponse response = amRMClient.registerApplicationMaster(NetUtils.getHostname(), -1, "");

该函数将自己注册到RM上,没有提供RPC port和trackURL。

该方法在org.apache.hadoop.yarn.client.api.impl的AMRMClientImpl.java中。

4.向RM申请container

ContainerRequest containerAsk = new ContainerRequest(//100*10M + 1vcpuResource.newInstance(100, 1), null, null,Priority.newInstance(0));amRMClient.addContainerRequest(containerAsk);

这里一个containerAsk   表示申请一个 Container, 这里的对nodes和rasks 设置为NULL,猜测MapReduce应该由参数来尝试申请靠近HDFS block的container的。

addContainerRequest()方法在org.apache.hadoop.yarn.client.api.impl的AMRMClientImpl.java中。

5.等待container执行完毕,清除退出

我的代码如下, 循环等待container 执行完毕,并上报执行结果

 void waitComplete() throws YarnException, IOException{while(numTotalContainers.get() != numCompletedConatiners.get()){try{Thread.sleep(1000);LOG.info("waitComplete" + ", numTotalContainers=" + numTotalContainers.get() +", numCompletedConatiners=" + numCompletedConatiners.get());} catch (InterruptedException ex){}}exeService.shutdown();amNMClient.stop();amRMClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "dummy Message", null);amRMClient.stop();}

YARN Container Application

真正处理数据的是由APP master中amNMClient.startContainerAsync(container, ctx)提交的 Container application,也就是提交给具体的container执行的工作。然后这这个应用并不需要特殊编写,任何程序通过提交相应的运行信息都可以在这些Node中的某个Container 中执行, 所以这个程序可以是一个复杂的MapReduce  Task 或者 是一个简单的脚本。

总结:

YARN 提供了对cluster 资源管理 和 作业调度的功能。
编写一个应用运行在YARN 之上,比较复杂的是App Mstr 的编写,其需要维护container 的状态并能共做一些错误恢复,重启应用的操作。 比较简答的是Client的编写,只需要提交必须的信息既可以,不需要维护状态。 真正运行处理数据的是Container Application ,这个程序可以不需要针对YARN做代码编写。

YARN-client提交任务处理过程相关推荐

  1. yarn client 提交任务

    文章目录 目的 概念和流程 yarn运行机制 1. 起yarn client 2. 起app master 3. Reference 目的 提交一个自定义的任务到Yarn上 概念和流程 yarn运行机 ...

  2. Spark通过YARN提交任务不成功(包含YARN cluster和YARN client)

    无论用YARN cluster和YARN client来跑,均会出现如下问题. [spark@master spark-1.6.1-bin-hadoop2.6]$ jps 2049 NameNode ...

  3. 从源码角度看Spark on yarn client cluster模式的本质区别

    首先区分下AppMaster和Driver,任何一个yarn上运行的任务都必须有一个AppMaster,而任何一个Spark任务都会有一个Driver,Driver就是运行SparkContext(它 ...

  4. 添加spark.yarn.jars 解决 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set,

    一:问题现象: 在spark on yarn 提交任务是,提示如下: WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive ...

  5. spark yarn模式提交任务不成功(application state: ACCEPTED)

    问题详情 电脑8G,目前搭建3节点的spark集群,采用YARN模式. master分配2G,slave1分配1G,slave2分配1G.(在安装虚拟机时) export SPARK_WORKER_M ...

  6. 大数据Flink进阶(十五):Flink On Yarn任务提交

    文章目录 Flink On Yarn任务提交 一.Flink On Yarn运行原理 二.代码及Yarn环境准备 1.准备代码 2.yarn 环境准备 三.Yarn Session模式 1.任务提交命 ...

  7. Java代码使用Spark on Yarn 方式提交任务到带Kerberos认证的Hadoop集群

    2019独角兽企业重金招聘Python工程师标准>>> 项目中遇到Spark Yarn方式提交到Hadoop集群,访问集群HDFS时发现使用的当前用户,没有访问权限,经过排查后发现H ...

  8. 12、基于yarn的提交模式

    一.三种提交模式 1.Spark内核架构,其实就是第一种模式,standalone模式,基于Spark自己的Master-Worker集群.2.第二种,是基于YARN的yarn-cluster模式.3 ...

  9. YARN作业提交流程剖析

    YARN(MapReduce2) Yet Another Resource Negotiator / YARN Application Resource Negotiator 对于节点数超出4000的 ...

  10. MapReduce中Client提交Job源码分析

    回顾 在进行submit源码分析之前,先来回顾一下WordCount案例(点击查看WordCount案例).仔细回想一下曾经Client都干了点啥?获取对象-->一通set-->job.w ...

最新文章

  1. c#总结最近的几项重要代码
  2. mysql知识点概览_MySQL 基本架构概览
  3. 使用display:none和visibility:hidden隐藏的区别
  4. 一個便宜的高负载网站架构
  5. 【数据结构和算法笔记】用并查集求解等价关系
  6. 新兴市场成为联想如何7年超越三星的胜负关键
  7. Atitit 如何设置与安放知识的trap陷阱  知识聚合 rss url聚合工具 以及与trap的对比
  8. QT IDE下载及安装(最新版本)
  9. C# Winform开发框架源码 Winform系统开发 图书借阅系统,图书管理系统,说明文档齐全
  10. 挑战程序设计竞赛——抽签Ⅱ
  11. dell 7050台式计算机,Dell OptiPlex 7050 系统指南
  12. 盛京剑客系列26:极简估值教程——第二讲历史估值的参考与运用
  13. 百度编辑器UEditor图片上传尺寸大小设置
  14. Flixel横板游戏制作教程(九)—SquashingthePlayer(挤压Player)
  15. 查询昌吉州二中2021年高考成绩,2017新疆、昌吉州文理状元分数出炉!昌吉州这所中学喜摘文理状元桂冠!...
  16. Linux(入门基础):85---Linux单一计划任务(at服务、at、atq、atrm、batch命令)
  17. Log Parser 2.2 + Log Parser Lizard GUI 分析IIS日志示例
  18. 计算机考研考电路学校,集成电路工程考研学校排名
  19. windows自动开关机教程
  20. gRPC大数据量消息传递方法

热门文章

  1. 别让西药毁了凉茶!盒马牵手平安堂,能放心喝的凉茶来了
  2. 做一个视频通话给自己用吧
  3. Android Killer反编译时遇到的异常
  4. 基于BP神经网络控制+Simulink双闭环直流调速系统仿真
  5. Qt自定义控件大全文章导航
  6. 【第10章】接口与Lambda表达式
  7. 谁说游戏里的建模都是贴图?细数那些可以肆意破坏场景的单机大作
  8. 打印机能两个计算机共用吗,打印机共享线_两台电脑用一台打印机_分线器可以接打印机吗...
  9. 《国史通鉴》- 宋朝
  10. Java--JSON嵌套JSON中带‘\‘字符的解决方式