curator

curator是Netflix开源的一个zookeeper客户端,后来捐给apache。curator框架在zookeeper原生API接口上进行了包装,解决了很多zookeeper客户端非常底层的细节开发。提供zookeeper各种应用场景(分布式锁、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装,实现了Fluent风格的API接口,是最流行的zookeeper客户端。

原生zookeeperAPI的不足:

  • 连接对象异步创建,需要开发人员自行编码等待
  • 连接没有自动超时重连机制
  • watcher一次注册只能生效一次
  • 不支持递归创建树形结点

curator的特点:

  • 解决session会话超时重连
  • watcher可以反复注册
  • 简化API使用
  • 支持Fluent风格
  • 提供了分布式锁服务、共享计数器、缓存等机制

环境搭建

创建一个maven工程 导入依赖

 <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.6.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.14</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.6.0</version></dependency></dependencies>

先在Linux里打开集群服务端

在maven工程里创建测试类进行连接测试

public class CuratorConnection {public static void main(String[] args) {//创建连接对象CuratorFramework client= CuratorFrameworkFactory.builder().connectString("192.168.2.142:2181,192.168.2.142:2182,192.168.2.142:2183")//集群的ip地址和端口号.sessionTimeoutMs(5000)//超时时间.retryPolicy(new RetryOneTime(3000))//重连机制 超时3秒后重连一次.namespace("create")//指定命名空间.build();//构建连接//打开连接client.start();System.out.println(client.isStarted()?"连接集群成功":"连接失败");//关闭连接client.close();}
}

运行结果:

重连策略 retryPolicy

RetryOneTime 指定时间后重连一次
RetryNTimes 指定时间重连n次
RetryUntilElapsed 每隔指定时间后重连一次,不能超过设置总时间
ExponentialBackoffRetry 重连n次,时间基于参数计算,越来越长

        //3秒后重连1次RetryPolicy r1 = new RetryOneTime(3000);//每3秒重连1次,一共3次RetryPolicy r2 = new RetryNTimes(3, 3000);//每3秒重连1次,总时间10秒RetryPolicy r3 = new RetryUntilElapsed(10000, 3000);//随着重连次数增加,重连间隔变长(基于第一个参数计算)RetryPolicy r4 = new ExponentialBackoffRetry(1000, 3);

新增结点 create

创建好测试框架

public class CuratorCreate {private static final String IP="192.168.2.142:2181,192.168.2.142:2182,192.168.2.142:2183";private CuratorFramework client;@Beforepublic void connect(){//重连机制RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);//创建连接client= CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("create").build();//打开连接client.start();System.out.println("连接创建成功");}@Afterpublic void close(){client.close();}
}

创建持久结点

 @Testpublic void create1() throws Exception{//新增结点client.create()//持久化结点.withMode(CreateMode.PERSISTENT)//权限列表为 world:anyone:cdrwa.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//arg1 结点路径 arg2 结点数据 由于指定了命名空间 会创建/create/node1.forPath("/node1","2020GetGoodOffer".getBytes());}

是真的JB慢 我都以为我电脑有问题

运行后在Linux端查询结果

自定义权限列表

 @Testpublic void create2() throws Exception{//自定义权限列表List<ACL> list=new ArrayList<>();Id id=new Id("ip","192.168.2.142");list.add(new ACL(ZooDefs.Perms.ALL,id));//新增结点client.create().withMode(CreateMode.PERSISTENT).withACL(list).forPath("/node2","2020GetGoodOffer".getBytes());}

又是等了2分钟。。生效


递归创建

 @Testpublic void create3() throws Exception{//递归创建client.create().creatingParentsIfNeeded()//如果父路径不存在则创建.withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath("/node3/node31","2020GetGoodOffer".getBytes());}

每次都要等2分钟。。


异步创建

 @Testpublic void create4() throws Exception{//异步创建client.create().creatingParentsIfNeeded()//如果父路径不存在则创建.withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).inBackground(new BackgroundCallback() {//异步回调方法@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {System.out.println("结点路径"+curatorEvent.getPath());System.out.println("事件类型"+curatorEvent.getType());}}).forPath("/node4","2020GetGoodOffer".getBytes());Thread.sleep(5000);}

更改结点数据 setData

和创建结点大同小异 由于过于耗时 就不演示了

public class CuratorSet {private static final String IP="192.168.2.142:2181,192.168.2.142:2182,192.168.2.142:2183";private CuratorFramework client;@Beforepublic void connect(){//重连机制RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);//创建连接client= CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("set").build();//打开连接client.start();System.out.println("连接创建成功");}@Afterpublic void close(){client.close();}@Testpublic void set1() throws Exception{//更新结点client.setData().forPath("/node1","2020GetGoodOffer".getBytes());}@Testpublic void set2() throws Exception{//使用版本号更新 -1代表不参与client.setData().withVersion(-1)//不更新版本号.forPath("/node2","2020GetGoodOffer".getBytes());}@Testpublic void set3() throws Exception{//异步修改client.setData().withVersion(-1).inBackground(new BackgroundCallback() {//异步回调方法@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {System.out.println("结点路径"+curatorEvent.getPath());System.out.println("事件类型"+curatorEvent.getType());}}).forPath("/node4","2020GetGoodOffer".getBytes());Thread.sleep(5000);}
}

删除结点 delete

先创建一个结点

删除结点

 @Testpublic void del1() throws Exception{//删除结点client.delete().forPath("/node1");}

果然还是要2分钟,之后的就不演示了


使用版本号删除、递归删除、异步删除:

 @Testpublic void del2() throws Exception{//使用版本号删除 -1代表不参与client.delete().withVersion(-1)//不更新版本号.forPath("/node2");}@Testpublic void del3() throws Exception{//递归删除client.delete().deletingChildrenIfNeeded().forPath("/node3");}@Testpublic void del4() throws Exception{//异步删除client.delete().withVersion(-1).inBackground(new BackgroundCallback() {//异步回调方法@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {System.out.println("结点路径"+curatorEvent.getPath());System.out.println("事件类型"+curatorEvent.getType());}}).forPath("/node4");Thread.sleep(5000);}

查看结点 get

public class CuratorGet {private static final String IP="192.168.2.142:2181,192.168.2.142:2182,192.168.2.142:2183";private CuratorFramework client;@Beforepublic void connect(){//重连机制RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);//创建连接client= CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("get").build();//打开连接client.start();System.out.println("连接创建成功");}@Afterpublic void close(){client.close();}@Testpublic void get1() throws Exception{//读取结点byte[] bytes = client.getData().forPath("/node1");System.out.println(new String(bytes));}@Testpublic void get2() throws Exception{//读取数据时读取结点属性Stat stat=new Stat();client.getData().storingStatIn(stat).forPath("/node2");}@Testpublic void get3() throws Exception{//异步读取client.getData().inBackground(new BackgroundCallback() {//异步回调方法@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {System.out.println("结点路径"+curatorEvent.getPath());System.out.println("事件类型"+curatorEvent.getType());}}).forPath("/node3");Thread.sleep(5000);}
}

真的是慢。。


读取子结点数据 getChildren

public class CuratorGetChild {private static final String IP="192.168.2.142:2181,192.168.2.142:2182,192.168.2.142:2183";private CuratorFramework client;@Beforepublic void connect(){//重连机制RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);//创建连接client= CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("getChild").build();//打开连接client.start();System.out.println("连接创建成功");}@Afterpublic void close(){client.close();}@Testpublic void getChild1() throws Exception{//读取结点List<String> list = client.getChildren().forPath("/node1");System.out.println(list);}@Testpublic void getChild2() throws Exception{//读取数据时读取结点属性Stat stat=new Stat();client.getChildren().storingStatIn(stat).forPath("/node2");}@Testpublic void getChild3() throws Exception{//异步读取client.getChildren().inBackground(new BackgroundCallback() {//异步回调方法@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {System.out.println("结点路径"+curatorEvent.getPath());System.out.println("事件类型"+curatorEvent.getType());}}).forPath("/node3");Thread.sleep(5000);}
}

检查结点是否存在 checkExists

public class CuratorExists {private static final String IP="192.168.2.142:2181,192.168.2.142:2182,192.168.2.142:2183";private CuratorFramework client;@Beforepublic void connect(){//重连机制RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,3);//创建连接client= CuratorFrameworkFactory.builder().connectString(IP).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("get").build();//打开连接client.start();System.out.println("连接创建成功");}@Afterpublic void close(){client.close();}@Testpublic void exists1() throws Exception{//判断结点是否存在Stat stat = client.checkExists().forPath("/node1");System.out.println(stat);}@Testpublic void exists2() throws Exception{//异步判断client.checkExists().inBackground(new BackgroundCallback() {//异步回调方法@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {System.out.println("结点路径"+curatorEvent.getPath());System.out.println("事件类型"+curatorEvent.getType());}}).forPath("/node2");Thread.sleep(5000);}
}

watcher

NodeCache 监听特定结点

@Testpublic void watcher1() throws Exception{//监视某个结点 arg1 连接对象 arg2 监视路径NodeCache nodeCache=new NodeCache(client,"/watcher1");//启动监视器nodeCache.start();System.out.println("监视器已打开");nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("结点路径: "+nodeCache.getCurrentData().getPath());}});Thread.sleep(150000);//关闭监视器nodeCache.close();System.out.println("监视器已关闭");}

在Linux客户端修改时观察控制台输出


PathChildren Cache 监听子结点

@Testpublic void watcher2() throws Exception{//监视子结点 arg1 连接对象 arg2 监视路径 arg3 能否读取数据PathChildrenCache pathChildrenCache=new PathChildrenCache(client,"/watcher1",true);//启动监视器pathChildrenCache.start();System.out.println("监视器已打开");pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) {System.out.println("结点事件类型 :"+pathChildrenCacheEvent.getType());System.out.println("结点的路径: "+pathChildrenCacheEvent.getData().getPath());System.out.println("结点数据: "+new String(pathChildrenCacheEvent.getData().getData()));}});Thread.sleep(150000);//关闭监视器pathChildrenCache.close();System.out.println("监视器已关闭");}

当在linux客户端修改时,观察控制台输出


事务 inTransaction

假设node2结点不存在,不使用事务时,第一个操作是成功的

 @Testpublic void tran1() throws Exception{client.create().forPath("/node1","node1".getBytes());//成功client.setData().forPath("/node2","node2".getBytes());//失败}

可以看到操作2失败了

但是node1却成功创建了


使用事务来进行控制

@Testpublic void tran2() throws Exception{client.inTransaction()//开启事务.create().forPath("/node1","node1".getBytes()).and().setData().forPath("/node2","node2".getBytes()).and().commit();}

此时操作2还是失败的,但由于使用了事务,操作1也一起失败了


分布式锁

InterProcessMutex 分布式可重入排它锁

 @Testpublic void lock1() throws Exception{//排它锁InterProcessLock interProcessLock=new InterProcessMutex(client,"/lock1");System.out.println("等待获取锁对象");interProcessLock.acquire();for(int i=1;i<=10;i++){Thread.sleep(3000);System.out.println(i);}interProcessLock.release();System.out.println("等待释放锁");}

同时启动两个客户端

当第一个客户端释放了锁,第二个才能继续

InterProcessReadWriteLock 分布式读写锁
读锁之间是共享的

 @Testpublic void lock2() throws Exception{//读写锁InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client,"/lock1");InterProcessMutex interProcessLock = interProcessReadWriteLock.readLock();System.out.println("等待读锁对象");interProcessLock.acquire();for(int i=1;i<=10;i++){Thread.sleep(3000);System.out.println(i);}interProcessLock.release();System.out.println("等待释放锁");}



写锁之间、写锁与读锁都是互斥的,就不全部演示了

 @Testpublic void lock3() throws Exception{//读写锁InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client,"/lock1");InterProcessMutex interProcessLock = interProcessReadWriteLock.writeLock();System.out.println("等待写锁对象");interProcessLock.acquire();for(int i=1;i<=10;i++){Thread.sleep(3000);System.out.println(i);}interProcessLock.release();System.out.println("等待释放锁");}

监控命令

zookeeper支持某些特定四字命令与其交互,它们大多是查询命令,用来获取zookeeper服务的当前状态及相关信息。用户在客户端可以通过telnet或nc向zookeeper提交相应命令。

如果rpm检查没有telnet,先使用yum安装telnet的客户端和服务端


连接集群

mntr 集群状态

从上到下依次为
zookeeper版本
平均延时
最大延时
最小延时
收包数
发包数
连接数
堆积请求数
服务器角色
znode结点数量
watch数量
临时结点数量
数据大小
打开的文件描述符数量
最大文件描述符数量

也可以使用nc命令
先安装nc

使用nc命令执行mntr


conf 服务配置的详细信息

clientPort 客户端端口号
dataDir 数据快照文件目录 默认10万次操作生成一次快照
dataLogDir 事务日志文件目录,生产环境放在独立磁盘上
tickTime 服务器之间或客户端与服务器之间维持心跳的时间间隔
maxClientCnxns 最大连接数
minSessionTimeout 最小session超时时间 心跳时间x2 指定时间小于该时间默认使用此时间
maxSessionTimeout 最大session超时时间 心跳时间x20 指定时间大于该时间默认使用此时间
serverId 服务器编号
initLimit 集群中follow与leader之间初始连接能容忍的最大心跳数
syncLimit 集群中follow与leader之间请求和应答能容忍的最大心跳数
electionAlg 选举算法 3 基于TCP
electionPort 选举端口
quorumPort 集群之间的通信端口
peerType 是否观察者 1表示是


cons命令 所有连接到这台服务器的客户端连接/会话的详细信息

45692 客户端发送请求的端口号
queued 等待处理的请求数
received 收到的包数
sent 发送的包数
sid 会话id
lop 最后的操作的操作类型
est 连接时间戳
to 超时时间
lcxid 当前会话的操作id
lzxid 最大事务id
lresp 最后响应时间戳
llat 最新延时
minlat 最小延时
avglat 平均延时
maxlat 最大延时


crst 重置当前服务器所有连接/会话的统计信息


dump 列出未经处理的会话和临时结点

先创建一个临时结点

使用dump查询


envi 输出服务器环境配置信息

从上到下分别为:
zookeeper版本
host名字
Java版本
供应商
jre目录
Java classpath
java第三方类库路径
Java临时文件路径
JIT编译器名称
操作系统名字
操作系统位数
操作系统版本
用户名
用户目录
用户bin目录


ruok 测试服务器是否处于运行状态

返回imok(代表处于运行状态)


stat 输出服务器详细信息

从上到下依次为:
zookeeper版本
延时 最小/平均/最大
收包
发包
连接数 2 因为nc命令也会创建一个
堆积的未处理请求数
最大事务id
服务器模式
结点数


srvr

类似stat,少了连接的会话信息

srst 重置服务器


wchs 列出watcher信息

创建一个watcher

查看watcher信息


wchc 通过session分组列出watch的结点

默认不支持该命令


wchp 和wchc类似,根据结点路径分组

默认不支持该命令


【菜鸟教程】Zookeeper基础入门(使用curator)【下】相关推荐

  1. 菜鸟教程 + Java基础课程 + part2

    菜鸟教程 + Java基础课程 + part2 目录 菜鸟教程 + Java基础课程 + part2 八 Number &Math 类 1. Number类 2. Math类 3. Numbe ...

  2. 视频教程-20年Nodejs教程零基础入门到项目实战前端视频教程-Node.js

    20年Nodejs教程零基础入门到项目实战前端视频教程 7年的开发架构经验,曾就职于国内一线互联网公司,开发工程师,现在是某创业公司技术负责人, 擅长语言有node/java/python,专注于服务 ...

  3. 视频教程-19全新mysql教程零基础入门实战精讲mysql视频DBA数据库视频教程SQL教程-MySQL

    19全新mysql教程零基础入门实战精讲mysql视频DBA数据库视频教程SQL教程 7年的开发架构经验,曾就职于国内一线互联网公司,开发工程师,现在是某创业公司技术负责人, 擅长语言有node/ja ...

  4. 汇编语言系列教程之基础入门 (一)

    汇编语言系列教程之基础入门 (一) http://www.cnblogs.com/flyor/p/7208948.html 机器字长 机器字长是指CPU一次运算所能处理的数据的位数,一般来说这个数的和 ...

  5. Marvelous Designer布料和角色服装造型完整教程零基础入门到精通实用教学视频教程

    Marvelous Designer布料和角色服装造型完整教程零基础入门到精通实用教学视频教程 marvelous designer是目前世界上最流行的服装打板和模拟软件,能够即时的演算服装的打板,外 ...

  6. python零基础入门教程-零基础入门Python爬虫不知道怎么学?这是入门的完整教程...

    原标题:零基础入门Python爬虫不知道怎么学?这是入门的完整教程 这是一个适用于小白的Python爬虫免费教学课程,只有7节,让零基础的你初步了解爬虫,跟着课程内容能自己爬取资源.看着文章,打开电脑 ...

  7. python基础教程菜鸟教程-python基础菜鸟教程,Python的基础语法

    原标题:python基础菜鸟教程,Python的基础语法 什么是Python?Python是一门简单直观的编程语言,并且目前是开源的,可以方便任何人使用. Python的开发哲学:用一种方法,最好是只 ...

  8. 2023年黑马Java入门到精通教程--Java基础入门

    推荐教程:java零基础入门到精通 如何使用Java Java语言的产品是 JDK(Java Development Kit :Java开发者工具包) ,必须安装JDK才能使用Java语言. JDK产 ...

  9. java adt入门教程_Android基础入门教程目录

    第一章:环境搭建与开发相关(已完结 10/10) https://blog.csdn.net/coder_pig/article/details/50000773 Android基础入门教程--1.1 ...

  10. 零基础自学python教程-零基础入门学习Python_Python教程

    教程名称:零基础入门学习Python 课程目录: [易源码www.pnp8.com]000愉快的开始 [易源码www.pnp8.com]001我和Python的第一次亲密接触 [易源码www.pnp8 ...

最新文章

  1. LeetCode Gray Code(回溯法)
  2. SQL Server Indexes
  3. 40亿条/秒!Flink流批一体在阿里双11首次落地的背后
  4. scala部分应用函数_Scala中的部分函数
  5. %求余数 rand随机数
  6. 21 MM配置-采购-定义采购组
  7. 解耦知识蒸馏,让Hinton在7年前提出的方法重回SOTA行列(CVPR 2022)
  8. java 服务 容量评估,容器云平台容量规划及管理优化
  9. BZOJ4197: [Noi2015]寿司晚宴
  10. 一段和别人的对话,没事写出来(杂项)
  11. 使用vue构建一个可视化大数据平台
  12. MIPS32-单周期数据通路设计
  13. 计算机毕业设计java+jsp学科竞赛管理系统(源码+系统+mysql数据库+Lw文档)
  14. apdu 移动sim_SIM之APDU指令格式 | 学步园
  15. 怎么调节台式计算机字体大小,电脑上怎么调整字体大小
  16. 从小鸡仔开始的遗传算法
  17. 拉格朗日乘子法——从单约束到多约束的直观翻译
  18. 【微分方程数值解】常微分方程(一)欧拉方法和改进欧拉方法(附python算例,封装类)
  19. LinuxC编程中常见的段错误(非法操作内存)情形
  20. 压缩软件bandizip下载

热门文章

  1. 分享一个新软件 云端软件平台+个人使用心得
  2. Pure MVC 架构简述
  3. Android-日历CalendarView使用
  4. pomelo分布式聊天服务器详解
  5. CHAPTER 24 Dialog Systems and Chatbots
  6. Notification桌面提醒:HTML5新功能
  7. 2023五一杯数学建模A题B题C题思路分析汇总 五一数学建模思路
  8. 模拟实验室合成器插件-Arturia Analog Lab v5.2.0 WiN
  9. 虚拟光驱 daemon tools lite 4.4 下载 2011最新版
  10. 论文阅读_Robust Counterfactual Explanations on Graph Neural Networks