一、简介

Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基础上进行封装、实现一些开发细节,包括接连重连、反复注册Watcher和NodeExistsException等。目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一。从编码风格上来讲,它提供了基于Fluent的编程风格支持。

除此之外,Curator还提供了Zookeeper的各种应用场景:Recipe、共享锁服务、Master选举机制和分布式计数器等。

二、项目组件

名称 描述
Recipes Zookeeper典型应用场景的实现,这些实现是基于Curator Framework。
Framework Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。
Utilities 为Zookeeper提供的各种实用程序。
Client Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。
Errors Curator如何处理错误,连接问题,可恢复的例外等。

三、Maven依赖

Curator的jar包已经发布到Maven中心,由以下几个artifact的组成。根据需要选择引入具体的artifact。但大多数情况下只用引入curator-recipes即可。

GroupID/Org ArtifactID/Name 描述
org.apache.curator curator-recipes 所有典型应用场景。需要依赖client和framework,需设置自动获取依赖。
org.apache.curator curator-framework 同组件中framework介绍。
org.apache.curator curator-client 同组件中client介绍。
org.apache.curator curator-test 包含TestingServer、TestingCluster和一些测试工具。
org.apache.curator curator-examples 各种使用Curator特性的案例。
org.apache.curator curator-x-discovery 在framework上构建的服务发现实现。
org.apache.curator curator-x-discoveryserver 可以和Curator Discovery一起使用的RESTful服务器。
org.apache.curator curator-x-rpc Curator framework和recipes非java环境的桥接。

根据上面的描述,开发人员大多数情况下使用的都是curator-recipes的依赖,此依赖的maven配置如下:

<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version>
</dependency>

四、基本使用

4.1、创建会话

Curator的创建会话方式与原生的API和ZkClient的创建方式区别很大。Curator创建客户端是通过CuratorFrameworkFactory工厂类来实现的。其中,此工厂类提供了三种创建客户端的方法。 前两种方法是通过newClient来实现,仅参数不同而已。

4.1.1创建客户端方法API

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
​
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
​
// 使用上面方法创建出一个CuratorFramework之后,需要再调用其start()方法完成会话创建。//demo1
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
​
//demo2
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",5000,1000,retryPolicy);
client.start();
​
//其中参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略。默认提供了以下实现,分别为ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed。

其中参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略。默认提供了以下实现,分别为ExponentialBackoffRetry、BoundedExponentialBackoffRetry、RetryForever、RetryNTimes、RetryOneTime、RetryUntilElapsed。

进一步查看源代码可以得知,其实这两种方法内部实现一样,只是对外包装成不同的方法。它们的底层都是通过第三个方法builder来实现的。

RetryPolicy retryPolicy  = new ExponentialBackoffRetry(1000,3);
CuratorFramework Client = CuratorFrameworkFactory.builder().connectString("hadoop1:2181,hadoop2:2181,hadoop3:2181").sessionTimeoutMs(3000).connectionTimeoutMs(5000).retryPolicy(retryPolicy).build();
​
client.start();
client.blockUntilConnected();

参数:

  • connectString:zk的server地址,多个server之间使用英文逗号分隔开

  • connectionTimeoutMs:连接超时时间,如上是30s,默认是15s

  • sessionTimeoutMs:会话超时时间,如上是50s,默认是60s

  • retryPolicy:失败重试策略

    • ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

      • baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,

        • 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))

      • maxRetries:最大重试次数

      • maxSleepMs:最大sleep时间,如果上述的当前sleep计算出来比这个大,那么sleep用这个时间

    • 其他,查看org.apache.curator.RetryPolicy接口的实现类

    • start() 开始创建会话。

    • blockUntilConnected() 直到连接成功或超时。

4.2、创建节点

Curator默认创建的是持久节点,内容为空。 可以在创建时选择节点类型:

  • 持久节点(PERSISTENT)

  • 持久顺序节点(PERSISTENT_SEQUENTIAL)

  • 临时节点(EPHEMERAL)

  • 临时顺序节点(EPHEMERAL_SEQUENTIAL)

