文章目录

  • Watch 机制
    • API 使用
    • Watch 机制的底层原理
    • 客户端 Watch 注册实现过程 ZKWatchManager
    • 服务端 Watch 注册实现过程 WatchManager
    • 服务端 Watch 事件的触发过程
    • 客户端回调的处理过程
    • 小结
  • 实现一个分布式的发布订阅功能


Watch 机制

ZooKeeper 又一关键技术——Watch 监控机制 。


API 使用

ZooKeeper 的客户端可以通过 Watch 机制来订阅当服务器上某一节点的数据或状态发生变化时收到相应的通知,我们可以通过向 ZooKeeper 客户端的构造方法中传递 Watcher 参数的方式实现

new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
  • connectString 服务端地址

  • sessionTimeout:超时时间

  • Watcher:监控事件

Watcher 将作为整个 ZooKeeper 会话期间的上下文 ,一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中 .


除此之外,ZooKeeper 客户端也可以通过 getDataexistsgetChildren 三个接口来向 ZooKeeper 服务器注册 Watcher,从而方便地在不同的情况下添加 Watch 事件

getData(String path, Watcher watcher, Stat stat)

知道了 ZooKeeper 添加服务器监控事件的方式,下面我们来讲解一下触发通知的条件

上图中列出了客户端在不同会话状态下,相应的在服务器节点所能支持的事件类型。

例如在客户端连接服务端的时候,可以对数据节点的创建、删除、数据变更、子节点的更新等操作进行监控。


Watch 机制的底层原理


其结构很像设计模式中的”观察者模式“,一个对象或者数据节点可能会被多个客户端监控,当对应事件被触发时,会通知这些对象或客户端。

我们可以将 Watch 机制理解为是分布式环境下的观察者模式。

所以接下来就以观察者模式的角度点来看看 ZooKeeper 底层 Watch 是如何实现的。

通常我们在实现观察者模式时,最核心或者说关键的代码就是创建一个列表来存放观察者。

而在 ZooKeeper 中则是在客户端和服务器端分别实现两个存放观察者列表,即:ZKWatchManager 和 WatchManager。

其核心操作就是围绕着这两个展开的。


客户端 Watch 注册实现过程 ZKWatchManager

在发送一个 Watch 监控事件的会话请求时,ZooKeeper 客户端主要做了两个工作:

1. 标记该会话是一个带有 Watch 事件的请求
2. 将 Watch 事件存储到 ZKWatchManager

我们以 getData 接口为例子

当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系

public byte[] getData(final String path, Watcher watcher, Stat stat){...WatchRegistration wcb = null;if (watcher != null) {wcb = new DataWatchRegistration(watcher, clientPath);}RequestHeader h = new RequestHeader();request.setWatch(watcher != null);...GetDataResponse response = new GetDataResponse();ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);}

之后客户端向服务器发送请求时,是将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中:

public Packet queuePacket(RequestHeader h, ReplyHeader r,...) {Packet packet = null;...packet = new Packet(h, r, request, response, watchRegistration);...outgoingQueue.add(packet); ...return packet;}

最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中:

private void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}...}

服务端 Watch 注册实现过程 WatchManager

下面我们来看一下服务端是如何处理一个 Watch 事件。

Zookeeper 服务端处理 Watch 事件基本有 2 个过程:

1. 解析收到的请求是否带有 Watch 注册事件
2. 将对应的 Watch 事件存储到 WatchManager

当 ZooKeeper 服务器接收到一个客户端请求后,首先会对请求进行解析,判断该请求是否包含 Watch 事件.

ZooKeeper 底层是通过 FinalRequestProcessor 类中的 processRequest 函数实现的。当 getDataRequest.getWatch() 值为 True 时,表明该请求需要进行 Watch 监控注册。并通过 zks.getZKDatabase().getData 函数将 Watch 事件注册到服务端的 WatchManager 中

public void processRequest(Request request) {...byte b[] =   zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);rsp = new GetDataResponse(b, stat);..}

服务端 Watch 事件的触发过程

在客户端和服务端都对 watch 注册完成后,我们接下来看一下在 ZooKeeper 中触发一个 Watch 事件的底层实现过程:

以 setData 接口即“节点数据内容发生变更”事件为例。在 DataTree#setData 方法内部执行完对节点数据的变更后,会调用 WatchManager.triggerWatch 方法触发数据变更事件。

public Stat setData(String path, byte data[], ...){Stat s = new Stat();DataNode n = nodes.get(path);...dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}

那看下 triggerWatch

首先,封装了一个具有会话状态、事件类型、数据节点 3 种属性的 WatchedEvent 对象。之后查询该节点注册的 Watch 事件,如果为空说明该节点没有注册过 Watch 事件。如果存在 Watch 事件则添加到定义的 Wathcers 集合中,并在 WatchManager 管理中删除。最后,通过调用 process 方法向客户端发送通知。

 Set<Watcher> triggerWatch(String path, EventType type...) {WatchedEvent e = new WatchedEvent(type,  KeeperState.SyncConnected, path);Set<Watcher> watchers;synchronized (this) {watchers = watchTable.remove(path);...for (Watcher w : watchers) {Set<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}w.process(e);}return watchers;}

