[Curator] Leader Latch 的使用与分析
为什么80%的码农都做不了架构师?>>>
Leader Latch
Zookeeper在分布式系统中,常常被用于选主。在执行某个任务时,让所有的节点都知道有一个特别的,唯一的节点是任务的主节点,由主节点进行任务的执行,其他节点作为备用节点。通过这种热备方式,为分布式系统中任务执行的可控性,以及系统高可用性。
而Curator提供了两种选主机制,可以根据实际情况进行选用。
1. 关键API
org.apache.curator.framework.recipes.leader.LeaderLatch
2. 机制说明
LeaderLatch的方式,就是以一种抢占的方式来决定选主。比较简单粗暴,逻辑相对简单。类似非公平锁的抢占,所以,多节点是一个随机产生主节点的过程。基本就是,谁抢到就算谁的。
多个参与者(如:逻辑节点;某个线程等),指定在一个分组之下,每个分组内进行主节点抢占。
3. 用法
3.1 创建
方法1
public LeaderLatch(CuratorFramework client,String latchPath)
参数说明:
- client : zk客户端链接
- latchPath : 分组路径(zk中的path)
方法2
public LeaderLatch(CuratorFramework client,String latchPath,String id)
参数说明:
- client : zk客户端链接
- latchPath : 分组路径(zk中的path)
- id : 参与者ID
3.2 使用
LeaderLatch创建好之后,必须执行:
leaderLatch.start();
这样,才能让leaderLatch开始参与选主过程。
由于LeaderLatch是一个不断抢占的过程,所以需要调用:
public boolean hasLeadership()
来检测当前参与者是否选主成功。这个方法是非阻塞的(立即返回),其结果只代表调用时的选主结果。所以,可以轮询此方法,或者当执行完本地逻辑后,需要执行分布式任务前检擦此方法。
不过,类似JDK中的CountDownLatch
,LeaderLatch也提供了阻塞方法:
- 方法1
public void await()throws InterruptedException,EOFException
这个方法,会阻塞,直到选主成功。
- 方法2 为了避免方法1的长时间选主失败
public boolean await(long timeout,TimeUnit unit)throws InterruptedException
这个方法会根据参数中指定的时间,作为等待的期限。到期后,返回选主结果。
对于LeaderLatch实例,无论是否轩主成功,最后都应该调用:
leaderLatch.close();
这样,才会把当前参与者的信息从选主分组中移除出去。如果,当前参与者是主,还会释放主的资格。避免死锁。
4. 错误处理
在实际使用中,必须考虑链接问题引起的主身份丢失问题。 例如:当
hasLeadership()
返回true,之后链接出问题。 强烈建议:使用LeaderLatch时为其添加一个ConnectionStateListener
LeaderLatch实例会添加一个ConnectionStateListener
来监听当前zk链接。 如果,链接不可用(SUSPENDED)则LeaderLatch会认为自己不在是主,等到链接恢复可用时,才可继续。 如果,链接断开(LOST),则LeaderLatch会认为自己不在是主,等到链接重新建立后,删除之前的参与者信息,然后重新参与选主。
5. 源码分析
5.1 LeaderLatch
5.1.1 类定义
先来看看类定义:
import java.io.Closeable;public class LeaderLatch implements Closeable
{...}
注意:实现了java.io.Closeable
,所以你懂的, try()...catch{}。(3.2中的leaderLatch.close();
)
5.1.2 成员变量
public class LeaderLatch implements Closeable
{private final Logger log = LoggerFactory.getLogger(getClass());private final WatcherRemoveCuratorFramework client;private final String latchPath;private final String id;private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);private final AtomicBoolean hasLeadership = new AtomicBoolean(false);private final AtomicReference<String> ourPath = new AtomicReference<String>();private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();private final CloseMode closeMode;private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();private final ConnectionStateListener listener = new ConnectionStateListener(){@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){handleStateChange(newState);}};private static final String LOCK_NAME = "latch-";private static final LockInternalsSorter sorter = new LockInternalsSorter(){@Overridepublic String fixForSorting(String str, String lockName){return StandardLockInternalsDriver.standardFixForSorting(str, lockName);}};public enum State { LATENT, STARTED, CLOSED }public enum CloseMode { SILENT, NOTIFY_LEADER }@VisibleForTestingvolatile CountDownLatch debugResetWaitLatch = null;
- log : caurtor依赖slf4j
- client : zk客户端(curator-framework提供)
- latchPath : 分组路径(zk中的path)
- id : 参与者ID
- state
- 内部枚举
- 状态
- LATENT 休眠
- STARTED 已启动
- CLOSED 已关闭
- 使用
AtomicReference
原子化包装
- hasLeadership
- 是否为主
- 使用
AtomicBoolean
原子化包装
- ourPath
- 使用
AtomicReference
原子化包装
- 使用
- listeners
- 一组
LeaderLatchListener
监听器
- 一组
- closeMode
- 内部枚举
- LeaderLatch关闭方式
- SILENT : 静默关闭,不触发相关监听器
- NOTIFY_LEADER :关闭时触发监听器
- startTask
- 异步Future
- 使用
AtomicReference
原子化包装
- listener
- 链接状态监听器
- 参见 : 4. 错误处理
- LOCK_NAME
- 私有常量
- sorter
- 私有常量
- 用于锁处理时,规范path
- 对参与者进行排序
- debugResetWaitLatch
- volatile 可见性
- reset()使用
- 在测试时控制启动的时机,防止环境未初始化完成就处理了启动逻辑
注意:
这些成员变量都是`final`类型。
并且,对于引用类型都进行原子化包装,避免并发问题
5.1.3 构造器
提供多个构造器模板,最终都是调用:
public LeaderLatch(CuratorFramework client, String latchPath){this(client, latchPath, "", CloseMode.SILENT);}public LeaderLatch(CuratorFramework client, String latchPath, String id){this(client, latchPath, id, CloseMode.SILENT);}public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode){this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework();this.latchPath = PathUtils.validatePath(latchPath);this.id = Preconditions.checkNotNull(id, "id cannot be null");this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");}
可以发现:
- 默认是采用
CloseMode.SILENT
方式关闭 - 默认
id
是空字符串 client
、latchPath
、id
、closeMode
不能为空
5.1.4 启动
第3节,介绍过LeaderLatch是由start()
启动选主过程:
public void start() throws Exception {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");startTask.set(AfterConnectionEstablished.execute(client, new Runnable(){@Overridepublic void run(){try{internalStart();}finally{startTask.set(null);}}}));
}
可以发现
- 调用原子性CAS方法,将状态由休眠
更新
到已启动
- 执行了一个异步任务来完成启动过程
- 使用一个链接可用后回调方式
AfterConnectionEstablished.execute()
- 内部使用了一个
ThreadUtils.newSingleThreadExecutor
- 单线程的线程池
- 所以本地多个LeaderLatch实例的启动过程是序列化方式执行的
- 内部使用了一个
- 使用成员变量
startTask
持有异步Future - 启动完成后会制空
startTask
- 说明启动过程可能会有状态变化
- 使用一个链接可用后回调方式
- 启动的过程实际是由
internalStart()
方法来完成
private synchronized void internalStart() {if ( state.get() == State.STARTED ){client.getConnectionStateListenable().addListener(listener);try{reset();}catch ( Exception e ){ThreadUtils.checkInterrupted(e);log.error("An error occurred checking resetting leadership.", e);}}
}
internalStart()
使用synchronized
- 同步调用
- 使用this进行互斥锁对象
- 同一个LeaderLatch对象的多次启动同样序列化执行
- 即便绕过第2布,也同样可以保证不会重复启动
- 进行状态判断
synchronized
内部,再次判断- 相当于Double check
- 在当前连接上注册自带的监听器
- 调用
reset()
完成启动逻辑 - 处理了异常
- 触发线程中断
internalStart()
是异步执行,通过中断可以进行更细节的控制
- 避免粗暴的抛出异常
internalStart()
是异步执行- 避免当前线程意外中断
- 同时也避免了第2.1步骤中那个单线程的线程池频繁的进行线程开/关所带来的额外开销
- 触发线程中断
@VisibleForTesting
void reset() throws Exception {setLeadership(false);setNode(null);BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( debugResetWaitLatch != null ){debugResetWaitLatch.await();debugResetWaitLatch = null;}if ( event.getResultCode() == KeeperException.Code.OK.intValue() ){setNode(event.getName());if ( state.get() == State.CLOSED ){setNode(null);}else{getChildren();}}else{log.error("getChildren() failed. rc = " + event.getResultCode());}}};client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
private synchronized void setLeadership(boolean newValue)
{boolean oldValue = hasLeadership.getAndSet(newValue);if ( oldValue && !newValue ){ // Lost leadership, was true, now falselisteners.forEach(new Function<LeaderLatchListener, Void>(){@Overridepublic Void apply(LeaderLatchListener listener){listener.notLeader();return null;}});}else if ( !oldValue && newValue ){ // Gained leadership, was false, now truelisteners.forEach(new Function<LeaderLatchListener, Void>(){@Overridepublic Void apply(LeaderLatchListener input){input.isLeader();return null;}});}notifyAll();
}
private void setNode(String newValue) throws Exception{String oldPath = ourPath.getAndSet(newValue);if ( oldPath != null ){client.delete().guaranteed().inBackground().forPath(oldPath);}}
reset()的可见范围
- 利于测试
- 使用了
com.google.common.annotations.VisibleForTesting
初始化选主状态false
- getAndSet设置
- 根据不同的情况触发不同的监听器
- 得到
- 失去
- notifyAll()
- 唤醒所有的synchronized等待
制空上次path
- 如果上一次path有残留,则delete服务器上的信息
在latchPath下创建一个
EPHEMERAL_SEQUENTIAL
节点- 临时顺序节点
- 并注册了回调
- 回掉获取latchPath的子节点
- 并判断自身是否为主
5.1.5 选主
LeaderLatch的选主判断逻辑,是由上一节中第12步中注册的回调方法来触发。 实际由checkLeadership()
方法处理:
private void checkLeadership(List<String> children) throws Exception{final String localOurPath = ourPath.get();List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;if ( ourIndex < 0 ){log.error("Can't find our node. Resetting. Index: " + ourIndex);reset();}else if ( ourIndex == 0 ){setLeadership(true);}else{String watchPath = sortedChildren.get(ourIndex - 1);Watcher watcher = new Watcher(){@Overridepublic void process(WatchedEvent event){if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) ){try{getChildren();}catch ( Exception ex ){ThreadUtils.checkInterrupted(ex);log.error("An error occurred checking the leadership.", ex);}}}};BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ){// previous node is gone - resetreset();}}};// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leakclient.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));}}
当获取到最新的参与者列表后:
- 对列表进行排序
- 如果自身处于列表第一位,则当选为主
- 否则,在latchPath上增加监听/回调
- 监听列表中上一位参与者
- 当上一位参与者退出(节点被删除时)
- 重新
getChildren()
再次进行选主
- 当latchPath发生变动(如:删除)
- 调用
reset()
,重新进行启动过程- 即可导致
hasLeadership()
失效
- 即可导致
- 调用
- 监听列表中上一位参与者
6. 测试
package com.roc.curator.demo.leader.latchimport org.apache.commons.lang3.RandomStringUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.TimeUnit/*** Created by roc on 2017/5/25.*/
class LatchParticipant() {val LATCH_PATH: String = "/test/leader/latch"var client: CuratorFramework = CuratorFrameworkFactory.builder().connectString("0.0.0.0:8888").connectionTimeoutMs(5000).retryPolicy(ExponentialBackoffRetry(1000, 10)).sessionTimeoutMs(3000).build()@Before fun init() {client.start()}@Test fun runTest() {var id: String = RandomStringUtils.randomAlphabetic(10)println("id : $id ")val time = Date()var latch: LeaderLatch = LeaderLatch(client, LATCH_PATH, id)latch.start()println("$id 开始竞选 $time")while(!latch.await(3, TimeUnit.SECONDS)){println("$id 选主失败 : $time")println("当前主是:${latch.leader.id}")println("参与者:${latch.participants}")}println("$id 选主成功 $time")while (latch.hasLeadership()) {println("$id 执行 $time")TimeUnit.SECONDS.sleep(2)if (Math.random() > 0.89) {break;}}println("$id 结束此轮: $time")latch.close()}
}
zookeeper节点:
get /test/leader/latch/_c_9b313527-e0ed-410f-9510-30e5fd92b5c6-latch-0000000208
zNillKMfuB
cZxid = 0x1db19
ctime = Thu May 25 20:53:22 CST 2017
mZxid = 0x1db19
mtime = Thu May 25 20:53:22 CST 2017
pZxid = 0x1db19
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07e9
dataLength = 10
numChildren = 0
转载于:https://my.oschina.net/roccn/blog/909252
[Curator] Leader Latch 的使用与分析相关推荐
- Apache Curator Leader Election
http://blog.csdn.net/collonn/article/details/43968655 用于Leader选举,也可以用Shared Reentrant Lock来实现. 如果需要集 ...
- 五、curator recipes之选举主节点Leader Latch
简介 在分布式计算中,主节点选举是为了把某个进程作为主节点来控制其它节点的过程.在选举结束之前,我们不知道哪个节点会成为主节点.curator对于主节点选举有两种实现方式,本文示例演示Latch的实现 ...
- [Curator] Path Cache 的使用与分析
为什么80%的码农都做不了架构师?>>> Path Cache Path Cache其实就是用于对zk节点的监听.不论是子节点的新增.更新或者移除的时候,Path Cache都 ...
- leader选举的源码分析
源码分析,最关键的是要找到一个入口,对于zk的leader选举,并不是由客户端来触发,而是在启动的时候会触发一次选举.因此我们可以直接去看启动脚本zkServer.sh中的运行命令 ZOOMAIN就是 ...
- Oracle等待事件(一)—— latch cache buffers chains 分析与优化思路
一. 什么是CBC等待 首先我们需要知道CBC等待发生在哪里,为什么会发生,才能理解应该如何定位,如何处理. 首先,CBC latch是用于保护buffer cache的,因此CBC等待一定发生在bu ...
- leader选举的源码分析-Messenger
在Messenger里面构建了两个线程,一个是WorkerSender,一个是WorkerReceiver. 这两个线程是分别用来发送和接收消息的线程.具体做什么,暂时先不分析. Messenger( ...
- leader选举的源码分析-startLeaderElection
看到这个方法,有没有两眼放光的感觉?没错,前面铺垫了这么长,终于进入leader选举的方法了 synchronized public void startLeaderElection() { try ...
- leader选举的源码分析-QuorumPeer.start
QuorumPeer.start方法,重写了Thread的start.也就是在线程启动之前,会做以下操作 1. 通过loadDataBase恢复快照数据 2. cnxnFactory.start() ...
- 使用Zookeeper实现leader选举-Leader Latch
参与选举的所有节点,会创建一个顺序节点,其中最小的节点会设置为master节点, 没抢到Leader的节点都监听前一个节点的删除事件,在前一个节点删除后进行重新抢主,当master节点手动调用clos ...
- leader选举的源码分析-FastLeaderElection.starter
starter方法里面,设置了一些成员属性,并且构建了两个阻塞队列,分别是sendQueue和recvqueue.并且实例化了一个Messager private void starter(Quoru ...
最新文章
- 8 ServletContext
- 趋势畅想-搭载android系统的智能数码相机
- linux runable进程查询,关于Linux下进程的详解【进程查看与管理】
- 学习笔记:区块链概念入门
- 【转载】Eclipse vs IDEA快捷键对比大全(win系统)
- Layui 数据表格:用户个性化定制列(拖拽,隐藏)
- 机器学习基础知识(一):机器学习三大流派
- html实现颜色色板,JS实现的系统调色板完整实例
- Latex学习笔记 (8) 字体样式之衬线字体与无衬线体字体
- lammps教程:velocity命令三种使用方法
- ZKtime5.0考勤管理系统标准版客户端登录忘记登录密码
- 【Unity】天气特效:打雷下雨
- Hidden Markov Models Forward算法
- 深入浅出再谈Unity内存泄漏
- org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
- 2021级-JAVA02 基础语法1--标识符、常量与变量、数据类型、运算符与表达式209 天
- 【更新】Project 读写管理控件Aspose.Tasks V17.5发布 | 附下载
- 牛客竞赛数学专题班生成函数I 题解
- 使用navicat连接腾讯云mysql数据库
- 11月15日 作业2,黑洞子弹,子弹发射位置朝着准星方向 UE4斯坦福 学习笔记
热门文章
- 二次拟合r方_使用SPSS拟合曲线
- php主页修改软件,程序安装后依然是老版主页的修改办法
- java生成点阵图_【图片】一个零基础的小白是如何脱变成Java后端工程师的?【java吧】_百度贴吧...
- 实现option上下移动_jQuery操作Select的Option上下移动及移除添加等等
- 如何运行导入的项目_从0到1学习Flink》—— Flink 项目如何运行?
- python人脸识别要怎么实现_详解如何用OpenCV + Python 实现人脸识别
- python网盘开发_python实现网盘自动化操作(GUI版)
- 深度之眼-科赛网二分类大赛入门之路
- jQuery.ajax 调用 服务(.aspx,.asmx)
- php 数据库备份还原