4.2.1、创建一个初始内容为空的节点

client.create().forPath(path);

4.2.2、创建一个包含内容的节点

client.create().forPath(path,"我是内容".getBytes());

4.2.3、创建临时节点,并递归创建父节点

client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
//此处Curator和ZkClient一样封装了递归创建父节点的方法。在递归创建父节点时,父节点为持久节点。

4.3、删除节点

4.3.1、删除一个子节点

client.delete().forPath(path);

4.3.2、删除节点并递归删除其子节点

client.delete().deletingChildrenIfNeeded().forPath(path);

4.3.3、指定版本进行删除

client.delete().withVersion(1).forPath(path);
​
//如果版本不存在,则删除异常,信息如下:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for

4.3.4、强制保证删除一个节点

client.delete().guaranteed().forPath(path);

4.4、读取数据

读取节点数据内容API相当简单,Curator提供了传入一个Stat,使用节点当前的Stat替换到传入的Stat的方法,查询方法执行完成之后,Stat引用已经执行当前最新的节点Stat。

// 普通查询
client.getData().forPath(path);
​
// 包含状态查询
Stat stat = new Stat();
client.getData().storingStatIn(stat()).forPath(path);

4.5、更新数据

更新数据,如果未传入version参数,那么更新当前最新版本,如果传入version则更新指定version,如果version已经变更,则抛出异常。

// 普通更新
client.setData().forPath(path,"新内容".getBytes());
​
// 指定版本更新
client.setData().withVersion(1).forPath(path);
​
更新出错,版本不一致异常:
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for

4.6、异步接口

在使用以上针对节点的操作API时,我们会发现每个接口都有一个inBackground()方法可供调用。此接口就是Curator提供的异步调用入口。对应的异步处理接口为BackgroundCallback。此接口指提供了一个processResult的方法,用来处理回调结果。其中processResult的参数event中的getType()包含了各种事件类型,getResultCode()包含了各种响应码。

重点说一下inBackground的以下接口:

public T inBackground(BackgroundCallback callback, Executor executor);
//此接口就允许传入一个Executor实例,用一个专门线程池来处理返回结果之后的业务逻辑。
/**
*  异步创建节点
*
* 注意:如果自己指定了线程池,那么相应的操作就会在线程池中执行,如果没有指定,
* 那么就会使用Zookeeper的EventThread线程对事件进行串行处理
* */
client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:"+ event.getResultCode() + ",type:" + event.getType());}}, Executors.newFixedThreadPool(10)).forPath("/async-node01");
​
​
client.create().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("当前线程:" + Thread.currentThread().getName() + ",code:" + event.getResultCode() + ",type:" + event.getType());}}).forPath("/async-node02");

五、事务管理

/** 事务管理:碰到异常,事务会回滚* 使用transaction()来控制事务* @throws Exception*/
public void testTransaction() throws Exception{//定义几个基本操作CuratorOp createOp = client.transactionOp().create().forPath("/curator/one_path","some data".getBytes());
​CuratorOp setDataOp = client.transactionOp().setData().forPath("/curator","other data".getBytes());
​CuratorOp deleteOp = client.transactionOp().delete().forPath("/curator");
​//事务执行结果List<CuratorTransactionResult> results = client.transaction().forOperations(createOp,setDataOp,deleteOp);
​//遍历输出结果for(CuratorTransactionResult result : results){System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());}
}
//因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚

六、监听器

Curator提供了三种Watcher(Cache)来监听结点的变化:

  • Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。

  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

