ZooKeeper客户端源码(一)——向服务端建立连接+会话建立+心跳保持长连接
首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。
一、从ZooKeeper实例初始化开始
ZooKeeper
提供了原生的客户端库,虽然不好用,但是能够更好理解客户端与服务端建立连接和通信的过程。比较流行的Apache Curator
也是对原生库的再封装。
向服务端建立连接,只需要实例化一个ZooKeeper
对象,将服务器地址列表传进去即可。因为发起连接请求是一个异步的过程,所以实例化ZooKeeper
时可以传一个Watcher
,会话建立成功之后,客户端会生成一个 “已经建立连接(SyncConnected
)” 的事件,进行回调通知。只有会话建立成功之后,才能与服务端进行通信。
String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
ZooKeeper zooKeeper = new ZooKeeper(connectString, 20000, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getState() == Event.KeeperState.SyncConnected) {System.out.println("会话建立成功");}}
});
ZooKeeper
实例初始化流程如下:
1、创建HostProvider
HostProvider
顾名思义就是“服务地址提供器”,默认实现类StaticHostProvider
。StaticHostProvider
核心思想是,将服务地址列表打乱,然后构成一个虚拟环,轮询向外提供服务地址。
如果StaticHostProvider
不满足需求,可以自定义实现HostProvider
。
(1)打乱服务地址列表
打乱地址就很简单了,用了Java官方提供的工具java.util.Collections#shuffle
。
// org.apache.zookeeper.client.StaticHostProvider#shuffle
private List<InetSocketAddress> shuffle(Collection<InetSocketAddress> serverAddresses) {List<InetSocketAddress> tmpList = new ArrayList<>(serverAddresses.size());tmpList.addAll(serverAddresses);Collections.shuffle(tmpList, sourceOfRandomness);return tmpList;
}
随机源生成:
Random sourceOfRandomness = new Random(System.currentTimeMillis() ^ this.hashCode())
随机源的种子是当前时间毫秒值掺杂(^
)当前StaticHostProvider
实例的hashCode
,使得每次生成的随机源更公平。
(2)构建虚拟环轮询负载
如何将一个服务地址列表构成一个环轮询负载的呢?
有两个游标,currentIndex
和 lastIndex
是实现“虚拟环轮询负载”的关键:
currentIndex
,指向当前选择的位置。每选择一次就加一,如果等于服务地址列表长度,就重置为0,这样就形成了一个环。lastIndex
,上次选择的位置。会话建立成功之后会将currentIndex
赋值给lastIndex
。
public InetSocketAddress next(long spinDelay) {boolean needToSleep = false;InetSocketAddress addr;synchronized (this) {// 省略部分无关代码// currentIndex自增,如果等于服务地址列表长度,就重置为0++currentIndex;if (currentIndex == serverAddresses.size()) {currentIndex = 0;}addr = serverAddresses.get(currentIndex);// 两个游标currentIndex、lastIndex// currentIndex 当前选择的位置,lastIndex上次选择的位置// lastIndex 什么时候设置呢?会话建立成功之后调用 onConnected,将currentIndex赋值给lastIndexneedToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);if (lastIndex == -1) {lastIndex = 0;}}// 如果 currentIndex和lastIndex且spinDelay>0,就需要休眠spinDelay时间,if (needToSleep) {try {Thread.sleep(spinDelay);} catch (InterruptedException e) {LOG.warn("Unexpected exception", e);}}// 解析InetSocketAddress,// 如果一个主机映射了多个ip地址(InetAddress)// 就打乱选择其中一个地址返回return resolve(addr);
}
当currentIndex
和lastIndex
相等时,且spinDelay>0
,就会休眠spinDelay
毫秒,然后再将选择的服务地址返回,为什么会有这个逻辑呢?
这是因为,如果轮询了一圈服务地址都没有成功建立连接,与其一味不停地重试,还不如休眠一段时间再试,可能成功的概率更高一些。
2、创建ConnectStringParser
ConnectStringParser
就是将 connectString
按一定格式解析成 InetSocketAddress
列表。connectString
格式如下:
127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/test
如果在服务地址后面指定了路径,后续的操作都是在该路径下进行。
public ConnectStringParser(String connectString) {// parse out chroot, if any// 解析chroot// connectString 可以指定某个路径,// 如127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/testint off = connectString.indexOf('/');if (off >= 0) {String chrootPath = connectString.substring(off);// ignore "/" chroot spec, same as nullif (chrootPath.length() == 1) {this.chrootPath = null;} else {PathUtils.validatePath(chrootPath);this.chrootPath = chrootPath;}connectString = connectString.substring(0, off);} else {this.chrootPath = null;}// 按 , 分割List<String> hostsList = split(connectString, ",");for (String host : hostsList) {int port = DEFAULT_PORT;String[] hostAndPort = NetUtils.getIPV6HostAndPort(host);if (hostAndPort.length != 0) {host = hostAndPort[0];if (hostAndPort.length == 2) {port = Integer.parseInt(hostAndPort[1]);}} else {int pidx = host.lastIndexOf(':');if (pidx >= 0) {// otherwise : is at the end of the string, ignoreif (pidx < host.length() - 1) {port = Integer.parseInt(host.substring(pidx + 1));}host = host.substring(0, pidx);}}// 封装未解析的InetSocketAddressserverAddresses.add(InetSocketAddress.createUnresolved(host, port));}
}
3、创建并启动ClientCnxn
ClientCnxn
是对客户端连接的抽象和封装,负责网络连接管理和watcher
管理。
还记得从ZooKeeper
构造器传入的Watcher
吗?它会作为默认Watcher
传给ZKWatchManager
,后续其他请求注册watcher
时,可以不用再定义,直接使用默认的Watcher
。
初始化SendThread
时会传入一个ClientCnxnSocket
,ClientCnxnSocket
是对网络底层的封装,默认实现类为ClientCnxnSocketNIO
。
ClientCnxn
创建好后会进行启动操作,就是启动SendThread
和EventThread
两个线程。后续的网络连接建立和通信都是由SendThread
线程负责。
二、向服务端建立连接
SendThread
启动后,会进入一个循环状态,首先判断是否已经建立连接,如果没有就通过hostProvider
选择一个服务地址发起连接。
网络底层处理类ClientCnxnSocket
,以ClientCnxnSocketNIO
实现为准,如下图是建立连接的过程:
ClientCnxnSocketNIO
底层是创建了非阻塞的SocketChannel
,然后注册OP_CONNECT
事件,并发起连接,如果此时能立刻连上,就继续进行会话建立流程。
如下模拟服务器一直连不上,多次通过hostProvider
轮询选择服务器重连的效果:
服务器地址选择了一圈以后,会休眠spinDelay
毫秒,也符合源码逻辑。
三、会话建立请求
连接建立之后,需要继续进行会话建立请求,因为每条连接都是有状态的,临时节点的归属就是以会话为依托,连接断开,会话失效,临时节点也就跟随着清除。
每条连接跟会话绑定,如果连接因为网络等问题断开,在会话有效期内重连上,就可以恢复之前的工作场景,而如果在会话失效之后重连上,就是非法连接,需要重新进行会话建立。
1、发起会话建立请求
发起会话建立,首先构建一个ConnectRequest
请求体:
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
ConnectRequest
需要传入客户端最近一次的事务zxid
,会话超时时间,会话id,会话密码。首次建立会话,sessionId
为0,sessionPasswd
为空。
其次,将ConnectRequest
请求体包装进Packet
对象,Packet
是ZooKeeper
通信的最小单元,所有请求体都会包装进一个Packet
对象再序列化发送给服务端。
Packet(RequestHeader requestHeader,ReplyHeader replyHeader,Record request,Record response,WatchRegistration watchRegistration,boolean readOnly)
对于ConnectRequest
包装的Packet
,请求头RequestHeader
为null,并且会被放在请求队列outgoingQueue
的首位。
outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
接着注册网络IO读写事件,后续请求包就会被发送给服务端了,发送完成后再将会话建立的Packet
从outgoingQueue
中移除。
注:发送到服务端的会话创建流程,暂时不需要了解,后面讲解服务端源码时会详细讲述。
2、会话建立响应
会话建立响应,需要和普通请求响应分开处理,如果接收到响应信息,判断客户端还未初始化完成,就认为这个响应一定是会话建立响应。
首先从底层网络缓冲区读取数据反序列化构建ConnectResponse
响应体,解析出经过服务器协商好的sessionTimeout
以及sessionId
和 sessionPasswd
。
根据协商的sessionTimeout
重新设置readTimeout
和connectTimeout
,readTimeout
是negotiatedSessionTimeout
的2/3,connectTimeout
是协商negotiatedSessionTimeout
与服务地址列表个数平均。
最后生成一个SyncConnected
的watcher
事件,交由EventThread
线程进行回调,这是唯一一个不需要向服务端注册的watcher
事件,完全由客户端自己生成和触发。
四、心跳保持长连接
网络建立连接,会话建立都完成后,就可以与服务端通信了,为了保持长连接的会话一直有效,在没有向服务端发送请求的一段时间内会发送心跳请求。而一段时间是多久?如下是计算下一次发送心跳请求时间的算法:
// clientCnxnSocket.getIdleSend为距离上次发送的时长
// readTimeout = sessionTimeout * 2 / 3
int timeToNextPing = readTimeout / 2- clientCnxnSocket.getIdleSend()- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
根据计算,每隔sessionTimeout/3
如果没有发送任何请求,就发送一次心跳。多减1秒是为了防止因为竞态情况而丢失心跳请求。
发送的心跳请求数据很简单,没有请求体,只有请求头:
如下是在源码中加了日志后的心跳过程:
协商sessionTimeout
为9999,readTimeout
计算得6666,timeToNextPing
为3333,每次会多减1秒,大概每隔3秒发一次心跳。
五、总结
客户端与服务端建立的是长连接,如果连接失败,服务地址列表会轮询重试,直到连接成功,官方提供了默认的服务地址负载算法
StaticHostProvider
,也可以自己实现。每条连接是有状态的,只有建立了会话,才能真正开始与服务端通信。会话建立成功之后,会生成一个
SyncConnected
事件进行回调通知。会话是临时节点的基础,在会话有效期内断开重连,可以恢复上一次工作场景。
为了保持长连接的会话一直有效,在没有向服务端发送请求的一段时间内会发送心跳请求,心跳间隔时间为
sessionTimeout/3
。
如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。
ZooKeeper客户端源码(一)——向服务端建立连接+会话建立+心跳保持长连接相关推荐
- zookeeper源码分析之五服务端(集群leader)处理请求流程
leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...
- zookeeper源码分析之四服务端(单机)处理请求流程
上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...
- ZooKeeper客户端源码(零)——客户端API使用
首发CSDN:徐同学呀,原创不易,转载请注明源链接.我是徐同学,用心输出高质量文章,希望对你有所帮助. 本篇源码基于ZooKeeper3.7.0版本. 一.建立连接和会话 客户端可以通过创建一个 Zo ...
- Netty 源码解析系列-服务端启动流程解析
netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...
- 视频直播源码中关于服务端直播开播推送实现
在视频直播源码中直播app开播时需向客户推送开播消息通知用户,实现方式如下: 1.申请相应的推送服务三方,如下使用极光推送,获取相应的配置资料,并做好相应的配置 2.推送代码如下: /* 极光推送 * ...
- netty源码学习之服务端客户端初始化
文章目录 1. AbstractBootstrap类简介 1.1. 核心方法 2. netty服务端创建 2.1. 服务端启动入口 2.2. doBind()方法 2.3. netty服务初始化 2. ...
- zookeeper源码分析之一服务端启动过程
zookeeper简介 zookeeper是为分布式应用提供分布式协作服务的开源软件.它提供了一组简单的原子操作,分布式应用可以基于这些原子操作来实现更高层次的同步服务,配置维护,组管理和命名.zoo ...
- c语言传奇引擎源码,Ei3.0服务端引擎源代码+登陆网关+客户端插件
Ei3.0服务端引擎源代码+登陆网关+客户端插件 procedure TPANEL.Button3Click(Sender: TObject); var P:integer; mItem:DWORD; ...
- Zookeeper 客户端源码吐血总结
目录 一.几个重要的类 二.JAVA的基础知识 三.大致了解 四 从入门到放弃的讲解 Code1:ZK Code2:创建 Zookeeper实例,实例化ClientCnxn,实例化ClientCnxn ...
最新文章
- 使用xilinx的documentation navigator快速查找资料
- 用sql语句获取连续整数id中,缺失的最小id和最大id
- redis的lrange_thinkphp5操作redis系列教程】列表类型之lRange,lGetRange
- 陆奇:疫情后将出现哪些创业新机会?
- ubuntu解决安装Scrapy库时报x86_64-linux-gnu-gcc错误
- 使用tcl 创建vivado工程
- [转] 如何看透一个人
- BZOJ1468: Tree BZOJ3365: [Usaco2004 Feb]Distance Statistics 路程统计
- 小D课堂 - 新版本微服务springcloud+Docker教程_6-05 高级篇幅之高并发情况下
- spring cloud SnakeYAML RCE 漏洞复现
- CKA 认证笔记 - CKA 认证经验帖
- JSON.stringify(value, replacer, space)详解
- 操作系统实验 生产者/消费者模型
- 设计一款给爸爸妈妈用的手机
- Android 获取局域网内网IP地址
- IC卡和ID卡以及两者的比较
- 冒充linux内核,4岁小萝莉向Linux内核贡献代码修复「漏洞」而且已经合并到内核...
- iOS限制输入表情(emoji)
- 完整正则表达式语法列表
- 电脑摄像头未能创建连接服务器,电脑提示未能创建视频预览,请检查设备连接的原因及解决办法...