zookeeper主要是为了统一分布式系统中各个节点的工作状态,在资源冲突的情况下协调提供节点资源抢占,提供给每个节点了解整个集群所处状态的途径。这一切的实现都依赖于zookeeper中的事件监听和通知机制。

zookeeper中的事件和状态

事件和状态构成了zookeeper客户端连接描述的两个维度。注意,网上很多帖子都是在介绍zookeeper客户端连接的事件,但是忽略了zookeeper客户端状态的变化也是要进行监听和通知的。这里我们通过下面的两个表详细介绍zookeeper中的事件和状态(zookeeper API中被定义为@Deprecated的事件和状态就不介绍了)。

zookeeper客户端与zookeeper server连接的状态

连接状态 状态含义
KeeperState.Expired 客户端和服务器在ticktime的时间周期内,是要发送心跳通知的。这是租约协议的一个实现。客户端发送request,告诉服务器其上一个租约时间,服务器收到这个请求后,告诉客户端其下一个租约时间是哪个时间点。当客户端时间戳达到最后一个租约时间,而没有收到服务器发来的任何新租约时间,即认为自己下线(此后客户端会废弃这次连接,并试图重新建立连接)。这个过期状态就是Expired状态
KeeperState.Disconnected 就像上面那个状态所述,当客户端断开一个连接(可能是租约期满,也可能是客户端主动断开)这是客户端和服务器的连接就是Disconnected状态
KeeperState.SyncConnected 一旦客户端和服务器的某一个节点建立连接(注意,虽然集群有多个节点,但是客户端一次连接到一个节点就行了),并完成一次version、zxid的同步,这时的客户端和服务器的连接状态就是SyncConnected
KeeperState.AuthFailed zookeeper客户端进行连接认证失败时,发生该状态

需要说明的是,这些状态在触发时,所记录的事件类型都是:EventType.None。

zookeeper中的watch事件(当zookeeper客户端监听某个znode节点”/node-x”时)

zookeeper事件 事件含义
EventType.NodeCreated 当node-x这个节点被创建时,该事件被触发
EventType.NodeChildrenChanged 当node-x这个节点的直接子节点被创建、被删除、子节点数据发生变更时,该事件被触发。
EventType.NodeDataChanged 当node-x这个节点的数据发生变更时,该事件被触发
EventType.NodeDeleted 当node-x这个节点被删除时,该事件被触发。
EventType.None 当zookeeper客户端的连接状态发生变更时,即KeeperState.Expired、KeeperState.Disconnected、KeeperState.SyncConnected、KeeperState.AuthFailed状态切换时,描述的事件类型为EventType.None

watch机制

Znode发生变化(Znode本身的增加,删除,修改,以及子Znode的变化)可以通过Watch机制通知到客户端。那么要实现Watch,就必须实现org.apache.zookeeper.Watcher接口,并且将实现类的对象传入到可以Watch的方法中。Zookeeper中所有读操作(getData(),getChildren(),exists())都可以设置Watch选项。Watch事件具有one-time trigger(一次性触发)的特性,如果Watch监视的Znode有变化,那么就会通知设置该Watch的客户端。

在上述说道的所有读操作中,如果需要Watcher,我们可以自定义Watcher,如果是Boolean型变量,当为true时,则使用系统默认的Watcher,系统默认的Watcher是在Zookeeper的构造函数中定义的Watcher。参数中Watcher为空或者false,表示不启用Wather。

watch特性1:一次性触发器

客户端在Znode设置了Watch时,如果Znode内容发生改变,那么客户端就会获得Watch事件。例如:客户端设置getData("/znode1", true)后,如果/znode1发生改变或者删除,那么客户端就会得到一个/znode1的Watch事件,但是/znode1再次发生变化,那客户端是无法收到Watch事件的,除非客户端设置了新的Watch。

watch特性2:发送至客户端

Watch事件是异步发送到Client。Zookeeper可以保证客户端发送过去的更新顺序是有序的。例如:某个Znode没有设置watcher,那么客户端对这个Znode设置Watcher发送到集群之前,该客户端是感知不到该Znode任何的改变情况的。换个角度来解释:由于Watch有一次性触发的特点,所以在服务器端没有Watcher的情况下,Znode的任何变更就不会通知到客户端。不过,即使某个Znode设置了Watcher,且在Znode有变化的情况下通知到了客户端,但是在客户端接收到这个变化事件,但是还没有再次设置Watcher之前,如果其他客户端对该Znode做了修改,这种情况下,Znode第二次的变化客户端是无法收到通知的。这可能是由于网络延迟或者是其他因素导致,所以我们使用Zookeeper不能期望能够监控到节点每次的变化。Zookeeper只能保证最终的一致性,而无法保证强一致性。

