1.概述

转载:flink zookeeper HA 实现分析
仅仅是自己学习。

2.Zookeeper HA相关配置

## 使用zk做HA
high-availability: zookeeper
## zk地址
high-availability.zookeeper.quorum: node1:2181,node2:2181,node3:2181
## flink在zk下的工作路径
high-availability.zookeeper.path.root: /flink
## 任务所在的HA路径
high-availability.cluster-id: /default
## 保存元数据到文件系统
high-availability.storageDir: hdfs:///flink/recovery## --任务运行在YARN上的配置--
## applicationMaster重试的次数,默认为1,当application master失败的时候,该任务不会重启。
## 设置一个比较大的值的话,yarn会尝试重启applicationMaster。
yarn.application-attempts: 10
## flink是否应该重新分配失败的taskmanager容器。默认是true。
yarn.reallocate-failed:true
## applicationMaster可以接受的容器最大失败次数,达到这个参数,就会认为yarn job失败。
## 默认这个次数和初始化请求的taskmanager数量相等(-n 参数指定的)。
yarn.maximum-failed-containers:1

flink使用Zookeeper做HA

flink的ResourceManager、Dispatcher、JobManager、WebServer组件都需要高可用保证,同时flink高可用还需要持久化checkpoint的元数据信息,保留最近一次已经完成的checkpoint等工作,其中最重要的就是组件的leader选举、leader状态跟踪。本次抽取出Flink使用zk实现leader选举、leader状态跟踪代码,学习下flink是如何使用curator的。类之间的关系如下:


ZooKeeperHaServices是HighAvailabilityServices基于zookeeper的实现,通过使用ZooKeeperUtils类来创建组件的LeaderRetrievalService以及LeaderElectionService。

LeaderRetrievalService用来跟踪leader的变化,当发现leader地址变化时,要通知依赖它的组件去依赖新的leader。比如getResourceManagerLeaderRetriever方法,flink会监听zk的/leader/resource_manager_lock节点内容变化,内容是rm的leader地址和leaderUUID,而taskmanger调用该服务的start方法传递了一个LeaderRetrievalListener。如果节点内容发生变化,意味着rm的leader地址发生变化,那么的LeaderRetrievalListener的notifyLeaderAddress就会通知taskmanger去新的ResourceManager地址进行注册。zk实现该功能使用的是curator的NodeCache并重写了nodeChanged方法。

LeaderElectionService用来进行leader选举工作,当节点成为leader后会调用LeaderContender的grantLeadership方法。以ResourceManagerLeaderElection为例,flink会在zk的/leaderlatch/resource_manager_lock路径下创建临时节点,创建成功的rm节点成为leader触发rm的grantLeadership,最终将当前地址和UUID写入/leader/resource_manager_lock中,这样就触发了LeaderRetrievalService服务。zk实现leader选举使用的是curator的LeaderLatch并重写了isLeader和notLeader方法。同时使用NodeCache监听/leader/resource_manager_lock内容变化,确保新leader地址和UUID成功写入节点。

LeaderRetrievalListener对LeaderRetrievalService的leader地址变化做出响应,通过notifyLeaderAddress传递新leader地址。

LeaderContender对LeaderElectionService的节点角色发生变化做出响应,通过grantLeadership和revokeLeadership进行leader的授权和撤销工作。

3.zk结构

一个集群目录下的zk结构如下图所示:

4.flink相关源码

简单的走一下流程,看看集群启动时是如何创建ZooKeeperHaServices的。

集群启动入口ClusterEntrypoint

根据集群的部署模式session or perjob由对应的子类调用ClusterEntrypoint的startCluster方法启动集群,接着会先调用initializeServices方法,启动集群相关的组件信息。这里只看启动haServices部分。

public void startCluster() throws ClusterEntrypointException {SecurityContext securityContext = installSecurityContext(configuration);securityContext.runSecured((Callable<Void>) () -> {runCluster(configuration);return null;});
}protected void initializeServices(Configuration configuration) {ioExecutor = Executors.newFixedThreadPool(Hardware.getNumberCPUCores(),new ExecutorThreadFactory("cluster-io"));haServices = createHaServices(configuration, ioExecutor);blobServer = new BlobServer(configuration, haServices.createBlobStore());blobServer.start();}
}

根据high-availability配置创建ZooKeeperHaServices,默认情况下为NONE。

