1、创建会话

1.1 创建会话的描述

1.2 实现

public class CuratorBase {private CuratorFramework client = null;public CuratorBase() {}/*** 连接服务端* * @param addr* @param sessionOuttime*/public void conn(String addr, int sessionOuttime) {// 1 重试策略:初试时间为1s 重试10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2 通过工厂创建连接client = CuratorFrameworkFactory.builder().connectString(addr).sessionTimeoutMs(sessionOuttime).retryPolicy(retryPolicy).build();// 3 开启连接client.start();}
}

2、创建节点

2.1 创建节点的描述

2.2 实现

/*** 节点的创建,支持递归创建节点* * @param path*            节点的路径* @param data*            节点的数据*/public void createNode(String path, byte[] data) {try {client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path, data);} catch (Exception e) {e.printStackTrace();}}

3、删除节点

3.1 删除节点的描述

3.2 实现

/*** 删除节点* * @param path* @param version*/public void deleteNode(String path, int version) {try {client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path);} catch (Exception e) {e.printStackTrace();}}

4、 关于异步操作

4.1 异步操作的描述

4.2 删除节点的异步实现

/*** 带有回调函数的删除节点* * @param path* @param version*/public void deleteNodeInBack(String path, int version) {try {client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).inBackground(new DeleteCallBack()).forPath(path);} catch (Exception e) {e.printStackTrace();}}

5、读取数据

6、更新数据

7、读取子节点

8、设置watcher

8.1 关于watcher的概述

8.2 NodeCache

/*** 增加节点的Watcher,用于监听节点信息的变化* * @param path*/public void addNodeDataWatcher(String path) {try {final NodeCache nodeCache = new NodeCache(client, path);nodeCache.start(true);nodeCache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {System.out.println("=========NodeCache========");String data = new String(nodeCache.getCurrentData().getData());System.out.println("path="+ nodeCache.getCurrentData().getPath() + ":data="+ data);System.out.println("=========NodeCache========");}});} catch (Exception e) {e.printStackTrace();}}

8.3 PathChildrenCache