watch特性3:设置watch的数据内容

Znode改变有很多种方式,例如:节点创建,节点删除,节点改变,子节点改变等等。Zookeeper维护了两个Watch列表,一个节点数据Watch列表,另一个是子节点Watch列表。getData()和exists()设置数据Watch,getChildren()设置子节点Watch。两者选其一,可以让我们根据不同的返回结果选择不同的Watch方式,getData()和exists()返回节点的内容,getChildren()返回子节点列表。因此,setData()触发内容Watch,create()触发当前节点的内容Watch或者是其父节点的子节点Watch。delete()同时触发父节点的子节点Watch和内容Watch,以及子节点的内容Watch。

Zookeeper Watcher的运行机制

1,Watch是轻量级的,其实就是本地JVM的Callback,服务器端只是存了是否有设置了Watcher的布尔类型。(源码见:org.apache.zookeeper.server.FinalRequestProcessor)
2,在服务端,在FinalRequestProcessor处理对应的Znode操作时,会根据客户端传递的watcher变量,添加到对应的ZKDatabase(org.apache.zookeeper.server.ZKDatabase)中进行持久化存储,同时将自己NIOServerCnxn做为一个Watcher callback,监听服务端事件变化
3,Leader通过投票通过了某次Znode变化的请求后,然后通知对应的Follower,Follower根据自己内存中的zkDataBase信息,发送notification信息给zookeeper客户端。
4,Zookeeper客户端接收到notification信息后,找到对应变化path的watcher列表,挨个进行触发回调。

