文章目录

  • 1. zookeeper为什么可以做分布式锁 ?
  • 2. zookeeper实现非公平锁
  • 3. zookeeper实现公平锁
  • 4. zookeeper实现读写锁(共享锁)
  • 5. Curator客户端实现zookeeper分布式锁

1. zookeeper为什么可以做分布式锁 ?

在单机环境下,可以使用synchronized或者Reentrandlock进行加锁控制并发,但在分布式场景下,由于多个应用会产生多个实例对象,单机锁中的对象不再唯一,所以单机锁无法解决分布式环境下的并发问题!我们可以借助独立于应用之外的中间件来解决,比如
①:redis的setnx、Redission (了解redis实现分布式锁点击这里)
②:zookeeper的节点特性

zookeeper主要利用节点无法重复创建 以及节点的监听通知机制来实现分布式锁的,当一个线程在zookeeper中创建了一个/test节点后,其他线程再创建这个/test节点会提示创建失败!因为zookeeper内部执行命令也像redis一样是单线程,多线程下的操作节点请求会排队执行,当发现节点已存在,则提示节点已存在!

        当前线程加完锁,逻辑代码执行完毕后,还需要删除节点释放锁,供其他线程争抢。其他线程利用zookeeper的节点监听特性,一旦节点被修改(删除),就会受到来自服务端的消息,表示自己可以抢锁了,如果没有抢到继续 监听/等待 此节点。

zookeeper根据这两个特性可以做分布式锁,分布式锁又分为公平锁、非公平锁、读写锁等等。zookeeper对其都做了实现!

zookeeper分布式锁和redis分布式锁的区别?

redis分布式锁中向master节点setnx存储数据成功就算成功,当数据同步时,master节点突然挂掉,集群剩余节点会选举出新的master节点,当新的master节点执行setnx命令时,旧的已挂掉master节点中还存在未同步的setnx数据,这就会使分布式锁失效!出现数据一致性问题。

而zookeeper分布式锁采用leader-follwer模式,在创建/lock节点作为分布式锁时,只有半数以上的节点执行成功才算成功,就算leader节点挂掉,其他zookeeper集群节点中已经存在/lock节点,后续创建/lock行为不会执行成功,保证了强一致性。但也正是因为这点,相比于redis分布式锁,性能较低!

2. zookeeper实现非公平锁

可以使用 持久化节点临时节点 来实现。推荐使用临时节点,因为持久化节点还需要手动删除,人工维护大量无用节点。临时节点session关闭就会过期,zookeeper内部线程自动删除!

非公平锁加锁原理:

如上非公平锁的实现方式在高并发的场景下,性能会下降的比较厉害。主要原因是:所有的连接都在对同一个节点进行监听,当服务器检测到删除事件时,要通知所有的连接,所有的连接同时收到事件,再次并发竞争,竞争失败再次等待。这就是羊群效应。羊群效应需要大量通知其他连接,而且不止一次,会造成一定的资源浪费!为了避免羊群效应,可以采用公平锁的方式实现分布式锁!

3. zookeeper实现公平锁

公平锁的实现需要保证节点的顺序性,可以使用zookeeper的 持久化顺序节点临时顺序节点,推荐使用临时顺序节点,原因同上。

公平锁实现原理图:

        /lock节点可以用 Container节点类型,该节点类型规定,如果Container节点下面没有子节点,则Container节点在未来会被Zookeeper自动清除,定时任务默认60s 检查一次。使用 临时顺序子节点代表锁,在session关闭时,也会自动清理,进而触发/lock节点的自动删除,减少人工维护成本

如上借助于容器Container节点 和 临时顺序节点,可以避免同时多个节点的并发竞争锁,缓解了服务端压力。这种实现方式所有加锁请求都进行排队加锁,是公平锁的具体实现。

问题一:当前线程怎么判断自己能不能获取锁?

答:在公平锁的实现中,只有顺序最小的节点才能获取分布式锁,每当有一个线程进来就会对/lock节点下所有的子节点进行排序,并比较自己的序号是不是列表中最小的,如果是就获取锁,如果不是,就对序号比自己小的上一个节点进行监听。

问题二:排队中的某个子节点如果挂掉,整个监听序列是否会出问题?