客户端回调的处理过程

知道了服务器端 Watch 事件的触发过程后,我们来看一下客户端接收到通知后如何进行操作的。

客户端使用 SendThread.readResponse() 方法来统一处理服务端的相应。

首先反序列化服务器发送请求头信息 replyHdr.deserialize(bbia, “header”),并判断相属性字段 xid 的值为 -1,表示该请求响应为通知类型。在处理通知类型时,首先将己收到的字节流反序列化转换成 WatcherEvent 对象。

接着判断客户端是否配置了 chrootPath 属性,如果为 True 说明客户端配置了 chrootPath 属性。需要对接收到的节点路径进行 chrootPath 处理。

最后调用 eventThread.queueEvent( )方法将接收到的事件交给 EventThread 线程进行处理

if (replyHdr.getXid() == -1) {...WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");...if (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");...event.setPath(serverPath.substring(chrootPath.length()));...}WatchedEvent we = new WatchedEvent(event);...eventThread.queueEvent( we );}

接下来我们来看一下 EventThread.queueEvent() 方法内部的执行逻辑。

其主要工作分为 2 点:
第 1 步按照通知的事件类型,从 ZKWatchManager 中查询注册过的客户端 Watch 信息。客户端在查询到对应的 Watch 信息后,会将其从 ZKWatchManager 的管理中删除。因此这里也请你多注意,客户端的 Watcher 机制是一次性的,触发后就会被删除。

public Set<Watcher> materialize(...){Set<Watcher> result = new HashSet<Watcher>();...switch (type) {...case NodeDataChanged:case NodeCreated:synchronized (dataWatches) {addTo(dataWatches.remove(clientPath), result);}synchronized (existWatches) {addTo(existWatches.remove(clientPath), result);}break;....}return result;}

完成了第 1 步工作获取到对应的 Watcher 信息后,将查询到的 Watcher 存储到 waitingEvents 队列中,调用 EventThread 类中的 run 方法会循环取出在 waitingEvents 队列中等待的 Watcher 事件进行处理。

public void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}...}

最后调用 processEvent(event) 方法来最终执行实现了 Watcher 接口的 process()方法。

private void processEvent(Object event) {...if (event instanceof WatcherSetEventPair) {WatcherSetEventPair pair = (WatcherSetEventPair) event;for (Watcher watcher : pair.watchers) {try {watcher.process(pair.event);} catch (Throwable t) {LOG.error("Error while calling watcher ", t);}}}}

小结

ZooKeeper 中 Watch 机制的,大体上ZooKeeper 实现的方式是通过客服端和服务端分别创建有观察者的信息列表。客户端调用 getData、exist 等接口时,首先将对应的 Watch 事件放到本地的 ZKWatchManager 中进行管理。服务端在接收到客户端的请求后根据请求类型判断是否含有 Watch 事件,并将对应事件放到 WatchManager 中进行管理。

在事件触发的时候服务端通过节点的路径信息查询相应的 Watch 事件通知给客户端,客户端在接收到通知后,首先查询本地的 ZKWatchManager 获得对应的 Watch 信息处理回调操作。

这种设计不但实现了一个分布式环境下的观察者模式,而且通过将客户端和服务端各自处理 Watch 事件所需要的额外信息分别保存在两端,减少彼此通信的内容,提升了服务的处理性能。


实现一个分布式的发布订阅功能

来搞个实际应用来加深我们对 ZooKeeper 中 Watch 机制的理解。

提到 ZooKeeper 的应用场景,可能第一时间会想到最为典型的发布订阅功能。

发布订阅功能可以看作是一个一对多的关系,即一个服务或数据的发布者可以被多个不同的消费者调用。一般一个发布订阅模式的数据交互可以分为消费者主动请求生产者信息的拉取模式,和生产者数据变更时主动推送给消费者的推送模式。ZooKeeper 采用了两种模式结合的方式实现订阅发布功能。

下面我们来分析一个具体案例:

在系统开发的过程中会用到各种各样的配置信息,如数据库配置项、第三方接口、服务地址等,我们可以用配置管理功能自动完成服务器配置信息的维护,利用ZooKeeper 的发布订阅功能就能解决这个问题。

可以把诸如数据库配置项这样的信息存储在 ZooKeeper 数据节点中。比如下图中的 /confs/data_item1。

服务器集群客户端对该节点添加 Watch 事件监控,当集群中的服务启动时,会读取该节点数据获取数据配置信息。而当该节点数据发生变化时,ZooKeeper 服务器会发送 Watch 事件给各个客户端,集群中的客户端在接收到该通知后,重新读取节点的数据库配置信息。

我们使用 Watch 机制实现了一个分布式环境下的配置管理功能,通过对 ZooKeeper 服务器节点添加数据变更事件,实现当数据库配置项信息变更后,集群中的各个客户端能接收到该变更事件的通知,并获取最新的配置信息。要注意一点是,我们提到 Watch 具有一次性,所以当我们获得服务器通知后要再次添加 Watch 事件。

