操作Zookeeper
可以通过图形化界面进行操作使用的工具是 zookeeper-dev-ZooInspector.jar
连接到我的zk之后:
1、Java操作zk
依赖:
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version></dependency></dependencies>
代码:
package com.toov5.zookeeper;import java.io.IOException; import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper;public class JavaZKTest {private static final String CONNECTSTRING ="192.168.91.5";private static int SESSIONTIMEOUT=5000; //超时时间//使用Java并发包的 信号量 控制zk连接成功之后 开始创建private static final CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws IOException, KeeperException, InterruptedException {ZooKeeper zooKeeper = null;try {//1zk创建了一个连接zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() {public void process(WatchedEvent event) {//监听节点是否发生变化 连接成功 (代码从上往下执行,创建节点 直接copy)// 获取事件状态KeeperState keeperState = event.getState();// 获取事件类型EventType eventType = event.getType();if (KeeperState.SyncConnected == keeperState) { //状态判断if (EventType.None == eventType) {countDownLatch.countDown(); //--操作 到0时候 await启动了哦System.out.println("zk 启动连接..."); //才可以去创建节点的逻辑执行 需要用到信号量 }} }}); countDownLatch.await(); //不为0 一直等待~~//创建持久节点 //Ids.OPEN_ACL_UNSAFE连接权限 CreateMode对应好多模式 关于 SEQENTIAL 重名情况下 加了个id 保证唯一性String createNode = zooKeeper.create("/test666", "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("节点名称"+createNode);} catch (Exception e) {}finally {if (zooKeeper != null) { //关闭连接 zooKeeper.close(); }}} }
结果:
注意如果创建父子关系,必须先创建父 在创建子
然后:
说明:
- 1. 创建持久节点,并且允许任何服务器可以操作
String result = zk.create("/itmayiedu_Lasting", "Lasting".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("result:" + result);
- 2. 创建临时节点
String result = zk.create("/itmayiedu_temp", "temp".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("result:" + result);
创建节点(znode) 方法:
create:
提供了两套创建节点的方法,同步和异步创建节点方式。
同步方式:
参数1,节点路径《名称) : InodeName (不允许递归创建节点,也就是说在父节点不存在
的情况下,不允许创建子节点)
参数2,节点内容: 要求类型是字节数组(也就是说,不支持序列化方式,如果需要实现序
列化,可使用java相关序列化框架,如Hessian、Kryo框架)
参數3,节点权限: 使用Ids.OPEN_ACL_UNSAFE开放权限即可。(这个参数一般在权展
没有太高要求的场景下,没必要关注)
参数4,节点类型: 创建节点的类型: CreateMode,提供四种首点象型
PERSISTENT 持久化节点
PERSISTENT_SEQUENTIAL 顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1
EPHEMERAL 临时节点, 客户端session超时这类节点就会被自动删除
EPHEMERAL_SEQUENTIAL 临时自动编号节点
Watcher
在ZooKeeper中,接口类Watcher用于表示一个标准的事件处理器,其定义了事件通知相关的逻辑,包含KeeperState和EventType两个枚举类,分别代表了通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)。
什么是Watcher接口
同一个事件类型在不同的通知状态中代表的含义有所不同,表列举了常见的通知状态和事件类型。
Watcher通知状态与事件类型一览
KeeperState |
EventType |
触发条件 |
说明 |
None |
客户端与服务端成功建立连接 |
||
SyncConnected |
NodeCreated |
Watcher监听的对应数据节点被创建 |
|
NodeDeleted |
Watcher监听的对应数据节点被删除 |
此时客户端和服务器处于连接状态 |
|
NodeDataChanged |
Watcher监听的对应数据节点的数据内容发生变更 |
||
NodeChildChanged |
Wather监听的对应数据节点的子节点列表发生变更 |
||
Disconnected |
None |
客户端与ZooKeeper服务器断开连接 |
此时客户端和服务器处于断开连接状态 |
Expired |
Node |
会话超时 |
此时客户端会话失效,通常同时也会受到SessionExpiredException异常 |
AuthFailed |
None |
通常有两种情况,1:使用错误的schema进行权限检查 2:SASL权限检查失败 |
通常同时也会收到AuthFailedException异常 |
表7-3中列举了ZooKeeper中最常见的几个通知状态和事件类型。
回调方法process()
process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理。process方法的定义如下:
abstract public void process(WatchedEvent event);
这个回调方法的定义非常简单,我们重点看下方法的参数定义:WatchedEvent。
WatchedEvent包含了每一个事件的三个基本属性:通知状态(keeperState),事件类型(EventType)和节点路径(path),其数据结构如图7-5所示。ZooKeeper使用WatchedEvent对象来封装服务端事件并传递给Watcher,从而方便回调方法process对服务端事件进行处理。
提到WatchedEvent,不得不讲下WatcherEvent实体。笼统地讲,两者表示的是同一个事物,都是对一个服务端事件的封装。不同的是,WatchedEvent是一个逻辑事件,用于服务端和客户端程序执行过程中所需的逻辑对象,而WatcherEvent因为实现了序列化接口,因此可以用于网络传输。
服务端在生成WatchedEvent事件之后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,以便通过网络传输到客户端。客户端在接收到服务端的这个事件对象后,首先会将WatcherEvent还原成一个WatchedEvent事件,并传递给process方法处理,回调方法process根据入参就能够解析出完整的服务端事件了。
需要注意的一点是,无论是WatchedEvent还是WatcherEvent,其对ZooKeeper服务端事件的封装都是机及其简单的。举个例子来说,当/zk-book这个节点的数据发生变更时,服务端会发送给客户端一个“ZNode数据内容变更”事件,客户端只能够接收到如下信
public class ZkClientWatcher implements Watcher {// 集群连接地址private static final String CONNECT_ADDRES = "192.168.110.159:2181,192.168.110.160:2181,192.168.110.162:2181";// 会话超时时间private static final int SESSIONTIME = 2000;// 信号量,让zk在连接之前等待,连接成功后才能往下走.private static final CountDownLatch countDownLatch = new CountDownLatch(1);private static String LOG_MAIN = "【main】 ";private ZooKeeper zk;public void createConnection(String connectAddres, int sessionTimeOut) {try {zk = new ZooKeeper(connectAddres, sessionTimeOut, this);System.out.println(LOG_MAIN + "zk 开始启动连接服务器....");countDownLatch.await();} catch (Exception e) {e.printStackTrace();}}public boolean createPath(String path, String data) {try {this.exists(path, true);this.zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println(LOG_MAIN + "节点创建成功, Path:" + path + ",data:" + data);} catch (Exception e) {e.printStackTrace();return false;}return true;}/*** 判断指定节点是否存在* * @param path* 节点路径*/public Stat exists(String path, boolean needWatch) {try {return this.zk.exists(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}public boolean updateNode(String path,String data) throws KeeperException, InterruptedException {exists(path, true);this.zk.setData(path, data.getBytes(), -1);return false;}public void process(WatchedEvent watchedEvent) {// 获取事件状态KeeperState keeperState = watchedEvent.getState();// 获取事件类型EventType eventType = watchedEvent.getType();// zk 路径String path = watchedEvent.getPath();System.out.println("进入到 process() keeperState:" + keeperState + ", eventType:" + eventType + ", path:" + path);// 判断是否建立连接if (KeeperState.SyncConnected == keeperState) {if (EventType.None == eventType) {// 如果建立建立成功,让后程序往下走System.out.println(LOG_MAIN + "zk 建立连接成功!");countDownLatch.countDown();} else if (EventType.NodeCreated == eventType) {System.out.println(LOG_MAIN + "事件通知,新增node节点" + path);} else if (EventType.NodeDataChanged == eventType) {System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被修改....");}else if (EventType.NodeDeleted == eventType) {System.out.println(LOG_MAIN + "事件通知,当前node节点" + path + "被删除....");}}System.out.println("--------------------------------------------------------");}public static void main(String[] args) throws KeeperException, InterruptedException {ZkClientWatcher zkClientWatcher = new ZkClientWatcher();zkClientWatcher.createConnection(CONNECT_ADDRES, SESSIONTIME); // boolean createResult = zkClientWatcher.createPath("/p15", "pa-644064");zkClientWatcher.updateNode("/pa2","7894561");}}
注意在创建节点时候,一定要写 “/” 比如“/test” !!!!!!
下面的例子很有意义的,监听的:
package com.toov5.zookeeper;import java.io.IOException; import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.jboss.netty.handler.codec.http.websocketx.WebSocketHandshakeException; import org.apache.zookeeper.ZooKeeper;public class JavaZKTest {private static final String CONNECTSTRING ="192.168.91.5";private static int SESSIONTIMEOUT=5000; //超时时间//使用Java并发包的 信号量 控制zk连接成功之后 开始创建private static final CountDownLatch countDownLatch = new CountDownLatch(1);public static void main(String[] args) throws IOException, KeeperException, InterruptedException {ZooKeeper zooKeeper = null;try {//1zk创建了一个连接zooKeeper = new ZooKeeper(CONNECTSTRING, SESSIONTIMEOUT, new Watcher() {public void process(WatchedEvent event) {//监听节点是否发生变化 连接成功 (代码从上往下执行,创建节点 直接copy)// 获取事件状态KeeperState keeperState = event.getState();// 获取事件类型EventType eventType = event.getType();if (KeeperState.SyncConnected == keeperState) { //状态判断if (EventType.None == eventType) {countDownLatch.countDown(); //--操作 到0时候 await启动了哦System.out.println("zk 启动连接..."); //才可以去创建节点的逻辑执行 需要用到信号量 }if (EventType.NodeCreated==eventType) {System.out.println("zk时间通知,获取当前在创建节点..");}} }}); countDownLatch.await(); //不为0 一直等待~~String path="/tooov5";zooKeeper.exists(path, true); //true时候有事件通知//创建持久节点 //Ids.OPEN_ACL_UNSAFE连接权限 CreateMode对应好多模式 关于 SEQENTIAL 重名情况下 加了个id 保证唯一性String createNode = zooKeeper.create(path, "toov5".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("节点名称"+createNode);} catch (Exception e) {}finally {if (zooKeeper != null) { //关闭连接 zooKeeper.close(); }}} }
代码这样修改:
节点的 增加 删除 修改 都可以监听到!!!!!!!!!
转载于:https://www.cnblogs.com/toov5/p/9898177.html
操作Zookeeper相关推荐
- ZooKeeper入门(二)Java操作zookeeper
首先是pom依赖: <dependency><groupId>org.apache.zookeeper</groupId><artifactId>zoo ...
- zookeeper教程,docker 安装,命令,python操作zookeeper,分布式队列,分布式锁
docker安装zookeeper服务端 首先安装单节点的服务端,如果安装多节点的服务端,需要为每个节点配置其他节点的地址. docker run --privileged=true -d --nam ...
- Zookeeper学习之源生API的使用(java与shell操作zookeeper)。
如果不会搭建zookeeper环境:请看此文章:Zookeeper学习之集群环境搭建 1.操作zookeeper(shell) 启动zookeeper客户端:zkCli.sh; 根据提示命令进行操作: ...
- 使用Kazoo操作ZooKeeper服务治理
使用Kazoo操作ZooKeeper服务治理 单机服务的可靠性及可扩展性有限,某台服务宕机可能会影响整个系统的正常使用:分布式服务能够有效地解决这一问题,但同时分布式服务也会带来一些新的问题,如:服务 ...
- python系列之:kazoo连接Zookeeper操作Zookeeper
python系列之:kazoo连接Zookeeper操作Zookeeper 一.连接zookeeper 二.读取zookeeper节点 三.读取zookeeper内容 四.kazoo连接Zookeep ...
- python 操作 zookeeper 快速入门
python 操作 zookeeper 快速入门 文章目录 python 操作 zookeeper 快速入门 什么是zookeeper python 操作 zk 快速入门 zk节点 创建节点 查询节点 ...
- 005_Java操作ZooKeeper
1. ZooKeeper的JavaClient是我们更轻松的去对ZooKeeper进行各种操作.我们仅需要引入zookeeper-3.7.0.jar和zookeeper-jute-3.7.0.jar两 ...
- 支持断线重连、永久watcher、递归操作 ZooKeeper 客户端
项目介绍 ZooKeeper本质上是一个分布式的小文件存储系统.原本是Apache Hadoop的一个组件,现在被拆分为一个Hadoop的独立子项目. Zookeeper 作为一个分布式的服务框架,主 ...
- Curator操作ZooKeeper
Curator极大简化了ZooKeeper的使用,增加了针对ZooKeeper集群中connection的管理. 节点的创建和删除 import java.util.List; import java ...
- ZooKeeper(二) idea中使用Java操作zookeeper
本章介绍使在idea环境中使用maven工程进行连接zookeeper,并进行简单的增删查改操作. 一.使用的pom依赖 添加zookeeper依赖,以及进行测试的junit测试依赖. <?xm ...
最新文章
- python爬虫scrapy步骤mac系统_python scrapy简单爬虫记录(实现简单爬取知乎)
- mysql variables 大记录 查询 慢_MySQL - 慢查询
- 产品要不要做先回答的10个问题
- SAP Cloud for Customer里的Sales Lead和Lead
- Linux之RPM 软件管理程序
- Enews博客/CMS/双模式主题源码
- kafka消息存储格式
- 两万字深度介绍分布式系统原理,这一篇就够了
- 【kafka】kafka 中 消息 record 格式
- SAP License:外购和自产货物视同销售业务理解
- ****阿里云使用+快速运维总结(不断更新)
- 微信小程序————样式
- 浅谈企业知识资产管理及建设思路
- 华为最强科普:什么是DSP?
- VS2013附加包含目录,添加相对路径
- 3gqq幻想西游〓宠物、副本、攻略、攻城、极品怪〓
- MPP(无主备)环境搭建
- unity黑白滤镜_unity图像优化美化滤镜渲染插件Beautify 6.2.7
- 单击屏幕亮屏流程分析
- Flutter 使用 ESC/POS蓝牙或以太网库控制热敏打印机