When a ZooKeeper server starts up, an election algorithm chooses a leader from among the configured servers; the remaining servers become followers. Three implementations exist: FastLeaderElection, AuthFastLeaderElection, and LeaderElection, but AuthFastLeaderElection and LeaderElection are both marked @Deprecated, so FastLeaderElection is the only algorithm actually in use. Every election implementation implements the Election interface:

    public interface Election {
        public Vote lookForLeader() throws InterruptedException;
        public void shutdown();
    }

The interface has two methods: lookForLeader() contains the actual election algorithm, while shutdown() performs cleanup after the election ends, closing the inter-server connections that were established for the election and stopping the message sending and receiving threads created for it.

For the basic election logic, see the blog post "ZooKeeper leader election and atomic broadcast"; I suggest understanding that overview first, then coming back to this source-level walkthrough, going from the macro view down to the micro.

At system initialization, every QuorumPeer object (a QuorumPeer can be thought of as one ZooKeeper server ready to take part in the election, i.e., a server configured in zoo.cfg) maintains a FastLeaderElection object that acts on its behalf in the election. A single machine may, of course, run one or more QuorumPeers.
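As a point of reference, a minimal three-node zoo.cfg might look like the following (the hostnames are placeholders); the second port on each server line is the one used for leader-election traffic:

    # a hypothetical three-node ensemble; host names are illustrative
    tickTime=2000
    initLimit=10
    syncLimit=5
    dataDir=/var/lib/zookeeper
    clientPort=2181
    # server.<sid>=<host>:<quorum port>:<election port>
    server.1=zk1.example.com:2888:3888
    server.2=zk2.example.com:2888:3888
    server.3=zk3.example.com:2888:3888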

Because the election requires exchanging messages, FastLeaderElection maintains two queues:

    LinkedBlockingQueue<ToSend> sendqueue;
    LinkedBlockingQueue<Notification> recvqueue;

recvqueue holds the messages received during the election; they are handed to FastLeaderElection's core method, lookForLeader(), for processing in order to elect the leader. sendqueue holds outgoing messages, which are drained by the WorkerSender introduced below.

Each FastLeaderElection also maintains an inner class, Messenger, which contains two classes implementing Runnable: WorkerReceiver and WorkerSender. As the names suggest, they are responsible for receiving and sending messages respectively: WorkerReceiver receives messages and puts them into recvqueue to await processing, while WorkerSender takes pending messages off sendqueue and hands them to the lower-level connection manager, QuorumCnxManager, for delivery.
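A paraphrased sketch of how Messenger wires this up, based on the 3.4-era source (thread names and field details may differ across versions): its constructor simply starts one daemon thread per worker.

    Messenger(QuorumCnxManager manager) {
        // thread that drains sendqueue into QuorumCnxManager
        this.ws = new WorkerSender(manager);
        Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
        t.setDaemon(true);
        t.start();

        // thread that drains QuorumCnxManager's recvQueue into recvqueue
        this.wr = new WorkerReceiver(manager);
        t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
        t.setDaemon(true);
        t.start();
    }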

Each QuorumPeer also owns a QuorumCnxManager, which is responsible for establishing the connections between QuorumPeers during the election and for maintaining the send and receive message queues.

    /** Mapping from Peer to Thread number */
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

    /** Reception queue */
    public final ArrayBlockingQueue<Message> recvQueue;

As you can see, QuorumCnxManager also has a send-queue map, queueSendMap, and a receive queue, recvQueue, along with two worker thread classes, SendWorker and RecvWorker, responsible for sending and receiving messages respectively (they extend Thread, as the super(...) calls in their constructors show). Their constructors are as follows:

    SendWorker(Socket sock, Long sid) {
        super("SendWorker:" + sid);
        this.sid = sid;
        this.sock = sock;
        recvWorker = null;
        try {
            dout = new DataOutputStream(sock.getOutputStream());
        } catch (IOException e) {
            LOG.error("Unable to access socket output stream", e);
            closeSocket(sock);
            running = false;
        }
        LOG.debug("Address of remote peer: " + this.sid);
    }

    RecvWorker(Socket sock, Long sid, SendWorker sw) {
        super("RecvWorker:" + sid);
        this.sid = sid;
        this.sock = sock;
        this.sw = sw;
        try {
            din = new DataInputStream(sock.getInputStream());
            // OK to wait until socket disconnects while reading.
            sock.setSoTimeout(0);
        } catch (IOException e) {
            LOG.error("Error while accessing socket for " + sid, e);
            closeSocket(sock);
            running = false;
        }
    }

Each SendWorker and RecvWorker carries a sid field: for every peer sid there is a dedicated SendWorker/RecvWorker pair that handles, respectively, sending messages to that peer and processing messages received from it.

queueSendMap is keyed by sid; the value is the queue of all messages waiting to be sent to that sid.
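To see how messages enter these per-sid queues, here is a paraphrased sketch of QuorumCnxManager.toSend() from the 3.4-era source (SEND_CAPACITY and the helper names are as I recall them; treat the details as approximate): a message addressed to ourselves is short-circuited straight into recvQueue, anything else is enqueued for the target sid and a connection is initiated if needed.

    public void toSend(Long sid, ByteBuffer b) {
        // If sending to ourselves, loop the message back into the receive queue
        if (self.getId() == sid) {
            b.position(0);
            addToRecvQueue(new Message(b.duplicate(), sid));
        } else {
            // Otherwise enqueue it for the target sid, creating the queue on first use
            ArrayBlockingQueue<ByteBuffer> bq =
                    new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
            ArrayBlockingQueue<ByteBuffer> bqExisting = queueSendMap.putIfAbsent(sid, bq);
            if (bqExisting != null) {
                addToSendQueue(bqExisting, b);
            } else {
                addToSendQueue(bq, b);
            }
            // make sure a connection to sid exists
            connectOne(sid);
        }
    }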

Now let's look at what the core run() methods of these two workers actually do:

SendWorker.run():

    @Override
    public void run() {
        threadCnt.incrementAndGet();
        try {
            /**
             * If there is nothing in the queue to send, then we
             * send the lastMessage to ensure that the last message
             * was received by the peer. The message could be dropped
             * in case self or the peer shutdown their connection
             * (and exit the thread) prior to reading/processing
             * the last message. Duplicate messages are handled correctly
             * by the peer.
             *
             * If the send queue is non-empty, then we have a recent
             * message than that stored in lastMessage. To avoid sending
             * stale message, we should send the message in the send queue.
             */
            ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
            if (bq == null || isSendQueueEmpty(bq)) {
                ByteBuffer b = lastMessageSent.get(sid);
                if (b != null) {
                    LOG.debug("Attempting to send lastMessage to sid=" + sid);
                    send(b);
                }
            }
        } catch (IOException e) {
            LOG.error("Failed to send last message. Shutting down thread.", e);
            this.finish();
        }

        try {
            while (running && !shutdown && sock != null) {
                ByteBuffer b = null;
                try {
                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                    if (bq != null) {
                        b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                    } else {
                        LOG.error("No queue of incoming messages for " +
                                "server " + sid);
                        break;
                    }
                    if (b != null) {
                        lastMessageSent.put(sid, b);
                        send(b);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted while waiting for message on queue", e);
                }
            }
        } catch (Exception e) {
            LOG.warn("Exception when using channel: for id " + sid +
                    " my id = " + self.getId() + " error = " + e);
        }
        this.finish();
        LOG.warn("Send worker leaving thread");
    }

RecvWorker.run():

    @Override
    public void run() {
        threadCnt.incrementAndGet();
        try {
            while (running && !shutdown && sock != null) {
                /**
                 * Reads the first int to determine the length of the
                 * message
                 */
                int length = din.readInt();
                if (length <= 0 || length > PACKETMAXSIZE) {
                    throw new IOException("Received packet with invalid packet: " + length);
                }
                /**
                 * Allocates a new ByteBuffer to receive the message
                 */
                byte[] msgArray = new byte[length];
                din.readFully(msgArray, 0, length);
                ByteBuffer message = ByteBuffer.wrap(msgArray);
                addToRecvQueue(new Message(message.duplicate(), sid));
            }
        } catch (Exception e) {
            LOG.warn("Connection broken for id " + sid + ", my id = " +
                    self.getId() + ", error = ", e);
        } finally {
            LOG.warn("Interrupting SendWorker");
            sw.finish();
            if (sock != null) {
                closeSocket(sock);
            }
        }
    }

As the code shows, SendWorker keeps reading the list of messages in the global queueSendMap that corresponds to its sid, and sends each message to that peer.

RecvWorker, in turn, reads data from the TCP connection established with its sid and appends it to the tail of recvQueue.
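For completeness, a paraphrased sketch of addToRecvQueue(), again from the 3.4-era source and simplified: when the bounded recvQueue is full, the oldest message is discarded to make room, so a slow consumer always sees the freshest notifications.

    public void addToRecvQueue(Message msg) {
        synchronized (recvQLock) {
            if (recvQueue.remainingCapacity() == 0) {
                try {
                    // drop the oldest message to make room for the new one
                    recvQueue.remove();
                } catch (NoSuchElementException ne) {
                    // queue was drained concurrently; nothing to remove
                    LOG.debug("Trying to remove from an empty recvQueue. Ignoring exception " + ne);
                }
            }
            try {
                recvQueue.add(msg);
            } catch (IllegalStateException ie) {
                // should not happen given the capacity check above
                LOG.error("Unable to insert element in the recvQueue " + ie);
            }
        }
    }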

From the run() methods of QuorumCnxManager.SendWorker and QuorumCnxManager.RecvWorker we can see that both workers operate over connections established by QuorumCnxManager, sending messages to and receiving messages from the corresponding server. The messages to be sent come from FastLeaderElection, and the received messages are handled by FastLeaderElection as well. So the two QuorumCnxManager workers implement no election logic themselves; they are merely proxies for sending and receiving messages. FastLeaderElection never has to care how to communicate with other servers or how to obtain their votes; it just takes messages out of, or puts messages into, the queues that QuorumCnxManager provides.

Next, let's look at the run() methods of FastLeaderElection.Messenger.WorkerSender and FastLeaderElection.Messenger.WorkerReceiver:

WorkerReceiver.run()

    public void run() {
        Message response;
        while (!stop) {
            // Sleeps on receive
            try {
                response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                if (response == null) continue;

                /*
                 * If it is from an observer, respond right away.
                 * Note that the following predicate assumes that
                 * if a server is not a follower, then it must be
                 * an observer. If we ever have any other type of
                 * learner in the future, we'll have to change the
                 * way we check for observers.
                 */
                if (!self.getVotingView().containsKey(response.sid)) {
                    Vote current = self.getCurrentVote();
                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                            current.getId(),
                            current.getZxid(),
                            logicalclock,
                            self.getPeerState(),
                            response.sid,
                            current.getPeerEpoch());
                    sendqueue.offer(notmsg);
                } else {
                    // Receive new message
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Receive new notification message. My id = " + self.getId());
                    }

                    /*
                     * We check for 28 bytes for backward compatibility
                     */
                    if (response.buffer.capacity() < 28) {
                        LOG.error("Got a short response: " + response.buffer.capacity());
                        continue;
                    }
                    boolean backCompatibility = (response.buffer.capacity() == 28);
                    response.buffer.clear();

                    // Instantiate Notification and set its attributes
                    Notification n = new Notification();

                    // State of peer that sent this message
                    QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                    switch (response.buffer.getInt()) {
                    case 0:
                        ackstate = QuorumPeer.ServerState.LOOKING;
                        break;
                    case 1:
                        ackstate = QuorumPeer.ServerState.FOLLOWING;
                        break;
                    case 2:
                        ackstate = QuorumPeer.ServerState.LEADING;
                        break;
                    case 3:
                        ackstate = QuorumPeer.ServerState.OBSERVING;
                        break;
                    default:
                        continue;
                    }

                    n.leader = response.buffer.getLong();
                    n.zxid = response.buffer.getLong();
                    n.electionEpoch = response.buffer.getLong();
                    n.state = ackstate;
                    n.sid = response.sid;
                    if (!backCompatibility) {
                        n.peerEpoch = response.buffer.getLong();
                    } else {
                        if (LOG.isInfoEnabled()) {
                            LOG.info("Backward compatibility mode, server id=" + n.sid);
                        }
                        n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                    }

                    /*
                     * Version added in 3.4.6
                     */
                    n.version = (response.buffer.remaining() >= 4) ?
                            response.buffer.getInt() : 0x0;

                    /*
                     * Print notification info
                     */
                    if (LOG.isInfoEnabled()) {
                        printNotification(n);
                    }

                    /*
                     * If this server is looking, then send proposed leader
                     */
                    if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                        recvqueue.offer(n);

                        /*
                         * Send a notification back if the peer that sent this
                         * message is also looking and its logical clock is
                         * lagging behind.
                         */
                        if ((ackstate == QuorumPeer.ServerState.LOOKING)
                                && (n.electionEpoch < logicalclock)) {
                            Vote v = getVote();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    v.getId(),
                                    v.getZxid(),
                                    logicalclock,
                                    self.getPeerState(),
                                    response.sid,
                                    v.getPeerEpoch());
                            sendqueue.offer(notmsg);
                        }
                    } else {
                        /*
                         * If this server is not looking, but the one that sent the ack
                         * is looking, then send back what it believes to be the leader.
                         */
                        Vote current = self.getCurrentVote();
                        if (ackstate == QuorumPeer.ServerState.LOOKING) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Sending new notification. My id =  " +
                                        self.getId() + " recipient=" +
                                        response.sid + " zxid=0x" +
                                        Long.toHexString(current.getZxid()) +
                                        " leader=" + current.getId());
                            }

                            ToSend notmsg;
                            if (n.version > 0x0) {
                                notmsg = new ToSend(ToSend.mType.notification,
                                        current.getId(),
                                        current.getZxid(),
                                        current.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        current.getPeerEpoch());
                            } else {
                                Vote bcVote = self.getBCVote();
                                notmsg = new ToSend(ToSend.mType.notification,
                                        bcVote.getId(),
                                        bcVote.getZxid(),
                                        bcVote.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        bcVote.getPeerEpoch());
                            }
                            sendqueue.offer(notmsg);
                        }
                    }
                }
            } catch (InterruptedException e) {
                System.out.println("Interrupted Exception while waiting for new message" +
                        e.toString());
            }
        }
        LOG.info("WorkerReceiver is down");
    }

WorkerReceiver's core loop simply keeps reading messages off QuorumCnxManager's receive queue and processing them.

(1) The first check validates the sender's identity: if the message comes from an unknown sender (a sid this server does not track, i.e., one not listed in the configuration file), the server replies with its current vote (the sid of the leader it endorses, its zxid, its state (LOOKING, FOLLOWING, LEADING, OBSERVING), and its logical clock).

(2) If the message does come from a server in the configuration file, a Notification is assembled from the fields the sender transmitted: the proposed leader's sid, the election's logical clock, the sender's sid, the zxid, and so on. Notification is the envelope the participating machines use to exchange information throughout the election. Then: 1) if this machine is itself in the LOOKING state, the Notification is placed on FastLeaderElection.recvqueue for lookForLeader() to process in electing the leader; 2) if this machine is not LOOKING but the sender is, this machine is obliged to send the sender the leader it currently believes in.

WorkerSender.run()

    public void run() {
        while (!stop) {
            try {
                ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                if (m == null) continue;
                process(m);
            } catch (InterruptedException e) {
                break;
            }
        }
        LOG.info("WorkerSender is down");
    }

WorkerSender.run() carries no business logic of its own; it just pulls messages off sendqueue and hands them to process(). The process() source shows that the message actually ends up in queueSendMap, and SendWorker.run() is ultimately responsible for sending it out. A sketch of process() follows.
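This is a paraphrased sketch of process() from the 3.4-era source (buildMsg's exact parameter list may differ slightly between versions): it serializes the ToSend into a ByteBuffer and delegates to QuorumCnxManager.toSend().

    void process(ToSend m) {
        // serialize state, proposed leader, zxid and epochs into the wire format
        ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                m.leader,
                m.zxid,
                m.electionEpoch,
                m.peerEpoch);
        // hand the bytes to the connection layer, addressed by sid
        manager.toSend(m.sid, requestBuffer);
    }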

Having covered message passing between the servers, let's now turn to the core of the election: the lookForLeader() method.

FastLeaderElection.lookForLeader():

    /**
     * Starts a new round of leader election. Whenever our QuorumPeer
     * changes its state to LOOKING, this method is invoked, and it
     * sends notifications to all other peers.
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(
                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }
        if (self.start_fle == 0) {
            self.start_fle = System.currentTimeMillis();
        }
        try {
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized (this) {
                logicalclock++;
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications(); // broadcast our initial proposal: we vote for ourselves

            /*
             * Loop in which we exchange notifications until we find a leader
             */
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval ?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                } else if (self.getVotingView().containsKey(n.sid)) {
                    /*
                     * Only proceed if the vote comes from a replica in the
                     * voting view.
                     */
                    switch (n.state) {
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock) {
                            logicalclock = n.electionEpoch;
                            recvset.clear();
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock) { // the peer's election epoch is stale
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));
                            }
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // record the peer's vote

                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock, proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while ((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING : learningState());

                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid,
                                        logicalclock,
                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if (n.electionEpoch == logicalclock) {
                            recvset.put(n.sid, new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch));

                            if (ooePredicate(recvset, outofelection, n)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING : learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify
                         * a majority is following the same leader.
                         */
                        outofelection.put(n.sid, new Vote(n.version,
                                n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch,
                                n.state));

                        if (ooePredicate(outofelection, outofelection, n)) {
                            synchronized (this) {
                                logicalclock = n.electionEpoch;
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING : learningState());
                            }
                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                                n.state, n.sid);
                        break;
                    }
                } else {
                    LOG.warn("Ignoring notification from non-cluster member " + n.sid);
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
        }
    }

The diagram below shows the relationships among the main methods, fields, and threads of the FastLeaderElection and QuorumCnxManager classes discussed in this article.

Before reading lookForLeader(), I again recommend the article mentioned at the beginning of this post, as well as this one: "The FastLeaderElection algorithm".

1. Before the election begins, the logical clock logicalclock is incremented, marking the start of a new election round for this server; at the same time, updateProposal() initializes the server's pre-election state:

    synchronized void updateProposal(long leader, long zxid, long epoch) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                    + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                    + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
        }
        proposedLeader = leader;
        proposedZxid = zxid;
        proposedEpoch = epoch;
    }

As the code shows, this initialization elects the server itself as leader and treats its own zxid and epoch as the most recent values in the system; before any Notification from other machines has arrived, this self-confident assumption is perfectly reasonable. sendNotifications() then broadcasts the assumption to the other servers. Initially every machine assumes itself to be the leader, but after several rounds of messaging and fierce contention, exactly one machine prevails.
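The initial values come from two small helpers. Here is a paraphrased sketch from the 3.4-era source (an observer excludes itself from contention by proposing Long.MIN_VALUE):

    private long getInitId() {
        // only participants propose themselves; observers never stand for election
        if (self.getLearnerType() == LearnerType.PARTICIPANT)
            return self.getId();
        else
            return Long.MIN_VALUE;
    }

    private long getInitLastLoggedZxid() {
        if (self.getLearnerType() == LearnerType.PARTICIPANT)
            return self.getLastLoggedZxid();
        else
            return Long.MIN_VALUE;
    }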
2. Now the real action starts: the loop repeatedly checks the receive queue for new messages. If none arrive within the timeout, the server suspects that its last message failed to send, in which case it re-sends its current election state (when manager.haveDelivered() is true), or that the connections themselves are broken, in which case it re-establishes them via connectAll(). The poll timeout then grows exponentially, capped at maxNotificationInterval.
3. When a new response is pulled from the receive queue, it is processed. First the sender is confirmed to be a sid from the configuration file rather than a stranger; a strange sid is simply ignored. Then the handling depends on the state of the sender. If the sender's state is LOOKING (by far the most common case during an election, and the most complex to handle):

a) If the notification's election epoch is greater than our logical clock, a newer election round has started, so we update our own logical clock and clear the votes previously collected from other servers, since that data is no longer valid. We then decide, via totalOrderPredicate(), whether to adopt the incoming proposal or revert to voting for ourselves. The decision is based on the proposed leader's id, its last logged zxid, and the leader's epoch (the peer epoch, not the election logical clock). As the totalOrderPredicate() source shows, these values form a strict priority order: first compare the proposed leader's epoch, and the larger wins; if equal, compare the leader's data id (zxid), and the larger wins; finally compare the leader's sid, and the larger wins. Whatever the outcome, we broadcast our latest vote (the three values above) to the other servers.
b) If the incoming election epoch is smaller than our logical clock, the sender is in an older election round, and nothing needs to be done.
c) If the two logical clocks are equal, totalOrderPredicate() alone decides whether to update our proposal; if it changes, we again broadcast the new vote.

From this comparison it follows that when the system first starts up, every server has the same election clock, so the decisive comparison becomes the data id (zxid): the larger the zxid, i.e., the newer the data, the more likely a server is to be elected leader. A worked example follows the source below.

    /**
     * Check if a pair (server id, zxid) succeeds our
     * current vote.
     *
     * @param id    Server identifier
     * @param zxid  Last zxid observed by the issuer of this vote
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                          long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *    as current zxid, but server id is higher.
         */
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                 ((newZxid > curZxid) ||
                  ((newZxid == curZxid) && (newId > curId)))));
    }
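As a self-contained illustration of this ordering (the server ids, zxids, and epochs below are made up, and the class is a standalone demo, not ZooKeeper code): three servers with equal epochs, where servers 1 and 2 share the newest zxid, so the sid breaks the tie in favor of server 2.

    // Standalone demo of the (epoch, zxid, sid) total order used above.
    public class TotalOrderDemo {
        static boolean succeeds(long newId, long newZxid, long newEpoch,
                                long curId, long curZxid, long curEpoch) {
            return (newEpoch > curEpoch)
                    || ((newEpoch == curEpoch)
                        && ((newZxid > curZxid)
                            || ((newZxid == curZxid) && (newId > curId))));
        }

        public static void main(String[] args) {
            // hypothetical ensemble: each row is {sid, zxid, epoch}
            long[][] votes = { {1, 0x100, 1}, {2, 0x100, 1}, {3, 0xFF, 1} };
            long leadId = votes[0][0], leadZxid = votes[0][1], leadEpoch = votes[0][2];
            for (long[] v : votes) {
                if (succeeds(v[0], v[1], v[2], leadId, leadZxid, leadEpoch)) {
                    leadId = v[0]; leadZxid = v[1]; leadEpoch = v[2];
                }
            }
            // prints 2: equal epochs, server 3 loses on zxid, server 2 beats 1 on sid
            System.out.println("elected leader sid = " + leadId);
        }
    }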

4. Next, the method checks whether the information collected so far already suffices to determine the final leader, via termPredicate(). The criterion is simple: have more than half of the machines endorsed the leader this server currently endorses? If so, as a precaution, the server waits up to another finalizeWait (200ms by default) for final confirmation; if a newer leader proposal shows up in that window, the Notification is put back into recvqueue, and the election evidently continues. Otherwise the election ends, and depending on whether the elected leader is itself, the server sets its own state to LEADING, or to OBSERVING or FOLLOWING.

    /**
     * Termination predicate. Given a set of votes, determines if
     * have sufficient to declare the end of the election round.
     *
     * @param votes  Set of votes
     * @param vote   The vote received last
     */
    protected boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
        HashSet<Long> set = new HashSet<Long>();

        /*
         * First make the views consistent. Sometimes peers will have
         * different zxids for a server depending on timing.
         */
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                set.add(entry.getKey());
            }
        }

        return self.getQuorumVerifier().containsQuorum(set);
    }
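For a plain majority quorum, containsQuorum() boils down to a size check. This is a paraphrased sketch of the QuorumMaj verifier from the 3.4-era source (hierarchical quorums use a different verifier, and the surrounding class has more members than shown):

    public class QuorumMaj implements QuorumVerifier {
        int half; // half = n / 2, where n is the number of voting servers

        public QuorumMaj(int n) {
            this.half = n / 2;
        }

        // a strict majority: e.g. 3 of 5, or 2 of 3
        public boolean containsQuorum(HashSet<Long> set) {
            return (set.size() > half);
        }
    }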

5. If the received Notification carries the state OBSERVING, it is plainly irrelevant to the vote and is ignored. If it carries FOLLOWING or LEADING, the handling is as follows:

a) If that leader's election epoch matches this machine's logical clock, the vote is recorded, and the leader's legitimacy is given a final check: have more than half of the servers chosen this leader (the termPredicate() method), and has this machine received a LEADING-state message from the leader itself (the checkLeader() method; a sketch of it appears after item b) below)? If both hold, the election can end.

b) Otherwise, the server's vote is recorded in outofelection, and we confirm that more than half of the servers have chosen this leader; if so, the election ends.
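checkLeader() guards against declaring a leader that is not actually leading. This is a paraphrased sketch from the 3.4-era source: if someone else is the proposed leader, we must have heard from it and it must claim LEADING; if we ourselves are proposed, our logical clock must match the election epoch.

    protected boolean checkLeader(HashMap<Long, Vote> votes, long leader, long electionEpoch) {
        boolean predicate = true;

        /*
         * If everyone else thinks I'm the leader, I must be the leader.
         * Otherwise, I'm not the leader, and unless I have received a
         * message from the proposed leader stating that it is LEADING,
         * the predicate is false.
         */
        if (leader != self.getId()) {
            if (votes.get(leader) == null) predicate = false;
            else if (votes.get(leader).getState() != ServerState.LEADING) predicate = false;
        } else if (logicalclock != electionEpoch) {
            predicate = false;
        }

        return predicate;
    }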

Leader election is the most central algorithm in ZooKeeper, yet as the walkthrough above shows, the algorithm itself is not terribly complicated. How to negotiate a single unique leader in a distributed system, however, rests on very rigorous theoretical foundations, namely the Paxos algorithm; interested readers can look it up.
That concludes the main logic of the FastLeaderElection algorithm. The rest of the ZooKeeper codebase is still under study.