protected HighAvailabilityServices createHaServices(Configuration configuration,Executor executor) throws Exception {//创建HA服务时不需要地址解析return HighAvailabilityServicesUtils.createHighAvailabilityServices(configuration,executor,HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
}//根据传递的high-availability配置,选择创建哪种HA服务,默认为NONE
public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration,Executor executor,AddressResolution addressResolution) throws Exception {//获取high-availability配置 如:zookeeperHighAvailabilityMode highAvailabilityMode = LeaderRetrievalUtils.getRecoveryMode(configuration);switch (highAvailabilityMode) {case NONE:final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);final String jobManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(hostnamePort.f0,hostnamePort.f1,JobMaster.JOB_MANAGER_NAME,addressResolution,configuration);final String resourceManagerRpcUrl = AkkaRpcServiceUtils.getRpcUrl(hostnamePort.f0,hostnamePort.f1,ResourceManager.RESOURCE_MANAGER_NAME,addressResolution,configuration);final String dispatcherRpcUrl = AkkaRpcServiceUtils.getRpcUrl(hostnamePort.f0,hostnamePort.f1,Dispatcher.DISPATCHER_NAME,addressResolution,configuration);final String address = checkNotNull(configuration.getString(RestOptions.ADDRESS),"%s must be set",RestOptions.ADDRESS.key());final int port = configuration.getInteger(RestOptions.PORT);final boolean enableSSL = SSLUtils.isRestSSLEnabled(configuration);final String protocol = enableSSL ? "https://" : "http://";return new StandaloneHaServices(resourceManagerRpcUrl,dispatcherRpcUrl,jobManagerRpcUrl,String.format("%s%s:%s", protocol, address, port));case ZOOKEEPER://元数据存储服务  我们通常使用FileSystemBlobStore 路径就是 high-availability.storageDir: hdfs:///flink/recoveryBlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);//  使用 ZooKeeper做HA服务return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration),executor,configuration,blobStoreService);case FACTORY_CLASS:return createCustomHAServices(configuration, executor);default:throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");}
}

ZooKeeperHaServices主要提供了创建LeaderRetrievalServiceLeaderElectionService方法,并给出了各个服务组件使用的ZK节点名称。别看是以_lock结尾,这个节点名称既在leaderlatcher做leader选举的分布式锁产生的路径,又在leader目录下用来存放leader的地址信息。


private static final String RESOURCE_MANAGER_LEADER_PATH = "/resource_manager_lock";
private static final String DISPATCHER_LEADER_PATH = "/dispatcher_lock";
private static final String JOB_MANAGER_LEADER_PATH = "/job_manager_lock";
//  web展示服务
private static final String REST_SERVER_LEADER_PATH = "/rest_server_lock";// 创建ResourceManagerLeaderRetriever,对RM的leader地址变化进行跟踪
public LeaderRetrievalService getResourceManagerLeaderRetriever() {return ZooKeeperUtils.createLeaderRetrievalService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}// 创建ResourceManagerLeaderElectionService,对RMleader挂掉后重新进行选举
public LeaderElectionService getResourceManagerLeaderElectionService() {return ZooKeeperUtils.createLeaderElectionService(client, configuration, RESOURCE_MANAGER_LEADER_PATH);
}

ZooKeeperUtils创建LeaderRetrievalService流程。

  • 接收curator客户端以及服务在zk下的节点路径,创建出ZooKeeperLeaderRetrievalService(ZKLRS)对象。
  • ZKLRS这个对象就是对zk节点的内容进行了监听,当内容发生变化时,通知给通过start方法传递过来的LeaderRetrievalListener
