Watcher触发

  • 我们从实际操作时候的表现来看Watcher的触发,比如Zookeeper中NodeDataChanged时间的触发是“Watcher监听的对应数据节点的数据内容发生变更”,需要修改节点数据那么必然和数据节点存储的位置DataTree有关系,我们从这里去寻找修改后触发Watcher的答案。
  • 我们从DataTree类中找到了修改节点的入口setData方法,我们从上篇内容中也知道了ServerCnxn存储到WatchManager中,并且以不同的维度存储了两份数据:
public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException {Stat s = new Stat();DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}byte lastdata[] = null;synchronized (n) {lastdata = n.data;n.data = data;n.stat.setMtime(time);n.stat.setMzxid(zxid);n.stat.setVersion(version);n.copyStat(s);}// now update if the path is in a quota subtree.String lastPrefix;if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {this.updateBytes(lastPrefix, (data == null ? 0 : data.length)- (lastdata == null ? 0 : lastdata.length));}dataWatches.triggerWatch(path, EventType.NodeDataChanged);return s;}
  • 以上setData方法流程就两个步骤:

    • 利用Path从存储节点的ConcurrentHashMap中获取节点信息,
    • 修改节点信息
    • 调用WatchManager 的triggerWatch方法
  • 可以看到以上代码是通过调用WatchManager的triggerWatch方法来触发相关事件
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {WatchedEvent e = new WatchedEvent(type,KeeperState.SyncConnected, path);HashSet<Watcher> watchers;synchronized (this) {watchers = watchTable.remove(path);......for (Watcher w : watchers) {HashSet<String> paths = watch2Paths.get(w);if (paths != null) {paths.remove(path);}}}for (Watcher w : watchers) {if (supress != null && supress.contains(w)) {continue;}w.process(e);}return watchers;}
  • 如上是triggerWatch源码中的触发逻辑有如下几个步骤:

    • 封装WatchedEvent:首先从通知参数中获取到通知状态KeeperState,事件类型EventType,节点路径Path封装成一个WatchedEvent对象
    • 查询Watcher对象:根据数据节点的节点路径从WatchTable中取出对应的Watcher,如果没有找到watcher,说明没有任何客户端在这个节点上注册过Watcher,直接退出。找到了这个Watcher将他取出来,同时直接从watchTable和watch2Path中删掉------这个步骤可以看出,watcher在服务端是一次性的,即触发一次就失效了。
    • 调用process方法来触发Watcher: 在最后的for循环中依次取出每一个watcher来调用每一个的process,我们看一下他的实现类有N多个如下图,实际调用是哪一个呢,我们得从之前的注册代码中去找答案。之前Zookeeper服务器注册到WatchManager中的是watcher的实现类ServerCnxn,所有我们直接看ServerCnxn的process实现方法就可以
  • 以下ServerCnxn对应的process是一个抽象方法,他的实现是NIOServerCnxn的实现:
public abstract void process(WatchedEvent event);
synchronized public void process(WatchedEvent event) {ReplyHeader h = new ReplyHeader(-1, -1L, 0);if (LOG.isTraceEnabled()) {ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,"Deliver event " + event + " to 0x"+ Long.toHexString(this.sessionId)+ " through " + this);}// Convert WatchedEvent to a type that can be sent over the wireWatcherEvent e = event.getWrapper();sendResponse(h, e, "notification");}
  • 以上代码片段可以看出process方法逻辑比较简单以下几个步骤:

    • 将请求头标记为-1,标识当前是一个通知
    • 将watchedEvent包装成WatcherEvent,以方便进行网络传输序列化(之前篇解释过WatcherEvent用来网络传输用)
    • 向客户端发送该通知
  • 我们从上面步骤看其实他并没有处理客户端Watcher的逻辑,只是借用当前客户端连接的ServerCnxn对象来实现对客户端WatchedEvent的传递,真正的客户端Watcher回调与业务逻辑的执行肯定都在客户端这边

客户端回调Watcher

  • 对于一个来自服务器的通知是通ServerCnxn中发送出来的,同样的,客户端这边的响应也是在一个类似的类中ClientCnxn中,ClientCnxn中通过SendThread来收事件通知
SendThread接收事件通知
  • 我们来看下ClientCnxn的接收通知的源码处理:
class SendThread extends ZooKeeperThread {private long lastPingSentNs;private final ClientCnxnSocket clientCnxnSocket;private Random r = new Random(System.nanoTime());        private boolean isFirstConnect = true;void readResponse(ByteBuffer incomingBuffer) throws IOException {ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);ReplyHeader replyHdr = new ReplyHeader();replyHdr.deserialize(bbia, "header");......if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}......}
  • 对于一个服务端的响应客户端由SendThread.readResponse(ByteBuffer incomingBuffer)方法来统一处理,如果响应头replyHdr中标识的XID为-1,标识这个是一个通知类型响应,对其的处理大体上分为4个步骤

    • 反序列化:replyHdr.deserialize(bbia, “header”); 方法Zookeeper客户端接收到请求首先将字节流转换成WatcherEvent对象
    • 处理Chrootpath:如果客户端设置了chrootPath属性,那么要对服务端传过来的完整的节点路径进行chrootPath处理,生成客户端的一个相对节点路径,例如客户端设置差rootPath为/appl,那么针对服务器端传来的响应节点路径为/appl/locks,经过chrootPath处理后,就变成相对路径/locks。
    • 还原WatchedEvent:通过接收到的WatcherEvent得到WatchedEvent
    • 回调Watcher,最后将WatchedEvent对象交给EventThread线程,在下一个轮询周期中进行Watcher回调。
