Zookeeper Java 客户端 ——Apache Curator

一、基本依赖

Curator 是 Netflix 公司开源的一个 Zookeeper 客户端,目前由 Apache 进行维护。与 Zookeeper 原生客户端相比,Curator 的抽象层次更高,功能也更加丰富,是目前 Zookeeper 使用范围最广的 Java 客户端。本篇文章主要讲解其基本使用,项目采用 Maven 构建,以单元测试的方法进行讲解,相关依赖如下:

<dependencies><!--Curator 相关依赖--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.13</version></dependency><!--单元测试相关依赖--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
</dependencies>

完整源码见本仓库: https://github.com/heibaiying/BigData-Notes/tree/master/code/Zookeeper/curator

二、客户端相关操作

2.1 创建客户端实例

这里使用 @Before 在单元测试执行前创建客户端实例,并使用 @After 在单元测试后关闭客户端连接。

public class BasicOperation {private CuratorFramework client = null;private static final String zkServerPath = "192.168.0.226:2181";private static final String nodePath = "/hadoop/yarn";@Beforepublic void prepare() {// 重试策略RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().connectString(zkServerPath).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("workspace").build();  //指定命名空间后,client 的所有路径操作都会以/workspace 开头client.start();}@Afterpublic void destroy() {if (client != null) {client.close();}}
}

2.2 重试策略

在连接 Zookeeper 时,Curator 提供了多种重试策略以满足各种需求,所有重试策略均继承自 RetryPolicy 接口,如下图:

这些重试策略类主要分为以下两类:

  • RetryForever :代表一直重试,直到连接成功;
  • SleepingRetry : 基于一定间隔时间的重试。这里以其子类 ExponentialBackoffRetry 为例说明,其构造器如下:
/*** @param baseSleepTimeMs 重试之间等待的初始时间* @param maxRetries 最大重试次数* @param maxSleepMs 每次重试间隔的最长睡眠时间(毫秒)*/
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

2.3 判断服务状态

@Test
public void getStatus() {CuratorFrameworkState state = client.getState();System.out.println("服务是否已经启动:" + (state == CuratorFrameworkState.STARTED));
}

三、节点增删改查

3.1 创建节点

@Test
public void createNodes() throws Exception {byte[] data = "abc".getBytes();client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)      //节点类型.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(nodePath, data);
}

创建时可以指定节点类型,这里的节点类型和 Zookeeper 原生的一致,全部类型定义在枚举类 CreateMode 中:

public enum CreateMode {// 永久节点PERSISTENT (0, false, false),//永久有序节点PERSISTENT_SEQUENTIAL (2, false, true),// 临时节点EPHEMERAL (1, true, false),// 临时有序节点EPHEMERAL_SEQUENTIAL (3, true, true);....
}

2.2 获取节点信息

@Test
public void getNode() throws Exception {Stat stat = new Stat();byte[] data = client.getData().storingStatIn(stat).forPath(nodePath);System.out.println("节点数据:" + new String(data));System.out.println("节点信息:" + stat.toString());
}

如上所示,节点信息被封装在 Stat 类中,其主要属性如下:

public class Stat implements Record {private long czxid;private long mzxid;private long ctime;private long mtime;private int version;private int cversion;private int aversion;private long ephemeralOwner;private int dataLength;private int numChildren;private long pzxid;...
}

每个属性的含义如下:

状态属性 说明
czxid 数据节点创建时的事务 ID
ctime 数据节点创建时的时间
mzxid 数据节点最后一次更新时的事务 ID
mtime 数据节点最后一次更新时的时间
pzxid 数据节点的子节点最后一次被修改时的事务 ID
cversion 子节点的更改次数
version 节点数据的更改次数
aversion 节点的 ACL 的更改次数
ephemeralOwner 如果节点是临时节点,则表示创建该节点的会话的 SessionID;如果节点是持久节点,则该属性值为 0
dataLength 数据内容的长度
numChildren 数据节点当前的子节点个数

2.3 获取子节点列表

