首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。

一、从ZooKeeper实例初始化开始

ZooKeeper 提供了原生的客户端库,虽然不好用,但是能够更好理解客户端与服务端建立连接和通信的过程。比较流行的Apache Curator也是对原生库的再封装。

向服务端建立连接,只需要实例化一个ZooKeeper对象,将服务器地址列表传进去即可。因为发起连接请求是一个异步的过程,所以实例化ZooKeeper时可以传一个Watcher,会话建立成功之后,客户端会生成一个 “已经建立连接(SyncConnected)” 的事件,进行回调通知。只有会话建立成功之后,才能与服务端进行通信。

String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
ZooKeeper zooKeeper = new ZooKeeper(connectString, 20000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {System.out.println("会话建立成功");}}
});

ZooKeeper实例初始化流程如下:

1、创建HostProvider

HostProvider顾名思义就是“服务地址提供器”,默认实现类StaticHostProviderStaticHostProvider核心思想是,将服务地址列表打乱,然后构成一个虚拟环,轮询向外提供服务地址。

如果StaticHostProvider不满足需求,可以自定义实现HostProvider

(1)打乱服务地址列表

打乱地址就很简单了,用了Java官方提供的工具java.util.Collections#shuffle

// org.apache.zookeeper.client.StaticHostProvider#shuffle
private List<InetSocketAddress> shuffle(Collection<InetSocketAddress> serverAddresses) {List<InetSocketAddress> tmpList = new ArrayList<>(serverAddresses.size());tmpList.addAll(serverAddresses);Collections.shuffle(tmpList, sourceOfRandomness);return tmpList;
}

随机源生成:

Random sourceOfRandomness = new Random(System.currentTimeMillis() ^ this.hashCode())

随机源的种子是当前时间毫秒值掺杂(^)当前StaticHostProvider实例的hashCode,使得每次生成的随机源更公平。

(2)构建虚拟环轮询负载

如何将一个服务地址列表构成一个环轮询负载的呢?

有两个游标,currentIndexlastIndex是实现“虚拟环轮询负载”的关键:

  • currentIndex,指向当前选择的位置。每选择一次就加一,如果等于服务地址列表长度,就重置为0,这样就形成了一个环。
  • lastIndex,上次选择的位置。会话建立成功之后会将currentIndex赋值给lastIndex
public InetSocketAddress next(long spinDelay) {boolean needToSleep = false;InetSocketAddress addr;synchronized (this) {// 省略部分无关代码// currentIndex自增,如果等于服务地址列表长度,就重置为0++currentIndex;if (currentIndex == serverAddresses.size()) {currentIndex = 0;}addr = serverAddresses.get(currentIndex);// 两个游标currentIndex、lastIndex// currentIndex 当前选择的位置,lastIndex上次选择的位置// lastIndex 什么时候设置呢?会话建立成功之后调用 onConnected,将currentIndex赋值给lastIndexneedToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);if (lastIndex == -1) {lastIndex = 0;}}// 如果 currentIndex和lastIndex且spinDelay>0,就需要休眠spinDelay时间,if (needToSleep) {try {Thread.sleep(spinDelay);} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);}}// 解析InetSocketAddress,// 如果一个主机映射了多个ip地址(InetAddress)// 就打乱选择其中一个地址返回return resolve(addr);
}

currentIndexlastIndex相等时,且spinDelay>0,就会休眠spinDelay毫秒,然后再将选择的服务地址返回,为什么会有这个逻辑呢?

这是因为,如果轮询了一圈服务地址都没有成功建立连接,与其一味不停地重试,还不如休眠一段时间再试,可能成功的概率更高一些。

2、创建ConnectStringParser

ConnectStringParser 就是将 connectString 按一定格式解析成 InetSocketAddress 列表。connectString格式如下:

127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/test

如果在服务地址后面指定了路径,后续的操作都是在该路径下进行。

public ConnectStringParser(String connectString) {// parse out chroot, if any// 解析chroot// connectString 可以指定某个路径,// 如127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testint off = connectString.indexOf('/');if (off >= 0) {String chrootPath = connectString.substring(off);// ignore "/" chroot spec, same as nullif (chrootPath.length() == 1) {this.chrootPath = null;} else {PathUtils.validatePath(chrootPath);this.chrootPath = chrootPath;}connectString = connectString.substring(0, off);} else {this.chrootPath = null;}// 按 , 分割List<String> hostsList = split(connectString, ",");for (String host : hostsList) {int port = DEFAULT_PORT;String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);if (hostAndPort.length != 0) {host = hostAndPort[0];if (hostAndPort.length == 2) {port = Integer.parseInt(hostAndPort[1]);}} else {int pidx = host.lastIndexOf(':');if (pidx >= 0) {// otherwise : is at the end of the string, ignoreif (pidx < host.length() - 1) {port = Integer.parseInt(host.substring(pidx + 1));}host = host.substring(0, pidx);}}// 封装未解析的InetSocketAddressserverAddresses.add(InetSocketAddress.createUnresolved(host, port));}
}

3、创建并启动ClientCnxn

ClientCnxn是对客户端连接的抽象和封装,负责网络连接管理和watcher管理。

还记得从ZooKeeper构造器传入的Watcher吗?它会作为默认Watcher传给ZKWatchManager,后续其他请求注册watcher时,可以不用再定义,直接使用默认的Watcher

初始化SendThread时会传入一个ClientCnxnSocketClientCnxnSocket是对网络底层的封装,默认实现类为ClientCnxnSocketNIO

ClientCnxn创建好后会进行启动操作,就是启动SendThreadEventThread两个线程。后续的网络连接建立和通信都是由SendThread线程负责。

二、向服务端建立连接

SendThread启动后,会进入一个循环状态,首先判断是否已经建立连接,如果没有就通过hostProvider选择一个服务地址发起连接。

网络底层处理类ClientCnxnSocket,以ClientCnxnSocketNIO实现为准,如下图是建立连接的过程:

ClientCnxnSocketNIO底层是创建了非阻塞的SocketChannel,然后注册OP_CONNECT事件,并发起连接,如果此时能立刻连上,就继续进行会话建立流程。

如下模拟服务器一直连不上,多次通过hostProvider轮询选择服务器重连的效果:

服务器地址选择了一圈以后,会休眠spinDelay毫秒,也符合源码逻辑。

三、会话建立请求

连接建立之后,需要继续进行会话建立请求,因为每条连接都是有状态的,临时节点的归属就是以会话为依托,连接断开,会话失效,临时节点也就跟随着清除。

每条连接跟会话绑定,如果连接因为网络等问题断开,在会话有效期内重连上,就可以恢复之前的工作场景,而如果在会话失效之后重连上,就是非法连接,需要重新进行会话建立。

1、发起会话建立请求

发起会话建立,首先构建一个ConnectRequest请求体:

ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);