答:公平锁中,临时顺序节点中的每一个节点都监听着前一个序号比它小的节点。以 A - B -C 为例 ,A为最小的节点。如果B节点挂掉(或被删除),此时A节点仍在处理业务逻辑,并未释放,那么整个序列将变为不连续的。但是B挂掉后(或被删除),会对C发送通知机制,C接收到B已挂掉的通知,要去竞争锁啊,所以C会获取/lock节点下所有的子节点,排序并比较自己是不是最小的节点,结果发现并不是,因为A节点还在,然后B会对序号比它小上一个节点A进行监听,自动跳过了意外挂掉的B节点,这样就完成了监听关系的自动维护!

问题三:顺序子节点已创建,但服务端响应失败,造成节点多次创建怎么办?

答:客户端发送节点创建命令,服务器接收到并创建成功,但是给客户端响应的时候,服务器闪断,又在session超时时间内连上了。此时节点已创建成功,但客户端并没有收到节点创建成功的消息,它认为节点创建失败,由于客户端重试机制存在,会重新发起创建,这样就导致节点多次创建!!导致部分顺序节点一直存在服务器中,不会被释放,被称为僵尸节点。可以通过Curator客户端的protection 模式来解决僵尸节点问题。protection 模式会通过一个uuid来检测服务器节点,重试时如果uuid已存在,则表示已创建,就不会再次创建节点!

    // protection 模式,防止由于异常原因,导致僵尸节点@Testpublic void testCreate() throws Exception {CuratorFramework curatorFramework = getCuratorFramework();String forPath = curatorFramework.create()//防僵死.withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());log.info("curator create node :{}  successfully.", forPath);}

4. zookeeper实现读写锁(共享锁)

zookeeper的公平锁和非公平锁的实现有一个共同的特质,就是都是互斥锁,同一时间只能有一个请求占用。如果是大量的并发上来,无论读写,所有的请求都得加锁,性能是会急剧下降的。那是不是真的所有的请求都需要加锁呢?答案是否定的!如果数据没有任何写操作只有读操作的话,是不需要加锁的,

如果读数据的请求还没读完,这个时候来了一个写请求,怎么办呢?有人已经在读数据了,这个时候是不能写数据的,不然数据就不正确了。直到前面读锁全部释放掉以后,写请求才能执行,所以需要给所有的读请求加一个读标识(读锁),让写请求知道,这个时候是不能修改数据的。不然数据就不一致了。

如果已经有人在写数据了,再来一个请求写数据,也是不允许的,这样也会导致数据的不一致,所以所有的写请求都需要加一个写标识(写锁),是为了避免同时对共享数据进行写操作。

zookeeper对读写锁的实现原理如下:

①:read请求,如果前面的请求都是read,则直接获取锁(读读共享),如果前面的请求有write请求,则该read请求不能直接获取锁(读写互斥),需要对前面的write请求节点进行监听。如果前面有多个write请求,则对离当前最近的write请求进行监听

②:write请求,无论前面请求是read还是write请求,都会对其监听,和公平锁和非公平锁性质一致,对其他行为互斥!

Curator客户端也实现了zookeeper读写锁,底层与实现公平锁不同的是:读写锁增加了读、写标识,在获取锁时,根据读写标识区分不同的读写逻辑

  1. 写请求:与Curator客户端实现的公平锁逻辑一致
  2. 读请求:遍历容器节点中所有子节点后,如果当前读节点前面都是读请求,则直接获取锁,执行业务逻辑;如果当前读节点前面有写请求,则记录所有写请求节点位置后,找到自己前边最近的节点,进行监听。等待这个写请求释放,自己再获取锁

5. Curator客户端实现zookeeper分布式锁

以分布式场景下减库存为例:

1. 引入Curator依赖

      <dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.0.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.8</version></dependency>

2. 注册Curator客户端CuratorFramework

@Configuration
public class CuratorCfg {@Bean(initMethod = "start")public CuratorFramework curatorFramework(){RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //重试策略CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.100.100:2181", retryPolicy); //连接地址return client;}}

3. 使用Curator客户端保证减库存的安全性