@Test
public void getChildrenNodes() throws Exception {List<String> childNodes = client.getChildren().forPath("/hadoop");for (String s : childNodes) {System.out.println(s);}
}

2.4 更新节点

更新时可以传入版本号也可以不传入,如果传入则类似于乐观锁机制,只有在版本号正确的时候才会被更新。

@Test
public void updateNode() throws Exception {byte[] newData = "defg".getBytes();client.setData().withVersion(0)     // 传入版本号,如果版本号错误则拒绝更新操作,并抛出 BadVersion 异常.forPath(nodePath, newData);
}

2.5 删除节点

@Test
public void deleteNodes() throws Exception {client.delete().guaranteed()                // 如果删除失败,那么在会继续执行,直到成功.deletingChildrenIfNeeded()  // 如果有子节点,则递归删除.withVersion(0)              // 传入版本号,如果版本号错误则拒绝删除操作,并抛出 BadVersion 异常.forPath(nodePath);
}

2.6 判断节点是否存在

@Test
public void existNode() throws Exception {// 如果节点存在则返回其状态信息如果不存在则为 nullStat stat = client.checkExists().forPath(nodePath + "aa/bb/cc");System.out.println("节点是否存在:" + !(stat == null));
}

三、监听事件

3.1 创建一次性监听

和 Zookeeper 原生监听一样,使用 usingWatcher 注册的监听是一次性的,即监听只会触发一次,触发后就销毁。示例如下:

@Test
public void DisposableWatch() throws Exception {client.getData().usingWatcher(new CuratorWatcher() {public void process(WatchedEvent event) {System.out.println("节点" + event.getPath() + "发生了事件:" + event.getType());}}).forPath(nodePath);Thread.sleep(1000 * 1000);  //休眠以观察测试效果
}

3.2 创建永久监听

Curator 还提供了创建永久监听的 API,其使用方式如下:

@Test
public void permanentWatch() throws Exception {// 使用 NodeCache 包装节点,对其注册的监听作用于节点,且是永久性的NodeCache nodeCache = new NodeCache(client, nodePath);// 通常设置为 true, 代表创建 nodeCache 时,就去获取对应节点的值并缓存nodeCache.start(true);nodeCache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() {ChildData currentData = nodeCache.getCurrentData();if (currentData != null) {System.out.println("节点路径:" + currentData.getPath() +"数据:" + new String(currentData.getData()));}}});Thread.sleep(1000 * 1000);  //休眠以观察测试效果
}

3.3 监听子节点

这里以监听 /hadoop 下所有子节点为例,实现方式如下:

@Test
public void permanentChildrenNodesWatch() throws Exception {// 第三个参数代表除了节点状态外,是否还缓存节点内容PathChildrenCache childrenCache = new PathChildrenCache(client, "/hadoop", true);/** StartMode 代表初始化方式:*    NORMAL: 异步初始化*    BUILD_INITIAL_CACHE: 同步初始化*    POST_INITIALIZED_EVENT: 异步并通知,初始化之后会触发 INITIALIZED 事件*/childrenCache.start(StartMode.POST_INITIALIZED_EVENT);List<ChildData> childDataList = childrenCache.getCurrentData();System.out.println("当前数据节点的子节点列表:");childDataList.forEach(x -> System.out.println(x.getPath()));childrenCache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {switch (event.getType()) {case INITIALIZED:System.out.println("childrenCache 初始化完成");break;case CHILD_ADDED:// 需要注意的是: 即使是之前已经存在的子节点,也会触发该监听,因为会把该子节点加入 childrenCache 缓存中System.out.println("增加子节点:" + event.getData().getPath());break;case CHILD_REMOVED:System.out.println("删除子节点:" + event.getData().getPath());break;case CHILD_UPDATED:System.out.println("被修改的子节点的路径:" + event.getData().getPath());System.out.println("修改后的数据:" + new String(event.getData().getData()));break;}}});Thread.sleep(1000 * 1000); //休眠以观察测试效果
}

