版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/51190842

前言

Hadoop2.x.x版本的底层实现中作了很多优化:用状态机对各种对象生命周期和状态转移进行管理;采用事件机制避免线程同步与阻塞;采用Protocol Buffers优化RPC性能;采用Apache Avro优化日志等。本文主要针对YARN中状态机的实现进行分析,在这个过程中,会捎带一些事件的内容。

事件

YARN中的很多组件之间进行通信,主要借助于事件。为了可读性、可维护性及可扩展性,YARN中的事件由事件名称和事件类型组成。比如JobImpl处理的事件名称为JobEvent,而事件类型为JobEventType。有关Hadoop2.6.0的事件分类与实现可以参考《Hadoop2.6.0的事件分类与实现》一文。

状态

YARN中的每个组件都有其自身所处的一系列状态,比如JobImpl内部的一系列状态都定义在JobStateInternal中,如代码清单1所示。

代码清单1

public enum JobStateInternal {NEW,SETUP,INITED,RUNNING,COMMITTING,SUCCEEDED,FAIL_WAIT,FAIL_ABORT,FAILED,KILL_WAIT,KILL_ABORT,KILLED,ERROR,REBOOT
}

我们看到JobImpl的内部状态包括新建(NEW)、初始化(INITED)、运行中(RUNNING)、提交中(COMMITTING)、成功(SUCCEEDED)、失败(FAILED)等。

转换(过渡)

我们已经了解了事件与状态的基本实现与概念,那么事件与状态有什么关系?从哲学角度讲,状态是一个事物的静止属性,而事件则是一个事物与外界沟通的桥梁,只有静止却没有变化,那么它只是一滩死水。事物只有在接收信息后动起来,才算与外界有了互动。一个事物动起来就会潜移默化的发生改变,它内部就会发生转换。一个对象当前处于状态state0,当对象接收到事件Event后,将引发转换动作transition,最终当前对象的状态过渡到state1,这个过程可以用图1来表示。

图1 状态迁移示例

YARN中与过渡相关的类图如图2所示。

图2 YARN中与过渡相关的类图

YARN中的各个组件的变化都离不开状态的过度与变化,于是对这种行为进行了抽象,这种转换分为两类:单弧过渡与多弧过渡。(这种翻译不知道是否准确,我认为从一个状态到另一个状态的转换发生时,就像是在两个状态之间划了一道弧线一样)

单弧过渡

YARN中单弧过渡的实现代码如代码清单2,它的作用是当有限状态机(FSM)中的状态转换为已经注册到状态机的某种状态时,伴随的行为。

代码清单2

@Public
@Evolving
public interface SingleArcTransition<OPERAND, EVENT> {/*** Transition hook.* * @param operand the entity attached to the FSM, whose internal *                state may change.* @param event causal event*/public void transition(OPERAND operand, EVENT event);}

由于SingleArcTransition的具体实现类只负责接收到事件后的具体操作或行为,并没有包含状态相关的信息,所以在状态机执行状态过渡时,并不是直接调用SingleArcTransition具体实现类的transition方法,而是由接口Transition定义(见代码清单3)真正的转态过渡(包括行为和状态改变)。

代码清单3

  private interface Transition<OPERAND, STATE extends Enum<STATE>,EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {STATE doTransition(OPERAND operand, STATE oldState,EVENT event, EVENTTYPE eventType);}

SingleInternalArc作为Transition接口的实现类,在代理SingleArcTransition的同时,负责状态变换,见代码清单4。

