The lead flow

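Once a Leader has been elected, a Leader instance is created and its lead() method is called to run the lead flow. Here is lead(), together with the shutdown() method it relies on: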

    void lead() throws IOException, InterruptedException {
        self.end_fle = Time.currentElapsedTime();
        long electionTimeTaken = self.end_fle - self.start_fle;
        self.setElectionTimeTaken(electionTimeTaken);
        ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
        LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimeTaken,
                QuorumPeer.FLE_TIME_UNIT);
        self.start_fle = 0;
        self.end_fle = 0;

        zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

        try {
            self.tick.set(0);
            zk.loadData();

            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // Start thread that waits for connection requests from
            // new followers.
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();

            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));

            synchronized(this){
                lastProposed = zk.getZxid();
            }

            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);

            if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                LOG.info("NEWLEADER proposal has Zxid of "
                        + Long.toHexString(newLeaderProposal.packet.getZxid()));
            }

            QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();
            QuorumVerifier curQV = self.getQuorumVerifier();
            if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
                // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
                // specified by the user; the lack of version in a config file is interpreted as version=0).
                // As soon as a config is established we would like to increase its version so that it
                // takes presedence over other initial configs that were not established (such as a config
                // of a server trying to join the ensemble, which may be a partial view of the system, not the full config).
                // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
                // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
                // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier,
                // and there's still no agreement on the new version that we'd like to use. Instead, we use
                // lastSeenQuorumVerifier which is being sent with NEWLEADER message
                // so its a good way to let followers know about the new version. (The original reason for sending
                // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
                // that it finds before starting to propose operations. Here we're reusing the same code path for
                // reaching consensus on the new version number.)

                // It is important that this is done before the leader executes waitForEpochAck,
                // so before LearnerHandlers return from their waitForEpochAck
                // hence before they construct the NEWLEADER message containing
                // the last-seen-quorumverifier of the leader, which we change below
                try {
                    QuorumVerifier newQV = self.configFromString(curQV.toString());
                    newQV.setVersion(zk.getZxid());
                    self.setLastSeenQuorumVerifier(newQV, true);
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }

            newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
            if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
                newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }

            // We have to get at least a majority of servers in sync with
            // us. We do this by waiting for the NEWLEADER packet to get
            // acknowledged
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);
            self.setLeaderAddressAndId(self.getQuorumAddress(), self.getId());

            try {
                waitForNewLeaderAck(self.getId(), zk.getZxid());
            } catch (InterruptedException e) {
                shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                        + newLeaderProposal.ackSetsToString() + " ]");
                HashSet<Long> followerSet = new HashSet<Long>();

                for(LearnerHandler f : getLearners()) {
                    if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
                        followerSet.add(f.getSid());
                    }
                }
                boolean initTicksShouldBeIncreased = true;
                for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
                    if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                        initTicksShouldBeIncreased = false;
                        break;
                    }
                }
                if (initTicksShouldBeIncreased) {
                    LOG.warn("Enough followers present. "+
                            "Perhaps the initTicks need to be increased.");
                }
                return;
            }

            startZkServer();

            /**
             * WARNING: do not use this for anything other than QA testing
             * on a real cluster. Specifically to enable verification that quorum
             * can handle the lower 32bit roll-over issue identified in
             * ZOOKEEPER-1277. Without this option it would take a very long
             * time (on order of a month say) to see the 4 billion writes
             * necessary to cause the roll-over to occur.
             *
             * This field allows you to override the zxid of the server. Typically
             * you'll want to set it to something like 0xfffffff0 and then
             * start the quorum, run some operations and see the re-election.
             */
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }

            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.setZooKeeperServer(zk);
            }

            self.adminServer.setZooKeeperServer(zk);

            // Everything is a go, simply start counting the ticks
            // WARNING: I couldn't find any wait statement on a synchronized
            // block that would be notified by this notifyAll() call, so
            // I commented it out
            //synchronized (this) {
            //    notifyAll();
            //}
            // We ping twice a tick, so we only update the tick every other
            // iteration
            boolean tickSkip = true;
            // If not null then shutdown this leader
            String shutdownMessage = null;

            while (true) {
                synchronized (this) {
                    long start = Time.currentElapsedTime();
                    long cur = start;
                    long end = start + self.tickTime / 2;
                    while (cur < end) {
                        wait(end - cur);
                        cur = Time.currentElapsedTime();
                    }

                    if (!tickSkip) {
                        self.tick.incrementAndGet();
                    }

                    // We use an instance of SyncedLearnerTracker to
                    // track synced learners to make sure we still have a
                    // quorum of current (and potentially next pending) view.
                    SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
                    syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
                    if (self.getLastSeenQuorumVerifier() != null
                            && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
                        syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                    }

                    syncedAckSet.addAck(self.getId());

                    for (LearnerHandler f : getLearners()) {
                        if (f.synced()) {
                            syncedAckSet.addAck(f.getSid());
                        }
                    }

                    // check leader running status
                    if (!this.isRunning()) {
                        // set shutdown flag
                        shutdownMessage = "Unexpected internal error";
                        break;
                    }

                    if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                        // Lost quorum of last committed and/or last proposed
                        // config, set shutdown flag
                        shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                                + syncedAckSet.ackSetsToString() + " ]";
                        break;
                    }
                    tickSkip = !tickSkip;
                }
                for (LearnerHandler f : getLearners()) {
                    f.ping();
                }
            }
            if (shutdownMessage != null) {
                shutdown(shutdownMessage);
                // leader goes in looking state
            }
        } finally {
            zk.unregisterJMX(this);
        }
    }

    void shutdown(String reason) {
        LOG.info("Shutting down");

        if (isShutdown) {
            return;
        }

        LOG.info("Shutdown called",
                new Exception("shutdown Leader! reason: " + reason));

        if (cnxAcceptor != null) {
            cnxAcceptor.halt();
        }

        // NIO should not accept conenctions
        self.setZooKeeperServer(null);
        self.adminServer.setZooKeeperServer(null);
        try {
            ss.close();
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception during close",e);
        }
        self.closeAllConnections();
        // shutdown the previous zk
        if (zk != null) {
            zk.shutdown();
        }
        synchronized (learners) {
            for (Iterator<LearnerHandler> it = learners.iterator(); it.hasNext();) {
                LearnerHandler f = it.next();
                it.remove();
                f.shutdown();
            }
        }
        isShutdown = true;
    }

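Analysis:

  • 1. Compute how long the election took and record it as a performance metric.
  • 2. Register the JMX bean.
  • 3. Restore data and sessions, then take a fresh data snapshot.
  • 4. Create and start the Acceptor thread (LearnerCnxAcceptor) that accepts connections from Learners.
  • 5. Obtain the new leader epoch for the ensemble and derive the zxid from it.
  • 6. Check the version of the current quorum verifier: if it is 0 and equal to the version of the last-seen quorum verifier, a copy whose version is set to the zxid is stored as lastSeenQuorumVerifier.
  • 7. Create the proposal for the NEWLEADER message and add the corresponding quorum verifiers to it.
  • 8. Wait up to initLimit * tickTime for a quorum to ack the leader epoch; once it has, set currentEpoch and record the Leader's quorum address and serverId.
  • 9. Wait up to initLimit * tickTime for a quorum to ack the NEWLEADER message. On timeout, shutdown() is called: it halts the Acceptor thread created earlier, clears the cached LeaderZooKeeperServer reference, closes the ServerSocket and every established connection, shuts down the LeaderZooKeeperServer, removes and shuts down all LearnerHandlers, and sets the shutdown flag. If a quorum of voting Followers was nevertheless connected to the Leader, a warning is logged suggesting that the init ticks (initLimit) may need to be increased. lead() then returns and the server goes back to Leader election.
  • 10. If the NEWLEADER message was acked by a quorum, start the LeaderZooKeeperServer.
  • 11. Loop, sending a PING to every Follower twice per tickTime, and build a tracker (SyncedLearnerTracker) that counts which Followers are still in sync with the Leader. Before the PINGs are sent, the loop checks whether a quorum of Followers is synced; if not, it leaves the lead flow. Note that this check runs only once per tickTime, not twice. On exit, the current LeaderZooKeeperServer is shut down as in step 9 and the JMX bean is unregistered.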
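The "two PINGs per tick, one quorum check per tick" behavior in step 11 comes entirely from the tickSkip flag. The following is a minimal sketch of that control flow only; the class and method names are made up for illustration, the tickTime / 2 wait is omitted, and the quorum check is reduced to a recorded event.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the ping loop in lead(): two pings per tick, one quorum check per tick. */
class TickLoopSketch {
    /** Runs the given number of half-tick iterations and records what happens in each. */
    static List<String> simulate(int halfTicks) {
        List<String> events = new ArrayList<>();
        boolean tickSkip = true; // same initial value as in lead()
        int tick = 0;
        for (int i = 0; i < halfTicks; i++) {
            // lead() waits tickTime / 2 here; the sketch skips the wait.
            if (!tickSkip) {
                tick++;
                // In lead() the quorum check is guarded by the same !tickSkip condition.
                events.add("tick=" + tick + " quorum-check");
            }
            tickSkip = !tickSkip;
            events.add("ping");
        }
        return events;
    }

    public static void main(String[] args) {
        simulate(4).forEach(System.out::println);
    }
}
```

Four half-tick iterations produce four pings but only two quorum checks, and the very first iteration never checks the quorum because tickSkip starts out as true.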

loadData(): restoring data and sessions
    public void loadData() throws IOException, InterruptedException {
        /*
         * When a new leader starts executing Leader#lead, it
         * invokes this method. The database, however, has been
         * initialized before running leader election so that
         * the server could pick its zxid for its initial vote.
         * It does it by invoking QuorumPeer#getLastLoggedZxid.
         * Consequently, we don't need to initialize it once more
         * and avoid the penalty of loading it a second time. Not
         * reloading it is particularly important for applications
         * that host a large database.
         *
         * The following if block checks whether the database has
         * been initialized or not. Note that this method is
         * invoked by at least one other method:
         * ZooKeeperServer#startdata.
         *
         * See ZOOKEEPER-1642 for more detail.
         */
        if(zkDb.isInitialized()){
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else {
            setZxid(zkDb.loadDataBase());
        }

        // Clean up dead sessions
        List<Long> deadSessions = new ArrayList<>();
        for (Long session : zkDb.getSessions()) {
            if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                deadSessions.add(session);
            }
        }

        for (long session : deadSessions) {
            // XXX: Is lastProcessedZxid really the best thing to use?
            killSession(session, zkDb.getDataTreeLastProcessedZxid());
        }

        // Make a clean snapshot
        takeSnapshot();
    }

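Analysis:

  • 1. Check whether the ZooKeeper database (ZKDatabase) has already been loaded. If it has, take the zxid from the data tree's last processed zxid; otherwise load the database first and use the zxid it returns.
  • 2. Clean up dead sessions, that is, sessions known to the database that have no entry in the session timeout table.
  • 3. Take a fresh data snapshot.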

Cleaning up dead sessions: killSession(long sessionId, long zxid)
    protected void killSession(long sessionId, long zxid) {
        zkDb.killSession(sessionId, zxid);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "ZooKeeperServer --- killSession: 0x"
                    + Long.toHexString(sessionId));
        }
        if (sessionTracker != null) {
            sessionTracker.removeSession(sessionId);
        }
    }

- DataTree.killSession(long session, long zxid)

    void killSession(long session, long zxid) {
        // the list is already removed from the ephemerals
        // so we do not have to worry about synchronizing on
        // the list. This is only called from FinalRequestProcessor
        // so there is no need for synchronization. The list is not
        // changed here. Only create and delete change the list which
        // are again called from FinalRequestProcessor in sequence.
        Set<String> list = ephemerals.remove(session);
        if (list != null) {
            for (String path : list) {
                try {
                    deleteNode(path, zxid);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Deleting ephemeral node " + path
                                + " for session 0x"
                                + Long.toHexString(session));
                    }
                } catch (NoNodeException e) {
                    LOG.warn("Ignoring NoNodeException for path " + path
                            + " while removing ephemeral for dead session 0x"
                            + Long.toHexString(session));
                }
            }
        }
    }

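Analysis:

  • 1. For each dead sessionId, remove its entry from the ephemeral-node table (ephemerals).
  • 2. The removal returns the set of node paths owned by that session; each of those data nodes is then deleted from the DataTree.
  • 3. If a session tracker exists, the dead session is removed from it as well.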
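The relationship between a session and its ephemeral nodes can be sketched with two plain collections. This is only an illustrative model, assuming a map from sessionId to owned paths and a flat set of node paths; the class name and the createEphemeral helper are hypothetical, and the real DataTree stores far more per node.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of how killing a session removes the ephemeral nodes it owns. */
class EphemeralCleanupSketch {
    // sessionId -> paths of the ephemeral nodes created by that session
    final Map<Long, Set<String>> ephemerals = new HashMap<>();
    // all node paths currently in the (flattened) tree
    final Set<String> nodes = new HashSet<>();

    /** Hypothetical helper: registers an ephemeral node for a session. */
    void createEphemeral(long sessionId, String path) {
        nodes.add(path);
        ephemerals.computeIfAbsent(sessionId, id -> new HashSet<>()).add(path);
    }

    /** Removes the session's entry first, then deletes every node it owned. */
    int killSession(long sessionId) {
        Set<String> paths = ephemerals.remove(sessionId);
        if (paths == null) {
            return 0; // the session owned no ephemeral nodes
        }
        int deleted = 0;
        for (String path : paths) {
            // A node that is already gone is ignored, like the NoNodeException case.
            if (nodes.remove(path)) {
                deleted++;
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        EphemeralCleanupSketch tree = new EphemeralCleanupSketch();
        tree.createEphemeral(0x10L, "/locks/a");
        tree.createEphemeral(0x10L, "/locks/b");
        tree.createEphemeral(0x20L, "/locks/c");
        System.out.println("deleted " + tree.killSession(0x10L) + ", remaining " + tree.nodes);
    }
}
```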

Taking a new data snapshot: takeSnapshot()
    public void takeSnapshot() {
        takeSnapshot(false);
    }

    public void takeSnapshot(boolean syncSnap){
        long start = Time.currentElapsedTime();
        try {
            txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts(), syncSnap);
        } catch (IOException e) {
            LOG.error("Severe unrecoverable error, exiting", e);
            // This is a severe error that we cannot recover from,
            // so we need to exit
            System.exit(ExitCode.TXNLOG_ERROR_TAKING_SNAPSHOT.getValue());
        }
        long elapsed = Time.currentElapsedTime() - start;
        LOG.info("Snapshot taken in " + elapsed + " ms");
        ServerMetrics.getMetrics().SNAPSHOT_TIME.add(elapsed);
    }

- FileTxnSnapLog.save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, boolean syncSnap)

    public void save(DataTree dataTree,
                     ConcurrentHashMap<Long, Integer> sessionsWithTimeouts,
                     boolean syncSnap)
        throws IOException {
        long lastZxid = dataTree.lastProcessedZxid;
        File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));
        LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),
                snapshotFile);
        try {
            snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile, syncSnap);
        } catch (IOException e) {
            if (snapshotFile.length() == 0) {
                /* This may be caused by a full disk. In such a case, the server
                 * will get stuck in a loop where it tries to write a snapshot
                 * out to disk, and ends up creating an empty file instead.
                 * Doing so will eventually result in valid snapshots being
                 * removed during cleanup. */
                if (snapshotFile.delete()) {
                    LOG.info("Deleted empty snapshot file: " +
                             snapshotFile.getAbsolutePath());
                } else {
                    LOG.warn("Could not delete empty snapshot file: " +
                             snapshotFile.getAbsolutePath());
                }
            } else {
                /* Something else went wrong when writing the snapshot out to
                 * disk. If this snapshot file is invalid, when restarting,
                 * ZooKeeper will skip it, and find the last known good snapshot
                 * instead. */
            }
            throw e;
        }
    }

- FileSnap.serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)

    public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot, boolean fsync)
            throws IOException {
        if (!close) {
            try (CheckedOutputStream snapOS = SnapStream.getOutputStream(snapShot)) {
                OutputArchive oa = BinaryOutputArchive.getArchive(snapOS);
                FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);
                serialize(dt, sessions, oa, header);
                SnapStream.sealStream(snapOS, oa);
                lastSnapshotInfo = new SnapshotInfo(
                        Util.getZxidFromName(snapShot.getName(), SNAPSHOT_FILE_PREFIX),
                        snapShot.lastModified() / 1000);
            }
        }
    }

- FileSnap.serialize(DataTree dt, Map<Long, Integer> sessions, OutputArchive oa, FileHeader header)

    protected void serialize(DataTree dt,Map<Long, Integer> sessions,
            OutputArchive oa, FileHeader header) throws IOException {
        // this is really a programmatic error and not something that can
        // happen at runtime
        if(header==null)
            throw new IllegalStateException(
                    "Snapshot's not open for writing: uninitialized header");
        header.serialize(oa, "fileheader");
        SerializeUtils.serializeSnapshot(dt,oa,sessions);
    }

- SerializeUtils.serializeSnapshot(DataTree dt, OutputArchive oa, Map<Long, Integer> sessions)

    public static void serializeSnapshot(DataTree dt,OutputArchive oa,
            Map<Long, Integer> sessions) throws IOException {
        HashMap<Long, Integer> sessSnap = new HashMap<Long, Integer>(sessions);
        oa.writeInt(sessSnap.size(), "count");
        for (Entry<Long, Integer> entry : sessSnap.entrySet()) {
            oa.writeLong(entry.getKey().longValue(), "id");
            oa.writeInt(entry.getValue().intValue(), "timeout");
        }
        dt.serialize(oa, "tree");
    }

- SnapStream.sealStream(CheckedOutputStream os, OutputArchive oa)

    public static void sealStream(CheckedOutputStream os, OutputArchive oa)
            throws IOException {
        long val = os.getChecksum().getValue();
        oa.writeLong(val, "val");
        oa.writeString("/", "path");
    }

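Analysis:

  • 1. Create an empty snapshot file whose name is derived from the largest transaction id, i.e. the last processed zxid.
  • 2. Create the file header (FileHeader).
  • 3. Write the file header, the session count, the session list and all data nodes to the snapshot file in that order, then write the checksum (compare this with the deserialization described in chapter 3 of this series).
  • 4. Record information about the snapshot that was just taken (lastSnapshotInfo).
  • 5. Record the time the snapshot took as a performance metric.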
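The "payload first, checksum last" layout from step 3 can be reproduced with the JDK alone. The sketch below is not ZooKeeper's serializer: it writes only the session count and the (id, timeout) pairs, leaves out the file header and the data tree, and uses DataOutputStream in place of BinaryOutputArchive. The choice of Adler32 and the "snapshot." + hex-zxid file name are assumptions about what SnapStream and Util.makeSnapshotName do.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;

/** Sketch of the snapshot layout: session table followed by a trailing checksum. */
class SnapshotLayoutSketch {
    /** Assumed naming scheme: fixed prefix plus the zxid in hexadecimal. */
    static String snapshotName(long zxid) {
        return "snapshot." + Long.toHexString(zxid);
    }

    /** Writes count, then (id, timeout) pairs, then the checksum of everything before it. */
    static byte[] writeSessions(Map<Long, Integer> sessions) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        CheckedOutputStream checked = new CheckedOutputStream(bytes, new Adler32());
        try (DataOutputStream out = new DataOutputStream(checked)) {
            out.writeInt(sessions.size());
            for (Map.Entry<Long, Integer> e : sessions.entrySet()) {
                out.writeLong(e.getKey());
                out.writeInt(e.getValue());
            }
            // Capture the checksum before it is itself written to the stream.
            long checksum = checked.getChecksum().getValue();
            out.writeLong(checksum);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<Long, Integer> sessions = new LinkedHashMap<>();
        sessions.put(0x10L, 30000);
        sessions.put(0x20L, 40000);
        byte[] data = writeSessions(sessions);
        System.out.println(snapshotName(0x600000000L) + " holds " + data.length + " bytes");
    }
}
```

A reader of such a file would recompute the checksum over everything before the final eight bytes and compare it with the stored value, which is the counterpart of what sealStream writes.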

Obtaining the leader epoch for the ensemble: getEpochToPropose(self.getId(), self.getAcceptedEpoch())
    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
        synchronized(connectingFollowers) {
            if (!waitingForNewEpoch) {
                return epoch;
            }
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch+1;
            }
            if (isParticipant(sid)) {
                connectingFollowers.add(sid);
            }
            QuorumVerifier verifier = self.getQuorumVerifier();
            if (connectingFollowers.contains(self.getId()) &&
                                            verifier.containsQuorum(connectingFollowers)) {
                waitingForNewEpoch = false;
                self.setAcceptedEpoch(epoch);
                connectingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                if (sid == self.getId()) {
                    timeStartWaitForEpoch = start;
                }
                long cur = start;
                long end = start + self.getInitLimit()*self.getTickTime();
                while(waitingForNewEpoch && cur < end  && !quitWaitForEpoch) {
                    connectingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }

    private void quitLeading() {
        synchronized(connectingFollowers) {
            quitWaitForEpoch = true;
            connectingFollowers.notifyAll();
        }
        ServerMetrics.getMetrics().QUIT_LEADING_DUE_TO_DISLOYAL_VOTER.add(1);
        LOG.info("Quit leading due to voter changed mind.");
    }

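Analysis:

  • 1. Lock the connectingFollowers set. Note that quitLeading(), the method through which the Leader gives up leadership, synchronizes on the same object. Calling wait() on connectingFollowers releases the lock, so if the Leader quits in the meantime, quitLeading() can set quitWaitForEpoch to true and wake the threads blocked in getEpochToPropose(), which ends their while loop.
  • 2. If the lastAcceptedEpoch argument is greater than or equal to the cached epoch, set the cached epoch to lastAcceptedEpoch + 1.
  • 3. If the given serverId is a voting participant, add it to connectingFollowers.
  • 4. Check whether connectingFollowers contains the Leader's own serverId (self.getId()) and whether it forms a quorum. If both hold, set waitingForNewEpoch to false, set acceptedEpoch to the cached epoch, and wake every thread sleeping in connectingFollowers.wait(). Those threads leave their while loop and return from the method.
  • 5. Otherwise the calling thread waits for at most initLimit * tickTime. If no quorum has formed when the wait ends, an InterruptedException is thrown.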
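The method is essentially a timed barrier built on wait() and notifyAll(). Below is a minimal sketch of that pattern under simplifying assumptions: a plain majority over a fixed ensemble size replaces QuorumVerifier, every caller is treated as a participant, timeoutMs stands in for initLimit * tickTime, and quitLeading() is left out. All class, field and helper names are illustrative.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of getEpochToPropose(): a timed barrier built on wait()/notifyAll(). */
class EpochBarrierSketch {
    private final Set<Long> connecting = new HashSet<>();
    private final long leaderId;
    private final int ensembleSize; // assumption: plain majority instead of QuorumVerifier
    private final long timeoutMs;   // stands in for initLimit * tickTime
    private boolean waitingForNewEpoch = true;
    private long epoch = -1;

    EpochBarrierSketch(long leaderId, int ensembleSize, long timeoutMs) {
        this.leaderId = leaderId;
        this.ensembleSize = ensembleSize;
        this.timeoutMs = timeoutMs;
    }

    long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException {
        synchronized (connecting) {
            if (!waitingForNewEpoch) {
                return epoch; // the epoch was already decided
            }
            if (lastAcceptedEpoch >= epoch) {
                epoch = lastAcceptedEpoch + 1;
            }
            connecting.add(sid);
            boolean quorum = connecting.size() > ensembleSize / 2;
            if (connecting.contains(leaderId) && quorum) {
                waitingForNewEpoch = false;
                connecting.notifyAll(); // release every thread parked in wait() below
            } else {
                long cur = System.currentTimeMillis();
                long end = cur + timeoutMs;
                while (waitingForNewEpoch && cur < end) {
                    connecting.wait(end - cur); // gives up the monitor while waiting
                    cur = System.currentTimeMillis();
                }
                if (waitingForNewEpoch) {
                    throw new InterruptedException("Timeout while waiting for epoch from quorum");
                }
            }
            return epoch;
        }
    }

    /** Runs the leader (sid 1) and one follower (sid 2) of a 3-server ensemble concurrently. */
    static long[] demo() {
        EpochBarrierSketch barrier = new EpochBarrierSketch(1L, 3, 5000L);
        long[] results = new long[2];
        Thread leader = new Thread(() -> results[0] = call(barrier, 1L, 4L));
        Thread follower = new Thread(() -> results[1] = call(barrier, 2L, 5L));
        leader.start();
        follower.start();
        try {
            leader.join();
            follower.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    private static long call(EpochBarrierSketch barrier, long sid, long lastAcceptedEpoch) {
        try {
            return barrier.getEpochToPropose(sid, lastAcceptedEpoch);
        } catch (InterruptedException e) {
            return -1; // timed out before a quorum formed
        }
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("leader got epoch " + r[0] + ", follower got epoch " + r[1]);
    }
}
```

Whichever thread arrives first parks in wait(); the second arrival completes the quorum and wakes it, and both return the same epoch, one higher than the largest accepted epoch seen.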

Waiting for acks of the leader epoch: waitForEpochAck(long id, StateSummary ss)
    public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
        synchronized(electingFollowers) {
            if (electionFinished) {
                return;
            }
            if (ss.getCurrentEpoch() != -1) {
                if (ss.isMoreRecentThan(leaderStateSummary)) {
                    throw new IOException("Follower is ahead of the leader, leader summary: "
                                                    + leaderStateSummary.getCurrentEpoch()
                                                    + " (current epoch), "
                                                    + leaderStateSummary.getLastZxid()
                                                    + " (last zxid)");
                }
                if (ss.getLastZxid() != -1 && isParticipant(id)) {
                    electingFollowers.add(id);
                }
            }
            QuorumVerifier verifier = self.getQuorumVerifier();
            if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
                electionFinished = true;
                electingFollowers.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.getInitLimit()*self.getTickTime();
                while(!electionFinished && cur < end) {
                    electingFollowers.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (!electionFinished) {
                    throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
                }
            }
        }
    }

- StateSummary.isMoreRecentThan(StateSummary ss)

    public boolean isMoreRecentThan(StateSummary ss) {
        return (currentEpoch > ss.currentEpoch) || (currentEpoch == ss.currentEpoch && lastZxid > ss.lastZxid);
    }

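Analysis:

  • 1. Lock the electingFollowers set and check the electionFinished flag; if it is already set, return immediately.
  • 2. Check whether the caller's currentEpoch and zxid are ahead of the current Leader's. If they are, the Follower is ahead of the Leader, so an exception is thrown, which ends the lead flow and sends the server back into Leader election.
  • 3. If this ZooKeeper server's zxid is valid (not -1) and the server is a voter, add it to electingFollowers.
  • 4. Check whether electingFollowers contains the Leader's own serverId (self.getId()) and whether it forms a quorum. If both hold, set electionFinished to true and wake every thread sleeping in electingFollowers.wait(). Those threads leave their while loop and return from the method.
  • 5. Otherwise the calling thread waits for at most initLimit * tickTime. If the epoch has not been acked by a quorum when the wait ends, an InterruptedException is thrown.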
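The ordering used in step 2 compares epochs first and falls back to the zxid only when the epochs are equal. A small stand-alone illustration follows; the class name is made up, while the comparison itself mirrors the isMoreRecentThan method quoted above.

```java
/** Stand-alone illustration of the (epoch, zxid) ordering used by waitForEpochAck(). */
class StateSummarySketch {
    final long currentEpoch;
    final long lastZxid;

    StateSummarySketch(long currentEpoch, long lastZxid) {
        this.currentEpoch = currentEpoch;
        this.lastZxid = lastZxid;
    }

    /** Epoch decides first; the zxid only breaks ties within the same epoch. */
    boolean isMoreRecentThan(StateSummarySketch other) {
        return currentEpoch > other.currentEpoch
                || (currentEpoch == other.currentEpoch && lastZxid > other.lastZxid);
    }

    public static void main(String[] args) {
        StateSummarySketch leader = new StateSummarySketch(5, 0x500000002L);
        // Same epoch, larger zxid: this follower is ahead of the leader.
        System.out.println(new StateSummarySketch(5, 0x500000003L).isMoreRecentThan(leader));
        // Older epoch: behind the leader regardless of its zxid.
        System.out.println(new StateSummarySketch(4, 0x4000000ffL).isMoreRecentThan(leader));
    }
}
```

The first follower would trigger the "Follower is ahead of the leader" exception; the second would simply be counted if it is a voter.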

Waiting for acks of the NEWLEADER message: waitForNewLeaderAck(long sid, long zxid)
    public void waitForNewLeaderAck(long sid, long zxid)
            throws InterruptedException {

        synchronized (newLeaderProposal.qvAcksetPairs) {

            if (quorumFormed) {
                return;
            }

            long currentZxid = newLeaderProposal.packet.getZxid();
            if (zxid != currentZxid) {
                LOG.error("NEWLEADER ACK from sid: " + sid
                        + " is from a different epoch - current 0x"
                        + Long.toHexString(currentZxid) + " receieved 0x"
                        + Long.toHexString(zxid));
                return;
            }

            /*
             * Note that addAck already checks that the learner
             * is a PARTICIPANT.
             */
            newLeaderProposal.addAck(sid);

            if (newLeaderProposal.hasAllQuorums()) {
                quorumFormed = true;
                newLeaderProposal.qvAcksetPairs.notifyAll();
            } else {
                long start = Time.currentElapsedTime();
                long cur = start;
                long end = start + self.getInitLimit() * self.getTickTime();
                while (!quorumFormed && cur < end) {
                    newLeaderProposal.qvAcksetPairs.wait(end - cur);
                    cur = Time.currentElapsedTime();
                }
                if (!quorumFormed) {
                    throw new InterruptedException(
                            "Timeout while waiting for NEWLEADER to be acked by quorum");
                }
            }
        }
    }

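Analysis:

  • 1. Lock the newLeaderProposal.qvAcksetPairs list and check the quorumFormed flag; if it is already set, return immediately.
  • 2. Compare the zxid sent by the Follower with the zxid of the Leader's NEWLEADER proposal. If they differ, the ack belongs to a different epoch and the method returns right away; a Follower that genuinely accepted this NEWLEADER request always sends back the Leader's zxid. If they are equal, the ack is counted.
  • 3. Check whether the NEWLEADER proposal has been acked by a quorum. If so, set quorumFormed to true and wake every thread sleeping in newLeaderProposal.qvAcksetPairs.wait(). Those threads leave their while loop and return from the method.
  • 4. Otherwise the calling thread waits for at most initLimit * tickTime. If no quorum has formed when the wait ends, an InterruptedException is thrown.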
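The "different epoch" test in step 2 works because a zxid carries the epoch in its upper 32 bits and a counter in its lower 32 bits, and the NEWLEADER proposal uses counter 0. A short sketch of that encoding, modeled on what ZxidUtils.makeZxid is understood to do (the class and accessor names here are illustrative):

```java
/** Sketch of the zxid layout: epoch in the high 32 bits, counter in the low 32 bits. */
class ZxidSketch {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long newLeaderZxid = makeZxid(6, 0);   // what lead() sets after getEpochToPropose()
        long staleAck = makeZxid(5, 0);        // an ack left over from the previous epoch
        System.out.println("NEWLEADER zxid: 0x" + Long.toHexString(newLeaderZxid));
        System.out.println("stale ack accepted: " + (staleAck == newLeaderZxid));
    }
}
```

An ack from an earlier epoch differs in the high bits, so the equality check in waitForNewLeaderAck() rejects it without looking at anything else.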

Starting the LeaderZooKeeperServer: startZkServer()
    private synchronized void startZkServer() {
        // Update lastCommitted and Db's zxid to a value representing the new epoch
        lastCommitted = zk.getZxid();
        LOG.info("Have quorum of supporters, sids: [ "
                + newLeaderProposal.ackSetsToString()
                + " ]; starting up and setting last processed zxid: 0x{}",
                Long.toHexString(zk.getZxid()));

        /*
         * ZOOKEEPER-1324. the leader sends the new config it must complete
         *  to others inside a NEWLEADER message (see LearnerHandler where
         *  the NEWLEADER message is constructed), and once it has enough
         *  acks we must execute the following code so that it applies the
         *  config to itself.
         */
        QuorumVerifier newQV = self.getLastSeenQuorumVerifier();

        Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());

        self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
        if (designatedLeader != self.getId()) {
            allowedToCommit = false;
        }

        leaderStartTime = Time.currentElapsedTime();
        zk.startup();
        /*
         * Update the election vote here to ensure that all members of the
         * ensemble report the same vote to new servers that start up and
         * send leader election notifications to the ensemble.
         *
         * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
         */
        self.updateElectionVote(getEpoch());

        zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
    }

    private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
        //new configuration
        Proposal.QuorumVerifierAcksetPair newQVAcksetPair = reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size()-1);

        //check if I'm in the new configuration with the same quorum address -
        // if so, I'll remain the leader
        if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) &&
                newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getId()).addr.equals(self.getQuorumAddress())){
            return self.getId();
        }
        // start with an initial set of candidates that are voters from new config that
        // acknowledged the reconfig op (there must be a quorum). Choose one of them as
        // current leader candidate
        HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
        candidates.remove(self.getId()); // if we're here, I shouldn't be the leader
        long curCandidate = candidates.iterator().next();

        //go over outstanding ops in order, and try to find a candidate that acked the most ops.
        //this way it will be the most up-to-date and we'll minimize the number of ops that get dropped

        long curZxid = zxid + 1;
        Proposal p = outstandingProposals.get(curZxid);

        while (p!=null && !candidates.isEmpty()) {
            for (Proposal.QuorumVerifierAcksetPair qvAckset: p.qvAcksetPairs){
                //reduce the set of candidates to those that acknowledged p
                candidates.retainAll(qvAckset.getAckset());
                //no candidate acked p, return the best candidate found so far
                if (candidates.isEmpty()) return curCandidate;
                //update the current candidate, and if it is the only one remaining, return it
                curCandidate = candidates.iterator().next();
                if (candidates.size() == 1) return curCandidate;
            }
            curZxid++;
            p = outstandingProposals.get(curZxid);
        }

        return curCandidate;
    }

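Analysis:

  • 1. Set lastCommitted to the current zxid and fetch lastSeenQuorumVerifier.
  • 2. Using newLeaderProposal (the NEWLEADER proposal), check whether the current Leader is a voting member of the new configuration with the same quorum address. If it is, getDesignatedLeader() returns the current Leader's serverId directly.
  • 3. If not, the new configuration has either removed the current Leader or changed its quorum address. In that case the serverIds that acked the NEWLEADER proposal are copied into a new set, the current Leader's serverId is removed from it, and one member of this set will be chosen as the candidate Leader.
  • 4. Walk through outstandingProposals in order, starting with the first proposal whose zxid is greater than the given zxid, and try to find the candidate that acked the most of them, i.e. keep narrowing the candidate set to the servers that acked every proposal seen so far and pick one of those.
  • 5. Run the reconfig flow (processReconfig).
  • 6. Actually start a ZooKeeper server that can serve clients.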
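The candidate narrowing in steps 3 and 4 is a repeated set intersection. The sketch below isolates that part under simplifying assumptions: each outstanding proposal is represented by a single ack set (the real code iterates over every QuorumVerifierAcksetPair of a proposal), proposals are passed as an ordered list instead of being looked up by zxid, and a TreeSet replaces the HashSet so the chosen candidate is deterministic. The class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Sketch of the candidate narrowing done by getDesignatedLeader(). */
class DesignatedLeaderSketch {
    static long pickCandidate(long selfId, Set<Long> newConfigAcks, List<Set<Long>> outstandingAcks) {
        // Start from the voters that acked the new config, minus the current leader.
        Set<Long> candidates = new TreeSet<>(newConfigAcks);
        candidates.remove(selfId);
        long curCandidate = candidates.iterator().next();

        // Go over outstanding proposals in zxid order.
        for (Set<Long> acks : outstandingAcks) {
            // Keep only the candidates that also acked this proposal.
            candidates.retainAll(acks);
            if (candidates.isEmpty()) {
                return curCandidate; // nobody acked it: keep the best candidate so far
            }
            curCandidate = candidates.iterator().next();
            if (candidates.size() == 1) {
                return curCandidate; // a single most up-to-date server remains
            }
        }
        return curCandidate;
    }

    private static Set<Long> setOf(Long... ids) {
        return new TreeSet<>(Arrays.asList(ids));
    }

    public static void main(String[] args) {
        List<Set<Long>> outstanding = Arrays.asList(setOf(2L, 3L), setOf(3L));
        long chosen = pickCandidate(1L, setOf(1L, 2L, 3L, 4L), outstanding);
        System.out.println("designated leader: " + chosen);
    }
}
```

In the example, server 3 is the only one that acked both outstanding proposals, so it is the most up-to-date candidate and handing leadership to it drops the fewest operations.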
