1. 为了更好的实现Java操作ZooKeeper服务器, 后来出现了非常强大的Curator框架, 目前是Apache的顶级项目。里面提供了更多丰富的操作, 例如Session超时重连、主从选举、分布式计数器、分布式锁等等适用于各种复杂的ZooKeeper场景的API封装。

2. Curator包含了几个包

2.1. curator-framework: 对ZooKeeper的底层API的一些封装。

2.2. curator-client: 提供一些客户端的操作, 例如重试策略等。

2.3. curator-recipes: 封装了一些高级特性, 如: Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

3. Curator框架中使用链式编程风格, 易读性更强, 使用工厂方法创建连接对象。可以使用CuratorFrameworkFactory工厂创建连接, 参数1: connectString连接字符串; 参数2: retryPolicy重试连接策略; 参数3: sessionTimeoutMs会话超时时间, 默认为60 000ms; 参数4: connectionTimeoutMs连接超时时间, 默认是15 000ms。

4. 创建节点例子

4.1. 新建一个名为Curator的Java项目, 拷入相关jar包

4.2. 创建节点代码

package com.fj.zkcurator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class Create {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 创建节点try {// 4.1. creatingParentsIfNeeded()可以递归创建节点// 4.2. withMode(CreateMode.PERSISTENT)创建持久化节点String result = cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/root/child", "I am root first child.".getBytes());System.out.println("创建结果: " + result);} catch (Exception e1) {e1.printStackTrace();}// 5. 关闭连接cf.close();}
}

4.3. 运行结果

5. 异步创建节点例子

5.1. 异步创建节点

package com.fj.zkcurator;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class CreateInBackground {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final CountDownLatch cdl = new CountDownLatch(1);public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 创建节点// 4.1. 线程池ExecutorService pool = Executors.newCachedThreadPool();try {// 4.2. creatingParentsIfNeeded()可以递归创建节点// 4.3. withMode(CreateMode.PERSISTENT)创建持久化节点// 4.4. inBackground异步后台创建节点cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {// 异步执行完成, 发送信号量, 让后续阻塞程序能够继续向下执行cdl.countDown();System.out.println("resultCode: " + ce.getResultCode());System.out.println("type: " + ce.getType());}}, pool).forPath("/root/child01", "I am root first child.".getBytes());// 进行阻塞cdl.await();} catch (Exception e1) {e1.printStackTrace();}// 5. 关闭连接cf.close();}
}

5.2. 运行结果

6. 获取子节点例子

6.1. 获取子节点

package com.fj.zkcurator;import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;public class GetChildren {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 获取某路径的子节点try {List<String> children = cf.getChildren().forPath("/root");for (String child : children) {System.out.println(child);}} catch (Exception e) {e.printStackTrace();}// 5. 关闭连接cf.close();}
}

6.2. 运行结果

7. 获取和设置值例子

7.1. 获取和设置值

package com.fj.zkcurator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;public class GetSetData {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 设置和获取节点数据try {// 4.1. 设置节点数据byte[] data = cf.getData().forPath("/root/child01");System.out.println(new String(data));// 4.2. 设置节点数据, 返回属性信息Stat stat = cf.setData().forPath("/root/child01", "I am root first child. modify 006".getBytes());System.out.println("czxid: " + stat.getCzxid() + ", ctime: " + stat.getCtime() + ", cversion: " + stat.getCversion());System.out.println("mzxid: " + stat.getMzxid() + ", mtime: " + stat.getMtime() + ", pzxid: " + stat.getPzxid());System.out.println("version: " + stat.getVersion() + ", dataLength: " + stat.getDataLength() + ", aversion: " + stat.getAversion());System.out.println("numChildren: "+ stat.getNumChildren() + ", ephemeralOwner: " + stat.getEphemeralOwner());} catch (Exception e1) {e1.printStackTrace();}// 5. 关闭连接cf.close();}
}

7.2. 运行结果

8. 检查节点是否存在和删除节点例子

8.1. 检查节点是否存在和删除节点

package com.fj.zkcurator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;public class CheckExistsDelete {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 删除节点try {// 4.1. 检查节点是否存在Stat stat = cf.checkExists().forPath("/root/child01");if(stat == null) {System.out.println("/root/child01不存在.");}if(stat != null) {System.out.println("开始删除/root/child01节点.");// 4.2. deletingChildrenIfNeeded()递归删除节点cf.delete().deletingChildrenIfNeeded().forPath("/root/child01");if(cf.checkExists().forPath("/root/child01") == null) {System.out.println("/root/child01节点删除成功.");}}} catch (Exception e) {e.printStackTrace();}// 5. 关闭连接cf.close();}
}

8.2. 运行结果

9. 我们使用NodeCache的方式去客户端实例中注册一个监听缓存, 然后实现对应的监听方法, 监听方式CuratorCacheListener。

10. 监听缓存例子

10.1. 监听缓存

package com.fj.zkcurator;import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCacheListenerBuilder.ChangeListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class CuratorCacheCuratorCacheListener {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 15 * 1000;public static final String PATH = "/curatorCache";public static final ThreadLocalRandom random = ThreadLocalRandom.current();public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 创建CuratorCacheListenerCuratorCacheListener listener = CuratorCacheListener.builder().forCreates(new Consumer<ChildData>() {@Overridepublic void accept(ChildData node) {System.out.println(String.format("%s created. value: [%s]", node.getPath(), new String(node.getData())));}}).forChanges(new ChangeListener() {@Overridepublic void event(ChildData oldNode, ChildData node) {System.out.println(String.format("%s changed. oldValue: [%s] newValue: [%s]", node.getPath(),new String(oldNode.getData()), new String(node.getData())));}}).forDeletes(new Consumer<ChildData>() {@Overridepublic void accept(ChildData oldNode) {System.out.println(String.format("%s deleted. oldValue: [%s]", oldNode.getPath(), new String(oldNode.getData())));}}).forInitialized(new Runnable() {@Overridepublic void run() {System.out.println("\r\nCache initialized.");}}).build();// 5. 创建CuratorCacheCuratorCache cache = CuratorCache.build(cf, PATH);// 6. 注册监听cache.listenable().addListener(listener);// 7. 启动cachecache.start();// 8. 创建、删除和设置节点try {for (int i = 0; i < 10; ++i) {int depth = random.nextInt(1, 3);String path = makeRandomPath(random, depth);System.out.println("\r\npath = " + path);if(nodeExist(cf, path)) {if(random.nextBoolean()) {cf.setData().forPath(path, UUID.randomUUID().toString().getBytes());} else {cf.delete().deletingChildrenIfNeeded().forPath(path);}} else {String result = cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, UUID.randomUUID().toString().getBytes());System.out.println("result = " + result);}Thread.sleep(5000);}} catch (Exception e) {e.printStackTrace();}// 9. 关闭连接cf.close();}private static boolean nodeExist(CuratorFramework cf, String path) throws Exception {return cf.checkExists().forPath(path) != null ? true : false;}private static String makeRandomPath(ThreadLocalRandom random, int depth) {if (depth == 0) {return PATH;}return makeRandomPath(random, depth - 1) + "/" + random.nextInt(3);}
}

10.2. 运行结果

11. 重复注册例子

11.1. 监听

package com.fj.zkcurator.scene;import java.util.function.Consumer;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.CuratorCacheListenerBuilder.ChangeListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;public class CacheWather {private static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";private static final int sessionTimeoutMs = 10 * 60 * 1000;private static final int connectionTimeoutMs = 15 * 1000;private static final String PATH = "/superCuratorCache";private CuratorFramework cf = null;public CacheWather() {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 创建CuratorCacheListenerCuratorCacheListener listener = CuratorCacheListener.builder().forCreates(new Consumer<ChildData>() {@Overridepublic void accept(ChildData node) {System.out.println(String.format("%s created. value: [%s]", node.getPath(), new String(node.getData())));}}).forChanges(new ChangeListener() {@Overridepublic void event(ChildData oldNode, ChildData node) {System.out.println(String.format("%s changed. oldValue: [%s] newValue: [%s]", node.getPath(),new String(oldNode.getData()), new String(node.getData())));}}).forDeletes(new Consumer<ChildData>() {@Overridepublic void accept(ChildData oldNode) {System.out.println(String.format("%s deleted. oldValue: [%s]", oldNode.getPath(), new String(oldNode.getData())));}}).forInitialized(new Runnable() {@Overridepublic void run() {System.out.println("\r\nCache initialized.");}}).build();// 5. 创建CuratorCacheCuratorCache cache = CuratorCache.build(cf, PATH);// 6. 注册监听cache.listenable().addListener(listener);// 7. 启动cachecache.start();// 8. 如果缓存节点不存在就创建try {if(cf.checkExists().forPath(PATH) == null) {cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(PATH, "init".getBytes());}} catch (Exception e) {e.printStackTrace();}}public void close() {if(cf != null) {cf.close();}}
}

11.2. UserApp

package com.fj.zkcurator.scene;public class UserApp {public static void main(String[] args) {CacheWather ca = new CacheWather();System.out.println("UserApp启动成功...");try {Thread.sleep(5*60*1000);} catch (InterruptedException e) {e.printStackTrace();}ca.close();}
}

11.3. PayApp

package com.fj.zkcurator.scene;public class PayApp {public static void main(String[] args) {CacheWather ca = new CacheWather();System.out.println("PayApp启动成功...");try {Thread.sleep(5*60*1000);} catch (InterruptedException e) {e.printStackTrace();}ca.close();}
}

11.4. OperationNode

package com.fj.zkcurator.scene;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;public class OperationNode {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 节点操作try {String r1 = cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/superCuratorCache/child01", "child01.".getBytes());System.out.println("创建结果: " + r1);Thread.sleep(1000);String r2 = cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/superCuratorCache/child02", "child02.".getBytes());System.out.println("创建结果: " + r2);Thread.sleep(1000);cf.delete().deletingChildrenIfNeeded().forPath("/superCuratorCache/child02");Thread.sleep(1000);Stat stat = cf.setData().forPath("/superCuratorCache/child01", "modify child01 data.".getBytes());System.out.println("czxid: " + stat.getCzxid() + ", ctime: " + stat.getCtime() + ", cversion: " + stat.getCversion());System.out.println("mzxid: " + stat.getMzxid() + ", mtime: " + stat.getMtime() + ", pzxid: " + stat.getPzxid());System.out.println("version: " + stat.getVersion() + ", dataLength: " + stat.getDataLength() + ", aversion: " + stat.getAversion());System.out.println("numChildren: "+ stat.getNumChildren() + ", ephemeralOwner: " + stat.getEphemeralOwner());} catch (Exception e1) {e1.printStackTrace();}// 5. 关闭连接cf.close();}
}

11.5. 运行UserApp

11.6. 运行PayApp

11.7. 运行OperationNode

11.8. UserApp监听到节点变化

11.9. PayApp监听到节点变化

11.10. 再次运行UserApp

11.11. 再次运行PayApp

006_Curator框架一相关推荐

  1. ssh(Struts+spring+Hibernate)三大框架整合-简述

    ssh(Struts+spring+Hibernate)三大框架配合使用来开发项目,是目前javaee最流行的开发方式,必须掌握: 注意: 为了稳健起见,每加入一个框架,我们就需要测试一下,必须通过才 ...

  2. Gin 框架学习笔记(03)— 输出响应与渲染

    在 Gin 框架中,对 HTTP 请求可以很方便有多种不同形式的响应.比如响应为 JSON . XML 或者是 HTML 等. ​ Context 的以下方法在 Gin 框架中把内容序列化为不同类型写 ...

  3. Gin 框架学习笔记(02)— 参数自动绑定到结构体

    参数绑定模型可以将请求体自动绑定到结构体中,目前支持绑定的请求类型有 JSON .XML .YAML 和标准表单 form数据 foo=bar&boo=baz 等.换句话说,只要定义好结构体, ...

  4. QT学习之状态机框架

    状态机框架 创建状态机

  5. 【Spring】框架简介

    [Spring]框架简介 Spring是什么 Spring是分层的Java SE/EE应用full-stack轻量级开源框架,以IOC(Inverse Of Control:反转控制)和AOP(Asp ...

  6. 开源自动化机器学习框架

    20211101 在 Airbnb 使用机器学习预测房源的价格 https://blog.csdn.net/weixin_33735077/article/details/87976278?spm=1 ...

  7. Keras框架下的保存模型和加载模型

    在Keras框架下训练深度学习模型时,一般思路是在训练环境下训练出模型,然后拿训练好的模型(即保存模型相应信息的文件)到生产环境下去部署.在训练过程中我们可能会遇到以下情况: 需要运行很长时间的程序在 ...

  8. Adam那么棒,为什么还对SGD念念不忘 (1) —— 一个框架看懂优化算法

    机器学习界有一群炼丹师,他们每天的日常是: 拿来药材(数据),架起八卦炉(模型),点着六味真火(优化算法),就摇着蒲扇等着丹药出炉了. 不过,当过厨子的都知道,同样的食材,同样的菜谱,但火候不一样了, ...

  9. 一个框架看懂优化算法之异同 SGD/AdaGrad/Adam

    Adam那么棒,为什么还对SGD念念不忘 (1) -- 一个框架看懂优化算法 机器学习界有一群炼丹师,他们每天的日常是: 拿来药材(数据),架起八卦炉(模型),点着六味真火(优化算法),就摇着蒲扇等着 ...

最新文章

  1. Servlet一次乱码排查后的总结
  2. 解读SAP Hybris为何获国内B2B用户青睐?
  3. mysql pdm_mysql 生成pdm
  4. JMW-Label标签设计打印源码
  5. 微信小程序之自定义模态弹窗(带动画)实例
  6. 禁用sslv3协议linux,SSLv3协议漏洞修复方法
  7. 【线段树】 SPOJ 2713 Can you answer these queries IV
  8. GO超详细基础语法黑点
  9. 小甲鱼c语言版:八皇后问题解决思路
  10. 网易回应暴力裁员事件并道歉了!连发两声明:当事人绩效确不合格...刘强东说了这句话,意外上热搜...
  11. Unity进阶之ET网络游戏开发框架 01-下载、运行
  12. html li 点图片,html中ul li前面小黑点样式 ul li一些样式
  13. 工程项目成本费用明细表_这么简单?成本费用明细表这么填就对了
  14. HTML的表格边框的合并
  15. excel打开密码忘记了_行李箱密码忘记了怎么办?教你3招轻松打开
  16. mac之间快速传递文件-from-jianshu-狂奔的胖蜗牛
  17. 创蓝253云通讯 paas 短信发送接口和定义说明
  18. 关系数据库-关系代数-数据库习题
  19. android apk可安装成功但无法运行提示dex文件异常
  20. Windows自启动方式完全总结

热门文章

  1. Test2 unit2
  2. 第七阶段 jsp(369---el---jstl)
  3. hdu 5199 Gunner(STL之map,水)
  4. 问题清空easyui required=true的提示信息所在位置不对。乱跑的解决办法
  5. 设置上传文件的最大大小
  6. spring boot 批量更新数据
  7. WPF WindowStyle为None
  8. 实训项目1-熟练使用VMware安装Windows server 2012
  9. c++入门之 再话类
  10. django 的form登录 注册