Leader选举

在分布式计算中,Leader选举是指指定单个进程作为分布在多台计算机(节点)之间的某些任务的组织者的过程。在任务开始之前,所有网络节点都不知道哪个节点将充当任务的Leader。然而,在执行Leader选举算法之后,整个网络中的每个节点都会选举出一个特定的、唯一的节点作为任务Leader

LeaderLatch

Curator框架提供了两种Leader选举的实现,LeaderLatchLeaderSelector,本篇博客介绍LeaderLatch的使用。

测试代码

CuratorFrameworkProperties类(提供CuratorFramework需要的一些配置信息):

package com.kaven.zookeeper;import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorFrameworkProperties {// 连接地址public static final String CONNECT_ADDRESS = "192.168.31.175:9000";// 连接超时时间public static final int CONNECTION_TIMEOUT_MS = 40000;// Session超时时间public static final int SESSION_TIMEOUT_MS = 10000;// 命名空间public static final String NAMESPACE = "MyNamespace";// 重试策略public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
}

LeaderLatchRunnable类(实现了Runnable接口,模拟分布式服务节点参与Leader选举):

package com.kaven.zookeeper;import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class LeaderLatchRunnable implements Runnable{private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();@SneakyThrows@Overridepublic void run() {// 使用不同的CuratorFramework实例,表示不同的分布式服务节点CuratorFramework curator = getCuratorFramework();curator.start();assert curator.getState().equals(CuratorFrameworkState.STARTED);// 模拟随机加入的分布式服务节点int randomSleep = new Random().nextInt(1000);Thread.sleep(randomSleep);// 创建LeaderLatch实例(用于Leader选举)// curator是CuratorFramework实例,用于与ZooKeeper交互// "/services/leader"是latchPath,Leader节点会成功创建该节点(其他节点则会失败)// 将线程名(Thread.currentThread().getName())作为分布式服务节点的id// LeaderLatch.CloseMode.NOTIFY_LEADER表示close模式,即节点进行close操作后的模式LeaderLatch latch = new LeaderLatch(curator, "/services/leader",Thread.currentThread().getName(), LeaderLatch.CloseMode.NOTIFY_LEADER);// 给LeaderLatch实例添加监听器(LeaderLatchListenerImpl实例)// EXECUTOR_SERVICE表示执行该LeaderLatchListenerImpl实例的Executor实例latch.addListener(new LeaderLatchListenerImpl(latch), EXECUTOR_SERVICE);System.out.println(latch.getId() + "准备好了!");// 开始Leader选举latch.start();System.out.println(latch.getId() + "开始Leader选举!");}private CuratorFramework getCuratorFramework() {// 创建CuratorFramework实例return CuratorFrameworkFactory.builder().connectString(CuratorFrameworkProperties.CONNECT_ADDRESS).retryPolicy(CuratorFrameworkProperties.RETRY_POLICY).connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS).sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS).namespace(CuratorFrameworkProperties.NAMESPACE).build();}@RequiredArgsConstructorprivate static class LeaderLatchListenerImpl implements LeaderLatchListener {private final LeaderLatch LATCH;// 被选举为Leader节点时调用@SneakyThrows@Overridepublic void isLeader() {System.out.println("--------------------------------" + LATCH.getId() + "被选举为Leader--------------------------------");LATCH.getParticipants().forEach(System.out::println);// 睡眠5秒就close(该节点会从Leader选举中移除),其他节点会重新进行Leader选举Thread.sleep(5000);LATCH.close();}@Overridepublic void notLeader() {// 节点调用了close方法,只有在LeaderLatch.CloseMode.NOTIFY_LEADER模式下会调用该方法// LeaderLatch.CloseMode.SILENT模式下不会调用该方法System.out.println("--------------------------------" + LATCH.getId() + "离开,重新进行Leader选举--------------------------------");}}
}

启动类:

package com.kaven.zookeeper;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class Application {private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();public static void main(String[] args) throws Exception {for (int i = 0; i < 7; i++) {EXECUTOR_SERVICE.execute(new LeaderLatchRunnable());}Thread.sleep(10000000);}
}

模拟7个分布式服务节点进行Leader选举,输出如下所示:

pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-2准备好了!
pool-1-thread-2开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
--------------------------------pool-1-thread-1被选举为Leader--------------------------------
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-1离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-4被选举为Leader--------------------------------
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-4离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-3被选举为Leader--------------------------------
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-3离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-5被选举为Leader--------------------------------
Participant{id='pool-1-thread-5', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-5离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-6被选举为Leader--------------------------------
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-6离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-7被选举为Leader--------------------------------
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-7离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-2被选举为Leader--------------------------------
Participant{id='pool-1-thread-2', isLeader=true}
--------------------------------pool-1-thread-2离开,重新进行Leader选举--------------------------------

CloseMode枚举类:

/*** 节点(latch)调用close方法时如何处理监听器*/public enum CloseMode{/*** 当latch关闭时,不通知监听器(默认行为)*/SILENT,/*** 当latch关闭时,通知监听器*/NOTIFY_LEADER}

修改成LeaderLatch.CloseMode.SILENT模式(默认模式,因此也可以不设置closeMode属性):

LeaderLatch latch = new LeaderLatch(curator, "/services/leader",Thread.currentThread().getName(), LeaderLatch.CloseMode.SILENT);

输出变成如下所示:

pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-2准备好了!
pool-1-thread-2开始Leader选举!
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
--------------------------------pool-1-thread-1被选举为Leader--------------------------------
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-2被选举为Leader--------------------------------
Participant{id='pool-1-thread-2', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-3被选举为Leader--------------------------------
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-6被选举为Leader--------------------------------
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-7被选举为Leader--------------------------------
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-4被选举为Leader--------------------------------
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-5被选举为Leader--------------------------------
Participant{id='pool-1-thread-5', isLeader=true}

很显然监听器实例的notLeader方法没有被调用。

    /*** 从Leader选举中删除此实例* 如果此实例是Leader,则释放领导权* 释放领导权的唯一方法是调用 close()* 所有LeaderLatch实例最终都必须关闭*/public synchronized void close(CloseMode closeMode) throws IOException{Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");Preconditions.checkNotNull(closeMode, "closeMode cannot be null");cancelStartTask();try{setNode(null);client.removeWatchers();}catch ( Exception e ){ThreadUtils.checkInterrupted(e);throw new IOException(e);}finally{client.getConnectionStateListenable().removeListener(listener);switch ( closeMode ){case NOTIFY_LEADER:{  // 先设置hasLeadership属性(设置完后会调用监听器),再删除监听器setLeadership(false);listeners.clear();break;}default:{// 先删除监听器,再设置hasLeadership属性(设置完后会调用监听器,但已经没有监听器了)listeners.clear();setLeadership(false);break;}}}}

为什么LeaderLatchRunnable实例的run方法执行结束了,还能继续进行Leader选举,因为执行了latch.start();,有新线程被启动:

    /*** 将此实例添加到Leader选举并尝试获得领导权*/public void start() throws Exception{Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");// 有新线程被启动startTask.set(AfterConnectionEstablished.execute(client, new Runnable(){@Overridepublic void run(){try{internalStart();}finally{startTask.set(null);}}}));}

LeaderLatch类的public方法如下图所示:

都是一些比较常规的方法,这里只介绍两个await方法,其他方法比较简单。

  • await():调用该方法会导致当前线程等待,直到此实例获得领导权,除非线程被中断或关闭。如果此实例已经是Leader,则此方法立即返回true。否则,当前线程将被禁用并处于休眠状态,直到发生以下三种情况之一:

    • 此实例成为Leader
    • 其他线程中断当前线程
    • 实例已关闭
  • await(long timeout, TimeUnit unit):调用该方法会导致当前线程等待,直到此实例获得领导权,除非线程被中断、指定的等待时间已过或实例已关闭。如果指定的等待时间已过或实例已关闭,则返回false ,如果等待时间小于或等于0,则该方法根本不会等待。如果此实例已经是Leader,则此方法立即返回true。否则,当前线程将被禁用并处于休眠状态,直到发生以下四种情况之一:
    • 此实例成为Leader
    • 其他线程中断当前线程
    • 指定的等待时间已过
    • 实例已关闭

通过await方法,使用LeaderLatch进行Leader选举就像使用CountDownLatch一样方便,Curator框架的Leader选举实现LeaderLatch就介绍到这里,源码以后会进行分析介绍,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

ZooKeeper : Curator框架之Leader选举LeaderLatch相关推荐

  1. ZooKeeper集群与Leader选举

    说说你对ZooKeeper集群与Leader选举的理解? ZooKeeper是一个开源分布式协调服务.分布式数据一致性解决方案.可基于ZooKeeper实现命名服务.集群管理.Master选举.分布式 ...

  2. springboot使用curator来实现leader选举

    本文来说下springboot使用curator来实现leader选举 文章目录 概述 概述

  3. 面试官:说说你对ZooKeeper集群与Leader选举的理解?

    作者:TalkingData 史天舒 来自:TalkingData ZooKeeper是一个开源分布式协调服务.分布式数据一致性解决方案.可基于ZooKeeper实现命名服务.集群管理.Master选 ...

  4. zookeeper的设计猜想-leader选举

    当leader挂了,需要从其他follower节点中选择一个新的节点进行处理,这个时候就需要涉及到leader选举 从这个过程中,我们推导处了zookeeper的一些设计思想

  5. 集群没有leader_面试官问:说说你对ZooKeeper集群与Leader选举的理解?

    ZooKeeper是一个开源分布式协调服务.分布式数据一致性解决方案.可基于ZooKeeper实现命名服务.集群管理.Master选举.分布式锁等功能. 高可用 为了保证ZooKeeper的可用性,在 ...

  6. ZooKeeper 技术内幕,Leader 选举是一个什么样的过程?

    几个问题,引发思考: 什么时候 leader 选举? 选举的过程? 选举过程中,是否能提供服务? 选举结果,是否会丢失数据? 服务器角色 2 个小问题: 服务器节点有多少角色? 每个角色的作用? 角色 ...

  7. 分布式开发必须了解的Zookeeper的Leader选举机制(源码解析)

    分布式开发必须知道的Zookeeper知识及其的Leader选举机制(ZAB原子广播协议)   ZooKeeper是Hadoop下的一个子项目,它是一个针对大型分布式系统的可靠协调系统,提供的功能包括 ...

  8. zookeeper的Leader选举机制详解

    转载自:https://www.toutiao.com/i6701570306445672963/?tt_from=copy_link&utm_campaign=client_share&am ...

  9. Zookeeper命令操作(初始Zookeeper、JavaAPI操作、分布式锁实现、模拟12306售票分布式锁、Zookeeper集群搭建、选举投票)

    Zookeeper命令操作(初始Zookeeper.JavaAPI操作.分布式锁实现.模拟12306售票分布式锁.Zookeeper集群搭建.选举投票) 1.初始Zookeeper Zookeeper ...

最新文章

  1. python书籍_Python 之父宣布加盟微软!包邮送几本 Python 书籍压压惊!
  2. CentOS6.3 x86_64 mininstall 安装 apahce2.23+jdk1.7+tomecat7+mysql5.1.58
  3. 使用EditPlus 删除文本文件中多余的空行 和 EditPlus 选择列
  4. python中__name__的使用
  5. 《leetcode》spiral-matrix-ii(构造螺旋矩阵)
  6. 去掉xcode中警告的一些经验
  7. Android 图片相关整理
  8. CF1151FSonya and Informatics
  9. mhdd测试hdd硬盘软件,硬盘检测工具mhdd
  10. 锯齿波调制的FMCW雷达中频回波信号的公式推导
  11. 免费离线语音识别sdk
  12. 解决win7防火墙打不开的问题:错误代码1079和错误代码13
  13. 百度贴吧前负责人:做产品16年,我有9条心得
  14. earchs柱形图怎样使某个柱子变色
  15. 模拟登陆新版正方教务管理系统【可以获取学生基本/课表信息】
  16. 文件处理与垃圾回收机制
  17. “未来如此-绿数新生” 英特尔携手联想 用创新之力带来绿色之美
  18. PostgreSQL得出两个timestamp的秒数差值
  19. 21中质协6Sigma六西格玛绿黑带考试题库资料学习培训视频下载
  20. 一个简单的例子让你秒懂 python多线程

热门文章

  1. 修改电脑dns服务器地址
  2. [生化危机5]的图形渲染讲解
  3. linux用于暂时锁定用户帐号的命令是,linux用于暂时锁定用户帐号的命令是()
  4. C++实现简易的集合运算
  5. 知识文章:是谁控制了我们的浏览器?
  6. 区区几行代码你可以完成逆袭,从此成为抽象派画家,有图有真相
  7. EMQ 映云科技助力中移上研院建设西部首个国家级车联网先导示范区
  8. mybatis的if判断用法
  9. 怎样解决warning LNK4099: 未找到 PDB“vc100.pdb” 造成的链接 时间过长
  10. 概率密度变换公式 雅可比矩阵_学习笔记之——Jacobian matrix(雅可比矩阵)