1. 简介

我们在之前的博文中讲解了如何使用redis实现分布式锁,其实除了 redis 还有 zookeeper 也能实现分布式锁。

废话不多说,直接上图。

从整个流程中可以看出,zk实现分布式锁,主要是靠zk的临时顺序节点和watch机制实现的。

2. quick start

Curator 是 Netflix 公司开源的一套 zookeeper 客户端框架,解决了很多 Zookeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册 Watcher 和 NodeExistsException 异常等。

curator-recipes:封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。

2.1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version>
</dependency>

curator-recipes中已经依赖了zookeeper和curator-framework jar,所以这里不用额外的依赖其他jar。

2.2 测试代码

测试代码其实很简单,只需要几行代码而已,初始化客户端,创建锁对象,加锁 和 释放锁。

这里先把加锁的代码注释掉,试下不加锁的情况。

package com.ldx.zookeeper.controller;import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;/*** 分布式锁demo** @author ludangxin* @date 2021/9/4*/
@Slf4j
@RestController
@RequestMapping("lock")
@RequiredArgsConstructor
public class LockDemoController {/*** 库存数*/private Integer stock = 30;/*** zk client */private static CuratorFramework CLIENT;/*** 初始化连接信息*/@PostConstructprivate void init() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CLIENT = CuratorFrameworkFactory.builder().connectString("localhost:2181").retryPolicy(retryPolicy).build();CLIENT.start();}@GetMapping("buy")public String buy() {// 可重入锁InterProcessMutex mutexLock = new InterProcessMutex(CLIENT, "/lock");try {// 加锁
//         mutexLock.acquire();if(this.stock > 0) {Thread.sleep(500);this.stock--;}log.info("剩余库存==={}", this.stock);} catch(Exception e) {log.error(e.getMessage());return "no";}finally {try {// 释放锁
//            mutexLock.release();} catch(Exception e) {log.error(e.getMessage());}}return "ok";}
}

2.3 启动测试

这里我们使用jemter进行模拟并发请求,当然我们这里只启动了一个server,主要是为了节约文章篇幅(启动多个server还得连接db...),能说明问题即可。

同一时刻发送一百个请求。

测试结果部分日志如下:

很明显出现了超卖了现象,并且请求是无序的(请求是非公平的)。

此时我们将注释的加锁代码打开,再进行测试。

文章福利:现在C++程序员面临的竞争压力越来越大。那么,作为一名C++程序员,怎样努力才能快速成长为一名高级的程序员或者架构师,或者说一名优秀的高级工程师或架构师应该有怎样的技术知识体系,这不仅是一个刚刚踏入职场的初级程序员,也是工作三五年之后开始迷茫的老程序员,都必须要面对和想明白的问题。为了帮助大家少走弯路,技术要做到知其然还要知其所以然。以下视频获取点击:C++架构师学习资料

如果想学习C++工程化、高性能及分布式、深入浅出。性能调优、TCP,协程,Nginx源码分析Nginx,ZeroMQ,MySQL,Redis,MongoDB,ZK,Linux内核,P2P,K8S,Docker,TCP/IP,协程,DPDK的朋友可以看一下这个学习地址:C/C++Linux服务器开发高级架构师/Linux后台架构师https://ke.qq.com/course/417774?flowToken=1013189

测试结果部分日志如下:

很明显没有出现超卖的现象。

通过zk 客户端工具查看创建的部分临时节点如下:

3. 源码解析

3.1 加锁逻辑

我们再通过查看Curator加锁源码来验证下我们的加锁逻辑。

首先我们查看InterProcessMutex::acquire()方法,并且我们通过注释可以得知该方法加的锁是可重入锁。

/*** Acquire the mutex - blocking until it's available. Note: the same thread* can call acquire re-entrantly. Each call to acquire must be balanced by a call* to {@link #release()}** @throws Exception ZK errors, connection interruptions*/
@Override
public void acquire() throws Exception
{if ( !internalLock(-1, null) ){throw new IOException("Lost connection while trying to acquire lock: " + basePath);}
}

