许鹏:从零开始学习,Apache Spark源码走读(三)

发表于2014-06-18 18:30| 17149次阅读| 来源个人博客| 24 条评论| 作者许鹏

大数据Spark开源技术博客推荐
height="16" src="http://hits.sinajs.cn/A1/weiboshare.html?url=http%3A%2F%2Fwww.csdn.net%2Farticle%2F2014-06-18%2F2820283%2F2&type=3&count=&appkey=&title=%E8%87%AA2013%E5%B9%B46%E6%9C%88%E8%BF%9B%E5%85%A5Apache%E5%AD%B5%E5%8C%96%E5%99%A8%EF%BC%8CSpark%E5%B7%B2%E7%BB%8F%E6%9C%89%E6%9D%A5%E8%87%AA25%E4%B8%AA%E7%BB%84%E7%BB%87%E7%9A%84120%E5%A4%9A%E4%BD%8D%E5%BC%80%E5%8F%91%E8%80%85%E5%8F%82%E4%B8%8E%E8%B4%A1%E7%8C%AE%E3%80%82%E8%80%8C%E5%9C%A8%E4%B8%8D%E4%B9%85%E5%89%8D%EF%BC%8C%E6%9B%B4%E6%88%90%E4%B8%BA%E4%BA%86Apache%E8%BD%AF%E4%BB%B6%E5%9F%BA%E9%87%91%E4%BC%9A%E7%9A%84%E9%A1%B6%E7%BA%A7%E9%A1%B9%E7%9B%AE%EF%BC%8C%E5%BD%93%E4%B8%8B%E5%B7%B2%E6%98%AF%E7%9F%A5%E5%90%8DHadoop%E5%BC%80%E5%8F%91%E5%95%86Cloudera%E5%92%8CMapR%E7%9A%84%E6%96%B0%E5%AE%A0%E3%80%82&pic=&ralateUid=&language=zh_cn&rnd=1443771744781" frameborder="0" width="22" allowtransparency="" scrolling="no" _xhe_src="http://hits.sinajs.cn/A1/weiboshare.html?url=http%3A%2F%2Fwww.csdn.net%2Farticle%2F2014-06-18%2F2820283%2F2&type=3&count=&appkey=&title=%E8%87%AA2013%E5%B9%B46%E6%9C%88%E8%BF%9B%E5%85%A5Apache%E5%AD%B5%E5%8C%96%E5%99%A8%EF%BC%8CSpark%E5%B7%B2%E7%BB%8F%E6%9C%89%E6%9D%A5%E8%87%AA25%E4%B8%AA%E7%BB%84%E7%BB%87%E7%9A%84120%E5%A4%9A%E4%BD%8D%E5%BC%80%E5%8F%91%E8%80%85%E5%8F%82%E4%B8%8E%E8%B4%A1%E7%8C%AE%E3%80%82%E8%80%8C%E5%9C%A8%E4%B8%8D%E4%B9%85%E5%89%8D%EF%BC%8C%E6%9B%B4%E6%88%90%E4%B8%BA%E4%BA%86Apache%E8%BD%AF%E4%BB%B6%E5%9F%BA%E9%87%91%E4%BC%9A%E7%9A%84%E9%A1%B6%E7%BA%A7%E9%A1%B9%E7%9B%AE%EF%BC%8C%E5%BD%93%E4%B8%8B%E5%B7%B2%E6%98%AF%E7%9F%A5%E5%90%8DHadoop%E5%BC%80%E5%8F%91%E5%95%86Cloudera%E5%92%8CMapR%E7%9A%84%E6%96%B0%E5%AE%A0%E3%80%82&pic=&ralateUid=&language=zh_cn&rnd=1443771744781">摘要:自2013年6月进入Apache孵化器,Spark已经有来自25个组织的120多位开发者参与贡献。而在不久前,更成为了Apache软件基金会的顶级项目,当下已是知名Hadoop开发商Cloudera和MapR的新宠。

Standalone部署模式下的容错性分析

概要

