文章目录

  • 2021SC@SDUSC
  • 前言
  • ZooKeeper中网络通信执行流程
  • 总结

2021SC@SDUSC

前言

接下来将进入源码世界来一步一步分析客户端与服务端之间是如何通过ClientCnxn/ServerCnxn来建立起网络通信的。而这次内容将分为三章来讲述,在本章中将介绍客户端是如何将请求发送到服务端的,后两章将分别介绍服务端是如何响应客户端请求的以及客户端收到服务端的响应之后是如何操作的。

ZooKeeper中网络通信执行流程

① 在ZooKeeper的构造函数中,创建了客户端与服务端之间的ClientCnxn交互连接。从而能使客户端发出的请求通过该交互连接传输给服务端,其中createConnection方法返回的是ClientCnxn。

        // 创建客户端连接,并初始化SendThread和EventThread  cnxn = createConnection(connectStringParser.getChrootPath(),hostProvider,sessionTimeout,this,watchManager,getClientCnxnSocket(),canBeReadOnly);// 启动SendThread和EventThread cnxn.start();

创建客户端连接的具体代码如下:

     protected ClientCnxn createConnection(// 客户端路径String chrootPath,// 服务端HostProvider hostProvider,// 会话超时int sessionTimeout,ZooKeeper zooKeeper,// 客户端监听器ClientWatchManager watcher,// 客户端连接SocketClientCnxnSocket clientCnxnSocket,boolean canBeReadOnly) throws IOException {return new ClientCnxn(chrootPath,hostProvider,sessionTimeout,this,watchManager,clientCnxnSocket,canBeReadOnly);}

② sendThread是ClientCnxn的内部类,也是ZooKeeper中的一个线程,核心是run()方法。

(1)在run()方法中,如果客户端连接没有开始创建,那么会调用sendThread()中的startConnect()方法进行异步连接。

        public void run() {clientCnxnSocket.introduce(this, sessionId, outgoingQueue);clientCnxnSocket.updateNow();clientCnxnSocket.updateLastSendAndHeard();int to;long lastPingRwServer = Time.currentElapsedTime();final int MAX_SEND_PING_INTERVAL = 10000; //10 secondsInetSocketAddress serverAddress = null;while (state.isAlive()) {try {// 如果客户端连接没有连接起来if (!clientCnxnSocket.isConnected()) {// don't re-establish connection if we are closingif (closing) {break;}if (rwServerAddress != null) {serverAddress = rwServerAddress;rwServerAddress = null;} else {serverAddress = hostProvider.next(1000);}onConnecting(serverAddress);//异步连接startConnect(serverAddress);// Update now to start the connection timer right after we make a connection attemptclientCnxnSocket.updateNow();clientCnxnSocket.updateLastSendAndHeard();}// 如果客户端连接已经连接上服务端if (state.isConnected()) {// determine whether we need to send an AuthFailed event.if (zooKeeperSaslClient != null) {boolean sendAuthEvent = false;if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {try {zooKeeperSaslClient.initialize(ClientCnxn.this);} catch (SaslException e) {LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);changeZkState(States.AUTH_FAILED);sendAuthEvent = true;}}KeeperState authState = zooKeeperSaslClient.getKeeperState();if (authState != null) {if (authState == KeeperState.AuthFailed) {// An authentication error occurred during authentication with the Zookeeper Server.changeZkState(States.AUTH_FAILED);sendAuthEvent = true;} else {if (authState == KeeperState.SaslAuthenticated) {sendAuthEvent = true;}}}if (sendAuthEvent) {eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));if (state == States.AUTH_FAILED) {eventThread.queueEventOfDeath();}}}// 下一次查询超时时间to = readTimeout - clientCnxnSocket.getIdleRecv();} else {// 递减连接超时时间to = connectTimeout - clientCnxnSocket.getIdleRecv();}// 如果会话超时,包括连接超时if (to <= 0) {String warnInfo = String.format("Client session timed out, have not heard from server in %dms for session id 0x%s",clientCnxnSocket.getIdleRecv(),Long.toHexString(sessionId));LOG.warn(warnInfo);throw new SessionTimeoutException(warnInfo);}// 如果发送为空闲状态,则发送Ping包if (state.isConnected()) {//1000(1 second) is to prevent race condition missing to send the second ping//also make sure not to send too many pings when readTimeout is smallint timeToNextPing = readTimeout / 2- clientCnxnSocket.getIdleSend()- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);//send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVALif (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {sendPing();clientCnxnSocket.updateLastSend();} else {if (timeToNextPing < to) {to = timeToNextPing;}}}// 如果是只读模式,则寻找R/W server,如果找到,则清理之前的连接,并重新连接到R/W serverif (state == States.CONNECTEDREADONLY) {long now = Time.currentElapsedTime();int idlePingRwServer = (int) (now - lastPingRwServer);if (idlePingRwServer >= pingRwTimeout) {lastPingRwServer = now;idlePingRwServer = 0;pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);// 同步测试下个server是否是R/W server,如果是则抛出RWServerFoundExceptionpingRwServer();}to = Math.min(to, pingRwTimeout - idlePingRwServer);}//处理IOclientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);} catch (Throwable e) {if (closing) {// closing so this is expectedLOG.warn("An exception was thrown while closing send thread for session 0x{}.",Long.toHexString(getSessionId()),e);break;} else {LOG.warn("Session 0x{} for server {}, Closing socket connection. "+ "Attempting reconnect except it is a SessionExpiredException.",Long.toHexString(getSessionId()),serverAddress,e);// At this point, there might still be new packets appended to outgoingQueue.// they will be handled in next connection or cleared up if closed.cleanAndNotifyState();}}}synchronized (state) {//清理之前的连接,找下一台server连接cleanup();}clientCnxnSocket.close();if (state.isAlive()) {eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));}eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));ZooTrace.logTraceMessage(LOG,ZooTrace.getTextTraceLevel(),"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));}

(2)在startConnect()中调用了clientCnxnSocket.connect(addr)进行异步连接,默认为NIO实现的连接。

        private void startConnect(InetSocketAddress addr) throws IOException {// 初始化并创建连接saslLoginFailed = false;if (!isFirstConnect) {//如果不是第一次连接,则尝试休眠一段时间后唤醒try {Thread.sleep(ThreadLocalRandom.current().nextLong(1000));} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);}}//将改变状态为连接中changeZkState(States.CONNECTING);// 主机端口String hostPort = addr.getHostString() + ":" + addr.getPort();MDC.put("myid", hostPort);setName(getName().replaceAll("\\(.*\\)", "(" + hostPort + ")"));if (clientConfig.isSaslClientEnabled()) {try {if (zooKeeperSaslClient != null) {zooKeeperSaslClient.shutdown();}zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr, clientConfig), clientConfig);} catch (LoginException e) {LOG.warn("SASL configuration failed. "+ "Will continue connection to Zookeeper server without "+ "SASL authentication, if Zookeeper server allows it.", e);eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));saslLoginFailed = true;}}logStartConnect(addr);// 开始异步连接clientCnxnSocket.connect(addr);}

(3)connect()方法为具体创建连接的方法,在这里使用默认的NIO实现的连接进行分析,connect()的具体实现在ClientCnxnSocketNIO中。在connect()方法中又调用了registerAndConnect(sock, addr)方法来注册连接事件,尝试连接。

    void connect(InetSocketAddress addr) throws IOException {// 创建客户端SocketChannelSocketChannel sock = createSock();try {// 注册连接事件registerAndConnect(sock, addr);} catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {LOG.error("Unable to open socket to {}", addr);sock.close();throw e;}//session还未初始化initialized = false;//重置2个读buffer,准备下一次读lenBuffer.clear();incomingBuffer = lenBuffer;}

(4)如果连接成功,那么会调用send.Thread.primeConnection()方法来初始化并创建session等操作。

    void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {sockKey = sock.register(selector, SelectionKey.OP_CONNECT);//尝试连接boolean immediateConnect = sock.connect(addr);//如果连接成功,则调用primeConnection()去创建session等操作if (immediateConnect) {sendThread.primeConnection();}}

(5)primeConnection()方法主要是为了创建session,也是为了将客户端发送的请求添加到发送队列中,即outgoingQueue。

        void primeConnection() throws IOException {LOG.info("Socket connection established, initiating session, client: {}, server: {}",clientCnxnSocket.getLocalSocketAddress(),clientCnxnSocket.getRemoteSocketAddress());isFirstConnect = false;// 客户端sessionId默认为0long sessId = (seenRwServerBefore) ? sessionId : 0;// 构建连接请求ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);// We add backwards since we are pushing into the front// Only send if there's a pending watch// TODO: here we have the only remaining use of zooKeeper in// this class. It's to be eliminated!if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {List<String> dataWatches = zooKeeper.getDataWatches();List<String> existWatches = zooKeeper.getExistWatches();List<String> childWatches = zooKeeper.getChildWatches();List<String> persistentWatches = zooKeeper.getPersistentWatches();List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()|| !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();long setWatchesLastZxid = lastZxid;while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()|| persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {List<String> dataWatchesBatch = new ArrayList<String>();List<String> existWatchesBatch = new ArrayList<String>();List<String> childWatchesBatch = new ArrayList<String>();List<String> persistentWatchesBatch = new ArrayList<String>();List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();int batchLength = 0;// Note, we may exceed our max length by a bit when we add the last// watch in the batch. This isn't ideal, but it makes the code simpler.while (batchLength < SET_WATCHES_MAX_LENGTH) {final String watch;if (dataWatchesIter.hasNext()) {watch = dataWatchesIter.next();dataWatchesBatch.add(watch);} else if (existWatchesIter.hasNext()) {watch = existWatchesIter.next();existWatchesBatch.add(watch);} else if (childWatchesIter.hasNext()) {watch = childWatchesIter.next();childWatchesBatch.add(watch);}  else if (persistentWatchesIter.hasNext()) {watch = persistentWatchesIter.next();persistentWatchesBatch.add(watch);} else if (persistentRecursiveWatchesIter.hasNext()) {watch = persistentRecursiveWatchesIter.next();persistentRecursiveWatchesBatch.add(watch);} else {break;}batchLength += watch.length();}Record record;int opcode;if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {// maintain compatibility with older servers - if no persistent/recursive watchers// are used, use the old version of SetWatchesrecord = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);opcode = OpCode.setWatches;} else {record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);opcode = OpCode.setWatches2;}RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);Packet packet = new Packet(header, new ReplyHeader(), record, null, null);outgoingQueue.addFirst(packet);}}}for (AuthData id : authInfo) {outgoingQueue.addFirst(new Packet(new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),null,new AuthPacket(0, id.scheme, id.data),null,null));}// 组合成网络层的Packet对象,添加到发送队列,对于ConnectRequest其requestHeader为nulloutgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));// connectionPrimed()方法里面封装了确保读写事件都能监听clientCnxnSocket.connectionPrimed();LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());}

③ 当客户端发出的请求进入发送队列中,SendThread这个线程会开始doTransport处理将发送队列的中的请求发送到服务端。

    void doTransport(int waitTimeOut,Queue<Packet> pendingQueue,ClientCnxn cnxn) throws IOException, InterruptedException {selector.select(waitTimeOut);Set<SelectionKey> selected;synchronized (this) {selected = selector.selectedKeys();}// Everything below and until we get back to the select is// non blocking, so time is effectively a constant. That is// Why we just have to do this once, hereupdateNow();for (SelectionKey k : selected) {SocketChannel sc = ((SocketChannel) k.channel());// 如果之前连接没有立马连上,则在这里处理OP_CONNECT事件if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {if (sc.finishConnect()) {updateLastSendAndHeard();updateSocketAddresses();sendThread.primeConnection();}//如果可读或者可写,则处理} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {doIO(pendingQueue, cnxn);}}if (sendThread.getZkState().isConnected()) {// 如果之前的连接已经连上// 如果在outgoingQueue中找到可发送的包,则可写if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {enableWrite();}}//释放资源selected.clear();}

④假如我们从doTransport()中获取到了enableWrite()可写资源,即可将请求队列中的请求发送给服务端。SendThread会执行ClientCnxnSocketNIO中的doIO()方法。

        //如果可写if (sockKey.isWritable()) {// sendThread从发送队列中取出请求包Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());// 如果请求包不为空if (p != null) {// 修改上一次发送时间updateLastSend();// 当发送请求包时,必然申请到了缓冲区资源,序列化请求包到缓冲区if (p.bb == null) {if ((p.requestHeader != null)&& (p.requestHeader.getType() != OpCode.ping)&& (p.requestHeader.getType() != OpCode.auth)) {p.requestHeader.setXid(cnxn.getXid());}//序列化p.createBB();}//写数据sock.write(p.bb);//如果没有剩余数据,即写完,则发送成功if (!p.bb.hasRemaining()) {//已发送的业务包+1sentCount.getAndIncrement();//从发送队列中删除该请求包outgoingQueue.removeFirstOccurrence(p);//如果是业务请求,则添加到Pending队列,方便对server端返回做相应处理,如果是其他请求,发完就丢弃if (p.requestHeader != null&& p.requestHeader.getType() != OpCode.ping&& p.requestHeader.getType() != OpCode.auth) {synchronized (pendingQueue) {pendingQueue.add(p);}}}}//如果发送队列为空,则收回写的权限if (outgoingQueue.isEmpty()) {disableWrite();} else if (!initialized && p != null && !p.bb.hasRemaining()) {//如果没有写完disableWrite();} else {enableWrite();}}

⑤由于第一个请求包是ConnectRequest连接请求包,它构造的packet没有header,所以发完直接丢弃,但是SendThread还需要监听服务端的返回,以确认连上,并进行session的初始化。至于服务端是如何响应该请求的,将在下一章进行介绍。

总结

上述网络通信流程:根据ClientCnxn创建TCP连接,发出ConnectRequest请求包给服务端。

ZooKeeper源码分析之完整网络通信流程(一)相关推荐

  1. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  2. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  3. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

  4. zookeeper源码分析之恢复事务日志

    zookeeper源码分析之恢复事务日志 前言 源码分析 查看事务日志命令 总结 前言 本文是基于zookeeper集群启动过程分析(https://blog.csdn.net/weixin_4244 ...

  5. Zookeeper源码分析(二) ----- zookeeper日志

    zookeeper源码分析系列文章: Zookeeper源码分析(一) ----- 源码运行环境搭建 原创博客,纯手敲,转载请注明出处,谢谢! 既然我们是要学习源码,那么如何高效地学习源代码呢?答案就 ...

  6. HDFS源码分析DataXceiver之整体流程

    在<HDFS源码分析之DataXceiverServer>一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer.它被用于接收来自客户端或其他数据节 ...

  7. Fuchsia源码分析--系统调用流程

    Fuchsia源码分析--系统调用流程 以zx_channel_create为例 Fuchsia系统调用的定义 Fuchsia系统调用定义文件的编译 Fuchsia系统调用用户空间的调用流程 zx_c ...

  8. 源码分析Dubbo服务提供者启动流程-上篇

    本节将详细分析Dubbo服务提供者的启动流程,请带着如下几个疑问进行本节的阅读,因为这几个问题将是接下来几篇文章分析的重点内容.  1.什么时候建立与注册中心的连接.  2.服务提供者什么时候向注册中 ...

  9. [Abp vNext 源码分析] - 1. 框架启动流程分析

    一.简要说明 本篇文章主要剖析与讲解 Abp vNext 在 Web API 项目下的启动流程,让大家了解整个 Abp vNext 框架是如何运作的.总的来说 ,Abp vNext 比起 ABP 框架 ...

最新文章

  1. java基础编程题(2)
  2. ping 命令还能这么玩?
  3. MySQL常见错误代码及代码说明
  4. hibernate ORM related
  5. unity服务器文件传输,Unity 3D简单C#文件发送到FPT服务器示例脚本?
  6. Javascript调用后台方法
  7. spring+hibernate:在applicationCOntext.XML中配置C3P0参数说明
  8. vuex的基础小案例(黑马教程)
  9. 八卦与十二地支方位图_[天干地支五行八卦图] 天干地支八卦方位图
  10. 正则验证车牌号(含新能源)
  11. effective c++读书随记
  12. 百度SEO站群WeLive免费在线客服系统 v5
  13. 理解分布式账本技术: 经济学视角
  14. 一些常用的第三方平台和开放平台
  15. 人生苦短_人生苦短,懂事太晚!
  16. 如何设置自定义任务栏图标_轻松自定义Windows 7任务栏图标
  17. 学习笔记(5):第01章-互联网的概述(历史发展+技术发展+常见应用)-互联网的接入(手把手教你调试ADSL宽带技术)
  18. 如何成为一门领域的专家2
  19. ToolBar 修改菜单字体和颜色
  20. 设计多选按钮ListChooseView

热门文章

  1. rsync运行时出现skipping non-regular file
  2. 数据分析: kaggle比赛 - 销量预测
  3. 算术-几何平均不等式
  4. Windows10 彻底关闭系统更新(2022.12.26更新)
  5. 国外LEAD赚钱的一些习惯
  6. 最新消息,青岛的农贸市场将迎来大变革
  7. 【STM32-HAL库】一步步搭建出FOC矢量控制(附C代码)
  8. 启动项中删除微PE工具箱
  9. POJ - 3067
  10. 支付宝 实现 移动网页支付、PC网页支付、混合APP支付(支持微信支付)