查看internalLock方法如下。

private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();private boolean internalLock(long time, TimeUnit unit) throws Exception {// 获取当前线程Thread currentThread = Thread.currentThread();// 在map中查看当前线程有没有请求过LockData lockData = threadData.get(currentThread);if ( lockData != null) {// 请求过 则 +1 , 实现了锁的重入逻辑lockData.lockCount.incrementAndGet();return true;}// 尝试获取锁String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if ( lockPath != null) {// 创建锁对象LockData newLockData = new LockData(currentThread, lockPath);// 添加到map中threadData.put(currentThread, newLockData);return true;}return false;
}

我们继续查看LockInternals::attemptLock()尝试获取锁逻辑如下。

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {final long      startMillis = System.currentTimeMillis();final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;int             retryCount = 0;String          ourPath = null;boolean         hasTheLock = false;boolean         isDone = false;while(!isDone) {// 成功标识isDone = true;try {// 创建锁ourPath = driver.createsTheLock(client, path, localLockNodeBytes);// 判断是否加锁成功hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);} catch( KeeperException.NoNodeException e ) {// 当StandardLockInternalsDriver 找不到锁定节点时,它会抛出会话过期等情况。因此,如果重试允许,则继续循环if( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {isDone = false;} else {throw e;}}}if(hasTheLock) {return ourPath;}return null;
}

在这里先查看下创建锁的逻辑StandardLockInternalsDriver::createsTheLock(),如下。

@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {String ourPath;// 判断有没有传znode data 我们这里为nullif(lockNodeBytes != null) {ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);} else {// 创建Container父节点且创建临时的顺序节点ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;
}

锁创建成功后我们再查看下程序是如何加锁的LockInternals::internalLockLoop()。

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {boolean     haveTheLock = false;boolean     doDelete = false;try {if(revocable.get() != null) {client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}// 当客户端初始化好后 且 还没有获取到锁while((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {// 获取所有的子节点 且 递增排序List<String>        children = getSortedChildren();// 获取当前节点 pathString              sequenceNodeName = ourPath.substring(basePath.length() + 1);// 获取当前锁// 1. 先判断当前节点是不是下标为0的节点,即是不是序列值最小的节点。// 2. 如果是则获取锁成功,返回成功标识。// 3. 如果不是则返回比它小的元素作为被监听的节点PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if(predicateResults.getsTheLock()) {// 获取锁成功 返回成功标识haveTheLock = true;} else {// 索取锁失败,则获取比它小的上一个节点元素String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this) {try {// 监听比它小的上一个节点元素client.getData().usingWatcher(watcher).forPath(previousSequencePath);// 如果设置了超时,则继续判断是否超时if(millisToWait != null) {millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if(millisToWait <= 0) {doDelete = true;    break;}// 没有超时则 等待wait(millisToWait);} else {// 没有超时则 等待wait();}} catch(KeeperException.NoNodeException e) {// it has been deleted (i.e. lock released). Try to acquire again}}}}} catch(Exception e) {ThreadUtils.checkInterrupted(e);doDelete = true;throw e;} finally {// 报错即删除该节点if(doDelete) {deleteOurPath(ourPath);}}return haveTheLock;
}

最后 我们再看下上段代码中提到的很关键的方法driver.getsTheLock() 即 StandardLockInternalsDriver::getsTheLock()。

@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {// 获取当前节点的下标 int             ourIndex = children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);// 这里的maxLeases == 1,即当前节点的下标是不是0boolean         getsTheLock = ourIndex < maxLeases;// 如果当前节点的下标为0,则不返回被监听的节点(因为自己已经是最小的节点了),如果不是则返回比自己小的节点作为被监听的节点。String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);// 构造返回结果return new PredicateResults(pathToWatch, getsTheLock);
}

3.2 小节

其实加锁的源码还是比较清晰和易懂的,我们在这里再总结下。

  1. 执行InterProcessMutex::acquire()加锁方法。
  2. InterProcessMutex::internalLock()判断当前线程是加过锁,如果加过则加锁次数+1实现锁的重入,如果没有加过锁,则调用LockInternals::attemptLock()尝试获取锁。
  3. LockInternals::attemptLock()首先创建Container父节点再创建临时的顺序节点,然后执行加锁方法LockInternals::internalLockLoop()。
  4. LockInternals::internalLockLoop()先获取当前Container下的所有顺序子节点并且按照从小到大排序。调用StandardLockInternalsDriver::getsTheLock()方法加锁,先判断当前节点是不是最小的顺序节点,如果是则加锁成功,如果不是则返回上一个比他小的节点,作为被监听的节点。上一步加锁成功则返回true,如果失败则执行监听逻辑。

3.3 释放锁逻辑

@Override
public void release() throws Exception {/*Note on concurrency: a given lockData instancecan be only acted on by a single thread so locking isn't necessary*/// 获取当前线程Thread currentThread = Thread.currentThread();// 查看当前线程有没有锁LockData lockData = threadData.get(currentThread);if(lockData == null) {// 没有锁 还释放,报错throw new IllegalMonitorStateException("You do not own the lock: " + basePath);}// 有锁则 锁次数 -1int newLockCount = lockData.lockCount.decrementAndGet();// 如果锁的次数还大于0,说明还不能释放锁,因为重入的方法还未执行完if (newLockCount > 0) {return;}if (newLockCount < 0) {// 锁的次数小于0,报错throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);}try {// 删除节点internals.releaseLock(lockData.lockPath);}finally {// 从当前的map中移除threadData.remove(currentThread);}
}final void releaseLock(String lockPath) throws Exception{client.removeWatchers();revocable.set(null);deleteOurPath(lockPath);
}

4. redis 和 zookeeper

Zookeeper采用临时节点和事件监听机制可以实现分布式锁,Redis主要是通过setnx命令实现分布式锁。
Redis需要不断的去尝试获取锁,比较消耗性能,Zookeeper是可以通过对锁的监听,自动获取到锁,所以性能开销较小。
另外如果获取锁的jvm出现bug或者挂了,那么只能redis过期删除key或者超时删除key,Zookeeper则不存在这种情况,连接断开节点则会自动删除,这样会即时释放锁。

这样一听感觉zk的优势还是很大的。

但是要考虑一个情况在锁并发不高的情况下 zk没有问题 如果在并发很高的情况下 zk的数据同步 可能造成锁时延较长,在选举过程中需要接受一段时间zk不可用(因为ZK 是 CP 而 redis集群是AP)。

所以说没有哪个技术是适用于任何场景的,具体用哪个技术,还是要结合当前的技术架构和业务场景做选型和取舍。

Zookeeper实战-分布式锁的现实相关推荐

  1. ieee39节点系统介绍_Java秒杀系统实战系列-基于ZooKeeper的分布式锁优化秒杀逻辑...

    本文是"Java秒杀系统实战系列文章"的第十六篇,本文我们将继续秒杀系统的优化之路,采用统一协调调度中心中间件ZooKeeper控制秒杀系统中高并发多线程对于共享资源~代码块的并发 ...

  2. 浅析redis与zookeeper构建分布式锁的异同

    作者:架构小菜 链接:https://www.jianshu.com/p/508620a76e00 进程请求分布式锁时一般包含三个阶段:1. 进程请求获取锁:2. 获取到锁的进程持有锁并执行业务逻辑: ...

  3. 基于 Zookeeper 的分布式锁实现

    1. 背景 最近在学习 Zookeeper,在刚开始接触 Zookeeper 的时候,完全不知道 Zookeeper 有什么用.且很多资料都是将 Zookeeper 描述成一个"类 Unix ...

  4. zookeeper实现分布式锁的原理及具体使用案例

    zookeeper跟redis一样,也是基于内存的. 官网: http://zookeeper.apache.org/ zookeeper是分布式系统的协调服务,提供配置管理.分布式协同.命名的中心化 ...

  5. Redis与Zookeeper实现分布式锁区别

    1.分布式锁解决方案  1.采用数据库 不建议 性能不好 jdbc  2.基于Redis实现分布式锁(setnx)setnx也可以存入key,如果存入key成功返回1,如果存入的key已经存在了,返回 ...

  6. 漫画:如何用Zookeeper实现分布式锁?

    转载自   漫画:如何用Zookeeper实现分布式锁? 什么是临时顺序节点? 让我们来回顾一下Zookeeper节点的概念: Zookeeper的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫 ...

  7. Redis与Zookeeper实现分布式锁的区别

    Redis与Zookeeper实现分布式锁的区别 1.分布式锁解决方案 1.采用数据库 不建议 性能不好 jdbc 2.基于Redis实现分布式锁(setnx)setnx也可以存入key,如果存入ke ...

  8. Zookeeper系列四:Zookeeper实现分布式锁、Zookeeper实现配置中心

    一.Zookeeper实现分布式锁 分布式锁主要用于在分布式环境中保证数据的一致性. 包括跨进程.跨机器.跨网络导致共享资源不一致的问题. 1. 分布式锁的实现思路 说明: 这种实现会有一个缺点,即当 ...

  9. 【Zookeeper】基于Zookeeper实现分布式锁

    1.概述 转载:基于Zookeeper实现分布式锁 1.1 为什么使用分布式锁 我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,我们往往采用synchronized或者Lock ...

最新文章

  1. 当你学了现在的忘了前面的
  2. memcached客户端_小水谈Memcache---Memcached安装
  3. 将k8s制作成3D射击游戏,好玩到停不下来,附源码
  4. Asp.Net中WebForm与MVC,Web API模式对比
  5. java常见排序算法有哪些_Java中常用的6种排序算法详细分解
  6. Redmine数据库备份及搬家
  7. 数据挖掘之模型选择和融合
  8. MySQL索引分类入门
  9. 解决ThinkPad早期笔记本Broadcom博通系列无线网卡Win10掉线、受限、速度慢问题(ThinkPad E530为例)
  10. 卡内基梅隆大学计算机专业录取难,卡内基梅隆大学计算机录取
  11. Windows 打开和关闭默认共享方法汇总
  12. oracle监听生成trace,监听器控制程序lsnrctl跟踪trace file
  13. 企业加速推进数字化转型,程序员进国企靠谱吗?
  14. JAVA(一)依赖注入的简单理解
  15. 【Tableau Desktop 企业日常技巧16】Tableau下载和安装ODBC驱动 连接MySQL方法及过程详解
  16. RemoteViews的用法
  17. [2021.8纪中集训Day14]
  18. Metal 框架之创建纹理及纹理采样
  19. ProcessDefinition是干这个用的
  20. 汽车材料QC/T 942-2013 ELV中六价铬的检测

热门文章

  1. [phaser3学习]使用phaser3做一款飞刀小游戏
  2. 福州大学计算机陈晨,陈宜言大师工作室落户福州大学 搭建产学研一体化平台...
  3. mysql字段相同连接_mysql字符串连接,重复等字符串函数总结
  4. 网络分流算法——Cross法(也叫Scott—Hinsley法)
  5. Git 安装与卸载 gitk安装与优化
  6. iveryone火速上车 抢第一波
  7. 首版次高端软件申报条件和好处
  8. 卫星影像在浦北县白石水镇、北通镇等9个镇耕地开垦项目中的应用
  9. pdf转换成jpg教程
  10. 瀑布图 matlab 二维,Matlab中二维统计分析图和三维立体图