EventThread处理事件通知
  • 如上流程中,服务的的Watcher时间通知最终交给了EventThread线程来处理,EventThread是Zookeeper客户端中专门用来处理服务器端通知的事件线程,我们看下EventThread中是怎么处理的。由上面代码中queueEvent 方法入口:
public void queueEvent(WatchedEvent event) {if (event.getType() == EventType.None&& sessionState == event.getState()) {return;}sessionState = event.getState();// materialize the watchers based on the eventWatcherSetEventPair pair = new WatcherSetEventPair(watcher.materialize(event.getState(), event.getType(),event.getPath()),event);// queue the pair (watch set & event) for later processingwaitingEvents.add(pair);}
  • 如上 QueueEvent方法首先根据该通知事件从ZKWatchManager中取出所有相关Watcher,materalize方法如下:
    private final Map<String, Set<Watcher>> dataWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> existWatches =new HashMap<String, Set<Watcher>>();private final Map<String, Set<Watcher>> childWatches =new HashMap<String, Set<Watcher>>();public Set<Watcher> materialize(Watcher.Event.KeeperState state,Watcher.Event.EventType type,String clientPath){Set<Watcher> result = new HashSet<Watcher>();switch (type) {case None:......case NodeDataChanged:case NodeCreated:synchronized (dataWatches) {addTo(dataWatches.remove(clientPath), result);}synchronized (existWatches) {addTo(existWatches.remove(clientPath), result);}break;......}}final private void addTo(Set<Watcher> from, Set<Watcher> to) {if (from != null) {to.addAll(from);}}
  • 以上代码中客户端识别出EventType后,会从相应的Watcher存储(即上代码中dataWatches,existWatches,childWatches中一个或者多个,比如NodeCreated 事件类型从dataWatches ,和 existWatches中所有watcher)中去掉对应的Watcher,此处用的remove,标识客户端的Watcher机制统一也是一次性的,触发后该Watcher就失效了。
  • 接着利用Watcher封装成一个WatcherSetEventPair,并且将这个对象加入一个阻塞队列LinkedBlockingQueue 中,并且我们在ClientCnxn类中能找到一个run方法,这个方法会不断的从阻塞队列中take出数据然后再发送对应的通知process进行串行同步处理,这里的Watcher才是真正的客户端注册的Watcher,调用这个Watcher的process方法就可以实现回调了,保证FIFO。
@Overridepublic void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}......}} catch (InterruptedException e) {......}
......}

Watcher特性总结

  • 通过我们上面的分析,了解了Watcher机制的相关接口定义以及Watcher的各类事件,我们以Zookeeper节点的数据内容过期接口为例,从Zookeeper客户端进行Watcher注册,服务的处理Watcher以及客户端回调watcher三方面阶段讲解了ZooKeeper的Watcher工作机制,发现Watcher有以下几个特点:
一次性
  • 无论是客户端还是服务端,一个Watcher被触发,ZooKeeper都会将其从相应的存储中移除,因此,开发人员在Watcher的使用上要记住的一点是要反复注册,这样的设计有效的减轻了服务端的压力,如果注册一个Watcher后一直有效,针对那些非常频繁的节点,服务端会不断向客户端发送事件通知,无论对网络还是服务器性能都有非常大影响
客户端串行执行
  • 客户端watcher回调的过程是一个串行同步的过程,这保证了顺序性,同事需要开发人员注意不要因为一个Watcher的处理逻辑影响了整个客户端Watcher回调
轻量级
  • WatchedEvent是整个ZooKeeperWatcher通知机制的最小单元,这个数据结构只包含三部分内容:通知状态,事件类型,节点路径。也就是说,Watcher通知非常简单,只告诉客户端发生了事件,而不说明具体的内容。

    • 例如针对NodeDataChanged时间ZooKeeper的Watcher只通知客户端指定数据节点的数据内容发生了变更,而对于原始数据以及变更后的数据都无法直接获取,需要主动获取,这也是Watcher机制的一个重要特性
  • 另外客户端向服务端注册Watcher时候,并不会吧客户端真实的watcher对象传到服务端,仅仅是在客户端请求中使用boolean类型属性标记,同事服务端也只保存当前连接的ServerCnxn对象
  • 轻量级设计使Watcher机制在网络开销和服务端内存开销上都非常廉价。

