curator是Netflix公司开源的一个ZooKeeper客户端封装。curator 对于锁这块做了一些封装,curator 提供了InterProcessMutex 这样一个 api。除了分布式锁之外,还提供了 leader 选举、分布式队列等常用的功能。本文主要以InterProcessMutex为例,介绍一下这个分布式可重入排它锁的实现原理。

Curator的几种锁方案:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器

接下来我们以InterProcessMutex为例,介绍一下这个分布式可重入排它锁的实现原理

1、实例化InterProcessMutex

Zookeeper 利用 path 创建临时顺序节点,实现公平锁的核心,也是最常用的一个构造函数。

public InterProcessMutex(CuratorFramework client, String path) {this(client, path, new StandardLockInternalsDriver());
}

maxLeases=1,表示可以获得分布式锁的线程数量(跨 JVM)为 1,即为互斥锁

   public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver){this(client, path, LOCK_NAME, 1, driver);
}

internals 的类型为 LockInternals ,InterProcessMutex 将分布式锁的申请和释放操作委托给internals 执行

InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {basePath = PathUtils.validatePath(path);internals = new LockInternals(client, driver, path, lockName, maxLeases);
}

2、InterProcessMutex.acquire

实例化完成的InterProcessMutex对象,开始调用acquire()方法来尝试加锁:

当时间为-1,表示无限等待

public void acquire() throws Exception
{if ( !internalLock(-1, null) ){throw new IOException("Lost connection while trying to acquire lock: " + basePath);}
}

指定获得锁最大的等待的时间,抢夺时,如果出现堵塞,会在超过该时间后,返回false。通过设置整个超时时间,避免出现大量的临时节点累积以及线程堵塞的问题。

public boolean acquire(long time, TimeUnit unit) throws Exception
{return internalLock(time, unit);
}

3、InterProcessMutex.internalLock