本文就standalone部署方式下的容错性问题做比较细致的分析,主要回答standalone部署方式下的包含哪些主要节点,当某一类节点出现问题时,系统是如何处理的。

Standalone部署的节点组成

介绍Spark的资料中对于RDD这个概念涉及的比较多,但对于RDD如何运行起来,如何对应到进程和线程的,着墨的不是很多。

在实际的生产环境中,Spark总是会以集群的方式进行运行的,其中standalone的部署方式是所有集群方式中最为精简的一种,另外是Mesos和YARN,要理解其内部运行机理,显然要花更多的时间才能了解清楚。

standalone cluster的组成

standalone集群由三个不同级别的节点组成,分别是

  • Master 主控节点,可以类比为董事长或总舵主,在整个集群之中,最多只有一个Master处在Active状态
  • Worker 工作节点 ,这个是manager,是分舵主, 在整个集群中,可以有多个worker,如果worker为零,什么事也做不了
  • Executor 干苦力活的,直接受worker掌控,一个worker可以启动多个executor,启动的个数受限于机器中的cpu核数

这三种不同类型的节点各自运行于自己的JVM进程之中。

Driver Application

提交到standalone集群的应用程序称之为Driver Applicaton。

Standalone集群启动及任务提交过程详解

上图总结了正常情况下Standalone集群的启动以及应用提交时,各节点之间有哪些消息交互。下面分集群启动和应用提交两个过程来作详细说明。

集群启动过程

正常启动过程如下所述

step 1: 启动master

[js] view plaincopyprint?
  1. $SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-master.sh

step 2: 启动worker

[js] view plaincopyprint?
  1. ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

worker启动之后,会做两件事情

  • 将自己注册到Master、RegisterWorker
  • 定期发送心跳消息给Master

任务提交过程

step 1: 提交application

利用如下指令来启动spark-shell

[js] view plaincopyprint?
  1. MASTER=spark://127.0.0.1:7077 $SPARK_HOME/bin/spark-shell
MASTER=spark://127.0.0.1:7077 $SPARK_HOME/bin/spark-shell

运行spark-shell时,会向Master发送RegisterApplication请求

日志位置: master运行产生的日志在$SPARK_HOME/logs目录下

step 2: Master处理RegisterApplication的请求之后

收到RegisterApplication请求之后,Mastet会做如下处理

  1. 如果有worker已经注册上来,发送LaunchExecutor指令给相应worker
  2. 如果没有,则什么事也不做

step 3: 启动Executor

Worker在收到LaunchExecutor指令之后,会启动Executor进程

step 4: 注册Executor

启动的Executor进程会根据启动时的入参,将自己注册到Driver中的SchedulerBackend

日志位置:executor的运行日志在$SPARK_HOME/work目录下

step 5: 运行Task

SchedulerBackend收到Executor的注册消息之后,会将提交到的Spark Job分解为多个具体的Task,然后通过LaunchTask指令将这些Task分散到各个Executor上真正的运行

如果在调用runJob的时候,没有任何的Executor注册到SchedulerBackend,相应的处理逻辑是什么呢?

  1. SchedulerBackend会将Task存储在TaskManager中
  2. 一旦有Executor注册上来,就将TaskManager管理的尚未运行的task提交到executor中
  3. 如果有多个job处于pending状态,默认调度策略是FIFO,即先提交的先运行

测试步骤

  • 启动Master
  • 启动spark-shell
  • 执行 sc.textFile("README.md").count
  • 启动worker
  • 注意worker启动之后,spark-shell中打印出来的日志消息

Job执行结束

任务运行结束时,会将相应的Executor停掉。

可以做如下的试验

  1. 停止spark-shell
  2. 利用ps -ef|grep -i java查看java进程,可以发现CoarseGrainedExecutorBackend进程已经退出

小结

通过上面的控制消息原语之间的先后顺序可以看出

  • Master和worker进程必须显式启动
  • executor是被worker隐式的带起
  • 集群的启动顺序
  1. Master必须先于其它节点启动
  2. worker和driver哪个先启动,无所谓
  3. 但driver提交的job只有在有相应的worker注册到Master之后才可以被真正的执行

