面向過程版:

package distributedLockProcess;import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class dl {private static final Logger LOG = LoggerFactory.getLogger(dl.class);//确保所有线程运行结束;private static final String CONNECTION_STRING = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";private static final int SESSION_TIMEOUT = 10000;private static final String GROUP_PATH = "/disLocks";private static final String SUB_PATH = "/disLocks/sub";private static final int THREAD_NUM = 10; public static CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public static void main(String[] args) {for(int i=0; i < THREAD_NUM; i++){final int threadId = i;new Thread(){@Overridepublic void run() {final CountDownLatch countDownLatch = new CountDownLatch(1);try{//此線程連接ZooKeeperZooKeeper zk = new ZooKeeper(CONNECTION_STRING, SESSION_TIMEOUT, new Watcher(){@Overridepublic void process(WatchedEvent event){if (event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();}}});countDownLatch.await();System.out.println(Thread.currentThread().getName() + " --- ZooKeeper.connect()");//GROUP_PATH不存在的话,由一个线程创建即可;if(zk.exists(GROUP_PATH, false)==null){LOG.info( Thread.currentThread().getName() + "节点创建成功, Path: "+ zk.create( GROUP_PATH,("该节点由线程"+Thread.currentThread().getName() + "创建").getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT )+ ", content: " + ("该节点由线程"+Thread.currentThread().getName() + "创建") );}String selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(Thread.currentThread().getName()+"创建锁路径:"+selfPath);if(checkMinPath(zk, selfPath)){LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");dosomething();threadSemaphore.countDown();try {if(zk.exists(selfPath,false) == null){LOG.error(Thread.currentThread().getName()+"本节点已不在了...");return;}zk.delete(selfPath, -1);LOG.info(Thread.currentThread().getName() + "删除本节点:"+selfPath);zk.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}} catch (Exception e){LOG.error("【第"+threadId+"个线程】 抛出的异常:");e.printStackTrace();}}}.start();}try {
//              Thread.sleep(60000);threadSemaphore.await();LOG.info("所有线程运行结束!");} catch (Exception e) {e.printStackTrace();}}protected static boolean checkMinPath(final ZooKeeper zk, final String selfPath) throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(GROUP_PATH, false);Collections.sort(subNodes);int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));switch (index){case -1:{LOG.error(Thread.currentThread().getName()+"本节点已不在了..."+selfPath);return false;}case 0:{LOG.info(Thread.currentThread().getName()+"子节点中,我果然是老大"+selfPath);return true;}default:{final String waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);LOG.info(Thread.currentThread().getName()+"获取子节点中,排在我前面的"+waitPath);try{zk.getData(waitPath, new Watcher(){@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {  LOG.info(Thread.currentThread().getName()+ "收到情报,排我前面的家伙已挂,我是不是可以出山了?");  try {  if(checkMinPath(zk, selfPath)){  LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");dosomething();threadSemaphore.countDown();try {if(zk.exists(selfPath,false) == null){LOG.error(Thread.currentThread().getName()+"本节点已不在了...");} else {zk.delete(selfPath, -1);LOG.info(Thread.currentThread().getName() + "删除本节点:"+selfPath);zk.close();                                                }} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}  } catch ( Exception e) {  e.printStackTrace();  }  } }}, new Stat());}catch(KeeperException e){if(zk.exists(waitPath,false) == null){LOG.info(Thread.currentThread().getName()+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");return checkMinPath(zk, selfPath);}else{throw e;}} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}return false;}protected static void dosomething() {System.out.println("我正在獨享資源互斥地進行工作。。。");}
}

面向對象重構版:

package distributedLockObject;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;public class AbstractZooKeeper implements Watcher{protected ZooKeeper zookeeper;protected CountDownLatch countDownLatch = new CountDownLatch(1);public ZooKeeper connect(String hosts, int SESSION_TIMEOUT) throws IOException, InterruptedException{zookeeper = new ZooKeeper(hosts, SESSION_TIMEOUT, this);countDownLatch.await();System.out.println("AbstractZooKeeper.connect()");return zookeeper;}public void process(WatchedEvent event){if (event.getState() == KeeperState.SyncConnected){countDownLatch.countDown();}}public void close() throws InterruptedException{zookeeper.close();}
}
package distributedLockObject;import java.util.Collections;
import java.util.List;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DistributedLock    {private ZooKeeper zk = null;private String selfPath;private String waitPath;private String LOG_PREFIX_OF_THREAD=Thread.currentThread().getName();private static final String GROUP_PATH = "/disLocks";private static final String SUB_PATH = "/disLocks/sub";private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);private Watcher  watcher;public DistributedLock(ZooKeeper  zk ) {this.zk = zk; }public Watcher getWatcher() {return watcher;}public void setWatcher(Watcher watcher) {this.watcher = watcher;}/*** 获取锁* @return*/public boolean  getLock()  throws KeeperException, InterruptedException {selfPath = zk.create(SUB_PATH,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);LOG.info(LOG_PREFIX_OF_THREAD+"创建锁路径:"+selfPath);if(checkMinPath()){return true;}return false;}/*** 创建节点* @param path 节点path* @param data 初始数据内容* @return*/public boolean createPath( String path, String data  ) throws KeeperException, InterruptedException {if(zk.exists(path, false)==null){LOG.info( LOG_PREFIX_OF_THREAD + "节点创建成功, Path: "+ this.zk.create( path,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT )+ ", content: " + data );}return true;}public void unlock(){try {if(zk.exists(this.selfPath,false) == null){LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了...");return;}zk.delete(this.selfPath, -1);LOG.info(LOG_PREFIX_OF_THREAD + "删除本节点:"+selfPath);zk.close();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}/*** 检查自己是不是最小的节点* @return*/public  boolean checkMinPath() throws KeeperException, InterruptedException {List<String> subNodes = zk.getChildren(GROUP_PATH, false);Collections.sort(subNodes);int index = subNodes.indexOf( selfPath.substring(GROUP_PATH.length()+1));switch (index){case -1:{LOG.error(LOG_PREFIX_OF_THREAD+"本节点已不在了..."+selfPath);return false;}case 0:{LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,我果然是老大"+selfPath);return true;}default:{this.waitPath = GROUP_PATH +"/"+ subNodes.get(index - 1);LOG.info(LOG_PREFIX_OF_THREAD+"获取子节点中,排在我前面的"+waitPath);try{zk.getData(waitPath, this.watcher, new Stat());return false;}catch(KeeperException e){if(zk.exists(waitPath,false) == null){LOG.info(LOG_PREFIX_OF_THREAD+"子节点中,排在我前面的"+waitPath+"已失踪,幸福来得太突然?");return checkMinPath();}else{throw e;}}}}}public String getWaitPath() {return waitPath;}}
package distributedLockObject;public interface DoTemplate {void dodo();
}
package distributedLockObject;import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;public class LockService {//确保所有线程运行结束;private static final String CONNECTION_STRING = "192.168.1.201:2181,192.168.1.202:2181,192.168.1.203:2181";private static final int THREAD_NUM = 10; public static   CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);private static final String GROUP_PATH = "/disLocks";private static final int SESSION_TIMEOUT = 10000;AbstractZooKeeper az = new AbstractZooKeeper();public void doService(DoTemplate doTemplate){try {ZooKeeper zk = az.connect(CONNECTION_STRING,SESSION_TIMEOUT);DistributedLock dc = new DistributedLock(zk);LockWatcher lw = new LockWatcher(dc,doTemplate);dc.setWatcher(lw);//GROUP_PATH不存在的话,由一个线程创建即可;dc.createPath(GROUP_PATH, "该节点由线程"+Thread.currentThread().getName() + "创建");boolean rs = dc.getLock();if (rs==true) {lw.dosomething();dc.unlock();}} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();} catch (KeeperException e) {// TODO Auto-generated catch blocke.printStackTrace();}}
}
package distributedLockObject;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LockWatcher implements Watcher{private static final Logger LOG = LoggerFactory.getLogger(LockWatcher.class);private DistributedLock distributedLock;private  DoTemplate doTemplate;public LockWatcher(DistributedLock distributedLock,DoTemplate doTemplate) {// TODO Auto-generated constructor stubthis.distributedLock = distributedLock;this.doTemplate = doTemplate;}@Overridepublic void process(WatchedEvent event) {// TODO Auto-generated method stubif (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(distributedLock.getWaitPath())) {  LOG.info(Thread.currentThread().getName()+ "收到情报,排我前面的家伙已挂,我是不是可以出山了?");  try {  if(distributedLock.checkMinPath()){  dosomething();distributedLock.unlock();}  } catch ( Exception e) {  e.printStackTrace();  }  } }public   void dosomething(){LOG.info(Thread.currentThread().getName() + "获取锁成功,赶紧干活!");doTemplate.dodo();TestLock.threadSemaphore.countDown();}}
package distributedLockObject;
import java.util.concurrent.CountDownLatch;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class TestLock {private static final Logger LOG = LoggerFactory.getLogger(TestLock.class);//确保所有线程运行结束;private static final int THREAD_NUM = 10; public static   CountDownLatch threadSemaphore = new CountDownLatch(THREAD_NUM);public static void main(String[] args) {for(int i=0; i < THREAD_NUM; i++){final int threadId = i;new Thread(){@Overridepublic void run() {try{new LockService().doService(new DoTemplate() {@Overridepublic void dodo() {// TODO Auto-generated method stubLOG.info("我要修改一个文件。。。。"+threadId);}});} catch (Exception e){LOG.error("【第"+threadId+"个线程】 抛出的异常:");e.printStackTrace();}}}.start();}try {
//              Thread.sleep(60000);threadSemaphore.await();LOG.info("所有线程运行结束!");} catch (Exception e) {e.printStackTrace();}}
}

ZooKeeper編程02--多線程的分佈式鎖相关推荐

  1. socket c/s分佈式編程

    SOCKET: Socket接口介于应用程序与硬件之间.对Socket的理解可以简化为:它是封装了数据流(Stream)的从机器到机器的一条软接线,通过这条软接线,并借助于线两端的收发程序,网络上的机 ...

  2. linux多线程编程和linux 2.6下的nptl,Linux多線程編程和Linux 2.6下的NPTL

    這幾天由於工作需要,琢磨了一下Linux下的多線程的相關資料.Linux下最常用的多線程支持庫為Pthread庫,它是glibc庫的組成部分.但是關於Pthread的說明文檔非常缺乏,特別是對POSI ...

  3. linux 有名管道pipe,linux 用無名管道pipe和有名管道fifo實現線程間通信

    1.pipe 用與實現同一個進程下不同線程間的通信(跟IPC進程間通信中的具有血緣關系的進程通信實現方式一樣) #include #include #include #include #include ...

  4. mysql 开启 thread pool_MySQL線程池(THREAD POOL)的處理

    背景介紹 MySQL常用(目前線上使用)的線程調度方式是one-thread-per-connection(每連接一個線程),server為每一個連接創建一個線程來服務,連接斷開后,這個線程進入thr ...

  5. ZooKeeper編程01--RMI服務的多服務器管理

    服務器端與客戶端都要用到: public interface ZkInfo {String ZK_CONNECTION_STRING = "192.168.1.201:2181,192.16 ...

  6. 自定義 ForkJoinPool 線程池,并消除classLoader加载失败的问题

    自定義 ForkJoinPool 線程池,并消除classLoader加载失败的问题 添加 setContextClassLoader 写入classLoader 信息 import java.uti ...

  7. java 原子量_Java線程:新特征-原子量

    所謂的原子量即操作變量的操作是"原子的",該操作不可再分,因此是線程安全的. 為何要使用原子變量呢,原因是多個線程對單個變量操作也會引起一些問題.在Java5之前,可以通過vola ...

  8. java方法中 thread,Java中的線程Thread方法之---join()

    上一篇我們說到了Thread中的stop方法,這一篇我們再來看一下方法join的使用,那么方法Join是干啥用的? 簡單回答,同步,如何同步? 怎么實現的? 下面將逐個回答. join方法從字面上的意 ...

  9. 应用调优常用技巧-線程池

    应用调优常用技巧-線程池 应用调优常用技巧 - 线程池 線程池的好處 核心API-操作類 核心API-監控類 2-2 线程池BlockingQueue详解.选择与调优 調優技巧 2-3 线程池Sche ...

最新文章

  1. CPU步进号(版本号)
  2. 微调Faster-R-CNN-InceptionV2完成高准确率安全帽检测任务
  3. 遍历处理path及其子目录所有文件
  4. 沈南鹏问了微软CEO四个问题
  5. ubuntu18找不到wifi适配器
  6. java 字符菜单_java该怎么写左侧菜单树
  7. php jwt token 解析,JSON Web Token(JWT)入坑详解
  8. 不要相信 errno 可靠
  9. 深度学习入门实例——基于keras的mnist手写数字识别
  10. set.seed(7)什么意思
  11. 瑞典皇家理工学院工程类表
  12. java opencv3轮廓_如何在OpenCV中获得单独的轮廓(并填充它们)?
  13. 字符编码笔记:ASCII,Unicode和 UTF-8
  14. Oracle数据库常见版本
  15. 目标检测 SSD网络结构
  16. 三进制 四进制计算机原理,三进制计算机(中国三进制计算机)
  17. 用什么擦地最干净脑筋急转弯_你没想过的“脑筋急转弯”,才是启发孩子智力的法宝(附资源下载)...
  18. windows下安装禅道
  19. 随着裁员浪潮滚滚而来,科技工作者的泡沫是否即将破灭?
  20. WEB项目的相对路径与绝对路径

热门文章

  1. 观察者模式Java实现
  2. php 邮件验证_PHP程序来验证电子邮件地址
  3. scala怎么做幂运算_Scala幂(幂)函数示例
  4. mui实现分享功能_MUI 分享功能(微信、QQ 、朋友圈)
  5. rtmp流\http流测试地址
  6. 汇编程序中,字符数据和ASCII的对应关系
  7. android 中文api 在线测试,android webview测试方法
  8. 分布式是写出来的(六)
  9. 466. 统计重复个数 golang[转]
  10. C语言atoi函数的用法