Zookeeper源码分析:Follower角色初始化
参考资料
<<从PAXOS到ZOOKEEPER分布式一致性原理与实践>>
zookeeper-3.0.0
Follower角色初始化
本文主要简述一下Follower角色初始化的流程,并概述一下主要的操作。
Follower角色初始化流程
case FOLLOWING:try {LOG.info("FOLLOWING");setFollower(makeFollower(logFactory)); // 如果是FOLLOWING状态则转换成follower 跟随主follower.followLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {follower.shutdown();setFollower(null);setPeerState(ServerState.LOOKING);}break;
处理角色流程跟Leader的角色的初始化流程类似,首先生成一个followe类,然后再调用该类的followLeader方法。我们继续查看makeFollower方法。
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {return new Follower(this, new FollowerZooKeeperServer(logFactory, this,new ZooKeeperServer.BasicDataTreeBuilder()));}
从执行可知,生成了一个Follower实例,并生成了一个FollowerZooKeeperServer实例作为参数,首先查看Follower的初始化方法。
Follower(QuorumPeer self,FollowerZooKeeperServer zk) {this.self = self; // 设置QuormPeer实例this.zk=zk; // 设置zk实例}
主要就是保存了传入参数。在初始化完成之后紧接着就执行了followLeader方法。
followLeader方法
void followLeader() throws InterruptedException {InetSocketAddress addr = null; // Find the leader by idVote current = self.getCurrentVote(); // 获取当前的投票for (QuorumServer s : self.quorumPeers.values()) { // 找到与投票id与保存的获取相同的ID if (s.id == current.id) {addr = s.addr; // 此时就是找出对应主的ID的地址break;}}if (addr == null) {LOG.warn("Couldn't find the leader with id = "+ current.id); // 如果为空则打印当前信息}LOG.info("Following " + addr);sock = new Socket(); // 生成一个socket 来连接主try {QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null); // 生成一个ACK 包sock.setSoTimeout(self.tickTime * self.initLimit); // 设置超时时间for (int tries = 0; tries < 5; tries++) { // 设置尝试次数try {//sock = new Socket();//sock.setSoTimeout(self.tickTime * self.initLimit);sock.connect(addr, self.tickTime * self.syncLimit); // 连接远端主机sock.setTcpNoDelay(true);break;} catch (ConnectException e) {if (tries == 4) { // 如果连接超时超过4次则直接将错误抛出LOG.error("Unexpected exception",e);throw e;} else {LOG.warn("Unexpected exception",e); // 如果小于4次则重新设置超时时间并重新生成socksock = new Socket();sock.setSoTimeout(self.tickTime * self.initLimit);}}Thread.sleep(1000); // 休眠}leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream())); // 获取读的内容bufferedOutput = new BufferedOutputStream(sock.getOutputStream()); leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); // 获取写的内容QuorumPacket qp = new QuorumPacket(); // 生成一个包qp.setType(Leader.LASTZXID); // 设置类型 类型为最后一次事务IDlong sentLastZxid = self.getLastLoggedZxid(); // 获取最后一次事务IDqp.setZxid(sentLastZxid); // 设置事务IDwritePacket(qp); // 发送该包readPacket(qp); // 读返回数据long newLeaderZxid = qp.getZxid(); // 获取回复的的事务IDif (qp.getType() != Leader.NEWLEADER) { // 如果回复的类型不是新主则报错LOG.error("First packet should have been NEWLEADER");throw new IOException("First packet should have been NEWLEADER");}readPacket(qp); synchronized (zk) {if (qp.getType() == Leader.DIFF) { // 判断类型 如果是DIFF 则加载数据LOG.info("Getting a diff from the leader!");zk.loadData();}else if (qp.getType() == Leader.SNAP) { // 如果是SNAP 则做快照LOG.info("Getting a snapshot from leader");// The leader is going to dump the databasezk.deserializeSnapshot(leaderIs); // 做快照String signature = leaderIs.readString("signature");if (!signature.equals("BenWasHere")) {LOG.error("Missing signature. Got " + signature);throw new IOException("Missing signature");}} else if (qp.getType() == Leader.TRUNC) { // 如果是TRUNC类型则截取日志//we need to truncate the log to the lastzxid of the leaderLOG.warn("Truncating log to get in sync with the leader 0x"+ Long.toHexString(qp.getZxid()));boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());if (!truncated) {// not able to truncate the logLOG.error("Not able to truncate the log "+ Long.toHexString(qp.getZxid()));System.exit(13);}zk.loadData(); // 截取日志之后保存日志}else {LOG.error("Got unexpected packet from leader "+ qp.getType() + " exiting ... " );System.exit(13);}zk.dataTree.lastProcessedZxid = newLeaderZxid; // 此时数据同步完成设置最后的事务ID}ack.setZxid(newLeaderZxid & ~0xffffffffL); // 设置ack内容的事务ID writePacket(ack); // 将该包内容发送出去sock.setSoTimeout(self.tickTime * self.syncLimit); // 设置超时时间zk.startup(); // 加载follower的处理流程while (self.running) { // follower开始运行readPacket(qp); // 读取接受到的数据switch (qp.getType()) { // 判断接受到的数据类型case Leader.PING: // 如果是PING // Send back the ping with our session dataByteArrayOutputStream bos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(bos);HashMap<Long, Integer> touchTable = zk.getTouchSnapshot();for (Entry<Long, Integer> entry : touchTable.entrySet()) { // 返回快照里面的快照数据dos.writeLong(entry.getKey());dos.writeInt(entry.getValue());}qp.setData(bos.toByteArray());writePacket(qp); // 发送回去break;case Leader.PROPOSAL: // 如果是提交的事务TxnHeader hdr = new TxnHeader();BinaryInputArchive ia = BinaryInputArchive.getArchive(new ByteArrayInputStream(qp.getData()));Record txn = SerializeUtils.deserializeTxn(ia, hdr);if (hdr.getZxid() != lastQueued + 1) { // 检查事务ID是否一致LOG.warn("Got zxid 0x"+ Long.toHexString(hdr.getZxid())+ " expected 0x"+ Long.toHexString(lastQueued + 1));}lastQueued = hdr.getZxid(); // 获取事务IDzk.logRequest(hdr, txn); // 将事务记录到日志中break;case Leader.COMMIT:zk.commit(qp.getZxid()); // 提交日志中的事务break;case Leader.UPTODATE: // 生成快照zk.takeSnapshot();self.cnxnFactory.setZooKeeperServer(zk);break;case Leader.REVALIDATE: // 验证session是否活跃ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());DataInputStream dis = new DataInputStream(bis);long sessionId = dis.readLong();boolean valid = dis.readBoolean();synchronized (pendingRevalidations) {ServerCnxn cnxn = pendingRevalidations.remove(sessionId); // 如果返回为空则 遗失了会话if (cnxn == null) {LOG.warn("Missing session 0x"+ Long.toHexString(sessionId)+ " for validation");} else {cnxn.finishSessionInit(valid); // 返回是否合法}}ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,"Session 0x" + Long.toHexString(sessionId)+ " is valid: " + valid); break; case Leader.SYNC:zk.sync();break;}}} catch (IOException e) {LOG.warn("Exception when following the leader", e);try {sock.close();} catch (IOException e1) {e1.printStackTrace();}synchronized (pendingRevalidations) {// clear pending revalitionspendingRevalidations.clear(); // 清理数据pendingRevalidations.notifyAll();}}}
followLeader的执行流程,相对比较多,主要就是先获取主的地址,然后去通过一定的容错次数去连接主,在连接上主之后,然后向主发送当前follower的事务ID并根据该ID,再等待主返回的数据,根据主返回的数据来检查是否需要同步本地的数据来和主保持一致,最后就进入等待阶段,等待接收主发送过来的数据请求,包括处理PING、事务开始,事务提交,会话检查等工作。
总结
本文follower的内容的处理相对比较简单,流程比较清晰易懂,主要就是启动之后尝试去连接主,然后根据主返回的事务日志,将更新本地的数据同步,并接受主发送过来的同步,快照,事务提交等处理逻辑。在后续文章中,会分析从客户端发送过来之后leader与follower的处理过程是怎样,并概述一下大致的流程。由于本人才疏学浅,如有错误请批评指正。
Zookeeper源码分析:Follower角色初始化相关推荐
- zookeeper源码分析之五服务端(集群leader)处理请求流程
leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...
- zookeeper源码分析之恢复事务日志
zookeeper源码分析之恢复事务日志 前言 源码分析 查看事务日志命令 总结 前言 本文是基于zookeeper集群启动过程分析(https://blog.csdn.net/weixin_4244 ...
- zookeeper源码分析之四服务端(单机)处理请求流程
上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...
- zookeeper源码分析之三客户端发送请求流程
znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...
- Spring IOC 容器源码分析 - 余下的初始化工作
1. 简介 本篇文章是"Spring IOC 容器源码分析"系列文章的最后一篇文章,本篇文章所分析的对象是 initializeBean 方法,该方法用于对已完成属性填充的 bea ...
- Zookeeper源码分析(二) ----- zookeeper日志
zookeeper源码分析系列文章: Zookeeper源码分析(一) ----- 源码运行环境搭建 原创博客,纯手敲,转载请注明出处,谢谢! 既然我们是要学习源码,那么如何高效地学习源代码呢?答案就 ...
- linux源码分析之cpu初始化 kernel/head.s,linux源码分析之cpu初始化
linux源码分析之cpu初始化 kernel/head.s 收藏 来自:http://blog.csdn.net/BoySKung/archive/2008/12/09/3486026.aspx l ...
- celery源码分析-Task的初始化与发送任务
celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的任务发送 在Django项目中使用了装饰器来包装待执行任务, from cel ...
- nginx源码分析之网络初始化
nginx作为一个高性能的HTTP服务器,网络的处理是其核心,了解网络的初始化有助于加深对nginx网络处理的了解,本文主要通过nginx的源代码来分析其网络初始化. 从配置文件中读取初始化信息 与网 ...
最新文章
- Kubernetes 会不会“杀死” DevOps?
- 莫烦Matplotlib可视化第四章多图合并显示代码学习
- 【JAVA基础篇】String类详解
- C/C++——getline()详解
- 【Flink】Flink CDC 数据同步 【视频笔记】
- java创建xml设置路径_java 写入xml文件 地址如何设置为局域网内的另一台服务器上...
- rabbitmq 查看消费者_(Windows环境下)RabbitMQ系列(一)安装以及入门使用
- MyBatis多参数传递之默认命名方式示例——MyBatis学习笔记之十二
- 算法:翻转链表 Reverse Linked List 三种方法实现,迭代解决人类思维,递归解决机器思维 reverse node
- 再谈本土EDA竞争力顺便聊聊DTCO在中国落地
- 转速开环恒压频比异步电动机调速系统仿真
- 使用EXCEL VBA代码自动群发带附件的邮件同时抄送给不同的人
- WPF随笔(七)--分页控件
- ikuai路由管理系统教程
- 零基础想要做好人物角色模型,先了解人体的构造!快来康康
- 电子表格是计算机几级,计算机一级电子表格文档.doc
- 如何使用 R 从 Internet 下载文件
- 全志A20编译调试笔记
- 使用codesense的GJB 8114模板对c++源代码规则检测示例
- 3 、库存是企业的墓场
热门文章
- 抗击疫情!阿里云为加速新药疫苗研发提供免费AI算力
- 1200亿次日均位置服务响应、20亿公里日均轨迹里程,百度地图发布新一代人工智能地图生态全景
- AI大佬“互怼”:Bengio和Gary Marcus隔空对谈深度学习发展现状
- 知识图谱公开课 | 详解事件抽取与事件图谱构建
- 一个可以卷起来的蓝牙键盘,简直是办公码字神器!
- 马云:“996 是一种巨大的福气”
- AI一分钟 | 小米发布小爱音箱mini,169元;天猫汽车无人贩卖机大楼落地,刷脸可购车试驾
- mybatis-plus团队新作:mybatis-mate 轻松搞定数据权限
- 趣图:老手调试多线程,666
- 为何每次用完 ThreadLocal 都要调用 remove()