服务端启动时,就启动线程通过NIO监听网络端口。每个连接都会有一个上下文环境对象,当接收到请求后,会在上下文环境对象中进行处理。

服务端启动线程,监听网络端口,(NIOServerCnxn.Factory):

  1 static public class Factory extends Thread {
  2         static {
  3             //设置全局的异常处理
  4             Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
  5                 public void uncaughtException(Thread t, Throwable e) {
  6                     LOG.error("Thread " + t + " died", e);
  7                 }
  8             });
  9             /**
 10              * jvm早期的nullpoint bug
 11              * http://bugs.sun.com/view_bug.do?bug_id=6427854
 12              */
 13             try {
 14                 Selector.open().close();
 15             } catch(IOException ie) {
 16                 LOG.error("Selector failed to open", ie);
 17             }
 18         }
 19         //服务器通道
 20         final ServerSocketChannel ss;
 21         //选择器
 22         final Selector selector = Selector.open();
 23         ZooKeeperServer zks;
 24
 25         final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
 26         //所有的上下文环境
 27         final HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
 28         //ip对应的上下文环境
 29         final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
 30             new HashMap<InetAddress, Set<NIOServerCnxn>>( );
 31         int maxClientCnxns = 10;
 32
 33         public Factory(InetSocketAddress addr, int maxcc) throws IOException {
 34             setDaemon(true);
 35             //单个client链接最大数
 36             maxClientCnxns = maxcc;
 37           //创建服务器通道
 38             this.ss = ServerSocketChannel.open();
 39             ss.socket().setReuseAddress(true);
 40           //绑定端口
 41             ss.socket().bind(addr);
 42           //设置通道为非阻塞通道
 43             ss.configureBlocking(false);
 44           //把通道注册到选择器中
 45             ss.register(selector, SelectionKey.OP_ACCEPT);
 46         }
 47         public void run() {
 48             while (!ss.socket().isClosed()) {
 49                 try {
 50                     //选择一组键
 51                     selector.select(1000);
 52                     Set<SelectionKey> selected;
 53                     synchronized (this) {
 54                         selected = selector.selectedKeys();
 55                     }
 56                     ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
 57                             selected);
 58                     Collections.shuffle(selectedList);
 59                     for (SelectionKey k : selectedList) {
 60                         //如果通道已经准备好接收套接字
 61                         if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
 62                             SocketChannel sc = ((ServerSocketChannel) k
 63                                     .channel()).accept();
 64                             InetAddress ia = sc.socket().getInetAddress();
 65                             //判断最大连接数
 66                             int cnxncount = getClientCnxnCount(ia);
 67                             if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
 68                                 sc.close();
 69                             } else {
 70                                 // 配置为非阻塞
 71                                 sc.configureBlocking(false);
 72                               //把通道注册到选择器中
 73                                 SelectionKey sk = sc.register(selector,
 74                                         SelectionKey.OP_READ);
 75                                 NIOServerCnxn cnxn = createConnection(sc, sk);
 76                                 //给该通道附带一个上下文环境
 77                                 sk.attach(cnxn);
 78                                 addCnxn(cnxn);
 79                             }
 80                         } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
 81                             //通过上线文件来进行读写
 82                             NIOServerCnxn c = (NIOServerCnxn) k.attachment();
 83                             c.doIO(k);
 84                         } else {
 85                             if (LOG.isDebugEnabled()) {
 86                                 LOG.debug("Unexpected ops in select "
 87                                           + k.readyOps());
 88                             }
 89                         }
 90                     }
 91                     selected.clear();
 92                  catch (Exception e) {
 93                     LOG.warn("Ignoring exception", e);
 94                 }
 95             }
 96             clear();
 97         }
 98         //关闭
 99         public void shutdown() {
100                 try {
101                     ss.close();
102                     clear();
103                     this.interrupt();
104                     this.join();
105                 } catch (Exception e) {
106                     LOG.warn("Ignoring unexpected exception during shutdown", e);
107                 }
108                 try {
109                     selector.close();
110                 } catch (IOException e) {
111                     LOG.warn("Selector closing", e);
112                 }
113                 if (zks != null) {
114                     zks.shutdown();
115                 }
116         }
117         synchronized public void clear() {
118             selector.wakeup();
119             HashSet<NIOServerCnxn> cnxns;
120             synchronized (this.cnxns) {
121                 cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
122             }
123             // got to clear all the connections that we have in the selector
124             for (NIOServerCnxn cnxn: cnxns) {
125                 try {
126                     // don't hold this.cnxns lock as deadlock may occur
127                     cnxn.close();
128                 } catch (Exception e) {
129                     LOG.warn("Ignoring exception closing cnxn sessionid 0x"
130                             + Long.toHexString(cnxn.sessionId), e);
131                 }
132             }
133         }
134     }

