从构建分布式秒杀系统聊聊分布式锁

1.案例介绍

在尝试了解分布式锁之前,大家可以想象一下,什么场景下会使用分布式锁?

单机应用架构中,秒杀案例使用ReentrantLcok或者synchronized来达到秒杀商品互斥的目的。然而在分布式系统中,会存在多台机器并行去实现同一个功能。也就是说,在多进程中,如果还使用以上JDK提供的进程锁,来并发访问数据库资源就可能会出现商品超卖的情况。因此,需要我们来实现自己的分布式锁。

实现一个分布式锁应该具备的特性:

  • 高可用、高性能的获取锁与释放锁
  • 在分布式系统环境下,一个方法或者变量同一时间只能被一个线程操作
  • 具备锁失效机制,网络中断或宕机无法释放锁时,锁必须被删除,防止死锁
  • 具备阻塞锁特性,即没有获取到锁,则继续等待获取锁
  • 具备非阻塞锁特性,即没有获取到锁,则直接返回获取锁失败
  • 具备可重入特性,一个线程中可以多次获取同一把锁,比如一个线程在执行一个带锁的方法,该方法中又调用了另一个需要相同锁的方法,则该线程可以直接执行调用的方法,而无需重新获得锁

在之前的秒杀案例中,我们曾介绍过关于分布式锁几种实现方式:

  • 基于数据库实现分布式锁
  • 基于 Redis 实现分布式锁
  • 基于 Zookeeper 实现分布式锁

前两种对于分布式生产环境来说并不是特别推荐,高并发下数据库锁性能太差,Redis在锁时间限制和缓存一致性存在一定问题。这里我们重点介绍一下 Zookeeper 如何实现分布式锁。

2.实现原理

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它内部是一个分层的文件系统目录树结构,规定同一个目录下只能存在唯一文件名。

ZooKeeper数据模型与文件系统目录树(源自网络)

3.数据模型

  • PERSISTENT 持久化节点,节点创建后,不会因为会话失效而消失
  • EPHEMERAL 临时节点, 客户端session超时此类节点就会被自动删除
  • EPHEMERAL_SEQUENTIAL 临时自动编号节点
  • PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1

4.监视器(watcher)

当创建一个节点时,可以注册一个该节点的监视器,当节点状态发生改变时,watch被触发时,ZooKeeper将会向客户端发送且仅发送一条通知,因为watch只能被触发一次。

根据zookeeper的这些特性,我们来看看如何利用这些特性来实现分布式锁:

  • 创建一个锁目录lock
  • 线程A获取锁会在lock目录下,创建临时顺序节点
  • 获取锁目录下所有的子节点,然后获取比自己小的兄弟节点,如果不存在,则说明当前线程顺序号最小,获得锁
  • 线程B创建临时节点并获取所有兄弟节点,判断自己不是最小节点,设置监听(watcher)比自己次小的节点
  • 线程A处理完,删除自己的节点,线程B监听到变更事件,判断自己是最小的节点,获得锁

5.代码分析

尽管ZooKeeper已经封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。但是如果让一个普通开发者去手撸一个分布式锁还是比较困难的,在秒杀案例中我们直接使用 Apache 开源的curator 开实现 Zookeeper 分布式锁。

这里我们使用以下版本,截止目前最新版4.0.1,maven依赖如下:

<!-- zookeeper 分布式锁,注意zookeeper版本:zookeeper-3.5.1-alpha-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version>
</dependency>

首先,我们看下InterProcessLock接口中的几个方法:

package org.apache.curator.framework.recipes.locks;import java.util.concurrent.TimeUnit;public interface InterProcessLock
{/*** Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call* to {@link #release()}** @throws Exception ZK errors, connection interruptions*/public void acquire() throws Exception;/*** Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call* to {@link #release()}** @param time time to wait* @param unit time unit* @return true if the mutex was acquired, false if not* @throws Exception ZK errors, connection interruptions*/public boolean acquire(long time, TimeUnit unit) throws Exception;/*** Perform one release of the mutex.** @throws Exception ZK errors, interruptions, current thread does not own the lock*/public void release() throws Exception;/*** Returns true if the mutex is acquired by a thread in this JVM** @return true/false*/boolean isAcquiredInThisProcess();
}

