参考资料

<<从PAXOS到ZOOKEEPER分布式一致性原理与实践>>
zookeeper-3.0.0

Follower角色初始化

本文主要简述一下Follower角色初始化的流程,并概述一下主要的操作。

Follower角色初始化流程

case FOLLOWING:try {LOG.info("FOLLOWING");setFollower(makeFollower(logFactory));                      // 如果是FOLLOWING状态则转换成follower 跟随主follower.followLeader();} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {follower.shutdown();setFollower(null);setPeerState(ServerState.LOOKING);}break;

处理角色流程跟Leader的角色的初始化流程类似,首先生成一个followe类,然后再调用该类的followLeader方法。我们继续查看makeFollower方法。

 protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {return new Follower(this, new FollowerZooKeeperServer(logFactory, this,new ZooKeeperServer.BasicDataTreeBuilder()));}

从执行可知,生成了一个Follower实例,并生成了一个FollowerZooKeeperServer实例作为参数,首先查看Follower的初始化方法。

    Follower(QuorumPeer self,FollowerZooKeeperServer zk) {this.self = self;                                       // 设置QuormPeer实例this.zk=zk;                                             // 设置zk实例}

主要就是保存了传入参数。在初始化完成之后紧接着就执行了followLeader方法。

followLeader方法
void followLeader() throws InterruptedException {InetSocketAddress addr = null;                                  // Find the leader by idVote current = self.getCurrentVote();                                           // 获取当前的投票for (QuorumServer s : self.quorumPeers.values()) {                              // 找到与投票id与保存的获取相同的ID if (s.id == current.id) {addr = s.addr;                                                          // 此时就是找出对应主的ID的地址break;}}if (addr == null) {LOG.warn("Couldn't find the leader with id = "+ current.id);                                                      // 如果为空则打印当前信息}LOG.info("Following " + addr);sock = new Socket();                                                            // 生成一个socket 来连接主try {QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);             // 生成一个ACK 包sock.setSoTimeout(self.tickTime * self.initLimit);                          // 设置超时时间for (int tries = 0; tries < 5; tries++) {                                   // 设置尝试次数try {//sock = new Socket();//sock.setSoTimeout(self.tickTime * self.initLimit);sock.connect(addr, self.tickTime * self.syncLimit);                 // 连接远端主机sock.setTcpNoDelay(true);break;} catch (ConnectException e) {if (tries == 4) {                                                   // 如果连接超时超过4次则直接将错误抛出LOG.error("Unexpected exception",e);throw e;} else {LOG.warn("Unexpected exception",e);                             // 如果小于4次则重新设置超时时间并重新生成socksock = new Socket();sock.setSoTimeout(self.tickTime * self.initLimit);}}Thread.sleep(1000);                                                     // 休眠}leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));                                            // 获取读的内容bufferedOutput = new BufferedOutputStream(sock.getOutputStream());          leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);                  // 获取写的内容QuorumPacket qp = new QuorumPacket();                                       // 生成一个包qp.setType(Leader.LASTZXID);                                                // 设置类型 类型为最后一次事务IDlong sentLastZxid = self.getLastLoggedZxid();                               // 获取最后一次事务IDqp.setZxid(sentLastZxid);                                                   // 设置事务IDwritePacket(qp);                                                            // 发送该包readPacket(qp);                                                             // 读返回数据long newLeaderZxid = qp.getZxid();                                          // 获取回复的的事务IDif (qp.getType() != Leader.NEWLEADER) {                                     // 如果回复的类型不是新主则报错LOG.error("First packet should have been NEWLEADER");throw new IOException("First packet should have been NEWLEADER");}readPacket(qp);                                                             synchronized (zk) {if (qp.getType() == Leader.DIFF) {                                      // 判断类型 如果是DIFF 则加载数据LOG.info("Getting a diff from the leader!");zk.loadData();}else if (qp.getType() == Leader.SNAP) {                                 // 如果是SNAP 则做快照LOG.info("Getting a snapshot from leader");// The leader is going to dump the databasezk.deserializeSnapshot(leaderIs);                                   // 做快照String signature = leaderIs.readString("signature");if (!signature.equals("BenWasHere")) {LOG.error("Missing signature. Got " + signature);throw new IOException("Missing signature");}} else if (qp.getType() == Leader.TRUNC) {                              // 如果是TRUNC类型则截取日志//we need to truncate the log to the lastzxid of the leaderLOG.warn("Truncating log to get in sync with the leader 0x"+ Long.toHexString(qp.getZxid()));boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());if (!truncated) {// not able to truncate the logLOG.error("Not able to truncate the log "+ Long.toHexString(qp.getZxid()));System.exit(13);}zk.loadData();                                                      // 截取日志之后保存日志}else {LOG.error("Got unexpected packet from leader "+ qp.getType() + " exiting ... " );System.exit(13);}zk.dataTree.lastProcessedZxid = newLeaderZxid;                         // 此时数据同步完成设置最后的事务ID}ack.setZxid(newLeaderZxid & ~0xffffffffL);                                 // 设置ack内容的事务ID writePacket(ack);                                                           // 将该包内容发送出去sock.setSoTimeout(self.tickTime * self.syncLimit);                          // 设置超时时间zk.startup();                                                               // 加载follower的处理流程while (self.running) {                                                      // follower开始运行readPacket(qp);                                                         // 读取接受到的数据switch (qp.getType()) {                                                 // 判断接受到的数据类型case Leader.PING:                                                       // 如果是PING // Send back the ping with our session dataByteArrayOutputStream bos = new ByteArrayOutputStream();  DataOutputStream dos = new DataOutputStream(bos);HashMap<Long, Integer> touchTable = zk.getTouchSnapshot();for (Entry<Long, Integer> entry : touchTable.entrySet()) {          // 返回快照里面的快照数据dos.writeLong(entry.getKey());dos.writeInt(entry.getValue());}qp.setData(bos.toByteArray());writePacket(qp);                                                    // 发送回去break;case Leader.PROPOSAL:                                                   // 如果是提交的事务TxnHeader hdr = new TxnHeader();BinaryInputArchive ia = BinaryInputArchive.getArchive(new ByteArrayInputStream(qp.getData()));Record txn = SerializeUtils.deserializeTxn(ia, hdr);if (hdr.getZxid() != lastQueued + 1) {                              // 检查事务ID是否一致LOG.warn("Got zxid 0x"+ Long.toHexString(hdr.getZxid())+ " expected 0x"+ Long.toHexString(lastQueued + 1));}lastQueued = hdr.getZxid();                                         // 获取事务IDzk.logRequest(hdr, txn);                                            // 将事务记录到日志中break;case Leader.COMMIT:zk.commit(qp.getZxid());                                            // 提交日志中的事务break;case Leader.UPTODATE:                                                   // 生成快照zk.takeSnapshot();self.cnxnFactory.setZooKeeperServer(zk);break;case Leader.REVALIDATE:                                                 // 验证session是否活跃ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());DataInputStream dis = new DataInputStream(bis);long sessionId = dis.readLong();boolean valid = dis.readBoolean();synchronized (pendingRevalidations) {ServerCnxn cnxn = pendingRevalidations.remove(sessionId);                                     // 如果返回为空则 遗失了会话if (cnxn == null) {LOG.warn("Missing session 0x"+ Long.toHexString(sessionId)+ " for validation");} else {cnxn.finishSessionInit(valid);                              // 返回是否合法}}ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,"Session 0x" + Long.toHexString(sessionId)+ " is valid: " + valid);                  break; case Leader.SYNC:zk.sync();break;}}} catch (IOException e) {LOG.warn("Exception when following the leader", e);try {sock.close();} catch (IOException e1) {e1.printStackTrace();}synchronized (pendingRevalidations) {// clear pending revalitionspendingRevalidations.clear();                       // 清理数据pendingRevalidations.notifyAll();}}}

followLeader的执行流程,相对比较多,主要就是先获取主的地址,然后去通过一定的容错次数去连接主,在连接上主之后,然后向主发送当前follower的事务ID并根据该ID,再等待主返回的数据,根据主返回的数据来检查是否需要同步本地的数据来和主保持一致,最后就进入等待阶段,等待接收主发送过来的数据请求,包括处理PING、事务开始,事务提交,会话检查等工作。

总结

本文follower的内容的处理相对比较简单,流程比较清晰易懂,主要就是启动之后尝试去连接主,然后根据主返回的事务日志,将更新本地的数据同步,并接受主发送过来的同步,快照,事务提交等处理逻辑。在后续文章中,会分析从客户端发送过来之后leader与follower的处理过程是怎样,并概述一下大致的流程。由于本人才疏学浅,如有错误请批评指正。

Zookeeper源码分析:Follower角色初始化相关推荐

  1. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  2. zookeeper源码分析之恢复事务日志

    zookeeper源码分析之恢复事务日志 前言 源码分析 查看事务日志命令 总结 前言 本文是基于zookeeper集群启动过程分析(https://blog.csdn.net/weixin_4244 ...

  3. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  4. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

  5. Spring IOC 容器源码分析 - 余下的初始化工作

    1. 简介 本篇文章是"Spring IOC 容器源码分析"系列文章的最后一篇文章,本篇文章所分析的对象是 initializeBean 方法,该方法用于对已完成属性填充的 bea ...

  6. Zookeeper源码分析(二) ----- zookeeper日志

    zookeeper源码分析系列文章: Zookeeper源码分析(一) ----- 源码运行环境搭建 原创博客,纯手敲,转载请注明出处,谢谢! 既然我们是要学习源码,那么如何高效地学习源代码呢?答案就 ...

  7. linux源码分析之cpu初始化 kernel/head.s,linux源码分析之cpu初始化

    linux源码分析之cpu初始化 kernel/head.s 收藏 来自:http://blog.csdn.net/BoySKung/archive/2008/12/09/3486026.aspx l ...

  8. celery源码分析-Task的初始化与发送任务

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的任务发送 在Django项目中使用了装饰器来包装待执行任务, from cel ...

  9. nginx源码分析之网络初始化

    nginx作为一个高性能的HTTP服务器,网络的处理是其核心,了解网络的初始化有助于加深对nginx网络处理的了解,本文主要通过nginx的源代码来分析其网络初始化. 从配置文件中读取初始化信息 与网 ...

最新文章

  1. Kubernetes 会不会“杀死” DevOps?
  2. 莫烦Matplotlib可视化第四章多图合并显示代码学习
  3. 【JAVA基础篇】String类详解
  4. C/C++——getline()详解
  5. 【Flink】Flink CDC 数据同步 【视频笔记】
  6. java创建xml设置路径_java 写入xml文件 地址如何设置为局域网内的另一台服务器上...
  7. rabbitmq 查看消费者_(Windows环境下)RabbitMQ系列(一)安装以及入门使用
  8. MyBatis多参数传递之默认命名方式示例——MyBatis学习笔记之十二
  9. 算法:翻转链表 Reverse Linked List 三种方法实现,迭代解决人类思维,递归解决机器思维 reverse node
  10. 再谈本土EDA竞争力顺便聊聊DTCO在中国落地
  11. 转速开环恒压频比异步电动机调速系统仿真
  12. 使用EXCEL VBA代码自动群发带附件的邮件同时抄送给不同的人
  13. WPF随笔(七)--分页控件
  14. ikuai路由管理系统教程
  15. 零基础想要做好人物角色模型,先了解人体的构造!快来康康
  16. 电子表格是计算机几级,计算机一级电子表格文档.doc
  17. 如何使用 R 从 Internet 下载文件
  18. 全志A20编译调试笔记
  19. 使用codesense的GJB 8114模板对c++源代码规则检测示例
  20. 3 、库存是企业的墓场

热门文章

  1. 抗击疫情!阿里云为加速新药疫苗研发提供免费AI算力
  2. 1200亿次日均位置服务响应、20亿公里日均轨迹里程,百度地图发布新一代人工智能地图生态全景
  3. AI大佬“互怼”:Bengio和Gary Marcus隔空对谈深度学习发展现状
  4. 知识图谱公开课 | 详解事件抽取与事件图谱构建
  5. 一个可以卷起来的蓝牙键盘,简直是办公码字神器!
  6. 马云:“996 是一种巨大的福气”
  7. AI一分钟 | 小米发布小爱音箱mini,169元;天猫汽车无人贩卖机大楼落地,刷脸可购车试驾
  8. mybatis-plus团队新作:mybatis-mate 轻松搞定数据权限
  9. 趣图:老手调试多线程,666
  10. 为何每次用完 ThreadLocal 都要调用 remove()