ConnectRequest需要传入客户端最近一次的事务zxid,会话超时时间,会话id,会话密码。首次建立会话,sessionId为0,sessionPasswd为空。

其次,将ConnectRequest请求体包装进Packet对象,PacketZooKeeper通信的最小单元,所有请求体都会包装进一个Packet对象再序列化发送给服务端。

Packet(RequestHeader requestHeader,ReplyHeader replyHeader,Record request,Record response,WatchRegistration watchRegistration,boolean readOnly)

对于ConnectRequest包装的Packet,请求头RequestHeader为null,并且会被放在请求队列outgoingQueue的首位。

outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));

接着注册网络IO读写事件,后续请求包就会被发送给服务端了,发送完成后再将会话建立的PacketoutgoingQueue中移除。

注:发送到服务端的会话创建流程,暂时不需要了解,后面讲解服务端源码时会详细讲述。

2、会话建立响应

会话建立响应,需要和普通请求响应分开处理,如果接收到响应信息,判断客户端还未初始化完成,就认为这个响应一定是会话建立响应。

首先从底层网络缓冲区读取数据反序列化构建ConnectResponse响应体,解析出经过服务器协商好的sessionTimeout以及sessionIdsessionPasswd

根据协商的sessionTimeout重新设置readTimeoutconnectTimeoutreadTimeoutnegotiatedSessionTimeout的2/3,connectTimeout是协商negotiatedSessionTimeout与服务地址列表个数平均。

最后生成一个SyncConnectedwatcher事件,交由EventThread线程进行回调,这是唯一一个不需要向服务端注册的watcher事件,完全由客户端自己生成和触发。

四、心跳保持长连接

网络建立连接,会话建立都完成后,就可以与服务端通信了,为了保持长连接的会话一直有效,在没有向服务端发送请求的一段时间内会发送心跳请求。而一段时间是多久?如下是计算下一次发送心跳请求时间的算法:

// clientCnxnSocket.getIdleSend为距离上次发送的时长
// readTimeout = sessionTimeout * 2 / 3
int timeToNextPing = readTimeout / 2- clientCnxnSocket.getIdleSend()- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);

根据计算,每隔sessionTimeout/3如果没有发送任何请求,就发送一次心跳。多减1秒是为了防止因为竞态情况而丢失心跳请求。

发送的心跳请求数据很简单,没有请求体,只有请求头:

如下是在源码中加了日志后的心跳过程:

