前言

Curator是Apache开源的一个Java工具类,通过它操作Zookeeper会变得极度舒适!

前置条件:已掌握的基本操作,比如在后台可以增减节点、ACL权限设置等。

1.Zookeeper  原生API

1.超时重连,不支持自动,需要手动操作
2.Watch注册一次后会失效
3.不支持递归创建节点

2.Zookeeper  API 升级版 Curator

1.解决watcher的注册一次就失效
2.提供更多解决方案并且实现简单 
3.提供常用的ZooKeeper工具类
4.编程风格更爽,点点点就可以了
5.可以递归创建节点等

3. 知识点

1.使用curator建立与zk的连接
2.使用curator添加/递归添加节点
3.使用curator删除/递归删除节点
4.使用curator创建/验证 ACL(访问权限列表)
5.使用curator监听 单个/父 节点的变化(watch事件)
6.基于curator实现Zookeeper分布式锁(需要掌握基本的多线程知识)
7.基于curator实现分布式计数器


由于代码量比较大,下文只会涉及到重点代码片段,从而突出重点。


准备工作

1.Maven的pom.xml中配置Zookeeper和Curator的依赖

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><!--建议和本地安装版本保持一致--><version>3.7.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version>
</dependency>

curator-recipes:封装了一些高级特性,如:Cache事件监听、 Elections选举、分布式锁、分布式计数器、分布式Barrier、Queues队列等

一、使用Curator建立与Zookeeper服务连接

该类会被频繁使用,故抽离为一个单独的Utils,里面只存放前后台Connect的代码。

public class ZkConnectCuratorUtil {final static Logger log = LoggerFactory.getLogger(ZkConnectCuratorUtil.class);public CuratorFramework zkClient = null; //zk的客户端工具Curator(在本类通过new实例化的是,自动start)private static final int MAX_RETRY_TIMES = 3; //定义失败重试次数private static final int BASE_SLEEP_TIME_MS = 5000; //连接失败后,再次重试的间隔时间 单位:毫秒private static final int SESSION_TIME_OUT = 1000000; //会话存活时间,根据业务灵活指定 单位:毫秒private static final String ZK_SERVER_IP_PORT = "192.168.31.216:2181";//Zookeeper服务所在的IP和客户端端口private static final String NAMESPACE = "workspace";//指定后,默认操作的所有的节点都会在该工作空间下进行//本类通过new ZkCuratorUtil()时,自动连通zkClientpublic ZkConnectCuratorUtil() {RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRY_TIMES, BASE_SLEEP_TIME_MS);//首次连接失败后,重试策略zkClient = CuratorFrameworkFactory.builder()//.authorization("digest", "root:root".getBytes())//登录超级管理(需单独配).connectString(ZK_SERVER_IP_PORT).sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy).namespace(NAMESPACE).build();zkClient.start();}public void closeZKClient() {if (zkClient != null) {this.zkClient.close();}}public static void main(String[] args) {ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil();boolean ifStarted=zkUtil.zkClient.isStarted();System.out.println("当前客户的状态:" + (ifStarted ? "连接中" : "已关闭"));zkUtil.closeZKClient();boolean ifClose = zkUtil.zkClient.isStarted();System.out.println("当前客户的状态:" + (ifClose ? "连接成功" : "已关闭"));}
}

下方预告:

增删改查均属前后台交互的操作,故统一写在CuratorDao.java中,统一管理。

各方法第一个入参(CuratorFramework zkClient),使用时通过如下代码获取:

ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil();//new的同时,zk也被启动
CuratorFramework zkClient=zkUtil.zkClient;

注: CuratorFramework相当于ZK原生API中的ZooKeeper类


二、 使用Curator来实现节点的增删改查

1.使用curator(递归)添加节点

    //级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持)public static void createNodes(CuratorFramework zkClient,String nodePath,String nodeData) throws Exception {zkClient.create().creatingParentContainersIfNeeded()//创建父节点,如果需要的话.withMode(CreateMode.PERSISTENT) //指定节点是临时的,还是永久的.withACL(Ids.OPEN_ACL_UNSAFE) //指定节点的操作权限.forPath(nodePath, nodeData.getBytes());System.out.println(nodePath+"节点已成功创建…");}