上一篇Zookeeper–Watcher机制源码剖析一
下一篇Zookeeper实践与应用- Canal

Zookeeper--Watcher机制源码剖析二相关推荐

  1. zookeeper watch java_Apache ZooKeeper Watcher 机制源码解释

    分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程.不同节点上的进程互相协调行为的过程叫做分布式同步.许多分布式系统需要一个进程作为任务的协调者,执行一些其 ...

  2. Zookeeper--Watcher机制源码剖析一

    Watcher-- 数据变更通知 我们知道Zookeeper提供来分布式数据的订阅/发布功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某个主题对象,当这个被监听对 ...

  3. Spark存储机制源码剖析

    一.Shuffle结果的写入和读取 通过之前的文章Spark源码解读之Shuffle原理剖析与源码分析我们知道,一个Shuffle操作被DAGScheduler划分为两个stage,第一个stage是 ...

  4. Django Rest Framework源码剖析(二)-----权限

    一.简介 在上一篇博客中已经介绍了django rest framework 对于认证的源码流程,以及实现过程,当用户经过认证之后下一步就是涉及到权限的问题.比如订单的业务只能VIP才能查看,所以这时 ...

  5. pymavlink 源码剖析(二)之生成代码

    文章目录 1 引言 2 C 代码生成 3 generate_one 函数分析 4 MAVTemplate 5 头文件生成 相关: pymavlink 源码剖析(一)之XML文件的数据解析 MAVLin ...

  6. Android多线程:深入分析 Handler机制源码(二)

    前言 在Android开发的多线程应用场景中,Handler机制十分常用 接下来,深入分析 Handler机制的源码,希望加深理解 目录 1. Handler 机制简介 定义 一套 Android 消 ...

  7. as工程放到源码编译_Flutter源码剖析(二):源码的阅读与调试环境配置

    综述 Flutter从架构上来说有3部分: 用Dart写的Framework层,面向开发者 用Java/Kotlin写的Embdder层(For Android,iOS是OC/Swift),纯Flut ...

  8. flutter 真机无法调试 sdk报错_Flutter源码剖析(二):源码的阅读与调试环境配置

    综述 Flutter从架构上来说有3部分: 用Dart写的Framework层,面向开发者 用Java/Kotlin写的Embdder层(For Android,iOS是OC/Swift),纯Flut ...

  9. roid IPC 通讯机制源码分析 二 .

    Client A与Binder kernel通信: kernel\drivers\android\Binder.c) static int binder_open(struct inode *nodp ...

最新文章

  1. Redis 笔记(14)— 持久化及数据恢复(数据持久方式 RDB 和 AOF、数据恢复、混合持久化)
  2. Easy Problem 7 求反数字字符串
  3. 研究人员发现物联网存在安全漏洞
  4. 视觉SLAM总结——视觉特征子综述
  5. chrome 悬停大图插件_Google Chrome浏览器的悬停卡:我不想要的我最喜欢的新东西
  6. 信息学奥赛一本通(1068:与指定数字相同的数的个数)
  7. 【重难点】【RabbitMQ 02】如何避免消息重复投递和消息重复消费、如何防止消息丢失、如何保证消息的顺序性、如何保证消息队列的可用性
  8. 专题三、ArrayList遍历方式以及效率比较
  9. AWS中国EC2 80端口无法访问
  10. 360极速浏览器查看保存密码的操作步骤
  11. already opened by ClassLoader
  12. 担任电气工程师,在软件开发方面建立成功的职业生涯
  13. Mac电脑为什么适合编程?
  14. jsp页面获取到后台的cookie(记住我)
  15. 十二、Hi3556移植RTL8189 WIFI驱动
  16. 万字长文:盘点2022全球10大数据泄漏事件(红蓝攻防角度)
  17. 股票资管软件和股票跟单软件的代码是一样的可以共用
  18. Java 中各种空(''、\u0000、null)的区别?
  19. 关于ACM比赛的感悟
  20. 有 ABCD 四个人要在夜里过一座桥,他们通过这座桥分别需要耗时 1、2、5、10 分钟,现在只有一支手电,过桥时必须带有手电,并且同时最多只能两个人一起过桥。请问如何安排能够让四个人尽快都过桥。

热门文章

  1. 晋中学院计算机考研,晋中学院有多少人死在考研路上
  2. 掌握这个姿势,女友不再叨叨叨
  3. 岛国小姐姐来例假时,男朋友背着她偷偷查手机......
  4. 5部适合学英语的动画电影,快和孩子一起看!
  5. 黑科技轮胎:有能发电的,脑洞简直不要太大...
  6. STEAM教育风口正劲,如何培养STEAM思维?
  7. 原来这些行业的“潜规则”是这样的...
  8. mysql插入时间区间_mybatis插入数据时返回主键以及MySQL根据时间区间查询问题总结...
  9. 打印机一直显示正在打印中_中国和桌面3D打印机正在引领3D打印市场
  10. oracle挂证多少钱一个月_惊呆,一条sql竟然把Oracle搞挂了