View Code

  上下文环境主要通过NIO读取请求数据,首先会读取4个字节的请求数据,该数据分为两种情况,一种是真正的数据包长度,一种是命令。如果是真正的数据包长度会按长度读取数据报,进行处理。如果是命令,会根据命令进行处理。

服务端通道上下文,通过NIO进行读写(NIOServerCnxn.doIO):

 1 void doIO(SelectionKey k) throws InterruptedException {
 2         try {
 3             //如果通道可以读取数据
 4             if (k.isReadable()) {
 5                 //读取数据到缓存中
 6                 int rc = sock.read(incomingBuffer);
 7                 //如果读满缓存
 8                 if (incomingBuffer.remaining() == 0) {
 9                     boolean isPayload;
10                     //如果第一次读取,则先读取长度内容,相应分配缓存;否则读取指定长度的数据内容
11                     if (incomingBuffer == lenBuffer) {
12                         incomingBuffer.flip();
13                         isPayload = readLength(k);
14                         incomingBuffer.clear();
15                     } else {
16                         isPayload = true;
17                     }
18                     if (isPayload) {
19                         //读取数据,初始化、读取请求数据封装成packet
20                         readPayload();
21                     }
22                 }
23             }
24           //如果通道可以读写数据
25             if (k.isWritable()) {
26                 //缓存中有数据需要写入
27                 if (outgoingBuffers.size() > 0) {
28                     //创建bytebuffer
29                     ByteBuffer directBuffer = factory.directBuffer;
30                     directBuffer.clear();
31                     //从队列中读取数据知道缓存读满
32                     for (ByteBuffer b : outgoingBuffers) {
33                         if (directBuffer.remaining() < b.remaining()) {
34                             b = (ByteBuffer) b.slice().limit(
35                                     directBuffer.remaining());
36                         }
37                         int p = b.position();
38                         directBuffer.put(b);
39                         b.position(p);
40                         if (directBuffer.remaining() == 0) {
41                             break;
42                         }
43                     }
44                     //将数据写入通道
45                     directBuffer.flip();
46                     int sent = sock.write(directBuffer);
47                     ByteBuffer bb;
48                     //从缓存中已经发送的删除数据
49                     while (outgoingBuffers.size() > 0) {
50                         bb = outgoingBuffers.peek();
51                         int left = bb.remaining() - sent;
52                         if (left > 0) {
53                             bb.position(bb.position() + sent);
54                             break;
55                         }
56                         sent -= bb.remaining();
57                         outgoingBuffers.remove();
58                     }
59                 }
60
61                 synchronized(this.factory){
62                     if (outgoingBuffers.size() == 0) {
63                         sk.interestOps(sk.interestOps()
64                                 & (~SelectionKey.OP_WRITE));
65                     } else {
66                         sk.interestOps(sk.interestOps()
67                                 | SelectionKey.OP_WRITE);
68                     }
69                 }
70             }
71         }catch (IOException e) {
72             close();
73         }
74     }
75     private void readPayload() throws IOException, InterruptedException {
76         if (incomingBuffer.remaining() != 0) { // have we read length bytes?
77             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
78         }
79         if (incomingBuffer.remaining() == 0) {
80             //重置缓存
81             incomingBuffer.flip();
82             //如果没有进行初始化,首先要初始化;如果已经链接,则读取请求数据,封装成packet
83             if (!initialized) {
84                 readConnectRequest();
85             } else {
86                 readRequest();
87             }
88           //重置
89             lenBuffer.clear();
90             incomingBuffer = lenBuffer;
91         }
92     }

View Code

