一、Curator 客户端使用

Curator是 Netflix公司开源的一套ZooKeeper客户端框架,和 ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及 NodeExistsException异常等。

Curator还为 ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

Curator 客户端使用更加方便,功能更加强大,目前应用更加广泛。

官方网站:https://curator.apache.org/

添加依赖:

  • curator-framework:是对ZooKeeper的底层API的一些封装。
  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
    <!-- Zookeeper 原生API客户端 --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.3</version></dependency><!-- Curator 客户端   --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency>

1、创建一个客户端实例

在使用 curator-framework包操作 ZooKeeper前,首先要创建一个客户端实例(CuratorFramework类型的对象)。

有两种方法:

  • 使用工厂类CuratorFrameworkFactory的静态 newClient()方法。
  • 使用工厂类CuratorFrameworkFactory的静态 builder构造者方法。
public class CuratorClientConnectUtils {private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开private static class InnerClass{private static CuratorFramework client = clientConnect2();}public static CuratorFramework getInstance(){return InnerClass.client;}public static void main(String[] args){//CuratorFramework client = clientConnect1();CuratorFramework client = clientConnect2();//启动客户端,必须要有client.start();System.out.println("==CuratorFramework==" + client);}/*** 使用工厂类CuratorFrameworkFactory的静态newClient()方法。* @throws Exception*/public static CuratorFramework clientConnect1() {// 重试策略, 失败重连3次RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);//创建客户端实例CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);return client;}/*** 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。* @throws Exception*/public static CuratorFramework clientConnect2() {// 重试策略, 失败重连3次RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);//创建客户端实例CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_STR).sessionTimeoutMs(3000) // 会话超时时间.connectionTimeoutMs(50000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator_Workspace") // 指定隔离名称,表示所有节点的操作都会在该工作空间下进行。不指定时,使用自定义的节点path.build();return client;}
}

部分参数说明:

  • connectionString: 服务器地址列表,集群用逗号分隔, 如 host1:port1,host2:port2,host3:port3 。
  • retryPolicy: 重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。
    Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
  • 超时时间: Curator 客户端创建过程中,有两个超时时间的设置。
    • 一个是sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。
    • 一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。
      sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

2、CRUD实例

1)创建节点

在 Curator 中,客户端可以使用 create 方法 创建数据节点,并通过 withMode 方法 指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 方法 来指定节点的路径和数据信息。 也支持级联创建节点需要指定 creatingParentsIfNeeded方法

2)获取数据

客户端使用 getData()方法 获取节点数据。

3)更新节点
客户端使用 setData() 方法更新 ZooKeeper 服务上的数据节点,在 setData 方法 的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。

4)删除节点

客户端使用 delete方法 删除节点。

  • guaranteed方法:主要作用是保障删除成功。
    其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在 ZooKeeper 服务端被删除。
  • deletingChildrenIfNeeded方法:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子孙节点。

5)异步接口

Curator 引入了 BackgroundCallback 接口 ,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。

客户端可以通过 inBackground方法添加 BackgroundCallback 接口来异步处理操作。异步处理回调我们可以在 processResult方法操作。实例中异步创建节点使用了默认的线程。

CRUD实例如下:

public class CRUDService {public static void main(String[] args) throws Exception {CuratorFramework client = CuratorClientConnectUtils.getInstance();// 启动客户端client.start();String path1 = "/curator_znode/syncCreateNode.node1/node1"; //允许级联创建String path2 = "/curator_znode/asyncCreateNode.node1";String path3 = "/curator_znode2";//createOneNode(client, path3, "value path");syncCreateNode(client, path1, "VALUE11");//asyncCreateNode(client, path2, "VALUE22");//asyncCreateNode2(client, path2, "VALUE22");//String nodeData = getNodeData(client, path1);//System.out.println("getNodeData, path1=:" + nodeData);updateNodeData(client, path1, "value33333");//deleteNode(client, path3);String path = "/curator_znode";//deleteChildrenIfNeeded(client, path);}/*** 创建单节点** @param client* @param path - 如果是级联节点,父节点不存在时,会报错:KeeperErrorCode = NoNode for /curator_znode/asyncCreateNode.node1* @param data* @throws Exception*/public static void createOneNode(CuratorFramework client, String path, String data) throws Exception {String path1 = client.create().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes(StandardCharsets.UTF_8));System.out.println("createOneNode:path1=" + path1);}/*** 同步级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持)*  父节点不存在,则会创建*  最后的子节点如果存在,则会报错:KeeperErrorCode = NodeExists for /curator_znode/syncCreateNode.node1/node1* @param client* @param path* @param data*/public static void syncCreateNode(CuratorFramework client, String path, String data) {try {String path1 = client.create().creatingParentsIfNeeded() //级联创建,父节点不存在,则会创建.withMode(CreateMode.PERSISTENT) //持久节点.forPath(path, data.getBytes(StandardCharsets.UTF_8));System.out.println("syncCreateNode:path1=" + path1);} catch (Exception e) {e.printStackTrace();}}/*** 异步级联创建节点* @param client* @param path* @param data*/public static void asyncCreateNode(CuratorFramework client, String path, String data) throws Exception {client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {// 创建成功的回调@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("asyncCreateNode=========" + event.getName() + ":" + event.getPath());}}).forPath(path, data.getBytes(StandardCharsets.UTF_8));TimeUnit.MILLISECONDS.sleep(100);}/*** 异步级联创建节点* @param client* @param path* @param data*/public static void asyncCreateNode2(CuratorFramework client, String path, String data) throws Exception {ExecutorService executorService = Executors.newSingleThreadExecutor();client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {// 创建成功的回调@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("asyncCreateNode====使用自定义线程池=====" + event.getName() + ":" + event.getPath());}}, executorService).forPath(path, data.getBytes(StandardCharsets.UTF_8));TimeUnit.MILLISECONDS.sleep(100);}/*** 获取节点数据* @param client* @param path*/public static String getNodeData(CuratorFramework client, String path) throws Exception {byte[] data = client.getData().storingStatIn(new Stat()).forPath(path);return new String(data, StandardCharsets.UTF_8);}/*** 修改节点数据* @param client* @param path* @param data*/public static void updateNodeData(CuratorFramework client, String path, String data) throws Exception {Stat stat = client.setData()//.withVersion(-1) //可以根据需要使用.forPath(path, data.getBytes(StandardCharsets.UTF_8));System.out.println("updateNodeData path, stat:" + stat);}/*** 删除该节点,该节点必须为空节点* 如果该节点有子节点会报错:KeeperErrorCode = Directory not empty for /curator_znode* @param client* @param path*/public static void deleteNode(CuratorFramework client, String path) throws Exception {client.delete()//.withVersion(1) // 版本号不匹配时,无法删除,会报错:KeeperErrorCode = BadVersion for /curator_znode.forPath(path); //删除该节点}/*** 级联删除该节点以及子孙节点* @param client* @param path*/public static void deleteChildrenIfNeeded(CuratorFramework client, String path) throws Exception {client.delete().guaranteed() //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功.deletingChildrenIfNeeded()//.withVersion(1).forPath(path); //级联删除}/*** 判断node节点是否存在* @param client* @param nodePath* @return true - 表示节点存在* @throws Exception*/public static boolean checkNodeExists(CuratorFramework client,String nodePath) throws Exception {Stat stat = client.checkExists().forPath(nodePath);System.out.println(null==stat ? "节点不存在" : "节点存在");return null != stat;}}

2.1 报错:KeeperErrorCode = ConnectionLoss

14:49:35.317 [Curator-Framework-0] ERROR org.apache.curator.framework.imps.CuratorFrameworkImpl - Background operation retry gave up
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss
...
14:49:35.319 [Curator-Framework-0] ERROR org.apache.curator.framework.imps.CuratorFrameworkImpl - Background retry gave up
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLossat org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:981)
...
14:49:36.313 [Curator-ConnectionStateManager-0] WARN org.apache.curator.framework.state.ConnectionStateManager - Session timeout has elapsed while SUSPENDED. Injecting a session expiration. Elapsed ms: 1002. Adjusted session timeout ms: 1000

报错信息如上,查看了一下:

具体是什么时间超时判断,暂时不清楚。我尝试把会话时间调到3000之后。问题就解决了。

3、Cache缓存监控节点变化

cache是一种缓存机制,可以借助 cache实现监听。

简单来说,cache在客户端缓存了 znode的各种状态,当感知到 zk集群的 znode状态变化,会触发 event事件,注册的监听器会处理这些事件。

curator支持的 cache种类有4种:

  • Path Cache
    Path Cache用来观察ZNode的子节点并缓存状态,但是不会对二级子节点进行监听。如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
  • Node Cache
    Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
  • Tree Cache
    Tree Cache是上面两种的合体,Tree Cache观察的是自身+所有子节点的所有数据,并缓存所有节点数据。
    TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点 进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。
  • Curator Cache
    Curator Cache,是在 zk3.6新版本添加的特性,之前版本上面3种监听可以单独使用,在 zk3.6+版本之后,上面3种监听被标记为过时,并使用 CuratorCache取代了上面3种监听

Curator Cache类型是通过 CuratorCache类来实现的,监听器对应的接口为 CuratorCacheListener。

Curator Cache使用实例如下:

public class CuratorCacheTest {public static void main(String[] args) throws Exception {CuratorFramework client = CuratorClientConnectUtils.getInstance();// 启动客户端client.start();String path1 = "/curatorCache1";String path2 = "/curatorCache2";curatorCache1(client, path1);curatorCache2(client, path2);if(CRUDService.checkNodeExists(client, path1)){CRUDService.deleteChildrenIfNeeded(client, path1);//CRUDService.deleteNode(client, path1);}if(CRUDService.checkNodeExists(client, path2)){CRUDService.deleteChildrenIfNeeded(client, path2);//CRUDService.deleteNode(client, path2);}CRUDService.syncCreateNode(client, path1, "VALUE22");CRUDService.asyncCreateNode(client, path2, "VALUE22");CRUDService.updateNodeData(client, path2, "value444");String path22 = path2 + "/node1";CRUDService.syncCreateNode(client, path22, "VALUE22");//让线程休眠(为了方便测试)TimeUnit.MINUTES.sleep(3);}public static void curatorCache1(CuratorFramework zkClient, String path) {CuratorCache curatorCache = CuratorCache.build(zkClient, path);curatorCache.listenable().addListener(new CuratorCacheListener() {@Overridepublic void event(Type type, ChildData oldData, ChildData newdata) {switch (type) {//各种判断case NODE_CREATED:System.out.println("==该节点被创建,type=" + type);break;default:System.out.println("==该节点操作,type=" + type);break;}}});//开启缓存。 这将导致缓存根节点的完整刷新,并为所有找到的节点生成事件,等等。curatorCache.start();}public static void curatorCache2(CuratorFramework zkClient, String path) throws InterruptedException {CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build();//构建监听器://1.node cache:CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );//2.path cache:CuratorCacheListener.builder().forPathChildrenCache();//3.tree cache:CuratorCacheListener.builder().forTreeCache.forTreeCache();CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("节点改变了...");}}).build();//添加监听curatorCache.listenable().addListener(listener);//开启缓存curatorCache.start();}
}

在zk服务端,使用命令操作了节点,监听被触发。

– 求知若饥,虚心若愚。

Zookeeper客户端Curator详解相关推荐

  1. 软件架构-zookeeper之curator详解

    为了更好的实现java操作zookeeper服务器,后来出现Curator框架,非常的强大,目前已经是apache的顶级项目,里面提供了更多丰富的操作.例如:session超时重连,主从选举,分布式计 ...

  2. Zookeeper客户端Curator使用详解

