代码呈上

package jeff.zookeeper.watcher;import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;/*** Zookeeper Wathcher * 本类就是一个Watcher类(实现了org.apache.zookeeper.Watcher类)* @authorjeff*/
public class ZooKeeperWatcher implements Watcher {/** 定义原子变量 */AtomicInteger seq = new AtomicInteger();/** 定义session失效时间 */private static final int SESSION_TIMEOUT = 10000;/** zookeeper服务器地址 */private static final String CONNECTION_ADDR = "192.168.98.98:2181,192.168.98.99:2181,192.168.98.100:2181";/** zk父路径设置 */private static final String PARENT_PATH = "/p";/** zk子路径设置 */private static final String CHILDREN_PATH = "/p/c1";/** 进入标识 */private static final String LOG_PREFIX_OF_MAIN = "【Main】";/** zk变量 */private ZooKeeper zk = null;/** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */private CountDownLatch connectedSemaphore = new CountDownLatch(1);/*** 创建ZK连接* @param connectAddr ZK服务器地址列表* @param sessionTimeout Session超时时间*/public void createConnection(String connectAddr, int sessionTimeout) {this.releaseConnection();try {zk = new ZooKeeper(connectAddr, sessionTimeout, this);System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");connectedSemaphore.await();} catch (Exception e) {e.printStackTrace();}}/*** 关闭ZK连接*/public void releaseConnection() {if (this.zk != null) {try {this.zk.close();} catch (InterruptedException e) {e.printStackTrace();}}}/*** 创建节点* @param path 节点路径* @param data 数据内容* @return */public boolean createPath(String path, String data,boolean watch) {try {//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)this.zk.exists(path, watch);System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create( /**路径*/ path, /**数据*/data.getBytes(), /**所有可见*/Ids.OPEN_ACL_UNSAFE, /**永久存储*/CreateMode.PERSISTENT ) +     ", content: " + data);} catch (Exception e) {e.printStackTrace();return false;}return true;}/*** 读取指定节点数据内容* @param path 节点路径* @return*/public String readData(String path, boolean needWatch) {try {return new String(this.zk.getData(path, needWatch, null));} catch (Exception e) {e.printStackTrace();return "";}}/*** 更新指定节点数据内容* @param path 节点路径* @param data 数据内容* @return*/public boolean writeData(String path, String data) {try {System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +this.zk.setData(path, data.getBytes(), -1));} catch (Exception e) {e.printStackTrace();}return false;}/*** 删除指定节点* * @param path*            节点path*/public void deleteNode(String path) {try {this.zk.delete(path, -1);System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);} catch (Exception e) {e.printStackTrace();}}/*** 判断指定节点是否存在* @param path 节点路径*/public Stat exists(String path, boolean needWatch) {try {return this.zk.exists(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}/*** 获取子节点* @param path 节点路径*/private List<String> getChildren(String path, boolean needWatch) {try {return this.zk.getChildren(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}/*** 删除所有节点*/public void deleteAllTestPath() {if(this.exists(CHILDREN_PATH, false) != null){this.deleteNode(CHILDREN_PATH);}if(this.exists(PARENT_PATH, false) != null){this.deleteNode(PARENT_PATH);}      }/*** 收到来自Server的Watcher通知后的处理。*/@Overridepublic void process(WatchedEvent event) {System.out.println("进入 process 。。。。。event = " + event);try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}if (event == null) {return;}// 连接状态KeeperState keeperState = event.getState();// 事件类型EventType eventType = event.getType();// 受影响的pathString path = event.getPath();String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";System.out.println(logPrefix + "收到Watcher通知");System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());System.out.println(logPrefix + "事件类型:\t" + eventType.toString());if (KeeperState.SyncConnected == keeperState) {// 成功连接上ZK服务器if (EventType.None == eventType) {System.out.println(logPrefix + "成功连接上ZK服务器");connectedSemaphore.countDown();} //创建节点else if (EventType.NodeCreated == eventType) {System.out.println(logPrefix + "节点创建");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}this.exists(path, true);} //更新节点else if (EventType.NodeDataChanged == eventType) {System.out.println(logPrefix + "节点数据更新");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));} //更新子节点else if (EventType.NodeChildrenChanged == eventType) {System.out.println(logPrefix + "子节点变更");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));} //删除节点else if (EventType.NodeDeleted == eventType) {System.out.println(logPrefix + "节点 " + path + " 被删除");}else ;} else if (KeeperState.Disconnected == keeperState) {System.out.println(logPrefix + "与ZK服务器断开连接");} else if (KeeperState.AuthFailed == keeperState) {System.out.println(logPrefix + "权限检查失败");} else if (KeeperState.Expired == keeperState) {System.out.println(logPrefix + "会话失效");}else ;System.out.println("--------------------------------------------");}/*** <B>方法名称:</B>测试zookeeper监控<BR>* <B>概要说明:</B>主要测试watch功能<BR>* @param args* @throws Exception*/public static void main(String[] args) throws Exception {//建立watcherZooKeeperWatcher zkWatch = new ZooKeeperWatcher();//创建连接zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);//System.out.println(zkWatch.zk.toString());Thread.sleep(1000);// 清理节点//zkWatch.deleteAllTestPath();if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "",true)) {Thread.sleep(1000);// 读取数据System.out.println("---------------------- read parent ----------------------------");zkWatch.readData(PARENT_PATH, true);// 更新数据zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");Thread.sleep(1000);// 读取子节点System.out.println("---------------------- read children path ----------------------------");zkWatch.getChildren(PARENT_PATH, true);// 创建子节点zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "",true);//Thread.sleep(1000);//zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");}//Thread.sleep(5000);// 清理节点//zkWatch.deleteAllTestPath();Thread.sleep(1000);zkWatch.releaseConnection();}}

zk的事件类型和状态类型

数据变化对应的事件类型和状态类型:

事件类型(跟Znode节点相关的):

EventType.NodeCreated

EventType.NodeDataChanged

EventType.NodeChildrenChanged

EventType.NodeDeleted

EventType.NONE  ---连接上zk后触发此事件类型

状态类型(跟客户端实例相关的):

KeeperState.Disconnected

KeeperState.SyncConnected

KeeperState.AuthFailed

KeeperState.Expired