Zookeeper Java 客户端 ——Apache Curator相关推荐

  1. [转载]Zookeeper开源客户端框架Curator简介

    转载声明:http://macrochen.iteye.com/blog/1366136 Zookeeper开源客户端框架Curator简介 博客分类: Distributed Open Source ...

  2. 2.ZooKeeper客户端Curator「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」

    前言 上一篇文章 介绍了zookeeper原生API的使用,使用过原生API不得不说,有很多的问题,比如:不能递归创建和删除节点.Watcher只能使用一次.还有很多可以解决分布式应用问题的api(比 ...

  3. Zookeeper开源客户端框架Curator的简单使用

    为什么80%的码农都做不了架构师?>>>    Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用Z ...

  4. Zookeeper java客户端ZkClient使用详解

    简介 ZKClient是一个Zookeeper客户端框架,是对Zookeeper原生API的封装.使得使用更方便.功能更多. 查看之前必须要对Zookeeper的基本命令操作.Watch机制.Acl等 ...

  5. zookeeper Java客户端API的使用方法

    1.创建会话 2.创建节点(异步.同步) 3.删除节点(异步.同步) 4.读取数据(异步.同步) 5.节点检测(异步.同步) 6.更新数据(异步.同步) 7.ACL权限控制 演示代码下载(代码来自极客 ...

  6. Zookeeper 原生客户端、可视化工具 ZooInspector 、Apache Curator

    目录 Zookeeper 原生客户端 Apache Curator 开源客户端 可视化客户端工具 ZooInspector Zookeeper 原生客户端 1.类似 Redis 有多种 Java 客户 ...

  7. Zookeeper的java客户端Curator

    Zookeeper的java客户端Curator 常见的zookeeper java API: Curator API 常用操作 建立连接 添加节点 删除节点 修改节点 查询节点 watch事件监听 ...

  8. Apache Curator操作zookeeper的API使用

    curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Jav ...

  9. Apache Curator客户端的使用

    当前已有的三种API客户端 zk原生API :不支持超时自动重连,不支持节点递归创建 zkclient:无文档,异常处理弱爆了(简单的抛出RuntimeException) Apache Curato ...

最新文章

  1. 为什么要使用多用户开源商城系统
  2. timestamp 数据类型
  3. java框架篇---spring IOC依赖注入
  4. 2. JSF---托管Bean
  5. Docker容器管理总结
  6. eclipse下web开发中缓存问题
  7. 使用Python进行地理编码和反向地理编码
  8. C/C++获取二维数组行列数
  9. 给dubbo接口添加白名单——dubbo Filter的使用
  10. 5月8日——iOS中的3D Touch效果
  11. pytorch--nn模块(1)
  12. Django book2.0 contact表单
  13. react 移动端 h5 端日历组件 周日历 月日历 周视图 月视图
  14. android 渐变动画,Android-实现背景渐变动画
  15. 2021年全球网络保险收入大约9593.9百万美元,预计2028年达到68230百万美元,2022至2028期间,年复合增长率CAGR为35.1%
  16. clone远程代码 在不同电脑上git_Git 同一电脑配置多个远程仓库
  17. ES6 --promise了解
  18. PHP+TP框架实现微信公众号开发之发送模板消息
  19. IT行业都有哪些岗位?
  20. 路由器局域网IP(内网IP)和外网IP的关系

热门文章

  1. 批量将记事本文本文件转为 UTF8 等编码格式
  2. memory内存占用过高 解决方法
  3. 谷歌浏览器插件:selenium元素定位器chropath
  4. FPE修改教程进阶(地址编辑部分)
  5. 基金指标: beta,alpha,Sharpe Ratio
  6. 使用pandas清洗数据(中文字符串的正则使用)
  7. 简单查看linux是否能访问外网及拥有的公网IP
  8. JavaScript完整原型链图解
  9. 基于javaweb的超市收银管理系统(java+ssm+html+mysql)
  10. 安全技术的发展:物理隔离三步曲