服务端通道上下文,读取长度,如果是命令,另起线程处理命令操作(NIOServerCnxn.readLength)

 1 private boolean readLength(SelectionKey k) throws IOException {
 2         //如果是请求数据,根据长度分配缓存;如果是命令,执行相应命令。
 3         int len = lenBuffer.getInt();
 4         if (!initialized && checkFourLetterWord(k, len)) {
 5             return false;
 6         }
 7         if (len < 0 || len > BinaryInputArchive.maxBuffer) {
 8             throw new IOException("Len error " + len);
 9         }
10         if (zk == null) {
11             throw new IOException("ZooKeeperServer not running");
12         }
13         incomingBuffer = ByteBuffer.allocate(len);
14         return true;
15     }
16     private boolean checkFourLetterWord(final SelectionKey k, final int len)
17             throws IOException
18             {
19                 //获取命令
20                 String cmd = cmd2String.get(len);
21                 /** cancel the selection key to remove the socket handling
22                  * from selector. This is to prevent netcat problem wherein
23                  * netcat immediately closes the sending side after sending the
24                  * commands and still keeps the receiving channel open.
25                  * The idea is to remove the selectionkey from the selector
26                  * so that the selector does not notice the closed read on the
27                  * socket channel and keep the socket alive to write the data to
28                  * and makes sure to close the socket after its done writing the data
29                  */
30                 if (k != null) {
31                     try {
32                         k.cancel();
33                     } catch(Exception e) {
34                         LOG.error("Error cancelling command selection key ", e);
35                     }
36                 }
37                 //根据命令类型,执行相应内容
38                 final PrintWriter pwriter = new PrintWriter(
39                         new BufferedWriter(new SendBufferWriter()));
40                 if (len == ruokCmd) {
41                     RuokCommand ruok = new RuokCommand(pwriter);
42                     ruok.start();
43                     return true;
44                 } else if (len == getTraceMaskCmd) {
45                     TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
46                     tmask.start();
47                     return true;
48                 } else if (len == setTraceMaskCmd) {
49                     int rc = sock.read(incomingBuffer);
50                     if (rc < 0) {
51                         throw new IOException("Read error");
52                     }
53
54                     incomingBuffer.flip();
55                     long traceMask = incomingBuffer.getLong();
56                     ZooTrace.setTextTraceLevel(traceMask);
57                     SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
58                     setMask.start();
59                     return true;
60                 } else if (len == enviCmd) {
61                     EnvCommand env = new EnvCommand(pwriter);
62                     env.start();
63                     return true;
64                 } else if (len == confCmd) {
65                     ConfCommand ccmd = new ConfCommand(pwriter);
66                     ccmd.start();
67                     return true;
68                 } else if (len == srstCmd) {
69                     StatResetCommand strst = new StatResetCommand(pwriter);
70                     strst.start();
71                     return true;
72                 } else if (len == crstCmd) {
73                     CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
74                     crst.start();
75                     return true;
76                 } else if (len == dumpCmd) {
77                     DumpCommand dump = new DumpCommand(pwriter);
78                     dump.start();
79                     return true;
80                 } else if (len == statCmd || len == srvrCmd) {
81                     StatCommand stat = new StatCommand(pwriter, len);
82                     stat.start();
83                     return true;
84                 } else if (len == consCmd) {
85                     ConsCommand cons = new ConsCommand(pwriter);
86                     cons.start();
87                     return true;
88                 } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
89                     WatchCommand wcmd = new WatchCommand(pwriter, len);
90                     wcmd.start();
91                     return true;
92                 }
93                 return false;
94             }

View Code

