前一段时间有讨论过用redis来实现分布式锁,讲到setNx不是原子性、redis新的set方法及其误删和守护线程,还为了原子性不得不使用redis的脚本。虽然最终分布式锁的这个效果是实现了,但是,不够优雅。这里讨论一下zookeeper对分布式锁的实现。

  首先说一下zk中节点的类型,共分为四个类型:持久节点、临时节点和持久有序节点、临时有序节点。 
  什么是持久节点呢?字面意思,就是持久化的节点,当创建持久节点后,客户端断开了和服务器的链接,持久节点仍旧存在,与之相反的就是临时节点了,临时节点会在客户端断开链接后消失(客户端主动自刎或者zk服务器清理门户)。 
  什么是顺序节点呢?先说下非顺序节点,比如节点“lock”,你只可以创建一次,创建第二次就会提示你“NodeExistsException”

Exception in thread "main" org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /lock

  假如使用的是顺序节点呢,上面这个异常就不会出现了,顺序节点会在节点名字后面追加一个序号,如果你使用“/lock”作为顺序节点,实际节点路径是“/lock0000000001”,十位数量的坑,绝对够用了。

  再来说一下zk的数据结构.zk是树形的数据结构, 
 
我们可以利用最左节点这个特性来实现分布式锁:如果有多个请求过来,会依次挂在指定节点下,我们每次取最左边的那个节点,执行完以后删除节点,让第二左节点接棒成为最左节点。

  下面就是具体的代码了: 
先看下Main.java,这个是入口

