点击关注公众号,实用技术文章及时了解

来源:blog.csdn.net/qq_40378034/

article/details/117014648

ZooKeeper源码的zookeeper-recipes目录下提供了分布式队列、分布式锁和选举的实现(GitHub地址:https://github.com/apache/zookeeper/tree/master/zookeeper-recipes)。本文主要对这几种实现做实现原理的解析和源码剖析:

1、分布式队列

使用路径为/queue的znode下的节点表示队列中的元素。/queue下的节点都是顺序持久化znode。这些znode名字的后缀数字表示了对应队列元素在队列中的位置。znode名字后缀数字越小,对应队列元素在队列中的位置越靠前

1)、offer方法

offer方法在/queue下面创建一个顺序znode。因为znode的后缀数字是/queue下面现有znode最大后缀数字加1,所以该znode对应的队列元素处于队尾

public class DistributedQueue {public boolean offer(byte[] data) throws KeeperException, InterruptedException {for (; ; ) {try {zookeeper.create(dir + "/" + prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);return true;} catch (KeeperException.NoNodeException e) {zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);}}}

2)、element方法

public class DistributedQueue {public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {Map<Long, String> orderedChildren;while (true) {try {//获取所有排好序的子节点orderedChildren = orderedChildren(null);} catch (KeeperException.NoNodeException e) {throw new NoSuchElementException();}if (orderedChildren.size() == 0) {throw new NoSuchElementException();}//返回队头节点的数据for (String headNode : orderedChildren.values()) {if (headNode != null) {try {return zookeeper.getData(dir + "/" + headNode, false, null);} catch (KeeperException.NoNodeException e) {//另一个客户端已经移除了队头节点,尝试获取下一个节点}}}}}private Map<Long, String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {Map<Long, String> orderedChildren = new TreeMap<>();List<String> childNames;childNames = zookeeper.getChildren(dir, watcher);for (String childName : childNames) {try {if (!childName.regionMatches(0, prefix, 0, prefix.length())) {LOG.warn("Found child node with improper name: {}", childName);continue;}String suffix = childName.substring(prefix.length());Long childId = Long.parseLong(suffix);orderedChildren.put(childId, childName);} catch (NumberFormatException e) {LOG.warn("Found child node with improper format : {}", childName, e);}}return orderedChildren;}

3)、remove方法

public class DistributedQueue {public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {Map<Long, String> orderedChildren;while (true) {try {//获取所有排好序的子节点orderedChildren = orderedChildren(null);} catch (KeeperException.NoNodeException e) {throw new NoSuchElementException();}if (orderedChildren.size() == 0) {throw new NoSuchElementException();}//移除队头节点for (String headNode : orderedChildren.values()) {String path = dir + "/" + headNode;try {byte[] data = zookeeper.getData(path, false, null);zookeeper.delete(path, -1);return data;} catch (KeeperException.NoNodeException e) {//另一个客户端已经移除了队头节点,尝试移除下一个节点}}}}

2、分布式锁

1)、排他锁

排他锁的核心是如何保证当前有且仅有一个事务获取锁,并且锁被释放后,所有正在等待获取锁的事务都能够被通知到

定义锁

通过在ZooKeeper上创建一个子节点来表示一个锁,例如/exclusive_lock/lock节点就可以被定义为一个锁

获取锁

在需要获取排他锁时,所有的客户端都会试图通过调用create()接口,在/exclusive_lock节点下创建临时子节点/exclusive_lock/lock。ZooKeeper会保证在所有的客户端中,最终只有一个客户能够创建成功,那么就可以认为该客户端获取了锁。

同时,所有没有获取到锁的客户端就需要到/exclusive_lock节点上注册一个子节点变更的watcher监听,以便实时监听到lock节点的变更情况

释放锁

/exclusive_lock/lock是一个临时节点,因此在以下两种情况下,都有可能释放锁

  • 当前获取锁的客户端机器发生宕机,那么ZooKeeper上的这个临时节点就会被移除