2.使用curator(递归)删除节点

    //删除node节点及其子节点public static void deleteNodeWithChild(CuratorFramework zkClient,String nodePath) throws Exception {zkClient.delete().guaranteed()                //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功.deletingChildrenIfNeeded()  //级联删除子节点//.withVersion(1)//版本号可以据需使用.forPath(nodePath);System.out.println(nodePath+"节点已删除成功…");}

3.使用curator更新节点数据

//更新节点data数据public static void updateNodeData(CuratorFramework zkClient,String nodePath,String nodeNewData) throws Exception {zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本号据需使用,默认可以不带System.out.println(nodePath+"节点数据已修改成功…");}

4.使用curator查询节点数据

    //查询node节点数据public static void getNodeData(CuratorFramework zkClient,String nodePath) throws Exception {Stat stat=new Stat();byte [] data=zkClient.getData().storingStatIn(stat).forPath(nodePath);System.out.println("节点"+nodePath+"的数据为"+new String(data));System.out.println("节点的版本号为:"+stat.getVersion());}

5.使用curator查询节点的子节点

    //打印node子节点public static void printChildNodes(CuratorFramework zkClient,String parentNodePath) throws Exception {List<String> childNodes= zkClient.getChildren().forPath(parentNodePath);System.out.println("开始打印子节点");for (String str : childNodes) {System.out.println(str);}}

6.使用curator判断节点是否存在

    //判断node节点是否存在public static void checkNodeExists(CuratorFramework zkClient,String nodePath) throws Exception {Stat stat=zkClient.checkExists().forPath(nodePath);System.out.println(null==stat?"节点不存在":"节点存在");}

关于CuratorFramework 的更多用法,点击这里 。

三、 使用Curator高级API特性之Cache缓存监控节点变化

cache是一种缓存机制,可以借助cache实现监听。

简单来说,cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。

curator支持的cache种类有4种Path Cache,Node Cache,Tree Cache,Curator Cache

1)Path Cache

Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。

它是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。

2)Node Cache

Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。

它是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。

3)Tree Cache

Tree Cache是上两种的合体,Tree Cache观察的是自身+所有子节点的所有数据,并缓存所有节点数据。

它是通过TreeCache类来实现的,监听器对应的接口为TreeCacheListener。

4)Curator Cache ( requires ZooKeeper 3.6+)

Curator Cache,是在zk3.6新版本添加的特性,该版本的出现是为了逐步淘汰上面3监听。

它是通过CuratorCache类来实现的,监听器对应的接口为CuratorCacheListener。

Curator一次性的watch

import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;public class MyCuratorWatcher implements CuratorWatcher {@Overridepublic void process(WatchedEvent event) throws Exception {System.out.println("触发watcher,节点路径为:" + event.getPath());switch (event.getType()) {case NodeCreated:break;default:break;}}
}
 //一次性的watchpublic static void watchOnce(CuratorFramework zkClient,String nodePath) throws Exception {zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);}

1.NodeCache监听当前节点变化

通过NodeCacheListener接口持续监听节点的变化来实现

    //持续监听的watchpublic static void watchForeverByNodeCache(CuratorFramework zkClient,String nodePath) throws Exception {final NodeCache nodeCache=new NodeCache(zkClient, nodePath);//把监听节点,转换为nodeCachenodeCache.start(false);//默认为false  设置为true时,会自动把节点数据存放到nodeCache中;设置为false时,初始化数据为空ChildData cacheData=nodeCache.getCurrentData(); if(null==cacheData) {System.out.println("NodeCache节点的初始化数据为空……");}else {System.out.println("NodeCache节点的初始化数据为"+new String(cacheData.getData()));}//设置循环监听nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData cdata=nodeCache.getCurrentData();if(null==cdata) {System.out.println("节点发生了变化,可能刚刚被删除!");nodeCache.close();//关闭监听}else {String data=new String(cdata.getData());String path=nodeCache.getCurrentData().getPath();System.out.println("节点路径"+path+"数据发生了变化,最新数据为:"+data);}}});}

2.PathChildrenCache只监听子节点变化