协商sessionTimeout为9999,readTimeout计算得6666,timeToNextPing为3333,每次会多减1秒,大概每隔3秒发一次心跳。

五、总结

  1. 客户端与服务端建立的是长连接,如果连接失败,服务地址列表会轮询重试,直到连接成功,官方提供了默认的服务地址负载算法 StaticHostProvider,也可以自己实现。

  2. 每条连接是有状态的,只有建立了会话,才能真正开始与服务端通信。会话建立成功之后,会生成一个SyncConnected事件进行回调通知。

  3. 会话是临时节点的基础,在会话有效期内断开重连,可以恢复上一次工作场景。

  4. 为了保持长连接的会话一直有效,在没有向服务端发送请求的一段时间内会发送心跳请求,心跳间隔时间为sessionTimeout/3

如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。

ZooKeeper客户端源码(一)——向服务端建立连接+会话建立+心跳保持长连接相关推荐

  1. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  2. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  3. ZooKeeper客户端源码(零)——客户端API使用

    首发CSDN:徐同学呀,原创不易,转载请注明源链接.我是徐同学,用心输出高质量文章,希望对你有所帮助. 本篇源码基于ZooKeeper3.7.0版本. 一.建立连接和会话 客户端可以通过创建一个 Zo ...

  4. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

  5. 视频直播源码中关于服务端直播开播推送实现

    在视频直播源码中直播app开播时需向客户推送开播消息通知用户,实现方式如下: 1.申请相应的推送服务三方,如下使用极光推送,获取相应的配置资料,并做好相应的配置 2.推送代码如下: /* 极光推送 * ...

  6. netty源码学习之服务端客户端初始化

    文章目录 1. AbstractBootstrap类简介 1.1. 核心方法 2. netty服务端创建 2.1. 服务端启动入口 2.2. doBind()方法 2.3. netty服务初始化 2. ...

  7. zookeeper源码分析之一服务端启动过程

    zookeeper简介 zookeeper是为分布式应用提供分布式协作服务的开源软件.它提供了一组简单的原子操作,分布式应用可以基于这些原子操作来实现更高层次的同步服务,配置维护,组管理和命名.zoo ...

  8. c语言传奇引擎源码,Ei3.0服务端引擎源代码+登陆网关+客户端插件

    Ei3.0服务端引擎源代码+登陆网关+客户端插件 procedure TPANEL.Button3Click(Sender: TObject); var P:integer; mItem:DWORD; ...

  9. Zookeeper 客户端源码吐血总结

    目录 一.几个重要的类 二.JAVA的基础知识 三.大致了解 四 从入门到放弃的讲解 Code1:ZK Code2:创建 Zookeeper实例,实例化ClientCnxn,实例化ClientCnxn ...

最新文章

  1. 使用xilinx的documentation navigator快速查找资料
  2. 用sql语句获取连续整数id中,缺失的最小id和最大id
  3. redis的lrange_thinkphp5操作redis系列教程】列表类型之lRange,lGetRange
  4. 陆奇:疫情后将出现哪些创业新机会?
  5. ubuntu解决安装Scrapy库时报x86_64-linux-gnu-gcc错误
  6. 使用tcl 创建vivado工程
  7. [转] 如何看透一个人
  8. BZOJ1468: Tree BZOJ3365: [Usaco2004 Feb]Distance Statistics 路程统计
  9. 小D课堂 - 新版本微服务springcloud+Docker教程_6-05 高级篇幅之高并发情况下
  10. spring cloud SnakeYAML RCE 漏洞复现
  11. CKA 认证笔记 - CKA 认证经验帖
  12. JSON.stringify(value, replacer, space)详解
  13. 操作系统实验 生产者/消费者模型
  14. 设计一款给爸爸妈妈用的手机
  15. Android 获取局域网内网IP地址
  16. IC卡和ID卡以及两者的比较
  17. 冒充linux内核,4岁小萝莉向Linux内核贡献代码修复「漏洞」而且已经合并到内核...
  18. iOS限制输入表情(emoji)
  19. 完整正则表达式语法列表
  20. 电脑摄像头未能创建连接服务器,电脑提示未能创建视频预览,请检查设备连接的原因及解决办法...

热门文章

  1. 划分问题——动态规划
  2. 怎么申请注册学校邮箱?
  3. windows hosts文件恢复
  4. Jmeter环境变量配置
  5. 响应服务器553 5.7.1,错误代码553.5 7.1 sender是什么
  6. 有效项目治理的快速操作指南
  7. 微软WP8.1:用户最期盼的十个升级要点(转载自远景论坛)
  8. xml文档声明及基本语法
  9. 文章伪原创怎么做(什么是伪原创)
  10. Java实现连连看源代码文档_JAVA实现连连看游戏