  • 正常执行完业务逻辑后,客户端就会主动将自己创建的临时节点删除

无论在什么情况下移除了lock节点,ZooKeeper都会通知所有在/exclusive_lock节点上注册了子节点变更watcher监听的客户端。这些客户端在接收到通知后,再次重新发起分布式锁获取,即重复获取锁过程

2)、羊群效应

上面的排他锁的实现可能引发羊群效应:当一个特定的znode改变的时候ZooKeeper触发了所有watcher的事件,由于通知的客户端很多,所以通知操作会造成ZooKeeper性能突然下降,这样会影响ZooKeeper的使用

改进后的分布式锁实现

获取锁

首先,在Zookeeper当中创建一个持久节点ParentLock。当第一个客户端想要获得锁时,需要在ParentLock这个节点下面创建一个临时顺序节点Lock1

之后,Client1查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock1是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁

这时候,如果再有一个客户端Client2前来获取锁,则在ParentLock下再创建一个临时顺序节点Lock2

Client2查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的

于是,Client2向排序仅比它靠前的节点Lock1注册watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态

这时候,如果又有一个客户端Client3前来获取锁,则在ParentLock下再创建一个临时顺序节点Lock3

Client3查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的

于是,Client3向排序仅比它靠前的节点Lock2注册watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态

这样一来,Client1得到了锁,Client2监听了Lock1,Client3监听了Lock2。这恰恰形成了一个等待队列,很像是Java当中ReentrantLock所依赖的AQS

释放锁

释放锁分为两种情况:

1.任务完成,客户端显示释放

当任务完成时,Client1会显示调用删除节点Lock1的指令

2.任务执行过程中,客户端崩溃

获得锁的Client1在任务执行过程中,如果客户端崩溃,则会断开与Zookeeper服务端的连接。根据临时节点的特性,相关联的节点Lock1会随之自动删除

由于Client2一直监听着Lock1的存在状态,当Lock1节点被删除,Client2会立刻收到通知。这时候Client2会再次查询ParentLock下面的所有节点,确认自己创建的节点Lock2是不是目前最小的节点。如果是最小,则Client2获得了锁

同理,如果Client2也因为任务完成或者节点崩溃而删除了节点Lock2,那么Client3就会接到通知

最终,Client3成功得到了锁

3)、共享锁

共享锁又称为读锁,在同一时刻可以允许多个线程访问,典型的就是ReentrantReadWriteLock里的读锁,它的读锁是可以被共享的,但是它的写锁确实每次只能被独占

定义锁

和排他锁一样,同样是通过ZooKeeper上的数据节点来表示一个锁,是一个类似于/shared_lock/[Hostname]-请求类型-序号的临时顺序节点,例如/shared_lock/192.168.0.1-R-0000000001,那么,这个节点就代表了一个共享锁,如下图所示:

获取锁

在需要获取共享锁时,所有客户端都会到/shared_lock这个节点下面创建一个临时顺序节点,如果当前是读请求,那么就创建例如/shared_lock/192.168.0.1-R-0000000001的节点;如果是写请求,那么就创建例如/shared_lock/192.168.0.1-W-0000000001的节点

判断读写顺序

每个锁竞争者,只需要关注/shared_lock节点下序号比自己小的那个节点是否存在即可,具体实现如下:

1)客户端调用create()方法创建一个类似于/shared_lock/[Hostname]-请求类型-序号的临时顺序节点

2)客户端调用getChildren()接口来获取所有已经创建的子节点列表

3)判断是否可以获取共享锁:

  • 读请求:没有比自己序号小的节点或者所有比自己序号小的节点都是读请求

  • 写请求:序号是否最小

4)如果无法获取共享锁,那么就调用exist()来对比自己小的那个节点注册watcher

  • 读请求:向比自己序号小的最后一个写请求节点注册watcher监听

  • 写请求:向比自己序号小的最后一个节点注册watcher监听

5)等待watcher通知,继续进入步骤2

释放锁

释放锁的逻辑和排他锁是一致的

整个共享锁的获取和释放流程如下图:

4)、排他锁源码解析

1)加锁过程

