ZooKeeper 之Apache Curator 客户端使用
ZooKeeper 原生不足之处:
- 超时重连,不支持自动,需要手动操作
- Watch注册一次后会失效
- 不支持递归创建节点
Apache Curator
apache的开源项目,解决watcher注册一次就失效的问题,api更加简单易用,提供更多解决方案并且实现简单:如分布式锁
Maven 添加 Apache Curator 依赖
<!--zookeeper相关--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version></dependency>
zk基本操作
zk命名空间以及创建节点,节点的增删改查
import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;public class CuratorOperator {public CuratorFramework client = null;public static final String zkServerPath = "192.168.254.130:2181";/*** 实例化zk客户端*/public CuratorOperator() {/*** curator链接zookeeper的策略:RetryNTimes* n:重试的次数* sleepMsBetweenRetries:每次重试间隔的时间*/RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().connectString(zkServerPath).sessionTimeoutMs(10000).retryPolicy(retryPolicy)//命名空间之后创建的节点都会在workspace工作空间里面.namespace("workspace").build();client.start();}/**** @Description: 关闭zk客户端连接*/public void closeZKClient() {if (client != null) {this.client.close();}}public static void main(String[] args) throws Exception {// 实例化CuratorOperator cto = new CuratorOperator();boolean isZkCuratorStarted = cto.client.isStarted();System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));// 创建节点String nodePath = "/bushro/demo";byte[] data = "data".getBytes();cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//默认的权限"world", "anyone".forPath(nodePath, data);更新节点数据byte[] newData = "newdata".getBytes();cto.client.setData().withVersion(0).forPath(nodePath, newData);// 删除节点cto.client.delete().guaranteed() // 如果删除失败,那么在后端还是继续会删除,直到成功.deletingChildrenIfNeeded() // 如果有子节点,就删除.withVersion(0).forPath(nodePath);// 读取节点数据Stat stat = new Stat();byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);System.out.println("节点" + nodePath + "的数据为: " + new String(data));System.out.println("该节点的版本号为: " + stat.getVersion());// 查询子节点List<String> childNodes = cto.client.getChildren().forPath(nodePath);System.out.println("开始打印子节点:");for (String s : childNodes) {System.out.println(s);}// 判断节点是否存在,如果不存在则为空Stat statExist = cto.client.checkExists().forPath(nodePath + "/abc");System.out.println(statExist);Thread.sleep(100000);cto.closeZKClient();boolean isZkCuratorStarted2 = cto.client.isStarted();System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));}public final static String ADD_PATH = "/super/imooc/d";}
watch与acl的操作
一次注册多次监听
为节点添加watch事件
// 为节点添加watcher//NodeCache: 监听数据节点的变更,会触发事件final NodeCache nodeCache = new NodeCache(cto.client, nodePath);// buildInitial : 初始化的时候获取node的值并且缓存nodeCache.start(true);if (nodeCache.getCurrentData() != null) {System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));} else {System.out.println("节点初始化数据为空...");}nodeCache.getListenable().addListener(new NodeCacheListener() {public void nodeChanged() throws Exception {if (nodeCache.getCurrentData() == null) {System.out.println("空");return;}String data = new String(nodeCache.getCurrentData().getData());System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + "数据:" + data);}});
为子节点添加watch事件
// 为子节点添加watcher// PathChildrenCache: 监听数据节点的增删改,会触发事件String childNodePathCache = nodePath;// cacheData: 设置缓存节点的数据状态final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, childNodePathCache, true);/*** StartMode: 初始化方式* POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件(比较好)* NORMAL:异步初始化* BUILD_INITIAL_CACHE:同步初始化*/childrenCache.start(StartMode.POST_INITIALIZED_EVENT);List<ChildData> childDataList = childrenCache.getCurrentData();System.out.println("当前数据节点的子节点数据列表:");for (ChildData cd : childDataList) {String childData = new String(cd.getData());System.out.println(childData);}childrenCache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){System.out.println("子节点初始化ok...");}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){String path = event.getData().getPath();System.out.println("添加子节点:" + event.getData().getPath());System.out.println("子节点数据:" + new String(event.getData().getData()));}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){System.out.println("删除子节点:" + event.getData().getPath());}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){System.out.println("修改子节点路径:" + event.getData().getPath());System.out.println("修改子节点数据:" + new String(event.getData().getData()));}}});
watcher统一配置
例如创建一个存放redis的配置文件节点,里面放入相关操作的json数据如
{“type”:“add”,“url”:“ftp://192.168.254.130/config/redis.xml”,“remark”:“add”}
{“type”:“update”,“url”:“ftp://192.168.254.130/config/redis.xml”,“remark”:“update”}
{“type”:“delete”,“url”:"",“remark”:“delete”}
在程序中我们可以把节点数据转成实体类,根据type的类型来判断进行相应的操作,当zookeeper集群中监听到变化,每台机子就去下载最新的配置文件这样就不用一台一台机子的修改过去,节省时间。
public class Client1 {public CuratorFramework client = null;public static final String zkServerPath = "192.168.254.130:2181";public Client1() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().connectString(zkServerPath).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("workspace").build();client.start();}public void closeZKClient() {if (client != null) {this.client.close();}}// public final static String CONFIG_NODE = "/bushro/demo/redis-config";public final static String CONFIG_NODE_PATH = "/bushro/demo";public final static String SUB_PATH = "/redis-config";public static CountDownLatch countDown = new CountDownLatch(1);public static void main(String[] args) throws Exception {Client1 cto = new Client1();System.out.println("client1 启动成功...");final PathChildrenCache childrenCache = new PathChildrenCache(cto.client, CONFIG_NODE_PATH, true);childrenCache.start(StartMode.BUILD_INITIAL_CACHE);// 添加监听事件childrenCache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {// 监听节点变化if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){String configNodePath = event.getData().getPath();if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)) {System.out.println("监听到配置发生变化,节点路径为:" + configNodePath);// 读取节点数据String jsonConfig = new String(event.getData().getData());System.out.println("节点" + CONFIG_NODE_PATH + "的数据为: " + jsonConfig);// 从json转换配置(转实体类)RedisConfig redisConfig = null;if (StringUtils.isNotBlank(jsonConfig)) {redisConfig = JsonUtils.jsonToPojo(jsonConfig, RedisConfig.class);}// 配置不为空则进行相应操作if (redisConfig != null) {String type = redisConfig.getType();String url = redisConfig.getUrl();String remark = redisConfig.getRemark();// 判断事件if (type.equals("add")) {System.out.println("监听到新增的配置,准备下载...");// ... 连接ftp服务器,根据url找到相应的配置Thread.sleep(500);System.out.println("开始下载新的配置文件,下载路径为<" + url + ">");// ... 下载配置到你指定的目录Thread.sleep(1000);System.out.println("下载成功,已经添加到项目中");// ... 拷贝文件到项目目录} else if (type.equals("update")) {System.out.println("监听到更新的配置,准备下载...");// ... 连接ftp服务器,根据url找到相应的配置Thread.sleep(500);System.out.println("开始下载配置文件,下载路径为<" + url + ">");// ... 下载配置到你指定的目录Thread.sleep(1000);System.out.println("下载成功...");System.out.println("删除项目中原配置文件...");Thread.sleep(100);// ... 删除原文件System.out.println("拷贝配置文件到项目目录...");// ... 拷贝文件到项目目录} else if (type.equals("delete")) {System.out.println("监听到需要删除配置");System.out.println("删除项目中原配置文件...");}}}}}});countDown.await();cto.closeZKClient();}}
acl权限操作与认证授权
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;import com.imooc.utils.AclUtils;public class CuratorAcl {public CuratorFramework client = null;public static final String zkServerPath = "192.168.254.130:2181";public CuratorAcl() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().authorization("digest", "bushro1:123456".getBytes())//使用账号密码认证,可以认证多个用户.connectString(zkServerPath).sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("workspace").build();client.start();}public void closeZKClient() {if (client != null) {this.client.close();}}public static void main(String[] args) throws Exception {// 实例化CuratorAcl cto = new CuratorAcl();boolean isZkCuratorStarted = cto.client.isStarted();System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));String nodePath = "/acl/father/child/sub";List<ACL> acls = new ArrayList<ACL>();Id bushro1 = new Id("digest", AclUtils.getDigestUserPwd("bushro1:123456"));Id bushro2 = new Id("digest", AclUtils.getDigestUserPwd("bushro2:123456"));acls.add(new ACL(Perms.ALL, bushro1));acls.add(new ACL(Perms.READ, bushro2));acls.add(new ACL(Perms.DELETE | Perms.CREATE, bushro2));// 创建节点byte[] data = "spiderman".getBytes();cto.client.create().creatingParentsIfNeeded()//递归方式创建.withMode(CreateMode.PERSISTENT).withACL(acls)//该方式只有对最后一个节点设置权限,相应的父节点默认都是"world", "anyone",// 如果后面添加true那么所有创建的节点都是设置的权限.forPath(nodePath, data);//设置权限cto.client.setACL().withACL(acls).forPath("/curatorNode");// 更新节点数据byte[] newData = "batman".getBytes();cto.client.setData().withVersion(0).forPath(nodePath, newData);// 删除节点cto.client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(0).forPath(nodePath);// 读取节点数据Stat stat = new Stat();byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);System.out.println("节点" + nodePath + "的数据为: " + new String(data));System.out.println("该节点的版本号为: " + stat.getVersion());cto.closeZKClient();boolean isZkCuratorStarted2 = cto.client.isStarted();System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));}}
ZooKeeper 之Apache Curator 客户端使用相关推荐
- Apache Curator客户端的使用
当前已有的三种API客户端 zk原生API :不支持超时自动重连,不支持节点递归创建 zkclient:无文档,异常处理弱爆了(简单的抛出RuntimeException) Apache Curato ...
- Apache ZooKeeper - 使用Apache Curator操作ZK
文章目录 原生ZK API VS Curator Curator 概述 Maven依赖 会话创建 静态工厂方式创建会话 使用 fluent 风格创建会话 创建节点 protection 模式 ,规避僵 ...
- 【zookeeper】Apache curator优点介绍
文章目录 1. 简介 2. 项目组件 2.1 版本 2.2 项目组件 2.3 Maven依赖 3. 案例及功能说明 3.1 创建会话 3.1.1 重试策略 3.1.2 创建节点 3.1.3 删除节点 ...
- zookeeper简介以及C客户端用法
zookeeper简介以及C客户端用法 前言 简介 zookeeper保证 理解zookeeper的顺序一致性 zookeeper 接口 安装 zoo.cfg参数详解 常用命令 C API zooke ...
- Zookeeper 原生客户端、可视化工具 ZooInspector 、Apache Curator
目录 Zookeeper 原生客户端 Apache Curator 开源客户端 可视化客户端工具 ZooInspector Zookeeper 原生客户端 1.类似 Redis 有多种 Java 客户 ...
- Zookeeper Java 客户端 ——Apache Curator
Zookeeper Java 客户端 --Apache Curator 一.基本依赖 二.客户端相关操作 2.1 创建客户端实例 2.2 重试策略 ...
- Zookeeper分布式一致性原理(七):Curator客户端
1. Curator简介 Curator是Netfix公司开源的一套Zookeeper客户端.Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括重连.反复注册Watcher和 ...
- Apache Curator操作zookeeper的API使用
curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Jav ...
- 基于Apache Curator框架的ZooKeeper基本用法详解
简介 Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作.通过查看官方文档,可以发现Curator主要解决了三类问题: ...
最新文章
- rhel6编译ssh的src.rpm包并修改spec参数
- 记一次Java进程突然消失问题
- Halcon的数据类型
- 如何防御潜在的破坏性DDoS攻击—Vecloud微云
- 36.LEN() 函数
- form data怎么接收_VUE发送Formdata数据,NodeJS接收
- 两个ESP8266一个作为服务器一个作为客户端实现互相通讯
- 【闲】获取视频选集(每集)名字
- Facebook pop
- 【WinForm】TextBox只能输入数字
- 如何控制input的输入方向
- web端上传图片的几种方式
- 工赋开发者社区 | 新一波JavaScript Web框架
- 关于BUUCTF yxx和异性相吸
- 约定由于配置(Convention over Configuration)
- Javascript JS 网页分享到QQ空间QQ,java实现qq分享,url带参数的方法
- 2022年最新安徽建筑安全员考试题库及答案
- 数字标牌 android,【浩鑫推出全球首款英特尔方案+Android系统数字标牌播放器】PjTime.COM 新品快讯 Intel...
- 好教程推荐系列:《Qt 5.9 C++开发指南》
- Vue.js 2.0 混合