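The third article in this ZooKeeper series included a flowchart; you may still remember it.

That diagram was very abstract when it first appeared. Now we can walk through the source code against it.
The previous article already explained how ZooKeeper elects a leader, so this one looks mainly at what happens after the leader has been chosen.
We therefore start again from the same method: org.apache.zookeeper.server.quorum.QuorumPeer#run.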

    public void run() {
        setName("QuorumPeer" + "[myid=" + getId() + "]" +
                cnxnFactory.getLocalAddress());
        System.out.println("Current thread: " + Thread.currentThread().getName());

        LOG.debug("Starting quorum peer");
        // Not obvious at first sight: this try block just registers the JMX beans.
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for (QuorumServer s : getView().values()) {
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    p = new RemotePeerBean(s);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        try {
            /*
             * Main loop
             */
            // I had a misconception here: I thought break would leave the while loop,
            // but it only leaves the switch. break exits loops, and it also exits a switch.
            // Test file: com.xq.test.TestWhile
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        // read-only mode is enabled
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                                logFactory, this,
                                new ZooKeeperServer.BasicDataTreeBuilder(),
                                this.zkDb);

                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        // read-only mode is not enabled
                        try {
                            setBCVote(null);
                            // keep updating the vote until a leader is elected
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );
                    } finally {
                        observer.shutdown();
                        setObserver(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        // build the object that plays the follower role
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        // build the object that plays the leader role
                        setLeader(makeLeader(logFactory));
                        // lead() only settles on an epoch that a quorum accepts. Suppose a peer
                        // with a newer zxid was down and comes up while the epoch is being
                        // decided: does this leader simply lose that extra data? From the code
                        // it looks that way.
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                }
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            try {
                MBeanRegistry.getInstance().unregisterAll();
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
        }
    }

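This code should look familiar. The previous article, however, used it only to explain leader election; what each role does once the roles have been assigned has not been covered yet. Put simply, we care about four states: LOOKING, FOLLOWING, LEADING, and OBSERVING. Their meanings should be clear from the names; look them up if they are not. The previous article covered what the LOOKING state does, so this one analyzes from the source how the other three states are initialized. The key part of this long method is the main loop: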

    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            LOG.info("LOOKING");

            if (Boolean.getBoolean("readonlymode.enabled")) {
                // read-only mode is enabled
                LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                // Create read-only server but don't start it immediately
                final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                        logFactory, this,
                        new ZooKeeperServer.BasicDataTreeBuilder(),
                        this.zkDb);

                // Instead of starting roZk immediately, wait some grace
                // period before we decide we're partitioned.
                //
                // Thread is used here because otherwise it would require
                // changes in each of election strategy classes which is
                // unnecessary code coupling.
                Thread roZkMgr = new Thread() {
                    public void run() {
                        try {
                            // lower-bound grace period to 2 secs
                            sleep(Math.max(2000, tickTime));
                            if (ServerState.LOOKING.equals(getPeerState())) {
                                roZk.startup();
                            }
                        } catch (InterruptedException e) {
                            LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                        } catch (Exception e) {
                            LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                        }
                    }
                };
                try {
                    roZkMgr.start();
                    setBCVote(null);
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                    setPeerState(ServerState.LOOKING);
                } finally {
                    // If the thread is in the grace period, interrupt
                    // to come out of waiting.
                    roZkMgr.interrupt();
                    roZk.shutdown();
                }
            } else {
                // read-only mode is not enabled
                try {
                    setBCVote(null);
                    // keep updating the vote until a leader is elected
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }
            }
            break;
        case OBSERVING:
            try {
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e );
            } finally {
                observer.shutdown();
                setObserver(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case FOLLOWING:
            try {
                LOG.info("FOLLOWING");
                // build the object that plays the follower role
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                follower.shutdown();
                setFollower(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case LEADING:
            LOG.info("LEADING");
            try {
                // build the object that plays the leader role
                setLeader(makeLeader(logFactory));
                // lead() only settles on an epoch that a quorum accepts. Suppose a peer
                // with a newer zxid was down and comes up while the epoch is being
                // decided: does this leader simply lose that extra data? From the code
                // it looks that way.
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                setPeerState(ServerState.LOOKING);
            }
            break;
        }
    }
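The shape of this loop, and the point in the code comment about `break` leaving only the `switch`, can be tried out in isolation. The following is a minimal sketch, not ZooKeeper code: `PeerLoopSketch`, its `State` enum, and the fixed iteration count are made up for illustration, and the role methods are replaced by empty stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the QuorumPeer main loop; none of these names exist in ZooKeeper.
class PeerLoopSketch {
    enum State { LOOKING, FOLLOWING, LEADING }

    // Runs the loop for a fixed number of iterations and records the state seen in each one.
    static List<State> run(State initial, int iterations) {
        List<State> seen = new ArrayList<>();
        State state = initial;
        int i = 0;
        while (i < iterations) {
            seen.add(state);
            switch (state) {
                case LOOKING:
                    // Stand-in for lookForLeader(): pretend this peer always wins the election.
                    state = State.LEADING;
                    break; // leaves the switch only; the while loop keeps going
                case FOLLOWING:
                case LEADING:
                    try {
                        // Stand-in for followLeader() / lead(), which return when the role ends.
                    } finally {
                        state = State.LOOKING; // every role falls back to LOOKING afterwards
                    }
                    break;
            }
            i++;
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(State.LOOKING, 4));
    }
}
```

If `break` left the `while` loop, the recorded list would contain a single entry. Instead the sketch keeps cycling between electing and serving, which is the same pattern the real loop follows through `setPeerState(ServerState.LOOKING)` in each `finally` block.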

Let's start with the most conventional state, LEADING.

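    case LEADING:
        LOG.info("LEADING");
        try {
            // build the object that plays the leader role
            setLeader(makeLeader(logFactory));
            // lead() only settles on an epoch that a quorum accepts. Suppose a peer
            // with a newer zxid was down and comes up while the epoch is being
            // decided: does this leader simply lose that extra data? From the code
            // it looks that way.
            leader.lead();
            setLeader(null);
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            if (leader != null) {
                leader.shutdown("Forcing shutdown");
                setLeader(null);
            }
            setPeerState(ServerState.LOOKING);
        }
        break;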

The call that matters here is leader.lead(); the other calls are simple and not worth discussing. Here is that method: org.apache.zookeeper.server.quorum.Leader#lead.

    void lead() throws IOException, InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        LOG.info("LEADING - LEADER ELECTION TOOK - {}", electionTimeTaken);
        self.start_fle = 0;
        self.end_fle = 0;

        zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

        try {
            self.tick.set(0);
            // load the data once more? What for?
            zk.loadData();

            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // Start thread that waits for connection requests from
            // new followers.
            // (a dedicated thread waits for followers to connect)
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();

            readyToStart = true;
            // get the new epoch, again by quorum: the largest accepted epoch
            // reported by the quorum, plus one
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

            // set the zxid to the first zxid of the new epoch
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

            synchronized(this){
                lastProposed = zk.getZxid();
            }

            // build the packet that will be sent to each follower
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);

            // take the low 32 bits, i.e. the counter part of the zxid
            if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                LOG.info("NEWLEADER proposal has Zxid of "
                        + Long.toHexString(newLeaderProposal.packet.getZxid()));
            }

            // wait for the followers to acknowledge the epoch, again requiring a quorum
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);

            // We have to get at least a majority of servers in sync with
            // us. We do this by waiting for the NEWLEADER packet to get
            // acknowledged
            try {
                // Despite the English comment this is still not entirely clear to me;
                // in any case a quorum of acks is required, apparently to confirm that
                // the zxid is consistent as well.
                waitForNewLeaderAck(self.getId(), zk.getZxid());
            } catch (InterruptedException e) {
                shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                        + getSidSetString(newLeaderProposal.ackSet) + " ]");
                HashSet<Long> followerSet = new HashSet<Long>();
                for (LearnerHandler f : learners)
                    followerSet.add(f.getSid());

                if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                    LOG.warn("Enough followers present. "
                            + "Perhaps the initTicks need to be increased.");
                }
                Thread.sleep(self.tickTime);
                self.tick.incrementAndGet();
                return;
            }

            // then start the server, much like in standalone mode
            startZkServer();

            /**
             * WARNING: do not use this for anything other than QA testing
             * on a real cluster. Specifically to enable verification that quorum
             * can handle the lower 32bit roll-over issue identified in
             * ZOOKEEPER-1277. Without this option it would take a very long
             * time (on order of a month say) to see the 4 billion writes
             * necessary to cause the roll-over to occur.
             *
             * This field allows you to override the zxid of the server. Typically
             * you'll want to set it to something like 0xfffffff0 and then
             * start the quorum, run some operations and see the re-election.
             */
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }

            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.cnxnFactory.setZooKeeperServer(zk);
            }
            // Everything is a go, simply start counting the ticks
            // WARNING: I couldn't find any wait statement on a synchronized
            // block that would be notified by this notifyAll() call, so
            // I commented it out
            //synchronized (this) {
            //    notifyAll();
            //}
            // We ping twice a tick, so we only update the tick every other
            // iteration
            boolean tickSkip = true;

            while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }
                HashSet<Long> syncedSet = new HashSet<Long>();

                // lock on the followers when we use it.
                syncedSet.add(self.getId());

                for (LearnerHandler f : getLearners()) {
                    // Synced set is used to check we have a supporting quorum, so only
                    // PARTICIPANT, not OBSERVER, learners should be used
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    f.ping();
                }

                // check leader running status
                if (!this.isRunning()) {
                    shutdown("Unexpected internal error");
                    return;
                }

                if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                    //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                    // Lost quorum, shutdown
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    // make sure the order is the same!
                    // the leader goes to looking
                    return;
                }
                tickSkip = !tickSkip;
            }
        } finally {
            zk.unregisterJMX(this);
        }
    }

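This method may look intimidating. That is understandable, because a leader has to do far more than a follower. What exactly does it do? Think about it the ordinary way: if you were the leader of a team, what would you need to do to run it well? Everyone will answer differently, but ZooKeeper's leader likewise has to carry out the responsibilities that come with the role.
1. First it loads the local transaction log and snapshot once more (this was already done earlier, and I am not sure why it is repeated): zk.loadData()
2. It starts a separate thread that accepts connections from followers: org.apache.zookeeper.server.quorum.Leader.LearnerCnxAcceptor#run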

    public void run() {
        try {
            while (!stop) {
                try{
                    Socket s = ss.accept();
                    // start with the initLimit, once the ack is processed
                    // in LearnerHandler switch to the syncLimit
                    s.setSoTimeout(self.tickTime * self.initLimit);
                    s.setTcpNoDelay(nodelay);

                    BufferedInputStream is = new BufferedInputStream(
                            s.getInputStream());
                    LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
                    fh.start();
                } catch (SocketException e) {
                    if (stop) {
                        LOG.info("exception while shutting down acceptor: "
                                + e);

                        // When Leader.shutdown() calls ss.close(),
                        // the call to accept throws an exception.
                        // We catch and set stop to true.
                        stop = true;
                    } else {
                        throw e;
                    }
                } catch (SaslException e){
                    LOG.error("Exception while connecting to quorum learner", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception while accepting follower", e);
        }
    }

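A rough reading of this code: it listens on a server socket, and whenever a new connection arrives it starts a dedicated thread to talk to that peer. Look at the name of that thread class: LearnerHandler. As the name suggests, it is the leader-side handler for one learner (a follower or an observer). So one thread is started per learner. What is that thread for? It does a great deal. Start with its run method: org.apache.zookeeper.server.quorum.LearnerHandler#run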
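The accept-then-spawn pattern described above can be sketched with plain JDK sockets. This is only an illustration under simplifying assumptions, not the ZooKeeper implementation: `AcceptorSketch`, `acceptLearners`, `connectAsLearner`, and `demo` are hypothetical names, the handler reads a single `long` instead of a QuorumPacket, and the loop stops after a fixed number of connections instead of running until shutdown.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of "one handler thread per learner connection".
class AcceptorSketch {
    // Accepts `expected` connections and starts one handler thread for each.
    // Each handler reads one long (standing in for the learner's sid) and records it.
    static List<Long> acceptLearners(ServerSocket ss, int expected)
            throws IOException, InterruptedException {
        List<Long> sids = Collections.synchronizedList(new ArrayList<>());
        List<Thread> handlers = new ArrayList<>();
        for (int i = 0; i < expected; i++) {
            Socket s = ss.accept();
            s.setSoTimeout(5000); // bounded read, similar in spirit to tickTime * initLimit
            Thread handler = new Thread(() -> {
                try (Socket sock = s;
                     DataInputStream in = new DataInputStream(sock.getInputStream())) {
                    sids.add(in.readLong());
                } catch (IOException e) {
                    // A real handler would log and shut down; the sketch drops the connection.
                }
            }, "LearnerHandler-" + i);
            handler.start();
            handlers.add(handler);
        }
        for (Thread t : handlers) {
            t.join();
        }
        List<Long> sorted = new ArrayList<>(sids);
        Collections.sort(sorted); // handler threads may finish in any order
        return sorted;
    }

    // Plays the learner side: connect and send a sid.
    static void connectAsLearner(int port, long sid) throws IOException {
        try (Socket s = new Socket(InetAddress.getLoopbackAddress(), port);
             DataOutputStream out = new DataOutputStream(s.getOutputStream())) {
            out.writeLong(sid);
            out.flush();
        }
    }

    // Runs both sides over the loopback interface and returns the sids the handlers saw.
    static List<Long> demo() {
        try (ServerSocket ss = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            ss.setSoTimeout(5000); // never block forever in accept()
            connectAsLearner(ss.getLocalPort(), 2L);
            connectAsLearner(ss.getLocalPort(), 3L);
            return acceptLearners(ss, 2);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Giving each connection its own thread keeps a slow or stalled learner from blocking the acceptor, at the cost of one thread per peer. That cost is acceptable here because an ensemble has only a handful of servers.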

    @Override
    public void run() {
        try {
            leader.addLearnerHandler(this);
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
                LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }
            byte learnerInfoData[] = qp.getData();
            if (learnerInfoData != null) {
                if (learnerInfoData.length == 8) {
                    ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                    this.sid = bbsid.getLong();
                } else {
                    LearnerInfo li = new LearnerInfo();
                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
                    this.sid = li.getServerid();
                    this.version = li.getProtocolVersion();
                }
            } else {
                this.sid = leader.followerCounter.getAndDecrement();
            }

            LOG.info("Follower sid: " + sid + " : info : "
                    + leader.self.quorumPeers.get(sid));

            if (qp.getType() == Leader.OBSERVERINFO) {
                learnerType = LearnerType.OBSERVER;
            }

            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);

            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                leader.waitForEpochAck(this.getSid(), ss);
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
                }
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                leader.waitForEpochAck(this.getSid(), ss);
            }
            peerLastZxid = ss.getLastZxid();

            /* the default to send to the follower */
            int packetToSend = Leader.SNAP;
            long zxidToSend = 0;
            long leaderLastZxid = 0;
            /** the packets that the follower needs to get updates from **/
            long updates = peerLastZxid;

            /* we are sending the diff check if we have proposals in memory to be able to
             * send a diff to the
             */
            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
            ReadLock rl = lock.readLock();
            try {
                // data synchronization
                rl.lock();
                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
                LOG.info("Synchronizing with Follower sid: " + sid
                        +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
                        +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
                        +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));

                LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

                if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    // Follower is already sync with us, send empty diff
                    LOG.info("leader and follower are in sync, zxid=0x{}",
                            Long.toHexString(peerLastZxid));
                    packetToSend = Leader.DIFF;
                    zxidToSend = peerLastZxid;
                } else if (proposals.size() != 0) {
                    LOG.debug("proposal size is {}", proposals.size());
                    if ((maxCommittedLog >= peerLastZxid)
                            && (minCommittedLog <= peerLastZxid)) {
                        LOG.debug("Sending proposals to follower");

                        // as we look through proposals, this variable keeps track of previous
                        // proposal Id.
                        long prevProposalZxid = minCommittedLog;

                        // Keep track of whether we are about to send the first packet.
                        // Before sending the first packet, we have to tell the learner
                        // whether to expect a trunc or a diff
                        boolean firstPacket=true;

                        // If we are here, we can use committedLog to sync with
                        // follower. Then we only need to decide whether to
                        // send trunc or not
                        packetToSend = Leader.DIFF;
                        zxidToSend = maxCommittedLog;

                        for (Proposal propose: proposals) {
                            // skip the proposals the peer already has
                            if (propose.packet.getZxid() <= peerLastZxid) {
                                prevProposalZxid = propose.packet.getZxid();
                                continue;
                            } else {
                                // If we are sending the first packet, figure out whether to trunc
                                // in case the follower has some proposals that the leader doesn't
                                if (firstPacket) {
                                    firstPacket = false;
                                    // Does the peer have some proposals that the leader hasn't seen yet
                                    if (prevProposalZxid < peerLastZxid) {
                                        // send a trunc message before sending the diff
                                        packetToSend = Leader.TRUNC;
                                        zxidToSend = prevProposalZxid;
                                        updates = zxidToSend;
                                    }
                                }
                                queuePacket(propose.packet);
                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                        null, null);
                                queuePacket(qcommit);
                            }
                        }
                    } else if (peerLastZxid > maxCommittedLog) {
                        LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                                Long.toHexString(maxCommittedLog),
                                Long.toHexString(updates));

                        packetToSend = Leader.TRUNC;
                        zxidToSend = maxCommittedLog;
                        updates = zxidToSend;
                    } else {
                        LOG.warn("Unhandled proposal scenario");
                    }
                } else {
                    // just let the state transfer happen
                    LOG.debug("proposals is empty");
                }

                LOG.info("Sending " + Leader.getPacketType(packetToSend));
                leaderLastZxid = leader.startForwarding(this, updates);

            } finally {
                rl.unlock();
            }

            QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    ZxidUtils.makeZxid(newEpoch, 0), null, null);
            if (getVersion() < 0x10000) {
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();
            //Need to set the zxidToSend to the latest zxid
            if (packetToSend == Leader.SNAP) {
                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
            bufferedOutput.flush();

            /* if we are not truncating or sending a diff just send a snapshot */
            if (packetToSend == Leader.SNAP) {
                LOG.info("Sending snapshot last zxid of peer is 0x"
                        + Long.toHexString(peerLastZxid) + " "
                        + " zxid of leader is 0x"
                        + Long.toHexString(leaderLastZxid)
                        + "sent zxid of db as 0x"
                        + Long.toHexString(zxidToSend));
                // Dump data to peer
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
            }
            bufferedOutput.flush();

            // Start sending packets
            new Thread() {
                public void run() {
                    Thread.currentThread().setName(
                            "Sender-" + sock.getRemoteSocketAddress());
                    try {
                        sendPackets();
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption",e);
                    }
                }
            }.start();

            /*
             * Have to wait for the first ACK, wait until
             * the leader is ready, and only then we can
             * start processing messages.
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.ACK){
                LOG.error("Next packet was supposed to be an ACK");
                return;
            }
            LOG.info("Received NEWLEADER-ACK message from " + getSid());
            leader.waitForNewLeaderAck(getSid(), qp.getZxid());

            syncLimitCheck.start();

            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

            /*
             * Wait until leader starts up
             */
            synchronized(leader.zk){
                while(!leader.zk.isRunning() && !this.isInterrupted()){
                    leader.zk.wait(20);
                }
            }
            // Mutation packets will be queued during the serialize,
            // so we need to mark when the peer can actually start
            // using the data
            //
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

            while (true) {
                qp = new QuorumPacket();
                ia.readRecord(qp, "packet");

                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (qp.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                }
                tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;

                ByteBuffer bb;
                long sessionId;
                int cxid;
                int type;

                switch (qp.getType()) {
                case Leader.ACK:
                    if (this.learnerType == LearnerType.OBSERVER) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Received ACK from Observer  " + this.sid);
                        }
                    }
                    syncLimitCheck.updateAck(qp.getZxid());
                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;
                case Leader.PING:
                    // Process the touches
                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
                            .getData());
                    DataInputStream dis = new DataInputStream(bis);
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        leader.zk.touch(sess, to);
                    }
                    break;
                case Leader.REVALIDATE:
                    bis = new ByteArrayInputStream(qp.getData());
                    dis = new DataInputStream(bis);
                    long id = dis.readLong();
                    int to = dis.readInt();
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    DataOutputStream dos = new DataOutputStream(bos);
                    dos.writeLong(id);
                    boolean valid = leader.zk.touch(id, to);
                    if (valid) {
                        try {
                            //set the session owner
                            // as the follower that
                            // owns the session
                            leader.zk.setOwner(id, this);
                        } catch (SessionExpiredException e) {
                            LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
                        }
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                ZooTrace.SESSION_TRACE_MASK,
                                "Session 0x" + Long.toHexString(id)
                                + " is valid: "+ valid);
                    }
                    dos.writeBoolean(valid);
                    qp.setData(bos.toByteArray());
                    queuedPackets.add(qp);
                    break;
                case Leader.REQUEST:
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if(type == OpCode.sync){
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
                    leader.zk.submitRequest(si);
                    break;
                default:
                    LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                    break;
                }
            }
        } catch (IOException e) {
            if (sock != null && !sock.isClosed()) {
                LOG.error("Unexpected exception causing shutdown while sock "
                        + "still open", e);
                //close the socket to make sure the
                //other side can see it being close
                try {
                    sock.close();
                } catch(IOException ie) {
                    // do nothing
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected exception causing shutdown", e);
        } finally {
            LOG.warn("******* GOODBYE "
                    + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
                    + " ********");
            shutdown();
        }
    }

Let's start from this line:

long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

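Everything before this line is validation and identification; the real work starts here.
Back to the line itself. What it obtains is the learner's epoch. Why? Look at qp.getZxid() first: where does qp come from? It is the qp defined earlier in the method, and that qp is the packet the learner sent. How can we show that? Read on.
The code also takes the epoch out of the zxid carried by qp. Both claims are checked below.
Is qp a packet sent by the learner?
Stepping through in the debugger shows the contents of qp:

It has type=11, and a field named type is clearly a packet type marker.
The meaning of that value is defined in org.apache.zookeeper.server.quorum.Leader: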

    /**
     * This message type is sent by a follower to pass the last zxid. This is here
     * for backward compatibility purposes.
     */
    final static int FOLLOWERINFO = 11;

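Taken literally, FOLLOWERINFO means the follower's information. If that is not enough, read the comment above the constant:

Even if the rest is unclear, the highlighted part is easy to follow: the packet is sent by a follower and carries a zxid. So what the leader receives here is a zxid reported by the learner.
Can an epoch be extracted from a zxid?
This is standard ZooKeeper theory. A zxid is 64 bits long: the high 32 bits are the epoch and the low 32 bits are the transaction counter.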

    static public long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }
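To make the bit layout concrete, here is a small self-contained sketch of packing and unpacking a zxid. `ZxidSketch` and its method names are illustrative only; `epochOf` uses the same shift as `getEpochFromZxid` above, and `makeZxid` mirrors how the article's code calls `ZxidUtils.makeZxid(epoch, 0)`.

```java
// Illustrative helper, not the ZooKeeper class: a zxid is (epoch << 32) | counter.
class ZxidSketch {
    // Packs an epoch into the high 32 bits and a transaction counter into the low 32 bits.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    // High 32 bits: the epoch.
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Low 32 bits: the transaction counter within that epoch.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 42);
        System.out.println(Long.toHexString(zxid)); // prints 50000002a
        System.out.println(epochOf(zxid));          // prints 5
        System.out.println(counterOf(zxid));        // prints 42
    }
}
```

Because the epoch sits in the high bits, any zxid from a newer epoch compares greater than every zxid from an older one, regardless of the counter. That ordering is why the first zxid of a new epoch, `makeZxid(epoch, 0)`, can serve as the starting point after an election.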

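Now look back at the diagram at the start of the article; FOLLOWERINFO should make much more sense.
What is the learner's epoch used for?
It is used to settle on a suitable epoch, because even the leader cannot be sure that its own epoch is the newest one. Hence this line: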

long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);

This call proposes the new epoch; the proposal is driven by the leader.
org.apache.zookeeper.server.quorum.Leader#getEpochToPropose

    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
        synchronized(connectingFollowers) {
            if (!waitingForNewEpoch) {
                return epoch;
            }
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch+1;
            }
            if (isParticipant(sid)) {
                connectingFollowers.add(sid);
            }
            // get the quorum verifier, which checks the majority rule
            QuorumVerifier verifier = self.getQuorumVerifier();
            if (connectingFollowers.contains(self.getId()) &&
                    verifier.containsQuorum(connectingFollowers)) {
                waitingForNewEpoch = false;
                self.setAcceptedEpoch(epoch);
                connectingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.getInitLimit()*self.getTickTime();
                while(waitingForNewEpoch && cur < end) {
                    connectingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }

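The method looks long, but it does essentially one thing: it waits until a quorum of servers (including the leader itself) has reported in, then takes the largest accepted epoch among them, plus one, as the new epoch.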
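The rule can be shown without the locking and waiting. The sketch below is a simplified, single-threaded model and not the real method: `EpochProposalSketch` and `offer` are hypothetical names, a plain majority count replaces the QuorumVerifier, every caller is treated as a participant, and where the real code blocks in `wait()` the sketch returns a `NOT_DECIDED` marker.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical single-threaded model of the epoch proposal rule.
class EpochProposalSketch {
    static final long NOT_DECIDED = -1L;

    private final int ensembleSize;
    private final long leaderSid;
    private final Set<Long> connecting = new HashSet<>();
    private long epoch = -1L;
    private boolean waitingForNewEpoch = true;

    EpochProposalSketch(int ensembleSize, long leaderSid) {
        this.ensembleSize = ensembleSize;
        this.leaderSid = leaderSid;
    }

    // One server reports its last accepted epoch.
    // Returns the decided epoch, or NOT_DECIDED where the real code would block.
    long offer(long sid, long lastAcceptedEpoch) {
        if (!waitingForNewEpoch) {
            return epoch; // already fixed; late reports cannot change it
        }
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;
        }
        connecting.add(sid);
        // Simple majority that must include the leader itself (assumption: majority quorums).
        if (connecting.contains(leaderSid) && connecting.size() > ensembleSize / 2) {
            waitingForNewEpoch = false;
            return epoch;
        }
        return NOT_DECIDED;
    }

    public static void main(String[] args) {
        EpochProposalSketch proposal = new EpochProposalSketch(3, 1L);
        System.out.println(proposal.offer(2L, 7L)); // prints -1: no quorum yet
        System.out.println(proposal.offer(1L, 5L)); // prints 8: quorum reached, max(7, 5) + 1
        System.out.println(proposal.offer(3L, 9L)); // prints 8: decided before server 3 reported
    }
}
```

The last call shows the behaviour the earlier code comment wondered about: once a quorum has fixed the epoch, a server that reports later, even with a higher accepted epoch, simply receives the already chosen value in this model.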

Once the epoch has been determined, the handler builds a LEADERINFO packet and sends it to its learner.

QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);

The purpose is to tell each learner: the new epoch has been chosen, update yours, and start from this epoch from now on.

After that, the leader waits for the reply.

    QuorumPacket ackEpochPacket = new QuorumPacket();
    ia.readRecord(ackEpochPacket, "packet");
    if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
        LOG.error(ackEpochPacket.toString()
                + " is not ACKEPOCH");
        return;
    }
    ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
    ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
    leader.waitForEpochAck(this.getSid(), ss);

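This reply is built and sent by the learner and received by the leader. That now matches the basic epoch negotiation shown in the diagram at the start. Frankly, this flow has to be read from both sides at once: the two sides interact constantly, and explaining one side alone is hard to follow. Compared with the leader, the follower has much less to do. At this point the open questions are:
1. When does the follower actively connect to the leader?
2. When does the follower build the FOLLOWERINFO packet and send it to the leader?
3. When does the follower build ACKEPOCH and send it to the leader?
Because this article describes the two sides separately, it can be confusing on a first read; reading it side by side with the source is recommended, and doing so is also good practice for understanding multithreaded code. The article is already long, so OBSERVING is skipped and only the follower is covered; a follower and an observer behave very similarly.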

    case FOLLOWING:
        try {
            LOG.info("FOLLOWING");
            // build the object that plays the follower role
            setFollower(makeFollower(logFactory));
            follower.followLeader();
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
        } finally {
            follower.shutdown();
            setFollower(null);
            setPeerState(ServerState.LOOKING);
        }
        break;

Again, look at the key call: follower.followLeader();
That is org.apache.zookeeper.server.quorum.Follower#followLeader.

    void followLeader() throws InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        // record how long the election took
        self.setElectionTimeTaken(electionTimeTaken);
        LOG.info("FOLLOWING - LEADER ELECTION TOOK - {}", electionTimeTaken);
        self.start_fle = 0;
        self.end_fle = 0;
        fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
        try {
            // look up the leader's server address
            QuorumServer leaderServer = findLeader();
            try {
                // connect to the leader
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                // overall: report our epoch, zxid and related information to the
                // leader and get the new epoch back
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    // the leader's epoch is smaller than ours; this should not
                    // happen, but it is checked once more to be safe
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                // sync with the leader via snapshot and zxid, and set up the
                // request processor chain
                syncWithLeader(newEpochZxid);
                QuorumPacket qp = new QuorumPacket();
                // once synced, the follower stays in this loop
                while (this.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }
            } catch (Exception e) {
                LOG.warn("Exception when following the leader", e);
                try {
                    sock.close();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }

                // clear pending revalidations
                pendingRevalidations.clear();
            }
        } finally {
            zk.unregisterJMX((Learner)this);
        }
    }

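Read against the three questions above, this method answers all of them. Still, let's go through them one by one:
1. When does the follower actively connect to the leader?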

connectToLeader(leaderServer.addr, leaderServer.hostname);

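The follower connects right away: as soon as the Follower object has been created and followLeader() has looked up the leader's address.

2. When does the follower build the FOLLOWERINFO packet and send it to the leader?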

    // overall: report our epoch, zxid and related information to the leader
    // and get the new epoch back
    long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

Here is registerWithLeader: org.apache.zookeeper.server.quorum.Learner#registerWithLeader

    protected long registerWithLeader(int pktType) throws IOException{
        /*
         * Send follower info, including last zxid and sid
         */
        // the latest zxid logged on this server
        long lastLoggedZxid = self.getLastLoggedZxid();
        // build the packet to send
        QuorumPacket qp = new QuorumPacket();
        qp.setType(pktType);
        // Isn't this effectively the starting zxid of the accepted epoch? Yes, it is.
        qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

        /*
         * Add sid to payload
         */
        // learner info: self.getId() returns myid (also known as the sid)
        LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
        ByteArrayOutputStream bsid = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
        // serialize the record into the in-memory buffer
        boa.writeRecord(li, "LearnerInfo");
        // attach the serialized LearnerInfo as the packet payload
        qp.setData(bsid.toByteArray());

        // serialize the packet and send it to the leader
        writePacket(qp, true);
        // block until the leader's reply arrives and deserialize it into qp
        readPacket(qp);
        // the new epoch, taken from the leader's reply
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        if (qp.getType() == Leader.LEADERINFO) {
            // we are connected to a 1.0 server so accept the new epoch and read the next packet
            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            byte epochBytes[] = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            if (newEpoch > self.getAcceptedEpoch()) {
                wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
                self.setAcceptedEpoch(newEpoch);
            } else if (newEpoch == self.getAcceptedEpoch()) {
                // since we have already acked an epoch equal to the leaders, we cannot ack
                // again, but we still need to send our lastZxid to the leader so that we can
                // sync with it if it does assume leadership of the epoch.
                // the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
            } else {
                throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
            }
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
        } else {
            if (newEpoch > self.getAcceptedEpoch()) {
                self.setAcceptedEpoch(newEpoch);
            }
            if (qp.getType() != Leader.NEWLEADER) {
                LOG.error("First packet should have been NEWLEADER");
                throw new IOException("First packet should have been NEWLEADER");
            }
            return qp.getZxid();
        }
    }

This makes it clear: qp is built here, its payload is the serialized LearnerInfo, and it is sent to the leader.
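The payload round trip between registerWithLeader and LearnerHandler#run can be illustrated with a plain ByteBuffer. This is a sketch under an assumption: it treats the serialized LearnerInfo as an 8-byte sid followed by a 4-byte protocol version in big-endian order, whereas the real code goes through BinaryOutputArchive and ByteBufferInputStream.byteBuffer2Record. `LearnerInfoSketch`, `encode`, `decode`, and `LEGACY_VERSION` are hypothetical names, and the legacy version value is a placeholder.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the LearnerInfo payload; the layout is an assumption.
class LearnerInfoSketch {
    // Placeholder for "older than 0x10000"; not taken from the ZooKeeper source.
    static final int LEGACY_VERSION = 1;

    // Follower side: sid (8 bytes) followed by the protocol version (4 bytes).
    static byte[] encode(long sid, int protocolVersion) {
        return ByteBuffer.allocate(12).putLong(sid).putInt(protocolVersion).array();
    }

    // Leader side, mirroring the branch in LearnerHandler#run:
    // an 8-byte payload carries only the sid, anything longer also carries the version.
    static long[] decode(byte[] data) {
        ByteBuffer bb = ByteBuffer.wrap(data);
        long sid = bb.getLong();
        long version = (data.length == 8) ? LEGACY_VERSION : bb.getInt();
        return new long[] { sid, version };
    }

    public static void main(String[] args) {
        long[] decoded = decode(encode(3L, 0x10000));
        System.out.println(decoded[0] + " 0x" + Long.toHexString(decoded[1])); // prints 3 0x10000
    }
}
```

The version field is what the leader later checks with `getVersion() < 0x10000` to decide whether to send LEADERINFO and wait for ACKEPOCH, or to fall back to the older handshake.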

3. When does the follower build ACKEPOCH and send it to the leader?
This also happens in org.apache.zookeeper.server.quorum.Learner#registerWithLeader.
It contains the following code:

    QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
    writePacket(ackNewEpoch, true);

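In context, this code sits inside if (qp.getType() == Leader.LEADERINFO). In other words, the follower wraps up an ACKEPOCH and sends it to the leader only after it has received a LEADERINFO packet from the leader. That ties the whole epoch negotiation together.
One question may come up here: this is an if, so what happens when the packet is not LEADERINFO?
The preceding readPacket call blocks, because the follower can only learn the epoch from data sent by the leader. Once it has a reply from a leader that speaks the current protocol, that reply is LEADERINFO, so ACKEPOCH is practically always built. The else branch covers the older handshake, where the first packet must be NEWLEADER and anything else raises an IOException. Blocking versus non-blocking reads is a socket topic in its own right and is left for the reader to explore. That is a lot of text for such a small part of the flow; the rest will follow in the next article.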
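The follower's decision about what to put into ACKEPOCH can be reduced to a pure function. The sketch below restates the three branches from registerWithLeader under simplifying assumptions: `AckEpochSketch` and `ackPayload` are hypothetical names, the side effect of updating the accepted epoch is left out, and an unchecked exception stands in for the IOException thrown by the real code.

```java
// Hypothetical restatement of the ACKEPOCH payload choice in registerWithLeader.
class AckEpochSketch {
    // Returns the int the follower would write into the ACKEPOCH payload.
    static int ackPayload(long newEpoch, long acceptedEpoch, long currentEpoch) {
        if (newEpoch > acceptedEpoch) {
            // Normal case: acknowledge the new epoch and report our current epoch.
            return (int) currentEpoch;
        } else if (newEpoch == acceptedEpoch) {
            // Already acked this epoch earlier: reply, but do not count as a new ack.
            return -1;
        } else {
            // The leader is behind us; the real code throws IOException here.
            throw new IllegalStateException("Leaders epoch, " + newEpoch
                    + " is less than accepted epoch, " + acceptedEpoch);
        }
    }

    public static void main(String[] args) {
        System.out.println(ackPayload(8, 7, 7)); // prints 7
        System.out.println(ackPayload(8, 8, 7)); // prints -1
    }
}
```

On the leader side this value is what `bbepoch.getInt()` reads when building the StateSummary passed to `waitForEpochAck`, together with the follower's last logged zxid.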
