Curator是netflix公司开源的一套zookeeper客户端。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。

Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-frameworkcurator-recipes。curator-framework只是对zookeeper客户端做了简单的封装,提供基本的连接和重试功能。curator-recipes 则提供了一些zookeeper的典型使用场景的实现。

一、curator-framework基本功能

1. 创建连接

// 通过CuratorFrameworkFactory静态方法创建客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
// 可以通过流式风格指定客户端的属性
CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(15 * 1000) //连接超时时间,默认15秒.sessionTimeoutMs(60 * 1000) //会话超时时间,默认60秒.namespace("arch") //设置命名空间.build();

2.节点操作

// 创建节点
client.create().forPath("/my/path", myData);
client.create().creatingParentsIfNeeded().forPath(path, payload);
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
// withProtection()用于保护边缘情况:在服务器上创建成功,而返回客户端失败
client.create().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, payload);// 修改节点
client.setData().forPath(path, payload);// 删除节点
client.delete().forPath(path);
client.delete().deletingChildrenIfNeeded().forPath(path);
// 解决在服务器上操作可能成功但在响应成功返回到客户端之前发生连接故障的边缘情况。
client.delete().guaranteed().forPath(path);// 查询节点
client.getData().forPath(path);
client.getChildren().forPath(path);

二、curator-recipes事件监听

zookeeper原生支持通过注册watcher来进行事件监听,但是其使用不是特别方便,需要开发人员自己反复注册watcher,比较繁琐。Curator引入Cache来实现对zookeeper服务端事务的监听。Cache是Curator中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程Zookeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册监听,从而大大简化原生api开发的繁琐过程。

  • CuratorCache
 private static final String PATH = "/example/cache";public static void main(String[] args) throws Exception{...try (CuratorCache cache = CuratorCache.build(client, PATH)){// 监听增删改事件(子节点的事件同样会被监听)CuratorCacheListener listener = CuratorCacheListener.builder().forCreates(node -> System.out.println(String.format("Node created: [%s]", node))).forChanges((oldNode, node) -> System.out.println(String.format("Node changed. Old: [%s] New: [%s]", oldNode, node))).forDeletes(oldNode -> System.out.println(String.format("Node deleted. Old value: [%s]", oldNode))).forInitialized(() -> System.out.println("Cache initialized")).build();// 注册监听器cache.listenable().addListener(listener);cache.start();}

三、curator-recipes工具类

curator-recipes 提供的工具类模块:Lock(分布式锁)、Barriers(栅栏)、Counter(计数器)、Elections(选主)

1.Lock

  • 可重入锁Shared Reentrant Lock:和JUC中的ReentrantLock类似,但是支持分布式场景
// 创建锁
public InterProcessMutex(CuratorFramework client, String path)// 获取锁
public void acquire();
public boolean acquire(long time, TimeUnit unit);// 释放锁
release()// Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。 为了撤销mutex, 调用下面的方法
// 将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
public void makeRevocable(RevocationListener<T> listener)

不可重入锁Shared Lock: 使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入
可重入读写锁Shared Reentrant Read Write Lock: 类似JDK的ReentrantReadWriteLock. 一个读写锁管理一对相关的锁。 一个负责读操作,另外一个负责写操作。 读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。 此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。 这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。 从读锁升级成写锁是不成的。

InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "readWriteLock");
InterProcessReadWriteLock.ReadLock readLock = readWriteLock.readLock();
InterProcessReadWriteLock.WriteLock writeLock = readWriteLock.writeLock();
  • 信号量Shared Semaphore: 类似JDK的Semaphore,可以一次请求多个,一次返回多个,支持超时获取
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "semophore", 5);
Lease lease = semaphore.acquire();
semaphore.returnLease(lease);

如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的信号量会自动释放, 其它客户端可以继续使用这些信号量。

  • 多锁对象Multi Shared Lock:Multi Shared Lock是一个锁的容器。 当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。
// 它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。
public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

2.Barriers

单栅栏DistributedBarrier
使用方法:

  1. 主导client设置一个栅栏
  2. 其他客户端就会调用waitOnBarrier()等待栅栏移除,程序处理线程阻塞
  3. 主导client移除栅栏,其他客户端的处理程序就会同时继续运行。
DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
barrier.setBarrier();    // 设置栅栏
barrier.waitOnBarrier();   // 等待栅栏移除
barrier.removeBarrier();    // 移除栅栏
  • 双栅栏DistributedDoubleBarrier:与JUC中的CyclicBarrier比较像,但是可以支持拦截两次
// memberQty是成员数量,memberQty的值只是一个阈值,而不是一个限制值。当等待栅栏的数量大于或等于这个值栅栏就会打开!
public DistributedDoubleBarrier(CuratorFramework client, String barrierPath, int memberQty)doubleBarrier.enter();
doubleBarrier.leave();

工作原理:

  1. 从多个客户端在同一个路径上创建双栅栏(DistributedDoubleBarrier),然后调用enter()方法,等待栅栏数量达到memberQty时就可以进入栅栏。
  2. 栅栏数量达到memberQty,多个客户端同时停止阻塞继续运行,直到执行leave()方法,等待memberQty个数量的栅栏同时阻塞到leave()方法中。
  3. memberQty个数量的栅栏同时阻塞到leave()方法中,多个客户端的leave()方法停止阻塞,继续运行。

3.Counter

利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数,一个用long来计数。

  • SharedCount:使用int类型来计数
  • DistributedAtomicLong:分布式原子类

4.Elections

  • Leader Latch:随机从候选者中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader
 // 在同一个路径下争锁private static final String PATH = "/demo/leader";public static void main(String[] args) {List<LeaderLatch> latchList = new ArrayList<>();List<CuratorFramework> clients = new ArrayList<>();try {for (int i = 0; i < 10; i++) {// 创建clientCuratorFramework client = getClient();client.start();clients.add(client);final LeaderLatch leaderLatch = new LeaderLatch(client, PATH, "client#" + i);// 设置监听器,可以设置在成为或没成为Leader后的行为leaderLatch.addListener(new LeaderLatchListener() {@Overridepublic void isLeader() {System.out.println(leaderLatch.getId() + ":I am leader. I am doing jobs!");}@Overridepublic void notLeader() {System.out.println(leaderLatch.getId() + ":I am not leader. I will do nothing!");}});latchList.add(leaderLatch);leaderLatch.start();}Thread.sleep(1000 * 60);} catch (Exception e) {e.printStackTrace();} finally {for (CuratorFramework client : clients) {CloseableUtils.closeQuietly(client);}for (LeaderLatch leaderLatch : latchList) {CloseableUtils.closeQuietly(leaderLatch);}}}
  • Leader Election:通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权(执行完takeLeadership()方法),这样每个节点都有可能获得领导权。 而LeaderLatch则一直持有leadership, 除非调用close方法,否则它不会释放领导权。