异常场景分析

上面说明的是正常情况下,各节点的消息分发细节。那么如果在运行中,集群中的某些节点出现了问题,整个集群是否还能够正常处理Application中的任务呢?

异常分析1: worker异常退出

在Spark运行过程中,经常碰到的问题就是worker异常退出,当worker退出时,整个集群会有哪些故事发生呢?请看下面的具体描述:

  1. worker异常退出,比如说有意识的通过kill指令将worker杀死
  2. worker在退出之前,会将自己所管控的所有小弟executor全干掉
  3. worker需要定期向master改善心跳消息的,现在worker进程都已经玩完了,哪有心跳消息,所以Master会在超时处理中意识到有一个“分舵”离开了
  4. Master非常伤心,伤心的Master将情况汇报给了相应的Driver
  5. Driver通过两方面确认分配给自己的Executor不幸离开了,一是Master发送过来的通知,二是Driver没有在规定时间内收到Executor的StatusUpdate,于是Driver会将注册的Executor移除

后果分析

worker异常退出会带来哪些影响

  1. executor退出导致提交的task无法正常结束,会被再一次提交运行
  2. 如果所有的worker都异常退出,则整个集群不可用
  3. 需要有相应的程序来重启worker进程,比如使用supervisord或runit

测试步骤

  1. 启动Master
  2. 启动worker
  3. 启动spark-shell
  4. 手工kill掉worker进程
  5. 用jps或ps -ef|grep -i java来查看启动着的java进程

异常退出的代码处理

定义于ExecutorRunner.scala的start函数

[js] view plaincopyprint?
  1. def start() {
  2. workerThread = new Thread("ExecutorRunner for " + fullId) {
  3. override def run() { fetchAndRunExecutor() }
  4. }
  5. workerThread.start()
  6. // Shutdown hook that kills actors on shutdown.
  7. shutdownHook = new Thread() {
  8. override def run() {
  9. killProcess(Some("Worker shutting down"))
  10. }
  11. }
  12. Runtime.getRuntime.addShutdownHook(shutdownHook)
  13. }
def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}

killProcess的过程就是停止相应CoarseGrainedExecutorBackend的过程。

worker停止的时候,一定要先将自己启动的Executor停止掉。这是不是很像水浒中宋江的手段,李逵就是这样不明不白的把命给丢了。

小结

需要特别指出的是,当worker在启动Executor的时候,是通过ExecutorRunner来完成的,ExecutorRunner是一个独立的线程,和Executor是一对一的关系,这很重要。Executor作为一个独立的进程在运行,但会受到ExecutorRunner的严密监控。

异常分析2: executor异常退出

Executor作为Standalone集群部署方式下的最底层员工,一旦异常退出,其后果会是什么呢?

  1. executor异常退出,ExecutorRunner注意到异常,将情况通过ExecutorStateChanged汇报给Master
  2. Master收到通知之后,非常不高兴,尽然有小弟要跑路,那还了得,要求Executor所属的worker再次启动
  3. Worker收到LaunchExecutor指令,再次启动executor

作为一名底层员工,想轻易摞挑子不干是不成的。"人在江湖,身不由己“啊。

测试步骤

  • 启动Master
  • 启动Worker
  • 启动spark-shell
  • 手工kill掉CoarseGrainedExecutorBackend

fetchAndRunExecutor

fetchAndRunExecutor负责启动具体的Executor,并监控其运行状态,具体代码逻辑如下所示

