参考资料

<<从PAXOS到ZOOKEEPER分布式一致性原理与实践>>
zookeeper-3.0.0

Leader角色初始化

在上文的选举完成之后,每个zk实例都会根据选举结果进入对应的角色,本文主要就是讲述Leader的初始化相关内容。

Leader初始化流程

case LEADING:LOG.info("LEADING");try {setLeader(makeLeader(logFactory));                          // 设置成主状态leader.lead();                                              // 接听所有事件请求setLeader(null);                                            // 如果失去当前主  则将主设置为空} catch (Exception e) {LOG.warn("Unexpected exception",e);} finally {if (leader != null) {                                       // 设置为空并重置状态leader.shutdown("Forcing shutdown");setLeader(null);}setPeerState(ServerState.LOOKING);}break;

在角色进入到LEADING时, 此时就会进入生产一个leader实例并调用该leader实例的lead方法进入主角色开始执行。首先查看makeLeader方法。

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {return new Leader(this, new LeaderZooKeeperServer(logFactory,this,new ZooKeeperServer.BasicDataTreeBuilder()));}

此时就是初始化了一个新的Leader类并传入QuormPeer实例并初始化了一个LeaderZooKeeperServer实例。

该类的初始化方法如下;

    Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {this.self = self;try {ss = new ServerSocket(self.getQuorumAddress().getPort());           // 监听一下本地服务端口} catch (BindException e) {LOG.error("Couldn't bind to port "+ self.getQuorumAddress().getPort());throw e;}this.zk=zk;}

主要就是保存对应的实例并监听本地的端口。接着就执行了该类的lead方法。

leader.lead方法
    void lead() throws IOException, InterruptedException {self.tick = 0;                                                              // 计数置零zk.loadData();                                                              // zk加载数据 主要就是将会话删除旧的恢复可用的zk.startup();                                                               // zk创建会话 并注册调用链处理函数long epoch = self.getLastLoggedZxid() >> 32L;                               // 获取epoch 值 并加1epoch++;zk.setZxid(epoch << 32L);                                                   // 设置zxid值zk.dataTree.lastProcessedZxid = zk.getZxid();                               // 获取最后一次提交事物idlastProposed = zk.getZxid();newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),null, null);                                                        // 生成一个新leader的包if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {              // 判断值是否为0LOG.warn("NEWLEADER proposal has Zxid of "+ newLeaderProposal.packet.getZxid());}outstandingProposals.add(newLeaderProposal);                                // 添加事务// Start thread that waits for connection requests from // new followers.cnxAcceptor = new FollowerCnxAcceptor();                                    // 开启线程接受follower的信息cnxAcceptor.start();// We have to get at least a majority of servers in sync with// us. We do this by waiting for the NEWLEADER packet to get// acknowledgednewLeaderProposal.ackCount++;                                               // ack统计 默认包括自己 所有先加1while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {         // 检查回复是否大于集群总数的一半if (self.tick > self.initLimit) {                                       // 检查tick是否超过限制次数// Followers aren't syncing fast enough,// renounce leadership!shutdown("Waiting for " + (self.quorumPeers.size() / 2)             // 超过限制次数 则停止并返回  并继续进行选举+ " followers, only synced with "+ newLeaderProposal.ackCount);if (followers.size() >= self.quorumPeers.size() / 2) {              LOG.warn("Enough followers present. "+"Perhaps the initTicks need to be increased.");}return;}Thread.sleep(self.tickTime);                                            // 休眠self.tick++;                                                            // 增加tick 值}if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {    // 如果获取类熟属性 如果不是leaderServes则设置zkself.cnxnFactory.setZooKeeperServer(zk);}// Everything is a go, simply start counting the ticks// WARNING: I couldn't find any wait statement on a synchronized// block that would be notified by this notifyAll() call, so// I commented it out//synchronized (this) {//    notifyAll();//}// We ping twice a tick, so we only update the tick every other// iterationboolean tickSkip = true;while (true) {Thread.sleep(self.tickTime / 2);                                        // 休眠一半的tickTime时间if (!tickSkip) { self.tick++;}int syncedCount = 0;// lock on the followers when we use it.synchronized (followers) {                                              // 获取所有的followers并发送synced请求for (FollowerHandler f : followers) {if (f.synced()) {syncedCount++;}f.ping();                                                       // 发送ping请求}}if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {           // 检查是否获得半数以上的回复 如果没有则停止并重新进入选举流程// Lost quorum, shutdownshutdown("Only " + syncedCount + " followers, need "+ (self.quorumPeers.size() / 2));// make sure the order is the same!// the leader goes to lookingreturn;}tickSkip = !tickSkip;}}

lead方法,主要就是先加载会话相关的数据,然后再注册请求过来的调用链处理函数;在完成之后就进入等待,等待followers发来的确认消息,当获得的响应数超过一半时,就跳出等待;然后就定时检查followers的周期是否超时,并且是否存活,定时给followers发送ping消息。

FollowerCnxAcceptor获取followers的响应
class FollowerCnxAcceptor extends Thread{private volatile boolean stop = false;@Overridepublic void run() {try {while (!stop) {                                                         // 检查是否在运行try{Socket s = ss.accept();                                         // 接受follower的连接请求s.setSoTimeout(self.tickTime * self.syncLimit);                 // 设置该连接的过期时间s.setTcpNoDelay(true);                                          // 是否开启TCP_NODELAYnew FollowerHandler(s, Leader.this);                            // 新注册一个FollowerHandler} catch (SocketException e) {if (stop) {LOG.info("exception while shutting down acceptor: "+ e);// When Leader.shutdown() calls ss.close(),// the call to accept throws an exception.// We catch and set stop to true.stop = true;} else {throw e;}}}} catch (Exception e) {LOG.warn("Exception while accepting follower", e);}}public void halt() {stop = true;}}

通过一个线程来完成接受followers的连接,每接受一个连接就初始化一个FollowerHandler,并设置连接的超时时间等条件,并且设置最多网络只有一个未被确认的网络包,依次提高传输效率降低分组的报文个数。

FollowerHandler的处理流程

FollowerHandler类就是处理有关消息的发送的相关具体操作类。

    FollowerHandler(Socket sock, Leader leader) throws IOException {super("FollowerHandler-" + sock.getRemoteSocketAddress()); this.sock = sock;this.leader = leader;leader.addFollowerHandler(this);                                    // 添加到leader的followers列表中start();                                                            // 开启run方法运行}

由于该类继承自线程类,调用start方法就是执行了run函数;

    @Overridepublic void run() {try {ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));                                                // 初始化接入流bufferedOutput = new BufferedOutputStream(sock.getOutputStream());          // 初始化输入流oa = BinaryOutputArchive.getArchive(bufferedOutput);QuorumPacket qp = new QuorumPacket();                                       // 生成一个包ia.readRecord(qp, "packet");                                                // 读取输入数据if (qp.getType() != Leader.LASTZXID) {                                      // 检查类型LOG.error("First packet " + qp.toString()+ " is not LASTZXID!");                                         // 如果不等于最后的事务ID则报错返回return;}long peerLastZxid = qp.getZxid();                                           // 获取事务IDint packetToSend = Leader.SNAP;boolean logTxns = true;long zxidToSend = 0;// we are sending the diffsynchronized(leader.zk.committedLog) {                                      // 如果提交日志的大小不等于0if (leader.zk.committedLog.size() != 0) {if ((leader.zk.maxCommittedLog >= peerLastZxid)                     // 如果当前的最大日志大于接受事务ID&& (leader.zk.minCommittedLog <= peerLastZxid)) {           // 并且当前的最小日志小于接受事务IDpacketToSend = Leader.DIFF;zxidToSend = leader.zk.maxCommittedLog;                         // 发送日志设置成最大日志for (Proposal propose: leader.zk.committedLog) {                // 遍历获取事务日志if (propose.packet.getZxid() > peerLastZxid) {              // 如果获取的日志大于当前接受的事务IDqueuePacket(propose.packet);                            // 将数据发送给followers同步数据QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),null, null);queuePacket(qcommit);                                   // 添加到发送队列中}}}}else {logTxns = false;}            }long leaderLastZxid = leader.startForwarding(this, peerLastZxid);           // 加入到要处理的列表中QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,leaderLastZxid, null, null);                                        // 生成一个新的包oa.writeRecord(newLeaderQP, "packet");                                      // 发送该包bufferedOutput.flush();// a special case when both the ids are the sameif (peerLastZxid == leaderLastZxid) {                                       // 检查事务ID与当前最后的事务ID是否相同packetToSend = Leader.DIFF;                                             // 检查日志是否有不一样的zxidToSend = leaderLastZxid;}//check if we decided to send a diff or we need to send a truncate// we avoid using epochs for truncating because epochs make things// complicated. Two epochs might have the last 32 bits as same.// only if we know that there is a committed zxid in the queue that// is less than the one the peer has we send a trunc else to make// things simple we just send sanpshot.if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {// this is the only case that we are sure that// we can ask the follower to truncate the logpacketToSend = Leader.TRUNC;                                           // 截断日志zxidToSend = leader.zk.maxCommittedLog;}oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");       // 写入新的包确定了类型与ID值bufferedOutput.flush();// only if we are not truncating or fast sycningif (packetToSend == Leader.SNAP) {                                          // 如果数据没有改变LOG.warn("Sending snapshot last zxid of peer is 0x"+ Long.toHexString(peerLastZxid) + " " + " zxid of leader is 0x"+ Long.toHexString(leaderLastZxid));// Dump data to followerleader.zk.serializeSnapshot(oa);                                        // 将序列化快照发送给followeroa.writeString("BenWasHere", "signature"); }bufferedOutput.flush();//// Mutation packets will be queued during the serialize,// so we need to mark when the follower can actually start// using the data//queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));            // 添加到队列中// Start sending packetsnew Thread() {public void run() {Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());try {sendPackets();                                                  // 启动线程发送数据} catch (InterruptedException e) {LOG.warn("Interrupted",e);}}}.start();while (true) {qp = new QuorumPacket();                                                // 生成一个包ia.readRecord(qp, "packet");                                            // 读包的数据long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;if (qp.getType() == Leader.PING) {traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);tickOfLastAck = leader.self.tick;ByteBuffer bb;long sessionId;int cxid;int type;switch (qp.getType()) {                                                 // 获取读入包的类型case Leader.ACK:leader.processAck(qp.getZxid(), sock.getLocalSocketAddress());      // 确认获取了ACK信息break;case Leader.PING:// Process the touchesByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());DataInputStream dis = new DataInputStream(bis);                     // 处理ping类型消息while (dis.available() > 0) {long sess = dis.readLong();int to = dis.readInt();leader.zk.touch(sess, to);                                      // 获取sess值更新seesion}break;case Leader.REVALIDATE:bis = new ByteArrayInputStream(qp.getData());                       // 验证session是否存活dis = new DataInputStream(bis);long id = dis.readLong();int to = dis.readInt();ByteArrayOutputStream bos = new ByteArrayOutputStream();DataOutputStream dos = new DataOutputStream(bos);dos.writeLong(id);boolean valid = leader.zk.touch(id, to);ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,"Session 0x" + Long.toHexString(id)+ " is valid: "+ valid);dos.writeBoolean(valid);qp.setData(bos.toByteArray());queuedPackets.add(qp);break;case Leader.REQUEST:bb = ByteBuffer.wrap(qp.getData());                                 // 处理请求sessionId = bb.getLong();cxid = bb.getInt();type = bb.getInt();bb = bb.slice();if(type == OpCode.sync){leader.zk.submitRequest(new FollowerSyncRequest(this, sessionId, cxid, type, bb,qp.getAuthinfo()));                                     // 如果是同步则提交请求到同步请求} else {leader.zk.submitRequest(null, sessionId, type, cxid, bb,qp.getAuthinfo());                                          // 否则直接提交数据去处理}break;default:}}} catch (IOException e) {if (sock != null && !sock.isClosed()) {LOG.error("FIXMSG",e);}} catch (InterruptedException e) {LOG.error("FIXMSG",e);} finally {LOG.warn("******* GOODBYE " + (sock != null ? sock.getRemoteSocketAddress() : "<null>")         // 打印信息+ " ********");// Send the packet of deathtry {queuedPackets.put(proposalOfDeath);                                     // 关闭发送的线程} catch (InterruptedException e) {LOG.error("FIXMSG",e);}shutdown();                                                                 // 重置并移除在leader中的该handler}}public void shutdown() {try {if (sock != null && !sock.isClosed()) {                                     // 检查sock是否关闭 如果没关则关闭sock.close();}} catch (IOException e) {LOG.error("FIXMSG",e);}leader.removeFollowerHandler(this);                                             // 移除该handler           }

run函数主要就是先同步数据,检查获取从的包的事务ID如果ID不同则将当前主的数据同步发送给从,主要完成了数据同步的工作,在检查完成之后,就会启动一个单独的线程去发送数据给从,并且主会监听从发送过来的请求并将该请求处理。从这段执行流程也可知followe会转发客户端的请求到主上面来,全局只有主来处理客户端的数据请求。

    private void sendPackets() throws InterruptedException {long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;while (true) {QuorumPacket p;p = queuedPackets.take();                                   // 获取队列中的数据if (p == proposalOfDeath) {                                 // 如果要停止则停止循环// Packet of death!break;}if (p.getType() == Leader.PING) {                           // 获取待发送消息类型traceMask = ZooTrace.SERVER_PING_TRACE_MASK;}ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);try {oa.writeRecord(p, "packet");                           // 发送该消息bufferedOutput.flush();} catch (IOException e) {if (!sock.isClosed()) {LOG.warn("Unexpected exception",e);}break;}}}

启动之后就又一个单独的线程专门监听发送队列并从该队列中取数据发送给从。至此,Leader角色的主要的流程基本执行完成。

总结

本文主要是分析了Leader角色的启动流程,主要就是先恢复重建本地的日志和事物数据,然后接受从的请求,并比较从的数据是否和主数据一致,如果不一致则从主中发送数据给从达到数据同步。然后再监听从的响应请求并处理,其中包括如果从接受的客户端的请求会转发给主处理,基本的处理流程就是这样。由于本人才疏学浅,如有错误请批评指正。

Zookeeper源码分析:Leader角色初始化相关推荐

  1. zookeeper源码分析之五服务端(集群leader)处理请求流程

    leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcesso ...

  2. zookeeper源码分析之恢复事务日志

    zookeeper源码分析之恢复事务日志 前言 源码分析 查看事务日志命令 总结 前言 本文是基于zookeeper集群启动过程分析(https://blog.csdn.net/weixin_4244 ...

  3. zookeeper源码分析之四服务端(单机)处理请求流程

    上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析 ...

  4. zookeeper源码分析之三客户端发送请求流程

    znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的 ...

  5. Spring IOC 容器源码分析 - 余下的初始化工作

    1. 简介 本篇文章是"Spring IOC 容器源码分析"系列文章的最后一篇文章,本篇文章所分析的对象是 initializeBean 方法,该方法用于对已完成属性填充的 bea ...

  6. Zookeeper源码分析(二) ----- zookeeper日志

    zookeeper源码分析系列文章: Zookeeper源码分析(一) ----- 源码运行环境搭建 原创博客,纯手敲,转载请注明出处,谢谢! 既然我们是要学习源码,那么如何高效地学习源代码呢?答案就 ...

  7. linux源码分析之cpu初始化 kernel/head.s,linux源码分析之cpu初始化

    linux源码分析之cpu初始化 kernel/head.s 收藏 来自:http://blog.csdn.net/BoySKung/archive/2008/12/09/3486026.aspx l ...

  8. celery源码分析-Task的初始化与发送任务

    celery源码分析 本文环境python3.5.2,celery4.0.2,django1.10.x系列 celery的任务发送 在Django项目中使用了装饰器来包装待执行任务, from cel ...

  9. nginx源码分析之网络初始化

    nginx作为一个高性能的HTTP服务器,网络的处理是其核心,了解网络的初始化有助于加深对nginx网络处理的了解,本文主要通过nginx的源代码来分析其网络初始化. 从配置文件中读取初始化信息 与网 ...

最新文章

  1. GetLogicalDriveStringS获取驱动器根路径
  2. 关于Java中next() nextLine()的区别
  3. 科技馆游记(精华)-------- double篇
  4. 8个你应该了解的正则表达式
  5. H.263 H.263+ Payload Type
  6. Web框架——Flask系列之WTF表单验证练习(七)
  7. html网页放大时文字不换行_WEB前端-html基础
  8. eos 编译笔记(注意点)
  9. python整体设计目标_Python 入門語法和類型(学习)
  10. centos6/7 yum安装mysql客户端和rpm包方式安装方式
  11. Java面试之synchronized和Lock有什么区别?
  12. 关于用户自定义控件与引用该控件的页面之间的javascript脚本冲突
  13. 蔚来汽车提交IPO招股书:三年亏百亿,腾讯为大股东
  14. [剑指offer] 旋转数组的最小数字
  15. Reporting Service RDLC 数据换行解决方案
  16. 算法导论答案 16.2-4
  17. 人人都可以做深度学习应用:入门篇(下)
  18. 【CF633H】Fibonacci-ish II 莫队+线段树
  19. 关于Tungsten Fabic版本问题,这一篇文章说清了
  20. 11-OAuth2.0实战:网关层统一认证授权

热门文章

  1. 推荐 6 个好用到爆的 Pycharm 插件
  2. 5GtoB即将迎来规模商用,如何共创行业新价值?
  3. 微软推出“ Group Transcribe”应用,多人多语言会议实时高准确度文字转录并翻译
  4. 5年5亿美金,华为昇腾如何构建全行业AI生态?
  5. 必读!53个Python经典面试题详解
  6. 深度残差收缩网络:借助注意力机制实现特征的软阈值化
  7. 如何优雅地使用pdpipe与Pandas构建管道?
  8. 微软开源的自动机器学习工具上新了:NNI概览及新功能详解
  9. KDD 2019高维稀疏数据上的深度学习Workshop论文汇总
  10. 自动驾驶人的福音!Lyft公开Level 5部署平台Flexo细节