服务端通道上下文,如果是请求数据,处理请求数据(NIOServerCnxn.readRequest\NIOServerCnxn.readConnectRequest):

 1 private void readRequest() throws IOException {
 2         //反序列化请求数据
 3         InputStream bais = new ByteBufferInputStream(incomingBuffer);
 4         BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
 5         RequestHeader h = new RequestHeader();
 6         h.deserialize(bia, "header");
 7         incomingBuffer = incomingBuffer.slice();
 8         if (h.getType() == OpCode.auth) {
 9             //如果是认证请求
10             AuthPacket authPacket = new AuthPacket();
11             ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
12             String scheme = authPacket.getScheme();
13             //进行认证
14             AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
15             if (ap == null
16                     || (ap.handleAuthentication(this, authPacket.getAuth())
17                             != KeeperException.Code.OK)) {
18                 // 认证失败,返回失败内容
19                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
20                         KeeperException.Code.AUTHFAILED.intValue());
21                 sendResponse(rh, null, null);
22                 //关闭链接
23                 sendCloseSession();
24                 disableRecv();
25             } else {
26                 //认证成功,返回成功内容
27                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
28                         KeeperException.Code.OK.intValue());
29                 sendResponse(rh, null, null);
30             }
31             return;
32         } else {
33             //如果是请求,提交到zk处理
34             Request si = new Request(this, sessionId, h.getXid(), h.getType(), incomingBuffer, authInfo);
35             si.setOwner(ServerCnxn.me);
36             zk.submitRequest(si);
37         }
38     }
39
40     private void readConnectRequest() throws IOException, InterruptedException {
41         //反序列化链接请求对象
42         BinaryInputArchive bia = BinaryInputArchive
43                 .getArchive(new ByteBufferInputStream(incomingBuffer));
44         ConnectRequest connReq = new ConnectRequest();
45         connReq.deserialize(bia, "connect");
46         if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
47             throw new CloseRequestException(msg);
48         }
49         sessionTimeout = connReq.getTimeOut();
50         byte passwd[] = connReq.getPasswd();
51         //初始化session
52         disableRecv();
53         if (connReq.getSessionId() != 0) {
54             long clientSessionId = connReq.getSessionId();
55             factory.closeSessionWithoutWakeup(clientSessionId);
56             setSessionId(clientSessionId);
57             zk.reopenSession(this, sessionId, passwd, sessionTimeout);
58         } else {
59             zk.createSession(this, passwd, sessionTimeout);
60         }
61         initialized = true;
62     }

View Code

服务端通道上下文,写返回数据(NIOServerCnxn.sendResponse)

 1 synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
 2         try {
 3             //序列化返回结果,
 4             ByteArrayOutputStream baos = new ByteArrayOutputStream();
 5             BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
 6             try {
 7                 baos.write(fourBytes);
 8                 bos.writeRecord(h, "header");
 9                 if (r != null) {
10                     bos.writeRecord(r, tag);
11                 }
12                 baos.close();
13             } catch (IOException e) {
14                 LOG.error("Error serializing response");
15             }
16             //写入数据长度
17             byte b[] = baos.toByteArray();
18             ByteBuffer bb = ByteBuffer.wrap(b);
19             bb.putInt(b.length - 4).rewind();
20             //
21             sendBuffer(bb);
22          } catch(Exception e) {
23             LOG.warn("Unexpected exception. Destruction averted.", e);
24          }
25     }
26     void sendBuffer(ByteBuffer bb) {
27         try {
28             if (bb != closeConn) {
29                 //直接发送数据
30                 if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
31                     try {
32                         sock.write(bb);
33                     } catch (IOException e) {
34                         // we are just doing best effort right now
35                     }
36                 }
37                 if (bb.remaining() == 0) {
38                     packetSent();
39                     return;
40                 }
41             }
42             //写入缓存中。
43             synchronized(this.factory){
44                 sk.selector().wakeup();
45                 outgoingBuffers.add(bb);
46                 if (sk.isValid()) {
47                     sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
48                 }
49             }
50
51         } catch(Exception e) {
52             LOG.error("Unexpected Exception: ", e);
53         }
54     }

View Code

 

转载于:https://www.cnblogs.com/zhangwanhua/p/9182544.html