/*** 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理*/
ExecutorService pool = Executors.newFixedThreadPool(2);
​
/*** 监听数据节点的变化情况*/
final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false);nodeCache.start(true);nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("Node data is changed, new data: " +new String(nodeCache.getCurrentData().getData()));}},pool);
​
/*** 监听子节点的变化情况*/
final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true);childrenCache.start(StartMode.POST_INITIALIZED_EVENT);childrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case CHILD_ADDED:System.out.println("CHILD_ADDED: " + event.getData().getPath());break;case CHILD_REMOVED:System.out.println("CHILD_REMOVED: " + event.getData().getPath());break;case CHILD_UPDATED:System.out.println("CHILD_UPDATED: " + event.getData().getPath());break;default:break;}}},pool);
​client.setData().forPath("/zk-huey/cnode", "world".getBytes());
​Thread.sleep(10 * 1000);pool.shutdown();client.close();

七、分布式锁思路

分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。

下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
​
import java.util.concurrent.TimeUnit;
​
/*** Curator framework's distributed lock test.*/
public class CuratorDistrLockTest {
​/** Zookeeper info */private static final String ZK_ADDRESS = "192.168.1.100:2181";private static final String ZK_LOCK_PATH = "/zktest";
​public static void main(String[] args) throws InterruptedException {// 1.Connect to zkCuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(10, 5000));client.start();System.out.println("zk client start successfully!");
​Thread t1 = new Thread(() -> {doWithLock(client);}, "t1");Thread t2 = new Thread(() -> {doWithLock(client);}, "t2");
​t1.start();t2.start();}
​private static void doWithLock(CuratorFramework client) {InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);try {if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {System.out.println(Thread.currentThread().getName() + " hold lock");Thread.sleep(5000L);System.out.println(Thread.currentThread().getName() + " release lock");}} catch (Exception e) {e.printStackTrace();} finally {try {lock.release();} catch (Exception e) {e.printStackTrace();}}}
}

八、Leader选举

当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;
​
/*** Curator framework's leader election test.* Output:*  LeaderSelector-2 take leadership!*  LeaderSelector-2 relinquish leadership!*  LeaderSelector-1 take leadership!*  LeaderSelector-1 relinquish leadership!*  LeaderSelector-0 take leadership!*  LeaderSelector-0 relinquish leadership! *      ...*/
public class CuratorLeaderTest {
​/** Zookeeper info */private static final String ZK_ADDRESS = "192.168.1.100:2181";private static final String ZK_PATH = "/zktest";
​public static void main(String[] args) throws InterruptedException {LeaderSelectorListener listener = new LeaderSelectorListener() {@Overridepublic void takeLeadership(CuratorFramework client) throws Exception {System.out.println(Thread.currentThread().getName() + " take leadership!");
​// takeLeadership() method should only return when leadership is being relinquished.Thread.sleep(5000L);
​System.out.println(Thread.currentThread().getName() + " relinquish leadership!");}
​@Overridepublic void stateChanged(CuratorFramework client, ConnectionState state) {}};
​new Thread(() -> {registerListener(listener);}).start();
​new Thread(() -> {registerListener(listener);}).start();
​new Thread(() -> {registerListener(listener);}).start();
​Thread.sleep(Integer.MAX_VALUE);}
​private static void registerListener(LeaderSelectorListener listener) {// 1.Connect to zkCuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(10, 5000));client.start();
​// 2.Ensure pathtry {new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());} catch (Exception e) {e.printStackTrace();}
​// 3.Register listenerLeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);selector.autoRequeue();selector.start();}
}

Curator的基本使用相关推荐

  1. Curator Cache

    1.Curator Cache 与原生ZooKeeper Wacher区别 原生的ZooKeeper Wacher是一次性的:一个Wacher一旦触发就会被移出,如果你想要反复使用Wacher,就要在 ...

  2. 十六、curator recipes之DistributedIdQueue

    简介 curator实现了一种分布式ID队列,也是遵循FIFO原则,比普通队列新增的一个点是ID队列可以根据ID对队列元素进行操作,比如移除该元素. 官方文档:http://curator.apach ...

  3. Zookeeper开源客户端框架Curator的简单使用

    为什么80%的码农都做不了架构师?>>>    Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用Z ...