InterProcessLock接口总共有4个实现类,结构如下

  • InterProcessMultiLock:管理多个锁的容器最为单一对象,当调用acquire方法获取锁时,所有的锁被同时获取,业务处理完成后,线程释放锁时也是同时释放所有的锁

    /*** A container that manages multiple locks as a single entity. When {@link #acquire()} is called,* all the locks are acquired. If that fails, any paths that were acquired are released. Similarly, when* {@link #release()} is called, all locks are released (failures are ignored).*/
    public class InterProcessMultiLock implements InterProcessLock
    {private final List<InterProcessLock> locks;/*** Creates a multi lock of {@link InterProcessMutex}s** @param client the client* @param paths list of paths to manage in the order that they are to be locked*/public InterProcessMultiLock(CuratorFramework client, List<String> paths){// paths get checked in each individual InterProcessMutex, so trust them herethis(makeLocks(client, paths));}
    
  • InterProcessMutex:支持可重入锁

    /*** A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that* use the same lock path will achieve an inter-process critical section. Further, this mutex is* "fair" - each user will get the mutex in the order requested (from ZK's point of view)*/
    public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex>
    {private final LockInternals internals;private final String basePath;private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();private static class LockData{final Thread owningThread;final String lockPath;final AtomicInteger lockCount = new AtomicInteger(1);private LockData(Thread owningThread, String lockPath){this.owningThread = owningThread;this.lockPath = lockPath;}}private static final String LOCK_NAME = "lock-";
    
  • InterProcessSemaphoreMutex : 不支持可重入锁

    /*** A NON re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that* use the same lock path will achieve an inter-process critical section.*/
    public class InterProcessSemaphoreMutex implements InterProcessLock
    {private final InterProcessSemaphoreV2 semaphore;private volatile Lease lease;/*** @param client the client* @param path path for the lock*/public InterProcessSemaphoreMutex(CuratorFramework client, String path){this.semaphore = new InterProcessSemaphoreV2(client, path, 1);}
    
  • InternalInterProcessMutex:zk读写锁的静态内部类,充当zk读写锁

    public class InterProcessReadWriteLock
    {private final InterProcessMutex readMutex;private final InterProcessMutex writeMutex;private static class InternalInterProcessMutex extends InterProcessMutex{private final String lockName;private final byte[] lockData;InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver){super(client, path, lockName, maxLeases, driver);this.lockName = lockName;this.lockData = lockData;}@Overridepublic Collection<String> getParticipantNodes() throws Exception{Collection<String>  nodes = super.getParticipantNodes();Iterable<String>    filtered = Iterables.filter(nodes,new Predicate<String>(){@Overridepublic boolean apply(String node){return node.contains(lockName);}});return ImmutableList.copyOf(filtered);}@Overrideprotected byte[] getLockNodeBytes(){return lockData;}}
    

这里以支持可重复锁InterProcessMutex针对源码展开介绍,了解zk充当分布式锁的锁机制,加锁和解锁过程

1)获取锁:

 /*** Acquire the mutex - blocking until it's available. Note: the same thread* can call acquire re-entrantly. Each call to acquire must be balanced by a call* to {@link #release()}** @throws Exception ZK errors, connection interruptions*/@Overridepublic void acquire() throws Exception{if ( !internalLock(-1, null) ){throw new IOException("Lost connection while trying to acquire lock: " + basePath);}}/*** Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread* can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call* to {@link #release()}** @param time time to wait* @param unit time unit* @return true if the mutex was acquired, false if not* @throws Exception ZK errors, connection interruptions*/@Overridepublic boolean acquire(long time, TimeUnit unit) throws Exception{return internalLock(time, unit);}

具体实现:

 private boolean internalLock(long time, TimeUnit unit) throws Exception{/*Note on concurrency: a given lockData instancecan be only acted on by a single thread so locking isn't necessary*/Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if ( lockData != null ){// re-enteringlockData.lockCount.incrementAndGet();return true;}// 尝试获取zk锁String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());if ( lockPath != null ){LockData newLockData = new LockData(currentThread, lockPath);threadData.put(currentThread, newLockData);return true;}//获取锁超时或者zk通信异常返回失败return false;}

Zookeeper获取锁实现:

 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception{final long      startMillis = System.currentTimeMillis();final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;// 子节点标识final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;// 重试次数int             retryCount = 0;String          ourPath = null;boolean         hasTheLock = false;boolean         isDone = false;// 自旋锁,循环获取锁while ( !isDone ){isDone = true;try{//在锁节点下创建临时且有序的子节点,例如:_c_008c1b07-d577-4e5f-8699-8f0f98a013b4-lock-000000001ourPath = driver.createsTheLock(client, path, localLockNodeBytes);//如果当前子节点序号最小,获得锁则直接返回,否则阻塞等待前一个子节点删除通知(release释放锁)hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);}catch ( KeeperException.NoNodeException e ){// gets thrown by StandardLockInternalsDriver when it can't find the lock node// this can happen when the session expires, etc. So, if the retry allows, just try it all againif ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ){isDone = false;}else{throw e;}}}if ( hasTheLock ){return ourPath;}return null;}

判断是否为最小节点:

 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception{boolean     haveTheLock = false;boolean     doDelete = false;try{if ( revocable.get() != null ){client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}// 自旋获取锁while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){// 获取所有子节点集合List<String>        children = getSortedChildren();// 判断当前子节点是否是最小子节点String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slashPredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if ( predicateResults.getsTheLock() ){ // 当前节点是最小子节点,获得锁haveTheLock = true;}else{ // 获取前一个节点,用于监听String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this){try {// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leakclient.getData().usingWatcher(watcher).forPath(previousSequencePath);if ( millisToWait != null ){millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if ( millisToWait <= 0 ){doDelete = true;    // timed out - delete our nodebreak;}wait(millisToWait);}else{wait();}}catch ( KeeperException.NoNodeException e ) {// it has been deleted (i.e. lock released). Try to acquire again// 如果前一个子节点已经被删除则throw exception,只需要自旋获取一次即可}}}}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);doDelete = true;throw e;}finally{if ( doDelete ){deleteOurPath(ourPath);}}return haveTheLock;}

2) 释放锁:

 /*** Perform one release of the mutex if the calling thread is the same thread that acquired it. If the* thread had made multiple calls to acquire, the mutex will still be held when this method returns.** @throws Exception ZK errors, interruptions, current thread does not own the lock*/@Overridepublic void release() throws Exception{/*Note on concurrency: a given lockData instancecan be only acted on by a single thread so locking isn't necessary*/Thread currentThread = Thread.currentThread();LockData lockData = threadData.get(currentThread);if ( lockData == null ){throw new IllegalMonitorStateException("You do not own the lock: " + basePath);}// 获取重复梳理int newLockCount = lockData.lockCount.decrementAndGet();if ( newLockCount > 0 ){return;}if ( newLockCount < 0 ){throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);}try{internals.releaseLock(lockData.lockPath);}finally{// 移除当前线程获取zk锁数据threadData.remove(currentThread);}}

6.测试案例

为了更好的理解其原理和代码分析中获取锁的过程,这里我们实现一个简单的Demo:

package com.netease.it.access.web.util;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** 基于curator的zookeeper分布式锁* Created by dujiayong on 2020/3/8.*/
public class CuratorUtil {private static final String ZK_LOCK_DIR = "/curator/lock";public static void main(String[] args) {// 1:定义重试策略:初始睡眠1s 最大重试3次RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);// 2.通过工厂创建连接CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);// 3.打开连接client.start();// 4.分布式锁final InterProcessLock lock = new InterProcessMutex(client, ZK_LOCK_DIR);ExecutorService executorService = Executors.newFixedThreadPool(5);for (int i = 0; i < 5; i++) {executorService.submit(new Runnable() {@Overridepublic void run() {boolean acquire = false;try {acquire = lock.acquire(5000, TimeUnit.MILLISECONDS);if (acquire) {System.out.println("线程->" + Thread.currentThread().getName() + "获得锁成功");} else {System.out.println("线程->" + Thread.currentThread().getName() + "获得锁失败");}// 模拟业务,延迟4sThread.sleep(4000);} catch (Exception e) {e.printStackTrace();} finally {if (acquire) {try {lock.release();} catch (Exception e) {e.printStackTrace();}}}}});}}}

这里我们开启5个线程,每个线程获取锁的最大等待时间为5秒,为了模拟具体业务场景,方法中设置4秒等待时间。开始执行main方法,通过ZooInspector监控/curator/lock下的节点如下图:

对,没错,设置4秒的业务处理时长就是为了观察生成了几个顺序节点。果然如案例中所述,每个线程都会生成一个节点并且还是有序的。

观察控制台,我们会发现只有两个线程获取锁成功,另外三个线程超时获取锁失败会自动删除节点。线程执行完毕我们刷新一下/curator/lock节点,发现刚才创建的五个子节点已经不存在了。


来源:https://www.cnblogs.com/williamjie/p/9406031.html

zookeeper原理博客:

  • zookeeper实现分布式锁(非公平锁)原理

从构建分布式秒杀系统聊聊分布式锁相关推荐

  1. 从构建分布式秒杀系统聊聊限流特技

    前言 俗话说的好,冰冻三尺非一日之寒,滴水穿石非一日之功,罗马也不是一天就建成的.两周前秒杀案例初步成型,分享到了中国最大的同 性友网站-码云.同时也收到了不少小伙伴的建议和投诉.我从不认为分布式.集 ...

  2. 从构建分布式秒杀系统聊聊线程池

    前言 从0到1构建分布式秒杀系统案例的代码已经全部上传至码云,文章也被分发到各个平台.其中也收到了不少小伙伴喜欢和反馈,有网友如是说: 说实话,能用上的不多,中小企业都不可能用到,大型企业也不是一个人 ...

  3. 从构建分布式秒杀系统聊聊验证码

    2019独角兽企业重金招聘Python工程师标准>>> 前言 为了拦截大部分请求,秒杀案例前端引入了验证码.淘宝上很多人吐槽,等输入完秒杀活动结束了,对,结束了...... 当然了, ...

  4. 从构建分布式秒杀系统聊聊WebSocket推送通知 1

    前言 秒杀架构到后期,我们采用了消息队列的形式实现抢购逻辑,那么之前抛出过这样一个问题:消息队列异步处理完每个用户请求后,如何通知给相应用户秒杀成功? 场景映射 首先,我们举一个生活中比较常见的例子: ...

  5. disruptor框架为什么不流行_从构建分布式秒杀系统聊聊Disruptor高性能队列

    前言 秒杀架构持续优化中,基于自身认知不足之处在所难免,也请大家指正,共同进步.文章标题来自码友的建议,希望可以把阻塞队列ArrayBlockingQueue这个队列替换成Disruptor,由于之前 ...

  6. 从构建分布式秒杀系统聊聊WebSocket推送通知

    前言 秒杀架构到后期,我们采用了消息队列的形式实现抢购逻辑,那么之前抛出过这样一个问题:消息队列异步处理完每个用户请求后,如何通知给相应用户秒杀成功? 场景映射 首先,我们举一个生活中比较常见的例子: ...

  7. SpringBoot开发案例从0到1构建分布式秒杀系统

    前言 最近,被推送了不少秒杀架构的文章,忙里偷闲自己也总结了一下互联网平台秒杀架构设计,当然也借鉴了不少同学的思路.俗话说,脱离案例讲架构都是耍流氓,最终使用SpringBoot模拟实现了部分秒杀场景 ...

  8. 分布式秒杀系统的设计

    分布式秒杀系统的设计 前言 不知道你在面试的过程中有没有被问到如何设计一个分布式秒杀系统?本篇博客根据大神们的梳理的体系并结合自己实际的项目经验,大致描述我们在设计分布式秒杀系统需要关注的核心内容-- ...

  9. 秒杀商城项目----Dubbo+Zookeeper分布式秒杀系统模块分析

    优化为分布式秒杀系统 一. Seckill-cache--缓存模块 ①. RedisConfig_redis配置信息 ②. RedisPoolFactory_jedis连接池 ③. RedisLock ...

最新文章

  1. NXT节点搭建(二)环境搭建
  2. php v9 分页静态,PHPCMS V9自定义栏目伪静态实现方法(列表页/分页/内容页)
  3. netstat -an 查看端口
  4. MySQL小表join大表的正确使用姿势(straight_join 关键字的使用)
  5. java oca_OCA的Java拼图游戏第3部分
  6. python执行文件函数,python如何运行函数
  7. 实战:Redis 性能测试
  8. 怎么用计算机计算年月份,如何使用Excel计算两个日期之间的月数?
  9. (转自ztp800201) Android - 自定义标题栏(在标题栏中增加按钮和文本居中)
  10. python基于web可视化_python可视化(转载)
  11. python常用单词有多少_Python常用单词
  12. python if else格式_Python进阶之路 3.4.2 条件语句(if、else和elif)
  13. 如何把DEBIAN变成UBUNTU-DESKTOP最少化安装
  14. 毛驴县令第二季简介及其下载
  15. vscode 快速新建一个HTML文件
  16. 苹果电脑win10蓝牙音响卡顿_win10系统蓝牙音箱卡顿声音断断续续的处理办法
  17. 关于软件开发中遇到的问题解决思路
  18. 29岁了还一事无成是人生的常态?
  19. 网络架构模式 B/S C/S
  20. 微软服务器 客户机,网络客户端和服务器技术简介

热门文章

  1. java.lang.IllegalStateException: getWriter() has already been called for this response问题解决
  2. cad2020安装1603错误_解决CAD安装过程中出现1603致命错误的方法
  3. 图形学笔记(二)——线画图元
  4. python----列表 例题 创建一个列表,命名为foods, 在列表中保存5个食物士豆,西红柿面条,苹果,披萨并打印出每一个食物的名字
  5. Python - 批量生成幻影坦克图片
  6. 读王竹峰老师 《一个数据库十年老兵的思考与总结》 有感
  7. 《Image-to-Image Translation with Conditional Adversarial Networks》文章翻译
  8. 错误状态0xc00002e1解决方法
  9. 品牌LOGO设计丨商业实践设计思路大揭秘 难怪他接单不断
  10. 梦想中的办公之所!华为云(莲湖)联合发展中心用四招让幸福工作成为可能