通过PathChildrenCacheListener接口持续监听子节点来实现

    //持续监听watch子节点的任何变化public static void watchForeverByPathChildrenCache(CuratorFramework zkClient,String nodePath) throws Exception {final PathChildrenCache childrenCache=new PathChildrenCache(zkClient, nodePath,true);//把监听节点,转换为childrenCache/*** StartMode:初始化方式*    POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件(会进入下面的第一个case)* NORMAL:异步初始化 (不会进入下面的第一个case)*  BUILD_INITIAL_CACHE: 同步初始化(把节点数据同步缓存到Cache中)*/childrenCache.start(StartMode.NORMAL);List<ChildData> childDataList=childrenCache.getCurrentData();System.out.println("当前节点所有子节点的数据列表如下:");for (ChildData childData : childDataList) {System.out.println(new String(childData.getData()));}childrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case INITIALIZED:System.out.println("子节点初始化OK…");break;case CHILD_ADDED:System.out.println("子节点"+event.getData().getPath()+"已被成功添加,数据data="+new String(event.getData().getData()));break;case CHILD_UPDATED:System.out.println("子节点"+event.getData().getPath()+"数据发生变化,新数据data="+new String(event.getData().getData()));break;case CHILD_REMOVED:System.out.println("子节点"+event.getData().getPath()+"已被移除~");break;case CONNECTION_RECONNECTED:System.out.println("正在尝试重新建立连接…");break;case CONNECTION_SUSPENDED:System.out.println("连接状态被暂时停止…");break;default:break;}}});}

3.TreeCache是上两者的合体,既监听自身,也监听所有子节点变化

通过TreeCacheListener接口来实现

    public static void treeCache(CuratorFramework zkClient) throws Exception {final String path = "/treeChildrenCache";final TreeCache treeCache = new TreeCache(zkClient, path);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {switch (event.getType()){case NODE_ADDED:System.out.println("节点变动触发:NODE_ADDED:" + event.getData().getPath());break;case NODE_REMOVED:System.out.println("节点变动触发:NODE_REMOVED:" + event.getData().getPath());break;case NODE_UPDATED:System.out.println("节点变动触发:NODE_UPDATED:" + event.getData().getPath());break;case CONNECTION_LOST:System.out.println("节点变动触发:CONNECTION_LOST:" + event.getData().getPath());break;case CONNECTION_RECONNECTED:System.out.println("节点变动触发:CONNECTION_RECONNECTED:" + event.getData().getPath());break;case CONNECTION_SUSPENDED:System.out.println("节点变动触发:CONNECTION_SUSPENDED:" + event.getData().getPath());break;case INITIALIZED:System.out.println("节点变动触发:INITIALIZED:" + event.getData().getPath());break;default:break;}}});//据需可以继续做一些其他的增删改操作zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep(1000);zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");Thread.sleep(1000);zkClient.delete().forPath(path + "/c1");Thread.sleep(1000);zkClient.delete().forPath(path);Thread.sleep(1000);zkClient.close();}

4.Curator Cache,是在zk3.6新版本添加的特性,Curator需5.*+

