Master作为集群的Manager,对于集群的健壮运行发挥着十分重要的作用。下面,我们一起了解一下Master是听从Client(Leader)的号召,如何管理好Worker的吧。

1.家当(静态属性)

1.设置一个守护单线程的消息发送器,
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
2.根据sparkConf得到hadoopConf
private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
3.一个bool类型的标识,如果设置为true,那么app的执行将会尽量分步到尽可能多的worker上,否则app的执行将会先用完一个worker的资源,然后再使用下一个worker的资源
private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
4.设置执行app默认的最大核数为Int类型的最大值
private val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
5.还有一些关于worker、driver、app等的字段信息,都比较简单,限于篇幅限制就不一一列出了

2.技能(方法)

由于Master上本质上是一个RpcEndpoint,所以我们按照它的生命周期进行介绍。如果不明白,请看文章

Spark Rpc通信源码分析 http://www.cnblogs.com/yourarebest/p/5297157.html

1.构造函数就是Master默认的主构造器
2.onStart方法,主要功能是启动Jetty的WebUI服务,Rest服务、选出持久化引擎及持久化代理

override def onStart(): Unit = {
logInfo("Starting Spark master at " + masterUrl)
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
webUi = new MasterWebUI(this, webUiPort)
//启动JettyServer并绑定webUI端口号
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
//forwardMessageThread线程每1min中检查Worker是否宕了
checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
self.send(CheckForWorkerTimeOut)
}
}, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
//启动Rest服务,默认端口6066
if (restServerEnabled) {
val port = conf.getInt("spark.master.rest.port", 6066)
restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
}
//返回绑定的端口号
restServerBoundPort = restServer.map(.start())
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
//当metrics系统启动后,将master和app的metrics servlet的hadnler给webui
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
//序列化Spark的配置文件
val serializer = new JavaSerializer(conf)
//支持三种持久化引擎,将Spark的配置参数持久化,便于以后恢复使用
val (persistenceEngine
, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
val zkFactory =
new ZooKeeperRecoveryModeFactory(conf, serializer)
(zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
case "FILESYSTEM" =>
val fsFactory =
new FileSystemRecoveryModeFactory(conf, serializer)
(fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
case "CUSTOM" =>
val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
.newInstance(conf, serializer)
.asInstanceOf[StandaloneRecoveryModeFactory]
(factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
case _ =>
(new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
}
persistenceEngine = persistenceEngine_
leaderElectionAgent = leaderElectionAgent_
}

3.onStop方法,停止master的metrics系统、停止app的metrics系统、取消异步执行的任务、停止WebUi服务、停止rest服务以及持久化引擎和选举代理的停止。

override def onStop() {
masterMetricsSystem.report()
applicationMetricsSystem.report()
//避免异步发出的CompleteRecovery消息导致master的重启
if (recoveryCompletionTask != null) {
recoveryCompletionTask.cancel(true)
}
if (checkForWorkerTimeOutTask != null) {
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
leaderElectionAgent.stop()
}

还有一个重要的方法receive方法,留到下一篇吧。

转载于:https://www.cnblogs.com/yourarebest/p/5312965.html

【原】Spark中Master源码分析(一)相关推荐

  1. Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

    Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadO ...

  2. Spark RPC框架源码分析(二)RPC运行时序

    前情提要: Spark RPC框架源码分析(一)简述 一. Spark RPC概述 上一篇我们已经说明了Spark RPC框架的一个简单例子,Spark RPC相关的两个编程模型,Actor模型和Re ...

  3. Android多媒体框架(3)—— libstagefright中MediaCodec源码分析

    libstagefright中MediaCodec源码分析 和前两篇一样,我们按照MediaCodec的各个状态来分析libstagefright中MediaCodec的源代码. configure ...

  4. zipline中TradingCalendar源码分析

    zipline中TradingCalendar源码分析 1 TradingCalendar 交易日历 2 依赖项 3 canonicalize_datetime 时间进行格式化转换 4 get_non ...

  5. JDK7中HashMap源码分析

    文章目录 JDK7中的HashMap 一.JDK7中HashMap源码中重要的参数 二.JDK7中HashMap的构造方法 三.JDK7中创建一个HashMap的步骤 四.JDK7中HashMap的p ...

  6. spark读取文件源码分析-3

    本篇是spark read一个parquet源码分析的第三篇,这一篇主要介绍spark的默认的partition的设置逻辑,当然,这一篇实际上算不上源码分析了 第一篇 第二篇 1 . userProf ...

  7. spark读取文件源码分析-2

    文章目录 1. job1产生时机源码分析 1. DataSoure.getOrInferFileFormatSchema() 2. ParquetFileFormat.inferSchema 1. 简 ...

  8. spark读取文件源码分析-1

    文章目录 1. 问题背景 2. 测试代码 3. 生成的DAG图 1. job0 2. job1 4. job0 产生的时机源码分析 1. 调用DataFrameReader.load,DataFram ...

  9. spark 2.3源码分析之SortShuffleWriter

    SortShuffleWriter 概述 SortShuffleWriter它主要是判断在Map端是否需要本地进行combine操作.如果需要聚合,则使用PartitionedAppendOnlyMa ...

最新文章

  1. 音频编辑软件_Audacity2.12版
  2. 新冠肺炎数据里学到的四个数据分析和机器学习知识
  3. 新建一个spyder窗口
  4. 探寻阿里云服务器迈入2.0时代的技术要点
  5. 数据挖掘网上资料大全
  6. python为什么运行不了_python为什么会环境变量设置不成功
  7. oracle并发执行max,跪求大量并发执行insert into select语句的方案
  8. 云上远程运维的最后那点担心,“云梯”帮你解决
  9. js layui跳转页面_【WEB前端开辟】layui的iframe跳转链接与页面按钮跳转相干引见...
  10. Java性能调优小技巧
  11. 通过jquery进行ajax的一些“异常”请求的页面自提交到其它页面
  12. 战胜棋王后,人工智能是否可以颠覆安全?
  13. 一种增加先验知识库的贝叶斯网络推理模型
  14. 金鹏GB28181平台对接
  15. UBUNTU给已有用户改名
  16. eval('{kkk:{}}')出错,eval('{}')与eval('var ss = {kkk:{}}')正常
  17. SCSI代码分析(2)SCSI设备的管理1
  18. 桌面宠物秀,电脑桌面美化
  19. c++质数判定及输出质数表
  20. 数据清理中,处理缺失值的方法

热门文章

  1. 【CodeForces - 569B】Inventory (标记,乱搞)
  2. 人脑意识转入量子计算机,人脑产生意识:可能是因为量子纠缠
  3. vue css自定义标签,Vue如何使用CSS自定义变量
  4. .Net开发WebApi如何使用JObject对象接收参数
  5. Windows+Nginx+Tomcat搭建负载均衡和集群环境同时实现session共享(一)
  6. java写的MySQL数据库备份和恢复代码:
  7. php转化xml数组_PHP实现数组array转换成xml的方法
  8. C++ 流的操作 | 初识IO类、文件流、string流的使用
  9. Collections 工具类常见方法
  10. Hotspot虚拟机的对象