流程图

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;/*** Zookeeper Wathcher * 本类就是一个Watcher类(实现了org.apache.zookeeper.Watcher类)* @author(alienware)* @since 2015-6-14*/
public class ZooKeeperWatcher implements Watcher {/** 定义原子变量 */AtomicInteger seq = new AtomicInteger();/** 定义session失效时间 */private static final int SESSION_TIMEOUT = 10000;/** zookeeper服务器地址 */private static final String CONNECTION_ADDR = "192.168.1.121:2181,192.168.1.122:2181,192.168.1.123:2181";/** zk父路径设置 */private static final String PARENT_PATH = "/p";/** zk子路径设置 */private static final String CHILDREN_PATH = "/p/c";/** 进入标识 */private static final String LOG_PREFIX_OF_MAIN = "【Main】";/** zk变量 */private ZooKeeper zk = null;/**用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */private CountDownLatch connectedSemaphore = new CountDownLatch(1);/*** 创建ZK连接* @param connectAddr ZK服务器地址列表* @param sessionTimeout Session超时时间*/public void createConnection(String connectAddr, int sessionTimeout) {this.releaseConnection();try {//this表示把当前对象进行传递到其中去(也就是在主函数里实例化的new ZooKeeperWatcher()实例对象)zk = new ZooKeeper(connectAddr, sessionTimeout, this);System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");connectedSemaphore.await();} catch (Exception e) {e.printStackTrace();}}/*** 关闭ZK连接*/public void releaseConnection() {if (this.zk != null) {try {this.zk.close();} catch (InterruptedException e) {e.printStackTrace();}}}/*** 创建节点* @param path 节点路径* @param data 数据内容* @return */public boolean createPath(String path, String data, boolean needWatch) {try {//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)this.zk.exists(path, needWatch);System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + this.zk.create(    /**路径*/ path, /**数据*/data.getBytes(), /**所有可见*/Ids.OPEN_ACL_UNSAFE, /**永久存储*/CreateMode.PERSISTENT ) +     ", content: " + data);} catch (Exception e) {e.printStackTrace();return false;}return true;}/*** 读取指定节点数据内容* @param path 节点路径* @return*/public String readData(String path, boolean needWatch) {try {System.out.println("读取数据操作...");return new String(this.zk.getData(path, needWatch, null));} catch (Exception e) {e.printStackTrace();return "";}}/*** 更新指定节点数据内容* @param path 节点路径* @param data 数据内容* @return*/public boolean writeData(String path, String data) {try {System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +this.zk.setData(path, data.getBytes(), -1));} catch (Exception e) {e.printStackTrace();return false;}return true;}/*** 删除指定节点* * @param path*            节点path*/public void deleteNode(String path) {try {this.zk.delete(path, -1);System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);} catch (Exception e) {e.printStackTrace();}}/*** 判断指定节点是否存在* @param path 节点路径*/public Stat exists(String path, boolean needWatch) {try {return this.zk.exists(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}/*** 获取子节点* @param path 节点路径*/private List<String> getChildren(String path, boolean needWatch) {try {System.out.println("读取子节点操作...");return this.zk.getChildren(path, needWatch);} catch (Exception e) {e.printStackTrace();return null;}}/*** 删除所有节点*/public void deleteAllTestPath(boolean needWatch) {if(this.exists(CHILDREN_PATH, needWatch) != null){this.deleteNode(CHILDREN_PATH);}if(this.exists(PARENT_PATH, needWatch) != null){this.deleteNode(PARENT_PATH);}        }/*** 收到来自Server的Watcher通知后的处理。*/@Overridepublic void process(WatchedEvent event) {System.out.println("进入 process 。。。。。event = " + event);try {Thread.sleep(200);} catch (InterruptedException e) {e.printStackTrace();}if (event == null) {return;}// 连接状态KeeperState keeperState = event.getState();// 事件类型EventType eventType = event.getType();// 受影响的pathString path = event.getPath();//原子对象seq 记录进入process的次数String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";System.out.println(logPrefix + "收到Watcher通知");System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());System.out.println(logPrefix + "事件类型:\t" + eventType.toString());if (KeeperState.SyncConnected == keeperState) {// 成功连接上ZK服务器if (EventType.None == eventType) {System.out.println(logPrefix + "成功连接上ZK服务器");connectedSemaphore.countDown();} //创建节点else if (EventType.NodeCreated == eventType) {System.out.println(logPrefix + "节点创建");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}} //更新节点else if (EventType.NodeDataChanged == eventType) {System.out.println(logPrefix + "节点数据更新");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}} //更新子节点else if (EventType.NodeChildrenChanged == eventType) {System.out.println(logPrefix + "子节点变更");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}} //删除节点else if (EventType.NodeDeleted == eventType) {System.out.println(logPrefix + "节点 " + path + " 被删除");}else ;} else if (KeeperState.Disconnected == keeperState) {System.out.println(logPrefix + "与ZK服务器断开连接");} else if (KeeperState.AuthFailed == keeperState) {System.out.println(logPrefix + "权限检查失败");} else if (KeeperState.Expired == keeperState) {System.out.println(logPrefix + "会话失效");}else ;System.out.println("--------------------------------------------");}/*** <B>方法名称:</B>测试zookeeper监控<BR>* <B>概要说明:</B>主要测试watch功能<BR>* @param args* @throws Exception*/public static void main(String[] args) throws Exception {//建立watcher //当前客户端可以称为一个watcher 观察者角色ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();//创建连接 zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);//System.out.println(zkWatch.zk.toString());Thread.sleep(1000);// 清理节点zkWatch.deleteAllTestPath(false);//-----------------第一步: 创建父节点 /p ------------------------//if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "", true)) {Thread.sleep(1000);//-----------------第二步: 读取节点 /p 和    读取/p节点下的子节点(getChildren)的区别 --------------//// 读取数据zkWatch.readData(PARENT_PATH, true);// 读取子节点(监控childNodeChange事件)zkWatch.getChildren(PARENT_PATH, true);// 更新数据zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");Thread.sleep(1000);// 创建子节点zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "", true);//-----------------第三步: 建立子节点的触发 --------------//
//            zkWatch.createPath(CHILDREN_PATH + "/c1", System.currentTimeMillis() + "", true);
//            zkWatch.createPath(CHILDREN_PATH + "/c1/c2", System.currentTimeMillis() + "", true);//-----------------第四步: 更新子节点数据的触发 --------------////在进行修改之前,我们需要watch一下这个节点:Thread.sleep(1000);zkWatch.readData(CHILDREN_PATH, true);zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");}Thread.sleep(10000);// 清理节点zkWatch.deleteAllTestPath(false);Thread.sleep(10000);zkWatch.releaseConnection();}}
 

zookeeper核心原理(Watcher、事件和状态)相关推荐

