基于Curator的Zookeeper操作实战
前言
Zookeeper操作方式
这篇文章主要说的是利用java来操作zookeeper,就如操作mysql数据库一样,主要是实现增删改查功能,而实现这些功能的方式主要有以下三种:
- zookeeper官方提供的原生的api
- zkclient
- Apache Curator
简单说下三种方式的区别与各自的优劣:
- zookeeper自带的客户端是官方提供的,比较底层、使用起来写代码麻烦,很多功能需要自己来实现、不够直接。
- zkclient是另一个开源的ZooKeeper客户端。
- Apache Curator是Apache的开源项目,封装了zookeeper自带的客户端,使用相对简便,易于使用。
Curator介绍
这篇主要讲解使用基于Curator操作Zookeeper的操作案例。
Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。通过查看官方文档,可以发现Curator主要解决了三类问题:
- 封装ZooKeeper client与ZooKeeper server之间的连接处理
- 提供了一套Fluent风格的操作API
- 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装
基于Curator使用Zookeeper
导入依赖
<!-- ZooKeeper 之 Curator--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.2.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.2.0</version></dependency>
Zookeeper客户端
创建客户端
使用CuratorFrameworkFactory的两个静态工厂方法来创建zookeeper客户端对象,主要需要设置以下参数:
- connectString:zookeeper服务器地址及端口号,多个zookeeper服务器地址以“,”分隔。
- sessionTimeoutMs:会话超时时间,单位毫秒,默认为60000ms。
- connectionTimeoutMs:连接超时时间,单位毫秒,默认为15000ms。
- namespace: 为了实现不同的Zookeeper业务之间的隔离,往往会为每个业务分配一个独立的命名空间,即指定一个Zookeeper根路径
- retryPolicy:重试连接策略,有四种实现,分别为:
- ExponentialBackoffRetry(重试指定的次数, 且每一次重试之间停顿的时间逐渐增加)
- RetryNtimes(指定最大重试次数的重试策略)
- RetryOneTimes(仅重试一次)、
- RetryUntilElapsed(一直重试直到达到规定的时间)
//Zookeeper客户端private CuratorFramework client; @Beforepublic void testConnect() {client = CuratorFrameworkFactory.builder().connectString("192.168.124.18:2181") //连接地址和端口号.sessionTimeoutMs(10000) //会话超时时间.connectionTimeoutMs(1000) // 连接超时时间.namespace("/test") //名称空间.retryPolicy(new ExponentialBackoffRetry(1000, 10)) //重试策略.build();client.start();//开启连接}
关闭客户端
@Afterpublic void testClose() {if (client != null) {client.close();}}
节点操作
创建节点
主要有以下四种:
- 创建普通节点(默认即是持久化节点)
- 创建多集节点
- 创建指定节点类型的节点:
- PERSISTENT:持久化节点
- PERSISTENT_SEQUENTIAL:持久化且带序列号节点
- EPHEMERAL:临时节点
- EPHEMERAL_SEQUENTIAL:临时且带序列号节点
- 创建带指定数据的节点
@Testpublic void testCreateNode() throws Exception {//1.普通创建client.create().forPath("/test1");//2.创建多集节点client.create().creatingParentContainersIfNeeded().forPath("/test2/demo");//3.设置创建节点类型client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3");//4.创建带指定数据的节点client.create().forPath("/test4", "This is test4".getBytes(StandardCharsets.UTF_8));}
更新节点
@Testpublic void testUpdateNode() throws Exception {//更新一个节点的数据内容client.setData().forPath("/test4", "第一次更新".getBytes());//更新一个节点的数据内容,强制指定版本进行更新client.setData().withVersion(1).forPath("/test4", "第二次更新".getBytes());}
查询节点
@Testpublic void testQueryNode() throws Exception {Stat stat1 = client.checkExists().forPath("/test1");System.out.println("路径节点/test1是否存在:" + (stat1 != null));byte[] bytes = client.getData().forPath("/test2");System.out.println("路径/test2的数据是:" + new String(bytes));Stat stat2 = new Stat();bytes = client.getData().storingStatIn(stat2).forPath("/test4");System.out.println("路径/test4的数据是:" + new String(bytes));System.out.println("路径/test4的数据的版本是:" + stat2.getVersion());}
删除节点
@Testpublic void testDeleteNode() throws Exception {//只能删除叶子节点client.delete().forPath("/test1");//删除一个节点,并递归删除其所有子节点client.delete().deletingChildrenIfNeeded().forPath("/test2");//强制指定版本进行删除client.delete().withVersion(1).forPath("/test4");//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed()// 如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止client.delete().guaranteed().forPath("/test3");}
异步接口
上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
事件类型 | 对应CuratorFramework实例的方法 |
---|---|
CREATE | create() |
DELETE | delete() |
EXISTS | checkExists() |
GET_DATA | getData() |
SET_DATA | setData() |
CHILDREN | getChildren() |
SYNC | sync(String,Object) |
GET_ACL | getACL() |
SET_ACL | setACL() |
WATCHED | Watcher(Watcher) |
CLOSING | close() |
响应码(getResultCode())
响应码 | 意义 |
---|---|
0 | OK,即调用成功 |
-4 | ConnectionLoss,即客户端与服务端断开连接 |
-110 | NodeExists,即节点已经存在 |
-112 | SessionExpired,即会话过期 |
异步创建节点
Executor executor = Executors.newFixedThreadPool(2);
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));},executor).forPath("path");
监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
Node Cache
@Testpublic void testNodeCache() throws Exception {//最后一个参数表示是否进行压缩NodeCache cache = new NodeCache(client, "/super", false);cache.start(true);//只会监听节点的创建和修改,删除不会监听cache.getListenable().addListener(() -> {System.out.println("路径:" + cache.getCurrentData().getPath());System.out.println("数据:" + new String(cache.getCurrentData().getData()));System.out.println("状态:" + cache.getCurrentData().getStat());});client.create().forPath("/nodeCache", "1234".getBytes());Thread.sleep(1000);client.setData().forPath("/nodeCache", "5678".getBytes());Thread.sleep(1000);client.delete().forPath("/nodeCache");Thread.sleep(5000);}
Path Cache
@Testpublic void testPathChildrenCache() throws Exception {//第三个参数表示是否接收节点数据内容PathChildrenCache childrenCache = new PathChildrenCache(client, "/super", true);/*** 如果不填写这个参数,则无法监听到子节点的数据更新如果参数为PathChildrenCache.StartMode.BUILD_INITIAL_CACHE,则会预先创建之前指定的/super节点如果参数为PathChildrenCache.StartMode.POST_INITIALIZED_EVENT,效果与BUILD_INITIAL_CACHE相同,只是不会预先创建/super节点参数为PathChildrenCache.StartMode.NORMAL时,与不填写参数是同样的效果,不会监听子节点的数据更新操作*/childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);childrenCache.getListenable().addListener((framework, event) -> {switch (event.getType()) {case CHILD_ADDED:System.out.println("CHILD_ADDED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +new String(event.getData().getData()) + ",状态:" + event.getData().getStat());break;case CHILD_UPDATED:System.out.println("CHILD_UPDATED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +new String(event.getData().getData()) + ",状态:" + event.getData().getStat());break;case CHILD_REMOVED:System.out.println("CHILD_REMOVED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +new String(event.getData().getData()) + ",状态:" + event.getData().getStat());break;default:break;}});client.create().forPath("/pathChildrenCache", "123".getBytes());client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes());//经测试,不会监听到本节点的数据变更,只会监听到指定节点下子节点数据的变更client.setData().forPath("/pathChildrenCache", "456".getBytes());client.setData().forPath("/pathChildrenCache/c1", "c1新内容".getBytes());client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");Thread.sleep(5000);}
Tree Cache
@Testpublic void testTreeCache() throws Exception {TreeCache treeCache = new TreeCache(client, "/treeCache");treeCache.start();treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {switch (treeCacheEvent.getType()) {case NODE_ADDED:System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_UPDATED:System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_REMOVED:System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;default:break;}});client.create().forPath("/treeCache", "123".getBytes());client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes());client.setData().forPath("/treeCache", "789".getBytes());client.setData().forPath("/treeCache/c1", "910".getBytes());client.delete().forPath("/treeCache/c1");client.delete().forPath("/treeCache");Thread.sleep(5000);}
事务管理
碰到异常,事务会回滚
@Testpublic void testTransaction() throws Exception{//定义几个基本操作CuratorOp createOp = client.transactionOp().create().forPath("/curator/one_path","some data".getBytes());CuratorOp setDataOp = client.transactionOp().setData().forPath("/curator","other data".getBytes());CuratorOp deleteOp = client.transactionOp().delete().forPath("/curator");//事务执行结果List<CuratorTransactionResult> results = client.transaction().forOperations(createOp,setDataOp,deleteOp);//遍历输出结果//因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚for(CuratorTransactionResult result : results){System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());}}
Leader选举
在分布式计算中, leader elections是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是leader(领导者)或者coordinator(协调者). 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务leader. 除此之外, 选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。
在zookeeper集群中,leader负责写操作,然后通过Zab协议实现follower的同步,leader或者follower都可以处理读操作。
Curator 有两种leader选举的recipe,分别是
- LeaderSelector:所有存活的客户端不间断的轮流做Leader
- LeaderLatch:一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权
public class CuratorLeaderTest {/** Zookeeper info */private static final String ZK_ADDRESS = "192.168.1.100:2181";private static final String ZK_PATH = "/zktest";public static void main(String[] args) throws InterruptedException {LeaderSelectorListener listener = new LeaderSelectorListener() {@Overridepublic void takeLeadership(CuratorFramework client) throws Exception {System.out.println(Thread.currentThread().getName() + " take leadership!");// takeLeadership() method should only return when leadership is being relinquished.Thread.sleep(5000L);System.out.println(Thread.currentThread().getName() + " relinquish leadership!");}@Overridepublic void stateChanged(CuratorFramework client, ConnectionState state) {}};new Thread(() -> {registerListener(listener);}).start();new Thread(() -> {registerListener(listener);}).start();new Thread(() -> {registerListener(listener);}).start();Thread.sleep(Integer.MAX_VALUE);}private static void registerListener(LeaderSelectorListener listener) {// 1.Connect to zkCuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(10, 5000));client.start();// 2.Ensure pathtry {new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());} catch (Exception e) {e.printStackTrace();}// 3.Register listenerLeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);selector.autoRequeue();selector.start();}
}
分布式锁
分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。
具体实现请参考我的上一篇文章:Zookeeper实现分布式锁
基于Curator的Zookeeper操作实战相关推荐
- kfaka storm写入mysql_基于Storm+Kafka+Zookeeper锁+Memcached+mysql架构全方位系统Storm项目案例实战...
基于Storm+Kafka+Zookeeper锁+Memcached+mysql架构全方位系统Storm项目案例实战 适应人群 有一定Storm基础.Kafka基础.Memcached基础.Zooke ...
- 分布式锁(基于redis和zookeeper)详解
分布式锁(基于redis和zookeeper)详解 https://blog.csdn.net/a15835774652/article/details/81775044 为什么写这篇文章? 目前网上 ...
- 【Elasticsearch】Curator 从入门到实战
1.概述 转载:Curator 从入门到实战 Curator 是elasticsearch 官方的一个索引管理工具,可以通过配置文件的方式帮助我们对指定的一批索引进行创建/删除.打开/关闭.快照/恢复 ...
- Kafka原理+操作+实战
Kafka原理+操作+实战 前面我和大家交流了kafka的部署安装.对于部署安装这都是小意思,不值得太多的提及.重点还是需要知道kafka原理.熟练掌握kafka命令以及灵活用于kafka场景. 干货 ...
- MongoDB+集成SpringBoot+索引+并发优化 - 基于《MongoDB进阶与实战:唐卓章》
文章目录 MongoDB - 基于<MongoDB进阶与实战:唐卓章> 一.首次安装 服务安装 配置文件修改 可视化工具 Docker部署 二.基本使用 2.1 概念解析 2.2 Mong ...
- 基于 Kafka 和 ZooKeeper 的分布式消息队列原理
转载:https://gitbook.cn/books/5bc446269a9adf54c7ccb8bc/index.html 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量 ...
- 再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理
关于分布式消息队列,我在几个月前写过一篇文章:<深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列 >.最近,由于写作课程<分布式中间件实践之路>的契机,我 ...
- iKcamp|基于Koa2搭建Node.js实战(含视频)☞ 路由koa-router
路由koa-router--MVC 中重要的环节:Url 处理器 ?? iKcamp 制作团队 原创作者:大哼.阿干.三三.小虎.胖子.小哈.DDU.可木.晃晃 文案校对:李益.大力萌.Au.DDU. ...
- 第三章:Python基础の函数和文件操作实战
本課主題 Set 集合和操作实战 函数介紹和操作实战 参数的深入介绍和操作实战 format 函数操作实战 lambda 表达式介绍 文件操作函数介紹和操作实战 本周作业 Set 集合和操作实战 Se ...
最新文章
- C/C++程序训练6---歌德巴赫猜想的证明_JAVA
- 机器学习实战笔记(Python实现)-01-机器学习实战
- 安装deepin linux
- python练习题:列表排序
- java bean 验证_Java Bean验证基础
- 面试官不讲武德,居然让我讲讲蠕虫和金丝雀!
- 如何将c语言程序封装供python调用_C++调用python
- 如何正确的通过 C++ Primer 学习 C++?(转自知乎)
- Java的正则表达式
- 【Day02】测试 Primise、setTimeout等的执行顺序
- 资深架构师谈云原生生态的基石Kubernetes
- RSLogix 5000 含序列号 20.03版本,带授权
- grub4dos 制作WIN7、WINPE2003、Ubuntu、dos工具箱多启动U盘 (不量产)
- 细胞自动机,那是什么?
- angular项目如何配置国际化(i18n)?
- PS进阶篇——如何PS软件给图片部分位置打马赛克(四)
- 解密:股票短线起涨点的挂单玄机!
- 爱快软路由设置DHCP多个LAN处于同一网段
- 阿里合伙人制度的意义及法律分析
- Spring(MVC)框架