public class WriteLock extends ProtocolSupport {public synchronized boolean lock() throws KeeperException, InterruptedException {if (isClosed()) {return false;}//确认持久父节点是否存在ensurePathExists(dir);//真正获取锁的逻辑 调用ProtocolSupport的retryOperation()方法return (Boolean) retryOperation(zop);}
class ProtocolSupport {protected Object retryOperation(ZooKeeperOperation operation)throws KeeperException, InterruptedException {KeeperException exception = null;for (int i = 0; i < RETRY_COUNT; i++) {try {//调用LockZooKeeperOperation的execute()方法return operation.execute();} catch (KeeperException.SessionExpiredException e) {LOG.warn("Session expired {}. Reconnecting...", zookeeper, e);throw e;} catch (KeeperException.ConnectionLossException e) {if (exception == null) {exception = e;}LOG.debug("Attempt {} failed with connection loss. Reconnecting...", i);retryDelay(i);}}throw exception;}
public class WriteLock extends ProtocolSupport {private class LockZooKeeperOperation implements ZooKeeperOperation {private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)throws KeeperException, InterruptedException {List<String> names = zookeeper.getChildren(dir, false);for (String name : names) {if (name.startsWith(prefix)) {id = name;LOG.debug("Found id created last time: {}", id);break;}}if (id == null) {id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL);LOG.debug("Created id: {}", id);}}@SuppressFBWarnings(value = "NP_NULL_PARAM_DEREF_NONVIRTUAL",justification = "findPrefixInChildren will assign a value to this.id")public boolean execute() throws KeeperException, InterruptedException {do {if (id == null) {long sessionId = zookeeper.getSessionId();String prefix = "x-" + sessionId + "-";//创建临时顺序节点findPrefixInChildren(prefix, zookeeper, dir);idName = new ZNodeName(id);}//获取所有子节点List<String> names = zookeeper.getChildren(dir, false);if (names.isEmpty()) {LOG.warn("No children in: {} when we've just created one! Lets recreate it...", dir);id = null;} else {//对所有子节点进行排序SortedSet<ZNodeName> sortedNames = new TreeSet<>();for (String name : names) {sortedNames.add(new ZNodeName(dir + "/" + name));}ownerId = sortedNames.first().getName();SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);//是否存在序号比自己小的节点if (!lessThanMe.isEmpty()) {ZNodeName lastChildName = lessThanMe.last();lastChildId = lastChildName.getName();LOG.debug("Watching less than me node: {}", lastChildId);//有序号比自己小的节点,则调用exist()向前一个节点注册watcherStat stat = zookeeper.exists(lastChildId, new LockWatcher());if (stat != null) {return Boolean.FALSE;} else {LOG.warn("Could not find the stats for less than me: {}", lastChildName.getName());}} //没有序号比自己小的节点,则获取锁else {if (isOwner()) {LockListener lockListener = getLockListener();if (lockListener != null) {lockListener.lockAcquired();}return Boolean.TRUE;}}}}while (id == null);return Boolean.FALSE;}

2)解锁过程

public class WriteLock extends ProtocolSupport {public synchronized void unlock() throws RuntimeException {if (!isClosed() && id != null) {try {//删除当前节点,此时会触发后一个节点的watcherZooKeeperOperation zopdel = () -> {zookeeper.delete(id, -1);return Boolean.TRUE;};zopdel.execute();} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);Thread.currentThread().interrupt();} catch (KeeperException.NoNodeException e) {} catch (KeeperException e) {LOG.warn("Unexpected exception", e);throw new RuntimeException(e.getMessage(), e);} finally {LockListener lockListener = getLockListener();if (lockListener != null) {lockListener.lockReleased();}id = null;}}}

3、选举

使用临时顺序znode来表示选举请求,创建最小后缀数字znode的选举请求成功。在协同设计上和分布式锁是一样的,不同之处在于具体实现。不同于分布式锁,选举的具体实现对选举的各个阶段做了细致的监控

public class LeaderElectionSupport implements Watcher {    public synchronized void start() {state = State.START;dispatchEvent(EventType.START);LOG.info("Starting leader election support");if (zooKeeper == null) {throw new IllegalStateException("No instance of zookeeper provided. Hint: use setZooKeeper()");}if (hostName == null) {throw new IllegalStateException("No hostname provided. Hint: use setHostName()");}try {//发起选举请求 创建临时顺序节点makeOffer();//选举请求是否被满足determineElectionStatus();} catch (KeeperException | InterruptedException e) {becomeFailed(e);}}private void makeOffer() throws KeeperException, InterruptedException {state = State.OFFER;dispatchEvent(EventType.OFFER_START);LeaderOffer newLeaderOffer = new LeaderOffer();byte[] hostnameBytes;synchronized (this) {newLeaderOffer.setHostName(hostName);hostnameBytes = hostName.getBytes();newLeaderOffer.setNodePath(zooKeeper.create(rootNodeName + "/" + "n_",hostnameBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL));leaderOffer = newLeaderOffer;}LOG.debug("Created leader offer {}", leaderOffer);dispatchEvent(EventType.OFFER_COMPLETE);}private void determineElectionStatus() throws KeeperException, InterruptedException {state = State.DETERMINE;dispatchEvent(EventType.DETERMINE_START);LeaderOffer currentLeaderOffer = getLeaderOffer();String[] components = currentLeaderOffer.getNodePath().split("/");currentLeaderOffer.setId(Integer.valueOf(components[components.length - 1].substring("n_".length())));//获取所有子节点并排序List<LeaderOffer> leaderOffers = toLeaderOffers(zooKeeper.getChildren(rootNodeName, false));for (int i = 0; i < leaderOffers.size(); i++) {LeaderOffer leaderOffer = leaderOffers.get(i);if (leaderOffer.getId().equals(currentLeaderOffer.getId())) {LOG.debug("There are {} leader offers. I am {} in line.", leaderOffers.size(), i);dispatchEvent(EventType.DETERMINE_COMPLETE);//如果当前节点是第一个,则成为Leaderif (i == 0) {becomeLeader();} //如果有选举请求在当前节点前面,则进行等待,调用exist()向前一个节点注册watcherelse {becomeReady(leaderOffers.get(i - 1));}break;}}}

●【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能

●分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!

●能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!

使用ZooKeeper实现分布式队列、分布式锁和选举详解相关推荐

