Overview

RMAppAttempt state machine
图 1-1

APP_ACCEPTED Handle

RMAppAttempt 由RMApp创建并启动,向scheduler 提交靖求之后进入submited 状态。 scheduler 验证请求,并创建一个内部App对像并提交到queue,等待调度,向dispatcher 发送APP_ACCEPTED消息,最终该消息将由RMAppAttempt处理:(这里以CapacityScheduler为例)
    FiCaSchedulerApp SchedulerApp = new FiCaSchedulerApp(applicationAttemptId, user, queue, queue.getActiveUsersManager(), rmContext);// Submit to the queuetry {queue.submitApplication(SchedulerApp, user, queueName);} catch (AccessControlException ace) {LOG.info("Failed to submit application " + applicationAttemptId + " to queue " + queueName + " from user " + user, ace);this.rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptRejectedEvent(applicationAttemptId, ace.toString()));return;}applications.put(applicationAttemptId, SchedulerApp);LOG.info("Application Submission: " + applicationAttemptId + ", user: " + user +" queue: " + queue +", currently active: " + applications.size());rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptEvent(applicationAttemptId,RMAppAttemptEventType.APP_ACCEPTED)); 

收到该事件,状态机,会调用ScheduleTransition,将自己注册到执行等待队例,然后状态机进入scheduled状态,如果master是可管理的;

CONTAINER_ALLOCATED Handle

状态机进入该状态之后,系统将等待 NM node的下一次heartbeat消息,收到消之后,scheduler会检测该node的当前可用capacity,有capacity,将在该node上为App分配一个container 对像:
In  LeafQueue
  // Create the container if necessaryContainer container = getContainer(rmContainer, application, node, capability, priority);// something went wrong getting/creating the container if (container == null) {LOG.warn("Couldn't get container for allocation!");return Resources.none();}// Can we allocate a container on this node?int availableContainers = resourceCalculator.computeAvailableContainers(available, capability);if (availableContainers > 0) {// Allocate...// Did we previously reserve containers at this 'priority'?if (rmContainer != null){unreserve(application, priority, node, rmContainer);}// Create container tokens in secure-modeif (UserGroupInformation.isSecurityEnabled()) {ContainerToken containerToken = createContainerToken(application, container);if (containerToken == null) {// Something went wrong...return Resources.none();}container.setContainerToken(containerToken);}// Inform the applicationRMContainer allocatedContainer = application.allocate(type, node, priority, request, container);// Does the application need this resource?if (allocatedContainer == null) {return Resources.none();}// Inform the nodenode.allocateContainer(application.getApplicationId(), allocatedContainer);

第一个container 用来运行ApplicationMaster, 

Container 分配成功之后,AppAttempt将向Scheduler请求已分配的container,并设定为Master container,

 // Acquire the AM container from the scheduler.Allocation amContainerAllocation = appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,EMPTY_CONTAINER_RELEASE_LIST);// Set the masterContainerappAttempt.setMasterContainer(amContainerAllocation.getContainers().get(0));

然后通知 state Store 保存当前App状态,AppAttempt 进入ALLOCATE_SAVING状态  保存完成之后,AppAttempt会收到一个 ATTEMP_SAVED通知。

ATTEMP_SAVED Handle

状态机收到该事件之后,开始加载并启动container,使得master得以开始运行:
  private void launchAttempt(){// Send event to launch the AM ContainereventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));}
 private void launch() throws IOException {connect();ContainerId masterContainerID = masterContainer.getId();ApplicationSubmissionContext applicationContext =application.getSubmissionContext();LOG.info("Setting up container " + masterContainer+ " for AM " + application.getAppAttemptId());  ContainerLaunchContext launchContext =createAMContainerLaunchContext(applicationContext, masterContainerID);StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);request.setContainerLaunchContext(launchContext);request.setContainer(masterContainer);containerMgrProxy.startContainer(request);LOG.info("Done launching container " + masterContainer+ " for AM " + application.getAppAttemptId());}

LAUNCHE 成功之后,会收到 LAUNCHED可件通知:

LAUNCHED Handle

收到LAUNCHED通知之后,AppAttempt向监视线程注册, 之后等待Master启动运行的消息,master 启动之后,必须要向ResourceManager注册自己, 这时Resourcemanager会把这个注册事件发给appAttempt处理,

REGISTERED Handle

AppAttempt 收到 register 消息之后,保存master运行的相关信息,(host, port, trackingurl)然后通知App:

 // Let the app knowappAttempt.eventHandler.handle(new RMAppEvent(appAttempt.getAppAttemptId().getApplicationId(),RMAppEventType.ATTEMPT_REGISTERED));

ApplicationMaster 注册之后, AM会一直发送heartbeat 消息,通过 调用ApplicationMasterService.allocate() 方法, 收到applicationMaster的heartbeat 消息之后,Scheduler会为先向RMContainer发送Acquired 事件更新已经为AM分配的container状态,RMContainer 状态更新之后发送ContainerAcquired事件通知RMAppAttempt,

