Watcher-- 数据变更通知

  • 我们知道Zookeeper提供来分布式数据的订阅/发布功能,一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能让多个订阅者同时监听某个主题对象,当这个被监听对象自身状态发生变化时候,会通知所有订阅者,Zookeeper中引入了Watcher机制来实现这种分布式通知功能,Zookeeper允许客户端向服务器节点注册一个Watcher监听,当服务器端节点发生指定触发的事件就会触发这个Watcher,之后服务端会向指定客户端发送一个事件通知,这样来实现一个分布式通知的功能,如下图所示的一个流程:
  • 上图中流程,Zookeeper的Watcher机制主要包括客户端线程,客户端WatcherManager,和Zookeeper服务器,流程上简单的说:
    • 客户端向Zookeeper服务器注册成功Watcher同时,将Watcher对象存储在客户端的WatcherManager
    • 当Zookeeper服务器触发Watcher事件后,向客户端发送通知
    • 客户端线程从WatcherManager中捞出对应的Watcher对象来执行回调逻辑

Watcher接口

  • 在Zookeeper中,接口Watcher表示一个标准的事件处理器,订阅来通知相关的逻辑,我们可以看他的源码:

    • EventType:事件类型
    • KeeperState:通知状态
    • Process(WatchedEvent event):会调方法
  • 其中事件类型和通知状态是有对应关系,如下表中所示
KeeperState EventType 触发条件解释 说明
SyncConnected None 客户端与服务器成功建立连接 客户端和服务器处于连接状态
SyncConnected NodeCreated Watcher 监听的对应数据节点成功创建 客户端和服务器处于连接状态
SyncConnected NodeDeleted Watcher监听的数据节点成功删除 客户端和服务器处于连接状态
SyncConnected NodeDataChanged Watcher监听的数据节点内容变更 客户端和服务器处于连接状态
SyncConnected NodeChildrenChanged Watcher监听的对应数据节点列表发生变更 客户端和服务器处于连接状态
Disconnected None 客户端与Zookeeper服务器断开连接 客户端和服务器断开了连接
Expired None 会话超时 客户端回话失效,通常同时也会收到SessionExpiredException异常
AuthFailed None 两种情况:使用错误scheme进行权限检查, SASL权限检查失败 通常同时收到AuthFailedException异常
Unknown 3.1.0后废弃
NoSYncConnected 3.1.0后废弃
  • 如上列举了Zookeeper中常见的几个通知状态和事件类型,其中针对NodeDateChange事件说明的节点的变更并不一定是内容变化,可能版本号DataVersion变化也是一样会触发。

  • 回调方法process 是Watcher接口中的一个回调方法,当Zookeeper服务器端向客户端发送一个Watcher事件通知的时候,客户端会对相应的Process方法进行回调,从而实现对事件的处理,Process方法定义如下

 void process(WatchedEvent event);
  • 如上参数WatcherEvent包含了一个事件的基本属性:
public class WatchedEvent {private final KeeperState keeperState; //通知状态private final EventType eventType; //  事件类型private String path; //   节点路径/*** Create a WatchedEvent with specified type, state and path*/public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {this.keeperState = keeperState;this.eventType = eventType;this.path = path;}
......
}
  • Zookeeper服务端生成WatchedEvent事件后会调用getWrapper方法将字节包装成一个可序列化的WatcherEvent,其实这是一个事务,都是对服务端事件的一个封装,不同的是WatchedEvent是我们逻辑事件中的一个对象,主要用来我们程序内部的事件容器,而WatcherEvent因为实现了序列化的接口,因此可以用于网络传输
  • 在服务端得到WatcherEvent后,通过网络传到客户端,还原成一个WatchedEvent,并传递给process,然后process方法根据入参就可以解析完整的服务端事件了。

工作机制

  • Zookeeper的Watcher机制可以有如下三个过程:

    • 客户端注册Watcher
    • 服务端处理watcher
    • 客户端回调Watcher
  • 以下类图说明各组件之间的关系:
客户端注册Watcher
  • 我们通过如下部分源码来分析Watcher的客户端注册,我们创建一个Zookeeper的客户端对象实例时,可以向构造方法中传入一个默认的Watcher:
//我们调用的方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {this(connectString, sessionTimeout, watcher, false);
}
//实际上初始化的方法
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {LOG.info("Initiating client connection, connectString={} sessionTimeout={} watcher={}", new Object[]{connectString, Integer.valueOf(sessionTimeout), watcher});if(clientConfig == null) {clientConfig = new ZKClientConfig();}this.clientConfig = clientConfig;this.watchManager = this.defaultWatchManager();this.watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);this.hostProvider = aHostProvider;this.cnxn = this.createConnection(connectStringParser.getChrootPath(), this.hostProvider, sessionTimeout, this, this.watchManager, this.getClientCnxnSocket(), canBeReadOnly);this.cnxn.start();}
  • 如上源码中我们给定的Watcher对象实际上被保存在客户端ZKWatcherManager的defaultWatcher中,另外Zookeeper客户端也可以通过getData,getChildren,exist三个接口来向Zookeeper服务器注册Watcher,无论哪一种都一样,我们用getData方法的源码来分析:
 public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {PathUtils.validatePath(path); //校验Path格式正确性ZooKeeper.DataWatchRegistration wcb = null;if(watcher != null) {wcb = new ZooKeeper.DataWatchRegistration(watcher, path);//封装DataWatchRegistration}String serverPath = this.prependChroot(path);RequestHeader h = new RequestHeader();h.setType(4);GetDataRequest request = new GetDataRequest();request.setPath(serverPath);request.setWatch(watcher != null);GetDataResponse response = new GetDataResponse();ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);if(r.getErr() != 0) {throw KeeperException.create(Code.get(r.getErr()), path);} else {if(stat != null) {DataTree.copyStat(response.getStat(), stat);}return response.getData();}}
  • 如上源码中参数Path, Watcher对象,getData接口注册Watcher后,做了两件事情

    • 先用这两个参数封装来一个DataWatchRegistration,其实就是初始化来Zookeeper服务器中的WatchRegistration里面的 watcher,clientPath,这部分用来暂时存储注册信息保存节点和Watcher的对应关系
    • 接着会向客户端请求request进行标记,将其设置为“使用watcher监听”。
  • 接着继续往下SubmitRequest方法:
ReplyHeader r = this.cnxn.submitRequest(h, request, response, wcb);
public ReplyHeader submitRequest(RequestHeader h,Record request,Record response,WatchRegistration watchRegistration,WatchDeregistration watchDeregistration) throws InterruptedException {ReplyHeader r = new ReplyHeader();Packet packet = queuePacket(h,r,request,response,null, null,null, null, watchRegistration,  watchDeregistration);......return r;}
  • 这个步骤中又一次将ClientCnxn中的WatchRegistration封装到Packet中,Zookeeper中,Packet可以被看作是一个最小通信协议单元,用于进行客户端与服务器之间的网络传输,任何需要传输的对象都需要包装成一个Packet对象,接着他被放入发送队列,如下queuePacket代码:
public Packet queuePacket(RequestHeader h,ReplyHeader r,Record request,Record response,AsyncCallback cb,String clientPath,String serverPath,Object ctx,WatchRegistration watchRegistration,WatchDeregistration watchDeregistration) {Packet packet = null;// Note that we do not generate the Xid for the packet yet. It is// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),// where the packet is actually sent.packet = new Packet(h, r, request, response, watchRegistration);packet.cb = cb;packet.ctx = ctx;packet.clientPath = clientPath;packet.serverPath = serverPath;packet.watchDeregistration = watchDeregistration;// The synchronized block here is for two purpose:// 1. synchronize with the final cleanup() in SendThread.run() to avoid race// 2. synchronized against each packet. So if a closeSession packet is added,// later packet will be notified.synchronized (state) {......outgoingQueue.add(packet);....}
  • 我们继续追这个outgoingQueue 队列,可以看到随后Zookeeper客户端会向服务器端发送这个请求,同时等待请求的返回,王朝请求发送后,会由客户端的SendThread线程的readResponse方法负责接受来自服务端的响应,finishPacket方法会从Packet中取出对于的Watcher并注册到ZKWatcherManager中去。
protected void finishPacket(Packet p) {int err = p.replyHeader.getErr();if (p.watchRegistration != null) {p.watchRegistration.register(err);}......
  • 如Packet中的Watchregistration就是我们刚才第一步getData中保存的节点对应的Watcher的注册信息。现在他又从这部分中取出来封装的Watcher,如下具体的register方法:
public void register(int rc) {if (shouldAddWatch(rc)) {Map<String, Set<Watcher>> watches = getWatches(rc);synchronized (watches) {Set<Watcher> watchers = watches.get(clientPath);if (watchers == null) {watchers = new HashSet<Watcher>();watches.put(clientPath, watchers);}watchers.add(watcher);}}}//getWatchesprotected final ZKWatchManager watchManager;protected Map<String, Set<Watcher>> getWatches(int rc) {return watchManager.dataWatches;}
  • 如上register方法中客户端将之前暂时保存的Watcher取出来之后,放入到getWatcher获取到的一个Map对象中,这个Mp对象就是ZkWatcherManager中的一个dataWatches,我们将刚才存入WatchRegistration中的临时信息取出用来初始化ZKWatchManager.dataWatches,用于将数据节点的路径和watcher对象进行一一映射,这样就完成来客户端Watcher的注册,整个Watcher流程如下

  • 如上流程中我们每次调用getData都会注册一个Watcher,如果这些Watcher都随着请求发送到服务器的话肯定会内存紧张,现实是这样的码,我们可以看之前代码中负责传输数据的对象Packet中,我们将WatchRegistration封装进去,如下Packet中的序列化方法createBB:

 public void createBB() {try {ByteArrayOutputStream baos = new ByteArrayOutputStream();BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);boa.writeInt(-1, "len"); // We'll fill this in laterif (requestHeader != null) {requestHeader.serialize(boa, "header");//封装requestHeader}if (request instanceof ConnectRequest) {//封装requestrequest.serialize(boa, "connect");// append "am-I-allowed-to-be-readonly" flagboa.writeBool(readOnly, "readOnly");} else if (request != null) {request.serialize(boa, "request");}baos.close();this.bb = ByteBuffer.wrap(baos.toByteArray());this.bb.putInt(this.bb.capacity() - 4);this.bb.rewind();} catch (IOException e) {LOG.warn("Unexpected exception", e);}}
  • 如上源码中可以看到并没有整个对象完全序列化进去,zookeeper只是将requestHeader和request两个属性进行序列化,WatchRegistration并没有被序列化到底层字节数组中,所以不会进行网络传输

服务端处理Watcher

  • 上面讲解了客户端注册Watcher的过程,并且已经了解了最终客户端不会将Watcher对象真正床底到服务器,那么,服务端是怎么样完成客户端的Watcher注册,一下我们对这部分文件进行解析。
ServerCnxn存储
  • 我们先看下服务器接收Watcher并将其存储起来的过程,如下Zookeeper服务端处理Watcher序列图:
  • 我们先从源头分析客户端给了服务器那些信息,如下Zookeeper类中getData方法的源码:
......
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
......
  • 如上RequestHeader中type类型设置的 4 ,request中给定了节点path路径,以及一个boolean类型的watcher标识是否天剑监听。服务端收到来自客户端的请求后,在FinalRequestProcessor.processRequest()中会判断当前请求的类型type来做一个策略来决定处理不同类型的请求,如下源码:
switch (request.type) {......case OpCode.getData: {......Stat stat = new Stat();byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);//为tru则会传递一个 ServerCnxn cnxn = request.cnxn;对象到实际的注册方法中,否则给nullrsp = new GetDataResponse(b, stat);break;}
......}
  • 如上,从getData请求的处理逻辑可以看到当getDataRequest.getwatch为true的时候,Zookeeper就认为当前客户端请求需要进行Watcher注册,于是将当前的ServerCnxn对象和数据节点路径传入getData方法
  • ServerCnxn是一个Zookeeper客户端和服务器之间的链接接口,代表了一个客户端和服务器的链接,ServerCnxn接口默认实现是NIOServerCNxn,同时3.4.0版本开始引入了Netty实现:NettyServerCnxn,都实现了Watcher接口并且实现process接口,所有把他看成一个Watcher对象,如下ServerCnxn对象以及两种process实现
public abstract class ServerCnxn implements Stats, Watcher {......public abstract void process(WatchedEvent event);......
}

  • 继续追getData源码,getZkDataBase获取到的ZKDatabase 对象,其中DataTree 对象是现在Zookeeper现有的节点数据的树形存储,我们可以通过path来从这获取到对应节点信息,如下获取DataNode,初始化节点状态,将DataNode天骄到WatchManager 对象中的WatchTable和watch2Paths中
 byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,getDataRequest.getWatch() ? cnxn : null);
//如下getData实现
public byte[] getData(String path, Stat stat, Watcher watcher)throws KeeperException.NoNodeException {DataNode n = nodes.get(path);if (n == null) {throw new KeeperException.NoNodeException();}synchronized (n) {n.copyStat(stat);if (watcher != null) {dataWatches.addWatch(path, watcher);}return n.data;}}
  • Watchmanager是Zookeeper服务端Watcher的管理者,内部管理的WatcherTable和Watch2Paths,所以一个节点存储了两次,不过是从如下两个未存存储

    • watchTable是从数据节点路径的粒度来托管Watcher
    • watch2Paths是从Watcher的粒度来空值时间触发需要出发的数据节点。
 */
public class WatchManager {private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);private final HashMap<String, HashSet<Watcher>> watchTable =new HashMap<String, HashSet<Watcher>>();private final HashMap<Watcher, HashSet<String>> watch2Paths =new HashMap<Watcher, HashSet<String>>();......}
  • WatcherManager数据结构如下
WatcherManager
- watchTable: HashMap<String, HashSet>(); + watch2Paths :new HashMap<Watcher, HashSet>();
+ addwatch(String ,Watcher): void + removeWatcher(Watcher): void + triggerWatch(String, EventType):Set +Trigger

上一篇Zookeeper–ZAB与Paxos算法联系与区别
下一篇Zookeeper–Watcher机制源码剖析二

Zookeeper--Watcher机制源码剖析一相关推荐

  1. zookeeper watch java_Apache ZooKeeper Watcher 机制源码解释

    分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程.不同节点上的进程互相协调行为的过程叫做分布式同步.许多分布式系统需要一个进程作为任务的协调者,执行一些其 ...

  2. Zookeeper--Watcher机制源码剖析二

    Watcher触发 我们从实际操作时候的表现来看Watcher的触发,比如Zookeeper中NodeDataChanged时间的触发是"Watcher监听的对应数据节点的数据内容发生变更& ...

  3. Spark存储机制源码剖析

    一.Shuffle结果的写入和读取 通过之前的文章Spark源码解读之Shuffle原理剖析与源码分析我们知道,一个Shuffle操作被DAGScheduler划分为两个stage,第一个stage是 ...

  4. Swoft 源码剖析 - Swoft 中的注解机制

    作者:bromine 链接:https://www.jianshu.com/p/ef7... 來源:简书 著作权归作者所有,本文已获得作者授权转载,并对原文进行了重新的排版. Swoft Github ...

  5. boost源码剖析之:多重回调机制signal(下)

    boost源码剖析之:多重回调机制signal(下) 刘未鹏 C++的罗浮宫(http://blog.csdn.net/pongba) 在本文的上篇中,我们大刀阔斧的剖析了signal的架构.不过还有 ...

  6. boost源码剖析之:多重回调机制signal(上)

    boost源码剖析之:多重回调机制signal(上) 刘未鹏 C++的罗浮宫(http://blog.csdn.net/pongba) boost库固然是技术的宝库,却更是思想的宝库.大多数程序员都知 ...

  7. 【Spring源码】Spring Transactional事务:传播机制(Propagation) 介绍 和 源码剖析

    [Spring源码]Spring Transactional事务:传播机制(Propagation) 源码剖析 关键词 AMethod调用BMethod,转载BMethod的角度来考虑:站在被调用者的 ...

  8. python源码剖析代码例子_Python源码剖析笔记5-模块机制

    python中经常用到模块,比如import xxx,from xxx import yyy这样子,里面的机制也是需要好好探究一下的,这次主要从黑盒角度来探测模块机制,源码分析点到为止,详尽的源码分析 ...

  9. Python源码剖析笔记5-模块机制

    本文简书地址: http://www.jianshu.com/p/14586ec50ab6 python中经常用到模块,比如import xxx,from xxx import yyy这样子,里面的机 ...

最新文章

  1. linux虚拟机上不了王,虚拟机上安装Linux时出现的问题及解决方法
  2. Linux 下的格式化输出命令:print
  3. ${}和#{}的区别
  4. linux编译boost
  5. 并行数据库 分布式数据库
  6. android两个耳机能连两部手机吗,AirPods使用技巧:如何让耳机同时连接两台手机...
  7. Oracle数据库----视图
  8. ArcGIS利用数据驱动工具条批量出图(python代码)
  9. 使用开源ASR框架在Mono和.NET C#中进行语音识别
  10. centos下安装Anaconda
  11. SSM+汽车销售平台 毕业设计-附源码171619
  12. ROS:Roboware Studio的安装
  13. 地图标识符号大全_资源小结:分省地图查询(9.1版)
  14. COLMAP简明教程 重建 转化深度图 导出相机参数 导入相机参数 命令行
  15. 小鑫の日常系列故事(七)——小纸条
  16. 域名dns污染,如何防治?
  17. amd为什么还用针脚_闲聊CPU针脚 一年一换都怪AMD不给力?
  18. SpringBoot修改启动图标(详细步骤)
  19. 论坛数据库设计初步设计
  20. AM335x SPL

热门文章

  1. 趣学算法之哥德巴赫猜想的实现
  2. sqlite数据库备份还原、导出导入
  3. 【一】Windows API 零门槛编程指南——MessageBox 基本使用及基础讲解
  4. (十)python3 只需3小时带你轻松入门——模块与包
  5. 自定义dialog弹窗html,自定义H5页面dialog弹窗
  6. 男孩子也是要护肤的!!!
  7. 震惊整个世界的新发现,科学界的大骗局
  8. 不懂这25个名词,好意思说你懂大数据?
  9. 中科大量子计算机科学家,中国科学院量子信息重点实验室
  10. python 字符编码处理_浅析Python 字符编码与文件处理