Apache ZooKeeper - Watch 机制的底层原理相关推荐

  1. mysql哨兵机制_Redis 哨兵机制以及底层原理深入解析,这次终于搞清楚了

    前面我们基于实际案例搭建了缓存高可用方案(分布式缓存高可用方案,我们都是这么干的)同时提到了redis主从架构下是如何保证高可用的,讲到了它是通过redis sentinel的机制来实现的. 今天我们 ...

  2. 最容易理解的反射机制的底层原理

    看了好多关于Java反射机制的文章,大多都太过官方,消化起来比较稍显费劲,本篇,我会依据自己的理解去阐述什么是Java的反射机制,反射用在什么地方,以及怎么来使用? 开篇前,我们还是要了解一下,什么是 ...

  3. iOS之深入解析objc_msgSend消息转发机制的底层原理

    一.抛砖引玉 objc_msgSend() 消息发送的过程就是 通过 SEL 查找 IMP 的过程 . objc_msgSend() 是用 汇编语言 实现的,使用汇编实现的优势是: 消息发送的过程需要 ...

  4. java语言中 负责并发编程的机制是_Java并发编程艺术-并发机制的底层原理实现...

    Java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量. volatile借助Java内存模型保证所有线程能够看到最新的值.(内存可见性) ...

  5. 【MySQL进阶】MySQL事务隔离与锁机制底层原理万字总结(建议收藏!!)

    [MySQL进阶]MySQL事务隔离与锁机制底层原理万字总结(建议收藏!!) 参考资料: 美团技术团队:Innodb中事务隔离级别和锁的关系 数据库的锁,到底锁的是什么? 阿里面试:说说一致性读实现原 ...

  6. Java集合框架底层原理

    Java集合框架 Java集合框架 List集合 ArrayList底层实现原理 ArrayList数组扩容技术(数组拷贝) 扩容大小 查询和删除 集合中的泛型 LinkedList Vector 线 ...

  7. Java并发机制的底层实现原理

    Java代码在编译后会变成Java字节码,字节码被类加载器加载到JVM里,JVM执行字节码,最终需要转化为汇编指令在CPU上执行,Java中所使用的并发机制依赖于JVM的实现和CPU的指令.本章我们将 ...

  8. 《Java并发编程的艺术》一一第2章Java并发机制的底层实现原理

    第2章Java并发机制的底层实现原理 2.1 volatile的应用 Java代码在编译后会变成Java字节码,字节码被类加载器加载到JVM里,JVM执行字节码,最终需要转化为汇编指令在CPU上执行, ...

  9. Go语言潜力有目共睹,但它的Goroutine机制底层原理你了解吗?

    来源 | 后端技术指南针(ID:gh_ed1e2b37dcb6) Go语言的巨大潜力有目共睹,今天我们来学习Go语言的Goroutine机制,这也可能是Go语言最为吸引人的特性了,理解它对于掌握Go语 ...

最新文章

  1. SQL查询1064报错 [ERR] 1064 - You have an error in your SQL syntax; check the manual.......
  2. python写出的程序如何给别人使用-利用这10个工具,你可以写出更好的Python代码...
  3. 从喧闹与富有中搞懂搜索和拓扑
  4. 循环结构作业c语言,C语言循环结构练习题带答案(最新整理)
  5. centos7 下安装mysql5.7
  6. devstack mysql_DevStack部署OpenStack开发环境 - 问题总结
  7. [AppScan深入浅出]修复漏洞:会话标识未更新
  8. 五子棋小游戏(C++)
  9. 三角网导线平差实例_三角网闭合导线计算()
  10. excel画风玫瑰图_如何用excel制作风向玫瑰图
  11. 谷歌浏览器不支持ocx控件
  12. java后台生成二维码以及页面显示二维码方式
  13. 弗拉基米尔·多罗宁_罗紫琳新欢俄亿万富豪 女星与老外的那些事儿
  14. 批处理命令一日一教学
  15. 【MIS你了解多少】你现在遇到的问题都是宝贵的财富
  16. 10-113 A1-7在产品表中找出库存量小于订购量的产品信息
  17. 如何排查CPU 100%的应用
  18. 【Python】照片扩展信息提取
  19. 计算机网络编辑员题目,大学生考证:网络编辑考试
  20. Linux安装tomcat详细教程

热门文章

  1. php 接口数组排序,php 数组排序
  2. android 之intent(意图)详解
  3. java.lang.NullPointerException: Attempt to invoke virtual method 'int java.lang.Integer.intValue()'
  4. C语言指针是什么?1分钟彻底理解C语言指针的概念
  5. bagging 与boosting
  6. Ubuntu 里的Spyder不能切换中文输入
  7. pytorch 实现transformer
  8. tableau必知必会之拖拽功能失效是怎么回事
  9. 数据挖掘十大算法--K-均值聚类算法
  10. nodejs html引用js_NodeJS与模块系统