为什么80%的码农都做不了架构师?>>>   

Leader Latch

Zookeeper在分布式系统中,常常被用于选主。在执行某个任务时,让所有的节点都知道有一个特别的,唯一的节点是任务的主节点,由主节点进行任务的执行,其他节点作为备用节点。通过这种热备方式,为分布式系统中任务执行的可控性,以及系统高可用性。

而Curator提供了两种选主机制,可以根据实际情况进行选用。

1. 关键API

org.apache.curator.framework.recipes.leader.LeaderLatch

2. 机制说明

LeaderLatch的方式,就是以一种抢占的方式来决定选主。比较简单粗暴,逻辑相对简单。类似非公平锁的抢占,所以,多节点是一个随机产生主节点的过程。基本就是,谁抢到就算谁的。

多个参与者(如:逻辑节点;某个线程等),指定在一个分组之下,每个分组内进行主节点抢占。

3. 用法

3.1 创建

方法1
public LeaderLatch(CuratorFramework client,String latchPath)

参数说明:

  • client : zk客户端链接
  • latchPath : 分组路径(zk中的path)
方法2
public LeaderLatch(CuratorFramework client,String latchPath,String id)

参数说明:

  • client : zk客户端链接
  • latchPath : 分组路径(zk中的path)
  • id : 参与者ID

3.2 使用

LeaderLatch创建好之后,必须执行:

leaderLatch.start();

这样,才能让leaderLatch开始参与选主过程。

由于LeaderLatch是一个不断抢占的过程,所以需要调用:

public boolean hasLeadership()

来检测当前参与者是否选主成功。这个方法是非阻塞的(立即返回),其结果只代表调用时的选主结果。所以,可以轮询此方法,或者当执行完本地逻辑后,需要执行分布式任务前检擦此方法。

不过,类似JDK中的CountDownLatch,LeaderLatch也提供了阻塞方法:

  1. 方法1
public void await()throws InterruptedException,EOFException

这个方法,会阻塞,直到选主成功。

  1. 方法2 为了避免方法1的长时间选主失败
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

这个方法会根据参数中指定的时间,作为等待的期限。到期后,返回选主结果。

对于LeaderLatch实例,无论是否轩主成功,最后都应该调用:

leaderLatch.close();

这样,才会把当前参与者的信息从选主分组中移除出去。如果,当前参与者是主,还会释放主的资格。避免死锁

4. 错误处理

在实际使用中,必须考虑链接问题引起的主身份丢失问题。 例如:当hasLeadership()返回true,之后链接出问题。 强烈建议:使用LeaderLatch时为其添加一个ConnectionStateListener

LeaderLatch实例会添加一个ConnectionStateListener来监听当前zk链接。 如果,链接不可用(SUSPENDED)则LeaderLatch会认为自己不在是主,等到链接恢复可用时,才可继续。 如果,链接断开(LOST),则LeaderLatch会认为自己不在是主,等到链接重新建立后,删除之前的参与者信息,然后重新参与选主。

5. 源码分析

5.1 LeaderLatch

5.1.1 类定义

先来看看类定义:

import java.io.Closeable;public class LeaderLatch implements Closeable
{...}

注意:实现了java.io.Closeable,所以你懂的, try()...catch{}。(3.2中的leaderLatch.close();

5.1.2 成员变量

public class LeaderLatch implements Closeable
{private final Logger log = LoggerFactory.getLogger(getClass());private final WatcherRemoveCuratorFramework client;private final String latchPath;private final String id;private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);private final AtomicBoolean hasLeadership = new AtomicBoolean(false);private final AtomicReference<String> ourPath = new AtomicReference<String>();private final ListenerContainer<LeaderLatchListener> listeners = new ListenerContainer<LeaderLatchListener>();private final CloseMode closeMode;private final AtomicReference<Future<?>> startTask = new AtomicReference<Future<?>>();private final ConnectionStateListener listener = new ConnectionStateListener(){@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){handleStateChange(newState);}};private static final String LOCK_NAME = "latch-";private static final LockInternalsSorter sorter = new LockInternalsSorter(){@Overridepublic String fixForSorting(String str, String lockName){return StandardLockInternalsDriver.standardFixForSorting(str, lockName);}};public enum State { LATENT, STARTED, CLOSED }public enum CloseMode { SILENT, NOTIFY_LEADER }@VisibleForTestingvolatile CountDownLatch debugResetWaitLatch = null;
  • log : caurtor依赖slf4j
  • client : zk客户端(curator-framework提供)
  • latchPath : 分组路径(zk中的path)
  • id : 参与者ID
  • state
    • 内部枚举
    • 状态
      • LATENT 休眠
      • STARTED 已启动
      • CLOSED 已关闭
    • 使用AtomicReference原子化包装
  • hasLeadership
    • 是否为主
    • 使用AtomicBoolean原子化包装
  • ourPath
    • 使用AtomicReference原子化包装
  • listeners
    • 一组LeaderLatchListener监听器
  • closeMode
    • 内部枚举
    • LeaderLatch关闭方式
      • SILENT : 静默关闭,不触发相关监听器
      • NOTIFY_LEADER :关闭时触发监听器
  • startTask
    • 异步Future
    • 使用AtomicReference原子化包装
  • listener
    • 链接状态监听器
    • 参见 : 4. 错误处理
  • LOCK_NAME
    • 私有常量
  • sorter
    • 私有常量
    • 用于锁处理时,规范path
    • 对参与者进行排序
  • debugResetWaitLatch
    • volatile 可见性
    • reset()使用
    • 在测试时控制启动的时机,防止环境未初始化完成就处理了启动逻辑

