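A znode can be watched: both changes to the data stored in the node and changes to its children can be monitored, and once a change happens the client that set the watch is notified. This is the ZooKeeper feature that matters most to applications; it is what makes centralized configuration management, cluster management, distributed locks and similar functionality possible.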

Background:

ZooKeeper defines the following states:

Unknown (-1), Disconnected (0), NoSyncConnected (1), SyncConnected (3), AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated (6), Expired (-112).

The event types are: None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4), DataWatchRemoved (5), ChildWatchRemoved (6).

The watcher types are: Children (1), Data (2), Any (3).
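To make the code/name pairs above easier to work with, here is a minimal sketch of an enum that carries the integer codes and supports lookup by code. `KeeperStateSketch` and `fromCode` are hypothetical names chosen for illustration; this is not ZooKeeper's own class, only the same idea in miniature.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical enum mirroring the state codes listed above; not ZooKeeper's own class.
enum KeeperStateSketch {
    UNKNOWN(-1), DISCONNECTED(0), NO_SYNC_CONNECTED(1), SYNC_CONNECTED(3),
    AUTH_FAILED(4), CONNECTED_READ_ONLY(5), SASL_AUTHENTICATED(6), EXPIRED(-112);

    // Reverse index from integer code to enum constant
    private static final Map<Integer, KeeperStateSketch> BY_CODE = new HashMap<>();
    static {
        for (KeeperStateSketch s : values()) {
            BY_CODE.put(s.code, s);
        }
    }

    private final int code;

    KeeperStateSketch(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    // Look up a state by its integer code; rejects codes that are not in the list
    static KeeperStateSketch fromCode(int code) {
        KeeperStateSketch s = BY_CODE.get(code);
        if (s == null) {
            throw new IllegalArgumentException("Unknown state code: " + code);
        }
        return s;
    }
}
```

Note that the codes are not contiguous (there is no 2), which is why the sketch uses a map rather than indexing by ordinal.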

In the previous post, "zookeeper源码分析之一客户端" (ZooKeeper source code analysis, part 1: the client), we started a MyWatcher when connecting to ZooKeeper:

    protected void connectToZK(String newHost) throws InterruptedException, IOException {
        if (zk != null && zk.getState().isAlive()) {
            zk.close();
        }
        host = newHost;
        boolean readOnly = cl.getOption("readonly") != null;
        if (cl.getOption("secure") != null) {
            System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
            System.out.println("Secure connection is enabled");
        }
        zk = new ZooKeeper(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
    }

When the ZooKeeper instance is created, a watchManager comes into play:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider) throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        hostProvider = aHostProvider;

        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

The MyWatcher passed in is stored in watchManager as the default watcher; the watchManager is then handed to a ClientCnxn, which starts the client threads.
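The role of the default watcher can be illustrated with a small stand-alone model. This is a simplified sketch, not the real watch manager: `WatchManagerSketch`, its nested `Watcher` interface and the `fire` method are all hypothetical, and the routing rule (events without a path go to the default watcher only) is an assumption made to keep the example short. Removing the watchers when they fire models the fact that standard ZooKeeper watches are one-time triggers.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for a client-side watch manager.
class WatchManagerSketch {
    interface Watcher {
        void process(String eventType, String path);
    }

    private final Watcher defaultWatcher;
    private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();

    WatchManagerSketch(Watcher defaultWatcher) {
        this.defaultWatcher = defaultWatcher;
    }

    // Remember a watcher for data changes on one path
    void registerDataWatch(String path, Watcher w) {
        dataWatches.computeIfAbsent(path, k -> new HashSet<>()).add(w);
    }

    // Deliver an event and return how many watchers were notified.
    // Assumption: events without a path (session events) go to the default watcher.
    int fire(String eventType, String path) {
        if (path == null) {
            defaultWatcher.process(eventType, null);
            return 1;
        }
        // One-time trigger: the watchers are removed as they fire
        Set<Watcher> watchers = dataWatches.remove(path);
        if (watchers == null) {
            return 0;
        }
        for (Watcher w : watchers) {
            w.process(eventType, path);
        }
        return watchers.size();
    }
}
```

In this model a second change to the same path notifies nobody until the client registers a watch again, which matches how an application has to re-arm its watches.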

Let's first get to know ClientCnxn. ClientCnxn manages the client's socket I/O. It maintains a list of servers that can be connected to and switches transparently to another server in that list when needed.

First, let's see how the socket is obtained:

    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        String clientCnxnSocketName = System.getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName).newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }
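The method above is an instance of a common pattern: read an implementation class name from a system property, fall back to a default, and instantiate it reflectively. The following stand-alone sketch shows the same pattern with hypothetical names (`SocketFactorySketch`, `Transport`, the property `sketch.client.socket`). Unlike the original it throws an unchecked exception, purely to keep the example short, and it uses `getDeclaredConstructor().newInstance()` because `Class.newInstance()` is deprecated in newer JDKs.

```java
// Hypothetical names throughout; only the pattern follows the method above.
class SocketFactorySketch {
    // Assumed property name, chosen for this sketch only
    static final String IMPL_PROPERTY = "sketch.client.socket";

    interface Transport {
        String name();
    }

    public static class DefaultTransport implements Transport {
        public DefaultTransport() {}
        public String name() { return "default"; }
    }

    public static class AltTransport implements Transport {
        public AltTransport() {}
        public String name() { return "alt"; }
    }

    // Pick the implementation named by the system property, or the default one
    static Transport create() {
        String className = System.getProperty(IMPL_PROPERTY);
        if (className == null) {
            className = DefaultTransport.class.getName();
        }
        try {
            return (Transport) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // The original wraps the cause in an IOException instead
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }
}
```

The benefit of this design is that the transport can be swapped with a `-D` flag at JVM startup, without touching the calling code.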

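Next, ClientCnxn's start() method is called, which starts two threads:

    public void start() {
        sendThread.start();
        eventThread.start();
    }

The SendThread class services the outgoing request queue and generates the heartbeats. It also spawns the ReadThread.

Let's look at the main body of SendThread's run method: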

    if (!clientCnxnSocket.isConnected()) {
        // don't re-establish connection if we are closing
        if (closing) {
            break;
        }
        startConnect();
        clientCnxnSocket.updateLastSendAndHeard();
    }

    if (state.isConnected()) {
        // determine whether we need to send an AuthFailed event.
        if (zooKeeperSaslClient != null) {
            boolean sendAuthEvent = false;
            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                try {
                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                } catch (SaslException e) {
                    LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                    state = States.AUTH_FAILED;
                    sendAuthEvent = true;
                }
            }
            KeeperState authState = zooKeeperSaslClient.getKeeperState();
            if (authState != null) {
                if (authState == KeeperState.AuthFailed) {
                    // An authentication error occurred during authentication with the Zookeeper Server.
                    state = States.AUTH_FAILED;
                    sendAuthEvent = true;
                } else {
                    if (authState == KeeperState.SaslAuthenticated) {
                        sendAuthEvent = true;
                    }
                }
            }

            if (sendAuthEvent == true) {
                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        authState, null));
            }
        }
        to = readTimeout - clientCnxnSocket.getIdleRecv();
    } else {
        to = connectTimeout - clientCnxnSocket.getIdleRecv();
    }

    if (to <= 0) {
        String warnInfo;
        warnInfo = "Client session timed out, have not heard from server in "
                + clientCnxnSocket.getIdleRecv()
                + "ms"
                + " for sessionid 0x"
                + Long.toHexString(sessionId);
        LOG.warn(warnInfo);
        throw new SessionTimeoutException(warnInfo);
    }
    if (state.isConnected()) {
        //1000(1 second) is to prevent race condition missing to send the second ping
        //also make sure not to send too many pings when readTimeout is small
        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
            sendPing();
            clientCnxnSocket.updateLastSend();
        } else {
            if (timeToNextPing < to) {
                to = timeToNextPing;
            }
        }
    }

    // If we are in read-only mode, seek for read/write server
    if (state == States.CONNECTEDREADONLY) {
        long now = Time.currentElapsedTime();
        int idlePingRwServer = (int) (now - lastPingRwServer);
        if (idlePingRwServer >= pingRwTimeout) {
            lastPingRwServer = now;
            idlePingRwServer = 0;
            pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
            pingRwServer();
        }
        to = Math.min(to, pingRwTimeout - idlePingRwServer);
    }

    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
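The heartbeat arithmetic in the loop body is easier to follow when pulled out into pure functions. The sketch below copies the two formulas from the code above; the class and method names (`PingTimingSketch`, `timeToNextPing`, `shouldPing`, `nextPingRwTimeout`) are hypothetical, and the maximum ping interval is passed in as a parameter rather than assumed.

```java
// Hypothetical helper class; the formulas are taken from the run loop above.
class PingTimingSketch {
    // readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0), all in milliseconds
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    // A ping is due when the timer has run out or nothing was sent for too long
    static boolean shouldPing(int readTimeout, int idleSend, int maxSendPingInterval) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > maxSendPingInterval;
    }

    // Read-only mode: the interval between read/write server probes doubles up to a cap
    static int nextPingRwTimeout(int currentTimeout, int maxTimeout) {
        return Math.min(2 * currentTimeout, maxTimeout);
    }
}
```

For example, with a readTimeout of 20000 ms and 500 ms since the last send, the next ping is due in 20000 / 2 - 500 = 9500 ms; once the idle time exceeds one second, an extra 1000 ms safety margin is subtracted, so 2000 ms of idle time gives 10000 - 2000 - 1000 = 7000 ms.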

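ClientCnxnSocketNetty implements the abstract methods of ClientCnxnSocket. It is responsible for connecting to the server and for reading and writing network traffic, and it acts as the layer between the raw network data and the higher-level packet layer. Its lifecycle is: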

    loop:
    - try:
    - - !isConnected()
    - - - connect()
    - - doTransport()
    - catch:
    - - cleanup()
    close()
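The lifecycle outline above can be turned into a small runnable model. Everything here is hypothetical: `FakeSocket` stands in for the real socket class, and failures are scripted through a boolean array so the control flow is deterministic. The sketch only shows the connect / doTransport / cleanup / close ordering, not any real networking.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the lifecycle outline; no real networking involved.
class LifecycleSketch {
    static class FakeSocket {
        boolean connected;
        final List<String> log = new ArrayList<>();

        void connect() {
            connected = true;
            log.add("connect");
        }

        // Fails on demand to simulate a lost connection
        void doTransport(boolean fail) {
            if (fail) {
                throw new IllegalStateException("connection lost");
            }
            log.add("doTransport");
        }

        void cleanup() {
            connected = false;
            log.add("cleanup");
        }

        void close() {
            connected = false;
            log.add("close");
        }
    }

    // Runs one loop iteration per entry in failures and returns the call log
    static List<String> run(boolean[] failures) {
        FakeSocket socket = new FakeSocket();
        for (boolean fail : failures) {          // loop:
            try {                                // - try:
                if (!socket.connected) {         // - - !isConnected()
                    socket.connect();            // - - - connect()
                }
                socket.doTransport(fail);        // - - doTransport()
            } catch (RuntimeException e) {       // - catch:
                socket.cleanup();                // - - cleanup()
            }
        }
        socket.close();                          // close()
        return socket.log;
    }
}
```

Running it with a failure scripted for the second iteration shows the reconnect behaviour: after cleanup() the next iteration finds the socket disconnected and calls connect() again before transporting.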

From the description above we can see the workflow of ClientCnxnSocket: it first checks whether it is connected; if not, it calls connect(), otherwise it uses the existing connection. It then calls doTransport() to communicate, and if an exception occurs along the way it calls cleanup(). Finally, the connection is closed. The core of the flow is therefore the doTransport() method:

    @Override
    void doTransport(int waitTimeOut,
                     List<Packet> pendingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
                // adding back the packet to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId)
                        + " is lost");
            }
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }

Simplifying the code above, there are two paths: the failure path, addBack(head), and the normal path, doWrite(pendingQueue, head, cnxn). Let's set the failure path aside and follow the normal flow.

First, a Packet is fetched:

    Packet head = null;
    if (needSasl.get()) {
        if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
            return;
        }
    } else {
        if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
            return;
        }
    }

Here, protected LinkedBlockingDeque<Packet> outgoingQueue is a blocking deque backed by linked nodes; it holds the requests waiting to be sent.
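The two queue operations used so far, a timed poll from the head and putting a packet back, can be tried out in isolation with the JDK's `LinkedBlockingDeque`. In this sketch `OutgoingQueueSketch` and `next` are hypothetical names, packets are plain strings, and the body of `addBack` (re-inserting at the front with `addFirst`) is an assumption about what the real method does, since its source is not shown in this post.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper; packets are modeled as plain strings.
class OutgoingQueueSketch {
    final LinkedBlockingDeque<String> outgoingQueue = new LinkedBlockingDeque<>();

    // Returns the head packet, or null if nothing arrives within waitMillis
    String next(long waitMillis) {
        try {
            return outgoingQueue.poll(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "nothing to send"
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Assumed behaviour: put an unsent packet back at the front so it keeps its place
    void addBack(String packet) {
        if (packet != null) {
            outgoingQueue.addFirst(packet);
        }
    }
}
```

A deque rather than a plain queue is what makes the put-back possible: a packet that was polled but not sent can return to the head without overtaking or being overtaken by other requests.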

Then the doWrite method is executed:

    /**
     * doWrite handles writing the packets from outgoingQueue via network to server.
     */
    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        while (true) {
            if (p != WakeupPacket.getInstance()) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
                sendPkt(p);
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }

The doWrite method is responsible for writing the packets in outgoingQueue to the server over the network. The actual sending is done by the sendPkt(p) call in the loop above:

    private void sendPkt(Packet p) {
        // Assuming the packet will be sent out successfully. Because if it fails,
        // the channel will close and clean up queues.
        p.createBB();
        updateLastSend();
        sentCount++;
        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    }
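The drain loop in doWrite, shown earlier, can be modeled without any networking. In this simplified sketch all names are hypothetical (`DoWriteSketch`, the string-typed `Packet`, the `sent` list that stands in for sendPkt), and the WakeupPacket and null-header checks are left out. It keeps the two points that matter: every request except ping and auth gets an xid and is parked in pendingQueue before it is sent, and the loop keeps going until outgoingQueue is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the drain loop; "sent" stands in for the network write.
class DoWriteSketch {
    static class Packet {
        final String type;
        int xid;

        Packet(String type) {
            this.type = type;
        }
    }

    final Deque<Packet> outgoingQueue = new ArrayDeque<>();
    final List<Packet> pendingQueue = new ArrayList<>();
    final List<Packet> sent = new ArrayList<>();
    private int nextXid = 1;

    // Sends p and then everything still waiting in outgoingQueue
    void doWrite(Packet p) {
        while (true) {
            boolean expectsReply = !p.type.equals("ping") && !p.type.equals("auth");
            if (expectsReply) {
                // Assign the transaction id and remember the packet until its reply arrives
                p.xid = nextXid++;
                pendingQueue.add(p);
            }
            sent.add(p); // stands in for sendPkt(p)
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }
}
```

Because xids are handed out in the order packets leave the queue, replies can later be matched against pendingQueue in the same order.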

1. The structure of a Packet is as follows:

    /**
     * This class allows us to pass the headers and the relevant records around.
     */
    static class Packet {
        RequestHeader requestHeader;
        ReplyHeader replyHeader;
        Record request;
        Record response;
        ByteBuffer bb;

        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Server's view of the path (may differ due to chroot) **/
        String serverPath;

        boolean finished;
        AsyncCallback cb;
        Object ctx;
        WatchRegistration watchRegistration;
        public boolean readOnly;
        WatchDeregistration watchDeregistration;

        /** Convenience ctor */
        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration) {
            this(requestHeader, replyHeader, request, response,
                 watchRegistration, false);
        }

        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration, boolean readOnly) {
            this.requestHeader = requestHeader;
            this.replyHeader = replyHeader;
            this.request = request;
            this.response = response;
            this.readOnly = readOnly;
            this.watchRegistration = watchRegistration;
        }

        public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();

            sb.append("clientPath:" + clientPath);
            sb.append(" serverPath:" + serverPath);
            sb.append(" finished:" + finished);

            sb.append(" header:: " + requestHeader);
            sb.append(" replyHeader:: " + replyHeader);
            sb.append(" request:: " + request);
            sb.append(" response:: " + response);

            // jute toString is horrible, remove unnecessary newlines
            return sb.toString().replaceAll("\r*\n+", " ");
        }
    }

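From the createBB method we can see that, for the actual network transfer, ZooKeeper serializes only two of the Packet's fields, requestHeader and request (preceded by a 4-byte length, and followed by the readOnly flag in the case of a ConnectRequest). Only these end up in the byte array that goes over the network; nothing related to watchRegistration is transmitted.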

2. Update the time of the last send, updateLastSend:

    void updateLastSend() {
        this.lastSend = now;
    }

3. Write the byte buffer to the server through the channel (in ClientCnxnSocketNetty this is a Netty channel):

channel.write(ChannelBuffers.wrappedBuffer(p.bb));

Here bb is of type ByteBuffer and was initialized in Packet's createBB method:

    this.bb = ByteBuffer.wrap(baos.toByteArray());
    this.bb.putInt(this.bb.capacity() - 4);
    this.bb.rewind();
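The three lines above implement length-prefixed framing: a placeholder int is written first, and once the total size is known it is overwritten with the payload length (capacity minus the 4 bytes of the prefix itself). The following minimal sketch reproduces the trick with plain JDK classes. `LengthPrefixSketch` and `frame` are hypothetical names, and `DataOutputStream` is used here in place of ZooKeeper's BinaryOutputArchive.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical helper showing the "write -1, patch the length later" trick.
class LengthPrefixSketch {
    static ByteBuffer frame(byte[] payload) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);       // placeholder for the length, filled in below
            out.write(payload);     // stands in for the serialized header and request
            out.flush();

            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.capacity() - 4); // overwrite the placeholder at position 0
            bb.rewind();                  // back to position 0, ready to be written out
            return bb;
        } catch (IOException e) {
            // Not expected for an in-memory stream
            throw new UncheckedIOException(e);
        }
    }
}
```

Framing the 5-byte payload "hello" this way yields a 9-byte buffer whose first four bytes encode the int 5, so the receiver can read the length first and then knows exactly how many more bytes belong to the packet.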

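Summary:

The connection between a ZooKeeper client and the server is implemented mainly by ClientCnxnSocket, which has two concrete implementations, ClientCnxnSocketNetty and ClientCnxnSocketNIO. Its workflow is:

  first, check whether it is connected; if not, call connect() to connect, otherwise go on to the next step;

  then call doTransport() to communicate; if an exception occurs along the way, call cleanup();

  finally, close the connection.

This flow can be seen in the run method of SendThread.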

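A further note on one of ZooKeeper's guarantees, sequential consistency: updates are applied in the order in which the client sent the requests. In SendThread we can see the timestamps being refreshed repeatedly (updateLastSendAndHeard(), updateLastSend()) in support of this ordering, while the requests themselves leave outgoingQueue from the head, in the order they were queued.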

Reposted from: https://www.cnblogs.com/davidwang456/p/5000927.html