代码清单4

  private class SingleInternalArcimplements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {private STATE postState;private SingleArcTransition<OPERAND, EVENT> hook; // transition hookSingleInternalArc(STATE postState,SingleArcTransition<OPERAND, EVENT> hook) {this.postState = postState;this.hook = hook;}@Overridepublic STATE doTransition(OPERAND operand, STATE oldState,EVENT event, EVENTTYPE eventType) {if (hook != null) {hook.transition(operand, event);}return postState;}}

多弧过渡

YARN中多弧过渡的实现代码如代码清单5,它的作用是当有限状态机(FSM)中的状态转换为已经注册到状态机的多个有效状态中的一个时,伴随的行为与操作。

代码清单5

@Public
@Evolving
public interface MultipleArcTransition<OPERAND, EVENT, STATE extends Enum<STATE>> {/*** Transition hook.* @return the postState. Post state must be one of the *                      valid post states registered in StateMachine.* @param operand the entity attached to the FSM, whose internal *                state may change.* @param event causal event*/public STATE transition(OPERAND operand, EVENT event);}

由于MultipleArcTransition的具体实现类只负责接收到事件后的具体操作或行为,并没有包含状态相关的信息,所以在状态机执行状态过渡时,并不是直接调用MultipleArcTransition具体实现类的transition方法,而是通过代理类MultipleInternalArc,见代码清单6。MultipleInternalArc也实现了Transition接口,并在代理MultipleArcTransition的转换行为的同时,负责状态变换。

代码清单6

  private class MultipleInternalArcimplements Transition<OPERAND, STATE, EVENTTYPE, EVENT>{// Fieldsprivate Set<STATE> validPostStates;private MultipleArcTransition<OPERAND, EVENT, STATE> hook;  // transition hookMultipleInternalArc(Set<STATE> postStates,MultipleArcTransition<OPERAND, EVENT, STATE> hook) {this.validPostStates = postStates;this.hook = hook;}@Overridepublic STATE doTransition(OPERAND operand, STATE oldState,EVENT event, EVENTTYPE eventType)throws InvalidStateTransitonException {STATE postState = hook.transition(operand, event);if (!validPostStates.contains(postState)) {throw new InvalidStateTransitonException(oldState, eventType);}return postState;}}

为了将所有状态机中的状态过渡与状态建立起映射关系,YARN中提供了ApplicableTransition接口用于将SingleInternalArc和MultipleInternalArc添加到状态机的拓扑表中,提高在检索状态对应的过渡实现时的性能,ApplicableTransition的实现类为ApplicableSingleOrMultipleTransition类,其apply方法用于代理SingleInternalArc和MultipleInternalArc,将它们添加到状态拓扑表中。ApplicableTransition接口的定义见代码清单7,ApplicableSingleOrMultipleTransition的实现见代码清单8。

代码清单7

  private interface ApplicableTransition<OPERAND, STATE extends Enum<STATE>,EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);}