注意:

这些成员变量都是`final`类型。
并且,对于引用类型都进行原子化包装,避免并发问题

5.1.3 构造器

提供多个构造器模板,最终都是调用:

    public LeaderLatch(CuratorFramework client, String latchPath){this(client, latchPath, "", CloseMode.SILENT);}public LeaderLatch(CuratorFramework client, String latchPath, String id){this(client, latchPath, id, CloseMode.SILENT);}public LeaderLatch(CuratorFramework client, String latchPath, String id, CloseMode closeMode){this.client = Preconditions.checkNotNull(client, "client cannot be null").newWatcherRemoveCuratorFramework();this.latchPath = PathUtils.validatePath(latchPath);this.id = Preconditions.checkNotNull(id, "id cannot be null");this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");}

可以发现:

  1. 默认是采用CloseMode.SILENT方式关闭
  2. 默认id是空字符串
  3. clientlatchPathidcloseMode不能为空

5.1.4 启动

第3节,介绍过LeaderLatch是由start()启动选主过程:

public void start() throws Exception {Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");startTask.set(AfterConnectionEstablished.execute(client, new Runnable(){@Overridepublic void run(){try{internalStart();}finally{startTask.set(null);}}}));
}

可以发现

  1. 调用原子性CAS方法,将状态由休眠更新已启动
  2. 执行了一个异步任务来完成启动过程
    1. 使用一个链接可用后回调方式

      • AfterConnectionEstablished.execute()

        • 内部使用了一个ThreadUtils.newSingleThreadExecutor
        • 单线程的线程池
        • 所以本地多个LeaderLatch实例的启动过程是序列化方式执行的
    2. 使用成员变量startTask持有异步Future
    3. 启动完成后会制空startTask
      • 说明启动过程可能会有状态变化
  3. 启动的过程实际是由internalStart()方法来完成
private synchronized void internalStart() {if ( state.get() == State.STARTED ){client.getConnectionStateListenable().addListener(listener);try{reset();}catch ( Exception e ){ThreadUtils.checkInterrupted(e);log.error("An error occurred checking resetting leadership.", e);}}
}
  1. internalStart()使用synchronized

    • 同步调用
    • 使用this进行互斥锁对象
    • 同一个LeaderLatch对象的多次启动同样序列化执行
      • 即便绕过第2布,也同样可以保证不会重复启动
  2. 进行状态判断
    • synchronized内部,再次判断
    • 相当于Double check
  3. 在当前连接上注册自带的监听器
  4. 调用reset()完成启动逻辑
  5. 处理了异常
    • 触发线程中断

      • internalStart()是异步执行,通过中断可以进行更细节的控制
    • 避免粗暴的抛出异常
      • internalStart()是异步执行
      • 避免当前线程意外中断
      • 同时也避免了第2.1步骤中那个单线程的线程池频繁的进行线程开/关所带来的额外开销
@VisibleForTesting
void reset() throws Exception {setLeadership(false);setNode(null);BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( debugResetWaitLatch != null ){debugResetWaitLatch.await();debugResetWaitLatch = null;}if ( event.getResultCode() == KeeperException.Code.OK.intValue() ){setNode(event.getName());if ( state.get() == State.CLOSED ){setNode(null);}else{getChildren();}}else{log.error("getChildren() failed. rc = " + event.getResultCode());}}};client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}
private synchronized void setLeadership(boolean newValue)
{boolean oldValue = hasLeadership.getAndSet(newValue);if ( oldValue && !newValue ){ // Lost leadership, was true, now falselisteners.forEach(new Function<LeaderLatchListener, Void>(){@Overridepublic Void apply(LeaderLatchListener listener){listener.notLeader();return null;}});}else if ( !oldValue && newValue ){ // Gained leadership, was false, now truelisteners.forEach(new Function<LeaderLatchListener, Void>(){@Overridepublic Void apply(LeaderLatchListener input){input.isLeader();return null;}});}notifyAll();
}
private void setNode(String newValue) throws Exception{String oldPath = ourPath.getAndSet(newValue);if ( oldPath != null ){client.delete().guaranteed().inBackground().forPath(oldPath);}}
  1. reset()的可见范围

    • 利于测试
    • 使用了com.google.common.annotations.VisibleForTesting
  2. 初始化选主状态false

    1. getAndSet设置
    2. 根据不同的情况触发不同的监听器
      1. 得到
      2. 失去
    3. notifyAll()
      • 唤醒所有的synchronized等待
  3. 制空上次path

    • 如果上一次path有残留,则delete服务器上的信息
  4. 在latchPath下创建一个EPHEMERAL_SEQUENTIAL节点

    • 临时顺序节点
    • 并注册了回调
      • 回掉获取latchPath的子节点
      • 并判断自身是否为主

5.1.5 选主

LeaderLatch的选主判断逻辑,是由上一节中第12步中注册的回调方法来触发。 实际由checkLeadership()方法处理:

private void checkLeadership(List<String> children) throws Exception{final String localOurPath = ourPath.get();List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;if ( ourIndex < 0 ){log.error("Can't find our node. Resetting. Index: " + ourIndex);reset();}else if ( ourIndex == 0 ){setLeadership(true);}else{String watchPath = sortedChildren.get(ourIndex - 1);Watcher watcher = new Watcher(){@Overridepublic void process(WatchedEvent event){if ( (state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null) ){try{getChildren();}catch ( Exception ex ){ThreadUtils.checkInterrupted(ex);log.error("An error occurred checking the leadership.", ex);}}}};BackgroundCallback callback = new BackgroundCallback(){@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception{if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ){// previous node is gone - resetreset();}}};// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leakclient.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));}}

当获取到最新的参与者列表后:

  1. 对列表进行排序
  2. 如果自身处于列表第一位,则当选为主
  3. 否则,在latchPath上增加监听/回调
    1. 监听列表中上一位参与者

      • 当上一位参与者退出(节点被删除时)
      • 重新getChildren()再次进行选主
    2. 当latchPath发生变动(如:删除)
      • 调用reset(),重新进行启动过程

        • 即可导致hasLeadership()失效

6. 测试

package com.roc.curator.demo.leader.latchimport org.apache.commons.lang3.RandomStringUtils
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.TimeUnit/*** Created by roc on 2017/5/25.*/
class LatchParticipant() {val LATCH_PATH: String = "/test/leader/latch"var client: CuratorFramework = CuratorFrameworkFactory.builder().connectString("0.0.0.0:8888").connectionTimeoutMs(5000).retryPolicy(ExponentialBackoffRetry(1000, 10)).sessionTimeoutMs(3000).build()@Before fun init() {client.start()}@Test fun runTest() {var id: String = RandomStringUtils.randomAlphabetic(10)println("id : $id ")val time = Date()var latch: LeaderLatch = LeaderLatch(client, LATCH_PATH, id)latch.start()println("$id 开始竞选 $time")while(!latch.await(3, TimeUnit.SECONDS)){println("$id 选主失败 : $time")println("当前主是:${latch.leader.id}")println("参与者:${latch.participants}")}println("$id 选主成功 $time")while (latch.hasLeadership()) {println("$id 执行 $time")TimeUnit.SECONDS.sleep(2)if (Math.random() > 0.89) {break;}}println("$id 结束此轮: $time")latch.close()}
}

zookeeper节点:

get /test/leader/latch/_c_9b313527-e0ed-410f-9510-30e5fd92b5c6-latch-0000000208
zNillKMfuB
cZxid = 0x1db19
ctime = Thu May 25 20:53:22 CST 2017
mZxid = 0x1db19
mtime = Thu May 25 20:53:22 CST 2017
pZxid = 0x1db19
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x15156529fae07e9
dataLength = 10
numChildren = 0

转载于:https://my.oschina.net/roccn/blog/909252

[Curator] Leader Latch 的使用与分析相关推荐

  1. Apache Curator Leader Election

    http://blog.csdn.net/collonn/article/details/43968655 用于Leader选举,也可以用Shared Reentrant Lock来实现. 如果需要集 ...

  2. 五、curator recipes之选举主节点Leader Latch

    简介 在分布式计算中,主节点选举是为了把某个进程作为主节点来控制其它节点的过程.在选举结束之前,我们不知道哪个节点会成为主节点.curator对于主节点选举有两种实现方式,本文示例演示Latch的实现 ...

  3. [Curator] Path Cache 的使用与分析

    为什么80%的码农都做不了架构师?>>>    Path Cache Path Cache其实就是用于对zk节点的监听.不论是子节点的新增.更新或者移除的时候,Path Cache都 ...

  4. leader选举的源码分析

    源码分析,最关键的是要找到一个入口,对于zk的leader选举,并不是由客户端来触发,而是在启动的时候会触发一次选举.因此我们可以直接去看启动脚本zkServer.sh中的运行命令 ZOOMAIN就是 ...

  5. Oracle等待事件(一)—— latch cache buffers chains 分析与优化思路

    一. 什么是CBC等待 首先我们需要知道CBC等待发生在哪里,为什么会发生,才能理解应该如何定位,如何处理. 首先,CBC latch是用于保护buffer cache的,因此CBC等待一定发生在bu ...

  6. leader选举的源码分析-Messenger

    在Messenger里面构建了两个线程,一个是WorkerSender,一个是WorkerReceiver. 这两个线程是分别用来发送和接收消息的线程.具体做什么,暂时先不分析. Messenger( ...

  7. leader选举的源码分析-startLeaderElection

    看到这个方法,有没有两眼放光的感觉?没错,前面铺垫了这么长,终于进入leader选举的方法了 synchronized public void startLeaderElection() { try ...

  8. leader选举的源码分析-QuorumPeer.start

    QuorumPeer.start方法,重写了Thread的start.也就是在线程启动之前,会做以下操作 1. 通过loadDataBase恢复快照数据 2. cnxnFactory.start()  ...

  9. 使用Zookeeper实现leader选举-Leader Latch

    参与选举的所有节点,会创建一个顺序节点,其中最小的节点会设置为master节点, 没抢到Leader的节点都监听前一个节点的删除事件,在前一个节点删除后进行重新抢主,当master节点手动调用clos ...

  10. leader选举的源码分析-FastLeaderElection.starter

    starter方法里面,设置了一些成员属性,并且构建了两个阻塞队列,分别是sendQueue和recvqueue.并且实例化了一个Messager private void starter(Quoru ...

最新文章

  1. 8 ServletContext
  2. 趋势畅想-搭载android系统的智能数码相机
  3. linux runable进程查询,关于Linux下进程的详解【进程查看与管理】
  4. 学习笔记:区块链概念入门
  5. 【转载】Eclipse vs IDEA快捷键对比大全(win系统)
  6. Layui 数据表格:用户个性化定制列(拖拽,隐藏)
  7. 机器学习基础知识(一):机器学习三大流派
  8. html实现颜色色板,JS实现的系统调色板完整实例
  9. Latex学习笔记 (8) 字体样式之衬线字体与无衬线体字体
  10. lammps教程:velocity命令三种使用方法
  11. ZKtime5.0考勤管理系统标准版客户端登录忘记登录密码
  12. 【Unity】天气特效:打雷下雨
  13. Hidden Markov Models Forward算法
  14. 深入浅出再谈Unity内存泄漏
  15. org.springframework.dao.EmptyResultDataAccessException: Incorrect result size: expected 1, actual 0
  16. 2021级-JAVA02 基础语法1--标识符、常量与变量、数据类型、运算符与表达式209 天
  17. 【更新】Project 读写管理控件Aspose.Tasks V17.5发布 | 附下载
  18. 牛客竞赛数学专题班生成函数I 题解
  19. 使用navicat连接腾讯云mysql数据库
  20. 11月15日 作业2,黑洞子弹,子弹发射位置朝着准星方向 UE4斯坦福 学习笔记

热门文章

  1. 二次拟合r方_使用SPSS拟合曲线
  2. php主页修改软件,程序安装后依然是老版主页的修改办法
  3. java生成点阵图_【图片】一个零基础的小白是如何脱变成Java后端工程师的?【java吧】_百度贴吧...
  4. 实现option上下移动_jQuery操作Select的Option上下移动及移除添加等等
  5. 如何运行导入的项目_从0到1学习Flink》—— Flink 项目如何运行?
  6. python人脸识别要怎么实现_详解如何用OpenCV + Python 实现人脸识别
  7. python网盘开发_python实现网盘自动化操作(GUI版)
  8. 深度之眼-科赛网二分类大赛入门之路
  9. jQuery.ajax 调用 服务(.aspx,.asmx)
  10. php 数据库备份还原