zookeeper服务端相关推荐

  1. 服务器启动文件,[Zookeeper] 服务端之单机版服务器启动

    1 服务器端整体概览图 概览图 ServerCnxnFactory:负责与client之间的网络交互,支持NIO(默认)以及Netty SessionTrackerImpl:会话管理器 Datadir ...

  2. Zookeeper系列(十)zookeeper的服务端启动详述

    作者:leesf    掌控之中,才会成功:掌控之外,注定失败.出处:http://www.cnblogs.com/leesf456/p/6105276.html尊重原创,大家功能学习进步: 一.前言 ...

  3. 使用Kazoo操作ZooKeeper服务治理

    使用Kazoo操作ZooKeeper服务治理 单机服务的可靠性及可扩展性有限,某台服务宕机可能会影响整个系统的正常使用:分布式服务能够有效地解决这一问题,但同时分布式服务也会带来一些新的问题,如:服务 ...

  4. 8、Zookeeper服务注册与发现原理浅析

    了解Zookeeper的我们都知道,Zookeeper是一种分布式协调服务,在分布式应用中,主要用来实现分布式服务的注册与发现以及分布式锁,本文我们简单介绍一下Zookeeper是如何实现服务的注册与 ...

  5. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  6. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  7. zookeeper源码分析之一服务端启动过程

    zookeeper简介 zookeeper是为分布式应用提供分布式协作服务的开源软件.它提供了一组简单的原子操作,分布式应用可以基于这些原子操作来实现更高层次的同步服务,配置维护,组管理和命名.zoo ...

  8. 限制会话id服务端不共享_不懂 Zookeeper?看完不懂你打我

    高并发分布式开发技术体系已然非常的庞大,从国内互联网企业使用情况,可发现RPC.Dubbo.ZK是最基础的技能要求.关于Zookeeper你是不是还停留在Dubbo注册中心的印象中呢?还有它的工作原理 ...

  9. Zookeeper应用:服务端上下线

    需求 客户端感知服务器的上下线. 示意图 步骤 服务端: 1.所有机子向Zookeeper注册,注册znode为临时的. 2.有机子下线,连接断开后被Zookeeper自动删除,触发监听事件. 3.有 ...

  10. ZooKeeper客户端源码(一)——向服务端建立连接+会话建立+心跳保持长连接

    首发CSDN:徐同学呀,原创不易,转载请注明源链接.我是徐同学,用心输出高质量文章,希望对你有所帮助. 一.从ZooKeeper实例初始化开始 ZooKeeper 提供了原生的客户端库,虽然不好用,但 ...

最新文章

  1. iPhone App开发实战手册学习笔记(5)之IOS常用机制
  2. 简单介绍python的input,print,eval函数
  3. C++指针数组、数组指针、数组名及二维数组技巧汇总
  4. boost::mp11::mp_take相关用法的测试程序
  5. Java文件能编译成lib吗_Makefile用于将一些.cpp和.h编译成lib
  6. ImageView缩放选项
  7. 学安全工程用不用计算机,上重点大学的末流专业,不如上普通大学的重点专业,你赞成吗?...
  8. python后台返回cookie_Django框架设置cookies与获取cookies操作详解
  9. 【英语学习】【Level 07】U07 Stories of my Life L6 An experience worth remembering
  10. J2EE 重载跟覆盖的概念以及区别
  11. matlab 韩明距离_科学网—Matlab中 pdist 函数详解(各种距离的生成) - 朱新宇的博文...
  12. 【转载】google搜索从入门到精通
  13. phpstudy下载、安装、配置、网站部署
  14. Windows Server 2016关闭自动更新
  15. 教师计算机研修培训日志感言,2019年教师研修感言
  16. 百度在线笔试计算机视觉,[转载]2014 百度 计算机视觉笔试
  17. 常用Unity平台解释
  18. 模仿懒惰加载的图片加载方法
  19. ps中如何把图片变白底
  20. c语言中 p2 amp n1表示什么意思,2005年9月全国计算机等级考试二级C语言笔试试题及答案mjj...

热门文章

  1. 调用codesoft,打印条码,批量连续打印,变量打印
  2. [原]gimp的python控制台以及python的PIL图像处理库
  3. '用户 'sa' 登录失败。该用户与可信 SQL Server 连接无关联,做JSP项目连接数据库 ....
  4. Redis应用(四)——在Spring框架中的应用
  5. 借助Sigar API获取网络信息
  6. Ajax请求生成中文乱码问题
  7. Java面试题超详细讲解系列之四【Jvm篇】
  8. Java神鬼莫测之MyBatis中$与#的区别(五)
  9. 每天一道剑指offer-调整数组顺序使奇数位于偶数前面
  10. php正则字母,PHP匹配连续的数字或字母的正则表达式