一、前言

  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。

二、总体框图

  对于Watcher机制而言,主要涉及的类主要如下。

  

  说明:

  Watcher,接口类型,其定义了process方法,需子类实现。

  Event,接口类型,Watcher的内部类,无任何方法。

  KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态。

  EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事件类型。

  WatchedEvent,表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType。

  ClientWatchManager,接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现。

  ZKWatchManager,Zookeeper的内部类,继承ClientWatchManager。

  MyWatcher,ZooKeeperMain的内部类,继承Watcher。

  ServerCnxn,接口类型,继承Watcher,表示客户端与服务端的一个连接。

  WatchManager,管理Watcher。

三、Watcher源码分析

  3.1 内部类

  Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下  

public interface Event {}

  说明:可以看到,Event接口并没有定义任何属性和方法,但是其包含了KeeperState和EventType两个内部枚举类。

  3.2 接口方法  

abstract public void process(WatchedEvent event);

  说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。

四、Event源码分析

  3.1 内部类

  1. KeeperState  

        public enum KeeperState { // 事件发生时Zookeeper的状态/** Unused, this state is never generated by the server */@Deprecated// 未知状态,不再使用,服务器不会产生此状态Unknown (-1), /** The client is in the disconnected state - it is not connected* to any server in the ensemble. */// 断开Disconnected (0),/** Unused, this state is never generated by the server */@Deprecated// 未同步连接,不再使用,服务器不会产生此状态NoSyncConnected (1),/** The client is in the connected state - it is connected* to a server in the ensemble (one of the servers specified* in the host connection parameter during ZooKeeper client* creation). */// 同步连接状态SyncConnected (3),/*** Auth failed state*/// 认证失败状态AuthFailed (4),/*** The client is connected to a read-only server, that is the* server which is not currently connected to the majority.* The only operations allowed after receiving this state is* read operations.* This state is generated for read-only clients only since* read/write clients aren't allowed to connect to r/o servers.*/// 只读连接状态ConnectedReadOnly (5),/*** SaslAuthenticated: used to notify clients that they are SASL-authenticated,* so that they can perform Zookeeper actions with their SASL-authorized permissions.*/// SASL认证通过状态SaslAuthenticated(6),/** The serving cluster has expired this session. The ZooKeeper* client connection (the session) is no longer valid. You must* create a new client connection (instantiate a new ZooKeeper* instance) if you with to access the ensemble. */// 过期状态Expired (-112);// 代表状态的整形值private final int intValue;     // Integer representation of value// for sending over wire// 构造函数KeeperState(int intValue) {this.intValue = intValue;}// 返回整形值public int getIntValue() {return intValue;}// 从整形值构造相应的状态public static KeeperState fromInt(int intValue) {switch(intValue) {case   -1: return KeeperState.Unknown;case    0: return KeeperState.Disconnected;case    1: return KeeperState.NoSyncConnected;case    3: return KeeperState.SyncConnected;case    4: return KeeperState.AuthFailed;case    5: return KeeperState.ConnectedReadOnly;case    6: return KeeperState.SaslAuthenticated;case -112: return KeeperState.Expired;default:throw new RuntimeException("Invalid integer value for conversion to KeeperState");}}}

  说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

  2. EventType 

        public enum EventType { // 事件类型// 无None (-1),// 结点创建NodeCreated (1),// 结点删除NodeDeleted (2),// 结点数据变化NodeDataChanged (3),// 结点子节点变化NodeChildrenChanged (4);// 代表事件类型的整形 private final int intValue;     // Integer representation of value// for sending over wire// 构造函数EventType(int intValue) {this.intValue = intValue;}// 返回整形public int getIntValue() {return intValue;}// 从整形构造相应的事件public static EventType fromInt(int intValue) {switch(intValue) {case -1: return EventType.None;case  1: return EventType.NodeCreated;case  2: return EventType.NodeDeleted;case  3: return EventType.NodeDataChanged;case  4: return EventType.NodeChildrenChanged;default:throw new RuntimeException("Invalid integer value for conversion to EventType");}}           }}

  说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。

五、WatchedEvent

  5.1 类的属性  

public class WatchedEvent {// Zookeeper的状态final private KeeperState keeperState;// 事件类型final private EventType eventType;// 事件所涉及节点的路径private String path;
}

  说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。

  5.2 构造函数

  1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 

    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {// 初始化属性this.keeperState = keeperState;this.eventType = eventType;this.path = path;}

  说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。

  2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  

    public WatchedEvent(WatcherEvent eventMessage) {// 从eventMessage中取出相应属性进行赋值keeperState = KeeperState.fromInt(eventMessage.getState());eventType = EventType.fromInt(eventMessage.getType());path = eventMessage.getPath();}

  说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。

  对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。

六、ClientWatchManager

  6.1 接口方法 

public Set<Watcher> materialize(Watcher.Event.KeeperState state,Watcher.Event.EventType type, String path);

  说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

七、ZKWatchManager

  7.1 类的属性 

    private static class ZKWatchManager implements ClientWatchManager {// 数据变化的Watchersprivate final Map<String, Set<Watcher>> dataWatches =new HashMap<String, Set<Watcher>>();// 节点存在与否的Watchersprivate final Map<String, Set<Watcher>> existWatches =new HashMap<String, Set<Watcher>>();// 子节点变化的Watchersprivate final Map<String, Set<Watcher>> childWatches =new HashMap<String, Set<Watcher>>();}

  说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

  7.2 核心方法分析

  1. materialize方法

        public Set<Watcher> materialize(Watcher.Event.KeeperState state,Watcher.Event.EventType type,String clientPath){// 新生成结果Watcher集合Set<Watcher> result = new HashSet<Watcher>();switch (type) { // 确定事件类型case None: // 无类型// 添加默认Watcher
                result.add(defaultWatcher);// 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、Zookeeper的状态是否为同步连接)boolean clear = ClientCnxn.getDisableAutoResetWatch() &&state != Watcher.Event.KeeperState.SyncConnected;synchronized(dataWatches) { // 同步块for(Set<Watcher> ws: dataWatches.values()) {// 添加至结果集合
                        result.addAll(ws);}if (clear) { // 是否需要清空
                        dataWatches.clear();}}synchronized(existWatches) { // 同步块 for(Set<Watcher> ws: existWatches.values()) {// 添加至结果集合
                        result.addAll(ws);}if (clear) { // 是否需要清空
                        existWatches.clear();}}synchronized(childWatches) { // 同步块for(Set<Watcher> ws: childWatches.values()) {// 添加至结果集合
                        result.addAll(ws);}if (clear) { // 是否需要清空
                        childWatches.clear();}}// 返回结果return result;case NodeDataChanged: // 节点数据变化case NodeCreated: // 创建节点synchronized (dataWatches) { // 同步块// 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(dataWatches.remove(clientPath), result);}synchronized (existWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(existWatches.remove(clientPath), result);}break;case NodeChildrenChanged: // 节点子节点变化synchronized (childWatches) {// 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(childWatches.remove(clientPath), result);}break;case NodeDeleted: // 删除节点synchronized (dataWatches) { // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(dataWatches.remove(clientPath), result);}// XXX This shouldn't be needed, but just in casesynchronized (existWatches) {// 移除clientPath对应的WatcherSet<Watcher> list = existWatches.remove(clientPath);if (list != null) {// 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(existWatches.remove(clientPath), result);LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");}}synchronized (childWatches) {// 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(childWatches.remove(clientPath), result);}break;default: // 缺省处理String msg = "Unhandled watch event type " + type+ " with state " + state + " on path " + clientPath;LOG.error(msg);throw new RuntimeException(msg);}// 返回结果集合return result;}}

  说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

八、总结

  针对Watcher机制的第一部分的源码分析就已经完成,可以看到此部分的源码相对简单,之后会分析org.apache.zookeeper.server下的WatchManager和ClientWatchManager所在外部类ZooKeeper,也谢谢各位园友的观看~

转载于:https://www.cnblogs.com/leesf456/p/6286827.html

【Zookeeper】源码分析之Watcher机制(一)相关推荐

  1. zookeeper源码分析之六session机制

    zookeeper中session意味着一个物理连接,客户端连接服务器成功之后,会发送一个连接型请求,此时就会有session 产生. session由sessionTracker产生的,sessio ...

  2. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  3. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  4. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

  5. Zookeeper源码分析(二) ----- zookeeper日志

    zookeeper源码分析系列文章: Zookeeper源码分析(一) ----- 源码运行环境搭建 原创博客,纯手敲,转载请注明出处,谢谢! 既然我们是要学习源码,那么如何高效地学习源代码呢?答案就 ...

  6. Memcached源码分析 - 内存存储机制Slabs(5)

    Memcached源码分析 - 网络模型(1) Memcached源码分析 - 命令解析(2) Memcached源码分析 - 数据存储(3) Memcached源码分析 - 增删改查操作(4) Me ...

  7. Dubbo系列(二)源码分析之SPI机制

    Dubbo系列(二)源码分析之SPI机制 在阅读Dubbo源码时,常常看到 ExtensionLoader.getExtensionLoader(*.class).getAdaptiveExtensi ...

  8. zookeeper源码分析之恢复事务日志

    zookeeper源码分析之恢复事务日志 前言 源码分析 查看事务日志命令 总结 前言 本文是基于zookeeper集群启动过程分析(https://blog.csdn.net/weixin_4244 ...

  9. Python3.5源码分析-垃圾回收机制

    Python3源码分析 本文环境python3.5.2. 参考书籍<<Python源码剖析>> python官网 Python3的垃圾回收概述 随着软硬件的发展,大多数语言都已 ...

  10. 鸿蒙内核源码分析:调度机制篇

    作者 | 深入研究鸿蒙,鸿蒙内核发烧友 出品 | CSDN(ID:CSDNnews) 头图 | CSDN 下载自东方 IC 阅读之前建议先读本系列其他文章,以便对本文任务调度机制的理解. 为什么要学这 ...

最新文章

  1. PyTorch 源码解读之分布式训练了解一下?
  2. 合并分支到master上
  3. 伏威谈淘宝网的高并发处理与压力测试(转)
  4. 【POJ1083】 Moving Tables (并行的搬运)
  5. java中equals方法的用法以及==的用法(转)
  6. 好慌!支付宝App现“不锈钢内裤” 官方解释:已改为“煮内裤的锅”
  7. Fibonacci算法
  8. 最详细的JavaScript高级教程(十一)正则表达式
  9. oracle12c不使用cdb模式,oracle 12c non-cdb升级成cdb模式
  10. 北上杭是梦!“郑福贵”才是中国智慧城市的真相
  11. 部署ROS2 Bouncy版本时遇到的一些问题
  12. 麓言科技设计师你要有想法
  13. axios get怎么还会显示跨域_axios 跨域问题的解决 (接口 Phal 框架)
  14. python地图匹配_基于隐马尔科夫模型(HMM)的地图匹配(Map-Matching)算法
  15. flux和redux
  16. shell命令:打印除第一列外所有列
  17. 【开源电机驱动】锁定反相驱动
  18. Java 什么是反射及反射的应用
  19. 中学教师计算机运用培训简报,第十中学“教育信息化能力提升”培训活动简报...
  20. python画素描画_Python素描画的两种程序解析

热门文章

  1. 下一代云计算?容器云和微服务时代的来临
  2. 一场员工高管间的口水战,员工输了
  3. 通知:小密圈暂停服务
  4. 计算机网络负载均衡图片,负载均衡计算机网络课程网.ppt
  5. Coursera机器学习week11 单元测试
  6. Kail Linux渗透测试教程之ARP侦查Netdiscover端口扫描Zenmap与黑暗搜索引擎Shodan
  7. Requirements Analysis with 'pseud-Formal' Method
  8. 今天开始每天一点ffmpeg知识。千里之行 。
  9. 解决从github下载项目速度过慢
  10. c/c++的预处理定义 Stringizing Operator (#) Charizing Operator (#@) Token-Pasting Operator (##)