zookeeper开源客户端Curator介绍(六)
原文地址,转载请注明出处: https://blog.csdn.net/qq_34021712/article/details/82872311 ©王赛超
上一篇文章 介绍了zookeeper原生API的使用,使用过原生API不得不说,有很多的问题,比如:不能递归创建和删除节点、Watcher只能使用一次、还有很多可以解决分布式应用问题的api(比如分布式锁,leader选举等),但由于ZooKeeper提供的原始API并不是很易用,在其基础上封装一些高级应用又是一件很复杂的事情。
这个时候,Curator出现了,Curator是Netflix公司开源的一个Zookeeper客户端,后捐献给Apache,Curator框架在zookeeper原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最好用,最流行的zookeeper的客户端。
Curator主要从以下几个方面降低了zk使用的复杂性:
- 重试机制:提供可插拔的重试机制, 它将给捕获所有可恢复的异常配置一个重试策略,并且内部也提供了几种标准的重试策略(比如指数补偿)
- 连接状态监控: Curator初始化之后会一直对zk连接进行监听,一旦发现连接状态发生变化将会作出相应的处理
- zk客户端实例管理:Curator会对zk客户端到server集群的连接进行管理,并在需要的时候重建zk实例,保证与zk集群连接的可靠性
- 各种使用场景支持:Curator实现了zk支持的大部分使用场景(甚至包括zk自身不支持的场景),这些实现都遵循了zk的最佳实践,并考虑了各种极端情况
参考官网
http://curator.apache.org/index.html
兼容性介绍
虽然ZooKeeper开发团队仍然将ZooKeeper 3.5.x视为“测试版”,但实际情况是它被许多用户用于生产,当然了生产上也有很多3.4.x的版本,Curator现在最新版本是Curator 4.0,Curator 4.0对ZooKeeper 3.5.x有很强的依赖性如果您使用的是ZooKeeper 3.5.x,则无需执行任何操作 - 只需使用Curator 4.0即可。对ZooKeeper 3.4.x也是兼容的,但是你必须在maven依赖中排除 zookeeper依赖包,如下:
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.1</version>
</dependency>
项目组成
git地址:https://github.com/apache/curator 整个框架由下面这些模块组成:
模块名称 | 描述 |
---|---|
curator-recipes | 所有典型应用场景。例如:分布式锁,Master选举等。需要依赖client和framework,需设置自动获取依赖。 |
curator-async | jdk8 异步操作 |
curator-framework | Zookeeper API的高层封装,大大简化Zookeeper客户端编程,添加了例如Zookeeper连接管理、重试机制等。 |
curator-client | Zookeeper client的封装,用于取代原生的Zookeeper客户端(ZooKeeper类),提供一些非常有用的客户端特性。 |
curator-test | 包含TestingServer,TestingCluster和一些其他有助于测试的工具。 |
curator-examples | 各种使用Curator特性的例子。 |
curator-x-discovery | 服务注册发现,在SOA /分布式系统中,服务需要相互寻找。curator-x-discovery提供了服务注册,找到特定服务的单个实例,和通知服务实例何时更改。 |
curator-x-discovery-server | 服务注册发现管理器,可以和curator-x-discovery 或者非java程序程序使用RESTful Web服务以注册,删除,查询等服务。 |
下面学习Curator常用节点的增删改查操作
pom添加依赖
一般依赖recipes就可以了,内部依赖有client和framework 能用的上的功能都有了,另外注意,记得排除zookeeper的包,我使用的客户端是3.5.3,所以使用当前最新的4.0.1。
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.1</version>
</dependency>
创建会话
上面已经说过了,curator是Fluent风格API,如果您以前没有使用它,它可能看起来很奇怪,所以建议您熟悉这种风格。
Curator的创建会话方式与原生的API创建方式区别很大。Curator创建客户端为CuratorFramework,是由CuratorFrameworkFactory工厂类来实现的,CuratorFramework是线程安全的,要连接的每个ZooKeeper集群只需要一个 CuratorFramework对象就可以了。
CuratorFrameworkFactory提供了两种构造CuratorFramework的方法,一种是直接newClient,还有一种方式是使用CuratorFrameworkFactory.builder(),提供了细粒度的控制。
CuratorFrameworkFactory提供了两个默认的直接构造CuratorFramework的方法,如下:
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy);
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy);
其中connectString参数就是zk的地址,参数RetryPolicy提供重试策略的接口,可以让用户实现自定义的重试策略。
使用上面方法创建出一个CuratorFramework之后,需要再调用其start()方法完成会话创建。
下面给出创建CuratorFramework例子代码:
package com.zookeeper.test.framework;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;/*** @author: WangSaiChao* @date: 2018/9/12* @description: 创建会话的例子*/
public class CreateClientExamples{public static void main(String[] args) {String connectionString = "127.0.0.1";ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);/*** connectionString zk地址* retryPolicy 重试策略* 默认的sessionTimeoutMs为60000* 默认的connectionTimeoutMs为15000*/CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);curatorFramework.start();/*** connectionString zk地址* sessionTimeoutMs 会话超时时间* connectionTimeoutMs 连接超时时间* retryPolicy 重试策略*/CuratorFramework curatorFramework1 = CuratorFrameworkFactory.newClient(connectionString, 60000, 15000, retryPolicy);curatorFramework1.start();/*** connectionString zk地址* sessionTimeoutMs 会话超时时间* connectionTimeoutMs 连接超时时间* namespace 每个curatorFramework 可以设置一个独立的命名空间,之后操作都是基于该命名空间,比如操作 /app1/message 其实操作的是/node1/app1/message* retryPolicy 重试策略*/CuratorFramework curatorFramework2 = CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(60000).connectionTimeoutMs(15000).namespace("/node1").retryPolicy(retryPolicy).build();curatorFramework2.start();}
}
重试策略
上面创建会话的例子中,我们使用了ExponentialBackoffRetry的重试策略, retryPolicy默认提供了以下实现,分别为:
ExponentialBackoffRetry:
/*** 随着重试次数增加重试时间间隔变大,指数倍增长baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))。* 有两个构造方法* baseSleepTimeMs初始sleep时间* maxRetries最多重试几次* maxSleepMs最大的重试时间* 如果在最大重试次数内,根据公式计算出的睡眠时间超过了maxSleepMs,将打印warn级别日志,并使用最大sleep时间。* 如果不指定maxSleepMs,那么maxSleepMs的默认值为Integer.MAX_VALUE。*/
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries);
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs);
BoundedExponentialBackoffRetry:
/*** 继承与ExponentialBackoffRetry ,BoundedExponentialBackoffRetry只有一个三个参数构造器,效果跟ExponentialBackoffRetry三个函数构造器是一样的,只是内部实现不一样。* baseSleepTimeMs初始sleep时间* maxSleepTimeMs最大sleep时间* maxRetries最大重试次数*/
public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries);
RetryForever:永远重试策略
/*** RetryForever:永远重试策略* retryIntervalMs重试时间间隔*/
public RetryForever(int retryIntervalMs);
RetryNTimes:重试N次
/*** RetryNTimes:重试N次* n重试几次* sleepMsBetweenRetries每次重试间隔时间*/
public RetryNTimes(int n, int sleepMsBetweenRetries);
RetryOneTime:只重试一次
/*** RetryOneTime:只重试一次* sleepMsBetweenRetry:每次重试间隔的时间*/
public RetryOneTime(int sleepMsBetweenRetry);
RetryUntilElapsed:一直重试,直到超过指定时间
/*** RetryUntilElapsed:一直重试,直到超过指定时间* maxElapsedTimeMs最大重试时间* sleepMsBetweenRetries每次重试间隔时间*/
public RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries);
SleepingRetry:抽象类,上面的RetryNTimes就是继承它
常用的API操作
package com.zookeeper.test.framework;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;import java.util.Collections;/*** @author: WangSaiChao* @date: 2018/9/12* @description: 节点操作例子*/
public class CrudExamples {public static void main(String[] args) throws Exception {String connectionString = "192.168.58.42:2181";ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3,Integer.MAX_VALUE);CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);curatorFramework.start();//=========================创建节点=============================/*** 创建一个 允许所有人访问的 持久节点*/curatorFramework.create().creatingParentsIfNeeded()//递归创建,如果没有父节点,自动创建父节点.withMode(CreateMode.PERSISTENT)//节点类型,持久节点.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同.forPath("/node10/child_01","123456".getBytes());/*** 创建一个容器节点*/curatorFramework.create().creatingParentContainersIfNeeded()//递归创建,如果没有父节点,自动创建父节点.withMode(CreateMode.CONTAINER)//节点类型,容器节点,当最后一个子节点删除的时候,服务器会在未来的一段时间将该容器节点删除(不是立刻删除).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//设置ACL,和原生API相同.forPath("/node11","123".getBytes());/*** 创建一个定时节点* 临时节点存在时间为3秒* ACL为 digest:wangsaichao:123456:cdrwa*/curatorFramework.create().withTtl(3000)//设置临时节点的有效期,单位是毫秒.creatingParentContainersIfNeeded()//递归创建,如果没有父节点,自动创建父节点.withMode(CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL)//节点类型,定时节点,在一定的时间内,如果没有操作该节点,并且该节点没有子节点,那么服务器将自动删除该节点.withACL(Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "wangsaichao:G2RdrM8e0u0f1vNCj/TI99ebRMw="))))//ACL.forPath("/node12","123".getBytes());//=========================获取节点=============================/*** 获取节点 /node10/child_01 数据 和stat信息*/Stat node10Stat = new Stat();byte[] node10 = curatorFramework.getData().storingStatIn(node10Stat)//获取stat信息存储到stat对象.forPath("/node10/child_01");System.out.println("=====>该节点信息为:" + new String(node10));System.out.println("=====>该节点的数据版本号为:" + node10Stat.getVersion());/*** 获取节点信息 并且留下 Watcher事件 该Watcher只能触发一次*/byte[] bytes = curatorFramework.getData().usingWatcher(new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("=====>wathcer触发了。。。。");System.out.println(event);}}).forPath("/node10/child_01");System.out.println("=====>获取到的节点数据为:"+new String(bytes));Thread.sleep(1000000);//=========================设置节点=============================Stat stat = curatorFramework.setData().withVersion(-1).forPath("/node10/child_01", "I love you".getBytes());System.out.println("=====>修改之后的版本为:" + stat.getVersion());//=========================删除节点=============================/*** 删除node节点 不递归 如果有子节点,将报异常*/Void aVoid = curatorFramework.delete().forPath("/node10");System.out.println("=====>" + aVoid);/*** 递归删除,如果有子节点 先删除子节点*/curatorFramework.delete().deletingChildrenIfNeeded().forPath("/node10");curatorFramework.close();/*** 删除*/curatorFramework.delete().guaranteed().forPath("/node11");//=========================判断节点是否存在=============================/*** CuratorFramework类有一个判断节点是否存在的接口checkExists(),该接口返回一个org.apache.zookeeper.data.Stat对象,对象中有一个ephemeralOwner属性。* 如果该节点是持久化节点,ephemeralOwner的值为0* 如果该节点是临时节点,ephemeralOwner的值大于0*/Stat existsNodeStat = curatorFramework.checkExists().forPath("/node10");if(existsNodeStat == null){System.out.println("=====>节点不存在");}if(existsNodeStat.getEphemeralOwner() > 0){System.out.println("=====>临时节点");}else{System.out.println("=====>持久节点");}}
}
zookeeper开源客户端Curator介绍(六)相关推荐
- Zookeeper开源客户端Curator之基本功能讲解
简介 Curator是Netflix公司开源的一套Zookeeper客户端框架.了解过Zookeeper原生API都会清楚其复杂度.Curator帮助我们在其基础上进行封装.实现一些开发细节,包括接连 ...
- [转载]Zookeeper开源客户端框架Curator简介
转载声明:http://macrochen.iteye.com/blog/1366136 Zookeeper开源客户端框架Curator简介 博客分类: Distributed Open Source ...
- Zookeeper详解(三)——开源客户端curator
开源客户端curator (true re de) curator是Netflix公司开源的一个zookeeper客户端,后捐献给apache,curator框架在zookeeper原生API接口上进 ...
- Zookeeper的客户端Curator基本使用
本文来说下Zookeeper的客户端Curator基本使用 文章目录 Curator概述 Curator包结构 创建会话 使用静态工程方法创建客户端 Fluent风格的Api创建会话 创建包含隔离命名 ...
- Zookeeper开源客户端框架Curator的简单使用
为什么80%的码农都做不了架构师?>>> Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用Z ...
- Zookeeper分布式一致性原理(六):Zookeeper开源客户端zkClient
zkClient 是GitHub上的一个开源Zookeeper客户端项目,是由Datameer的工程师Stefan和Groschupf和PeteVoss一起开发的.zkClient在Zookeeper ...
- ZooKeeper学习笔记五 ZooKeeper开源客户端ZkClient
本文学习资源来自<从Paxos到ZooKeeper分布式一致性原理与实践> ZkClient ZkClient 在ZooKeeper原生API接口之上进行了封装,是一个更易用的ZooKee ...
- 【zookeeper】Apache curator优点介绍
文章目录 1. 简介 2. 项目组件 2.1 版本 2.2 项目组件 2.3 Maven依赖 3. 案例及功能说明 3.1 创建会话 3.1.1 重试策略 3.1.2 创建节点 3.1.3 删除节点 ...
- ZooKeeper客户端Curator的基本使用
前提:ZooKeeper版本:3.4.14 Curator版本:2.13.0 1.什么是Curator Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Z ...
最新文章
- 理解spark中的job、stage、task
- 《蓝色协议BLUE PROTOCOL》技术分享解读
- percona toolkit for mysql_Percona Toolkit for MySQL安装(CentOS5.8) | 学步园
- vue-awesome-swiper使用
- 【EFCORE笔记】异步查询工作原理注释标记
- 10-Linux与windows文件互传-pscp坑---- 'pscp' 不是内部或外部命令,也不是可运行的程序或批处理文件...
- Java主类结构:变量与常量
- GIS学习第一课:USGS遥感数据下载
- android win10 驱动安装失败,解决在win10系统下小米手机驱动安装失败的具体步骤...
- 英雄无敌3 Def 文件对应生物列表
- java基础之多线程与Lambda表达式
- 微软Hololens学院教程-Hologram 212-Voice(语音)【微软教程已经更新,本文是老版本】...
- 关闭和开启笔记本自带键盘。
- idea 报错Output directory is not specified错误
- 有理展开定理与递推数列通项公式
- 【数据结构】B树和B+树的笔记详细诠释
- Pytorch中tensor维度和torch.max()函数中dim参数的理解
- 大于或小于100万,1000万,1亿,10亿,1000亿,万亿,亿亿,10亿亿,100亿亿上下的10个质数(素数)...
- 美团前端必会面试题(附答案)
- 使用Python对FPS游戏读写操作
热门文章
- java 创建mdi窗体_Java制作MDI窗体源代码是什么?
- touchpad-indicator托盘图标消失
- 理论力学专题:守恒量的证明
- 构建网络空间命运共同体 !麒麟信安参展2021年世界互联网大会“互联网之光”博览会
- PYQT5+pygame+爬虫实现本地播放器及按歌手下载(循环播放,上,下曲,进度条,声音,网络下载。写的很烂,勿喷)
- Using sensor messages with tf(Using Stamped datatypes with tf::MessageFilter)
- 国际手机号码的正则表达式
- 公网开放80、8000端口
- 单片机音频驱动实验C语言,单片机实验:音乐改编
- 【UI设计】—界面设计原则