它的出现是为了替换以上3个监听(NodeCache、PathCache、TreeCache),它通过CuratorCacheListener.builder().for***来选择对应的监听。最后再通过curatorCache.listenable().addListener(listener);注册监听。

 public static void curatorCache1(CuratorFramework zkClient) {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.build(zkClient, path);curatorCache.listenable().addListener(new CuratorCacheListener() {@Overridepublic void event(Type type, ChildData oldData, ChildData newdata) {switch (type) {case NODE_CREATED://各种判断break;default:break;}}});}public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build();//构建监听器//新旧对照://1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );//2.path cache--> CuratorCacheListener.builder().forPathChildrenCache();//3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache();CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("节点改变了...");}}).build();//添加监听curatorCache.listenable().addListener(listener);//开启监听curatorCache.start();//让线程休眠30s(为了方便测试)Thread.sleep(1000 * 30);}

5.测试环节

    public static void main(String[] args) throws Exception {ZkConnectCuratorUtil cto = new ZkConnectCuratorUtil();CuratorFramework zkClient=cto.zkClient;//获取zk客户端CuratorDao dao=new CuratorDao();String nodePath="/super/succ";dao.createNodes(zkClient, nodePath, "super");//创建节点
//      dao.updateNodeData(zkClient, nodePath, "hello");//更新节点数据
//      dao.deleteNodeWithChild(zkClient, nodePath);
//      dao.getNodeData(zkClient, nodePath);
//      dao.printChildNodes(zkClient, nodePath);
//      dao.checkNodeExists(zkClient, nodePath);
//      dao.watchOnce(zkClient, nodePath);
//      dao.watchForeverByNodeCache(zkClient, nodePath);
//      dao.watchForeverByPathChildrenCache(zkClient, nodePath);Thread.sleep(300000); //延迟sleep时间,便于后才修改节点,看前台是否会继续触发watchcto.closeZKClient();}

四、使用Curator创建/验证ACL(访问权限列表)

为了更清晰的表示ACL的代码实现,下面代码与上面代码完全隔离,新建CuratorAcl.java

准备环节

1.连通Zk时,就指定登录权限

下面代码判断,比上面的ZkConnectCuratorUtil.java中的登录代码,多了一行账号密码。连通zk时,就指定账号密码,避免后面操作需要登录权限的节点,每次都输入账号密码。通常情况下,账号密码可以通过构造参数传入;也可以在操作节点时指定登录权限。

//本类代码,只涉及ACL操作
public class CuratorAcl {public CuratorFramework client = null;public static final String workspace="workspace";public static final String zkServerPath = "192.168.31.216:2181";public CuratorAcl() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().authorization("digest", "mayun:mayun".getBytes())//通常情况下,登录账号、密码可以通过构造参数传入,暂时固定,据需修改.connectString(zkServerPath).sessionTimeoutMs(20000).retryPolicy(retryPolicy).namespace(workspace).build();client.start();}public void closeZKClient() {if (client != null) {this.client.close();}}
}

2.写一个把明文的账号密码转换为加密后的密文的工具类

//把明文的账号密码转换为加密后的密文
public class AclUtils {public static String getDigestUserPwd(String loginId_Username_Passwd) {String digest = "";try {digest = DigestAuthenticationProvider.generateDigest(loginId_Username_Passwd);} catch (NoSuchAlgorithmException e) {e.printStackTrace();}return digest;}public static void main(String[] args) throws IOException, InterruptedException, KeeperException, Exception {String id = "mayun:mayun";String idDigested = getDigestUserPwd(id);System.out.println(idDigested); // mayun:KThXmEntEPZyHsQk7tbP5ZzEevk=}
}

下面各方法,均在CuratorAcl.java内部完成


1.使用自定义工具类AclUtils,一次性给多个用户赋Acl权限

    public static List<ACL> getAcls() throws NoSuchAlgorithmException{List<ACL> acls=new ArrayList<ACL>();Id mayun =new Id("digest", AclUtils.getDigestUserPwd("mayun:mayun"));Id lilei =new Id("digest", AclUtils.getDigestUserPwd("lilei:lilei"));acls.add(new ACL(Perms.ALL, mayun));//给mayun一次性赋值所有权限acls.add(new ACL(Perms.READ, lilei));acls.add(new ACL(Perms.DELETE | Perms.CREATE, lilei));//给lilei分两次赋权限(目的:看不同的赋权方式)return acls;}

2.级联创建节点,并赋予节点操作权限

    //级联创建节点,并赋予节点操作权限public static void createNodesCascade(CuratorAcl cto,String nodePath,String nodeData,List<ACL> acls) throws Exception {String result=cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls, true)//给节点赋权限.forPath(nodePath, nodeData.getBytes());System.out.println("创建成功,result="+result);     }

3.读取节点数据

    // 读取节点数据public  void getNodeData(CuratorAcl cto,String nodePath) throws Exception {Stat stat = new Stat();byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);if(null!=stat) {System.out.println("节点" + nodePath + "的数据为: " + new String(data));System.out.println("该节点的版本号为: " + stat.getVersion());}}

4.修改具有ACL权限节点的data数据

    //修改具有ACL权限节点的datapublic void modNodeDataWhichWithAcl(CuratorAcl cto,String nodePath,String nodeNewData) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("节点修改后的数据为:"+nodeNewData);cto.client.setData().forPath(nodePath, nodeNewData.getBytes());System.out.println("修改成功");}

5.两种方法判断node节点是否存(优先使用第一种)

    //两种方法判断node节点是否存public void checkNodeExists(CuratorAcl cto,String nodePath) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("-----------=================-------------");//判断节点是否存在,方法一(路径前面会自动添加workspace)Stat stat=cto.client.checkExists().forPath(nodePath);System.out.println("======="+stat==null?"不存在":"存在");//判断节点是否存在,方法二(路径前面需手动添加workspace)Stat stat2 = cto.client.getZookeeperClient().getZooKeeper().exists("/"+workspace+nodePath, false);System.out.println("======="+stat2==null?"不存在":"存在");}

6.ACL权限的main方法测试

通过java代码给某个节点添加ACL权限后,后台登陆zk客户端时,是无法直接操作该节点被ACL控制的权限的操作的,要想操作具有ACL权限的节点,方法只有两个。
1、知道该节点输入用户都有哪些,用这些用户的账号密码登录

2、使用超级用户登录(需要单独配置,如何配置超级用户(见:三、5))

#getAcl /succ/testDigest 查看都有哪些用户对该节点有操作权限

#addauth digest succ:succ 登录

 public static void main(String[] args) throws Exception {CuratorAcl cto = new CuratorAcl();boolean isZkCuratorStarted = cto.client.isStarted();System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接成功" : "已关闭"));String nodePath1 = "/acl/tom/bin";String nodePath2 = "/acl/father/child/sub";
//      cto.createNodesCascade(cto, nodePath1, "aclTest", getAcls());//首次创建,报错,只能创建父节点,子节点无法创建
//      cto.client.setACL().withACL(getAcls()).forPath("/curatorNode");//给节点创建权限
//      cto.getNodeData(cto, "/super");
//      cto.getNodeData(cto, "/acl");cto.checkNodeExists(cto, nodePath2);cto.closeZKClient();boolean isZkCuratorStarted2 = cto.client.isStarted();System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接成功" : "已关闭"));}

五、分布式锁

假如某网上商城,在单线程单系统情况下,不会存在库存为负数的情况。

然后多线程、微服务的情况下,这种情况不加处理就难以避免了,例如:库存10,A进来订购6,在锁定这个6个商品的同时,在高并发情况下,就有可能有个B和A在同一时间点,也订购了7个商品,那么库存就有可能会变成 负3。

为了避免这个情况的出现,就需要有个分布式锁,监控这库存,锁定每笔交易……

Curator的5种分布式锁及其对应的核心类:

1.重入式排它锁 Shared Reentrant Lock,实现类:InterProcessMutex

2.不可重入排它锁 Shared Lock ,实现类:InterProcessSemaphoreMutex

3.可重入读写锁 Shared Reentrant Read Write Lock,实现类: InterProcessReadWriteLock 、InterProcessLock

4.多锁对象容器(多共享锁) Multi Shared Lock,将多个锁作为单个实体管理的容器,实现类:InterProcessMultiLock、InterProcessLock

5.共享信号锁Shared Semaphore ,实现类:InterProcessSemaphoreV2

跨 JVM 工作的计数信号量。使用相同锁路径的所有 JVM 中的所有进程将实现进程间有限的租用集。此外,这个信号量大多是“公平的”——每个用户将按照请求的顺序获得租用(从 ZK 的角度来看)。

有两种模式可用于确定信号量的最大租用。在第一种模式中,最大租用是由给定路径的用户维护的约定。在第二种模式中,SharedCountReader 用作给定路径的信号量的方法,以确定最大租用。

1.重入式排它锁InterProcessMutex

public InterProcessMutex(CuratorFramework client, String path)

获取/释放锁的API

   public void acquire() throws Exception;//获取锁,获取不到锁一直阻塞,zk连接中断则抛异常public boolean acquire(long time, TimeUnit unit) throws Exception;//获取锁,超过该时间后,直接返回false,zk连接中断则抛异常public void release() throws Exception;//释放锁

通过release()方法释放锁。InterProcessMutex 实例可以重用。Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。为了撤销mutex, 调用下面的方法

/**
* 将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
* Parameters:
* listener - the listener
*/
public void makeRevocable(RevocationListener<T> listener)

2.不可重入排它锁InterProcessSemaphoreMutex

public InterProcessSemaphoreMutex(CuratorFramework client, String path)

使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入

3.可重入读写锁InterProcessReadWriteLock 、InterProcessLock

一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。从读锁升级成写锁是不成的。

4.多锁对象容器(多共享锁) ,将多个锁作为单个实体管理,InterProcessMultiLock、InterProcessLock

Multi Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire(上锁),如果请求失败,所有的锁都会被release (释放锁)。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。主要涉及两个类:InterProcessMultiLock、InterProcessLock

它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

5.分布式锁示例

 public class ZkLock {final static Logger log = LoggerFactory.getLogger(ZkLock.class);public CuratorFramework zkClient = null; // zk的客户端工具Curator(在本类通过new实例化的是,自动start)private static final int BASE_SLEEP_TIME_MS = 1000; // 连接失败后,再次重试的间隔时间 单位:毫秒private static final int MAX_RETRY_TIMES = 10; // 定义失败重试次数private static final int SESSION_TIME_OUT = 1000000; // 会话存活时间,根据业务灵活指定 单位:毫秒private static final String ZK_SERVER_IP_PORT = "192.168.31.216:2181";// Zookeeper服务所在的IP和客户端端口private static final String NAMESPACE = "workspace";// 指定后,默认操作的所有的节点都会在该工作空间下进行static int j = 10;//初始化zk客户端public ZkLock() {// 重试策略:初试时间为1s 重试10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRY_TIMES);// 通过工厂建立连接zkClient = CuratorFrameworkFactory.builder().connectString(ZK_SERVER_IP_PORT) // 连接地址.sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy)// 重试策略.build();zkClient.start();}    }

下面是核心测试方法

public static void lockTest(CuratorFramework zkClient) throws InterruptedException {// 使用分布式锁,所有系统同时监听同一个节点,达到分布式锁的目的final InterProcessMutex lock = new InterProcessMutex(zkClient, "/test");final CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 10; i++) {//启动10个线程new Thread(new Runnable() {@Overridepublic void run() {try {countDownLatch.await();// 线程等待一起执行lock.acquire();// 分布式锁,数据同步// 处理业务j--;System.out.println(j);} catch (Exception e) {e.printStackTrace();} finally {try {// 释放锁lock.release();} catch (Exception e) {e.printStackTrace();}}}}, "t" + i).start();}Thread.sleep(1000);countDownLatch.countDown();// 模拟十个线程一起并发.指定一起执行}public static void main(String[] args) throws InterruptedException {ZkLock zkl=new ZkLock();ZkLock.lockTest(zkl.zkClient);}

六、分布式计数器

利用Zookeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器:DistributedAtomicInteger,DistributedAtomicLong。这个两个除了计数范围(int、long)不同外,没有任何不同。操作也非常简单,跟AtomicInteger大同小异。

increment() //加1
decrement() //减1
compareAndSet(Integer expectedValue, Integer newValue) //cas操作
get() //获取当前值
add():增加特定的值
subtract(): 减去特定的值
trySet(): 尝试设置计数值

使用的时候,必须检查返回结果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

1.代码示例

 public static void count(CuratorFramework zkClient) throws Exception {//分布式计数器DistributedAtomicInteger counter=new DistributedAtomicInteger(zkClient,"/super",new RetryNTimes(3,100));//初始化counter.forceSet(0);AtomicValue<Integer> value = counter.increment();//原子自增System.out.println("原值为"+value.preValue());System.out.println("更改后的值为"+value.postValue());System.out.println("状态"+value.succeeded());}public static void main(String[] args) throws Exception {ZkLock zkl=new ZkLock();//ZkLock.lockTest(zkl.zkClient);ZkLock.count(zkl.zkClient);}

尾言

力求语言简洁,清晰描述出Curator的常规使用方法,如有不正之处请批评指正。另外Curator还有一些高端的用法:分布式屏障—Barrier、Double-barrier,分布式队列DistributedQueueDistributed Queue(数据量很小的话,酌情考虑可以使用,大的话不建议使用),后续持续更新!

附注

1、Zookeeper集群的快速搭建(图文详解)

2、Zookeeper常规命令 | Watch监控命令 | ACL权限操作 | 四字命令详解 | 配置super超级用户权限

3、Zookeeper 的计数器CountDownLatch 的使用(模拟火箭发射)

4、Zookeeper图形化的客户端(ZooInspector)| 图形化的监控工具(taoKeeper)的下载和使用

Zookeeper后端开发工具Curator的使用 | Curator对节点的增删改查 | ACL权限控制 | 分布式锁 | 分布式计数器 | 附带最新版本下载相关推荐

  1. zookeeper 数据节点的增删改查

    1.连接服务端 [root@localhost bin]# ./zkCli.sh -server 127.0.0.1:2181 Connecting to 127.0.0.1:2181 2018-05 ...

  2. 【敏捷开发】Node.js(nodejs)实现一个接口完成增删改查聚合接口4个功能,最大限度节省接口数量,正所谓“一口多用”(基础版、免登陆、无鉴权)

    注意,前情提示: 本代码基于<Node.js(nodejs)对本地JSON文件进行增.删.改.查操作(轻车熟路)> 传送门Node.js(nodejs)对本地JSON文件进行增.删.改.查 ...

  3. vue2.0.js基础开发使用心得(结合实际项目对数据的增删改查)

    1.首先申明,没有使用vue 的组件,以及脚手架等,都是一些基础语法的使用. ------------------------------------------------------------- ...

  4. MongoDBC++开发(四)bsoncxx::builder::stream::document的增删改查操作

    2. mongocxx 续上一篇,接着看一下mongocxx中的文件有些什么. 先看一下基本的增删改查操作: 2.1 connect.cpp 数据库的链接 参见之前的博客. 2.2 create.cp ...

  5. Zookeeper 图形化的客户端工具(ZooInspector)| 图形化的监控工具(taoKeeper)的下载和使用 | 后端开发工具Curator的高级应用

    前言 Zookeeper周边知识点较多,单篇文章难以表述,分成多篇,可据需点击进入. 本篇重点介绍,ZooInspector和taoKeeper. 1.Zookeeper的下载和安装 | 集群快速搭建 ...

  6. 电信报表java_china_netcom 用java和框架Strus开发的电信报表系统,信息 的增删改查 Develop 238万源代码下载- www.pudn.com...

    文件名称: china_netcom下载 收藏√  [ 5  4  3  2  1 ] 开发工具: Java 文件大小: 532 KB 上传时间: 2014-09-04 下载次数: 1 提 供 者: ...

  7. java springboot整合zookeeper入门教程(增删改查)

    java springboot整合zookeeper增删改查入门教程 zookeeper的安装与集群搭建参考:https://www.cnblogs.com/zwcry/p/10272506.html ...

  8. zookeeper curator客户端之增删改查

    zookeeper curator客户端之增删改查 zookeeper安装:https://www.cnblogs.com/zwcry/p/10272506.html curator客户端是Apach ...

  9. Curator基本操作(Zookeeper节点增删改查)

    Curator是Zookeeper的Java客户端库,官网为 https://curator.apache.org . 环境 Ubuntu 22.04 Zookeeper 3.7.1 JDK 17.0 ...

最新文章

  1. 《预训练周刊》第27期:谷歌发布最新看图说话模型、GitHub:平台上30%的新代码受益于AI助手Copilot...
  2. PC Lint 初学
  3. C++实现Linux下弹出U盘的方法
  4. 网站发布外链如何防止后期被删除?
  5. DFT实训教程笔记3(bibili版本)-SOC Scan Implementtation Scan Practice Session II
  6. Linux 交换空间优化(swap 优化)(积极使用交换空间占比,可能会使程序运行缓慢!)
  7. KindEditor中使用val()获取content内容后图片不显示
  8. windows server 启用 vss_windows服务器常用的安全加固方法
  9. 串口与modem流量控制大全(1)
  10. php搜索文件名,PHP搜索文件且列出文件名的代码参考
  11. python functools模块方法
  12. java隐式参数的作用_隐式参数_scala教程_田守枝Java技术博客
  13. NTSD命令用法详解
  14. Tensorflow教程之语音识别
  15. 怎么把知网的外文文献翻译成中文_知网查重中文译成英文可行吗?
  16. Linux cp命令的内涵
  17. 爱立信、意大利电信及高通公司基于5G毫米波创下远程传输速度纪录
  18. MacBook常用快捷键
  19. 外卖CPS小程序部署指南,个人获取美团外卖小程序跳转链接
  20. 节假日读取接口_2018年节假日API接口,直接计算好的

热门文章

  1. element ui——Pagination 自定义分页样式
  2. 诺基亚java游戏那种_非诺基亚手机如何玩S40 Java游戏_网易手机频道
  3. 华为鸿蒙最大合作伙伴,全球第三大手机系统「鸿蒙」上线,这19款能抢先用…...
  4. 路由器wds设置最终版、一些ios基础知识
  5. 解决sql注入问题(丛林战争项目)
  6. 精选收集50个计算机热门视频教程免费下载
  7. OKR和KPI如何结合使用
  8. OKR制定实例大全--让你制定OKR不发愁
  9. 涞水智人,回溯万年文明
  10. openresty通过ffi调用一个c编写的base64动态库