Curator极大简化了ZooKeeper的使用,增加了针对ZooKeeper集群中connection的管理。

节点的创建和删除

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  public static void main(String[] args) throws Exception {//重试策略:初试时间为10s,最大重试次数为20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//开启连接
        cf.start();//建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());Thread.sleep(30000);//删除节点cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent");cf.close();}
}

run as--java application

线程休眠30s后,执行节点删除操作

节点内容的修改

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  public static void main(String[] args) throws Exception {//重试策略:初试时间为10s,最大重试次数为20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//开启连接
        cf.start();//创建节点cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes());//cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p2","p2 value".getBytes());//读取节点String ret1 = new String(cf.getData().forPath("/persistent/p1"));System.out.println(ret1);//修改节点cf.setData().forPath("/persistent/p1", "new p1 value".getBytes());String ret2 = new String(cf.getData().forPath("/persistent/p1"));System.out.println(ret2);cf.close();}
}

Eclipse的console输出

Eclipse的ZooKeeper Explorer内容

节点操作的回调函数

节点的新增、修改、删除,都可以设置其回调函数。该回调函数可以输出服务器的状态码、服务器事件类型等内容。还可以加入一个线程池进行优化操作。在批量节点操作的时候,可以用线程池去规划callback,可以将很多的任务放到队列中,使用线程池中的线程将队列中的任务进行处理。线程池中线程的个数可以根据具体的机器配置而定。

下面代码中,节点的创建操作是一个异步的过程,不会阻塞主线程main的执行,代码中将主线程main休眠,子线程在执行完节点的创建操作后执行回调函数并输出相关内容。若不添加主线程休眠的代码,则主线程执行完代码后结束,此时节点创建的子线程还没有完成节点的创建,因main线程的结束子线程也结束,进而就不能完成节点创建和回调函数的执行。

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  public static void main(String[] args) throws Exception {//重试策略:初试时间为10s,最大重试次数为20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//开启连接
        cf.start();// 绑定回调函数ExecutorService pool = Executors.newCachedThreadPool();cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {System.out.println("code:" + ce.getResultCode());System.out.println("type:" + ce.getType());System.out.println("线程为:" + Thread.currentThread().getName());}}, pool).forPath("/persistent/p2","p2 value".getBytes());System.out.println("主线程:"+Thread.currentThread().getName());Thread.sleep(Integer.MAX_VALUE);cf.close();}
}

Eclipse中console输出

ZooKeeper Explorer中内容

获取子节点

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;public class CuratorBase {static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181";static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms  public static void main(String[] args) throws Exception {//重试策略:初试时间为10s,最大重试次数为20RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20);//创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(retryPolicy).build();//开启连接
        cf.start();// 绑定回调函数ExecutorService pool = Executors.newCachedThreadPool();cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception {System.out.println("code:" + ce.getResultCode());System.out.println("type:" + ce.getType());System.out.println("线程为:" + Thread.currentThread().getName());}}, pool).forPath("/persistent/p2","p2 value".getBytes());System.out.println("主线程:"+Thread.currentThread().getName());Thread.sleep(20000);//主线程休眠20s,等待节点创建完毕// 读取子节点getChildren方法 和 判断节点是否存在checkExists方法List<String> list = cf.getChildren().forPath("/persistent");for(String p : list){System.out.println(p);}Stat stat_p1 = cf.checkExists().forPath("/persistent/p1");System.out.println(stat_p1);Stat stat_p2 = cf.checkExists().forPath("/persistent/p2");System.out.println(stat_p2);cf.close();}
}

Eclipse的console输出

若上面代码将Thread.sleep(20000);删除,有时会出现下面的异常,原因是节点创建和main主线程的执行是异步的。

转载于:https://www.cnblogs.com/cat520/p/9412815.html