    http://www.jianshu.com/p/70151fc0ef5d Zookeeper客户端Curator使用详解 简介 Curator是Netflix公司开源的一套zookeeper客户端框 ...

  3. 【高级篇】详解Zookeeper客户端Curator

    一.序言 之前分享过一篇关于Curotor的基本应用[基础篇]详解Zookeeper客户端Curator,如果对Curator没有了解的可以看看,本文分享关于Curator的一些高级特性,在监听和le ...

  4. 【基础篇】详解Zookeeper客户端Curator

    一.前言 Zookeeper被广泛应用于分布式环境下各种应用程序的协调,而Curator无疑是Zookeeper客户端中的瑞士军刀,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重 ...

  5. ZooKeeper客户端Curator的基本使用

    前提:ZooKeeper版本:3.4.14      Curator版本:2.13.0 1.什么是Curator Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Z ...

  6. 【ZooKeeper】配置文件详解

    ZooKeeper的配置文件详解 zkServer.sh读取的默认配置文件是$ZOOKEEPER_HOME/conf/zoo.cfg.如果要用其它配置文件.如下传递配置文件参数: zkServer.s ...

  7. 什么是ZooKeeper?可以做什么?ZooKeeper分布式事务详解篇

    前言 什么是ZooKeeper,你真的了解它吗.我们一起来看看吧~ 一.什么是 ZooKeeper? ZooKeeper 是 Apache 的一个顶级项目,为分布式应用提供高效.高可用的分布式协调服务 ...

  8. SVN的Windows和Linux客户端操作详解

    SVN的Windows和Linux客户端操作详解 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 一.Windows客户端操作 1.安装SVN客户端 a>.去官网下载svn软件 ...

  9. LCS2005客户端配置详解:LCS2005系列之二

    LCS2005客户端配置详解 上篇博文中我们介绍了如何部署LCS2005标准版,今天我们要再进一步,配置好LCS2005的客户端,使用户能够使用LCS提供的即时通讯服务来进行彼此间的信息交流.实验拓扑 ...

最新文章

  1. 用for实现Go的while和do...while
  2. 关于对FLASH开发,starling、starling feathers、starling MVC框架的理解
  3. 在用数据绑定的时候我为什么不能把焦点移出(Tab out)我的控件?(译)
  4. 产品中的实名认证该怎么设计?
  5. Java文件合并变得语义化
  6. mysql 主从 now_MySql主从复制搭建方法
  7. jdbc mysql url写法_MySQL第04篇:JDBC
  8. 独立站的优势是什么?独立站注意事项有哪些?
  9. 轮环(Ouroboros)世界观介绍,摘自Guide Book
  10. vue导入音乐_现在哪个软件听音乐完全免费?
  11. jupyterlab nb_conda 增加 删除_Jupyter lab
  12. 病毒周报(100201至100207)
  13. LeetCode笔记
  14. mysql 重复最多的_MySQL查询重复出现次数最多的记录
  15. 中科院ICTCLAS分词汉语词性标记集
  16. Mendix批量发送邮件给多人待办事项提醒
  17. mysql查询低效语句_MySQL数据库中查找执行从命慢的SQL语句
  18. 如何在没有 USB 数据线的情况下使用 Android Studio 在手机中安装 Android
  19. 通过Eclipse创建一个Project ,Java Project 和Tomcat Project 生成的目录和文件
  20. 司空见惯 - 神奇的数字7

热门文章

  1. C++ for循环嵌套 实现 打印10行10列星图
  2. 案例 | 巴别鸟为弘睿构建企业知识库
  3. 最让我感动的图片...
  4. 【TypeScript】TS与Vue
  5. Flying-Saucer使用HTML或者FTL(Freemarker模板)生成PDF
  6. python将word表格转写入excel
  7. 我的创作纪念日---祈愿
  8. 使用pytorch搭建AlexNet网络模型
  9. EXCEL如何实现两个表的相应数据关联
  10. 事业单位工资计算机公积金计算,求问事业单位住房公积金如何计算