ZooKeeper : Curator框架之Leader选举LeaderLatch
Leader选举
在分布式计算中,Leader
选举是指指定单个进程作为分布在多台计算机(节点)之间的某些任务的组织者的过程。在任务开始之前,所有网络节点都不知道哪个节点将充当任务的Leader
。然而,在执行Leader
选举算法之后,整个网络中的每个节点都会选举出一个特定的、唯一的节点作为任务Leader
。
LeaderLatch
Curator
框架提供了两种Leader
选举的实现,LeaderLatch
和LeaderSelector
,本篇博客介绍LeaderLatch
的使用。
测试代码
CuratorFrameworkProperties
类(提供CuratorFramework
需要的一些配置信息):
package com.kaven.zookeeper;import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;public class CuratorFrameworkProperties {// 连接地址public static final String CONNECT_ADDRESS = "192.168.31.175:9000";// 连接超时时间public static final int CONNECTION_TIMEOUT_MS = 40000;// Session超时时间public static final int SESSION_TIMEOUT_MS = 10000;// 命名空间public static final String NAMESPACE = "MyNamespace";// 重试策略public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
}
LeaderLatchRunnable
类(实现了Runnable
接口,模拟分布式服务节点参与Leader
选举):
package com.kaven.zookeeper;import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class LeaderLatchRunnable implements Runnable{private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();@SneakyThrows@Overridepublic void run() {// 使用不同的CuratorFramework实例,表示不同的分布式服务节点CuratorFramework curator = getCuratorFramework();curator.start();assert curator.getState().equals(CuratorFrameworkState.STARTED);// 模拟随机加入的分布式服务节点int randomSleep = new Random().nextInt(1000);Thread.sleep(randomSleep);// 创建LeaderLatch实例(用于Leader选举)// curator是CuratorFramework实例,用于与ZooKeeper交互// "/services/leader"是latchPath,Leader节点会成功创建该节点(其他节点则会失败)// 将线程名(Thread.currentThread().getName())作为分布式服务节点的id// LeaderLatch.CloseMode.NOTIFY_LEADER表示close模式,即节点进行close操作后的模式LeaderLatch latch = new LeaderLatch(curator, "/services/leader",Thread.currentThread().getName(), LeaderLatch.CloseMode.NOTIFY_LEADER);// 给LeaderLatch实例添加监听器(LeaderLatchListenerImpl实例)// EXECUTOR_SERVICE表示执行该LeaderLatchListenerImpl实例的Executor实例latch.addListener(new LeaderLatchListenerImpl(latch), EXECUTOR_SERVICE);System.out.println(latch.getId() + "准备好了!");// 开始Leader选举latch.start();System.out.println(latch.getId() + "开始Leader选举!");}private CuratorFramework getCuratorFramework() {// 创建CuratorFramework实例return CuratorFrameworkFactory.builder().connectString(CuratorFrameworkProperties.CONNECT_ADDRESS).retryPolicy(CuratorFrameworkProperties.RETRY_POLICY).connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS).sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS).namespace(CuratorFrameworkProperties.NAMESPACE).build();}@RequiredArgsConstructorprivate static class LeaderLatchListenerImpl implements LeaderLatchListener {private final LeaderLatch LATCH;// 被选举为Leader节点时调用@SneakyThrows@Overridepublic void isLeader() {System.out.println("--------------------------------" + LATCH.getId() + "被选举为Leader--------------------------------");LATCH.getParticipants().forEach(System.out::println);// 睡眠5秒就close(该节点会从Leader选举中移除),其他节点会重新进行Leader选举Thread.sleep(5000);LATCH.close();}@Overridepublic void notLeader() {// 节点调用了close方法,只有在LeaderLatch.CloseMode.NOTIFY_LEADER模式下会调用该方法// LeaderLatch.CloseMode.SILENT模式下不会调用该方法System.out.println("--------------------------------" + LATCH.getId() + "离开,重新进行Leader选举--------------------------------");}}
}
启动类:
package com.kaven.zookeeper;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class Application {private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();public static void main(String[] args) throws Exception {for (int i = 0; i < 7; i++) {EXECUTOR_SERVICE.execute(new LeaderLatchRunnable());}Thread.sleep(10000000);}
}
模拟7
个分布式服务节点进行Leader
选举,输出如下所示:
pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-2准备好了!
pool-1-thread-2开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
--------------------------------pool-1-thread-1被选举为Leader--------------------------------
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-1离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-4被选举为Leader--------------------------------
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-4离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-3被选举为Leader--------------------------------
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-3离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-5被选举为Leader--------------------------------
Participant{id='pool-1-thread-5', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-5离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-6被选举为Leader--------------------------------
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-6离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-7被选举为Leader--------------------------------
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
--------------------------------pool-1-thread-7离开,重新进行Leader选举--------------------------------
--------------------------------pool-1-thread-2被选举为Leader--------------------------------
Participant{id='pool-1-thread-2', isLeader=true}
--------------------------------pool-1-thread-2离开,重新进行Leader选举--------------------------------
CloseMode
枚举类:
/*** 节点(latch)调用close方法时如何处理监听器*/public enum CloseMode{/*** 当latch关闭时,不通知监听器(默认行为)*/SILENT,/*** 当latch关闭时,通知监听器*/NOTIFY_LEADER}
修改成LeaderLatch.CloseMode.SILENT
模式(默认模式,因此也可以不设置closeMode
属性):
LeaderLatch latch = new LeaderLatch(curator, "/services/leader",Thread.currentThread().getName(), LeaderLatch.CloseMode.SILENT);
输出变成如下所示:
pool-1-thread-7准备好了!
pool-1-thread-7开始Leader选举!
pool-1-thread-3准备好了!
pool-1-thread-3开始Leader选举!
pool-1-thread-6准备好了!
pool-1-thread-6开始Leader选举!
pool-1-thread-4准备好了!
pool-1-thread-4开始Leader选举!
pool-1-thread-5准备好了!
pool-1-thread-5开始Leader选举!
pool-1-thread-2准备好了!
pool-1-thread-2开始Leader选举!
pool-1-thread-1准备好了!
pool-1-thread-1开始Leader选举!
--------------------------------pool-1-thread-1被选举为Leader--------------------------------
Participant{id='pool-1-thread-1', isLeader=true}
Participant{id='pool-1-thread-2', isLeader=false}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-2被选举为Leader--------------------------------
Participant{id='pool-1-thread-2', isLeader=true}
Participant{id='pool-1-thread-3', isLeader=false}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-3被选举为Leader--------------------------------
Participant{id='pool-1-thread-3', isLeader=true}
Participant{id='pool-1-thread-6', isLeader=false}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-6被选举为Leader--------------------------------
Participant{id='pool-1-thread-6', isLeader=true}
Participant{id='pool-1-thread-7', isLeader=false}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-7被选举为Leader--------------------------------
Participant{id='pool-1-thread-7', isLeader=true}
Participant{id='pool-1-thread-4', isLeader=false}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-4被选举为Leader--------------------------------
Participant{id='pool-1-thread-4', isLeader=true}
Participant{id='pool-1-thread-5', isLeader=false}
--------------------------------pool-1-thread-5被选举为Leader--------------------------------
Participant{id='pool-1-thread-5', isLeader=true}
很显然监听器实例的notLeader
方法没有被调用。
/*** 从Leader选举中删除此实例* 如果此实例是Leader,则释放领导权* 释放领导权的唯一方法是调用 close()* 所有LeaderLatch实例最终都必须关闭*/public synchronized void close(CloseMode closeMode) throws IOException{Preconditions.checkState(state.compareAndSet(State.STARTED, State.CLOSED), "Already closed or has not been started");Preconditions.checkNotNull(closeMode, "closeMode cannot be null");cancelStartTask();try{setNode(null);client.removeWatchers();}catch ( Exception e ){ThreadUtils.checkInterrupted(e);throw new IOException(e);}finally{client.getConnectionStateListenable().removeListener(listener);switch ( closeMode ){case NOTIFY_LEADER:{ // 先设置hasLeadership属性(设置完后会调用监听器),再删除监听器setLeadership(false);listeners.clear();break;}default:{// 先删除监听器,再设置hasLeadership属性(设置完后会调用监听器,但已经没有监听器了)listeners.clear();setLeadership(false);break;}}}}
为什么LeaderLatchRunnable
实例的run
方法执行结束了,还能继续进行Leader
选举,因为执行了latch.start();
,有新线程被启动:
/*** 将此实例添加到Leader选举并尝试获得领导权*/public void start() throws Exception{Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");// 有新线程被启动startTask.set(AfterConnectionEstablished.execute(client, new Runnable(){@Overridepublic void run(){try{internalStart();}finally{startTask.set(null);}}}));}
LeaderLatch
类的public
方法如下图所示:
都是一些比较常规的方法,这里只介绍两个await
方法,其他方法比较简单。
await()
:调用该方法会导致当前线程等待,直到此实例获得领导权,除非线程被中断或关闭。如果此实例已经是Leader
,则此方法立即返回true
。否则,当前线程将被禁用并处于休眠状态,直到发生以下三种情况之一:- 此实例成为
Leader
- 其他线程中断当前线程
- 实例已关闭
- 此实例成为
await(long timeout, TimeUnit unit)
:调用该方法会导致当前线程等待,直到此实例获得领导权,除非线程被中断、指定的等待时间已过或实例已关闭。如果指定的等待时间已过或实例已关闭,则返回false
,如果等待时间小于或等于0
,则该方法根本不会等待。如果此实例已经是Leader
,则此方法立即返回true
。否则,当前线程将被禁用并处于休眠状态,直到发生以下四种情况之一:- 此实例成为
Leader
- 其他线程中断当前线程
- 指定的等待时间已过
- 实例已关闭
- 此实例成为
通过await
方法,使用LeaderLatch
进行Leader
选举就像使用CountDownLatch
一样方便,Curator
框架的Leader
选举实现LeaderLatch
就介绍到这里,源码以后会进行分析介绍,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
ZooKeeper : Curator框架之Leader选举LeaderLatch相关推荐
- ZooKeeper集群与Leader选举
说说你对ZooKeeper集群与Leader选举的理解? ZooKeeper是一个开源分布式协调服务.分布式数据一致性解决方案.可基于ZooKeeper实现命名服务.集群管理.Master选举.分布式 ...
- springboot使用curator来实现leader选举
本文来说下springboot使用curator来实现leader选举 文章目录 概述 概述
- 面试官:说说你对ZooKeeper集群与Leader选举的理解?
作者:TalkingData 史天舒 来自:TalkingData ZooKeeper是一个开源分布式协调服务.分布式数据一致性解决方案.可基于ZooKeeper实现命名服务.集群管理.Master选 ...
- zookeeper的设计猜想-leader选举
当leader挂了,需要从其他follower节点中选择一个新的节点进行处理,这个时候就需要涉及到leader选举 从这个过程中,我们推导处了zookeeper的一些设计思想
- 集群没有leader_面试官问:说说你对ZooKeeper集群与Leader选举的理解?
ZooKeeper是一个开源分布式协调服务.分布式数据一致性解决方案.可基于ZooKeeper实现命名服务.集群管理.Master选举.分布式锁等功能. 高可用 为了保证ZooKeeper的可用性,在 ...
- ZooKeeper 技术内幕,Leader 选举是一个什么样的过程?
几个问题,引发思考: 什么时候 leader 选举? 选举的过程? 选举过程中,是否能提供服务? 选举结果,是否会丢失数据? 服务器角色 2 个小问题: 服务器节点有多少角色? 每个角色的作用? 角色 ...
- 分布式开发必须了解的Zookeeper的Leader选举机制(源码解析)
分布式开发必须知道的Zookeeper知识及其的Leader选举机制(ZAB原子广播协议) ZooKeeper是Hadoop下的一个子项目,它是一个针对大型分布式系统的可靠协调系统,提供的功能包括 ...
- zookeeper的Leader选举机制详解
转载自:https://www.toutiao.com/i6701570306445672963/?tt_from=copy_link&utm_campaign=client_share&am ...
- Zookeeper命令操作(初始Zookeeper、JavaAPI操作、分布式锁实现、模拟12306售票分布式锁、Zookeeper集群搭建、选举投票)
Zookeeper命令操作(初始Zookeeper.JavaAPI操作.分布式锁实现.模拟12306售票分布式锁.Zookeeper集群搭建.选举投票) 1.初始Zookeeper Zookeeper ...
最新文章
- python书籍_Python 之父宣布加盟微软!包邮送几本 Python 书籍压压惊!
- CentOS6.3 x86_64 mininstall 安装 apahce2.23+jdk1.7+tomecat7+mysql5.1.58
- 使用EditPlus 删除文本文件中多余的空行 和 EditPlus 选择列
- python中__name__的使用
- 《leetcode》spiral-matrix-ii(构造螺旋矩阵)
- 去掉xcode中警告的一些经验
- Android 图片相关整理
- CF1151FSonya and Informatics
- mhdd测试hdd硬盘软件,硬盘检测工具mhdd
- 锯齿波调制的FMCW雷达中频回波信号的公式推导
- 免费离线语音识别sdk
- 解决win7防火墙打不开的问题:错误代码1079和错误代码13
- 百度贴吧前负责人:做产品16年,我有9条心得
- earchs柱形图怎样使某个柱子变色
- 模拟登陆新版正方教务管理系统【可以获取学生基本/课表信息】
- 文件处理与垃圾回收机制
- “未来如此-绿数新生” 英特尔携手联想 用创新之力带来绿色之美
- PostgreSQL得出两个timestamp的秒数差值
- 21中质协6Sigma六西格玛绿黑带考试题库资料学习培训视频下载
- 一个简单的例子让你秒懂 python多线程
热门文章
- 修改电脑dns服务器地址
- [生化危机5]的图形渲染讲解
- linux用于暂时锁定用户帐号的命令是,linux用于暂时锁定用户帐号的命令是()
- C++实现简易的集合运算
- 知识文章:是谁控制了我们的浏览器?
- 区区几行代码你可以完成逆袭,从此成为抽象派画家,有图有真相
- EMQ 映云科技助力中移上研院建设西部首个国家级车联网先导示范区
- mybatis的if判断用法
- 怎样解决warning LNK4099: 未找到 PDB“vc100.pdb” 造成的链接 时间过长
- 概率密度变换公式 雅可比矩阵_学习笔记之——Jacobian matrix(雅可比矩阵)