[js] view plaincopyprint?
  1. def fetchAndRunExecutor() {
  2. try {
  3. // Create the executor's working directory
  4. val executorDir = new File(workDir, appId + "/" + execId)
  5. if (!executorDir.mkdirs()) {
  6. throw new IOException("Failed to create directory " + executorDir)
  7. }
  8. // Launch the process
  9. val command = getCommandSeq
  10. logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
  11. val builder = new ProcessBuilder(command: _*).directory(executorDir)
  12. val env = builder.environment()
  13. for ((key, value)  {
  14. logInfo("Runner thread for executor " + fullId + " interrupted")
  15. state = ExecutorState.KILLED
  16. killProcess(None)
  17. }
  18. case e: Exception => {
  19. logError("Error running executor", e)
  20. state = ExecutorState.FAILED
  21. killProcess(Some(e.toString))
  22. }
  23. }
  24. }
def fetchAndRunExecutor() {
try {
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value)  {
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}
}

异常分析3: master 异常退出

worker和executor异常退出的场景都讲到了,我们剩下最后一种情况了,master挂掉了怎么办?

带头大哥如果不在了,会是什么后果呢?

  • worker没有汇报的对象了,也就是如果executor再次跑飞,worker是不会将executor启动起来的,大哥没给指令
  • 无法向集群提交新的任务
  • 老的任务即便结束了,占用的资源也无法清除,因为资源清除的指令是Master发出的

怎么样,知道后果很严重了吧?别看老大平时不干活,要真的不在,仅凭小弟们是不行的。

Master单点失效问题的解决

那么怎么解决Master单点失效的问题呢?

你说再加一个Master就是了,两个老大。两个老大如果同时具有指挥权,结果也将是灾难性的。设立一个副职人员,当目前的正职挂掉之后,副职接管。也就是同一时刻,有且只有一个active master。

注意不错,如何实现呢?使用zookeeper的ElectLeader功能,效果图如下

配置细节

如何搭建zookeeper集群,这里不再废话,哪天有空的话再整一整,或者可以参考写的storm系列中谈到的zookeeper的集群安装步骤。

假设zookeeper集群已经设置成功,那么如何启动standalone集群中的节点呢?有哪些特别的地方?

conf/spark-env.sh

在conf/spark-env.sh中,为SPARK_DAEMON_JAVA_OPTS添加如下选项

System property Meaning
spark.deploy.recoveryMode Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark).

设置SPARK_DAEMON_JAVA_OPTS的实际例子

[js] view plaincopyprint?
  1. SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.recoveryMode=ZOOKEEPER"
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.recoveryMode=ZOOKEEPER"

应用程序启动

应用程序运行的时候,指定多个master地址,用逗号分开,如下所示

[js] view plaincopyprint?
  1. MASTER=spark://192.168.100.101:7077,spark://192.168.100.102:7077 bin/spark-shell
MASTER=spark://192.168.100.101:7077,spark://192.168.100.102:7077 bin/spark-shell

小结

Standalone集群部署方式下的容错性分析让我们对于Spark的任务分发过程又有了进一处的认识。前面的篇章从整体上匆匆过了一遍Spark所涉及的知识点,分析的不够深,不够细。

此篇尝试着就某一具体问题做深入的分析。套用书画中的说法,在框架分析的时候,我们可以”大开大合,疏可走马,计白当黑“,在细节分析的时候,又要做到“密不透风,条分缕析,层层递进”。

相关链接

Apache Spark源码走读之14 -- Graphx实现剖析

Apache Spark源码走读之15 -- Standalone部署模式下的容错性分析(责编/仲浩)

