假设Spark的部署方式选择Standalone。一个採用Master/Slaves的典型架构。那么Master是有SPOF(单点故障,Single Point of Failure)。Spark能够选用ZooKeeper来实现HA。

ZooKeeper提供了一个Leader Election机制,利用这个机制能够保证尽管集群存在多个Master可是唯独一个是Active的,其它的都是Standby,当Active的Master出现问题时。另外的一个Standby Master会被选举出来。因为集群的信息,包含Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交。对于正在进行的Job没有不论什么的影响。加入ZooKeeper的集群总体架构例如以下图所看到的。

1. Master的重新启动策略

Master在启动时,会依据启动參数来决定不同的Master故障重新启动策略:

  1. ZOOKEEPER实现HA
  2. FILESYSTEM:实现Master无数据丢失重新启动,集群的执行时数据会保存到本地/网络文件系统上
  3. 丢弃全部原来的数据重新启动

Master::preStart()能够看出这三种不同逻辑的实现。

override def preStart() {logInfo("Starting Spark master at " + masterUrl)...//persistenceEngine是持久化Worker。Driver和Application信息的,这样在Master又一次启动时不会影响//已经提交Job的执行persistenceEngine = RECOVERY_MODE match {case "ZOOKEEPER" =>logInfo("Persisting recovery state to ZooKeeper")new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)case "FILESYSTEM" =>logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))case _ =>new BlackHolePersistenceEngine()}//leaderElectionAgent负责Leader的选取。leaderElectionAgent = RECOVERY_MODE match {case "ZOOKEEPER" =>context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))case _ => // 唯独一个Master的集群,那么当前的Master就是Active的context.actorOf(Props(classOf[MonarchyLeaderAgent], self))}}

RECOVERY_MODE是一个字符串。能够从spark-env.sh中去设置。

val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

假设不设置spark.deploy.recoveryMode的话。那么集群的全部执行数据在Master重新启动是都会丢失,这个结论是从BlackHolePersistenceEngine的实现得出的。

private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {override def addApplication(app: ApplicationInfo) {}override def removeApplication(app: ApplicationInfo) {}override def addWorker(worker: WorkerInfo) {}override def removeWorker(worker: WorkerInfo) {}override def addDriver(driver: DriverInfo) {}override def removeDriver(driver: DriverInfo) {}override def readPersistedData() = (Nil, Nil, Nil)
}

它把全部的接口实现为空。PersistenceEngine是一个trait。

作为对照,能够看一下ZooKeeper的实现。

class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)extends PersistenceEnginewith Logging
{val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)SparkCuratorUtil.mkdir(zk, WORKING_DIR)// 将app的信息序列化到文件WORKING_DIR/app_{app.id}中override def addApplication(app: ApplicationInfo) {serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)}override def removeApplication(app: ApplicationInfo) {zk.delete().forPath(WORKING_DIR + "/app_" + app.id)}

Spark使用的并非ZooKeeper的API,而是使用的org.apache.curator.framework.CuratorFramework 和 org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch} 。Curator在ZooKeeper上做了一层非常友好的封装。

2. 集群启动參数的配置

