简介

Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper” 给Curator予高度评价。
Curator的maven依赖: 一般直接使用curator-recipes就行了,如果需要自己封装一些底层些的功能的话,例如增加连接管理重试机制等,则可以引入curator-framework包

<!-- 对zookeeper的底层api的一些封装 --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.12.0</version></dependency><!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.12.0</version></dependency>

基本API

创建会话

使用静态工程方法创建

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.128.129:2181",5000, 5000, retryPolicy);

其中RetryPolicy为重试策略,第一个参数为baseSleepTimeMs初始的sleep时间,用于计算之后的每次重试的sleep时间。第二个参数为maxRetries,最大重试次数。

使用Fluent风格api创建

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.128.129:2181").sessionTimeoutMs(5000)  // 会话超时时间.connectionTimeoutMs(5000) // 连接超时时间.retryPolicy(retryPolicy).namespace("base") // 包含隔离名称.build();client.start();

创建数据节点

client.create().creatingParentContainersIfNeeded() // 递归创建所需父节点.withMode(CreateMode.PERSISTENT) // 创建类型为持久节点.forPath("/nodeA", "init".getBytes()); // 目录及内容

删除数据节点

          client.delete() .guaranteed()  // 强制保证删除.deletingChildrenIfNeeded() // 递归删除子节点.withVersion(10086) // 指定删除的版本号.forPath("/nodeA");

读取数据节点

读数据,返回值为byte[]

byte[] bytes = client.getData().forPath("/nodeA"); System.out.println(new String(bytes));

读stat

Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/nodeA");

修改数据节点

client.setData().withVersion(10086) // 指定版本修改.forPath("/nodeA", "data".getBytes());

事务

client.inTransaction().check().forPath("/nodeA").and().create().withMode(CreateMode.EPHEMERAL).forPath("/nodeB", "init".getBytes()).and().create().withMode(CreateMode.EPHEMERAL).forPath("/nodeC", "init".getBytes()).and().commit();

其他

      client.checkExists() // 检查是否存在.forPath("/nodeA"); client.getChildren().forPath("/nodeA"); // 获取子节点的路径

异步回调

异步创建节点

Executor executor = Executors.newFixedThreadPool(2);client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((curatorFramework, curatorEvent) -> {System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));},executor).forPath("path");

监听器

Curator提供了三种Watcher(Cache)来监听结点的变化:

  • Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;/*** Curator framework watch test.*/
public class CuratorWatcherTest {/** Zookeeper info */private static final String ZK_ADDRESS = "192.168.1.100:2181";private static final String ZK_PATH = "/zktest";public static void main(String[] args) throws Exception {// 1.Connect to zkCuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(10, 5000));client.start();System.out.println("zk client start successfully!");// 2.Register watcherPathChildrenCache watcher = new PathChildrenCache(client,ZK_PATH,true    // if cache data);watcher.getListenable().addListener((client1, event) -> {ChildData data = event.getData();if (data == null) {System.out.println("No data in event[" + event + "]");} else {System.out.println("Receive event: "+ "type=[" + event.getType() + "]"+ ", path=[" + data.getPath() + "]"+ ", data=[" + new String(data.getData()) + "]"+ ", stat=[" + data.getStat() + "]");}});watcher.start(StartMode.BUILD_INITIAL_CACHE);System.out.println("Register zk watcher successfully!");Thread.sleep(Integer.MAX_VALUE);}}

Curator“菜谱”

既然Maven包叫做curator-recipes,那说明Curator有它独特的“菜谱”:

  • :包括共享锁、共享可重入锁、读写锁等。
  • 选举:Leader选举算法。
  • Barrier:阻止分布式计算直至某个条件被满足的“栅栏”,可以看做JDK Concurrent包中Barrier的分布式实现。
  • 缓存:前面提到过的三种Cache及监听机制。
  • 持久化结点:连接或Session终止后仍然在Zookeeper中存在的结点。
  • 队列:分布式队列、分布式优先级队列等。

分布式锁

分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。

下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。

package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;import java.util.concurrent.TimeUnit;/*** Curator framework's distributed lock test.*/
public class CuratorDistrLockTest {/** Zookeeper info */private static final String ZK_ADDRESS = "192.168.1.100:2181";private static final String ZK_LOCK_PATH = "/zktest";public static void main(String[] args) throws InterruptedException {// 1.Connect to zkCuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(10, 5000));client.start();System.out.println("zk client start successfully!");Thread t1 = new Thread(() -> {doWithLock(client);}, "t1");Thread t2 = new Thread(() -> {doWithLock(client);}, "t2");t1.start();t2.start();}private static void doWithLock(CuratorFramework client) {InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);try {if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {System.out.println(Thread.currentThread().getName() + " hold lock");Thread.sleep(5000L);System.out.println(Thread.currentThread().getName() + " release lock");}} catch (Exception e) {e.printStackTrace();} finally {try {lock.release();} catch (Exception e) {e.printStackTrace();}}}}

Leader选举

当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。

package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;/*** Curator framework's leader election test.* Output:*  LeaderSelector-2 take leadership!*  LeaderSelector-2 relinquish leadership!*  LeaderSelector-1 take leadership!*  LeaderSelector-1 relinquish leadership!*  LeaderSelector-0 take leadership!*  LeaderSelector-0 relinquish leadership! *      ...*/
public class CuratorLeaderTest {/** Zookeeper info */private static final String ZK_ADDRESS = "192.168.1.100:2181";private static final String ZK_PATH = "/zktest";public static void main(String[] args) throws InterruptedException {LeaderSelectorListener listener = new LeaderSelectorListener() {@Overridepublic void takeLeadership(CuratorFramework client) throws Exception {System.out.println(Thread.currentThread().getName() + " take leadership!");// takeLeadership() method should only return when leadership is being relinquished.Thread.sleep(5000L);System.out.println(Thread.currentThread().getName() + " relinquish leadership!");}@Overridepublic void stateChanged(CuratorFramework client, ConnectionState state) {}};new Thread(() -> {registerListener(listener);}).start();new Thread(() -> {registerListener(listener);}).start();new Thread(() -> {registerListener(listener);}).start();Thread.sleep(Integer.MAX_VALUE);}private static void registerListener(LeaderSelectorListener listener) {// 1.Connect to zkCuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(10, 5000));client.start();// 2.Ensure pathtry {new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());} catch (Exception e) {e.printStackTrace();}// 3.Register listenerLeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);selector.autoRequeue();selector.start();}}

20200509 Curator入门相关推荐

