【弄nèng - Zookeeper】Zookeeper入门教程(三)—— 客户端Curator的基本API使用(Curator framework)
文章目录
- 1. Curator简介
- 2. Curator framework
- 3. Curator recipes
- 4. 基本Api
- 4.1 创建会话
- 4.1.1 使用静态工程方法创建
- 4.1.2 使用Fluent风格Api创建
- 4.1.3 RetryPolicy说明
- 4.1.4 namespace说明
- 4.2 创建节点
- 4.3 删除节点
- 4.4 读取数据
- 4.5 更新数据
- 4.6 判断节点是否存在
- 4.7 获取某个节点的所有子节点路径
- 4.8 事务
- 4.9 异步接口
- 源码地址
- 项目推荐
本文介绍zk客户端curator的使用,本文主要介绍Curator framework的使用
官方文档传送门
参考:http://www.throwable.club/2018/12/16/zookeeper-curator-usage/#Zookeeper%E5%AE%A2%E6%88%B7%E7%AB%AFCurator%E4%BD%BF%E7%94%A8%E8%AF%A6%E8%A7%A3
参考书籍:《从Paxos到ZooKeeper 分布式一致性原理与实践》
1. Curator简介
Curator是Netflix公司开源的Zookeeper客户端框架,它简化了Zookeeper客户端的使用,包括重连,反复注册Watcher,异常处理等。还封装了常用场景,包括Master选举,分布式锁等等。
Maven组件:
GroupID/Org | ArtifactID/Name | 描述 |
---|---|---|
org.apache.curator | curator-recipes | 常见应用场景,分布式锁,master选举等的封装,依赖client与framework包 |
org.apache.curator | curator-async | 具有O/R建模、迁移和许多其他特性的异步DSL。 |
org.apache.curator | curator-framework | 在Client基础上更进一步的封装,可大大简化ZooKeeper的使用。 |
org.apache.curator | curator-client | 它提供了zk实例创建/重连机制等,简单便捷.不过直接使用curator-client并不能减少太多的开发量 |
org.apache.curator | curator-test | 包含TestingServer、TestingCluster和一些用于测试的其他工具。 |
org.apache.curator | curator-examples | 示例用法。 |
org.apache.curator | curator-x-discovery | 服务发现实现。 |
org.apache.curator | curator-x-discovery-server | 可以注册中心一起使用的RESTful服务器。 |
2. Curator framework
官网传送门
Curator框架是一个高级API,可大大简化ZooKeeper的使用。它添加了许多基于ZooKeeper构建的功能,并处理了管理与ZooKeeper群集的连接并重试操作的复杂性。包含功能:
- 简化了原始的ZooKeeper方法,事件等
- 自动重连
- 监视NodeDataChanged事件,并根据需要调用updateServerList()
3. Curator recipes
官网传送门
Curator recipes 提供了一些高级特性,包括领导人选举,共享锁,路径缓存和观察者,分布式队列,分布式优先级队列等等。
4. 基本Api
pom
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.13.0</version></dependency>
curator从3.x版本需要zk3.5.x版本
4.1 创建会话
newClient静态工厂方法四个主要参数:
参数 | 说明 |
---|---|
connectionString | 服务器列表,格式host1:port1,host2:port2 |
retryPolicy | 重试策略,默认包含ExponentialBackoffRetry,RetryNTiMessage,RetryOneTime,RetryUntilElapsed |
sessionTimeoutMs | 会话超时,默认60000ms |
connectionTimeoutMs | 连接超时,默认15000ms |
4.1.1 使用静态工程方法创建
@Testpublic void client1() throws Exception {// 重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",5000, // 会话超时时间,默认60000ms10000, // 连接超时时间,默认15000msretryPolicy);client.start();Thread.sleep(Integer.MAX_VALUE);}
4.1.2 使用Fluent风格Api创建
private CuratorFramework client;/*** 使用Fluent风格API创建*/@Beforepublic void client() {// 重试策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.builder().connectString("localhost:2181").sessionTimeoutMs(5000) // 会话超时时间,默认60000ms.connectionTimeoutMs(10000) // 连接超时时间,默认15000ms.retryPolicy(retryPolicy) // 重试策略.namespace("base") // 名称空间,如果这里设置了名称空间,那么所有路径都将预先指定名称空间.build();client.start();}
4.1.3 RetryPolicy说明
RetryPolicy :重试策略,默认包含ExponentialBackoffRetry,RetryNTiMessage,RetryOneTime,RetryUntilElapsed。
ExponentialBackoffRetry构造方法:
ExponentialBackoffRetry(int baseSleepTimes, int maxretries);
ExponentialBackoffRetry(int baseSleepTimes, int maxretries, int maxSleepMs);
参数 | 说明 |
---|---|
baseSleepTimes | 初始sleep时间 |
maxretries | 最大重试次数 |
maxSleepMs | 最大sleep时间 |
当前sleep计算公式
当前sleep时间 = baseSleepTimes * Math.max(1, random.nextInt(1 << (retryCount + 1)))
根据公式可以看出retryCount 越大,sleep时间越长。
4.1.4 namespace说明
名称空间,起到隔离作用,如果这里设置了名称空间,那么该客户端对数据节点的任何操作都基于该相对目录进行
4.2 创建节点
/*** 创建节点并初始化值,默认是持久节点*/@Testpublic void create1() throws Exception {String path = "/create";client.create().forPath(path, "init".getBytes()); // 指定路径和值}/*** 创建临时节点*/@Testpublic void create2() throws Exception {String path = "/create1";client.create().withMode(CreateMode.EPHEMERAL) // 创建临时节点.forPath(path, "init".getBytes());Thread.sleep(10000); // 10s后消失}/*** 递归创建所需要的父节点,ZK中所有非叶子节点必须是持久节点*/@Testpublic void create3() throws Exception {String path = "/zk-test/z1";client.create().creatingParentsIfNeeded() // 递归创建父节点.withMode(CreateMode.EPHEMERAL) // 创建临时节点.forPath(path, "init".getBytes());Thread.sleep(Integer.MAX_VALUE);}
4.3 删除节点
/*** 删除节点*/@Testpublic void delete() throws Exception {client.delete().guaranteed() // 强制保证删除,只要客户端会话存在,就会一直删除,直到成功。// 当客户端遇到网络异常,会记录这次删除操作.deletingChildrenIfNeeded() // 递归删除子节点// .withVersion(1) // 指定删除的版本号(可选).forPath("/create2");}
4.4 读取数据
/*** 读取数据*/@Testpublic void get() throws Exception {byte[] bytes = client.getData().forPath("/create");System.out.println(new String(bytes));}/*** 读取数据,同时获取该节点stat*/@Testpublic void get1() throws Exception {Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/create");}
4.5 更新数据
/*** 更新数据*/@Testpublic void set() throws Exception {client.setData()//.withVersion(2) // 指定版本修改(可选).forPath("/create", "init1".getBytes());}
4.6 判断节点是否存在
/*** 检查节点是否存在*/@Testpublic void exists() throws Exception {client.checkExists().forPath("/create");}
4.7 获取某个节点的所有子节点路径
/*** 获取某个节点的所有子节点路径*/@Testpublic void getChildren() throws Exception {List<String> list = client.getChildren().forPath("/");list.forEach(item ->{System.out.println(item);});}
4.8 事务
/*** 事务*/@Testpublic void transaction() throws Exception {client.inTransaction().check().forPath("/create").and().create().withMode(CreateMode.EPHEMERAL).forPath("/nodeB", "init".getBytes()).and().create().withMode(CreateMode.EPHEMERAL).forPath("/nodeC", "init".getBytes()).and().commit();}
4.9 异步接口
Curator使用BackgroundCallback接口实现异步化。
BackgroundCallback
public interface BackgroundCallback
{/*** Called when the async background operation completes** @param client the client* @param event operation result details* @throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}
- CuratorFramework :当前客户端实例
- CuratorEvent :服务端事件,包含事件类型和响应码
CuratorEvent – 事件类型
事件类型 | 对应方法 |
---|---|
Event Type Event Methods | |
CREATE | getResultCode() and getPath() |
DELETE | getResultCode() and getPath() |
EXISTS | getResultCode(), getPath() and getStat() |
GET_DATA | getResultCode(), getPath(), getStat() and getData() |
SET_DATA | getResultCode(), getPath() and getStat() |
CHILDREN | getResultCode(), getPath(), getStat(), getChildren() |
SYNC | getResultCode(), getStat() |
GET_ACL | getResultCode(), getACLList() |
SET_ACL | getResultCode() |
TRANSACTION | getResultCode(), getOpResults() |
WATCHED | getWatchedEvent() |
GET_CONFIG | getResultCode(), getData() |
RECONFIG | getResultCode(), getData() |
CuratorEvent – 响应码
OK(0), SYSTEMERROR(-1),RUNTIMEINCONSISTENCY(-2),DATAINCONSISTENCY(-3),CONNECTIONLOSS(-4),MARSHALLINGERROR(-5),UNIMPLEMENTED(-6),OPERATIONTIMEOUT(-7),BADARGUMENTS(-8),APIERROR(-100),NONODE(-101),NOAUTH(-102),BADVERSION(-103),NOCHILDRENFOREPHEMERALS(-108),NODEEXISTS(-110),NOTEMPTY(-111),SESSIONEXPIRED(-112),INVALIDCALLBACK(-113),INVALIDACL(-114),AUTHFAILED(-115),SESSIONMOVED(-118),NOTREADONLY(-119);
事例 – 异步创建节点
/*** 异步创建节点*/@Testpublic void createSync() throws Exception {String path = "/create2";CountDownLatch semaphore = new CountDownLatch(2);// 自定义的Executor,因为EventThread是串行处理事件,如果某个事件特别复杂,就会影响其他事件的处理,所以复杂的事件需要单独一个线程池去处理。ExecutorService tp = Executors.newFixedThreadPool(2);System.out.println("Main thread: " + Thread.currentThread().getName());// 传入自定义的Executorclient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");System.out.println("Thread name: " + Thread.currentThread().getName());semaphore.countDown();}}, tp).forPath(path, "init".getBytes());// 此处没有传入自定义的Executorclient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");System.out.println("Thread name: " + Thread.currentThread().getName());semaphore.countDown();}}).forPath(path, "init".getBytes());semaphore.await();tp.shutdown();}
源码地址
IT-CLOUD-ZOOKEEPER :zookeeper客户端Curator事例
项目推荐
IT-CLOUD :IT服务管理平台,集成基础服务,中间件服务,监控告警服务等。
IT-CLOUD-ACTIVITI6 :Activiti教程源码。博文在本CSDN Activiti系列中。
IT-CLOUD-ELASTICSEARCH :elasticsearch教程源码。博文在本CSDN elasticsearch系列中。
IT-CLOUD-KAFKA :spring整合kafka教程源码。博文在本CSDN kafka系列中。
IT-CLOUD-KAFKA-CLIENT :kafka client教程源码。博文在本CSDN kafka系列中。
IT-CLOUD-ZOOKEEPER :zookeeper客户端Curator事例。博文在本CSDN zookeeper系列中。开源项目,持续更新中,喜欢请 Star~
【弄nèng - Zookeeper】Zookeeper入门教程(三)—— 客户端Curator的基本API使用(Curator framework)相关推荐
- 【弄nèng - Skywalking】入门篇(二)—— Skywalking集群部署
文章目录 一. 安装OAP 1.1 前置 1.2 下载 1.3 修改配置application.yml 1.4 高级部署 1.5 webapp配置 1.6 启动 二. 使用Agent 2.1 修改配置 ...
- SpringCloud 入门教程(三): 配置自动刷新
Spring Cloud 入门教程(三): 配置自动刷新 之前讲的配置管理, 只有在应用启动时会读取到GIT的内容, 之后只要应用不重启,GIT中文件的修改,应用无法感知, 即使重启Config Se ...
- 【弄nèng - Grafana】入门篇(十)—— Dashboard默认时间范围和刷新时间设置
文章目录 1. Dashboard通用设置 General 2. 修改Dashboard默认时间范围 3. 修改某个Panel时间 项目推荐 Dashboard修改默认时间范围 1. Dashboar ...
- qpython3可视图形界面_PySide——Python图形化界面入门教程(三)
PySide--Python图形化界面入门教程(三) --使用内建新号和槽 --Using Built-In Signals and Slots 上一个教程中,我们学习了如何创建和建立交互widget ...
- 【MATLAB Image Processing Toolbox 入门教程三】快速入门之“在多光谱图像中寻找植被”
[MATLAB Image Processing Toolbox 入门教程三] 本篇摘要 一.从多光谱图像文件导入彩色红外通道 二.构建近红外光谱散射图 三.计算植被系数并显示其定位 四.综合实例部分 ...
- python爬虫入门教程(三):淘女郎爬虫 ( 接口解析 | 图片下载 )
2019/10/28更新 网站已改版,代码已失效(其实早就失效了,但我懒得改...)此博文仅供做思路上的参考 代码使用python2编写,因已失效,就未改写成python3 爬虫入门系列教程: pyt ...
- R语言七天入门教程三:学习基本结构
R语言七天入门教程三:学习基本结构 一.编程的语言的基本结构 1.三种基本结构 绝大多数编程语言,都有三种最基本的程序结构:顺序结构.分支结构.循环结构.这三种结构的流程图如下所示(从左至右依次为:顺 ...
- 546、Zookeeper详细入门教程系列 -【Zookeeper内部原理】 2022.11.06
目录 一.Zookeeper内部原理 1.1 节点类型(Znode) 1.2 Stat结构体 1.3 监听器原理 1.4 选举机制 1.5 写数据流程 二.最后 三.参考链接 一.Zookeeper内 ...
- Python ln_Python入门教程(三):史上最全的Numpy计算函数总结,建议收藏!
点击上方 蓝字 关注我们 Numpy提供了灵活的.静态类型的.可编译的程序接口口来优化数组的计算,也被称作向量操作,因此在Python数据科学界Numpy显得尤为重要.Numpy的向量操作是通过通用函 ...
- python乘法表运算_Python入门教程(三):史上最全的Numpy计算函数总结,建议收藏!...
点击上方 蓝字 关注我们 Numpy提供了灵活的.静态类型的.可编译的程序接口口来优化数组的计算,也被称作向量操作,因此在Python数据科学界Numpy显得尤为重要.Numpy的向量操作是通过通用函 ...
最新文章
- git管理大项目或者大文件
- [微信小程序]滚动选择器
- 2007 China MVP Open Day
- PL/SQL Developer 和 instantclient客户端快速安装配置(图文)
- LeetCode 电子书!
- Qt界面设计器中的界面预览与程序运行时界面不一样
- springboot配置单独的参数文件
- TensorFlow2.0(六)--超参数搜索
- java实现通用查询_通用业务引用查询服务实现
- [经典控件]按钮和菜单
- 计算(数字)的观点看世界
- 利用Python进行数据分析笔记-时间序列(时区、周期、频率)
- Java高并发架构设计
- 简单比较 http https http2
- 计算机网络体系结构与数据通信(一) 概述
- pccad无法找到所需的动态链接库_关于PCCAD的,请大神回答!!感激不尽~~~~已经重新装过2次了!!!!...
- springbooot使用google验证码
- 朱晔的互联网架构实践心得S2E2:写业务代码最容易掉的8种坑
- python_6_17
- 微信小程序——弹窗组件