简单总结一下參数的设置,通过上述代码的分析,我们知道为了使用ZooKeeper至少应该设置一下參数(实际上,只须要设置这些參数。通过设置spark-env.sh:

spark.deploy.recoveryMode=ZOOKEEPER
spark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181
spark.deploy.zookeeper.dir=/dir
// OR 通过一下方式设置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER "
export SPARK_DAEMON_JAVA_OPTS="${SPARK_DAEMON_JAVA_OPTS} -Dspark.deploy.zookeeper.url=zk_server1:2181,zk_server_2:2181"

各个參数的意义:

參数 默认值 含义
spark.deploy.recoveryMode NONE 恢复模式(Master又一次启动的模式),有三种:1, ZooKeeper, 2, FileSystem, 3 NONE
spark.deploy.zookeeper.url ZooKeeper的Server地址
spark.deploy.zookeeper.dir /spark ZooKeeper 保存集群元数据信息的文件文件夹,包含Worker,Driver和Application。

3. CuratorFramework简单介绍

CuratorFramework极大的简化了ZooKeeper的使用,它提供了high-level的API,而且基于ZooKeeper加入了非常多特性,包含

  • 自己主动连接管理:连接到ZooKeeper的Client有可能会连接中断。Curator处理了这样的情况。对于Client来说自己主动重连是透明的。
  • 简洁的API:简化了原生态的ZooKeeper的方法,事件等;提供了一个简单易用的接口。
  • Recipe的实现(很多其它介绍请点击Recipes):
    • Leader的选择
    • 共享锁
    • 缓存和监控
    • 分布式的队列
    • 分布式的优先队列

CuratorFrameworks通过CuratorFrameworkFactory来创建线程安全的ZooKeeper的实例。

CuratorFrameworkFactory.newClient()提供了一个简单的方式来创建ZooKeeper的实例,能够传入不同的參数来对实例进行全然的控制。获取实例后,必须通过start()来启动这个实例。在结束时,须要调用close()。

/*** Create a new client*** @param connectString list of servers to connect to* @param sessionTimeoutMs session timeout* @param connectionTimeoutMs connection timeout* @param retryPolicy retry policy to use* @return client*/public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy){return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();}

须要关注的还有两个Recipe:org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}。

首先看一下LeaderlatchListener,它在LeaderLatch状态变化的时候被通知:

  1. 在该节点被选为Leader的时候。接口isLeader()会被调用
  2. 在节点被剥夺Leader的时候,接口notLeader()会被调用

因为通知是异步的。因此有可能在接口被调用的时候。这个状态是准确的,须要确认一下LeaderLatch的hasLeadership()是否的确是true/false。这一点在接下来Spark的实现中能够得到体现。

/**
* LeaderLatchListener can be used to be notified asynchronously about when the state of the LeaderLatch has changed.
*
* Note that just because you are in the middle of one of these method calls, it does not necessarily mean that
* hasLeadership() is the corresponding true/false value. It is possible for the state to change behind the scenes
* before these methods get called. The contract is that if that happens, you should see another call to the other
* method pretty quickly.
*/
public interface LeaderLatchListener
{/**
* This is called when the LeaderLatch's state goes from hasLeadership = false to hasLeadership = true.
*
* Note that it is possible that by the time this method call happens, hasLeadership has fallen back to false. If
* this occurs, you can expect {@link #notLeader()} to also be called.
*/public void isLeader();/**
* This is called when the LeaderLatch's state goes from hasLeadership = true to hasLeadership = false.
*
* Note that it is possible that by the time this method call happens, hasLeadership has become true. If
* this occurs, you can expect {@link #isLeader()} to also be called.
*/public void notLeader();
}

LeaderLatch负责在众多连接到ZooKeeper Cluster的竞争者中选择一个Leader。

Leader的选择机制能够看ZooKeeper的详细实现。LeaderLatch这是完毕了非常好的封装。

我们只须要要知道在初始化它的实例后。须要通过

public class LeaderLatch implements Closeable
{private final Logger log = LoggerFactory.getLogger(getClass());private final CuratorFramework client;private final String latchPath;private final String id;private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);private final AtomicBoolean hasLeadership = new AtomicBoolean(false);private final AtomicReference<String> ourPath = new AtomicReference<String>();private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();private final CloseMode closeMode;private final AtomicReference<Future<?

>> startTask = new AtomicReference<Future<?

>>(); . . . /** * Attaches a listener to this LeaderLatch * <p/> * Attaching the same listener multiple times is a noop from the second time on. * <p/> * All methods for the listener are run using the provided Executor. It is common to pass in a single-threaded * executor so that you can be certain that listener methods are called in sequence, but if you are fine with * them being called out of order you are welcome to use multiple threads. * * @param listener the listener to attach */ public void addListener(LeaderLatchListener listener) { listeners.addListener(listener); }

