Zookeeper 客户端之 Curator
之前写的一个在 Linux 上安装部署 Zookeeper 的笔记,其他操作系统请自行谷歌教程吧。
本文案例工程已经同步到了 github,传送门。
PS : 目前还没有看过Curator的具体源码,所以不会涉及到任何源码解析、实现原理的东西;本篇主要是实际使用时的一些记录,以备后用。如果文中错误之处,希望各位指出。
Curator 客户端的初始化和初始化时机
在实际的工程中,Zookeeper 客户端的初始化会在程序启动期间完成。
初始化时机
在 Spring 或者 SpringBoot 工程中最常见的就是绑定到容器启动的生命周期或者应用启动的生命周期中:
- 监听 ContextRefreshedEvent 事件,在容器刷新完成之后初始化 Zookeeper
- 监听 ApplicationReadyEvent/ApplicationStartedEvent 事件,初始化 Zookeeper 客户端
除了上面的方式之外,还有一种常见的是绑定到 bean 的生命周期中
- 实现 InitializingBean 接口 ,在 afterPropertiesSet 中完成 Zookeeper 客户端初始化
关于 SpringBoot中的事件机制可以参考之前写过的一篇文章:SpringBoot-SpringBoot中的事件机制。
Curator 初始化
这里使用 InitializingBean 的这种方式,代码如下:
public class ZookeeperCuratorClient implements InitializingBean {private CuratorFramework curatorClient;@Value("${glmapper.zookeeper.address:localhost:2181}")private String connectString;@Value("${glmapper.zookeeper.baseSleepTimeMs:1000}")private int baseSleepTimeMs;@Value("${glmapper.zookeeper.maxRetries:3}")private int maxRetries;@Value("${glmapper.zookeeper.sessionTimeoutMs:6000}")private int sessionTimeoutMs;@Value("${glmapper.zookeeper.connectionTimeoutMs:6000}")private int connectionTimeoutMs;@Overridepublic void afterPropertiesSet() throws Exception {// custom policyRetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);// to build curatorClientcuratorClient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();curatorClient.start();}public CuratorFramework getCuratorClient() {return curatorClient;}
}
复制代码
glmapper.zookeeper.xxx 是本例中需要在配置文件中配置的 zookeeper 的一些参数,参数解释如下:
- baseSleepTimeMs:重试之间等待的初始时间
- maxRetries:最大重试次数
- connectString:要连接的服务器列表
- sessionTimeoutMs:session 超时时间
- connectionTimeoutMs:连接超时时间
另外,Curator 客户端初始化时还需要指定重试策略,RetryPolicy 接口是 Curator 中重试连接(当zookeeper失去连接时使用)策略的顶级接口,其类继承体系如下图所示:
- RetryOneTime:只重连一次
- RetryNTime:指定重连的次数N
- RetryUtilElapsed:指定最大重连超时时间和重连时间间隔,间歇性重连直到超时或者链接成功
- ExponentialBackoffRetry:基于 "backoff"方式重连,和 RetryUtilElapsed 的区别是重连的时间间隔是动态的。
- BoundedExponentialBackoffRetry: 同 ExponentialBackoffRetry的区别是增加了最大重试次数的控制
除上述之外,在一些场景中,需要对不同的业务进行隔离,这种情况下,可以通过设置 namespace 来解决,namespace 实际上就是指定zookeeper的根路径,设置之后,后面的所有操作都会基于该根目录。
Curator 基础 API 使用
检查节点是否存在
checkExists 方法返回的是一个 ExistsBuilder 构造器,这个构建器将返回一个 Stat 对象,就像调用了 org.apache.zookeeper.ZooKeeper.exists()一样。null 表示它不存在,而实际的 Stat 对象表示存在。
public void checkNodeExist(String path) throws Exception {Stat stat = curatorClient.checkExists().forPath(path);if (stat != null){throw new RuntimeException("path = "+path +" has bean exist.");}
}
复制代码
建议在实际的应用中,操作节点时对所需操作的节点进行 checkExists。
新增节点
非递归方式创建节点
curatorClient.create().forPath("/glmapper"); curatorClient.create().forPath("/glmapper/test"); 复制代码
先创建/glmapper,然后再在/glmapper 下面创建 /test ,如果直接使用 /glmapper/test 没有先创建 /glmapper 时,会抛出异常:
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /glmapper/test 复制代码
如果需要在创建节点时指定节点中数据,则可以这样:
curatorClient.create().forPath("/glmapper","data".getBytes()); 复制代码
指定节点类型(EPHEMERAL 临时节点)
curatorClient.create().withMode(CreateMode.EPHEMERAL).forPath("/glmapper","data".getBytes()); 复制代码
递归方式创建节点
递归方式创建节点有两个方法,creatingParentsIfNeeded 和 creatingParentContainersIfNeeded。在新版本的 zookeeper 这两个递归创建方法会有区别; creatingParentContainersIfNeeded() 以容器模式递归创建节点,如果旧版本 zookeeper,此方法等于creatingParentsIfNeeded()。
在非递归方式情况下,如果直接创建 /glmapper/test 会报错,那么在递归的方式下则是可以的
curatorClient.create().creatingParentContainersIfNeeded().forPath("/glmapper/test"); 复制代码
在递归调用中,如果不指定 CreateMode,则默认
PERSISTENT
,如果指定为临时节点,则最终节点会是临时节点,父节点仍旧是PERSISTENT
删除节点
非递归删除节点
curatorClient.delete().forPath("/glmapper/test"); 复制代码
指定具体版本
curatorClient.delete().withVersion(-1).forPath("/glmapper/test"); 复制代码
使用 guaranteed 方式删除,guaranteed 会保证在session有效的情况下,后台持续进行该节点的删除操作,直到删除掉
curatorClient.delete().guaranteed().withVersion(-1).forPath("/glmapper/test"); 复制代码
递归删除当前节点及其子节点
curatorClient.delete().deletingChildrenIfNeeded().forPath("/glmapper/test"); 复制代码
获取节点数据
获取节点数据
byte[] data = curatorClient.getData().forPath("/glmapper/test");
复制代码
根据配置的压缩提供程序对数据进行解压缩处理
byte[] data = curatorClient.getData().decompressed().forPath("/glmapper/test");
复制代码
读取数据并获得Stat信息
Stat stat = new Stat();
byte[] data = curatorClient.getData().storingStatIn(stat).forPath("/glmapper/test");
复制代码
更新节点数据
设置指定值
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
复制代码
设置数据并使用配置的压缩提供程序压缩数据
curatorClient.setData().compressed().forPath("/glmapper/test","newData".getBytes());
复制代码
设置数据,并指定版本
curatorClient.setData().withVersion(-1).forPath("/glmapper/test","newData".getBytes());
复制代码
获取子列表
List<String> childrenList = curatorClient.getChildren().forPath("/glmapper");
复制代码
事件
Curator 也对 Zookeeper 典型场景之事件监听进行封装,这部分能力实在 curator-recipes 包下的。
事件类型
在使用不同的方法时会有不同的事件发生
public enum CuratorEventType
{//Corresponds to {@link CuratorFramework#create()}CREATE,//Corresponds to {@link CuratorFramework#delete()}DELETE,//Corresponds to {@link CuratorFramework#checkExists()}EXISTS,//Corresponds to {@link CuratorFramework#getData()}GET_DATA,//Corresponds to {@link CuratorFramework#setData()}SET_DATA,//Corresponds to {@link CuratorFramework#getChildren()}CHILDREN,//Corresponds to {@link CuratorFramework#sync(String, Object)}SYNC,//Corresponds to {@link CuratorFramework#getACL()}GET_ACL,//Corresponds to {@link CuratorFramework#setACL()}SET_ACL,//Corresponds to {@link Watchable#usingWatcher(Watcher)} or {@link Watchable#watched()}WATCHED,//Event sent when client is being closedCLOSING
}
复制代码
事件监听
一次性监听方式:Watcher
利用 Watcher 来对节点进行监听操作,可以典型业务场景需要使用可考虑,但一般情况不推荐使用。
byte[] data = curatorClient.getData().usingWatcher(new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {System.out.println("监听器 watchedEvent:" + watchedEvent);}}).forPath("/glmapper/test");
System.out.println("监听节点内容:" + new String(data));
// 第一次变更节点数据
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
// 第二次变更节点数据
curatorClient.setData().forPath("/glmapper/test","newChangedData".getBytes());
复制代码
上面这段代码对 /glmapper/test 节点注册了一个 Watcher 监听事件,并且返回当前节点的内容。后面进行两次数据变更,实际上第二次变更时,监听已经失效,无法再次获得节点变动事件了。测试中控制台输出的信息如下:
监听节点内容:data
watchedEvent:WatchedEvent state:SyncConnected type:NodeDataChanged path:/glmapper/test
复制代码
CuratorListener 方式
CuratorListener 监听,此监听主要针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听,对于节点的创建或修改则不会触发监听事件。
CuratorListener listener = new CuratorListener(){@Overridepublic void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("event : " + event);}};
// 绑定监听器
curatorClient.getCuratorListenable().addListener(listener);
// 异步获取节点数据
curatorClient.getData().inBackground().forPath("/glmapper/test");
// 更新节点数据
curatorClient.setData().forPath("/glmapper/test","newData".getBytes());
复制代码
测试中控制台输出的信息如下:
event : CuratorEventImpl{type=GET_DATA, resultCode=0, path='/glmapper/test', name='null', children=null, context=null, stat=5867,5867,1555140974671,1555140974671,0,0,0,0,4,0,5867
, data=[100, 97, 116, 97], watchedEvent=null, aclList=null}
复制代码
这里只触发了一次监听回调,就是 getData 。
Curator 引入的 Cache 事件监听机制
Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。Cache 分为两类注册类型:节点监听和子节点监听。
NodeCache
监听数据节点本身的变化。对节点的监听需要配合回调函数来进行处理接收到监听事件之后的业务处理。NodeCache 通过 NodeCacheListener 来完成后续处理。
String path = "/glmapper/test"; final NodeCache nodeCache = new NodeCache(curatorClient,path); //如果设置为true则在首次启动时就会缓存节点内容到Cache中。 nodeCache.start(true); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("触发监听回调,当前节点数据为:" + new String(nodeCache.getCurrentData().getData())); } }); curatorClient.setData().forPath(path,"1".getBytes()); curatorClient.setData().forPath(path,"2".getBytes()); curatorClient.setData().forPath(path,"3".getBytes()); curatorClient.setData().forPath(path,"4".getBytes()); curatorClient.setData().forPath(path,"5".getBytes()); curatorClient.setData().forPath(path,"6".getBytes()); 复制代码
注意:在测试过程中,nodeCache.start(),NodeCache 在先后多次修改监听节点的内容时,出现了丢失事件现象,在用例执行的5次中,仅一次监听到了全部事件;如果 nodeCache.start(true),NodeCache 在先后多次修改监听节点的内容时,不会出现丢失现象。
NodeCache不仅可以监听节点内容变化,还可以监听指定节点是否存在。如果原本节点不存在,那么Cache就会在节点被创建时触发监听事件,如果该节点被删除,就无法再触发监听事件。
PathChildrenCache
PathChildrenCache 不会对二级子节点进行监听,只会对子节点进行监听。
String path = "/glmapper"; PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorClient,path,true); // 如果设置为true则在首次启动时就会缓存节点内容到Cache中。 nodeCache.start(true); pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {System.out.println("-----------------------------");System.out.println("event:" + event.getType());if (event.getData()!=null){System.out.println("path:" + event.getData().getPath());}System.out.println("-----------------------------");} }); zookeeperCuratorClient.createNode("/glmapper/test","data".getBytes(),CreateMode.PERSISTENT); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test","1".getBytes()); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test","2".getBytes()); Thread.sleep(1000); zookeeperCuratorClient.createNode("/glmapper/test/second","data".getBytes(),CreateMode.PERSISTENT); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test/second","1".getBytes()); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test/second","2".getBytes()); Thread.sleep(1000); 复制代码
注意:在测试过程中发现,如果连续两个操作之间不进行一定时间的间隔,会导致无法监听到下一次事件。因此只会监听子节点,所以对二级子节点 /second 下面的操作是监听不到的。测试中控制台输出的信息如下:
----------------------------- event:CHILD_ADDED path:/glmapper/test ----------------------------- ----------------------------- event:INITIALIZED ----------------------------- ----------------------------- event:CHILD_UPDATED path:/glmapper/test ----------------------------- ----------------------------- event:CHILD_UPDATED path:/glmapper/test ----------------------------- 复制代码
TreeCache
TreeCache 使用一个内部类
TreeNode
来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。String path = "/glmapper"; TreeCache treeCache = new TreeCache(curatorClient,path); treeCache.getListenable().addListener((client,event)-> {System.out.println("-----------------------------");System.out.println("event:" + event.getType());if (event.getData()!=null){System.out.println("path:" + event.getData().getPath());}System.out.println("-----------------------------"); }); treeCache.start(); zookeeperCuratorClient.createNode("/glmapper/test","data".getBytes(),CreateMode.PERSISTENT); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test","1".getBytes()); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test","2".getBytes()); Thread.sleep(1000); zookeeperCuratorClient.createNode("/glmapper/test/second","data".getBytes(),CreateMode.PERSISTENT); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test/second","1".getBytes()); Thread.sleep(1000); curatorClient.setData().forPath("/glmapper/test/second","2".getBytes()); Thread.sleep(1000); 复制代码
测试中控制台输出的信息如下:
----------------------------- event:NODE_ADDED path:/glmapper ----------------------------- ----------------------------- event:NODE_ADDED path:/glmapper/test ----------------------------- ----------------------------- event:NODE_UPDATED path:/glmapper/test ----------------------------- ----------------------------- event:NODE_UPDATED path:/glmapper/test ----------------------------- ----------------------------- event:NODE_ADDED path:/glmapper/test/second ----------------------------- ----------------------------- event:NODE_UPDATED path:/glmapper/test/second ----------------------------- ----------------------------- event:NODE_UPDATED path:/glmapper/test/second ----------------------------- 复制代码
事务操作
CuratorFramework 的实例包含 inTransaction( ) 接口方法,调用此方法开启一个 ZooKeeper 事务。 可以复合create、 setData、 check、and/or delete 等操作然后调用 commit() 作为一个原子操作提交。
// 开启事务
CuratorTransaction curatorTransaction = curatorClient.inTransaction();
Collection<CuratorTransactionResult> commit = // 操作1
curatorTransaction.create().withMode(CreateMode.EPHEMERAL).forPath("/glmapper/transaction").and()// 操作2 .delete().forPath("/glmapper/test").and()// 操作3.setData().forPath("/glmapper/transaction", "data".getBytes()).and()// 提交事务.commit();
Iterator<CuratorTransactionResult> iterator = commit.iterator();
while (iterator.hasNext()){CuratorTransactionResult next = iterator.next();System.out.println(next.getForPath());System.out.println(next.getResultPath());System.out.println(next.getType());
}
复制代码
这里debug看了下Collection信息,面板如下:
异步操作
前面提到的增删改查都是同步的,但是 Curator 也提供了异步接口,引入了 BackgroundCallback 接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback 接口中一个重要的回调值为 CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
在使用上也是非常简单的,只需要带上 inBackground() 就行,如下:
curatorClient.getData().inBackground().forPath("/glmapper/test");
复制代码
通过查看 inBackground 方法定义可以看到,inBackground 支持自定义线程池来处理返回结果之后的业务逻辑。
public T inBackground(BackgroundCallback callback, Executor executor);
复制代码
这里就不贴代码了。
小结
本文主要围绕 Curator 的基本 API 进行了学习记录,对于原理及源码部分没有涉及。这部分如果有时间在慢慢研究吧。另外像分布式锁、分布式自增序列等实现停留在理论阶段,没有实践,不敢妄论,用到再码吧。
Zookeeper 客户端之 Curator相关推荐
- zookeeper客户端库curator分析
zookeeper客户端库curator分析 前言 综述 zookeeper保证 理解zookeeper的顺序一致性 之前使用zookeeper客户端踩到的坑 curator 连接保证 连接状态监控以 ...
- Zookeeper开源客户端框架Curator的简单使用
为什么80%的码农都做不了架构师?>>> Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用Z ...
- 聊聊、Zookeeper 客户端 Curator
[Curator] 和 ZkClient 一样,Curator 也是开源客户端,Curator 是 Netflix 公司开源的一套框架. <dependency><groupId ...
- Zookeeper客户端Curator使用详解
http://www.jianshu.com/p/70151fc0ef5d Zookeeper客户端Curator使用详解 简介 Curator是Netflix公司开源的一套zookeeper客户端框 ...
- [转载]Zookeeper开源客户端框架Curator简介
转载声明:http://macrochen.iteye.com/blog/1366136 Zookeeper开源客户端框架Curator简介 博客分类: Distributed Open Source ...
- Zookeeper客户端Curator详解
一.Curator 客户端使用 Curator是 Netflix公司开源的一套ZooKeeper客户端框架,和 ZkClient一样它解决了非常底层的细节开发工作,包括连接.重连.反复注册Watche ...
- ZooKeeper客户端Curator的基本使用
前提:ZooKeeper版本:3.4.14 Curator版本:2.13.0 1.什么是Curator Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Z ...
- zookeeper客户端 curator的使用
curator简介 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,提供了各种应用场景的实现封装,flue ...
- 【高级篇】详解Zookeeper客户端Curator
一.序言 之前分享过一篇关于Curotor的基本应用[基础篇]详解Zookeeper客户端Curator,如果对Curator没有了解的可以看看,本文分享关于Curator的一些高级特性,在监听和le ...
最新文章
- IntelliJ IDEA使用教程(非常全面)
- 半径为r的均匀带电球体_半径为R的均匀带电球面,总带电量为Q,设无穷远处的电势为零,则距离球心为r(r=R)的P点处的电场强度的大小和电势为...
- mybatis新增返回主键值
- Java LocalDate类| 带示例的format()方法
- 一个方便的图片载入框架——ImageViewEx
- python第二十二天,configparser模块 subprocess 模块,xlrd 模块(表格处理)
- YUM更换源(1)--yum找不到安装包
- java线程死锁研究
- 【Android】 修复ijkPlayer进行m3u8 hls流播放时seek进度条拖动不准确的问题
- 理解FPS游戏中的矩阵方框透视自瞄
- 让子弹飞,是什么意思?
- 基于轨迹的游客行为特征分析
- t6服务器验证密码失败,用友T6软件T6服务无法启动,提示SA密码错误
- 云端运行python_云端部署python代码及安装MySQL
- 不做律师,玩起了电子合同,这家入选微软加速器第十期的电子合同企业究竟是什么来头?
- video.js播放m3u8视频
- RISCV学习笔记7.8(开源虚拟机篇)--AlmaLinux虚拟机安装modelsim
- SQLI DUMB SERIES-6
- 效果超牛的基于声波通信和声音指纹的微信互动平台
- ODB++数据解析二