监听机制是Zookeeper的一个重要特性,例如:Zookeeper实现的高可用集群、分布式锁,就利用到了这一特性。

在Zookeeper被监听的结点对象/信息发生了改变,就会触发监听机制,通知注册者。

注册监听机制

创建客户端,创建默认监听器

在创建zookeeper客户端实例时,需要下列参数。

new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

三个参数分别的含义为:

connectString 服务端地址 sessionTimeout:超时时间 Watcher:监控器

这个 Watcher 将作为整个 ZooKeeper 会话期间的上下文 ,一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中,==在开启对某个节点或信息的监控后,但是并没有指定额外的监控器==,则会默认调用这个监控器的方法。

对指定结点进行特殊监听处理

除此之外,ZooKeeper 客户端也可以通过 getData、exists 和 getChildren 三个接口来向 ZooKeeper 服务器注册 Watcher,从而方便地在不同的情况下添加 Watch 事件:

getData(String path, Watcher watcher, Stat stat)

Zookeeper只能在成功连接上客户端后,才能使得监控机制起作用;且仅支持4种事件的监听。

  1. 结点的增加
  2. 结点的删除
  3. 结点所携带信息的更改
  4. 结点的子结点的更改

底层原理

Zookeeper监听机制是观察者模式实现的。

在观察者模式中,最重要的一个属性就是需要一个列表用于保存观察者。

而在Zookeeper监听机制中,也实现了这个一个列表,在客户端和服务端分别维护了ZKWatchManager和WatchManager。

客户端Watch注册实现过程

在发送一个Watch事件的会话请求时,Zookeeper客户端主要做了两件事

  • 标记该会话是一个带有 Watch 事件的请求
  • 将 Watch 事件存储到 ZKWatchManager

以 getData 接口为例。当发送一个带有 Watch 事件的请求时,客户端首先会把该会话标记为带有 Watch 监控的事件请求,之后通过 DataWatchRegistration 类来保存 watcher 事件和节点的对应关系:

public byte[] getData(final String path, Watcher watcher, Stat stat){    ...    WatchRegistration wcb = null;    // 如果watcher不为null,即有watcher对象    if (watcher != null) {        wcb = new DataWatchRegistration(watcher, clientPath);    }    RequestHeader h = new RequestHeader();    // 标记请求为带有监听器的    request.setWatch(watcher != null);    ...    GetDataResponse response = new GetDataResponse();    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);}

之后客户端向服务器发送请求时,是将请求封装成一个 Packet 对象,并添加到一个等待发送队列 outgoingQueue 中:

public Packet queuePacket(RequestHeader h, ReplyHeader r,...) {    Packet packet = null;    ...    packet = new Packet(h, r, request, response, watchRegistration);    ...    outgoingQueue.add(packet);     ...    return packet;}

最后,ZooKeeper 客户端就会向服务器端发送这个请求,完成请求发送后。调用负责处理服务器响应的 SendThread 线程类中的 readResponse 方法接收服务端的回调,并在最后执行 finishPacket()方法将 Watch 注册到 ZKWatchManager 中:

private void finishPacket(Packet p) {    int err = p.replyHeader.getErr();    if (p.watchRegistration != null) {        p.watchRegistration.register(err);    }    ...}

服务端 Watch 注册实现过程

Zookeeper 服务端处理 Watch 事件基本有 2 个过程:

  • 解析收到的请求是否带有 Watch 注册事件
  • 将对应的 Watch 事件存储到 WatchManager

服务端 Watch 事件的触发过程

以 setData 接口即“节点数据内容发生变更”事件为例。

在 setData 方法内部执行完对节点数据的变更后,会调用 WatchManager.triggerWatch 方法触发数据变更事件。

Set triggerWatch(String path, EventType type...) {    WatchedEvent e = new WatchedEvent(type,                                      KeeperState.SyncConnected, path);    Set watchers;    synchronized (this) {        watchers = watchTable.remove(path);        ...            for (Watcher w : watchers) {                Set paths = watch2Paths.get(w);                if (paths != null) {                    paths.remove(path);                }            }    }    for (Watcher w : watchers) {        if (supress != null && supress.contains(w)) {            continue;        }        w.process(e);    }    return watchers;}

watchers与paths的关系:

双向绑定关系。

由于zk的监听机制是一次性的(触发即销毁),当path2触发了监听事件后,立马从watchTable中销毁监听事件,获取watchers;并且path2结点的事件已经出发了,所以也要将每个watcher对应的paths中去除path2;然后调用watchers中每个watcher的process()函数完成一次监听回调。

客户端回调的处理过程

SendThread

此方法是客户端用于处理服务端的统一请求,replyHdr.getXid()值为-1时,则响应为通知类型的信息,最后调用eventThread.queueEvent()将事件交由eventThread处理。

if (replyHdr.getXid() == -1) {    ...    WatcherEvent event = new WatcherEvent();    event.deserialize(bbia, "response");    ...    if (chrootPath != null) {        String serverPath = event.getPath();        if(serverPath.compareTo(chrootPath)==0)            event.setPath("/");            ...            event.setPath(serverPath.substring(chrootPath.length()));            ...    }    WatchedEvent we = new WatchedEvent(event);    ...    eventThread.queueEvent( we );}

EventThread

根据触发的事件类型,去监听器列表查询对应的路径所对应的监听器,并统一放到集合result中,由于Zookeeper事件是一次触发即销毁,所以也要从watchManager中移除监听器。

public Set materialize(...){Set result = new HashSet();...switch (type) {    ...case NodeDataChanged:case NodeCreated:    synchronized (dataWatches) {        addTo(dataWatches.remove(clientPath), result);    }    synchronized (existWatches) {        addTo(existWatches.remove(clientPath), result);    }    break;    ....}return result;}

完成了对监听器的取出后,将查询到Watcher放到对应的waitEvents任务队列中,调用 EventThread 类中的 run 方法对事件进行处理。

而处理事件,无非就是执行我们注册事件时,写下的process()函数。

总结

Zookeeper的监听机制是基于观察者模式设计的。其方式就是通过在客户端和服务端都维护一张表(zkWatcherManager、watcherManager),用于存放监听器对象。

注册监听器过程,就是在调用接口的过程,将监听器进行注册,首先在本地客户端进行一个注册管理,然后传递服务端之后,又根据是否含有监听器,在服务端进行注册管理。

触发监听事件的过程:

  1. 服务端,通过触发的路径path,通过watcherManager找到对应的监听器集合,通过调用process()方法将信息发送至每个监听器原来的客户端;
  2. 客户端,通过判断是否是通知事件,通过zkWatcherManager找到对应的监听器集合,通过调用process()方法将执行对应的应答处理。

Watcher的客户端实现和服务端使用不同的实现

当在监听事件触发之后,客户端和服务端几乎都做了同样的事(通过Path找到Watcher然后执行process()),但是他们做的事不同的事,服务端的Watcher的process()的作用是将path和触发的event发送至客户端,然后再次通过path和event找到watcher执行process(),这时候,执行的代码即为开发者所需要执行的==监听事件对应的应答处理process()==。

思考 -> 为什么Zookeeper要维护两份Watcher清单(zkWatcherManager + WatcherManager)?

使用反证法,来证明这样设计的优秀之处。

  • 假设只在客户端维护Watcher清单,当服务端的事件触发之后,服务端没有Watcher清单,不知道是哪几个客户端订阅了这个事件,只能将事件发送给所有的客户端,既浪费了带宽,也浪费了客户端处理响应的资源。
  • 假设只在服务端维护Watcher清单,当服务端的事件触发之后,服务端发送给订阅了该事件的客户端,客户端确会因为没有Watcher对象,而无法执行对应的事件应答处理,导致需要服务端将对应的处理方法,通过网络传递,则会加重网络的传输压力。

作者:eddieVim
链接:https://juejin.cn/post/6903693927434420232

zookeeper源码_阿里P8带你从源码级别——深挖Zookeeper监听机制相关推荐

  1. Spring5源码 - 13 Spring事件监听机制_@EventListener源码解析

    文章目录 Pre 概览 开天辟地的时候初始化的处理器 @EventListener EventListenerMethodProcessor afterSingletonsInstantiated 小 ...

  2. Spring5源码 - 12 Spring事件监听机制_异步事件监听应用及源码解析

    文章目录 Pre 实现原理 应用 配置类 Event事件 事件监听 EventListener 发布事件 publishEvent 源码解析 (反推) Spring默认的事件广播器 SimpleApp ...

  3. Spring5源码 - 11 Spring事件监听机制_源码篇

    文章目录 pre 事件监听机制的实现原理[观察者模式] 事件 ApplicationEvent 事件监听者 ApplicationEvent 事件发布者 ApplicationEventMultica ...

  4. 物联网云监控平台设备管理iot源码,MQTT/ONENET带APP端源码

    大型物联网平台全套源码 物联网云监控IOT设备管理源码带APP端 开发语言:PHP 数据库:MYSQL 开发工具:phpstrom 源码类型:全开源免费分享 物联网云监控WEB设备管理iot源码,MQ ...

  5. 优酷视频怎么转二维码_优酷视频转二维码

    有的人想将自己制作的视频分享出去,那么该如何将视频转换成二维码来分享呢?其实方法很简单,直接在视频界面中就能生成,这里和大家讲讲. 01. 登录自己的优酷帐号,然后点击右上角的上传按钮. 优酷视频怎么 ...

  6. Apache ZooKeeper - 事件监听机制详解

    文章目录 事件监听机制命令 Zookeeper事件类型 实操 -w get -w /path 监听节点数据的变化 ls -w /path 监听子节点的变化(增,删) [监听目录] ls -w /pat ...

  7. Ⅵ:zookeeper的Watcher事件监听机制

    2021最新zookeeper系列 ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤️ ❤ ...

  8. 【zookeeper】zookeeper 的监听机制

    文章目录 1.概述 2. shell案例 1.概述 视频:zookeeper znode 存储系统解密 上一篇文章:[zookeeper]zookeeper znode 存储系统解密 许多大数据组件都 ...

  9. springcloud是什么_阿里P8道出,入职阿里必会199道SpringCloud面试题,你能掌握多少?...

    前言 Spring Cloud 自 2016 年 1 月发布第一个 Angel.SR5 版本,到目前 2020 年 3 月发布 Hoxton.SR3 版本,已经历经了 4 年时间.这 4 年时间里,S ...

  10. 单独组件_阿里P8年薪百万大牛-教你打造一个Android组件化开发框架

    作者简介 本篇来自 lucky_billy 的投稿,分享了他的开源组件化框架,详细地讲解框架形成的思路,希望对大家有所帮助. lucky_billy 的博客地址: http://blog.csdn.n ...

最新文章

  1. ntdll 异常代码0xc0000374_不要把异常当做业务逻辑,这性能可能你无法承受
  2. JAVA微服务框架,Jeecg-P3 1.0.0 重构版本发布
  3. eclipse打包项目为aar_新生日常牢骚之作业打包
  4. 字典排序什么意思_列表及字典的排序
  5. RK30SDK系统重启源码分析
  6. python列表、元组、字典、集合区别及他们之间的转换(超全)
  7. laravel 跨域解决方案
  8. Unity官网注册账号绑定手机国籍更改失败
  9. 技术栈(technology stack)
  10. c语言中calloc是什么意思,calloc(c语言calloc是什么意思)
  11. 攻防视角下的信息收集
  12. 怎么使用Navicat连接数据库?
  13. 名帖63 欧阳询 楷书《九成宫醴泉铭》
  14. python爬取网站的图片
  15. 软件测试如何编写计划文档
  16. 闵帆老师《论文写作》课学习心得
  17. 招标采购腐败与欺诈行为有哪些?该如何预防?
  18. Linux倒序赋值用molloc函数,请教一个C语言函数malloc的问题
  19. iOS-检测 iOS 系统网络权限被关闭
  20. Java练习题——超市贴花

热门文章

  1. java date sethours,如何替换不推荐使用的方法Date.setHours(int)?
  2. Ubuntu 更改文件夹权限
  3. 《别输在不会表达上》— 综合素质提升书籍
  4. Eclipse 中如何设置字体大小与样式
  5. HTML如何引入外部JS文件
  6. 7-7 整数的分类处理 (20 分)
  7. rk3399_secureboot在linux环境中操作说明
  8. 八大排序算法思想介绍
  9. 使用单例时的三种单例写法
  10. java中的IO流之文件复制