锁的可重入。每个InterProcessMutex实例,都会持有一个ConcurrentMap类型的threadData对象,以线程对象作为Key,以LockData作为Value值。

 private boolean internalLock(long time, TimeUnit unit) throws Exception{Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if ( lockData != null ){// 实现可重入,同一线程再次 acquire,首先判断当前的映射表内(threadData)是否有该线程的锁信息,如果有则原子+1,然后返回lockData.lockCount.incrementAndGet();return true;}// 映射表内没有对应的锁信息,尝试通过LockInternals 获取锁String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if ( lockPath != null ){// 成功获取锁,记录信息到映射表   LockData newLockData = new LockData(currentThread, lockPath);threadData.put(currentThread, newLockData);return true;}return false;
}

4 LockInternals.attemptLock

抢夺锁,尝试获取锁,并返回锁对应的 Zookeeper 临时顺序节点的路径。在 Zookeeper 中创建的临时顺序节点的路径,相当于一把待激活的分布式锁, 激活条件:同级目录子节点,名称排序最小(排队,公平锁)。

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{final long      startMillis = System.currentTimeMillis();// 无限等待时,millisToWait 为 nullfinal Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;// 创建 ZNode 节点时的数据内容,无关紧要,这里为 null,采用默认值(IP 地址)final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;// 当前已经重试次数,与CuratorFramework的重试策略有关int             retryCount = 0;String          ourPath = null;boolean         hasTheLock = false;  // 是否已经持有分布式锁boolean         isDone = false;   // 是否已经完成尝试获取分布式锁的操作while ( !isDone ){isDone = true;try{// 从 InterProcessMutex 的构造函数可知实际 driver 为 StandardLockInternalsDriver 的实例// 在Zookeeper中创建临时顺序节点ourPath = driver.createsTheLock(client, path, localLockNodeBytes);// 循环等待来激活分布式锁,实现锁的公平性hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);}catch ( KeeperException.NoNodeException e ){// 因 为 会 话 过 期 等 原 因 ,StandardLockInternalsDriver 因为无法找到创建的临时顺序节点而抛出 NoNodeException 异常if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){// 满足重试策略尝试重新获取锁isDone = false;}else{// 不满足重试策略则继续抛出NoNodeExceptionthrow e;}}}if ( hasTheLock ){// 如果成功获得分布式锁,返回临时顺序节点的路径,上层将其封装成锁信息记录在映射表,方便锁重入return ourPath;}// 如果获取分布式锁失败,返回 nullreturn null;
}

5 driver.createsTheLock

这个driver的createsTheLock方法就是在创建这个锁,即在zookeeper的指定路径上,创建一个临时序列节点。注意:此时只是纯粹的创建了一个节点,不是说线程已经持有了锁。

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{String ourPath;// lockNodeBytes 不为 null 则作为数据节点内容,否则采用默认内容(IP 地址)if ( lockNodeBytes != null ){// creatingParentContainersIfNeeded:用于创建父节点,如果不支持 CreateMode.CONTAINER,那么将采用 CreateMode.PERSISTENT,//CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点,Zookeeper 能保证在节点产生的顺序性// 依据顺序来激活分布式锁,从而也实现了分布式锁的公平性,ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);}else{ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;
}

6 LockInternals.internalLockLoop

循环等待来激活分布式锁,实现锁的公平性,判断自身是否能够持有锁。如果不能,进入wait,等待被唤醒。

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{boolean     haveTheLock = false;  // 是否已经持有分布式锁,默认为falseboolean     doDelete = false;  // 是否需要删除子节点try{if ( revocable.get() != null ){client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){// 获取排序后的子节点列表,并且从小到大根据节点名称后10位数字进行排序。List<String>        children = getSortedChildren();// 获取前面自己创建的临时顺序子节点的名称String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash// 实现锁的公平性的核心逻辑PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if ( predicateResults.getsTheLock() ){// 获得了锁,中断循环,继续返回上层haveTheLock = true;}else{// 没有获得到锁,就去监听上一个临时顺序节点String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this){try {// exists()会导致导致资源泄漏,因此 exists()可以监听不存在的 ZNode,因此采用 getData()// 上一临时顺序节点如果被删除,会唤醒当前线程继续竞争锁,正常情况下能直接获得锁,因为锁是公平的client.getData().usingWatcher(watcher).forPath(previousSequencePath);if ( millisToWait != null ){millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if ( millisToWait <= 0 ){doDelete = true;     // 获取锁超时,标记删除之前创建的临时顺序节点break;}wait(millisToWait);  // 等待被唤醒,限时等待}else{wait();// 等待被唤醒,无限等待}}catch ( KeeperException.NoNodeException e ) {// client.getData()可能调用时抛出 NoNodeException,原因可能是锁被释放或会话过期(连接丢失)等// 这里并没有做任何处理,因为外层是 while 循环,再次执行 driver.getsTheLock 时会调用 validateOurIndex// 此 时 会 抛 出NoNodeException,从而进入下面的 catch 和 finally 逻辑,重新抛出上层尝试重试获取锁并删除临时顺序节点}}}}}catch ( Exception e ){// 标记删除,在 finally 删除之前创建的临时顺序节点(后台不断尝试)ThreadUtils.checkInterrupted(e);doDelete = true;// 重新抛出,尝试重新获取锁throw e;}finally{if ( doDelete ){deleteOurPath(ourPath);}}return haveTheLock;
}

7 driver.getsTheLock

进入 StandardLockInternalsDriver中。判断是否可以持有锁,判断规则:当前创建的节点是否在上一步获取到的子节点列表的首位。

public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{// 之前创建的临时顺序节点在排序后的子节点列表中的索引int             ourIndex = children.indexOf(sequenceNodeName);// 校验之前创建的临时顺序节点是否有效validateOurIndex(sequenceNodeName, ourIndex);// 由 InterProcessMutex 的构造函数可知,maxLeases 为 1,即只有 ourIndex 为 0 时,线程才能持有锁,或者说该线程创建的临时顺序节点激活了锁// Zookeeper 的临时顺序节点特性能保证跨多个 JVM 的线程并发创建节点时的顺序性,越早创建临时顺序节点成功的线程会更早地激活锁或获得锁boolean         getsTheLock = ourIndex < maxLeases;// 如果已经获得了锁,则无需监听任何节点,否则需要监听上一顺序节点(ourIndex-1)// 因 为 锁 是 公 平 的 , 因 此 无 需 监 听 除 了(ourIndex-1)以外的所有节点,这是为了减少羊群效应String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);// 返回获取锁的结果,交由上层继续处理(添加监听等操作)return new PredicateResults(pathToWatch, getsTheLock);
}static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException
{if ( ourIndex < 0 ){// 由于会话过期或连接丢失等原因,该线程创建的临时顺序节点被 Zookeeper 服务端删除,往外抛出 NoNodeException// 如果在重试策略允许范围内,则进行重新尝试获取锁,这会重新重新生成临时顺序节点throw new KeeperException.NoNodeException("Sequential path not found: " + sequenceNodeName);}
}

以上就是整个锁的抢夺过程,下面来分析锁释放的逻辑

8 释放锁,InterProcessMutex.release

释放的逻辑比较简单,首先减少重入锁的计数,直到变成0。然后释放锁,即移除移除Watchers & 删除创建的节点,最后从threadData中,删除自己线程的缓存。

public void release() throws Exception
{Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if ( lockData == null ){// 无法从映射表中获取锁信息,不持有锁throw new IllegalMonitorStateException("You do not own the lock: " + basePath);}int newLockCount = lockData.lockCount.decrementAndGet();if ( newLockCount > 0 ){// 锁是可重入的,初始值为 1,原子-1 到 0,锁才释放return;}if ( newLockCount < 0 ){throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);}try{// lockData != null && newLockCount == 0,释放锁资源internals.releaseLock(lockData.lockPath);}finally{// 最后从映射表中移除当前线程的锁信息threadData.remove(currentThread);}
}

扩展:Zookeeper 数据的同步流程

当整个集群在启动时,或者当 leader 节点出现网络中断、崩溃等情况时,ZAB 协议就会进入恢复模式并选举产生新的 Leader,当 leader 服务器选举出来后,并且集群中有过半的机器和该 leader 节点完成数据同步后(同步指的是数据同步,用来保证集群中过半的机器能够和 leader 服务器的数据状态保持一致),ZAB 协议就会退出恢复模式。当集群中已经有过半的 Follower 节点完成了和 Leader 状态同步以后,那么整个集群就进入了消息广播模式。这个时候,在 Leader 节点正常工作时,启动一台新的服务器加入到集群,那这个服务器会直接进入数据恢复模式,和leader 节点进行数据同步。同步完成后即可正常对外提供非事务请求的处理。需要注意的是:leader 节点可以处理事务请求和非事务请求,follower 节点只能处理非事务请求,如果 follower 节点接收到非事务请求,会把这个请求转发给 Leader 服务器。

消息广播的实现原理:

消息广播的过程实际上是一个简化版本的二阶段提交过程

  1. leader 接收到消息请求后,将消息赋予一个全局唯一的64 位自增 id,叫:zxid,通过 zxid 的大小比较既可以实现因果有序这个特征
  2. leader 为每个 follower 准备了一个 FIFO 队列(通过 TCP协议来实现,以实现了全局有序这一个特点)将带有 zxid的消息作为一个提案(proposal)分发给所有的 follower
  3. 当 follower 接收到 proposal,先把 proposal 写到磁盘,写入成功以后再向 leader 回复一个 ack
  4. 当 leader 接收到合法数量(超过半数节点)的 ACK 后,leader 就会向这些 follower 发送 commit 命令,同时会在本地执行该消息
  5. 当 follower 收到消息的 commit 命令以后,会提交该消息

leader 的投票过程,不需要 Observer 的 ack,也就是Observer 不需要参与投票过程,但是 Observer 必须要同步 Leader 的数据从而在处理请求的时候保证数据的一致性

总结

总结:InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid + 递增序列组成。而通过对比自身的序列数是否在所有子节点中最小的一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。

Curator实现分布式锁的基本原理相关推荐

  1. Curator实现分布式锁的基本原理-LockInternals.internalLockLoop

    // 循环等待来激活分布式锁,实现锁的公平性 private boolean internalLockLoop(long startMillis, Long millisToWait, String ...

  2. Curator实现分布式锁的基本原理-createsTheLock

    // From StandardLockInternalsDriver // 在Zookeeper中创建临时顺序节点 public String createsTheLock(CuratorFrame ...

  3. Curator实现分布式锁的基本原理-LockInternals.attemptLock

    // 尝试获取锁,并返回锁对应的Zookeeper临时顺序节点的路径 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes ...

  4. Curator实现分布式锁的基本原理-InterProcessMutex.internalLock

    private boolean internalLock(long time, TimeUnit unit) throws Exception{ Thread currentThread = Thre ...

  5. Curator实现分布式锁的基本原理-构造函数

    // 最常用 public InterProcessMutex(CuratorFramework client, String path){// Zookeeper利用path创建临时顺序节点,实现公 ...

  6. Curator实现分布式锁的基本原理-getTheLock

    // From StandardLockInternalsDriver public PredicateResults getsTheLock(CuratorFramework client, Lis ...

  7. Curator实现分布式锁的基本原理-InterProcessMutex.acquire

    // 无限等待 public void acquire() throws Exception{ if ( !internalLock(-1, null) ){ throw new IOExceptio ...

  8. 分布式架构-ZK客户端工具Curator框架分布式锁及基本使用

    分布式架构-基于Curator分布式锁及基本使用 一.Curator Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作 ...

  9. Apache Curator之分布式锁原理(二)

    本文主要讲解如下内容: 为什么要使用分布式锁? 分布式锁特性! 分布式锁的实现方式有哪些? Curator分布式锁原理 Curator分布式锁实现类UML及相关类的介绍 基于Redis,数据库实现分布 ...

最新文章

  1. 理解oracle中连接和会话
  2. python绘图—— matplotlib
  3. 关于HTML、js加密、混淆、源码保护、代码安全,防止解压直接看源码
  4. C# ---- GC中代的递增规律
  5. 判断上三角矩阵 (15 分)
  6. eslint是什么_一起来用 eslint 吧
  7. 项目中SQL语句的一些应用总结
  8. vmware之VMware Remote Console (VMRC) SDK(三)
  9. linux 安装mysql(rpm文件安装)
  10. 巴比特CEO王雷:未来几年会迎来区块链企业上市小高潮,或将诞生万亿级企业...
  11. DP4398:国产兼容替代CS4398立体声24位/192kHz音频解码芯片
  12. 普通平键的主要尺寸有_平键的基本参数
  13. Android性能分析之---卡顿分析
  14. VS2019制作DLL文件
  15. Notes Fifth Day-渗透攻击-红队-信息收集
  16. [译] WWDC 2018:关于iOS 12、iPad Pro、新MacBook及更多产品的所有预测
  17. 添加mshtml.tlb的警告
  18. 简单设置 Amazon CloudFront
  19. 渗透测试之路:ThinkPHP漏洞复现
  20. cnpm安装淘宝镜像源

热门文章

  1. zabbix 快速入门
  2. CVE和CWE的区别
  3. java hql查询_Spring 中常用的hql查询方法(getHibernateTemplate())(转)
  4. 有关程序员的几则冷笑话
  5. Java打印折纸游戏
  6. 光滑曲线_高等数学八:(3)曲线积分与路径无关的条件
  7. 字母消消乐游戏(C语言版本_2023首篇新作)
  8. 公积金网厅显示连接服务器错误,住房公积金网厅解决方案
  9. Failed to introspect Class [com.ssm.controller.OrderController] from ClassLoader [ParallelWebappClas
  10. u云支付 php05,优云易支付-免签约支付平台-彩虹易支付,1分钟快速接入支付功能...