public void start(LeaderRetrievalListener listener) throws Exception {synchronized (lock) {//leader发生变化时,通知对应的LeaderRetrievalListenerleaderListener = listener;// 异常时调用当前对象的unhandledError方法client.getUnhandledErrorListenable().addListener(this);// 使用NodeCache监听节点内容变化cache.getListenable().addListener(this);cache.start();// 对会话连接状态进行跟踪client.getConnectionStateListenable().addListener(connectionStateListener);running = true;}
}

通过重写nodeChanged方法,来获取Leader变更后的地址,并传递新的地址

public void nodeChanged() throws Exception {synchronized (lock) {if (running) {try {LOG.debug("Leader node has changed.");ChildData childData = cache.getCurrentData();String leaderAddress;UUID leaderSessionID;if (childData == null) {leaderAddress = null;leaderSessionID = null;} else {byte[] data = childData.getData();if (data == null || data.length == 0) {leaderAddress = null;leaderSessionID = null;} else {ByteArrayInputStream bais = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(bais);// leader 地址leaderAddress = ois.readUTF();// leader uuidleaderSessionID = (UUID) ois.readObject();}}// leader 地址发生变化if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&Objects.equals(leaderSessionID, lastLeaderSessionID))) {lastLeaderAddress = leaderAddress;lastLeaderSessionID = leaderSessionID;//  传递新的leaderAddress和leaderSessionIDleaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);}} catch (Exception e) {leaderListener.handleError(new Exception("Could not handle node changed event.", e));throw e;}} else {LOG.debug("Ignoring node change notification since the service has already been stopped.");}}
}

ZooKeeperUtils创建ZooKeeperLeaderElectionService流程。

传递leader所在的zk路径、选举时临时节点创建的zk路径。之所以要传递leader节点是要在新leader产生时,将新leader的地址和uuid写入。

public static ZooKeeperLeaderElectionService createLeaderElectionService(final CuratorFramework client,final Configuration configuration,final String pathSuffix) {// 在leaderlatch节点下进行选举final String latchPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix;// leader节点    final String leaderPath = configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix;return new ZooKeeperLeaderElectionService(client, latchPath, leaderPath);
}

通过调用start方法传递LeaderContender,并开启leader选举。

public void start(LeaderContender contender) throws Exception {synchronized (lock) {// 绑定异常处理监听器client.getUnhandledErrorListenable().addListener(this);// 传递Contender竞争者leaderContender = contender;//开启leader选举服务,成为leader的节点会触发isleaderleaderLatch.addListener(this);leaderLatch.start();//监听leader节点内容变化cache.getListenable().addListener(this);cache.start();client.getConnectionStateListenable().addListener(listener);running = true;}
}

当某一Contender成为leader后,会触发grantLeadership传递新leader的uuid进行授权,并调用LeaderElectionService的confirmLeaderSessionID,将新leader地址写入leader节点。

public void confirmLeaderSessionID(UUID leaderSessionID) {// 是Leaderif (leaderLatch.hasLeadership()) {// check if this is an old confirmation callsynchronized (lock) {if (running) {if (leaderSessionID.equals(this.issuedLeaderSessionID)) {confirmedLeaderSessionID = leaderSessionID;// 将confirmLeaderSessionID写到 leader目录下writeLeaderInformation(confirmedLeaderSessionID);}}}}
}

写入时会触发当前对象的nodeChanged方法,该方法用来确保新leader地址和uuid成功写入。

public void nodeChanged() throws Exception {try {// leaderSessionID is null if the leader contender has not yet confirmed the session IDif (leaderLatch.hasLeadership()) { // leadersynchronized (lock) {if (running) {// 当选为leader 已经被确认if (confirmedLeaderSessionID != null) {ChildData childData = cache.getCurrentData();//  没写进去,再写一次if (childData == null) {if (LOG.isDebugEnabled()) {LOG.debug("Writing leader information into empty node by {}.",leaderContender.getAddress());}writeLeaderInformation(confirmedLeaderSessionID);} else {byte[] data = childData.getData();if (data == null || data.length == 0) {// the data field seems to be empty, rewrite informationwriteLeaderInformation(confirmedLeaderSessionID);} else {ByteArrayInputStream bais = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(bais);String leaderAddress = ois.readUTF();UUID leaderSessionID = (UUID) ois.readObject();if (!leaderAddress.equals(this.leaderContender.getAddress()) ||(leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {writeLeaderInformation(confirmedLeaderSessionID);}}}}} else {// leader未确认confirmedLeaderSessionIDLOG.debug("Ignoring node change notification since the service has already been stopped.");}}}} catch (Exception e) {...}
}

writeLeaderInformation用来写入leader地址和uuid,写入时先判断leader节点是否由当前leader会话创建的,如果不是则删除后重写创建。

protected void writeLeaderInformation(UUID leaderSessionID) {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(baos);//  leader 地址oos.writeUTF(leaderContender.getAddress());//  leader的 UUIDoos.writeObject(leaderSessionID);oos.close();boolean dataWritten = false;while (!dataWritten && leaderLatch.hasLeadership()) {Stat stat = client.checkExists().forPath(leaderPath);if (stat != null) {long owner = stat.getEphemeralOwner();long sessionID = client.getZookeeperClient().getZooKeeper().getSessionId();//节点由当前会话创建if (owner == sessionID) {try {client.setData().forPath(leaderPath, baos.toByteArray());dataWritten = true;} catch (KeeperException.NoNodeException noNode) {// node was deleted in the meantime}} else {try {//  不是当前节点创建则先删除client.delete().forPath(leaderPath);} catch (KeeperException.NoNodeException noNode) {// node was deleted in the meantime --> try again}}} else {try {client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(leaderPath,baos.toByteArray());dataWritten = true;} catch (KeeperException.NodeExistsException nodeExists) {// node has been created in the meantime --> try again}}}}
}

本次学习了flink如何使用curator来操作zk节点,实现leader选举和leader状态跟踪。LeaderRetrievalListener和LeaderContender两个接口更像是这一部分功能的输入和输出,来跟踪leader的变化情况。而中间部分对zk节点的操作和状态监听,则可以抽取出来在自己的项目中使用。

【Flink】flink zookeeper HA 实现分析相关推荐

  1. Flink JobManager的HA原理分析

    文章目录 前言 JobManager的HA切换通知 利用Zookeeper的领导选举与消息通知 引用 前言 在中心式管理的系统里,主节点如果只是单独服务部署的话,或多或少都会存在单点瓶颈(SPOF)问 ...

  2. 【Flink】Zookeeper connection loss leads to Flink job restart

    1.概述 转载:添Zookeeper connection loss leads to Flink job restart 看这个之前可以先看看:[Flink]Flink 报错 ResourceMan ...

  3. Flink Standalone模式HA部署

    Standalone Cluster HA 前面我们配置的 Standalone 集群实际上只有一个 JobManager,此时是存在单点故障的,所以官方提供了 Standalone Cluster ...

  4. flink on yarn HA高可用集群搭建

    无论以什么样的模式提交Application到Yarn中运行,都会启动一个yarn-session(Flink 集群),依然是由JobManager和TaskManager组成,那么JobManage ...

  5. Z04 - 999、Flink与电商指标分析

    初学耗时:999h 注:CSDN手机端暂不支持章节内链跳转,但外链可用,更好体验还请上电脑端. 『   因为要去见那个不一般的人,所以我就不能是一般人.』 1.Z04系列全套网盘资源: Z04系列全套 ...

  6. Flink on yarn (HA)测试

      前面已经介绍了Flink on yarn的相关部署,在yarn当中,启动Flink有两种模式,分别是客户端模式和分离式,下面将介绍Flink on yarn HA是如何在宕机后,重启Flink及k ...

  7. 基于 Flink、ClickHouse 的舆情分析系统:项目展示与 Git 地址

    一.基于 Flink.ClickHouse 的舆情分析系统 基于Flink.ClickHouse.ElasticSearch 的舆情分析系统 二.Git Code https://github.com ...

  8. 大数据计算引擎之Flink Flink CEP复杂事件编程

    原文地址:大数据计算引擎之Flink Flink CEP复杂事件编程 复杂事件编程(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的时事件系序列库,并利 ...

  9. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

最新文章

  1. 让人“眼前一亮、不明觉厉”的互联网技术PPT
  2. 剑指offer_第4题_重建二叉树
  3. Intel SGX Remote Attestation实例代码安装和执行,笔记
  4. 介绍一个使用 cl_abap_corresponding 进行两个内表不同名称字段赋值的快捷方法
  5. 自定义SAP Spartacus的产品搜索API参数 - Product Search
  6. Android之监测database的改变--notifyChange
  7. 树莓派SSH 连接不上:socket error Event:32 Error:10053
  8. 实验3.2 定义一个简单的Computer类
  9. [转]『TensorFlow』读书笔记_TFRecord学习
  10. python处理词项的停用词_词项邻近 停用词 词干还原
  11. python绘制缓和曲线_cad缓和曲线怎么绘制? cad缓和曲线插件的下载使用方法
  12. linux未找到命令rpm,RPM命令执行失败:bash: rpm: 未找到命令...
  13. 基于FFT的ofdm系统框图
  14. Usability Engineering
  15. sql中interval用法总结
  16. Spec2Vec快速入门
  17. Word中的字体大小(几号-几磅)
  18. JavaScript高级编程设计(第三版)——第四章:变量作用域和内存问题
  19. mysql面试题1313
  20. OpenWrt之DHCP(动态主机配置协议)

热门文章

  1. 星巴克又出事 被强制执行1087万!网友:欠租了吗?
  2. 全球电动车竞争加剧 特斯拉全球市场份额持续下滑
  3. 抢光儿童餐,是这届95后最后的倔强
  4. iOS 14.5来了!除了新功能,苹果还要让用户掌控自己的隐私数据
  5. 研究机构:全球半导体厂商今年资本支出1081亿美元
  6. 每卖出一部新款iPhone SE,苹果就要赚1500元?
  7. 巴菲特:承认错误“抄底”航空股,不看好航空公司
  8. 新iPhone终于要去掉刘海了,但这个操作好骚啊!
  9. 疑似锤子新机谍照曝光,后置“拐角”四摄,前CEO犀利点评...
  10. 网易传媒回应“变相裁员 ”说法:假消息,将提起诉讼