通过addListener能够将我们实现的Listener加入到LeaderLatch。在Listener里,我们在两个接口里实现了被选为Leader或者被剥夺Leader角色时的逻辑就可以。

4. ZooKeeperLeaderElectionAgent的实现

实际上因为有Curator的存在,Spark实现Master的HA就变得非常easy了,ZooKeeperLeaderElectionAgent实现了接口LeaderLatchListener。在isLeader()确认所属的Master被选为Leader后。向Master发送消息ElectedLeader,Master会将自己的状态改为ALIVE。当noLeader()被调用时,它会向Master发送消息RevokedLeadership时。Master会关闭。

private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,masterUrl: String, conf: SparkConf)extends LeaderElectionAgent with LeaderLatchListener with Logging  {val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"// zk是通过CuratorFrameworkFactory创建的ZooKeeper实例private var zk: CuratorFramework = _// leaderLatch:Curator负责选出Leader。

private var leaderLatch: LeaderLatch = _   private var status = LeadershipStatus.NOT_LEADER   override def preStart() {     logInfo("Starting ZooKeeper LeaderElection agent")     zk = SparkCuratorUtil.newClient(conf)     leaderLatch = new LeaderLatch(zk, WORKING_DIR)     leaderLatch.addListener(this)     leaderLatch.start()   }

在prestart中,启动了leaderLatch来处理选举ZK中的Leader。

就如在上节分析的。基本的逻辑在isLeader和noLeader中。

  override def isLeader() {synchronized {// could have lost leadership by now.//如今leadership可能已经被剥夺了。

详情參见Curator的实现。 if (!leaderLatch.hasLeadership) { return } logInfo("We have gained leadership") updateLeadershipStatus(true) } } override def notLeader() { synchronized { // 如今可能赋予leadership了。详情參见Curator的实现。 if (leaderLatch.hasLeadership) { return } logInfo("We have lost leadership") updateLeadershipStatus(false) } }

updateLeadershipStatus的逻辑非常easy。就是向Master发送消息。

def updateLeadershipStatus(isLeader: Boolean) {if (isLeader && status == LeadershipStatus.NOT_LEADER) {status = LeadershipStatus.LEADERmasterActor ! ElectedLeader} else if (!isLeader && status == LeadershipStatus.LEADER) {status = LeadershipStatus.NOT_LEADERmasterActor ! RevokedLeadership}}

5. 设计理念

为了解决Standalone模式下的Master的SPOF。Spark採用了ZooKeeper提供的选举功能。Spark并没有採用ZooKeeper原生的Java API,而是採用了Curator。一个对ZooKeeper进行了封装的框架。

採用了Curator后。Spark不用管理与ZooKeeper的连接,这些对于Spark来说都是透明的。

Spark只使用了100行代码,就实现了Master的HA。当然了,Spark是站在的巨人的肩膀上。谁又会去反复发明轮子呢?

请您支持:

假设你看到这里,相信这篇文章对您有所帮助。假设是的话,请为本文投一下票吧: 点击投票,多谢。假设您已经在投票页面,请点击以下的投一票吧!

BTW。即使您没有CSDN的帐号,能够使用第三方登录的,包含微博,QQ。Gmail,GitHub,百度,等。

转载于:https://www.cnblogs.com/jzssuanfa/p/6978112.html

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源代码实现相关推荐

  1. Spark集群搭建+基于zookeeper实现高可用HA

    vi spark-env.sh(三台都要) export JAVA_HOME=/usr/java/jdk1.8.0_20/export SCALA_HOME=/home/iespark/hadoop_ ...

  2. Spark技术内幕: Task向Executor提交的源代码解析

    在上文<Spark技术内幕:Stage划分及提交源代码分析>中,我们分析了Stage的生成和提交.可是Stage的提交,仅仅是DAGScheduler完毕了对DAG的划分,生成了一个计算拓 ...

  3. 基于Zookeeper搭建hadoop的HA功能

    简介 使用zookeeper和Hadoop的FailOverController的心跳检测来维护hadoop,并在hadoop宕机的时候通过zookeeper选举功能进行Active的切换 并使用Jo ...

  4. 我的第一本著作:Spark技术内幕上市!

    现在各大网站销售中! 京东:http://item.jd.com/11770787.html 当当:http://product.dangdang.com/23776595.html 亚马逊:http ...

  5. Spark技术内幕:究竟什么是RDD

    RDD是Spark最基本,也是最根本的数据抽象.http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf 是关于RDD的论文.如果觉得英 ...

  6. Spark技术内幕:Shuffle Read的整体流程

    回忆一下,每个Stage的上边界,要么需要从外部存储读取数据,要么需要读取上一个Stage的输出:而下边界,要么是需要写入本地文件系统(需要Shuffle),以供childStage读取,要么是最后一 ...

  7. Spark技术内幕:Stage划分及提交源码分析

    当触发一个RDD的action后,以count为例,调用关系如下: org.apache.spark.rdd.RDD#count org.apache.spark.SparkContext#runJo ...

  8. Spark HA高可用部署、基于文件系统单点恢复、基于zookeeper的Standby Master、如何恢复到上一次活着master挂掉之前的状态 03

    1. Spark HA高可用部署 Spark Standalone集群时Master-Slaves架构的集群模式,和大部分的Master-Slaves结果集群一样,存在着Master单点故障的问题.如 ...

  9. Spark集群基于Zookeeper的HA搭建部署笔记(转)

    原文链接:Spark集群基于Zookeeper的HA搭建部署笔记 1.环境介绍 (1)操作系统RHEL6.2-64 (2)两个节点:spark1(192.168.232.147),spark2(192 ...

  10. 从Paxos到ZooKeeper-四、ZooKeeper技术内幕

    本文将从系统模型.序列化与协议.客户端工作原理.会话.服务端工作原理以及数据存储等方面来揭示ZooKeeper的技术内幕. 一.系统模型 1.1 数据模型 ZooKeeper的视图结构使用了其特有的& ...

最新文章

  1. python 简易HTTP服务器搭建
  2. Linux多线程同步——信号量
  3. 【IT人的管理进阶课】如何提升带团队的能力
  4. 【收藏】k8s使用securityContext和sysctl
  5. 1015 德才论 (25 分)(c语言)
  6. Project: Individual Project - Word frequency program----11061192zmx
  7. MOCTF-Web-一道水题
  8. SVN使用和解决方案
  9. 数据清洗-拉格朗日插值
  10. 相机标定基础【1】- 在Visual Station 2019 上搭建OpenCV应用 (1)- 安装配置VS
  11. 建立你第一个 Outlook Add-in
  12. html连接数据库id号自动生成器,SQL Server数据库sql语句生成器(SqlDataToScript)的使用(sql server自增列(id)插入固定值)...
  13. 在CKEditor中创建自己的命令按钮
  14. storm - 常用命令
  15. Selenium WebDriver架构
  16. 团队建设(Team building)就是等于组织大家一起吃饭娱乐吗?
  17. MATLAB2016笔记(十一):基本粒子群优化算法(PSO)的MATLAB实现
  18. 深入解析Tensor索引中的Indexing Multi-dimensional arrays问题
  19. 音视频流媒体————基本概念
  20. model.evaluate() 解释一下

热门文章

  1. 国内首款 FPGA 云服务器,性能是通用 CPU 服务器 30 倍以上
  2. 【JAVA源码分析——Java.lang】String源码分析
  3. Redis Nosql数据库
  4. PHP获取当前文件路径,上层目录路径
  5. 路由汇总之二ospf路由汇总
  6. winform+c#之窗体之间的传值
  7. 印度:10美元电脑与全民免费上网
  8. [BZOJ1415]聪聪和可可
  9. UVa 10118 免费糖果(记忆化搜索+哈希)
  10. 电够动力足——认识主板上的CPU供电模块