代码清单8

  static private class ApplicableSingleOrMultipleTransition<OPERAND, STATE extends Enum<STATE>,EVENTTYPE extends Enum<EVENTTYPE>, EVENT>implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {final STATE preState;final EVENTTYPE eventType;final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;ApplicableSingleOrMultipleTransition(STATE preState, EVENTTYPE eventType,Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {this.preState = preState;this.eventType = eventType;this.transition = transition;}@Overridepublic void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap= subject.stateMachineTable.get(preState);if (transitionMap == null) {// I use HashMap here because I would expect most EVENTTYPE's to not//  apply out of a particular state, so FSM sizes would be //  quadratic if I use EnumMap's here as I do at the top level.transitionMap = new HashMap<EVENTTYPE,Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();subject.stateMachineTable.put(preState, transitionMap);}transitionMap.put(eventType, transition);}}

可以看到ApplicableSingleOrMultipleTransition的apply方法就是为构建状态拓扑表而开发的。

状态机

YARN中状态机的实现类是StateMachineFactory,它主要包含4个属性信息:

  • transitionsListNode:过渡列表节点。根据其名字不太容易理解,我这里说得简单点,就是将状态机的一个个过渡的ApplicableTransition实现串联为一个列表,每个节点包含一个ApplicableTransition实现及指向下一个节点的引用,其实现见代码清单9所示。

代码清单9

  private class TransitionsListNode {final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;final TransitionsListNode next;TransitionsListNode(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,TransitionsListNode next) {this.transition = transition;this.next = next;}}

transitionsListNode形成的过渡列表节点可以用图3表示。

图3 transitionsListNode过渡链表结构

  • stateMachineTable:状态拓扑表,为了提高检索状态对应的过渡map而冗余的数据结构,此结构在optimized为真时,通过对transitionsListNode链表进行处理产生。stateMachineTable的结构可以用图4来表示。

图4 状态拓扑表数据结构

  • defaultInitialState:对象创建时,内部有限状态机的默认初始状态。比如:JobImpl的内部状态机默认初始状态是JobStateInternal.NEW。
  • optimized:布尔类型,用于标记当前状态机是否需要优化性能,即构建状态拓扑表stateMachineTable。

共有构造器

StateMachineFactory的公有构造器只有一个,其实现见代码清单10。

代码清单10

  public StateMachineFactory(STATE defaultInitialState) {this.transitionsListNode = null;this.defaultInitialState = defaultInitialState;this.optimized = false;this.stateMachineTable = null;}

可见新建的StateMachineFactory实例只有一个默认初始状态参数defaultInitialState。

私有构造器

StateMachineFactory的私有构造器有两个,其中代码清单11中的构造器在addTransition方法中使用。从其实现看出,此构造器的主要作用是构建transitionsListNode链表。

代码清单11

  private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {this.defaultInitialState = that.defaultInitialState;this.transitionsListNode = new TransitionsListNode(t, that.transitionsListNode);this.optimized = false;this.stateMachineTable = null;}

而代码清单12中的构造器则在installTopology方法中使用。

代码清单12

  private StateMachineFactory(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,boolean optimized) {this.defaultInitialState = that.defaultInitialState;this.transitionsListNode = that.transitionsListNode;this.optimized = optimized;if (optimized) {makeStateMachineTable();} else {stateMachineTable = null;}}

代码清单12中的构造器当optimized参数为true时,调用了makeStateMachineTable方法,makeStateMachineTable的实现见代码清单13所示。

代码清单13

  private void makeStateMachineTable() {Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();prototype.put(defaultInitialState, null);// I use EnumMap here because it'll be faster and denser.  I would//  expect most of the states to have at least one transition.stateMachineTable= new EnumMap<STATE, Map<EVENTTYPE,Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);for (TransitionsListNode cursor = transitionsListNode;cursor != null;cursor = cursor.next) {stack.push(cursor.transition);}while (!stack.isEmpty()) {stack.pop().apply(this);}}

通过阅读makeStateMachineTable的实现,不难看出其作用:

  1. 创建堆栈stack,用于将transitionsListNode链表中各个节点持有的ApplicableSingleOrMultipleTransition压入栈中;
  2. 创建状态拓扑表stateMachineTable,并在此拓扑表中插入一个额外的默认初始状态defaultInitialState与null的映射;
  3. 迭代访问transitionsListNode链表,并将各个节点持有的ApplicableSingleOrMultipleTransition压入栈中;
  4. 依次弹出栈顶的ApplicableSingleOrMultipleTransition,并应用其apply方法(已在前面小节介绍),持续不断的构建状态拓扑表stateMachineTable。

至此,关于YARN状态机的基本概念和接口叙述完毕。下面分析状态机构建过程。

状态机构建

为了简化叙述,本节以JobImpl中状态机的构建为例。由于JobImpl的状态机预设的(调用addTransition方法)加入的ApplicableSingleOrMultipleTransition非常多,我们节选其中的2个作为典型进行分析。最后还会分析installTopology方法的实现。JobImpl中状态机的定义见代码清单14。

代码清单14

  protected static finalStateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> stateMachineFactory= new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>(JobStateInternal.NEW)// Transitions from NEW state.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,JobEventType.JOB_DIAGNOSTIC_UPDATE,DIAGNOSTIC_UPDATE_TRANSITION).addTransition(JobStateInternal.NEW, JobStateInternal.NEW,JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION).addTransition(JobStateInternal.NEW,EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),JobEventType.JOB_INIT,new InitTransition())// 省略其它addTransition调用// create the topology tables.installTopology();

构建JobImpl的状态机的步骤如下:

  1. 调用StateMachineFactory构造器创建一个初始的状态机;
  2. 调用addTransition(STATE preState, STATE postState, EVENTTYPE eventType, SingleArcTransition<OPERAND, EVENT> hook)方法添加单弧过渡。从其实现(见代码清单15)可以知道addTransition方法将SingleArcTransition封装为SingleInternalArc,然后将SingleInternalArc封装为ApplicableSingleOrMultipleTransition,最后调用之前说的第一个私有构造器构建transitionsListNode链表;
  3. 调用addTransition(STATE preState, Set<STATE> postStates, EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook)方法添加多弧过渡。从其实现(见代码清单16)可以知道addTransition方法将MultipleArcTransition封装为MultipleInternalArc,然后将MultipleInternalArc封装为ApplicableSingleOrMultipleTransition,最后调用之前说的第一个私有构造器构建transitionsListNode链表;
  4. 最后调用installTopology方法,其实现见代码清单17。installTopology正是在使用之前说的第二个私有构造器构建状态拓扑表stateMachineTable;
代码清单15
  public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>addTransition(STATE preState, STATE postState,EVENTTYPE eventType,SingleArcTransition<OPERAND, EVENT> hook){return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(preState, eventType, new SingleInternalArc(postState, hook)));}

代码清单16

  public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>addTransition(STATE preState, Set<STATE> postStates,EVENTTYPE eventType,MultipleArcTransition<OPERAND, EVENT, STATE> hook){return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this,new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>(preState, eventType, new MultipleInternalArc(postStates, hook)));}
代码清单17

  public StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>installTopology() {return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);}

再来看看代码清单14中列出的DIAGNOSTIC_UPDATE_TRANSITION,其实现如下。

  private static final DiagnosticsUpdateTransitionDIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
DiagnosticsUpdateTransition的代码实现如下,可见其类型的确是SingleArcTransition。COUNTER_UPDATE_TRANSITION也是类似,故不再赘述。
  private static class DiagnosticsUpdateTransition implementsSingleArcTransition<JobImpl, JobEvent> {@Overridepublic void transition(JobImpl job, JobEvent event) {job.addDiagnostic(((JobDiagnosticsUpdateEvent) event).getDiagnosticUpdate());}}

代码清单14中的InitTransition,其实现如下。具体逻辑此处就不必详述了,有兴趣的同学可以继续进行分析。

  public static class InitTransition implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {@Overridepublic JobStateInternal transition(JobImpl job, JobEvent event) {// 省略具体逻辑}}}

状态转移

StateMachineFactory状态转换的代码如下。

  private STATE doTransition(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)throws InvalidStateTransitonException {// We can assume that stateMachineTable is non-null because we call//  maybeMakeStateMachineTable() when we build an InnerStateMachine ,//  and this code only gets called from inside a working InnerStateMachine .Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap= stateMachineTable.get(oldState);if (transitionMap != null) {Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition= transitionMap.get(eventType);if (transition != null) {return transition.doTransition(operand, oldState, event, eventType);}}throw new InvalidStateTransitonException(oldState, eventType);}

通过阅读其实现,doTransition方法的执行步骤如下:

  1. 根据组件(例如JobImpl)当前状态(oldState)从状态拓扑表stateMachineTable中获取oldState对应的Transition映射表;
  2. 如果oldState对应的Transition映射表不为null,则根据事件类型EVENTTYPE从映射表中获取对应的Transition;
  3. 如果存在对应的Transition,那么调用其doTransition方法进行真正的转态转移(过渡)。

后记:个人总结整理的《深入理解Spark:核心思想与源码分析》一书现在已经正式出版上市,目前京东、当当、天猫等网站均有销售,欢迎感兴趣的同学购买。

京东:http://item.jd.com/11846120.html

当当:http://product.dangdang.com/23838168.html

Hadoop2.6.0中YARN底层状态机实现分析相关推荐

  1. hadoop之 Hadoop2.2.0中HDFS的高可用性实现原理

    在Hadoop2.0.0之前,NameNode(NN)在HDFS集群中存在单点故障(single point of failure),每一个集群中存在一个NameNode,如果NN所在的机器出现了故障 ...

  2. Hadoop-2.2.0中文文档——MapReduce 下一代 -——集群配置

    目的 这份文档描写叙述了怎样安装.配置和管理从几个节点到有数千个节点的Hadoop集群. 玩的话,你可能想先在单机上安装.(看单节点配置). 准备 从Apache镜像上下载一个Hadoop的稳定版本号 ...

  3. Hadoop2.2.0 中错误总结之(org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /test._COPYING)

    错误: [root@xiajie01 sbin]# hadoop fs -put /root/20131210110122880.doc  hdfs://192.168.30.169:9000/tes ...

  4. Hadoop2.6.0运行mapreduce之Uber模式验证

    前言 在有些情况下,运行于Hadoop集群上的一些mapreduce作业本身的数据量并不是很大,如果此时的任务分片很多,那么为每个map任务或者reduce任务频繁创建Container,势必会增加H ...

  5. 搭建hadoop2.6.0 HA及YARN HA

    以前用hadoop2.2.0只搭建了hadoop的高可用,但在hadoop2.2.0中始终没有完成YARN HA的搭建,直接下载了hadoop最新稳定版本2.6.0完成了YARN HA及HADOOP ...

  6. (转载)hadoop2.2.0集群的HA高可靠的最简单配置

    转载网址:http://www.cnblogs.com/meiyuanbao/p/hadoop2.html 简介 hadoop中的NameNode好比是人的心脏,非常重要,绝对不可以停止工作.在had ...

  7. 国内最全最详细的hadoop2.2.0集群的HA高可靠的最简单配置

    简介 hadoop中的NameNode好比是人的心脏,非常重要,绝对不可以停止工作.在hadoop1时代,只有一个NameNode.如果该NameNode数据丢失或者不能工作,那么整个集群就不能恢复了 ...

  8. eclipse中hadoop2.3.0环境部署及在eclipse中直接提交mapreduce任务

    转自:http://my.oschina.net/mkh/blog/340112 1 eclipse中hadoop环境部署概览 eclipse中部署hadoop包括两大部分:hdfs环境部署和mapr ...

  9. Eclipse中部署hadoop2.3.0

    1 eclipse中hadoop环境部署概览 eclipse 中部署hadoop包括两大部分:hdfs环境部署和mapreduce任务执行环境部署.一般hdfs环境部署比较简单,部署后就 可以在ecl ...

最新文章

  1. xmanager破解待验证
  2. 人工智能写的散文之白色月光下
  3. SpringMvc Intercetor
  4. log4j和web.xml配置webAppRootKey 的问题(一个tomcat下部署多个应用)
  5. Java网络编程-简易聊天室源码分享
  6. 券商pb系统量化交易接口代码
  7. 易到要在网约车市场突围并不容易
  8. r7525服务器电源管理系统,戴尔 R7525评估:EPYC内核密度让英特尔心碎
  9. 命主属性是水什么意思_​五行中,你属什么就是什么样的人!太准了~
  10. win10 磁盘管理 压缩卷 无法启动问题
  11. 计算机网络术语中rt是什么意思?今天就来给你解答
  12. gcc的ar工具及as汇编编译器入门练习及curses库
  13. java 创建word文件_Java 创建Word
  14. 山东大学-飞桨人工智能教育创新中心正式挂牌,打造区域产教融合新范式
  15. #FreeFortnite的卑鄙本质
  16. 华为鸿蒙os英语,华为正式发布HarmonyOS鸿蒙操作系统
  17. ipad pro如何作为windows、mac pro和手机的低时延的写字板(低于1秒)
  18. 1、第一次亲密接触Linux
  19. C语言square的用法,square的用法总结大全
  20. COM劫持 BypassUAC

热门文章

  1. eclipse spring mysql_为Eclipse上的Maven project添加SpringMVC和Mybatis以实现数据库
  2. mysql叶子结点存储的什么_B+树叶子结点到底存储了什么?
  3. mysql数据库主从同步的原理_mysql数据库主从同步复制原理
  4. php mysql 一级分类_无限级分类 for PHP+Mysql
  5. kitti数据集_神秘的Waymo一反常态,CVPR现场发布大型自动驾驶数据集
  6. mysql实用工教程_MYSQL实用教程
  7. 模拟人生畅玩版android,模拟人生畅玩版手机版
  8. statistic在c语言中的作用,模型评价除了C-statistic,还能用什么指标?
  9. 高数学习笔记:计算方向导数
  10. Spring Boot基础学习笔记13:路径扫描整合Servlet三大组件