    @AutowiredCuratorFramework curatorFramework;@PostMapping("/stock/deduct")public Object reduceStock(Integer id) throws Exception {//初始化参数InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/product_" + id);try {//添加分布式锁interProcessMutex.acquire();// 减库存 ,方法内容见下文orderService.reduceStock(id);} catch (Exception e) {if (e instanceof RuntimeException) {throw e;}}finally {//释放分布式锁interProcessMutex.release();}return "ok:" + port;}============================减库存方法==================@Transactionalpublic void reduceStock(Integer id){// 1.    获取库存Product product = productMapper.getProduct(id);// 模拟耗时业务处理sleep( 500); // 其他业务处理if (product.getStock() <=0 ) {throw new RuntimeException("out of stock");}// 2.    减库存int i = productMapper.deductStock(id);if (i==1){// 3.   减库存成功,增加一条订单Order order = new Order();order.setUserId(UUID.randomUUID().toString());order.setPid(id);orderMapper.insert(order);}else{throw new RuntimeException("deduct stock fail, retry.");}}

使用Curator客户端实现zookeeper分布式锁使用已完结,使用起来很简单,acquire()方法是分布式锁的核心!Curator是采用容器 Container节点 和 临时顺序节点 来实现zookeeper分布式锁的!

acquire()方法源码如下:

    private boolean internalLock(long time, TimeUnit unit) throws Exception{。。。。Thread currentThread = Thread.currentThread();//从threadData中获取当前线程的锁LockData lockData = threadData.get(currentThread);if ( lockData != null ){// 如果锁不为空,代表之前已经加过了,是可重入锁,继续执行!lockData.lockCount.incrementAndGet();return true;}//尝试创建节点,加锁String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());//如果加锁成功,把锁信息和线程信息封装到threadData返回,用于前面可重入锁获取if ( lockPath != null ){LockData newLockData = new LockData(currentThread, lockPath);threadData.put(currentThread, newLockData);return true;}return false;}

其中attemptLock方法封装了加锁逻辑,这里篇幅有限,不再贴代码了,具体流程如下:

  1. 在容器节点中创建一个临时顺序节点
        if ( lockNodeBytes != null ){//如果Container节点存在,创建一个Protection模式下的临时顺序节点ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);}else{ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);}return ourPath;
  1. 对容器节点中的子节点排序,并与当前节点比较
     //排序List<String>        children = getSortedChildren();`
         //获取当前节点索引位置int             ourIndex = children.indexOf(sequenceNodeName);validateOurIndex(sequenceNodeName, ourIndex);//与其他节点比较boolean         getsTheLock = ourIndex < maxLeases;String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);//返回比较结果return new PredicateResults(pathToWatch, getsTheLock);
  1. 如果当前节点是最小的节点,则获取锁。第一次进来,容器节点中只有一个子节点,就是当前节点,也是最小节点,会获取锁
         //如果上一步比较结果为trueif ( predicateResults.getsTheLock() ){//获取锁haveTheLock = true;}
  1. 如果当前节点节点不是最小的节点,对它前面比他小的第一个节点创建监听,并调用wait()方法等待。等待时间等于acquire(Time)中定义的时间。如果没有,则一直等待。
         // 监听client.getData().usingWatcher(watcher).forPath(previousSequencePath);// 等待wait(millisToWait);
  1. 直到前一个节点被删除,锁释放,因为当前线程监听着前一个节点,所以当前线程会收到通知,然后再次循环第二步代码逻辑,排序并比较自己是不是序号最小的节点。

zookeeper专题:使用zookeeper实现分布式锁相关推荐

  1. relation does not exist报错是什么意思_为什么Zookeeper天生就是一副分布式锁的胚子?...

    " 什么是分布式锁?分布式锁是控制分布式系统之间同步访问共享资源的一种方式.在分布式系统中,常常需要协调他们的动作. 图片来自 Pexels 如果不同的系统或是同一个系统的不同主机之间共享了 ...

  2. ZooKeeper : Curator框架之分布式锁InterProcessReadWriteLock

    InterProcessReadWriteLock 跨JVM工作的可重入读/写互斥锁,使用Zookeeper来持有锁,所有JVM中使用相同锁路径的所有进程都将实现进程间临界区.这个互斥锁是公平的,每个 ...

  3. Zookeeper命令操作(初始Zookeeper、JavaAPI操作、分布式锁实现、模拟12306售票分布式锁、Zookeeper集群搭建、选举投票)

    Zookeeper命令操作(初始Zookeeper.JavaAPI操作.分布式锁实现.模拟12306售票分布式锁.Zookeeper集群搭建.选举投票) 1.初始Zookeeper Zookeeper ...

  4. ZooKeeper(五) 使用Zookeeper有序临时节点实现分布式锁

    当使用zookeeper实现分布式锁时,当有新的请求需要进入需要同步加锁代码时,在zookeeper加锁代码中会在加锁的共同父节点下创建一个新的临时有需节点.创建完成后会获取加锁父节点下所有子节点.判 ...

  5. 基于zookeeper的瞬时节点实现分布式锁

    zookeeper的数据结构 zookeeper的观察器 可设置观察器的3个方法:getData();getChildren();exists(); 节点数据发生变化,发送给客户端 观察器只能监控一次 ...

  6. zookeeper操作封装——curator使用分布式锁使用

    文章目录 1. 基本操作 1.1 建立连接 1.2 建立结点 1.3 查询结点 查询数据 查询子结点 查看结点信息 1.4 修改结点 普通修改 带乐观锁的修改 1.5 删除 删除单个结点 删除带子结点 ...

  7. zookeeper专题:zookeeper的节点类型,数据持久化机制

    文章目录 1. zookeeper的安装 2. zookeeper的节点类型 3. zookeeper命令解析 4.zookeeper的监听通知机制 5. zookeeper的常规配置 6. zook ...

  8. zookeeper专题:zookeeper集群模式下,leader选举流程分析

    文章目录 Zookeeper 集群模式一共有三种类型的角色 1. zookeeper启动时leader选举流程 1.1 加载配置文件,设置基本信息 1.2 指定快速选举算法,启动多级队列.线程 1.3 ...

  9. zookeeper专题:zookeeper集群搭建和客户端连接

    文章目录 1. Zookeeper 集群模式介绍 2. zookeeper 集群搭建 3. 使用curate客户端连接zookeeper集群 1. Zookeeper 集群模式介绍 Zookeeper ...

  10. 纠结!分布式锁到底用Redis好还是ZooKeeper好?

    前言 1. 什么是分布式锁 分布式锁是控制分布式系统之间同步访问共享资源的一种方式.在分布式系统中,常常需要协调他们的动作.如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些 ...

最新文章

  1. 二.第五单元     lvm管理
  2. 使用Python运算一个字符串表达式
  3. Eureka-服务发现
  4. 3. 机器学习中为什么需要梯度下降_梯度提升(Gradient Boosting)算法
  5. 服务器压力测试系列二:服务器监控工具tsar安装
  6. $ is not defined与SpringMVC访问静态资源
  7. golang中的reflect(反射)
  8. 恒流源差分放大电路静态分析_差分放大电路常见的形式
  9. linux修改网卡名称命令,linux修改网卡名称
  10. android查看签名工具,签名获取工具app_apk签名工具安卓版_手机apk签名工具安卓版-多特软件站安卓网...
  11. 滚石特写: 沉默7年后, Magic Leap用魔幻现实主义式科技重新定义了自己
  12. Web服务器群集——Nginx企业级优化
  13. 5G承载网络技术发展趋势
  14. 微信公众平台帐号迁移条件及流程
  15. Flutter异常收集
  16. passing 'unsigned char [150]' to parameter of type 'char *' converts between pointers to integer typ
  17. 2020技术类博客撰写发布一揽子解决方案
  18. 从键盘输入一行英文字符串,把所有小写字母变成大写字母,其他字母和字符保持不变。
  19. ES6学习笔记(二)
  20. 【引用】iPhone开发内存管理

热门文章

  1. PyQt5笔记(04) -- 文本框的使用
  2. Java Se相关测试题(偏线程、集合)含答案及详解
  3. spark的rdd的含义_Spark里边:到底是什么RDD
  4. php通过js发送请求数据,使用原生javascript发送ajax请求数据的步骤
  5. async/await 顺序执行和并行
  6. php 中 stream_select 中的小窟窿.
  7. LigoWave(力格微)无线网桥稳定性探秘——私有协议
  8. Java 异步回调机制实例解析
  9. 解读Tom介绍的Oracle Database 12c的12个新特性
  10. Boot目录下内容丢失导致系统无法启动