zookeeper系列(三)zookeeper的使用--开源客户端
作者:leesf 掌控之中,才会成功;掌控之外,注定失败, 原创博客地址:http://www.cnblogs.com/leesf456/ 奇文共欣赏,大家共同学习进步。
一、前言
上一篇博客已经介绍了如何使用Zookeeper提供的原生态Java API进行操作,本篇博文主要讲解如何通过开源客户端来进行操作。
二、ZkClient
ZkClient是在Zookeeper原声API接口之上进行了包装,是一个更易用的Zookeeper客户端,其内部还实现了诸如Session超时重连、Watcher反复注册等功能。
2.1 添加依赖,使用maven管理直接添加配置文件即可
在pom.xml文件中添加如下内容即可。
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.2</version>
</dependency>
2.2 创建会话
使用ZkClient可以轻松的创建会话,连接到服务端
package com.hust.grid.leesf.zkClient;import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;/*** 使用ZkClient创建会话,连接到服务端*/
public class Create_Session_Sample {//会话超时时间private static final int SESSION_TIMEOUT = 5000;//ZkClient的实例对象private static ZkClient zkClient = null;/*** 连接服务器,创建一个会话* @param host 127.0.0.1:2181*/public void connect(String host){zkClient = new ZkClient(host, SESSION_TIMEOUT);System.out.println("ZooKeeper session established");}public static void main(String[] args) throws IOException, InterruptedException {Create_Session_Sample createSessionSample = new Create_Session_Sample();createSessionSample.connect("127.0.0.1:2181");}
}
运行结果:结果表明已经成功创建会话。
ZooKeeper session established.
2.3 创建节点
ZkClient提供了递归创建节点的接口,即其帮助开发者完成父节点的创建,再创建子节点。
package com.hust.grid.leesf.zkClient;import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
/*** ZkClient提供了递归创建节点的接口,即其帮助开发者完成父节点的创建,再创建子节点*/
public class Create_Node_Sample {//会话超时时间private static final int SESSION_TIMEOUT = 5000;//ZkClient的实例对象private static ZkClient zkClient = null;/*** 连接服务器,创建一个会话* @param host 127.0.0.1:2181*/public void connect(String host){zkClient = new ZkClient(host, SESSION_TIMEOUT);System.out.println("ZooKeeper session established");}public static void main(String[] args) throws IOException, InterruptedException {Create_Session_Sample createSessionSample = new Create_Session_Sample();createSessionSample.connect("127.0.0.1:2181");String path = "/zk-book/c1";//创建父子节点zkClient.createPersistent(path, true);System.out.println("success create znode.");}
}
运行结果:
success create znode.
结果表明已经成功创建了节点,值得注意的是,在原生态接口中是无法创建成功的(父节点不存在),但是通过ZkClient可以递归的先创建父节点,再创建子节点。
2.4 删除节点
ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点
package com.hust.grid.leesf.zkClient;import java.io.IOException;
import org.I0Itec.zkclient.ZkClient;
/*** ZkClient提供了递归删除节点的接口,即其帮助开发者先删除所有子节点(存在),再删除父节点*/
public class Del_Data_Sample {//会话超时时间private static final int SESSION_TIMEOUT = 5000;//ZkClient的实例对象private static ZkClient zkClient = null;/*** 连接服务器,创建一个会话* @param host 127.0.0.1:2181*/public void connect(String host){zkClient = new ZkClient(host, SESSION_TIMEOUT);System.out.println("ZooKeeper session established");}public static void main(String[] args) throws IOException, InterruptedException {Del_Data_Sample delDataSample = new Del_Data_Sample();delDataSample.connect("127.0.0.1:2181");String path = "/zk-book";zkClient.createPersistent(path, "");zkClient.createPersistent(path + "/c1", "");System.out.println("success create znode.");zkClient.deleteRecursive(path);System.out.println("success delete znode.");}}
运行结果:结果表明ZkClient可直接删除带子节点的父节点,因为其底层先删除其所有子节点,然后再删除父节点。
ZooKeeper session established
success create znode.
success delete znode.
2.5 获取子节点
package com.hust.grid.leesf.zkClient;import java.util.List;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;public class Get_Children_Sample {//会话超时时间private static final int SESSION_TIMEOUT = 5000;//ZkClient的实例对象private static ZkClient zkClient = null;/*** 连接服务器,创建一个会话* @param host 127.0.0.1:2181*/public void connect(String host){zkClient = new ZkClient(host, SESSION_TIMEOUT);System.out.println("ZooKeeper session established");}public static void main(String[] args) throws InterruptedException {Get_Children_Sample getChildrenSample = new Get_Children_Sample();getChildrenSample.connect("127.0.0.1:2181");String path = "/zk-book";zkClient.subscribeChildChanges(path, new IZkChildListener() {/*** 子节点的路径被变更时回调此方法*/public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);}});zkClient.createPersistent(path);Thread.sleep(1000);zkClient.createPersistent(path + "/c1");Thread.sleep(1000);zkClient.delete(path + "/c1");Thread.sleep(1000);zkClient.delete(path);Thread.sleep(Integer.MAX_VALUE);}
}
运行结果:
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:[c1]
/zk-book 's child changed, currentChilds:[]
/zk-book 's child changed, currentChilds:null
结果表明:
客户端可以对一个不存在的节点进行子节点变更的监听;
一旦客户端对一个节点注册了子节点列表变更监听之后,那么当该节点的子节点列表发生变更时,服务端都会通知客户端,并将最新的子节点列表发送给客户端,该节点本身的创建或删除也会通知到客户端;(给节点注册了儿子节点的监听事件,当子节点或本节点变动时都会通知客户端,冰倩返回新的节点列表)
2.6获取节点的数据,当订阅节点数据变动时出发事件
package com.hust.grid.leesf.zkClient;import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;public class Get_Data_Sample {//会话超时时间private static final int SESSION_TIMEOUT = 5000;//ZkClient的实例对象private static ZkClient zkClient = null;/*** 连接服务器,创建一个会话* @param host 127.0.0.1:2181*/public void connect(String host){zkClient = new ZkClient(host, SESSION_TIMEOUT);System.out.println("ZooKeeper session established");}public static void main(String[] args) throws InterruptedException {Get_Data_Sample getDataSample = new Get_Data_Sample();getDataSample.connect("127.0.0.1:2181");String path = "/zk-book";//创建一个临时节点zkClient.createEphemeral(path, "123");//节点数据变动时订阅监控zkClient.subscribeDataChanges(path, new IZkDataListener() {public void handleDataDeleted(String dataPath) throws Exception {System.out.println("Node " + dataPath + " deleted.");}public void handleDataChange(String dataPath, Object data) throws Exception {System.out.println("Node " + dataPath + " changed, new data: " + data);}});//获取path节点的数据System.out.println(zkClient.readData(path));//修改path节点的数据zkClient.writeData(path, "456");Thread.sleep(1000);//删除path节点的数据zkClient.delete(path);Thread.sleep(Integer.MAX_VALUE);}
}
2.7 检测节点是否存在,直接使用客户端检测节点是否存在,结果返回false不存在,true存在;
public class Exist_Node_Sample {public static void main(String[] args) throws Exception {String path = "/zk-book";ZkClient zkClient = new ZkClient("127.0.0.1:2181", 2000);System.out.println("Node " + path + " exists " + zkClient.exists(path));}
三、Curator客户端
Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连,反复注册Watcher和NodeExistsException异常等,现已成为Apache的顶级项目。
3.1 添加依赖
使用maven管理时,在pom.xml文件中添加如下内容即可
<!-- https://mvnrepository.com/artifact/org.apache.curator/apache-curator --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.4.2</version></dependency>
3.2 创建会话
Curator除了使用一般方法创建会话外,还可以使用fluent风格进行创建;
package com.hust.grid.leesf.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/*** 使用Curator客户端,创建会话* @author songzl**/
public class Create_Session_Sample {public static void main(String[] args) throws Exception {//重试策略:重试时间每间隔1000毫秒,最大重试次数3RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);//大多数的方法在客户端启动之后才能工作client.start();System.out.println("Zookeeper session1 established. ");//第二种方式新建客户端:这里设置的“base”被作为根路径使用CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("base").build();client1.start();System.out.println("Zookeeper session2 established. "); }
}
注意:值得注意的是session2会话含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对/base目录进行的,这有利于实现不同的Zookeeper的业务之间的隔离。
3.3 创建节点
通过使用Fluent风格的接口,开发人员可以进行自由组合来完成各种类型节点的创建。
package com.hust.grid.leesf.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class Create_Node_Sample {//服务端ip和端口号private String host = "127.0.0.1:2181";//session超时时间private static final int sessionTimeOut = 5000;//连接的超时时间private static final int connectTimeOut = 3000;//初始化Curator客户端private static CuratorFramework client = null;//重试策略:重试时间每间隔1000毫秒,最大重试次数3private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);public void getCuratorFrameworkByNewClient(){//第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例client = CuratorFrameworkFactory.newClient(host, sessionTimeOut, connectTimeOut, retryPolicy);//大多数的方法在客户端启动之后才能工作client.start();}public void getCuratorFrameworkByBuilder(){//第二种方式新建客户端:这里设置的“base”被作为此连接的根路径使用
// client = CuratorFrameworkFactory.builder().connectString(host)
// .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).namespace("base").build();//不设置namespaceclient = CuratorFrameworkFactory.builder().connectString(host).sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build();client.start();}public static void main(String[] args) throws Exception {String path = "/zk-book/c1";Create_Node_Sample createNodeSample = new Create_Node_Sample();createNodeSample.getCuratorFrameworkByBuilder();client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());System.out.println("success create znode: " + path);}}
3.4节点的增、删、改、查
package com.hust.grid.leesf.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;public class Create_Node_Sample {//服务端ip和端口号private String host = "127.0.0.1:2181";//session超时时间private static final int sessionTimeOut = 5000;//连接的超时时间private static final int connectTimeOut = 3000;//初始化Curator客户端private static CuratorFramework client = null;//重试策略:重试时间每间隔1000毫秒,最大重试次数3private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);public void getCuratorFrameworkByNewClient(){//第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例client = CuratorFrameworkFactory.newClient(host, sessionTimeOut, connectTimeOut, retryPolicy);//大多数的方法在客户端启动之后才能工作client.start();}public void getCuratorFrameworkByBuilder(){//第二种方式新建客户端:这里设置的“base”被作为此连接的根路径使用
// client = CuratorFrameworkFactory.builder().connectString(host)
// .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).namespace("base").build();//不设置namespaceclient = CuratorFrameworkFactory.builder().connectString(host).sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build();client.start();}/*** 创建节点* @param path*/public void createNode(String path){try {Stat stat = client.checkExists().forPath(path);client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());String context = new String(client.getData().storingStatIn(stat).forPath(path));System.out.println("success create znode: " + path + "节点内容为:"+context);Thread.sleep(Integer.MAX_VALUE);//创建成功后需要阻塞一下线程保持回话} catch (Exception e) {System.out.println("fail create znode: " + path);}}/*** 更新节点数据* @param path*/public void updateNode(String path){try {Stat stat = client.checkExists().forPath(path);System.out.println("初始的版本号:"+stat.getVersion());Stat stat1 = client.setData().withVersion(stat.getVersion()).forPath(path,"songzl".getBytes());System.out.println("更新后的版本号:"+stat1.getVersion());String context = new String(client.getData().storingStatIn(stat).forPath(path));System.out.println("success set node data 路径为:" +path+ "更新的内容: " + context);} catch (Exception e) {System.out.println("Fail set node data " + e.getMessage());}}/*** 删除节点数据* @param path*/public void deleteNode(String path){try {Stat stat = client.checkExists().forPath(path);client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);System.out.println("success delete znode " + path);} catch (Exception e) {System.out.println("fail delete znode " + path);}}/*** 获取节点的数据* @param path*/public void getNode(String path){try {Stat stat = client.checkExists().forPath(path); System.out.println(stat); String context = new String(client.getData().storingStatIn(stat).forPath(path));System.out.println("success get node data:"+context);} catch (Exception e) {System.out.println("fail get node data");}}public static void main(String[] args) throws Exception {String path = "/zk-book/c1";Create_Node_Sample createNodeSample = new Create_Node_Sample();createNodeSample.getCuratorFrameworkByBuilder();createNodeSample.createNode(path);}}
总结:
方法名 | 描述 |
---|---|
create() | 开始创建操作, 可以调用额外的方法(比如方式mode 或者后台执行background) 并在最后调用forPath()指定要操作的ZNode |
delete() | 开始删除操作. 可以调用额外的方法(版本或者后台处理version or background)并在最后调用forPath()指定要操作的ZNode |
checkExists() | 开始检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode |
getData() | 开始获得ZNode节点数据的操作. 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode |
setData() | 开始设置ZNode节点数据的操作. 可以调用额外的方法(版本或者后台处理) 并在最后调用forPath()指定要操作的ZNode |
getChildren() | 开始获得ZNode的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode |
inTransaction() | 开始是原子ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交 |
3.5 异步接口
如同Zookeeper原生API提供了异步接口,Curator也提供了异步接口。在Zookeeper中,所有的异步通知事件处理都是由EventThread这个线程来处理的,EventThread线程用于串行处理所有的事件通知,其可以保证对事件处理的顺序性,但是一旦碰上复杂的处理单元,会消耗过长的处理时间,从而影响其他事件的处理,Curator允许用户传入Executor实例,这样可以将比较复杂的事件处理放到一个专门的线程池中去。
package com.hust.grid.leesf.curator;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class Create_Node_Background_Sample {//服务端ip和端口号private String host = "127.0.0.1:2181";//session超时时间private static final int sessionTimeOut = 5000;//连接的超时时间private static final int connectTimeOut = 3000;//初始化Curator客户端private static CuratorFramework client = null;//重试策略:重试时间每间隔1000毫秒,最大重试次数3private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//对执行中的线程进行管理,等待线程完成某些操作后,再对此线程做处理(起到过河拆桥、卸磨杀驴的作用)static CountDownLatch semaphore = new CountDownLatch(2);//创建一个线程池,此线程池共享队列中的任务,直到队列中所有任务处理完(只要队列中有任务,就不停不休的执行,除非手动杀死线程)static ExecutorService tp = Executors.newFixedThreadPool(2);//第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例public void getCuratorFrameworkByNewClient(){client = CuratorFrameworkFactory.newClient(host, sessionTimeOut, connectTimeOut, retryPolicy);//大多数的方法在客户端启动之后才能工作client.start();}//第二种方式新建客户端:这里设置的“base”被作为此连接的根路径使用public void getCuratorFrameworkByBuilder(){
// client = CuratorFrameworkFactory.builder().connectString(host)
// .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).namespace("base").build();//不设置namespaceclient = CuratorFrameworkFactory.builder().connectString(host).sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build();client.start();}public static void main(String[] args) throws Exception {Create_Node_Background_Sample createNodeBackgroundSample = new Create_Node_Background_Sample();createNodeBackgroundSample.getCuratorFrameworkByBuilder();System.out.println("Main thread: " + Thread.currentThread().getName());String path = "/zk-book";//方法一:此方法使用一个线程池当做后台执行器,可以将比较复杂的事件处理放到一个专门的线程池中去client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("使用专门的线程池执行:event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());semaphore.countDown();}}, tp).forPath(path, "init".getBytes());//方法二:普通的异步回调函数创建一个临时节点,EventThread线程用于串行处理所有的事件通知,其可以保证对事件处理的顺序性,但是一旦碰上复杂的处理单元,会消耗过长的处理时间client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("不适用线程池执行:event[code: " + event.getResultCode() + ", type: " + event.getType() + "]" + ", Thread of processResult: " + Thread.currentThread().getName());semaphore.countDown();}}).forPath(path, "init".getBytes());semaphore.await();tp.shutdown();}
}
3.6 Curator除了提供很便利的API,还提供了一些典型的应用场景,开发人员可以使用参考更好的理解如何使用Zookeeper客户端,所有的都在recipes包中,只需要在pom.xml中添加如下依赖即可。
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.4.2</version>
</dependency>
3.6.1 curator-recipes对节点的监听:一下是我代码测试的结论,若有误,请大家指正。
NodeCache将节点数据保存到本地缓存中,给这些数据注册一个监听器,当被新增、更新时触发监听器,(我测试时发现删除时监听器不触发);
NodeCache节点缓存不是线程安全的,在不同步的情况下不能保持同步;当多线程更新数据时必须使用版本号,避免错误覆盖其他进程数据;
NodeCache注册一个监听器,若session不超时、监听器没有被移除,此监听器一直有效并且可以重复使用(多次更新节点数据均被成功触发);
NodeCache也可以同时注册多个监听器,session不超时、监听器没有被移除,所有的监听器也都是可以正常被触发;
package com.hust.grid.leesf.curator;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/*** NodeCache将节点数据保存到本地缓存中,给这些数据注册一个监听器,当被增删改时触发监听器;* 需要注意:这个缓存不是线程安全的,不能保证同步;当更新数据时必须使用版本号,避免数据错误覆盖;* NodeCache的监听器可以重复添加多个,并且都会触发;也可以移除,若不移除则可一直使用;*/
public class NodeCache_Sample {//服务端ip和端口号private String host = "127.0.0.1:2181";//session超时时间private static final int sessionTimeOut = 5000;//连接的超时时间private static final int connectTimeOut = 3000;//初始化Curator客户端private static CuratorFramework client = null;//重试策略:重试时间每间隔1000毫秒,最大重试次数3private RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//对执行中的线程进行管理,等待线程完成某些操作后,再对此线程做处理(起到过河拆桥、卸磨杀驴的作用)static CountDownLatch semaphore = new CountDownLatch(1);//创建一个线程池,此线程池共享队列中的任务,直到队列中所有任务处理完(只要队列中有任务,就不停不休的执行,除非手动杀死线程)static ExecutorService tp = Executors.newFixedThreadPool(1);//第一种方式新建客户端:服务端ip和端口,session超时时间,连接的超时时间,重试策略实例public void getCuratorFrameworkByNewClient(){client = CuratorFrameworkFactory.newClient(host, sessionTimeOut, connectTimeOut, retryPolicy);//大多数的方法在客户端启动之后才能工作client.start();}//第二种方式新建客户端:这里设置的“base”被作为此连接的根路径使用public void getCuratorFrameworkByBuilder(){
// client = CuratorFrameworkFactory.builder().connectString(host)
// .sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).namespace("base").build();//不设置namespaceclient = CuratorFrameworkFactory.builder().connectString(host).sessionTimeoutMs(sessionTimeOut).retryPolicy(retryPolicy).build();client.start();}/*** path路径的节点数据放到NodeCache的本地缓存中,并且给nodeCache添加一个监听;* 不使用单独的线程池处理* @param path * @throws Exception*/public void nodeCacheAddListener(String path) throws Exception{final NodeCache cache = new NodeCache(client, path, false);cache.start(true);cache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData())+",线程名字:"+Thread.currentThread().getName());}});}//封装nodeCacheAddListener方法public void warpNodeCacheAddListener(String path) throws Exception{getCuratorFrameworkByBuilder();//实例化client//path节点的缓存,创建一个监听器,不使用线程池,监听器可以注册多个nodeCacheAddListener(path);//创建时也调用监听器(之前还没有path节点,此时才创建,但是缓存节点的监听器被触发了)client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());//更行path节点数据时,缓存节点的监听器被触发了client.setData().forPath(path, "songzl".getBytes());//删除path节点时,不触发监听(因为节点给删除了,监听也就被移除了,还调用个毛线)client.delete().deletingChildrenIfNeeded().forPath(path);}/*** path路径的节点数据放到NodeCache的本地缓存中,并且给nodeCache添加一个监听;* 使用单独的线程池处理* @param path * @throws Exception*/public void nodeCacheAddListenerExecutor(String path) throws Exception{final NodeCache cache = new NodeCache(client, path, false);cache.start(true);cache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData())+",线程名字:"+Thread.currentThread().getName());semaphore.countDown();}},tp);}//封装nodeCacheAddListenerExecutor方法public void warpNodeCacheAddListenerExecutor(String path) throws Exception{getCuratorFrameworkByBuilder();//实例化client//path节点的缓存,创建一个监听器,使用线程池,监听器可以注册多个nodeCacheAddListenerExecutor(path);//创建时也调用监听器(之前还没有path节点,此时才创建,但是缓存节点的监听器被触发了)client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());//更行path节点数据时,缓存节点的监听器被触发了client.setData().forPath(path, "songzl".getBytes());//删除path节点时,不触发监听(因为节点给删除了,监听也就被移除了,还调用个毛线)client.delete().deletingChildrenIfNeeded().forPath(path);semaphore.await();tp.shutdown();}public static void main(String[] args) throws Exception {String path = "/zk-book/nodecache";//实例化clientNodeCache_Sample nodeCacheSample = new NodeCache_Sample();nodeCacheSample.warpNodeCacheAddListener(path);Thread.sleep(Integer.MAX_VALUE);}
}
3.6.2 curator-recipes对 子节点监听,3.6.1的节点性质子节点均具备,还有下面的几种特性;
给子节点添加的监听器,参数event提供了增删改的类型判断,因此字节点的增删改均会触发监听器;
给子节点注册监听器使用的path是父节点的绝对路径,当父节点路径下有子节点增改删时,触发子节点的监听器,操作父节点时不会触发;
在对子节点修改过于频繁时,若不阻塞线程,会丢失监听器调用次数(我用代码测试时发现总是少调用,原来是对子节点操作过于频繁,导致监听器调用次数丢失);
/*** 给子节点添加监听器* @param path* @throws Exception*/public void childNodeCacheAddListener(String path) throws Exception{PathChildrenCache cache = new PathChildrenCache(client, path, true);cache.start(StartMode.POST_INITIALIZED_EVENT);cache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework client,PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case CHILD_ADDED:System.out.println("CHILD_ADDED," + event.getData().getPath());break;case CHILD_UPDATED:System.out.println("CHILD_UPDATED," + event.getData().getPath());break;case CHILD_REMOVED:System.out.println("CHILD_REMOVED," + event.getData().getPath());break;default:break;}}});}public static void main(String[] args) throws Exception {String path = "/zk-demo";//实例化clientNodeCache_Sample nodeCacheSample = new NodeCache_Sample();nodeCacheSample.getCuratorFrameworkByBuilder();nodeCacheSample.childNodeCacheAddListener(path);client.create().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep(1000);client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");Thread.sleep(1000);//必须阻塞下线程,避免快速操作来不及触发监听器就被下一个覆盖client.setData().forPath(path + "/c1", "songzl".getBytes()); System.out.println("第一次修改子节点内容:"+ new String(client.getData().forPath(path + "/c1")));Thread.sleep(1000);//必须阻塞下线程,避免快速操作来不及触发监听器就被下一个覆盖client.setData().forPath(path + "/c1", "wangxn".getBytes()); System.out.println("第二次修改子节点内容"+ new String(client.getData().forPath(path + "/c1")));Thread.sleep(1000);//必须阻塞下线程,避免快速操作来不及触发监听器就被下一个覆盖client.delete().forPath(path + "/c1");client.delete().forPath(path);}
打印结果:由下图可以明确得出结论
3.7 Master选举,这个是zookeeper的核心之一;
借助Zookeeper,开发者可以很方便地实现Master选举功能,其大体思路如下:选择一个根节点,如/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用Zookeeper特性,最终只有一台机器能够成功创建,成功的那台机器就是Master。
package com.hust.grid.leesf.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;public class Recipes_MasterSelect {//服务端ip和端口号private static String host = "127.0.0.1:2181";//重试策略:重试时间每间隔1000毫秒,最大重试次数3private static RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//Curator客户端static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(host).retryPolicy(retryPolicy).build();//主服务路径static String master_path = "/curator_recipes_master_path";public static void main(String[] args) throws Exception {client.start();LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {public void takeLeadership(CuratorFramework client) throws Exception {System.out.println("成为Master角色");Thread.sleep(3000);//模拟Master的业务System.out.println("完成Master操作,释放Master权利");}});selector.autoRequeue();selector.start();Thread.sleep(Integer.MAX_VALUE);}}
以上结果会反复循环,并且当一个应用程序完成Master逻辑后,另外一个应用程序的相应方法才会被调用,即当一个应用实例成为Master后,其他应用实例会进入等待,直到当前Master挂了或者推出后才会开始选举Master。
3.8 分布式锁也是zookeeper的核心之一(实现数据一致性的原理)
为了保证数据的一致性,经常在程序的某个运行点需要进行同步控制。以流水号生成场景为例,普通的后台应用通常采用时间戳方式来生成流水号,但是在用户量非常大的情况下,可能会出现并发问题。
package com.hust.grid.leesf.curator;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
/*** zookeeper实现分布式锁*/
public class Recipes_Lock {static String lock_path = "/curator_recipes_lock_path";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();/*** InterProcessMutex是跨JVM的互斥锁,该锁是由zookeeper控制;* 重点是它实现了分布式锁,当不同服务器的进程对同一个节点操作时是安全的受锁控制的;* 并且此分布式锁是绝对公平的,用户都是按照请求的顺序获取互斥锁,依次执行;*/final InterProcessMutex lock = new InterProcessMutex(client, lock_path);final CountDownLatch down = new CountDownLatch(1);for (int i = 0; i < 30; i++) {new Thread(new BuildOrderNo(lock,down)).start();}Thread.sleep(2000);//模拟生成订单前的其他业务,当操作完后开始生成订单down.countDown();}
}
/*** 生成订单号类* @author songzl**/
class BuildOrderNo implements Runnable{private InterProcessMutex lock;private CountDownLatch down;public BuildOrderNo(InterProcessMutex lock,CountDownLatch down){this.lock = lock;this.down = down;}public void run() {try {down.await();lock.acquire();//获取互斥锁,检测当前线程是否可以执行SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(new Date());System.out.println("生成的订单号是 : " + orderNo+"子线程名字:"+Thread.currentThread().getName());//如果调用线程是获得它的线程,那么执行一个互斥锁。如果线程已经多次调用获取,当这个方法返回时,互斥锁仍然会被保留。lock.release();//释放互斥锁,具备检查功能} catch (Exception e) {e.printStackTrace();}}
}
3.9分布式计数器
分布式计数器的典型应用是统计系统的在线人数,借助Zookeeper也可以很方便实现分布式计数器功能:指定一个Zookeeper数据节点作为计数器,多个应用实例在分布式锁的控制下,通过更新节点的内容来实现计数功能。
package com.hust.grid.leesf.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
/*** 分布式计数器*/
public class Recipes_DistAtomicInt {static String distatomicint_path = "/curator_recipes_distatomicint_path";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();/*** 创建一个增量的计数器;它首先尝试使用乐观锁定,如果失败就选择一个可选的互斥,对于乐观和互斥,可以使用重试策略重试增量。*/DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client, distatomicint_path,new RetryNTimes(3, 1000));AtomicValue<Integer> rc = atomicInteger.add(1);System.out.println(rc.preValue());System.out.println("Result: " + rc.succeeded());System.out.println(rc.postValue());}
}
转载于:https://www.cnblogs.com/aoshicangqiong/p/7912576.html
zookeeper系列(三)zookeeper的使用--开源客户端相关推荐
- Zookeeper整合JAVA应用之Curator开源客户端使用案例
Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接.重连.反复注册Watcher的问题以及NodeExistsEx ...
- 【Zookeeper系列】ZooKeeper管理分布式环境中的数据(转)
原文地址:https://www.cnblogs.com/sunddenly/p/4092654.html 引言 本节本来是要介绍ZooKeeper的实现原理,但是ZooKeeper的原理比较复杂,它 ...
- Redis系列(三)-Redis发布订阅及客户端编程
阅读目录 发布订阅模型 Redis中的发布订阅 客户端编程示例 0.3版本Hredis 发布订阅模型 在应用级其作用是为了减少依赖关系,通常也叫观察者模式.主要是把耦合点单独抽离出来作为第三方,隔离易 ...
- Zookeeper系列(二)、核心原理
上一篇我们介绍了Zookeeper的一些基础知识,本篇来讲解zk内部的一些核心原理,帮助我们更好的理解zk的工作机制. 目录 选举机制 Leader选举流程 Leader选举原理 Watch机制 会话 ...
- Zookeeper详解(三)——开源客户端curator
开源客户端curator (true re de) curator是Netflix公司开源的一个zookeeper客户端,后捐献给apache,curator框架在zookeeper原生API接口上进 ...
- [转载]Zookeeper开源客户端框架Curator简介
转载声明:http://macrochen.iteye.com/blog/1366136 Zookeeper开源客户端框架Curator简介 博客分类: Distributed Open Source ...
- Zookeeper之开源客户端ZkClient
ZkClient是由Datameer的工程师开发的开源客户端,对Zookeeper的原生API进行了包装,实现了超时重连.Watcher反复注册等功能. ZKClient版本及源码 maven依赖 Z ...
- ZooKeeper系列(三)
前面虽然配置了集群模式的Zookeeper,但是为了方面学建议在伪分布式模式的Zookeeper学习Zookeeper的shell命令. 一.Zookeeper的四字命令 Zookeeper支持某些特 ...
- ZooKeeper系列(4):ZooKeeper的配置文件详解
ZooKeeper系列文章:https://www.cnblogs.com/f-ck-need-u/p/7576137.html#zk zkServer.sh读取的默认配置文件是$ZOOKEEPER_ ...
- 微服务系列之ZooKeeper注册中心和Nacos注册中心Nacos和Zookeeper对比
一.ZooKeeper注册中心 Zookeeper 是 Apache Hadoop 的子项目,是一个树型的目录服务,支持变更推送,适合作为 Dubbo 服务的注册中心,工业强度较高,可用于生产环境,推 ...
最新文章
- 日本搞出奇妙充电屋,坐在任意位置都能隔空充电!研究登上Nature子刊
- 在子线程中弹土司的一段代码
- Spring.net 模块组成
- flowvisor 命令_mininet+FlowVisor+OpenDayLight环境搭建及实验一
- 异常规范之异常的概念
- 第4章 Python 数字图像处理(DIP) - 频率域滤波1 - 傅里叶级数和变换简史
- 前端基础-git(三):git和GitHub的一些基础操作
- 打不过 Chrome 的 Firefox,我为什么要选择?
- label字符自动换行(转自网络)
- WPF太阳、地球、月球运动轨迹模拟
- Kafka从上手到实践 - Kafka集群:配置Broker | 凌云时刻
- 安装nvidia digits
- sublime 配置快捷键
- Leetcode每日一题:使括号有效的最少添加(括号匹配)
- javacpp-opencv图像处理系列:国内车辆牌照检测识别系统(万份测试准确率79.7%以上)
- linux一键分区脚本,【Shell】Linux中分区脚本
- Python之列表(学习笔记)
- 椭圆 标准方程 离心率 圆的标准方程
- python怎么分行读取txt文件_python怎么读取txt文件内容
- android颜色识别
热门文章
- bootstrap4 图标和文字行内对齐
- Kubernetes 小白学习笔记(4)--kubernetes是什么
- FISCO BCOS简介
- Hyperledger Fabric 或 Composer的configtx.yaml配置文件解析
- star cd linux安装,STAR-CD的Linux版安装详细过程
- php自定义按钮,vue实现自定义按钮的方法介绍(附代码)
- mysql handbook_MySQL 8 Administrator’s Guide
- 极简桌面 android 2.3,极简桌面(手机桌面)V3.1 for android 免费版
- php 面向对象编程(class)之从入门到崩溃 高级篇
- php 代码的分离和调用及注意事项(版本、变量及cookie与session的区别,PHP包含文件函数include、include_once、require、require_once区别和总结)