Curator操作ZooKeeper相关推荐

  1. Apache Curator操作zookeeper的API使用

    curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Jav ...

  2. 【Zookeeper】基本使用:Curator操作Zookeeper

    针对zookeeper,比较常用的Java客户端有zkclient.curator. 由于 Curator 对于 zookeeper 的抽象层次比较高,简化了 zookeeper客户端的开发量.使得c ...

  3. 基于Curator的Zookeeper操作实战

    前言 Zookeeper操作方式 这篇文章主要说的是利用java来操作zookeeper,就如操作mysql数据库一样,主要是实现增删改查功能,而实现这些功能的方式主要有以下三种: zookeeper ...

  4. Apache ZooKeeper - 使用Apache Curator操作ZK

    文章目录 原生ZK API VS Curator Curator 概述 Maven依赖 会话创建 静态工厂方式创建会话 使用 fluent 风格创建会话 创建节点 protection 模式 ,规避僵 ...

  5. ZooKeeper入门(二)Java操作zookeeper

    首先是pom依赖: <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zoo ...

  6. zookeeper教程,docker 安装,命令,python操作zookeeper,分布式队列,分布式锁

    docker安装zookeeper服务端 首先安装单节点的服务端,如果安装多节点的服务端,需要为每个节点配置其他节点的地址. docker run --privileged=true -d --nam ...

  7. Zookeeper学习之源生API的使用(java与shell操作zookeeper)。

    如果不会搭建zookeeper环境:请看此文章:Zookeeper学习之集群环境搭建 1.操作zookeeper(shell) 启动zookeeper客户端:zkCli.sh; 根据提示命令进行操作: ...

  8. 使用Kazoo操作ZooKeeper服务治理

    使用Kazoo操作ZooKeeper服务治理 单机服务的可靠性及可扩展性有限,某台服务宕机可能会影响整个系统的正常使用:分布式服务能够有效地解决这一问题,但同时分布式服务也会带来一些新的问题,如:服务 ...

  9. python系列之:kazoo连接Zookeeper操作Zookeeper

    python系列之:kazoo连接Zookeeper操作Zookeeper 一.连接zookeeper 二.读取zookeeper节点 三.读取zookeeper内容 四.kazoo连接Zookeep ...

最新文章

  1. 人脸服务器如何与门禁系统对接,人脸识别门禁系统终端设备接口说明
  2. python自动化办公都能做什么菜-python+selenium自动化(一)之环境搭建
  3. python绘制3维图-1、2、3维图见过,用Python画出来的六维图见过么?
  4. .net Core+Dapper MySQL增删改查
  5. 如何做一款成功的APP应用
  6. P5 Matlab/Simulink 在时域分析中的应用-《Matlab/Simulink与控制系统仿真》程序指令总结
  7. url 编码 与 接口签名
  8. mysql hint 简书_MySQL
  9. Codeforces-741A-Arpa's loud Owf and Mehrdad's evil plan(找有向图环及最大公倍数计算)
  10. OpenShift 4 之 GitOps(2)用ArgoCD部署应用
  11. 7大编程误区,你避开了吗?
  12. UVA - 10298 后缀数组(仅观赏)
  13. 20200628每日一句
  14. python自带的单元测试框架,最好的python单元测试框架
  15. Mac安装telnet工具和使用
  16. Redis源码之——跳表skiplist原理和源码调试
  17. linux office 永中,永中Office Linux版
  18. 面试官,不要再问我三次握手和四次挥手
  19. PG创建临时表时添加on commit drop参数
  20. 福州安卓培训_关于利用东风商学院开展电控发动机维修技师远程培训的通知

热门文章

  1. Java集合框架概述及Collection接口方法讲解
  2. LeetCode 88 合并两个有序数组
  3. USACO-Section1.2 Friday the Thirteenth (简单日期处理)
  4. 冯诺依曼计算机的组成
  5. golang 数组、指针数组、数组指针使用总结
  6. Python客户端syn连接
  7. 截取、拼接字符串,memcpy
  8. 基于 Java Web 的毕业设计选题管理平台--选题报告与需求规格说明书
  9. 附9 elasticsearch-curator + Linux定时任务
  10. Size Balanced Tree