  1. 深入了解Zookeeper核心原理

    ZNode 这个应该算是Zookeeper中的基础,数据存储的最小单元.在Zookeeper中,类似文件系统的存储结构,被Zookeeper抽象成了树,树中的每一个节点(Node)被叫做ZNode.Z ...

  2. zookeeper核心原理

    zookeeper名字由来 其名字汉译为动物管理员,因为Hadoop,Hbase,Hive等大数据技术的图标都是动物,而zookeeper作为Hadoop,Hbase集群的协调者来讲,像是一个动物园的 ...

  3. java zookeeper 主从热备_zookeeper 核心原理

    zookeeper 核心原理 1.了解zookeeper的设计 2.zookeeper集群角色 3.深入分析ZAB协议 4.从源码层面分析leader选举的实现过程 5.关于zookeeper的数据存 ...

  4. Zookeeper工作原理(详细)

    1.Zookeeper的角色 领导者(leader),负责进行投票的发起和决议,更新系统状态 学习者(learner),包括跟随者(follower)和观察者(observer) follower用于 ...

  5. Zookeeper系列(二)、核心原理

    上一篇我们介绍了Zookeeper的一些基础知识,本篇来讲解zk内部的一些核心原理,帮助我们更好的理解zk的工作机制. 目录 选举机制 Leader选举流程 Leader选举原理 Watch机制 会话 ...

  6. 关于linux内核的wait等待事件和wakeup的核心原理

    关于linux内核的wait等待事件和wakeup的核心原理 上图注意仔细观察. 其实所有的wait_XXX等wait_event.sleep系列函数,都是 1)设置线程状态, 2)调用schedul ...

  7. Ⅵ:zookeeper的Watcher事件监听机制

    2021最新zookeeper系列 ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤ ...

  8. 一文彻底搞懂 zookeeper 核心知识点(修订版)

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 初识 zookeeper Zookeeper 它作为Had ...

  9. 一文教你掌握 ZooKeeper 核心知识

    ZooKeeper 是一个分布式协调服务 ,由 Apache 进行维护. ZooKeeper 可以视为一个高可用的文件系统. ZooKeeper 可以用于发布/订阅.负载均衡.命令服务.分布式协调/通 ...

  10. zookeeper工作原理、安装配置、工具命令简介

    1 Zookeeper简介 Zookeeper 是分布式服务框架,主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务.状态同步服务.集群管理.分布式应用配置项的管理等等. ZooK ...

最新文章

  1. 梯度提升决策树(GBDT)与XGBoost、LightGBM
  2. 基因 ID 匹配利器
  3. 待续未完- 自己写后台内容管理程序 - 完全手写不用框架的
  4. HTTPS 证书配置
  5. linux让数值依次递增的快捷键,如何将文件名批量修改成上一级文件夹的名字。如:A(文件夹名)-01这样依次递增?...
  6. 错误: 非法的表达式开始_虽然这两个C语言宏定义很简单,但是能在程序运行前找到错误代码...
  7. hdu 1232 畅通工程 最小生成树 并查集
  8. UVA10192 Vacation【LCS+DP+记忆化递归】
  9. python列表推导式使用
  10. NYOJ 972 核桃的数量(蓝桥杯)
  11. 与体育行业有关的e–r图_国家体育产业统计分类
  12. 我的第一本社会心理学(part3)--自我概念
  13. 教你自定义Windows10微软输入法
  14. 将csv格式转换为excel后缀为xlsx
  15. 韩国WA15-6819B高性能DSP数字功放芯片
  16. WINFORM时间控件(DATATIMEPICKER)的显示格式设置
  17. LUA语言教程 [转]
  18. 软件测试面试题:关闭浏览器中quit和close的区别
  19. 试题 算法训练 逗志芃的危机 (Java实现 通俗易懂)
  20. html需要电脑什么配置,买电脑主要看什么配置和参数

热门文章

  1. SpringBoot学习笔记(15):动态数据源切换
  2. https协议为什么比http协议更加安全
  3. JavaScript开发者的工具箱
  4. 排序算法——直接选择排序
  5. 第一个Django模型
  6. 【Vegas原创】ctrl shift无法切换输入法的解决方法
  7. adobe出的cookbook
  8. ES6系列之let/const及块级作用域
  9. 正怎表达式在爬虫里的应用
  10. mysql常用sql命令