/*** 增加孩子节点的监听,用于监听孩子节点的信息变化* * @param path* @throws Exception*/public void addChildWatcher(String path) throws Exception {final PathChildrenCache cache = new PathChildrenCache(this.client,path, true);// 在初始化的时候就进行缓存监听cache.start(StartMode.POST_INITIALIZED_EVENT);System.out.println("=========PathChildrenCache========");System.out.println("刚开始的缓存数据" + cache.getCurrentData().size());cache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework client,PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case INITIALIZED:System.out.println("客户端子节点cache初始化数据完成");System.out.println("size=" + cache.getCurrentData().size());break;case CHILD_ADDED:System.out.println("添加子节点的路径:" + event.getData().getPath());System.out.println("添加子节点的数据:"+ new String(event.getData().getData()));break;case CHILD_UPDATED:System.out.println("修改子节点路径:" + event.getData().getPath());System.out.println("修改子节点数据:"+ new String(event.getData().getData()));break;case CHILD_REMOVED:System.out.println("删除子节点的路径:" + event.getData().getPath());System.out.println("删除子节点的数据:"+ new String(event.getData().getData()));break;default:break;}System.out.println("=========PathChildrenCache========");}});}

9、用到的类

9.1 CuratorBase.java

package com.curator.base;import java.util.ArrayList;
import java.util.List;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;public class CuratorBase {private CuratorFramework client = null;public CuratorBase() {}/*** 连接服务端* * @param addr* @param sessionOuttime*/public void conn(String addr, int sessionOuttime) {// 1 重试策略:初试时间为1s 重试10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2 通过工厂创建连接client = CuratorFrameworkFactory.builder().connectString(addr).sessionTimeoutMs(sessionOuttime).retryPolicy(retryPolicy).build();// 3 开启连接client.start();}/*** 关闭客户端*/public void closeClient() {if (client != null)this.client.close();}/*** 节点的创建,支持递归创建节点* * @param path*            节点的路径* @param data*            节点的数据*/public void createNode(String path, byte[] data) {try {client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path, data);} catch (Exception e) {e.printStackTrace();}}/*** 删除节点* * @param path* @param version*/public void deleteNode(String path, int version) {try {client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).forPath(path);} catch (Exception e) {e.printStackTrace();}}/*** 带有回调函数的删除节点* * @param path* @param version*/public void deleteNodeInBack(String path, int version) {try {client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version).inBackground(new DeleteCallBack()).forPath(path);} catch (Exception e) {e.printStackTrace();}}/*** 进行数据信息的读取* * @param path*            节点的路径* @param stat*            数据的状态信息* @return 数据的字节数组格式*/public byte[] readNode(String path, Stat stat) {byte[] data = new byte[10];try {data = client.getData().storingStatIn(stat).forPath(path);} catch (Exception e) {e.printStackTrace();}return data;}/*** 节点信息的更新* * @param path* @param data* @param version*/public void updateNode(String path, byte[] data, int version) {try {client.setData().withVersion(version).forPath(path, data);} catch (Exception e) {e.printStackTrace();}}/*** 子节点的读取* * @param path* @return*/public List<String> getChildren(String path) {List<String> childrens = new ArrayList<String>();try {childrens = client.getChildren().forPath(path);} catch (Exception e) {e.printStackTrace();}return childrens;}/*** 增加节点的Watcher,用于监听节点信息的变化* * @param path*/public void addNodeDataWatcher(String path) {try {final NodeCache nodeCache = new NodeCache(client, path);nodeCache.start(true);nodeCache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {System.out.println("=========NodeCache========");String data = new String(nodeCache.getCurrentData().getData());System.out.println("path="+ nodeCache.getCurrentData().getPath() + ":data="+ data);System.out.println("=========NodeCache========");}});} catch (Exception e) {e.printStackTrace();}}/*** 增加孩子节点的监听,用于监听孩子节点的信息变化* * @param path* @throws Exception*/public void addChildWatcher(String path) throws Exception {final PathChildrenCache cache = new PathChildrenCache(this.client,path, true);// 在初始化的时候就进行缓存监听cache.start(StartMode.POST_INITIALIZED_EVENT);System.out.println("=========PathChildrenCache========");System.out.println("刚开始的缓存数据" + cache.getCurrentData().size());cache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework client,PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case INITIALIZED:System.out.println("客户端子节点cache初始化数据完成");System.out.println("size=" + cache.getCurrentData().size());break;case CHILD_ADDED:System.out.println("添加子节点的路径:" + event.getData().getPath());System.out.println("添加子节点的数据:"+ new String(event.getData().getData()));break;case CHILD_UPDATED:System.out.println("修改子节点路径:" + event.getData().getPath());System.out.println("修改子节点数据:"+ new String(event.getData().getData()));break;case CHILD_REMOVED:System.out.println("删除子节点的路径:" + event.getData().getPath());System.out.println("删除子节点的数据:"+ new String(event.getData().getData()));break;default:break;}System.out.println("=========PathChildrenCache========");}});}public static void main(String[] args) throws Exception {final String CONNECT_ADDR = "192.168.2.101:2181";final int SESSION_OUTTIME = 5000;// msfinal String PARENT_PATH = "/testWatch";final String CHILDREN_PATH_1 = "/testWatch/children1";final String CHILDREN_PATH_2 = "/testWatch/children2";CuratorBase curatorBase = null;try {curatorBase = new CuratorBase();// 1.连接服务端curatorBase.conn(CONNECT_ADDR, SESSION_OUTTIME);// 1.1 PathChildrenCachecuratorBase.createNode(PARENT_PATH, "parent".getBytes());curatorBase.addChildWatcher(PARENT_PATH);// 2.节点的创建(支持递归的创建)// 2.1 NodeCachecuratorBase.addNodeDataWatcher(CHILDREN_PATH_1);curatorBase.createNode(CHILDREN_PATH_1, "children1".getBytes());curatorBase.createNode(CHILDREN_PATH_2, "children2".getBytes());Thread.sleep(2000);// 3.获取节点的数据// Stat stat = new Stat();// byte[] readNode = curatorBase.readNode(CHILDREN_PATH_1, stat);// System.out.println(new String(readNode));// System.out.println(stat);// 4.获取孩子节点信息List<String> childrens = curatorBase.getChildren(PARENT_PATH);System.out.println("子节点的列表为" + childrens);// 5.更新节点信息curatorBase.updateNode(CHILDREN_PATH_1, "haha".getBytes(), -1);// 6.删除节点curatorBase.deleteNode(CHILDREN_PATH_2, -1);Thread.sleep(5000000);// 删除节点信息curatorBase.deleteNode(PARENT_PATH, -1);} catch (Exception e) {e.printStackTrace();} finally {curatorBase.closeClient();}}
}

9.2  DeleteCallBack.java

package com.curator.base;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;/*** * 可以获取到,节点的路径,数据。以及事件的类型*/
public class DeleteCallBack implements BackgroundCallback {public void processResult(CuratorFramework client, CuratorEvent event)throws Exception {System.out.println(event.getPath() + ",data=" + event.getData());System.out.println("event type=" + event.getType());System.out.println("event code=" + event.getResultCode());}}

10、代码下载

Curator的使用相关推荐

  1. Curator Cache

    1.Curator Cache 与原生ZooKeeper Wacher区别 原生的ZooKeeper Wacher是一次性的:一个Wacher一旦触发就会被移出,如果你想要反复使用Wacher,就要在 ...

  2. 十六、curator recipes之DistributedIdQueue

    简介 curator实现了一种分布式ID队列,也是遵循FIFO原则,比普通队列新增的一个点是ID队列可以根据ID对队列元素进行操作,比如移除该元素. 官方文档:http://curator.apach ...

  3. Zookeeper开源客户端框架Curator的简单使用

    为什么80%的码农都做不了架构师?>>>    Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用Z ...

  4. Curator counters

    2019独角兽企业重金招聘Python工程师标准>>> 这个比较好理解,分布式数字,类似AtomicInteger系列,Curator有2个实现: 第一个: package cura ...

  5. java curator_[java,zk]在 linux 上快速搭建 zookeeper curator 开发环境

    在这篇博客中简单介绍一下,如何快速的在 linux 操作系统上搭建使用 zookeeper 客户端 curator 编程的单机环境. 在前几篇博客中,介绍的是使用 zookeeper 原生提供的 AP ...

  6. Curator: ZooKeeper的使用配方

    Curator: ZooKeeper的使用配方 ZooKeeper 作为分布式的存储方式, 有很多种使用场景, 把典型的使用场景提取出来, 成为"配方", 方便用户参考. Cura ...

  7. 【ZK-curator使用异常】KeeperErrorCode = Unimplemented for /***

    [ZK-curator使用异常]KeeperErrorCode = Unimplemented for /*** 参考文章: (1)[ZK-curator使用异常]KeeperErrorCode = ...

  8. 五、curator recipes之选举主节点Leader Latch

    简介 在分布式计算中,主节点选举是为了把某个进程作为主节点来控制其它节点的过程.在选举结束之前,我们不知道哪个节点会成为主节点.curator对于主节点选举有两种实现方式,本文示例演示Latch的实现 ...

  9. 聊聊、Zookeeper 客户端 Curator

    [Curator]   和 ZkClient 一样,Curator 也是开源客户端,Curator 是 Netflix 公司开源的一套框架. <dependency><groupId ...

  10. [Curator] Path Cache 的使用与分析

    为什么80%的码农都做不了架构师?>>>    Path Cache Path Cache其实就是用于对zk节点的监听.不论是子节点的新增.更新或者移除的时候,Path Cache都 ...

最新文章

  1. C#和Unity游戏开发者大师班2021 (2D,3D和FPS)
  2. (一)Android Studio 安装部署 华丽躲坑
  3. Unicode编码完全探究(三)之联通乱码
  4. Netlink实现热拔插监控
  5. 01-基本配置与测试
  6. Gym - 100989J -(DFS)
  7. python标准库学习5 ---bisect — Array bisection algorithm
  8. RHCSA红帽认证考点2022(红帽认证系统管理员)
  9. 蔡为东:行之有效的IT技术团队管理实践
  10. OMRON继电器基础讲解
  11. 用Javascript实现随机抽奖
  12. python语言的实验心得体会范文_关于实验总结心得体会范文
  13. HadoopYarn设置Fair Scheduler公平调度器
  14. pvq真值表_逻辑学真值表
  15. 深度剖析WiFi的SSID问题
  16. 无尘间手把手教你西数开盘
  17. 《说服力-让你的PPT会说话》9月上海公开课简章
  18. 单目标优化算法测试函数python绘制及相关代码
  19. 《Android Studio开发实战》学习(二)- 聊天室
  20. 618小红书品牌营销复盘「保姆级教学」 !

热门文章

  1. C语言求整数的绝对值
  2. 将进酒:《惜樽空》敦煌抄本
  3. Python 判断无向图是否存在环
  4. 阿里云中间件是什么,有哪些产品
  5. 【Android -- 技术周刊】第 020 期
  6. t420i升级固态硬盘提升_surface laptop3固态升级指南——拆机、换固态硬盘、重装系统...
  7. QT5.9用自定义字体修改qlabel字体、大小以及颜色
  8. 【教程】Ubuntu安装、使用gephi
  9. 微型计算机中既能作为输出设备,在微机的硬件设备中,有一种设备在程序设计中既可以当做输出设备,又可以当做输入设备,这种设备是ß...
  10. 回文数,用scratch编程实现回文数