  1. SpringBoot2.x整合轻量级分布式定时任务ShedLock3.x的使用详解

    目录 前言 SpringBoot2.x整合轻量级分布式定时任务ShedLock3.x的使用详解 一.关于ShedLock 二.ShedLock的三个核心组件 三.ShedLock使用三步走 四.Spr ...

  2. hbase 二进制数据写入_分布式数据库HBase的架构设计详解(有彩蛋)

    原标题:分布式数据库HBase的架构设计详解(有彩蛋) 本文根据DBAplus社群第99期线上分享整理而成,文末还有好书送哦~ 讲师介绍 陈鸿威 云财经大数据CTO 曾任百度高级工程师,现主持设计开发 ...

  3. 国行ps4服务器维护,赶快回家试试!国行PS4终解除锁区附详解教程

    是不是买了国行PS4但是无法和其他服务器玩家联机非常的苦恼呢?现在这个现状即将打破,是不是兴奋的想回家试试你的主机呢?让我们先来还原下整个事情经过,感觉就是索尼在老主机寿终正寝前给玩家放松的福利- 近 ...

  4. java 消息队列详解_Java消息队列-Spring整合ActiveMq的详解

    本篇文章主要介绍了详解Java消息队列-Spring整合ActiveMq ,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 1.概述 首先和大家一起回顾一下Java 消息服 ...

  5. Redis系列教程(八):分布式锁的由来、及Redis分布式锁的实现详解

    在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务.分布式锁等.那具体什么是分布式锁,分布式锁应用在哪些业务场景.如何来实现分布式锁呢?今天来探讨分布式锁这个话题. ...

  6. 分布式文件系统HDFS实践及原理详解part3

    HDFS原理 说明:3.5开头目录是因为和上篇文章内容同属一章,所以开头使用了3.5 3.5 HDFS核心设计 3.5.1 心跳机制 1. Hadoop 是 Master/Slave 结构,Maste ...

  7. 分布式架构系列: 负载均衡技术详解 | 技术头条

    戳蓝字"CSDN云计算"关注我们哦! 技术头条:干货.简洁.多维全面.更多云计算精华知识尽在眼前,get要点.solve难题,统统不在话下! 作者:ITFLY8 转自:架构师技术联 ...

  8. zookeeper集群节点热扩容和迁移详解

    推荐阅读 Helm3(K8S 资源对象管理工具)视频教程:https://edu.csdn.net/course/detail/32506 Helm3(K8S 资源对象管理工具)博客专栏:https: ...

  9. 【消息队列】五个问题详解消息中间件

    1.消息中间件是什么 消息队列,又叫做消息中间件.是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成.通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程 ...

最新文章

  1. 关于项目数据库设计--投票系统
  2. WINCE6.0+S3C6410基于SD卡启动
  3. matlab神经网络1:功能特色
  4. silverlight 不可
  5. python configparser模块来 读取 、 创建 和 修改 配置文件
  6. java装饰模式理解_Java设计模式之装饰模式趣谈
  7. 全场景解析!基于 Flink 的12个实时数仓生产实践
  8. layui开发使用文档(镜像网址)
  9. 俄罗斯方块-C语言-完整代码
  10. 歌评-《Rex Incognito 尘世闲游》-陈致逸
  11. ios百度地图开发之路径规划
  12. Spark+Scala:数据分析统计
  13. js实现form的submit请求
  14. 数据挖掘:Apriori 关联规则分析算法原理分析与代码实现
  15. MobPush for Flutter
  16. java后台获取Excel后缀名以及sheet页名称
  17. PMM 监控原理以及部署
  18. jasper 导出html,使用jasper导出HTML并解决图片显示问题代码
  19. 程序猿解决BUG之总结
  20. 软考的证书含金量高吗?

热门文章

  1. 2000元档855旗舰来了 网友:都过时了,哪有人买
  2. 已免押1000亿!芝麻信用:靠信用出去浪
  3. 线性表:1.什么是线性表
  4. TCP/IP的初步理解,TCP和UDP的区别
  5. 数据结构二叉树线索化
  6. tracker服务器列表2020_个人服务器采购整理分享
  7. Linux系统C/C++通用错误码实现模板
  8. Linux系统有线网络抓包程序
  9. java 异常处理 简书_Java基础知识8-异常处理
  10. linux在python的虚拟环境下运行程序_在win10和linux上分别安装Python虚拟环境的方法步骤...