private static final String PATH = "/demo/leader";public class LeaderSelectorTest {public static void main(String[] args) {List<LeaderSelector> selectors = new ArrayList<>();List<CuratorFramework> clients = new ArrayList<>();try {for (int i = 0; i < 10; i++) {CuratorFramework client = getClient();client.start();clients.add(client);final String name = "client#" + i;LeaderSelector leaderSelector = new LeaderSelector(client, PATH, new LeaderSelectorListenerAdapter() {// 在被选举为leader时调用,调用结束后不再担任leader@Overridepublic void takeLeadership(CuratorFramework client) throws Exception {System.out.println(name + ":I am leader.");Thread.sleep(2000);}});// 设置后,执行完takeLeadership后重新进入队列排队leaderSelector.autoRequeue();leaderSelector.start();selectors.add(leaderSelector);}Thread.sleep(Integer.MAX_VALUE);} catch (....

5.Queues

  • DistributedQueue:分布式队列
  • DistributedIdQueue:DistributedQueue的一个版本,允许ID与队列项相关联。如果需要,可以从队列中删除项目。
  • DistributedPriorityQueue:分布式优先级队列
  • DistributedDelayQueue:分布式延时队列

参考文献:https://zhuanlan.zhihu.com/p/72857145
https://curator.apache.org/

Curator使用手册相关推荐

  1. dubbo 注册中心zookeeper 手册

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. zookeeper 注册中心 Zookeeper 是 Apacahe Hadoop 的子项目,是一个 ...

  2. 分布式训练使用手册-paddle 数据并行

    分布式训练使用手册¶ 分布式训练基本思想¶ 分布式深度学习训练通常分为两种并行化方法:数据并行,模型并行,参考下图: 在模型并行方式下,模型的层和参数将被分布在多个节点上,模型在一个mini-batc ...

  3. 正则语法完全正则表达式手册_语法格式重点

    20211202 https://blog.csdn.net/lc11535/article/details/103266263 该表达式打开re.U(re.UNICODE)标志. python –& ...

  4. CUDA C++编程手册(总论)

    CUDA C++编程手册(总论) CUDA C++ Programming Guide The programming guide to the CUDA model and interface. C ...

  5. html iso标准文档,HTML ISO-8859-1 参考手册

    # HTML ISO-8859-1 参考手册 HTML 4.01 支持 ISO 8859-1 (Latin-1) 字符集. ISO-8859-1 的较低部分(从 1 到 127 之间的代码)是最初的 ...

  6. Curator Cache

    1.Curator Cache 与原生ZooKeeper Wacher区别 原生的ZooKeeper Wacher是一次性的:一个Wacher一旦触发就会被移出,如果你想要反复使用Wacher,就要在 ...

  7. 最好的程序界面就是用户无需去阅读操作手册就知道该如何使用的界面

    最好的程序界面就是用户无需去阅读操作手册就知道该如何使用的界面. 原则 1.一致性  如果你可以在一个列表的项目上双击后能 够弹出对话框,那么应该在任何列表中双击都能弹出对话框.要有统一的字体写号.统 ...

  8. ffmpeg linux安装_ffmpeg命令中文手册

    功能 视频转换和编辑工具 示例 1.从mp4视频文件中提取音频并保存为mp3音频格式 [root@node_116 video]# ffmpeg -i video.mp4 -vn sound.mp3f ...

  9. java修炼手册3.8_Java修炼手册

    <Java修炼手册免费版>是一款专为想要自学JAVA的用户打造的全方位掌上学习软件,拥有各阶段的科学课程模块,从视频,资料,图片,演示等多种方面全面引导用户科学而快速的融入JAVA的世界中 ...

最新文章

  1. Ruby 之 Block, Proc, Lambda 联系--区别,转载
  2. 和12岁小同志搞创客开发:两个控制器之间如何实现通信?
  3. 使用CSS对页面加载的淡入效果
  4. [云炬创业基础笔记]第七章创业资源测试3
  5. 谷歌发布MetNet神经网络模型预测天气
  6. 笨办法学 Python · 续 练习 27:`tr`
  7. apktool d test.apk报错:Unsupported major.minor version 52.0
  8. 苹果mac强大的截图录像工具:Snagit
  9. 纬衡多个用户荣获“第五届建筑创作奖”
  10. 【人工智能】归结演绎推理
  11. 使用知用电流探头时如何设置示波器参数
  12. nodejs爬虫实战(一):抽屉新热榜
  13. 芯片常见的三种封装形式
  14. IOS UIImageView 汤姆猫实例
  15. 0xFEFEFEFE 处有未经处理的异常(在xx中): 0xC00001A5: 检测到无效的异常处理程序例程。
  16. 模板 2018-01-27 分解因数 分解质因数
  17. 聂易铭:3月11日数字货币插针诱惑,做多只能浅尝辄止
  18. 大数据治理:那些年,我们一起踩过的坑
  19. 安卓开发中许多应用到的资源
  20. 京东无人超市的成长之路 | 如何利用人工智能技术在零售业做产品创新?

热门文章

  1. 新产品开发中TR1,TR2,TR3..具体指什么?
  2. java基于安卓Android微信小程序的音乐论坛uniAPP小程序
  3. 批处理命令一日一教学
  4. 金山与永中,谁主沉浮?
  5. 学计算机笔记本屏幕多大,笔记本屏幕尺寸有哪些 2分钟让你全整明白【详解】...
  6. 盘点2020年北京市小升初考试关于信息学竞赛的那些事儿!
  7. 笔记本电脑外接显示器 卡_如何向Mac笔记本电脑添加和配置外接显示器
  8. 现货黄金有什么需要注意的?
  9. C#查找Excel()重复项
  10. Android UI换皮肤或 白天黑夜模式