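Preface

ZooKeeper is a popular distributed coordination framework, widely used in Java distributed middleware. Before reading its source, it helps to understand the basics of distributed consensus protocols: ZooKeeper implements its own CP-oriented consensus protocol, ZAB. This post starts from server startup and focuses on the cluster election that happens once each node is up. The source version is 3.5.8. Frankly, ZooKeeper's source is among the more complicated of the Java middleware projects, and some of the code is not written very cleanly, which makes it unfriendly to readers. This post follows only the core of the main flow and skips secondary logic.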

Cluster election

Start with ZooKeeper's startup class, QuorumPeerMain. ZooKeeper starts from its main() method:

public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        // initialize and start
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        ...
    } catch (ConfigException e) {
        ...
    } catch (DatadirException e) {
        ...
    } catch (AdminServerException e) {
        ...
    } catch (Exception e) {
        ...
    }
    LOG.info("Exiting normally");
    System.exit(0);
}

protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        // read the config file, i.e. zoo.cfg, which is a properties file
        config.parse(args[0]);
    }
    // start a data cleanup task; secondary code, skipped
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(),
            config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();
    if (args.length == 1 && config.isDistributed()) {
        // start the service from the config file
        runFromConfig(config);
    } else {
        // otherwise (no config file, or a non-distributed config) start as a single node
        ZooKeeperServerMain.main(args);
    }
}

Since this is ZooKeeper, the cluster code is what we care about, so go straight to runFromConfig(config):

public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException {
    // skipped...
    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;
        // Create the ServerCnxnFactory, which the server uses to talk to clients.
        // We are covering cluster election, which does not involve it; skipped.
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(), false);
        }
        // security-related, secondary, skipped...
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(), true);
        }
        // create a QuorumPeer, which can be thought of as one cluster node
        quorumPeer = getQuorumPeer();
        // ...set some QuorumPeer properties
        // set the election algorithm; the default is 3
        quorumPeer.setElectionType(config.getElectionAlg());
        // set myid, which identifies the node
        quorumPeer.setMyid(config.getServerId());
        // ...set some QuorumPeer properties
        // create the zk database, an in-memory database holding the node data written by clients
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        // set the QuorumVerifier, which holds the cluster membership information
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier() != null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        // initialize the zk database; this method is simple, not stepping in
        quorumPeer.initConfigInZKDatabase();
        // looks central, but only sets up authentication; skipped
        quorumPeer.initialize();
        // core method, analyzed in detail
        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        LOG.warn("Quorum Peer interrupted", e);
    }
}

Step into quorumPeer.start():

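public synchronized void start() {
    ...
    // load the persisted data from the log and derive the current epoch, the largest transaction id (zxid), and so on
    loadDataBase();
    // start serving client connections; skipped
    startServerCnxnFactory();
    try {
        // start a Jetty server, mainly for viewing server information; not important
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // important: preparation before the election
    startLeaderElection();
    // actually begin the election
    super.start();
}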

A quick look at loadDataBase():

currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);

It mainly loads the persisted data and sets the corresponding fields, for example currentEpoch, the epoch this node is currently in.
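To make the relationship between the epoch and the largest transaction id concrete: as far as I know, a zxid is a 64-bit value whose high 32 bits hold the epoch and whose low 32 bits hold a counter. The sketch below is my own illustration of that layout; the class name ZxidSketch and its method names are hypothetical and are not taken from the ZooKeeper code shown in this post.

```java
/** Hypothetical helper illustrating the assumed zxid layout: high 32 bits epoch, low 32 bits counter. */
final class ZxidSketch {
    /** Extracts the epoch from the high 32 bits. */
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    /** Extracts the per-epoch counter from the low 32 bits. */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Packs an epoch and a counter into one zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 3);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
    }
}
```

Under this layout, comparing two zxids as plain longs already compares the epoch first and the counter second, which is why the election can treat "largest zxid" as "most up-to-date log".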

Next, startLeaderElection():

synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            // Initialize the current vote. The first vote is always for ourselves;
            // it carries the node id, the largest zxid, and the epoch.
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(getQuorumAddress().getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    // electionType defaults to 3, so the if above is not taken; the election algorithm is created here
    this.electionAlg = createElectionAlgorithm(electionType);
}

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    switch (electionAlgorithm) {
    // ...cases 0, 1, 2 skipped
    case 3:
        // Create the connection manager. It manages data at zk's transport layer;
        // votes are first sent and received here.
        QuorumCnxManager qcm = createCnxnManager();
        // ...omitted
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // start the election listener
            listener.start();
            // create the election algorithm
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            // start the threads that belong to the election algorithm
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

First, let's see how the election listener works. The method is long, so secondary code is omitted. It is a thread:

public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    Socket client = null;
    Exception exitException = null;
    while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
        try {
            if (self.shouldUsePortUnification()) {
                ...
            } else {
                // create a ServerSocket on which other nodes connect to exchange votes
                ss = new ServerSocket();
            }
            // ...
            setName(addr.toString());
            ss.bind(addr);
            while (!shutdown) {
                try {
                    // wait for connections from other nodes
                    client = ss.accept();
                    setSockOpts(client);
                    if (quorumSaslAuthEnabled) {
                        receiveConnectionAsync(client);
                    } else {
                        // a connection arrived; handle it
                        receiveConnection(client);
                    }
                    numRetries = 0;
                } catch (SocketTimeoutException e) {
                }
            }
        } catch (IOException e) {
            // ...
        }
    }
    // ...
}

public void receiveConnection(final Socket sock) {
    DataInputStream din = null;
    try {
        // get the input stream from the socket
        din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
        // continue handling the connection
        handleConnection(sock, din);
    } catch (IOException e) {
        // ..
    }
}

handleConnection is another long method. Step in:

private void handleConnection(Socket sock, DataInputStream din) throws IOException {
    Long sid = null, protocolVersion = null;
    InetSocketAddress electionAddr = null;
    try {
        // ...
        // OBSERVER_ID denotes an observer node; secondary, skipped
        if (sid == QuorumPeer.OBSERVER_ID) {
            sid = observerCounter.getAndDecrement();
        }
    } catch (IOException e) {
        LOG.warn("Exception reading or writing challenge: {}", e);
        closeSocket(sock);
        return;
    }
    // ...
    if (sid < self.getId()) {
        /*
         * If the peer's id is smaller than ours, refuse the connection and close the socket,
         * because zookeeper only lets the node with the larger id connect to
         * the node with the smaller id.
         */
        SendWorker sw = senderWorkerMap.get(sid);
        if (sw != null) {
            sw.finish();
        }
        closeSocket(sock);
        // after closing the socket, we initiate the connection to the peer ourselves
        if (electionAddr != null) {
            connectOne(sid, electionAddr);
        } else {
            connectOne(sid);
        }
    } else if (sid == self.getId()) {
        // should not happen... probably only reached when the config file is wrong
    } else {
        // normal connection: create the send and receive threads; the current socket is passed to the constructors
        SendWorker sw = new SendWorker(sock, sid);
        RecvWorker rw = new RecvWorker(sock, din, sid, sw);
        sw.setRecv(rw);
        SendWorker vsw = senderWorkerMap.get(sid);
        if (vsw != null) {
            vsw.finish();
        }
        senderWorkerMap.put(sid, sw);
        queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
        // start the two threads
        sw.start();
        rw.start();
    }
}
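The three branches of handleConnection boil down to a small decision rule on the two server ids. Here is a minimal sketch of just that rule; the class ConnectionRuleSketch, the enum Action and the method onIncoming are names I made up for illustration and do not exist in ZooKeeper.

```java
/** Hypothetical sketch of the "larger id connects to smaller id" rule in handleConnection. */
final class ConnectionRuleSketch {
    enum Action {
        /** Close the incoming socket and open our own connection to the peer. */
        DROP_AND_DIAL_BACK,
        /** A node claims our own id; treated as a misconfiguration. */
        IGNORE_SELF,
        /** Keep the socket and start a send worker and a receive worker on it. */
        ACCEPT
    }

    /** Decides what to do with an incoming election connection from remoteSid. */
    static Action onIncoming(long remoteSid, long mySid) {
        if (remoteSid < mySid) {
            return Action.DROP_AND_DIAL_BACK;
        } else if (remoteSid == mySid) {
            return Action.IGNORE_SELF;
        }
        return Action.ACCEPT;
    }

    public static void main(String[] args) {
        System.out.println(onIncoming(1, 2));
        System.out.println(onIncoming(3, 2));
    }
}
```

A likely reason for the rule is that it leaves exactly one connection per pair of nodes: whichever side dials first, the surviving socket is always the one opened by the node with the larger id.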

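Now look at the two threads SendWorker and RecvWorker, which send and receive votes at the transport layer. ZooKeeper creates one send thread and one receive thread for each peer node. SendWorker takes the votes to be sent from the transport-layer queue and sends them over blocking I/O (BIO); RecvWorker reads votes from the socket and puts them into the transport-layer receive queue. Here are the run methods of the two threads: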

SendWorker

public void run() {
    threadCnt.incrementAndGet();
    try {
        // get the send queue for this peer; the queues are kept in a map, and each element is a byte buffer
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        // ...
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }
    try {
        // loop forever: keep taking votes from the queue and sending them
        while (running && !shutdown && sock != null) {
            ByteBuffer b = null;
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq != null) {
                    // take one vote
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    break;
                }
                if (b != null) {
                    // remember the most recent vote sent
                    lastMessageSent.put(sid, b);
                    // send the vote, i.e. write it to the output stream of the socket
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue", e);
            }
        }
    } catch (Exception e) {
    }
    this.finish();
}

RecvWorker

public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            /**
             * Keep reading from the input stream of the peer's socket.
             * Read the data length first.
             */
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException("Received packet with invalid packet: " + length);
            }
            /**
             * Allocate a byte buffer and read the vote data into it.
             */
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            // add the vote to the transport-layer receive queue
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
    } finally {
        sw.finish();
        closeSocket(sock);
    }
}

public void addToRecvQueue(Message msg) {
    synchronized (recvQLock) {
        if (recvQueue.remainingCapacity() == 0) {
            try {
                recvQueue.remove();
            } catch (NoSuchElementException ne) {
                // element could be removed by poll()
                LOG.debug("Trying to remove from an empty " + "recvQueue. Ignoring exception " + ne);
            }
        }
        try {
            // add to the receive queue
            recvQueue.add(msg);
        } catch (IllegalStateException ie) {
            // This should never happen
            LOG.error("Unable to insert element in the recvQueue " + ie);
        }
    }
}
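RecvWorker reads a 4-byte length and then exactly that many bytes, which is plain length-prefixed framing over a blocking stream. The sketch below shows both directions of that framing with in-memory streams instead of sockets. FramingSketch, writeFrame, readFrame and roundTrip are hypothetical names, and the PACKET_MAX_SIZE value is an assumption standing in for PACKETMAXSIZE.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of length-prefixed framing as used by the receive loop. */
final class FramingSketch {
    // Assumed upper bound for one frame; stands in for PACKETMAXSIZE in the source.
    static final int PACKET_MAX_SIZE = 1024 * 512;

    /** Writes the payload length as a 4-byte int, followed by the payload bytes. */
    static void writeFrame(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
        out.flush();
    }

    /** Reads one frame: the length first, then exactly that many bytes. */
    static byte[] readFrame(DataInputStream in) throws IOException {
        int length = in.readInt();
        if (length <= 0 || length > PACKET_MAX_SIZE) {
            throw new IOException("Received packet with invalid length: " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload, 0, length);
        return payload;
    }

    /** Writes a frame into memory and reads it back, to show the two sides agree. */
    static byte[] roundTrip(byte[] payload) {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            writeFrame(new DataOutputStream(sink), payload);
            return readFrame(new DataInputStream(new ByteArrayInputStream(sink.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

readFully matters here: a plain read may return fewer bytes than requested, while readFully blocks until the whole frame has arrived or the stream ends.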

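Note the two transport-layer queues here: recvQueue and queueSendMap. recvQueue is the transport-layer receive queue and there is only one of it; the application layer takes votes from this queue and runs the election comparison logic. queueSendMap is a map because each peer node has its own send queue; once the application layer has produced a vote, it puts the vote into the matching queue. The application layer in turn has two queues and two threads of its own for receiving and sending, and on top of those sits the actual election thread. This shows that the main means of communication between threads in ZooKeeper is the blocking queue. The code is full of these asynchronous queues, which is one reason the source is so complicated.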
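The shape of these two transport-layer structures can be sketched in a few lines: one shared receive queue, plus a map from peer id to a bounded send queue, with the oldest element dropped when a queue is full (the behavior addToRecvQueue shows above). This is my own simplified model and not the real QuorumCnxManager: the class TransportQueuesSketch, the helper offerDroppingOldest and both capacity values are assumptions, and I assume the send side drops the oldest element the same way.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of the transport-layer queues: one receive queue, one send queue per peer. */
final class TransportQueuesSketch {
    static final int SEND_CAPACITY = 1;   // assumed value for this sketch
    static final int RECV_CAPACITY = 100; // assumed value for this sketch

    final long mySid;
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap =
            new ConcurrentHashMap<>();
    final ArrayBlockingQueue<ByteBuffer> recvQueue = new ArrayBlockingQueue<>(RECV_CAPACITY);

    TransportQueuesSketch(long mySid) {
        this.mySid = mySid;
    }

    /** Routes a message: loop back to our own receive queue, or enqueue for the peer. */
    void toSend(long sid, ByteBuffer b) {
        if (sid == mySid) {
            offerDroppingOldest(recvQueue, b.duplicate());
            return;
        }
        ArrayBlockingQueue<ByteBuffer> queue =
                queueSendMap.computeIfAbsent(sid, k -> new ArrayBlockingQueue<>(SEND_CAPACITY));
        offerDroppingOldest(queue, b);
    }

    /** Inserts the item; when the queue is full, discards the oldest element first. */
    static <T> void offerDroppingOldest(ArrayBlockingQueue<T> queue, T item) {
        while (!queue.offer(item)) {
            queue.poll();
        }
    }
}
```

Dropping the oldest element suits election traffic, since a newer vote from the same node supersedes an older one that was never sent.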

That completes the election listener thread. Next is FastLeaderElection#start, which involves the two application-layer threads:

public void start() {
    this.messenger.start();
}

void start() {
    this.wsThread.start();
    this.wrThread.start();
}

Simple enough: it starts two threads, the application-layer send and receive threads. Here are their run methods:

wsThread

public void run() {
    // loop forever: keep taking votes from the send queue
    while (!stop) {
        try {
            // this sendqueue is the application-layer send queue
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (m == null) continue;
            // process the vote
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

void process(ToSend m) {
    // build the vote message, which wraps the information about the proposed leader
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
            m.leader,
            m.zxid,
            m.electionEpoch,
            m.peerEpoch,
            m.configData);
    // send it
    manager.toSend(m.sid, requestBuffer);
}

public void toSend(Long sid, ByteBuffer b) {
    /*
     * If the message is addressed to ourselves, there is nothing to send:
     * put it straight into our own transport-layer receive queue.
     */
    if (this.mySid == sid) {
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        /*
         * Otherwise it has to be sent: put it into the transport-layer send queue.
         */
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> oldq = queueSendMap.putIfAbsent(sid, bq);
        if (oldq != null) {
            addToSendQueue(oldq, b);
        } else {
            addToSendQueue(bq, b);
        }
        // open the socket connection
        connectOne(sid);
    }
}

Now the wrThread logic:

public void run() {
    Message response;
    while (!stop) {
        try {
            // take a vote from the transport-layer receive queue; if there is none, loop and try again
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if (response == null) continue;
            // ...
            // create a Notification, which represents one vote
            Notification n = new Notification();
            // parse the vote fields
            int rstate = response.buffer.getInt();           // state of the sending node
            long rleader = response.buffer.getLong();        // id of the leader being voted for
            long rzxid = response.buffer.getLong();          // largest transaction id (zxid)
            long relectionEpoch = response.buffer.getLong(); // election epoch
            long rpeerepoch;                                 // sender's peer epoch; for old-format messages it is derived from rzxid
            int version = 0x0;
            QuorumVerifier rqv = null;
            // a lot of code is omitted here (including an inner try block); the method is too long
            if (!validVoter(response.sid)) {
                Vote current = self.getCurrentVote();
                QuorumVerifier qv = self.getQuorumVerifier();
                ToSend notmsg = new ToSend(ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        logicalclock.get(),
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch(),
                        qv.toString().getBytes());
                sendqueue.offer(notmsg);
            } else {
                // Receive new message
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Receive new notification message. My id = " + self.getId());
                }
                // State of peer that sent this message
                QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                // determine the sender's state; in our scenario it is LOOKING, meaning an election is in progress
                switch (rstate) {
                case 0:
                    ackstate = QuorumPeer.ServerState.LOOKING;
                    break;
                case 1:
                    ackstate = QuorumPeer.ServerState.FOLLOWING;
                    break;
                case 2:
                    ackstate = QuorumPeer.ServerState.LEADING;
                    break;
                case 3:
                    ackstate = QuorumPeer.ServerState.OBSERVING;
                    break;
                default:
                    continue;
                }
                n.leader = rleader;
                n.zxid = rzxid;
                n.electionEpoch = relectionEpoch;
                n.state = ackstate;
                n.sid = response.sid;
                n.peerEpoch = rpeerepoch;
                n.version = version;
                n.qv = rqv;
                if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                    // put the sender's vote into the application-layer receive queue
                    recvqueue.offer(n);
                    /*
                     * If the sender's election epoch is smaller than ours, the sender is a newly
                     * joined node or one recovering from a crash, so send it our own vote.
                     */
                    if ((ackstate == QuorumPeer.ServerState.LOOKING)
                            && (n.electionEpoch < logicalclock.get())) {
                        Vote v = getVote();
                        QuorumVerifier qv = self.getQuorumVerifier();
                        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                v.getId(),
                                v.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                v.getPeerEpoch(),
                                qv.toString().getBytes());
                        // as before: build the vote and put it into the application-layer send queue
                        sendqueue.offer(notmsg);
                    }
                } else {
                    /*
                     * A leader has already been elected but the sender is still LOOKING,
                     * so send it the vote that says who the leader is.
                     */
                    Vote current = self.getCurrentVote();
                    if (ackstate == QuorumPeer.ServerState.LOOKING) {
                        // ...
                        QuorumVerifier qv = self.getQuorumVerifier();
                        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                current.getId(),
                                current.getZxid(),
                                current.getElectionEpoch(),
                                self.getPeerState(),
                                response.sid,
                                current.getPeerEpoch(),
                                qv.toString().getBytes());
                        // as before: build the vote and put it into the application-layer send queue
                        sendqueue.offer(notmsg);
                    }
                }
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted Exception while waiting for new message" + e.toString());
        }
    }
    LOG.info("WorkerReceiver is down");
}
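The receive thread reads the vote fields from the buffer in a fixed order: an int state, then long values for the leader id, the zxid and the election epoch, followed by the peer epoch. Below is a minimal encode/decode sketch of just those five fields. The real message carries more (a version and config data, which are elided above), so treat NotificationCodecSketch and its methods as a hypothetical, simplified illustration.

```java
import java.nio.ByteBuffer;

/** Hypothetical, simplified codec for the first five fields of a vote message. */
final class NotificationCodecSketch {
    /** Plain holder for the decoded fields. */
    static final class Fields {
        int state;
        long leader;
        long zxid;
        long electionEpoch;
        long peerEpoch;
    }

    /** Writes the fields in the same order the receive thread reads them. */
    static ByteBuffer encode(int state, long leader, long zxid, long electionEpoch, long peerEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 * 4);
        buf.putInt(state);
        buf.putLong(leader);
        buf.putLong(zxid);
        buf.putLong(electionEpoch);
        buf.putLong(peerEpoch);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Reads the fields back; works on a duplicate so the caller's position is untouched. */
    static Fields decode(ByteBuffer source) {
        ByteBuffer buf = source.duplicate();
        Fields f = new Fields();
        f.state = buf.getInt();
        f.leader = buf.getLong();
        f.zxid = buf.getLong();
        f.electionEpoch = buf.getLong();
        f.peerEpoch = buf.getLong();
        return f;
    }
}
```

Because the format is positional with no field tags, sender and receiver must agree on the order exactly, which is presumably why the real code also carries a version field.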

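Only now is startLeaderElection() completely finished. Once the relationships among these threads and queues are clear, it should not be hard to follow.

Finally, the core election logic: super.start(). QuorumPeer is itself a thread, so this call ends up running QuorumPeer's run method: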

public void run() {
    // ...omitted
    try {
        /*
         * Main loop. zk runs inside this loop, repeatedly checking the current
         * node state and entering the matching logic.
         */
        while (running) {
            switch (getPeerState()) {
            // we are in a cluster election now, so this case is taken
            case LOOKING:
                LOG.info("LOOKING");
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    // ...
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // The core election logic is in makeLEStrategy().lookForLeader(),
                        // which returns the vote for the elected leader; that vote becomes the current vote.
                        // makeLEStrategy() returns the election strategy, i.e. the FastLeaderElection created earlier.
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                // observer logic
                break;
            case FOLLOWING:
                // follower logic
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    updateServerState();
                }
                break;
            case LEADING:
                // leader logic
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    updateServerState();
                }
                break;
            }
            start_fle = Time.currentElapsedTime();
        }
    } finally {
        // ...
    }
}

Go straight to lookForLeader(), the election method. It is enormous, so we look mainly at the core code:

public Vote lookForLeader() throws InterruptedException {
    // ...
    try {
        // the ballot box, a map whose key is the node id and whose value is that node's vote
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;
        synchronized (this) {
            // advance the logical clock, i.e. increment the election epoch
            logicalclock.incrementAndGet();
            // update the vote we are about to cast
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        // send the vote; not stepping in, it just puts the vote into the application-layer send queue
        sendNotifications();
        /*
         * Loop until a leader has been elected
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Take one vote from the application-layer receive queue
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            /*
             * At first there are no votes in the receive queue
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    // If the previous votes have been delivered, send them once more.
                    // My understanding: other nodes may start later, so resending makes sure they get the vote.
                    sendNotifications();
                } else {
                    // otherwise open connections to the other nodes
                    manager.connectAll();
                }
                // ...
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                // handle the received vote
                switch (n.state) {
                case LOOKING:
                    // core election logic
                    if (n.electionEpoch > logicalclock.get()) {
                        // the sender's election epoch is larger than ours, so we are behind;
                        // update our logical clock
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        // compare against our own initial vote; this method is analyzed below
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // the sender's vote wins, so switch our vote to it
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // otherwise set the vote back to ourselves
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        // our election epoch was behind, so the vote has to be sent out
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        // the sender's election epoch is smaller than ours: ignore the vote
                        break;
                    // reaching this point means both are in the same election epoch: compare votes normally
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        // The sender's vote wins, so adopt it and send it out. If ours wins,
                        // nothing needs to be done: our earlier vote still stands.
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }
                    // put the sender's vote into the ballot box
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                    // the actual counting: tally the ballot box and decide whether the node
                    // we propose can win; analyzed below
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {
                        // our proposed node has won; keep taking votes from the receive queue,
                        // in case more arrived in the meantime
                        while ((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }
                        /*
                         * If no further vote arrived, this election can end.
                         * If a better one did, go around the loop again.
                         */
                        if (n == null) {
                            // end the election and become follower or leader according to the result
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                            // build the final vote
                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, logicalclock.get(), proposedEpoch);
                            // clear the receive queue
                            leaveInstance(endVote);
                            // return the vote
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * The sender is already a leader or follower, so the election is over.
                     * We joined late and will join the cluster as a follower.
                     */
                    // ...
                    break;
                default:
                    break;
                }
            } else {
                // ...
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}
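To see the overall shape of the loop without the threads and queues, here is a toy, single-threaded simulation. It is my own heavily simplified model and not how ZooKeeper runs: every node starts on the same election epoch with peer epoch 0, every vote reaches every node in lockstep rounds, and messages are never lost or delayed. ElectionRoundsSketch, Vote, beats and elect are all hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy lockstep simulation of the voting rounds; not the real asynchronous algorithm. */
final class ElectionRoundsSketch {
    /** A vote: the proposed leader, that leader's last zxid, and its peer epoch. */
    static final class Vote {
        final long leader;
        final long zxid;
        final long peerEpoch;

        Vote(long leader, long zxid, long peerEpoch) {
            this.leader = leader;
            this.zxid = zxid;
            this.peerEpoch = peerEpoch;
        }
    }

    /** True when vote a should replace vote b: epoch first, then zxid, then server id. */
    static boolean beats(Vote a, Vote b) {
        return a.peerEpoch > b.peerEpoch
                || (a.peerEpoch == b.peerEpoch
                    && (a.zxid > b.zxid || (a.zxid == b.zxid && a.leader > b.leader)));
    }

    /** Returns the id that ends up with more than half of the votes. */
    static long elect(Map<Long, Long> lastZxidBySid) {
        if (lastZxidBySid.isEmpty()) {
            throw new IllegalArgumentException("need at least one server");
        }
        // Round one: every server votes for itself.
        Map<Long, Vote> proposals = new TreeMap<>();
        for (Map.Entry<Long, Long> e : lastZxidBySid.entrySet()) {
            proposals.put(e.getKey(), new Vote(e.getKey(), e.getValue(), 0L));
        }
        int servers = proposals.size();
        while (true) {
            // Count the ballot box: has any candidate more than half of the votes?
            Map<Long, Integer> tally = new HashMap<>();
            for (Vote v : proposals.values()) {
                tally.merge(v.leader, 1, Integer::sum);
            }
            for (Map.Entry<Long, Integer> t : tally.entrySet()) {
                if (t.getValue() > servers / 2) {
                    return t.getKey();
                }
            }
            // Exchange: every server sees every vote and adopts the best one.
            Vote best = null;
            for (Vote v : proposals.values()) {
                if (best == null || beats(v, best)) {
                    best = v;
                }
            }
            for (Long sid : lastZxidBySid.keySet()) {
                proposals.put(sid, best);
            }
        }
    }
}
```

With three servers the first tally finds one vote per candidate and no majority; after one exchange everybody has switched to the best vote and the second tally succeeds. The real code reaches the same kind of agreement incrementally, one Notification at a time.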

Next are the two methods that compare votes and pick the leader. First the vote comparison logic, FastLeaderElection#totalOrderPredicate:

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
        long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }
    /*
     * The comparison logic itself is quite simple:
     * 1- compare the epoch first (the callers pass peerEpoch); the larger one wins
     * 2- then compare the largest zxid; the larger one wins
     * 3- then compare the node id; the larger one wins
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
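The three-step rule is a lexicographic order on (epoch, zxid, id), so it can also be written as a chained Comparator. The sketch below is an illustration only: it ignores the getWeight check at the top of the method, and VoteOrderSketch, Candidate, wins and best are hypothetical names.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical restatement of the vote ordering as a Comparator chain. */
final class VoteOrderSketch {
    /** The three values a vote is compared on. */
    static final class Candidate {
        final long id;
        final long zxid;
        final long epoch;

        Candidate(long id, long zxid, long epoch) {
            this.id = id;
            this.zxid = zxid;
            this.epoch = epoch;
        }
    }

    /** Epoch first, then zxid, then server id; larger is better at every step. */
    static final Comparator<Candidate> ORDER =
            Comparator.comparingLong((Candidate c) -> c.epoch)
                      .thenComparingLong(c -> c.zxid)
                      .thenComparingLong(c -> c.id);

    /** Same answer as the boolean expression in totalOrderPredicate, minus the weight check. */
    static boolean wins(Candidate incoming, Candidate current) {
        return ORDER.compare(incoming, current) > 0;
    }

    /** The vote every node would converge on if it saw all of the given candidates. */
    static Candidate best(List<Candidate> candidates) {
        return Collections.max(candidates, ORDER);
    }
}
```

For example, a candidate with id 1, zxid 10 and epoch 2 beats one with id 3, zxid 99 and epoch 1, because the epoch is compared before anything else. Between two candidates with equal epoch and zxid, the larger id wins.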

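Then the logic that picks the leader, FastLeaderElection#termPredicate. The first argument is the ballot box, which holds the vote cast by each node; the second is the leader this node proposes: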

protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
    // build a tracker holding the current quorum verifier (and the last-seen one if it is newer)
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }
    /*
     * Add every vote that was cast for `vote` to voteSet as an ack
     */
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }
    // decide whether `vote` has won the election
    return voteSet.hasAllQuorums();
}

// SyncedLearnerTracker
public boolean hasAllQuorums() {
    for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
        // this check is the important part
        if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
            return false;
    }
    // more than half of the votes were won, so the given node is elected leader
    return true;
}

// QuorumMaj
public boolean containsQuorum(Set<Long> ackSet) {
    // is the number of votes in ackSet greater than half?
    return (ackSet.size() > half);
}
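Stripped of the tracker objects, the counting comes down to "collect the ids that voted for my candidate and check that there are more than half of them". Here is a minimal sketch under a simplification: the ballot box maps each node id to just the leader id it voted for, whereas the real code compares whole Vote objects with equals. MajoritySketch and its methods are hypothetical, and I assume half is the number of voting members divided by two using integer division.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of the majority check behind termPredicate. */
final class MajoritySketch {
    /** Strictly more than half of the voting members must have acked. */
    static boolean containsQuorum(Set<Long> ackSet, int votingMembers) {
        int half = votingMembers / 2; // assumed definition of "half"
        return ackSet.size() > half;
    }

    /** Collects the voters that chose proposedLeader and checks for a majority. */
    static boolean hasMajority(Map<Long, Long> leaderBySid, long proposedLeader, int votingMembers) {
        Set<Long> acks = new HashSet<>();
        for (Map.Entry<Long, Long> entry : leaderBySid.entrySet()) {
            if (entry.getValue() == proposedLeader) {
                acks.add(entry.getKey());
            }
        }
        return containsQuorum(acks, votingMembers);
    }
}
```

With integer division, 3 members need 2 acks and 4 members need 3, so an even-sized cluster tolerates no more failures than the odd-sized one just below it.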

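When the election ends, control leaves the switch in QuorumPeer and goes around the loop again. By then the node's state has changed to leader or follower, so it enters the matching case. ZooKeeper stays in this QuorumPeer loop for its whole life, entering a different flow depending on the node state.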

case FOLLOWING:
    try {
        LOG.info("FOLLOWING");
        setFollower(makeFollower(logFactory));
        // follower logic: open a socket to the leader and receive the data the leader syncs over
        follower.followLeader();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        follower.shutdown();
        setFollower(null);
        updateServerState();
    }
    break;
case LEADING:
    LOG.info("LEADING");
    try {
        setLeader(makeLeader(logFactory));
        // leader logic: sync data to the followers and maintain follower heartbeats
        leader.lead();
        setLeader(null);
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
    } finally {
        if (leader != null) {
            leader.shutdown("Forcing shutdown");
            setLeader(null);
        }
        updateServerState();
    }
    break;

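I won't cover these two flows here. They are long methods too; step in if you are interested.

Summary

As you can see, ZooKeeper's election uses BIO underneath. The reason is simply that BIO is easy to code, and since a ZooKeeper cluster has only a limited number of nodes, the election never has to maintain many connections, so BIO is not a performance problem. ZooKeeper splits the work into a transport layer and an application (or logic) layer, which separates the core election thread from the low-level threads that send and receive votes, and it uses asynchronous queues for communication between threads, aiming for as much performance as possible on top of strong consistency. An election needs at least two rounds of voting to pick a leader. I wanted to draw an architecture diagram, but my drawing skills are poor, so maybe another time. Overall the election source is very complicated and many methods are very long; this post only covers the core flow and leaves the remaining details for you to dig into. And the source for ZAB atomic broadcast is more complicated still!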