  1. Apache Curator实战

    Apache Curator入门实战 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeep ...

  2. 2.ZooKeeper客户端Curator「第三章 ZooKeeper Java客户端」「架构之路ZooKeeper理论和实战」

    前言 上一篇文章 介绍了zookeeper原生API的使用,使用过原生API不得不说,有很多的问题,比如:不能递归创建和删除节点.Watcher只能使用一次.还有很多可以解决分布式应用问题的api(比 ...

  3. 豌豆夹Redis解决方案Codis源码剖析:Proxy代理

    豌豆夹Redis解决方案Codis源码剖析:Proxy代理 1.预备知识 1.1 Codis Codis就不详细说了,摘抄一下GitHub上的一些项目描述: Codis is a proxy base ...

  4. 豌豆夹Redis解决方式Codis源代码剖析:Proxy代理

    豌豆夹Redis解决方式Codis源代码剖析:Proxy代理 1.预备知识 1.1 Codis Codis就不详细说了,摘抄一下GitHub上的一些项目描写叙述: Codis is a proxy b ...

  5. 【Elasticsearch】Curator 从入门到实战

    1.概述 转载:Curator 从入门到实战 Curator 是elasticsearch 官方的一个索引管理工具,可以通过配置文件的方式帮助我们对指定的一批索引进行创建/删除.打开/关闭.快照/恢复 ...

  6. 【弄nèng - Zookeeper】Zookeeper入门教程(三)—— 客户端Curator的基本API使用(Curator framework)

    文章目录 1. Curator简介 2. Curator framework 3. Curator recipes 4. 基本Api 4.1 创建会话 4.1.1 使用静态工程方法创建 4.1.2 使 ...

  7. 【菜鸟教程】Zookeeper基础入门(使用curator)【下】

    curator curator是Netflix开源的一个zookeeper客户端,后来捐给apache.curator框架在zookeeper原生API接口上进行了包装,解决了很多zookeeper客 ...

  8. Zookeeper的快速入门(Curator)

    ZooKeeper 一.初始ZooKeeper Zookeeper 是 Apache的一个项目,并且是一个树形目录服务,简称zk. Zookeeper 是一个分布式的.开源的分布式应用程序的协调服务. ...

  9. 【Zookeeper实战】Zookeeper入门到实战看这篇就够了

    1. 前言 在上一篇[Zookeeper入门]相关概念总结 中已经完美的讲解了 Zookeeper入门 相关概念总结,接下来讲讲ZooKeeper 实战使用. 这篇文章简单给演示一下 ZooKeepe ...

最新文章

  1. PLOS Biology: 发现一种固氮玉米
  2. 互联网+时代,SAP助力跨境电商全面升级研讨会圆满落幕
  3. 数据结构实验之求二叉树后序遍历和层次遍历
  4. 【数据结构与算法】之深入解析“最小栈”的求解思路与算法示例
  5. 自从我这样撸代码以后,公司网页的浏览量提高了107%!
  6. (40)Xilinx PLL IP核配置(一)(第8天)
  7. segger公司调试cortex-m内核出现hardfault的方法
  8. QQ坦白说解密解决方案(二)
  9. ADS-B放大器KU1090
  10. javascript设置网页快捷键
  11. java定义枚举并赋值_java中枚举的特性是什么?如何赋值?
  12. aspx如何获取aspx.cs中定义的变量、方法;
  13. 《宝塔面板教程5》:如何上传网站程序安装自己的网站
  14. python关于类的通俗描述?
  15. 解决No “rule to make target `../skin_test.qss', needed by `debug/qrc_resource.cpp'. Stop.”
  16. 圆桌会回顾 | SecureBoost:挑战千万级别样本训练之性能提升篇
  17. 手机谷歌 访问_更新谷歌正式发布安卓10正式版和windows版
  18. 用正交变换将二次型化为标准形
  19. chrome在新标签页打开_如何使用Google Chrome在计算机之间同步打开的标签页
  20. 行式和列式存储说明以及OLAP特点介绍

热门文章

  1. 会员管理系统,建议收藏!
  2. 小内存堆管理算法详细解析
  3. がいねんとれいさいのにちじょう
  4. 如何开通个人微信公众号(订阅号)
  5. 【实例学模式】一针见血装饰器模式
  6. Android 播放视频
  7. 当他不再爱你的时候!
  8. JAVA爬需要账号登录的网_如何用 Python 爬取需要登录的网站?
  9. mysql与redis数据同步(c/c++)(写mysql同步到redis,并且以json格式保存)
  10. 教育与人生:教师节有感