CONTAINER_ACQIRED Handle

当RMAppAttempt 收到该事件后,把该container 所属的node加放自己的runnodes set中去。
appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());

UNREGSITERD Handle

当任务执行完成之后,AM会向 ApplicationMasterService 注销自己,AppAttempt会收到unregsitered 事件通知,appatempt会执行一系列的清除工作,最后退出。

转载于:https://www.cnblogs.com/tnangle/archive/2013/04/28/3376704.html

HadoopSourceAnalyse---RMAppAttempt FSM相关推荐

  1. FSM之SMC使用总结

    FSM之SMC使用总结 Part1: Smc.jar state machine compiler usage Reference:      http://smc.sourceforge.net/ ...

  2. HDLBits 系列(36)Arbitration circuit implemented by FSM

    目录 原题复现 审题 我的设计 设计解释 原题复现 原题复现: Consider the FSM described by the state diagram shown below: This FS ...

  3. HDLBits 系列(33)Sequence Recognition with Mealy FSM

    目录 原题复现 状态转移图 我的设计 测试 原题复现 原题重现: Implement a Mealy-type finite state machine that recognizes the seq ...

  4. HDLBits 系列(27)孰对孰错 之 Fsm onehot?

    目录 前言 原题复现 审题 我的设计 测试吐槽 最后的解决方案 前言 今天的这个问题,并没有满意的解决,路过的朋友,看出问题所在的,可以给个评论,谢谢. 原题复现 Fsm onehot 下面是一个最基 ...

  5. FSM状态机之状态模式

     首先声明一点,这个模式是我目前见过最好用(本人观点),但是也是最难理解的一个(本人观点). 所以大家需要做好心理准备,如果,对这个模式没有特别强烈的需求,比如: 我有一个Button,我按次数点击它 ...

  6. 2.8 FSM之Moore和Mealy part1

    看过上几篇文章的那个例子,其实在学术上被分类Moore(摩尔,)形FSM.还有一种就叫做Mealy形FSM.如果对计算机理论感兴趣的话两类FSM的数学定义请参照Wiki这里就不拿出来了~ 大家可以看到 ...

  7. 用C语言实现有限状态自动机FSM

    摘要:状态机模式是一种行为模式,在<设计模式>这本书中对其有详细的描述,通过多态实现不同状态的调转行为的确是一种很好的方法,只可惜在嵌入式环境下,有时只能写纯C代码,并且还需要考虑代码的重 ...

  8. FSM(状态机)、HFSM(分层状态机)、BT(行为树)的区别

    游戏人工智能AI中最常听见的就是这三个词拉: FSM 这个不用说拉,百度一大堆解释, 简单将就是将游戏AI行为分为一个一个的状态,状态与状态之间的过渡通过事件的触发来形成. 比如士兵的行为有" ...

  9. SAP S/4HANA Service Management和SAP FSM基于CPI的集成场景介绍

    本文作者是我的同事,Song Hao(宋浩),SAP成都研究院S/4HANA Service Management的开发人员. 项目背景 相信大家已经知道,2018年6月份,SAP收购了一家专注于Fi ...

最新文章

  1. FastAI 2019课程学习笔记 lesson 2:自行获取数据并创建分类器
  2. JPTagView-多样化的标签View
  3. 还在用Logback?Log4j2的异步性能已经无敌了,还不快试试
  4. Springmvc ajax请求400
  5. 服务器控件开发——组合控件(5)
  6. Thinking in JAVA笔记——第三章 操作符 第四章控制执行流程
  7. JS 表单和表单元素
  8. 计网 - TCP 实战:如何进行 TCP 抓包调试?
  9. spring 事务原理_Spring声明式事务处理的实现原理,来自面试官的穷追拷问
  10. html判断安装没安装qq,QQ6.1体验版怎么用?腾讯QQ6.1体验版本安装步骤(无须申请体验账号)...
  11. 收集MySQL常用函数,值得收藏!
  12. Java 8 Lambda 表达式解析 1
  13. bzoj2426 [HAOI2010]工厂选址 读题+贪心
  14. bucket sort sample sort 并行_Java 中 Arrays.sort 和 Arrays.parallelSort 哪个更快?
  15. android: ListView设置emptyView 误区
  16. Javascript 通用Excel导出函数
  17. STL---vector的内存分配策略
  18. XML可扩展语言的发展
  19. 【python】文件的save和load:npy,npz,txt,csv,pkl,(持更)
  20. 脑电分析系列[MNE-Python-7]| Python读取.edf文件

热门文章

  1. [WPF]自定义鼠标指针
  2. Spring Cloud之Hystrix
  3. Codeforces Round #349 (Div. 1) A. Reberland Linguistics 动态规划
  4. leetcode处女作
  5. lib和dll文件的区别和联系(集合了几个博客的内容)
  6. 向sdcard中添加文件遇到的一些问题
  7. 相机原理updateTexImage
  8. anr trace文件分析
  9. Java Socket例子
  10. android 高德地图设置不能旋转_这个地图APP,专注于地图软件该做的事!