许鹏:从零开始学习,Apache Spark源码走读(三)相关推荐

  1. Apache Spark源码走读之16 -- spark repl实现详解

    欢迎转载,转载请注明出处,徽沪一郎. 概要 之所以对spark shell的内部实现产生兴趣全部缘于好奇代码的编译加载过程,scala是需要编译才能执行的语言,但提供的scala repl可以实现代码 ...

  2. Apache Spark源码走读(九)如何进行代码跟读使用Intellij idea调试Spark源码

    <一>如何进行代码跟读 概要 今天不谈Spark中什么复杂的技术实现,只稍为聊聊如何进行代码跟读.众所周知,Spark使用scala进行开发,由于scala有众多的语法糖,很多时候代码跟着 ...

  3. Apache Spark源码走读之6 -- 存储子系统分析

    Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互 ...

  4. Apache Spark源码走读之8 -- Spark on Yarn

    欢迎转载,转载请注明出处,徽沪一郎. 概要 Hadoop2中的Yarn是一个分布式计算资源的管理平台,由于其有极好的模型抽象,非常有可能成为分布式计算资源管理的事实标准.其主要职责将是分布式计算集群的 ...

  5. Apache Spark源码走读之3 -- Task运行期之函数调用关系分析

    概要 本篇主要阐述在TaskRunner中执行的task其业务逻辑是如何被调用到的,另外试图讲清楚运行着的task其输入的数据从哪获取,处理的结果返回到哪里,如何返回. 准备 spark已经安装完毕 ...

  6. Apache Spark源码走读之22 -- 浅谈mllib中线性回归的算法实现

    欢迎转载,转载请注明出处,徽沪一郎. 概要 本文简要描述线性回归算法在Spark MLLib中的具体实现,涉及线性回归算法本身及线性回归并行处理的理论基础,然后对代码实现部分进行走读. 线性回归模型 ...

  7. Apache Spark源码走读之4 -- DStream实时流数据处理

    欢迎转载,转载请注明出处,徽沪一郎. Spark Streaming能够对流数据进行近乎实时的速度进行数据处理.采用了不同于一般的流式数据处理模型,该模型使得Spark Streaming有非常高的处 ...

  8. sparkcore分区_Spark学习:Spark源码和调优简介 Spark Core (二)

    本文基于 Spark 2.4.4 版本的源码,试图分析其 Core 模块的部分实现原理,其中如有错误,请指正.为了简化论述,将部分细节放到了源码中作为注释,因此正文中是主要内容. 第一部分内容见: S ...

  9. 编译 Apache Spark 源码报错?那是因为你漏掉了关键操作

    前言 本文隶属于专栏<1000个问题搞定大数据技术体系>,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢! 本专栏目录结构和参考文献请见1000个问题搞定大数据技 ...

  10. Spark源码走读概述

    Spark代码量 --Spark:20000loc --Hadoop 1.0:90000loc --Hadoop 2.0:220000loc Spark生态系统代码量 Spark生态系统 概述 --构 ...

最新文章

  1. 小白成长之路_LeetCode刷题记录
  2. 讲讲排序(C++描述 )
  3. java分页查询oracle_Java中实现Oracle分页查询
  4. Median String
  5. mysql添加索引造成的影响
  6. 安徽计算机应用基础高考试题,安徽省对口高考试题(计算机应用基础部分)
  7. BZOJ #3064. Tyvj 1518 CPU监控(线段树,历史最值)
  8. dedecms代码研究六
  9. enspar启动失败40_法式长棍面包,在家自己做,简单零失败,低糖无油不担心长胖...
  10. Python机器学习(sklearn)——分类模型评估与调参总结(下)
  11. 用汇编的眼光看C++(之class构造、析构)
  12. oracle转mysql总结,原理+实战+视频+源码
  13. 《机器人学导论》Matlab计算坐标系变换矩阵
  14. 前端人员必看css命名,前端css命名规范
  15. 汇总!零基础到进阶Graphpad Prism完整指南!教程全方位汇总!
  16. BZOJ 1069 最大土地面积(旋转卡壳求最大四边形)
  17. list(map(tokenizer.tokenize, text))
  18. Python print() 函数,在同一行打印
  19. 免费的网课API接口附加题库
  20. 十五万左右纯电SUV怎么选?奇瑞大蚂蚁是真香

热门文章

  1. u盘重置后计算机不显示了,u盘在电脑上不显示了如何恢复
  2. kvm使用virsh iface-bridge ens33 br0命令建立桥接网卡br0报错error:Failed to start bridge interface br0
  3. Nginx 配置域名
  4. nali工具解析ip来源
  5. 太强了,这款开源终端工具可查询 IP 信息 ...
  6. 恭喜这2所高校,喜提“电子土豆大学”“四川土豆大学”称号
  7. windows权限提升——烂土豆+dll劫持+引号路径+服务权限
  8. python特殊回文数
  9. 4.5 Frank 口语习语前7
  10. 鸿蒙系统视频美颜,BeautyCam美颜相机