public class Main {private static final Logger LOG = LoggerFactory.getLogger(Main.class);private static final int SESSION_TIMEOUT = 10000;private static final String CONNECTION_STRING = "192.168.2.201:2181,192.168.2.201:2182,192.168.2.201:2183";static final String GROUP_PATH = "/lockParent";static final String SUB_PATH = GROUP_PATH + "/lock";public static void main(String[] args) {for (int i = 0; i < DistributedLock.THREAD_NUM; i++) {final int threadId = i + 1;new Thread() {@Overridepublic void run() {try {DistributedLock dc = new DistributedLock(threadId, GROUP_PATH, SUB_PATH);// 1.实例化zk客户端dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);// 2.创建公共的lock父节点(有/无视 无/创建)dc.createParent(GROUP_PATH, "该节点由线程" + threadId + "创建", true);// 3.创建子节点dc.createChilden();// 4.判断当前节点是不是罪左节点,是的话则成功获得锁if (dc.checkMinPath()) {dc.getLockSuccess();}} catch (Exception e) {LOG.error("【第" + threadId + "个线程】 抛出的异常:");e.printStackTrace();}}}.start();}try {DistributedLock.threadSemaphore.await();LOG.info("所有线程运行结束!");} catch (InterruptedException e) {e.printStackTrace();}}
}

还有具体的执行类 DistributedLock.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;import java.util.List;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;public class DistributedLock implements Watcher {private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);/** 父lock节点,持久有序节点,本demo所有的锁都是在这个节点下面 */private String groupPath;/** 具体的锁节点,隶属于groupPath下 */private String subPath;/** 当前线程信息,做日志输出时候用到 */private String LOG_PREFIX_OF_THREAD;/** zk的客户端连接 */private ZooKeeper zk = null;/** 本节点的path */private String selfPath;/** pre节点的path */private String waitPath;/** 模拟的请求个数 */static final int THREAD_NUM = 3;/** 确保连接zk成功才开始工作,避免抛出连接丢失异常 * <span>org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /lockParent</span>*/private CountDownLatch connectedSemaphore = new CountDownLatch(1);/** 确保所有线程运行结束; */static final CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public DistributedLock(int threadId,String groupPath,String subPath) {this.groupPath = groupPath;this.subPath = subPath;LOG_PREFIX_OF_THREAD = "【第" + threadId + "个线程】";}/*** 获取锁成功*/public void getLockSuccess() throws KeeperException, InterruptedException {if (zk.exists(this.selfPath, false) == null) {LOG.error(LOG_PREFIX_OF_THREAD + "本节点已不在了...");return;}LOG.info(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!");Thread.sleep(5000);LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:" + selfPath);zk.delete(this.selfPath, -1);releaseConnection();threadSemaphore.countDown();}/*** 检查自己是不是最小的节点* @return*/public boolean checkMinPath() throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(groupPath, false);Collections.sort(subNodes);int index = subNodes.indexOf(selfPath.substring(groupPath.length() + 1));switch (index) {case -1: {LOG.error(LOG_PREFIX_OF_THREAD + "父lock节点下找不到本节点:" + selfPath);return false;}case 0: {LOG.info(LOG_PREFIX_OF_THREAD + "本节点是最左节点:" + selfPath);return true;}default: {this.waitPath = groupPath + "/" + subNodes.get(index - 1);LOG.info(LOG_PREFIX_OF_THREAD + "获取子节点中,排在我前面的节点是:" + waitPath);try {zk.getData(waitPath, true, new Stat());} catch (KeeperException e) {if (zk.exists(waitPath, false) == null) {LOG.info(LOG_PREFIX_OF_THREAD + "子节点中,排在我前面的" + waitPath + "已失踪,幸福来得太突然?");return checkMinPath();} else {throw e;}}return false;}}}/*** 创建ZK连接* @param connectString    ZK服务器地址列表* @param sessionTimeout   Session超时时间*/public void createConnection(String connectString, int sessionTimeout) throws IOException, InterruptedException {zk = new ZooKeeper(connectString, sessionTimeout, this);connectedSemaphore.await();}/*** 创建子节点* @return*/public void createChilden() throws KeeperException, InterruptedException {selfPath = zk.create(subPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(LOG_PREFIX_OF_THREAD + "创建锁路径:" + selfPath);}/*** 创建节点* @param path 节点path* @param data 初始数据内容* @return*/public void createParent(String path, String data, boolean needWatch)throws KeeperException, InterruptedException {if (null == zk.exists(path, needWatch)) {String createResult = this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);LOG.info(LOG_PREFIX_OF_THREAD + "节点创建成功, Path: " + createResult + ", content: " + data);}}/*** 关闭ZK连接*/public void releaseConnection() {if (this.zk != null) {try {this.zk.close();} catch (InterruptedException e) {}}LOG.info(LOG_PREFIX_OF_THREAD + "释放连接");}public void process(WatchedEvent event) {if (event == null) {return;}Event.KeeperState keeperState = event.getState();Event.EventType eventType = event.getType();if (Event.KeeperState.SyncConnected == keeperState) {if (Event.EventType.None == eventType) {LOG.info(LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器");connectedSemaphore.countDown();} else if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {LOG.info(LOG_PREFIX_OF_THREAD + "收到情报,排我前面的家伙已挂,我是不是可以出山了?");try {if (checkMinPath()) {getLockSuccess();}} catch (Exception e) {e.printStackTrace();}}} else if (Event.KeeperState.Disconnected == keeperState) {LOG.info(LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接");} else if (Event.KeeperState.AuthFailed == keeperState) {LOG.info(LOG_PREFIX_OF_THREAD + "权限检查失败");} else if (Event.KeeperState.Expired == keeperState) {LOG.info(LOG_PREFIX_OF_THREAD + "会话失效");}}
}

运行Main类,可以清楚的看到三个线程先后获取锁、等待获取锁的信息

2018-06-23 23:54:56,656 - 【第3个线程】成功连接上ZK服务器
2018-06-23 23:54:56,867 - 【第3个线程】创建锁路径:/lockParent/lock0000000007
2018-06-23 23:54:56,923 - 【第3个线程】本节点是最左节点:/lockParent/lock0000000007
2018-06-23 23:54:56,936 - 【第3个线程】获取锁成功,赶紧干活!
2018-06-23 23:54:59,604 - 【第2个线程】成功连接上ZK服务器
2018-06-23 23:54:59,612 - 【第1个线程】成功连接上ZK服务器
2018-06-23 23:54:59,686 - 【第2个线程】创建锁路径:/lockParent/lock0000000008
2018-06-23 23:54:59,698 - 【第1个线程】创建锁路径:/lockParent/lock0000000009
2018-06-23 23:54:59,706 - 【第1个线程】获取子节点中,排在我前面的节点是:/lockParent/lock0000000008
2018-06-23 23:54:59,709 - 【第2个线程】获取子节点中,排在我前面的节点是:/lockParent/lock0000000007
2018-06-23 23:55:01,937 - 【第3个线程】删除本节点:/lockParent/lock0000000007
2018-06-23 23:55:01,963 - 【第2个线程】收到情报,排我前面的家伙已挂,我是不是可以出山了?
2018-06-23 23:55:01,979 - 【第2个线程】本节点是最左节点:/lockParent/lock0000000008
2018-06-23 23:55:01,993 - 【第2个线程】获取锁成功,赶紧干活!
2018-06-23 23:55:02,029 - 【第3个线程】释放连接
2018-06-23 23:55:06,993 - 【第2个线程】删除本节点:/lockParent/lock0000000008
2018-06-23 23:55:07,008 - 【第1个线程】收到情报,排我前面的家伙已挂,我是不是可以出山了?
2018-06-23 23:55:07,032 - 【第1个线程】本节点是最左节点:/lockParent/lock0000000009
2018-06-23 23:55:07,039 - 【第2个线程】释放连接
2018-06-23 23:55:07,062 - 【第1个线程】获取锁成功,赶紧干活!
2018-06-23 23:55:12,063 - 【第1个线程】删除本节点:/lockParent/lock0000000009
2018-06-23 23:55:12,107 - 【第1个线程】释放连接
2018-06-23 23:55:12,107 - 所有线程运行结束!

假如觉得上面的实现不够简洁,curator开源项目提供zookeeper分布式锁实现 
这是maven依赖

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version>
</dependency>

具体代码:

public static void main(String[] args) throws Exception {//创建zookeeper的客户端RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy);client.start();//创建分布式锁, 锁空间的根节点路径为/curator/lockInterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");mutex.acquire();//获得了锁, 进行业务流程System.out.println("Enter mutex");//完成业务流程, 释放锁mutex.release();//关闭客户端client.close();
}

短短的十行代码就能实现锁的效果,只需要在acquire和release代码中间进行我们的业务操作即可,十分简洁明了

zookeeper应用之分布式锁相关推荐