  4. Curator counters

    2019独角兽企业重金招聘Python工程师标准>>> 这个比较好理解,分布式数字,类似AtomicInteger系列,Curator有2个实现: 第一个: package cura ...

  5. java curator_[java,zk]在 linux 上快速搭建 zookeeper curator 开发环境

    在这篇博客中简单介绍一下,如何快速的在 linux 操作系统上搭建使用 zookeeper 客户端 curator 编程的单机环境. 在前几篇博客中,介绍的是使用 zookeeper 原生提供的 AP ...

  6. Curator: ZooKeeper的使用配方

    Curator: ZooKeeper的使用配方 ZooKeeper 作为分布式的存储方式, 有很多种使用场景, 把典型的使用场景提取出来, 成为"配方", 方便用户参考. Cura ...

  7. 【ZK-curator使用异常】KeeperErrorCode = Unimplemented for /***

    [ZK-curator使用异常]KeeperErrorCode = Unimplemented for /*** 参考文章: (1)[ZK-curator使用异常]KeeperErrorCode = ...

  8. 五、curator recipes之选举主节点Leader Latch

    简介 在分布式计算中,主节点选举是为了把某个进程作为主节点来控制其它节点的过程.在选举结束之前,我们不知道哪个节点会成为主节点.curator对于主节点选举有两种实现方式,本文示例演示Latch的实现 ...

  9. 聊聊、Zookeeper 客户端 Curator

    [Curator]   和 ZkClient 一样,Curator 也是开源客户端,Curator 是 Netflix 公司开源的一套框架. <dependency><groupId ...

  10. [Curator] Path Cache 的使用与分析

    为什么80%的码农都做不了架构师?>>>    Path Cache Path Cache其实就是用于对zk节点的监听.不论是子节点的新增.更新或者移除的时候,Path Cache都 ...

最新文章

  1. WannaCry的UWP版,哈哈哈
  2. AngularJS开发指南7:AngularJS本地化,国际化,以及兼容IE低版本浏览器
  3. Django之部署NGINX+uWSGI
  4. MIUI 10 Android 原生字体,[教程] MIUI10全局字体替换教程,了解一下?
  5. Selenium3 + Python3自动化测试系列——多窗口切换
  6. 热血江湖战无止境与服务器连接不稳定,《热血江湖》V14.0“战无止境”新版玩不停...
  7. 程序员面试金典 - 面试题 17.22. 单词转换(BFS)
  8. 理解Java集合框架里面的的transient关键字
  9. qt 线程接收线程 moveToThread 特性
  10. Python学习笔记(八)随机数的处理
  11. del退役了/del 滚回来了
  12. Diy页面服务端渲染解决方案
  13. samba服务器常用指令
  14. Java connot reduce_hadoop错误:org.apache.hadoop.mapreduce.lib.input.FileSplit cannot be cast t...
  15. MS发起的PDP上下文激活过程----PPP和PDP激活是什么区别
  16. Delphi开发工具DevExpress VCL全新发布v21.1.5
  17. ImageJ自动批量多通道图片无损分离为单色荧光图
  18. c语言如何多核运行程序,对于多线程程序,单核cpu和多核cpu如何工作?
  19. 红米4A全版本通刷_2016111 2016112_官方线刷包_救砖包_解账户锁
  20. 无线鼠标迟钝但并不是电量问题

热门文章

  1. html中亮度怎么写,HTML+CSS+JS模仿win10亮度调节效果的示例代码
  2. java导出excel与word文档
  3. 基于SVM算法的人脸微笑识别
  4. recon-ng详细使用教程
  5. Renesas_based_intro
  6. 3个字节转换为另外3个字节的简单加密算法
  7. Java打印折纸游戏
  8. 移动端跨平台技术总结
  9. 如何关闭华为手机连接电脑自动弹出cd驱动器和类似文件管理文件夹
  10. 【PC自动化测试-11】窗口控件的类型分类