  • EventType.NONE事件

当zk客户端连接到zk服务端触发EventType.NONE事件,此时watcher的事件状态是KeeperState.SyncConnected。

                //建立watcherZooKeeperWatcher zkWatch = new ZooKeeperWatcher();//创建连接zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);//System.out.println(zkWatch.zk.toString());

上例子中会打印:

【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态:    SyncConnected
【Watcher-1】事件类型:    None
【Watcher-1】成功连接上ZK服务器

可以看到此时的事件触发的path为null,因为并不是节点触发而是启动连接成功所触发。

  • EventType.NodeCreated

例子中当我们创建/p节点时会触发节点创建事件:

/*** 创建节点* @param path 节点路径* @param data 数据内容* @return */public boolean createPath(String path, String data,boolean watch) {try {//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)this.zk.exists(path, watch);System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create(    /**路径*/ path, /**数据*/data.getBytes(), /**所有可见*/Ids.OPEN_ACL_UNSAFE, /**永久存储*/CreateMode.PERSISTENT ) +     ", content: " + data);} catch (Exception e) {e.printStackTrace();return false;}return true;}* 创建节点* @param path 节点路径* @param data 数据内容* @return */public boolean createPath(String path, String data,boolean watch) {try {//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)this.zk.exists(path, watch);System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create(   /**路径*/ path, /**数据*/data.getBytes(), /**所有可见*/Ids.OPEN_ACL_UNSAFE, /**永久存储*/CreateMode.PERSISTENT ) +     ", content: " + data);} catch (Exception e) {e.printStackTrace();return false;}return true;}

打印结果:

进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】节点创建成功, Path: /p, content: 1520667455362
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态:    SyncConnected
【Watcher-2】事件类型:    NodeCreated
【Watcher-2】节点创建
--------------------------------------------

注意此时我们传入了一个watch的bool类型值,如果此时watch值为false,那么上边的结果还会打印吗?不会。

  • EventType.NodeDataChanged

因为zk的watcher事件的监听是一次性的,也就是说当zk服务端将事件发生的结果通知到zk的watcher客户端后,此前这个事件发生节点(比如/p)将不会再被监听到任何事件,提问:如下的/p节点的数据更新事件是否会被监听打印?

不会,除非再次设置/p的watch为true,如何操作呢?我们可以借助exists方法或者getData方法:

那么我们放开如下代码:

观察下数据更新事件的打印:

【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态:    SyncConnected
【Watcher-1】事件类型:    None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】节点创建成功, Path: /p, content: 1520668957729
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态:    SyncConnected
【Watcher-2】事件类型:    NodeCreated
【Watcher-2】节点创建
--------------------------------------------
---------------------- read parent ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新数据成功,path:/p, stat: 34359738397,34359738398,1520697336511,1520697337551,1,0,0,0,13,0,34359738397【Watcher-3】收到Watcher通知
【Watcher-3】连接状态:    SyncConnected
【Watcher-3】事件类型:    NodeDataChanged
【Watcher-3】节点数据更新
【Watcher-3】数据内容: 1520668958801
--------------------------------------------
【Main】更新数据成功,path:/p, stat: 34359738397,34359738398,1520697336511,1520697337551,1,0,0,0,13,0,34359738397【Watcher-3】收到Watcher通知
【Watcher-3】连接状态:    SyncConnected
【Watcher-3】事件类型:    NodeDataChanged
【Watcher-3】节点数据更新
【Watcher-3】数据内容: 1520668958801
--------------------------------------------
  • EventType.NodeChildrenChanged

放开如下代码,请问是否能监听到子节点的NodeChildrenChanged事件

这样是不能的,只会打印子节点的创建事件,原因是/p节点的子节点创建事件虽然触发,但是子节点的改变事件并没有设置watch为true。我们设置zkWatch.getChildren(PARENT_PATH,true);再次运行,发现子节点change事件在子节点创建后被打印:

【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态:    SyncConnected
【Watcher-1】事件类型:    None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】节点创建成功, Path: /p, content: 1520669809911
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态:    SyncConnected
【Watcher-2】事件类型:    NodeCreated
【Watcher-2】节点创建
--------------------------------------------
---------------------- read parent ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新数据成功,path:/p, stat: 34359738417,34359738418,1520698188655,1520698189687,1,0,0,0,13,0,34359738417【Watcher-3】收到Watcher通知
【Watcher-3】连接状态:    SyncConnected
【Watcher-3】事件类型:    NodeDataChanged
【Watcher-3】节点数据更新
【Watcher-3】数据内容: 1520669810962
--------------------------------------------
---------------------- read children path ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p/c1
【Main】节点创建成功, Path: /p/c1, content: 1520669812011
【Watcher-4】收到Watcher通知
【Watcher-4】连接状态:    SyncConnected
【Watcher-4】事件类型:    NodeCreated
【Watcher-4】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态:    SyncConnected
【Watcher-5】事件类型:    NodeChildrenChanged
【Watcher-5】子节点变更
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态:    SyncConnected
【Watcher-5】事件类型:    NodeChildrenChanged
【Watcher-5】子节点变更
  • EventType.NodeDeleted

我们放开如下代码:

观察打印:

【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态:    SyncConnected
【Watcher-1】事件类型:    None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p
【Main】节点创建成功, Path: /p, content: 1520670313945
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态:    SyncConnected
【Watcher-2】事件类型:    NodeCreated
【Watcher-2】节点创建
--------------------------------------------
---------------------- read parent ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/p
【Main】更新数据成功,path:/p, stat: 34359738424,34359738425,1520698692709,1520698693724,1,0,0,0,13,0,34359738424【Watcher-3】收到Watcher通知
【Watcher-3】连接状态:    SyncConnected
【Watcher-3】事件类型:    NodeDataChanged
【Watcher-3】节点数据更新
【Watcher-3】数据内容: 1520670315011
--------------------------------------------
---------------------- read children path ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/p/c1
【Main】节点创建成功, Path: /p/c1, content: 1520670316035
【Watcher-4】收到Watcher通知
【Watcher-4】连接状态:    SyncConnected
【Watcher-4】事件类型:    NodeCreated
【Watcher-4】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态:    SyncConnected
【Watcher-5】事件类型:    NodeChildrenChanged
【Watcher-5】子节点变更
【Watcher-5】子节点列表:[c1]
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDeleted path:/p/c1
【Main】删除节点成功,path:/p/c1
【Main】删除节点成功,path:/p
【Watcher-6】收到Watcher通知
【Watcher-6】连接状态:    SyncConnected
【Watcher-6】事件类型:    NodeDeleted
【Watcher-6】节点 /p/c1 被删除
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-7】收到Watcher通知
【Watcher-7】连接状态:    SyncConnected
【Watcher-7】事件类型:    NodeChildrenChanged
【Watcher-7】子节点变更
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDeleted path:/p/c1
【Main】删除节点成功,path:/p/c1
【Main】删除节点成功,path:/p
【Watcher-6】收到Watcher通知
【Watcher-6】连接状态:    SyncConnected
【Watcher-6】事件类型:    NodeDeleted
【Watcher-6】节点 /p/c1 被删除
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/p
【Watcher-7】收到Watcher通知
【Watcher-7】连接状态:    SyncConnected
【Watcher-7】事件类型:    NodeChildrenChanged
【Watcher-7】子节点变更

由于我们对子节点/p/c1设置了监听:

所以当删除了/p/c1事件触发后也同时触发了子节点改变事件。

---------------------------------------------客官,好的话请打赏哦--------------------------------------------------------------------------

3分钟理解zookeeper的watcher机制相关推荐

  1. zookeeper的watcher机制

    ZooKeeper 提供了分布式数据的发布/订阅功能.一个典型的发布/阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它 ...

  2. 品味ZooKeeper之Watcher机制_2

    品味ZooKeeper之Watcher机制 本文思维导图如下: 前言 Watcher机制是zookeeper最重要三大特性数据节点Znode+Watcher机制+ACL权限控制中的其中一个,它是zk很 ...

  3. Zookeeper的Watcher机制及Watcher原理分析

    Zookeeper的Watcher机制及Watcher原理分析 1 什么是Watcher监听机制 Watcher 监听机制是 Zookeeper中非常重要的特性,我们基于zookeeper上创建的节点 ...

  4. Zookeeper之Watcher机制详解

    概念 Zookeeper提供了数据的发布/订阅功能.多个订阅者可监听某一特定主题对象(节点).当主题对象发生改变(数据内容改变,被删除等),会实时通知所有订阅者. Zookeeper采用了Watche ...

  5. zookeeper的watcher机制原理详解

    文章目录 客户端注册Watcher 服务端处理getData请求 服务端处理setData请求 客户端触发Watcher 我们都知道在zookeeper中存在watcher机制,查询服务端节点数据时可 ...

  6. ZooKeeper入门(三)zookeeper的Watcher机制

    process方法是Watcher接口中的一个回调方法,当ZooKeeper向客户端发送一个Watcher事件通知时,客户端就会对相应的process方法进行回调,从而实现对事件的处理. import ...

  7. 带你轻松理解Zookeeper的选举机制

    一,Zookeeper选举过程中服务器的状态. LOOKING:寻找leader状态,该状态下,服务器认为当前集群没有leader,会发起leader选举.在选举过程中,所有服务器的状态都是LOOKI ...

  8. zookeeper 中 Watcher 通知机制的一点理解

    首先,ZooKeeper 提供了分布式数据的发布/订阅功能. 这让我想到一种模式,观察者模式(发布订阅模式):一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题 ...

  9. zookeeper watch java_Apache ZooKeeper Watcher 机制源码解释

    分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程.不同节点上的进程互相协调行为的过程叫做分布式同步.许多分布式系统需要一个进程作为任务的协调者,执行一些其 ...

最新文章

  1. Android-如何开发一个功能强大的图片选择器
  2. cas无法使用_并发编程中cas的这三大问题你知道吗?
  3. php 如何把u5fb,php如何将json中的unicode编码转为汉字?
  4. Google开源项目风格指南-笔记
  5. buu Windows系统密码
  6. 48. C# -- 事件
  7. chrome和safari_私人浏览器-如何在Chrome和Safari中使用隐身模式
  8. python中template是什么意思啊_Python中Template使用的一个小技巧
  9. java char i=2+#039;2#039;;_图说String(三)String中#039;+#039;和StringBuilder的区别
  10. java九九成表发_用EXCEL可多种办法生成99乘法表
  11. git升级后jenkins的报错
  12. Response AddHeader使用实例
  13. 小学计算机兴趣小组计划书,兴趣小组计划
  14. GD32F103实战笔记
  15. 简谈FPGA实现高斯滤波
  16. PHP运行的环境安装
  17. Python智能语音机器人
  18. Git新手入门视频教程
  19. Vue3+Vite3 SSR基本搭建
  20. Hadoop学习----HDFS

热门文章

  1. 数据显示ETH燃烧的有多猛
  2. 【读书笔记】计算广告(第1部分)
  3. 计算机基础知识(二)
  4. Win10 21H2 19044+vs2019 WDK驱动开发,错误 MSB8040缓解Spectre 漏洞的库以及输出SXS.DLL的垃圾信息
  5. 在windows上搭建DZ(Discuz)论坛-部署完成
  6. 如何修改安卓日志缓冲区大小?
  7. 植物肉品牌v2进军中国市场,带来牛肉糜、猪肉糜及牛肉汉堡饼等
  8. 100000行级别数据的 Excel 导入优化之路
  9. 检查python是否安装成功
  10. Jetson AGX Orin刷机教程,奶奶看完都说会了!