  1. api 创建zookeeper客户端_zookeeper分布式锁原理及实现

    前言 本文介绍下 zookeeper方式 实现分布式锁 原理简介 zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且 ...

  2. 关于Zookeeper来实现分布式锁的几个问题

    本文来说下Zookeeper实现分布式锁的几个问题 文章目录 概述 zk基本锁原理 监听通知机制 zk锁优化原理 zk锁的优缺点 本文小结 概述 zookeeper锁相关基础知识 zookeeper锁 ...

  3. zookeeper java版本号_GitHub - anlijie/java-lock: java版本的用Zookeeper实现的分布式锁

    java-lock java版本的用Zookeeper实现的分布式锁 代码已经测过,可以直接使用! #业务场景 在分布式情况,生成全局订单号ID 生成订单号方案 使用时间戳 使用UUID 推特 (Tw ...

  4. 基于zookeeper实现的分布式锁

    http://www.jiacheo.org/blog/122 zookeeper是hadoop下面的一个子项目, 用来协调跟hadoop相关的一些分布式的框架, 如hadoop, hive, pig ...

  5. Zookeeper之Curator分布式锁简单模拟12306抢票

    分布式锁 在我们进行单机应用开发涉及到并发应用同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程代码的同步问题,这时候多线程运行都是在同一个jvm之下,没有任何问题 但是当 ...

  6. 阿里面试官让我用Zk(Zookeeper)实现分布式锁

    点赞再看,养成习惯,微信搜索[三太子敖丙]关注这个互联网苟且偷生的工具人. 本文 GitHub https://github.com/JavaFamily 已收录,有一线大厂面试完整考点.资料以及我的 ...

  7. java 通过redis实现倒计时_突破Java面试(42) - Redis amp; ZooKeeper两种分布式锁实现的优劣...

    0 Github 1 面试题 一般实现分布式锁都有哪些方式?使用redis如何设计分布式锁?使用zk来设计分布式锁可以吗?这两种分布式锁的实现方式哪种效率比较高? 2 考点分析 一般先问问你zk,然后 ...

  8. zookeeper如何实现分布式锁解决羊群效应

    本文主要讲述在使用ZooKeeper进行分布式锁的实现过程中,如何有效的避免"羊群效应( herd effect)"的出现. 一般的分布式锁实现 这里简单的讲下一般的分布式锁如何实 ...

  9. 用zookeeper来实现分布式锁

    结合我们前面对zookeeper特性的分析和理解,我们可以利用zookeeper节点的特性来实现独占锁,就是同级节点的唯一性,多个进程往zookeeper的指定节点下创建一个相同名称的节点,只有一个能 ...

最新文章

  1. SDNU 1209.磊磊的随机数
  2. 推荐系统算法_机器学习和推荐系统(二)推荐算法简介
  3. 简单检测CDN链接是否有效
  4. linux 视频学习
  5. SpringBoot整合Shiro实现权限控制,验证码
  6. 如何修改Myeclipse中代码的字体大小?
  7. java xml 空节点_java:利用xpath删除xml中的空节点
  8. 带的动android的笔记本,实战解析 Win8触控本能驾驭Android吗?
  9. C# Winfrom Chart 图表控件 柱状图、折线图
  10. adb 切换默认桌面_android tv 模拟器默认桌面修改 Alternate Launcher开机自动启动app...
  11. 我的世界热力膨胀JAVA_我的世界热力膨胀MOD矿物类型介绍
  12. springCloud学习【4】之elasticsearch(1)
  13. C语言编程>第十四周 ⑦ 请编写一个函数fun,它的功能是:计算n门课程的平均分,计算结果作为函数值返回。
  14. Double 判断小数位数
  15. 【Qt入门第23篇】 数据库(三)利用QSqlQuery类执行SQL语句
  16. linux tail
  17. 男人必学的几样家常炒菜,尤其是面对一个不会做饭的媳妇。
  18. XPE常见问题FAQ
  19. 硬件描述语言Verilog学习(一)
  20. 嘎吱作响Creaks for mac(解谜游戏)中文版支持m1

热门文章

  1. python中返回上一步操作_selenium操作iframe元素,切入退出操作
  2. js计算器代码加减乘除_如何用jQuery做一个简易版计算器
  3. TOMCAT部署项目的方式
  4. 鼠标放到控件上 DIV悬浮提示效果(四种)
  5. emq数据储存到mysql_EMQ X 规则引擎系列(三)存储消息到 InfluxDB 时序数据库
  6. 只有一个显示器但是显示两个显示器_小米34寸曲面显示器深度体验 办公体验极佳 但是还有个大弱点...
  7. python约瑟夫环_Python实现约瑟夫环问题的方法
  8. 单片机与普通微型计算机不同在于,单片机与普通微型计算机的不同之处
  9. 异步通信在生活中的例子_聊聊工作中经常遇到的“异步”,你掌握了多少
  10. image vb6 图片自适应_请问如何实现图片在窗体上的大小自适应?