3分钟理解zookeeper的watcher机制
代码呈上
package jeff.zookeeper.watcher;import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;/*** Zookeeper Wathcher * 本类就是一个Watcher类(实现了org.apache.zookeeper.Watcher类)* @authorjeff*/
public class ZooKeeperWatcher implements Watcher {/** 定义原子变量 */AtomicInteger seq = new AtomicInteger();/** 定义session失效时间 */private static final int SESSION_TIMEOUT = 10000;/** zookeeper服务器地址 */private static final String CONNECTION_ADDR = "192.168.98.98:2181,192.168.98.99:2181,192.168.98.100:2181";/** zk父路径设置 */private static final String PARENT_PATH = "/p";/** zk子路径设置 */private static final String CHILDREN_PATH = "/p/c1";/** 进入标识 */private static final String LOG_PREFIX_OF_MAIN = "【Main】";/** zk变量 */private ZooKeeper zk = null;/** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */private CountDownLatch connectedSemaphore = new CountDownLatch(1);/*** 创建ZK连接* @param connectAddr ZK服务器地址列表* @param sessionTimeout Session超时时间*/public void createConnection(String connectAddr, int sessionTimeout) {this.releaseConnection();try {zk = new ZooKeeper(connectAddr, sessionTimeout, this);System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");connectedSemaphore.await();} catch (Exception e) {e.printStackTrace();}}/*** 关闭ZK连接*/public void releaseConnection() {if (this.zk != null) {try {this.zk.close();} catch (InterruptedException e) {e.printStackTrace();}}}/*** 创建节点* @param path 节点路径* @param data 数据内容* @return */public boolean createPath(String path, String data,boolean watch) {try {//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)this.zk.exists(path, watch);System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create( /**路径*/ path, /**数据*/data.getBytes(), /**所有可见*/Ids.OPEN_ACL_UNSAFE, /**永久存储*/CreateMode.PERSISTENT ) + ", content: " + data);} catch (Exception e) {e.printStackTrace();return false;}return true;}/*** 读取指定节点数据内容* @param path 节点路径* @return*/public String readData(String path, boolean needWatch) {try {return new String(this.zk.getData(path, needWatch, null));} catch (Exception e) {e.printStackTrace();return "";}}/*** 更新指定节点数据内容* @param path 节点路径* @param data 数据内容* @return*/public boolean writeData(String path, String data) {try {System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +this.zk.setData(path, data.getBytes(), -1));} catch (Exception e) {e.printStackTrace();}return false;}/*** 删除指定节点* * @param path* 节点path*/public void deleteNode(String path) {try {this.zk.delete(path, -1);System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);} catch (Exception e) {e.printStackTrace();}}/*** 判断指定节点是否存在* @param path 节点路径*/public Stat exists(String path, boolean needWatch) {try {return this.zk.exists(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}/*** 获取子节点* @param path 节点路径*/private List<String> getChildren(String path, boolean needWatch) {try {return this.zk.getChildren(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}/*** 删除所有节点*/public void deleteAllTestPath() {if(this.exists(CHILDREN_PATH, false) != null){this.deleteNode(CHILDREN_PATH);}if(this.exists(PARENT_PATH, false) != null){this.deleteNode(PARENT_PATH);} }/*** 收到来自Server的Watcher通知后的处理。*/@Overridepublic void process(WatchedEvent event) {System.out.println("进入 process 。。。。。event = " + event);try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}if (event == null) {return;}// 连接状态KeeperState keeperState = event.getState();// 事件类型EventType eventType = event.getType();// 受影响的pathString path = event.getPath();String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";System.out.println(logPrefix + "收到Watcher通知");System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());System.out.println(logPrefix + "事件类型:\t" + eventType.toString());if (KeeperState.SyncConnected == keeperState) {// 成功连接上ZK服务器if (EventType.None == eventType) {System.out.println(logPrefix + "成功连接上ZK服务器");connectedSemaphore.countDown();} //创建节点else if (EventType.NodeCreated == eventType) {System.out.println(logPrefix + "节点创建");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}this.exists(path, true);} //更新节点else if (EventType.NodeDataChanged == eventType) {System.out.println(logPrefix + "节点数据更新");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));} //更新子节点else if (EventType.NodeChildrenChanged == eventType) {System.out.println(logPrefix + "子节点变更");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));} //删除节点else if (EventType.NodeDeleted == eventType) {System.out.println(logPrefix + "节点 " + path + " 被删除");}else ;} else if (KeeperState.Disconnected == keeperState) {System.out.println(logPrefix + "与ZK服务器断开连接");} else if (KeeperState.AuthFailed == keeperState) {System.out.println(logPrefix + "权限检查失败");} else if (KeeperState.Expired == keeperState) {System.out.println(logPrefix + "会话失效");}else ;System.out.println("--------------------------------------------");}/*** <B>方法名称:</B>测试zookeeper监控<BR>* <B>概要说明:</B>主要测试watch功能<BR>* @param args* @throws Exception*/public static void main(String[] args) throws Exception {//建立watcherZooKeeperWatcher zkWatch = new ZooKeeperWatcher();//创建连接zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);//System.out.println(zkWatch.zk.toString());Thread.sleep(1000);// 清理节点//zkWatch.deleteAllTestPath();if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "",true)) {Thread.sleep(1000);// 读取数据System.out.println("---------------------- read parent ----------------------------");zkWatch.readData(PARENT_PATH, true);// 更新数据zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");Thread.sleep(1000);// 读取子节点System.out.println("---------------------- read children path ----------------------------");zkWatch.getChildren(PARENT_PATH, true);// 创建子节点zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "",true);//Thread.sleep(1000);//zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");}//Thread.sleep(5000);// 清理节点//zkWatch.deleteAllTestPath();Thread.sleep(1000);zkWatch.releaseConnection();}}
zk的事件类型和状态类型
数据变化对应的事件类型和状态类型:
事件类型(跟Znode节点相关的):
EventType.NodeCreated
EventType.NodeDataChanged
EventType.NodeChildrenChanged
EventType.NodeDeleted
EventType.NONE ---连接上zk后触发此事件类型
状态类型(跟客户端实例相关的):
KeeperState.Disconnected
KeeperState.SyncConnected
KeeperState.AuthFailed
KeeperState.Expired
EventType.NONE事件
当zk客户端连接到zk服务端触发EventType.NONE事件,此时watcher的事件状态是KeeperState.SyncConnected。
//建立watcherZooKeeperWatcher zkWatch = new ZooKeeperWatcher();//创建连接zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);//System.out.println(zkWatch.zk.toString());
上例子中会打印:
【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态: SyncConnected
【Watcher-1】事件类型: None
【Watcher-1】成功连接上ZK服务器
可以看到此时的事件触发的path为null,因为并不是节点触发而是启动连接成功所触发。
EventType.NodeCreated
例子中当我们创建/p节点时会触发节点创建事件:
/*** 创建节点* @param path 节点路径* @param data 数据内容* @return */public boolean createPath(String path, String data,boolean watch) {try {//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)this.zk.exists(path, watch);System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create( /**路径*/ path, /**数据*/data.getBytes(), /**所有可见*/Ids.OPEN_ACL_UNSAFE, /**永久存储*/CreateMode.PERSISTENT ) + ", content: " + data);} catch (Exception e) {e.printStackTrace();return false;}return true;}* 创建节点* @param path 节点路径* @param data 数据内容* @return */public boolean createPath(String path, String data,boolean watch) {try {//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)this.zk.exists(path, watch);System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create( /**路径*/ path, /**数据*/data.getBytes(), /**所有可见*/Ids.OPEN_ACL_UNSAFE, /**永久存储*/CreateMode.PERSISTENT ) + ", content: " + data);} catch (Exception e) {e.printStackTrace();return false;}return true;}
打印结果:
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】节点创建成功, Path: /p, content: 1520667455362
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态: SyncConnected
【Watcher-2】事件类型: NodeCreated
【Watcher-2】节点创建
--------------------------------------------
注意此时我们传入了一个watch的bool类型值,如果此时watch值为false,那么上边的结果还会打印吗?不会。
EventType.NodeDataChanged
因为zk的watcher事件的监听是一次性的,也就是说当zk服务端将事件发生的结果通知到zk的watcher客户端后,此前这个事件发生节点(比如/p)将不会再被监听到任何事件,提问:如下的/p节点的数据更新事件是否会被监听打印?
不会,除非再次设置/p的watch为true,如何操作呢?我们可以借助exists方法或者getData方法:
那么我们放开如下代码:
观察下数据更新事件的打印:
【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态: SyncConnected
【Watcher-1】事件类型: None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】节点创建成功, Path: /p, content: 1520668957729
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态: SyncConnected
【Watcher-2】事件类型: NodeCreated
【Watcher-2】节点创建
--------------------------------------------
---------------------- read parent ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新数据成功,path:/p, stat: 34359738397,34359738398,1520697336511,1520697337551,1,0,0,0,13,0,34359738397【Watcher-3】收到Watcher通知
【Watcher-3】连接状态: SyncConnected
【Watcher-3】事件类型: NodeDataChanged
【Watcher-3】节点数据更新
【Watcher-3】数据内容: 1520668958801
--------------------------------------------
【Main】更新数据成功,path:/p, stat: 34359738397,34359738398,1520697336511,1520697337551,1,0,0,0,13,0,34359738397【Watcher-3】收到Watcher通知 【Watcher-3】连接状态: SyncConnected 【Watcher-3】事件类型: NodeDataChanged 【Watcher-3】节点数据更新 【Watcher-3】数据内容: 1520668958801 --------------------------------------------
EventType.NodeChildrenChanged
放开如下代码,请问是否能监听到子节点的NodeChildrenChanged事件
这样是不能的,只会打印子节点的创建事件,原因是/p节点的子节点创建事件虽然触发,但是子节点的改变事件并没有设置watch为true。我们设置zkWatch.getChildren(PARENT_PATH,true);再次运行,发现子节点change事件在子节点创建后被打印:
【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态: SyncConnected
【Watcher-1】事件类型: None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】节点创建成功, Path: /p, content: 1520669809911
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态: SyncConnected
【Watcher-2】事件类型: NodeCreated
【Watcher-2】节点创建
--------------------------------------------
---------------------- read parent ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新数据成功,path:/p, stat: 34359738417,34359738418,1520698188655,1520698189687,1,0,0,0,13,0,34359738417【Watcher-3】收到Watcher通知
【Watcher-3】连接状态: SyncConnected
【Watcher-3】事件类型: NodeDataChanged
【Watcher-3】节点数据更新
【Watcher-3】数据内容: 1520669810962
--------------------------------------------
---------------------- read children path ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p/c1
【Main】节点创建成功, Path: /p/c1, content: 1520669812011
【Watcher-4】收到Watcher通知
【Watcher-4】连接状态: SyncConnected
【Watcher-4】事件类型: NodeCreated
【Watcher-4】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态: SyncConnected
【Watcher-5】事件类型: NodeChildrenChanged
【Watcher-5】子节点变更
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态: SyncConnected
【Watcher-5】事件类型: NodeChildrenChanged
【Watcher-5】子节点变更
EventType.NodeDeleted
我们放开如下代码:
观察打印:
【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态: SyncConnected
【Watcher-1】事件类型: None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】节点创建成功, Path: /p, content: 1520670313945
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态: SyncConnected
【Watcher-2】事件类型: NodeCreated
【Watcher-2】节点创建
--------------------------------------------
---------------------- read parent ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新数据成功,path:/p, stat: 34359738424,34359738425,1520698692709,1520698693724,1,0,0,0,13,0,34359738424【Watcher-3】收到Watcher通知
【Watcher-3】连接状态: SyncConnected
【Watcher-3】事件类型: NodeDataChanged
【Watcher-3】节点数据更新
【Watcher-3】数据内容: 1520670315011
--------------------------------------------
---------------------- read children path ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p/c1
【Main】节点创建成功, Path: /p/c1, content: 1520670316035
【Watcher-4】收到Watcher通知
【Watcher-4】连接状态: SyncConnected
【Watcher-4】事件类型: NodeCreated
【Watcher-4】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态: SyncConnected
【Watcher-5】事件类型: NodeChildrenChanged
【Watcher-5】子节点变更
【Watcher-5】子节点列表:[c1]
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDeleted path:/p/c1
【Main】删除节点成功,path:/p/c1
【Main】删除节点成功,path:/p
【Watcher-6】收到Watcher通知
【Watcher-6】连接状态: SyncConnected
【Watcher-6】事件类型: NodeDeleted
【Watcher-6】节点 /p/c1 被删除
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-7】收到Watcher通知
【Watcher-7】连接状态: SyncConnected
【Watcher-7】事件类型: NodeChildrenChanged
【Watcher-7】子节点变更
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDeleted path:/p/c1
【Main】删除节点成功,path:/p/c1
【Main】删除节点成功,path:/p
【Watcher-6】收到Watcher通知
【Watcher-6】连接状态: SyncConnected
【Watcher-6】事件类型: NodeDeleted
【Watcher-6】节点 /p/c1 被删除
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-7】收到Watcher通知
【Watcher-7】连接状态: SyncConnected
【Watcher-7】事件类型: NodeChildrenChanged
【Watcher-7】子节点变更
由于我们对子节点/p/c1设置了监听:
所以当删除了/p/c1事件触发后也同时触发了子节点改变事件。
---------------------------------------------客官,好的话请打赏哦--------------------------------------------------------------------------
3分钟理解zookeeper的watcher机制相关推荐
- zookeeper的watcher机制
ZooKeeper 提供了分布式数据的发布/订阅功能.一个典型的发布/阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它 ...
- 品味ZooKeeper之Watcher机制_2
品味ZooKeeper之Watcher机制 本文思维导图如下: 前言 Watcher机制是zookeeper最重要三大特性数据节点Znode+Watcher机制+ACL权限控制中的其中一个,它是zk很 ...
- Zookeeper的Watcher机制及Watcher原理分析
Zookeeper的Watcher机制及Watcher原理分析 1 什么是Watcher监听机制 Watcher 监听机制是 Zookeeper中非常重要的特性,我们基于zookeeper上创建的节点 ...
- Zookeeper之Watcher机制详解
概念 Zookeeper提供了数据的发布/订阅功能.多个订阅者可监听某一特定主题对象(节点).当主题对象发生改变(数据内容改变,被删除等),会实时通知所有订阅者. Zookeeper采用了Watche ...
- zookeeper的watcher机制原理详解
文章目录 客户端注册Watcher 服务端处理getData请求 服务端处理setData请求 客户端触发Watcher 我们都知道在zookeeper中存在watcher机制,查询服务端节点数据时可 ...
- ZooKeeper入门(三)zookeeper的Watcher机制
process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理. import ...
- 带你轻松理解Zookeeper的选举机制
一,Zookeeper选举过程中服务器的状态. LOOKING:寻找leader状态,该状态下,服务器认为当前集群没有leader,会发起leader选举.在选举过程中,所有服务器的状态都是LOOKI ...
- zookeeper 中 Watcher 通知机制的一点理解
首先,ZooKeeper 提供了分布式数据的发布/订阅功能. 这让我想到一种模式,观察者模式(发布订阅模式):一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题 ...
- zookeeper watch java_Apache ZooKeeper Watcher 机制源码解释
分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程.不同节点上的进程互相协调行为的过程叫做分布式同步.许多分布式系统需要一个进程作为任务的协调者,执行一些其 ...
最新文章
- Android-如何开发一个功能强大的图片选择器
- cas无法使用_并发编程中cas的这三大问题你知道吗?
- php 如何把u5fb,php如何将json中的unicode编码转为汉字?
- Google开源项目风格指南-笔记
- buu Windows系统密码
- 48. C# -- 事件
- chrome和safari_私人浏览器-如何在Chrome和Safari中使用隐身模式
- python中template是什么意思啊_Python中Template使用的一个小技巧
- java char i=2+#039;2#039;;_图说String(三)String中#039;+#039;和StringBuilder的区别
- java九九成表发_用EXCEL可多种办法生成99乘法表
- git升级后jenkins的报错
- Response AddHeader使用实例
- 小学计算机兴趣小组计划书,兴趣小组计划
- GD32F103实战笔记
- 简谈FPGA实现高斯滤波
- PHP运行的环境安装
- Python智能语音机器人
- Git新手入门视频教程
- Vue3+Vite3 SSR基本搭建
- Hadoop学习----HDFS