摘要

本博文主要介绍Zookeeper的选举机制的原理与Zookeeper事务请求处理的原理。

一、zookeeper选举算法原理

Leader 服务器的作用是管理 ZooKeeper 集群中的其他服务器。因此,如果是单独一台服务器,不构成集群规模。在 ZooKeeper 服务的运行中不会选举 Leader 服务器,也不会作为 Leader 服务器运行。一个 ZooKeeper 服务要想满足集群方式运行,至少需要三台服务器。本课时我们就以三台服务器组成的 ZooKeeper 集群为例,介绍一下 Leader 服务器选举的内部过程和底层实现。Leader 服务器的选举操作主要发生在两种情况下。

  • 第一种就是 ZooKeeper 集群服务启动的时候,
  • 第二种就是在 ZooKeeper 集群中旧的 Leader 服务器失效时,这时 ZooKeeper 集群需要选举出新的 Leader 服务器。

1.1 集群服务启动Leader选举机制

在 ZooKeeper 集群启动时,需要在集群中的服务器之间确定一台 Leader 服务器。当 ZooKeeper 集群中的三台服务器启动之后,首先会进行通信检查,如果集群中的服务器之间能够进行通信。集群中的三台机器开始尝试寻找集群中的 Leader 服务器并进行数据同步等操作。如何这时没有搜索到 Leader 服务器,说明集群中不存在 Leader 服务器。这时 ZooKeeper 集群开始发起 Leader 服务器选举。在整个 ZooKeeper 集群中 Leader 选举主要可以分为三大步骤分别是:发起投票、接收投票、统计投票。

1.1.1 发起投票

在 ZooKeeper 服务器集群初始化启动的时候,集群中的每一台服务器都会将自己作为 Leader 服务器进行投票。也就是每次投票时,发送的服务器的 myid(服务器标识符)和 ZXID (集群投票信息标识符)等选票信息字段都指向本机服务器。 而一个投票信息就是通过这两个字段组成的。以集群中三个服务器 Serverhost1、Serverhost2、Serverhost3 为例,三个服务器的投票内容分别是:Severhost1 的投票是(1,0)、Serverhost2 服务器的投票是(2,0)、Serverhost3 服务器的投票是(3,0)。

1.1.2 接收投票

集群中各个服务器在发起投票的同时,也通过网络接收来自集群中其他服务器的投票信息。

在接收到网络中的投票信息后,服务器内部首先会判断该条投票信息的有效性。检查该条投票信息的时效性,是否是本轮最新投票,并检查该条投票信息是否是处于LOOKING状态的服务器发的。

1.1.3 统计投票

在接收到投票后,ZooKeeper 集群就该处理和统计投票结果了。对于每条接收到的投票信息,集群中的每一台服务器都会将自己的投票信息与其接收到的 ZooKeeper 集群中的其他投票信息进行对比。主要进行对比的内容是 ZXID,ZXID 数值比较大的投票信息优先作为 Leader 服务器。如果每个投票信息中的 ZXID 相同,就会接着比对投票信息中的 myid 信息字段,选举出 myid 较大的服务器作为 Leader 服务器。

拿上面列举的三个服务器组成的集群例子来说,对于 Serverhost1,服务器的投票信息是(1,0),该服务器接收到的 Serverhost2 服务器的投票信息是(2,0)。在 ZooKeeper 集群服务运行的过程中,首先会对比 ZXID,发现结果相同之后,对比 myid,发现 Serverhost2 服务器的 myid 比较大,于是更新自己的投票信息为(2,0),并重新向 ZooKeeper 集群中的服务器发送新的投票信息。而 Serverhost2 服务器则保留自身的投票信息,并重新向 ZooKeeper 集群服务器中发送投票信息。

而当每轮投票过后,ZooKeeper 服务都会统计集群中服务器的投票结果,判断是否有过半数的机器投出一样的信息。如果存在过半数投票信息指向的服务器,那么该台服务器就被选举为 Leader 服务器。比如上面我们举的例子,ZooKeeper 集群会选举Severhost2服务器作为 Leader 服务器。

当 ZooKeeper 集群选举出 Leader 服务器后,ZooKeeper 集群中的服务器就开始更新自己的角色信息,除被选举成 Leader 的服务器之外,其他集群中的服务器角色变更为 Following。

1.2 服务运行时的 Leader 选举

在 ZooKeeper 集群服务的运行过程中,Leader 服务器作为处理事物性请求以及管理其他角色服务器,在 ZooKeeper 集群中起到关键的作用。当 ZooKeeper 集群中的Leader 服务器发生崩溃时,集群会暂停处理事务性的会话请求,直到 ZooKeeper 集群中选举出新的 Leader 服务器。而整个 ZooKeeper 集群在重新选举 Leader 时也经过了四个过程,分别是变更服务器状态、发起投票、接收投票、统计投票。其中,与初始化启动时 Leader 服务器的选举过程相比,变更状态和发起投票这两个阶段的实现是不同的。下面我们来分别看看这两个阶段。

1.2.1 变更状态

与上面介绍的 ZooKeeper 集群服务器初始化阶段不同。在 ZooKeeper 集群服务运行的过程中,集群中每台服务器的角色已经确定了,当 Leader 服务器崩溃后 ,ZooKeeper 集群中的其他服务器会首先将自身的状态信息变为 LOOKING 状态,该状态表示服务器已经做好选举新 Leader 服务器的准备了,这之后整个 ZooKeeper 集群开始进入选举新的 Leader 服务器过程。

1.2.2 发起投票

ZooKeeper 集群重新选举 Leader 服务器的过程中发起投票的过程与初始化启动时发起投票的过程基本相同。首先每个集群中的服务器都会投票给自己,将投票信息中的 Zxid 和 myid 分别指向本机服务器。

1.2.3 Zookeeper选举算法源码分析

ZooKeeper 中实现的选举算法有三种,而在目前的 ZooKeeper 3.6 版本后,只支持 “快速选举” 这一种算法。而在代码层面的实现中,QuorumCnxManager 作为核心的实现类,用来管理 Leader 服务器与 Follow 服务器的 TCP 通信,以及消息的接收与发送等功能。在 QuorumCnxManager 中,主要定义了 ConcurrentHashMap<Long, SendWorker> 类型的 senderWorkerMap 数据字段,用来管理每一个通信的服务器。

public class QuorumCnxManager {final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;}

在QuorumCnxManager类的内部,定义了RecvWorker内部类。该类继承了一个ZooKeeperThread 类的多线程类。主要负责消息接收。在 ZooKeeper 的实现中,为每一个集群中的通信服务器都分配一个 RecvWorker,负责接收来自其他服务器发送的信息。在 RecvWorker 的 run 函数中,不断通过 queueSendMap 队列读取信息。

class SendWorker extends ZooKeeperThread {Long sid;Socket sock;volatile boolean running = true;DataInputStream din;final SendWorker sw;public void run() {threadCnt.incrementAndGet();while (running && !shutdown && sock != null) {int length = din.readInt();if (length <= 0 || length > PACKETMAXSIZE) {throw new IOException("Received packet with invalid packet: "+ length);}byte[] msgArray = new byte[length];din.readFully(msgArray, 0, length);ByteBuffer message = ByteBuffer.wrap(msgArray);addToRecvQueue(new Message(message.duplicate(), sid));}}}

除了接收信息的功能外,QuorumCnxManager 内还定义了一个 SendWorker 内部类用来向集群中的其他服务器发送投票信息。如下面的代码所示。在 SendWorker 类中,不会立刻将投票信息发送到 ZooKeeper 集群中,而是将投票信息首先插入到 pollSendQueue 队列,之后通过 send 函数进行发送。

   class SendWorker extends ZooKeeperThread {Long sid;Socket sock;RecvWorker recvWorker;volatile boolean running = true;DataOutputStream dout;public void run() {while (running && !shutdown && sock != null) {ByteBuffer b = null;try {ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);if (bq != null) {b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);} else {LOG.error("No queue of incoming messages for " +"server " + sid);break;}if (b != null) {lastMessageSent.put(sid, b);send(b);}} catch (InterruptedException e) {LOG.warn("Interrupted while waiting for message on queue",e);}}}}

实现了投票信息的发送与接收后,接下来我们就来看看如何处理投票结果。在 ZooKeeper 的底层,是通过 FastLeaderElection 类实现的。如下面的代码所示,在 FastLeaderElection 的内部,定义了最大通信间隔 maxNotificationInterval、服务器等待时间 finalizeWait 等属性配置。

public class FastLeaderElection implements Election {final static int maxNotificationInterval = 60000;final static int IGNOREVALUE = -1QuorumCnxManager manager;}

在 ZooKeeper 底层通过 getVote 函数来设置本机的投票内容,如下图面的代码所示,在 getVote 中通过 proposedLeader 服务器信息、proposedZxid 服务器 ZXID、proposedEpoch 投票轮次等信息封装投票信息。

synchronized public Vote getVote(){return new Vote(proposedLeader, proposedZxid, proposedEpoch);}

在完成投票信息的封装以及投票信息的接收和发送后。一个 ZooKeeper 集群中,Leader 服务器选举底层实现的关键步骤就已经介绍完了。 Leader 节点的底层实现过程的逻辑相对来说比较简单,基本分为封装投票信息、发送投票、接收投票等。

那就是之前崩溃的 Leader 服务器是否会参与本次投票,以及是否能被重新选举为 Leader 服务器。这主要取决于在选举过程中旧的 Leader 服务器的运行状态。如果该服务器可以正常运行且可以和集群中其他服务器通信,那么该服务器也会参与新的 Leader 服务器的选举,在满足条件的情况下该台服务器也会再次被选举为新的 Leader 服务器。

二、Leader 与 Follower 的数据同步策略

在我们介绍 ZooKeeper 集群数据同步之前,先要清楚为什么要进行数据同步。在 ZooKeeper 集群服务运行过程中,主要负责处理发送到 ZooKeeper 集群服务端的客户端会话请求。这些客户端的会话请求基本可以分为事务性的会话请求和非事务性的会话请求,而这两种会话的本质区别在于,执行会话请求后,ZooKeeper 集群服务器状态是否发生改变。

事物性会话请求最常用的操作类型有节点的创建、删除、更新等操作。而查询数据节点等会话请求操作就是非事务性的,因为查询不会造成 ZooKeeper 集群中服务器上数据状态的变更 。

分布式环境下经常会出现 CAP 定义中的一致性问题。比如当一个 ZooKeeper 集群服务器中,Leader 节点处理了一个节点的创建会话操作后,该 Leader 服务器上就新增了一个数据节点。而如果不在 ZooKeeper 集群中进行数据同步,那么其他服务器上的数据则保持旧有的状态,新增加的节点在服务器上不存在。当 ZooKeeper 集群收到来自客户端的查询请求时,会出现该数据节点查询不到的情况,这就是典型的集群中服务器数据不一致的情况。为了避免这种情况的发生,在进行事务性请求的操作后,ZooKeeper 集群中的服务器要进行数据同步,而主要的数据同步是从 Learnning 服务器同步 Leader 服务器上的数据。接下来我们再学习一下 ZooKeeper 集群中数据同步的方法。我们主要通过三个方面来讲解 ZooKeeper 集群中的同步方法,分别是同步条件、同步过程、同步后的处理。

2.1 同步条件

同步条件是指在 ZooKeeper 集群中何时触发数据同步的机制。与上一课时中 Leader 选举首先要判断集群中 Leader 服务器是否存在不同,要想进行集群中的数据同步,首先需要 ZooKeeper 集群中存在用来进行数据同步的Learning 服务器。 也就是说,当 ZooKeeper 集群中选举出 Leader 节点后,除了被选举为 Leader 的服务器,其他服务器都作为 Learnning 服务器,并向 Leader 服务器注册。之后系统就进入到数据同步的过程中。

2.2 同步过程

在数据同步的过程中,ZooKeeper 集群的主要工作就是将那些没有在 Learnning 服务器上执行过的事务性请求同步到 Learning 服务器上。这里请你注意,事务性的会话请求会被同步,而像数据节点的查询等非事务性请求则不在数据同步的操作范围内。 而在具体实现数据同步的时候,ZooKeeper 集群又提供四种同步方式,如下图所示:

2.2.1 DIFF 同步

DIFF 同步即差异化同步的方式,在 ZooKeeper 集群中,Leader 服务器探测到 Learnning 服务器的存在后,首先会向该 Learnning 服务器发送一个 DIFF 不同指令。在收到该条指令后,Learnning 服务器会进行差异化方式的数据同步操作。在这个过程中,Leader 服务器会将一些 Proposal 发送给 Learnning 服务器。之后 Learnning 服务器在接收到来自 Leader 服务器的 commit 命令后执行数据持久化的操作。

2.2.2 TRUNC+DIFF 同步

TRUNC+DIFF 同步代表先回滚再执行差异化的同步,这种方式一般发生在 Learnning 服务器上存在一条事务性的操作日志,但在集群中的 Leader 服务器上并不存在的情况 。发生这种情况的原因可能是 Leader 服务器已经将事务记录到本地事务日志中,但没有成功发起 Proposal 流程。当这种问题产生的时候,ZooKeeper 集群会首先进行回滚操作,在 Learning 服务器上的数据回滚到与 Leader 服务器上的数据一致的状态后,再进行 DIFF 方式的数据同步操作。

2.2.3 TRUNC 同步

TRUNC 同步是指仅回滚操作,就是将 Learnning 服务器上的操作日志数据回滚到与 Leader 服务器上的操作日志数据一致的状态下。之后并不进行 DIFF 方式的数据同步操作。

2.2.4 SNAP 同步

SNAP 同步的意思是全量同步,是将 Leader 服务器内存中的数据全部同步给 Learnning 服务器。在进行全量同步的过程中,Leader 服务器首先会向 ZooKeeper 集群中的 Learning 服务器发送一个 SNAP 命令,在接收到 SNAP 命令后, ZooKeeper 集群中的 Learning 服务器开始进行全量同步的操作。随后,Leader 服务器会从内存数据库中获取到全量数据节点和会话超时时间记录器,将他们序列化后传输给 Learnning 服务器。Learnning 服务器接收到该全量数据后,会对其反序列化后载入到内存数据库中。

2.3 同步后的处理

数据同步的本质就是比对 Leader 服务器与 Learning 服务器,将Leader服务器上的数据增加到 Learnning服务器,再将Learnning 服务器上多余的事物日志回滚。前面的介绍已经完成了数据的对比与传递操作,接下来就在 Learning 服务器上执行接收到的事物日志,进行本地化的操作。

2.4 数据的同步的源码分析

ZooKeeper 底层实现了一个 Learner 类,该类可以看作是集群中 Learnning 服务器的实例对象,与集群中的 Learning 服务器是一一对应的。

public class Learner {}

而在 Learner 类的内部,主要通过 syncWithLeader 函数来处理来自 Leader 服务器的命令。在接收到来自 Leader 服务器的命令后,通过 qp.getType() 方法判断数据同步的方式。

protected void syncWithLeader(long newLeaderZxid) throws Exception{if (qp.getType() == Leader.DIFF) {snapshotNeeded = false;}else if (qp.getType() == Leader.TRUNC) {}}

在确定了数据同步的方式后,再调用 packetsCommitted.add(qp.getZxid()) 方法将事物操作同步到处理队列中,之后调用事物操作线程进行处理。

if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData()));boolean majorChange = self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(),qp.getZxid(), true);if (majorChange) {throw new Exception("changes proposed in reconfig");}}if (!writeToTxnLog) {if (pif.hdr.getZxid() != qp.getZxid()) {LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());} else {zk.processTxn(pif.hdr, pif.rec);packetsNotCommitted.remove();}} else {packetsCommitted.add(qp.getZxid());

三、leader和Follower的事务处理与调度分析

ZooKeeper 集群接收到来自客户端的会话请求操作后,首先会判断该条请求是否是事务性的会话请求。对于事务性的会话请求,ZooKeeper 集群服务端会将该请求统一转发给 Leader 服务器进行操作。Leader 服务器内部执行该条事务性的会话请求后,再将数据同步给其他角色服务器,从而保证事务性会话请求的执行顺序,进而保证整个 ZooKeeper 集群的数据一致性。在 ZooKeeper 集群的内部实现中,是通过什么方法保证所有 ZooKeeper 集群接收到的事务性会话请求都能交给 Leader 服务器进行处理的呢?

在 ZooKeeper 集群内部,集群中除 Leader 服务器外的其他角色服务器接收到来自客户端的事务性会话请求后,必须将该条会话请求转发给 Leader 服务器进行处理。 ZooKeeper 集群中的 Follow 和 Observer 服务器,都会检查当前接收到的会话请求是否是事务性的请求,如果是事务性的请求,那么就将该请求以 REQUEST 消息类型转发给 Leader 服务器。

在 ZooKeeper集群中的服务器接收到该条消息后,会对该条消息进行解析。分析出该条消息所包含的原始客户端会话请求。之后将该条消息提交到自己的 Leader 服务器请求处理链中,开始进行事务性的会话请求操作。如果不是事务性请求,ZooKeeper 集群则交由 Follow 和 Observer 角色服务器处理该条会话请求,如查询数据节点信息。

3.1 Leader节点事务处理与调度分析

3.1.1 发起与提交写请求处理。

在 ZooKeeper 集群接收到来自客户端的一个 setData 会话请求后,其内部的处理逻辑基本可以分成四个部分。如下图所示,分别是预处理阶段、事务处理阶段、事务执行阶段、响应客户端。

  • 预处理阶段

主要工作是通过网络 I/O 接收来自客户端的会话请求。判断该条会话请求的类型是否是事务性的会话请求,之后将该请求提交给

PrepRequestProcessor 处理器进行处理。封装请求事务头并检查会话是否过期,最后反序列化事务请求信息创建 setDataRequest 请求,在 setDataRequest 记录中包含了要创建数据的节点的路径、数据节点的内容信息以及数据节点的版本信息。最后将该请求存放在 outstandingChanges 队列中等待之后的处理。

  • 事务处理阶段:

在事务处理阶段,ZooKeeper 集群内部会将该条会话请求提交给 ProposalRequestProcessor 处理器进行处理。本阶段内部又分为提交、同步、统计三个步骤

  • 事务执行阶段:

在经过预处理阶段和事务会话的投票发起等操作后,一个事务性的会话请求都已经准备好了,接下来就是在 ZooKeeper 的数据库中执行该条会话的数据变更操作。

在处理数据变更的过程中,ZooKeeper 内部会将该请求会话的事务头和事务体信息直接交给内存数据库 ZKDatabase 进行事务性的持久化操作。之后返回 ProcessTxnResult 对象表明操作结果是否成功。

  • 响应客户端:

在 ZooKeeper 集群处理完客户端 setData 方法发送的数据节点创建请求后,会将处理结果发送给客户端。而在响应客户端的过程中,ZooKeeper 内部首先会创建一个 setDataResponse 响应体类型,该对象主要包括当前会话请求所创建的数据节点,以及其最新状态字段信息 stat。之后创建请求响应头信息,响应头作为客户端请求响应的重要信息,客户端在接收到 ZooKeeper 集群的响应后,通过解析响应头信息中的事务 ZXID 和请求结果标识符 err 来判断该条会话请求是否成功执行。

  • 事务处理源码分析

首先,ZooKeeper 集群在收到客户端发送的事务性会话请求后,会对该请求进行预处理。在代码层面,ZooKeeper 通过调用 PrepRequestProcessor 类来实现预处理阶段的全部逻辑。可以这样理解:在处理客户端会话请求的时候,首先调用的就是 PrepRequestProcessor 类。而在 PrepRequestProcessor 内部,是通过 pRequest 方法判断客户端发送的会话请求类型。如果是诸如 setData 数据节点创建等事务性的会话请求,就调用 pRequest2Txn 方法进一步处理。

protected void pRequest(Request request){...switch (request.type) {case OpCode.setData:SetDataRequest setDataRequest = new SetDataRequest();                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);break;}}

而在 pRequest2Txn 方法的内部,就实现了预处理阶段的主要逻辑。如下面的代码所示,首先通过 checkSession 方法检查该条会话请求是否有效(比如会话是否过期等),之后调用 checkACL 检查发起会话操作的客户端在 ZooKeeper 服务端是否具有相关操作的权限。最后将该条会话创建的相关信息,诸如 path 节点路径、data 节点数据信息、version 节点版本信息等字段封装成 setDataRequest 类型并传入到 setTxn 方法中,最后加入处理链中进行处理。

case OpCode.setData:zks.sessionTracker.checkSession(request.sessionId, request.getOwner());SetDataRequest setDataRequest = (SetDataRequest)record;if(deserialize)ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);path = setDataRequest.getPath();validatePath(path, request.sessionId);nodeRecord = getRecordForPath(path);checkACL(zks, request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());nodeRecord.stat.setVersion(newVersion);addChangeRecord(nodeRecord);

当一个业务场景在查询操作多而创建删除等事务性操作少的情况下,ZooKeeper 集群的性能表现的就会很好。而如果是在极端情况下,ZooKeeper 集群只有事务性的会话请求而没有查询操作,那么 Follow 和 Observer 服务器就只能充当一个请求转发服务器的角色, 所有的会话的处理压力都在 Leader 服务器。在处理性能上整个集群服务器的瓶颈取决于 Leader 服务器的性能。ZooKeeper 集群的作用只能保证在 Leader 节点崩溃的时候,重新选举出 Leader 服务器保证系统的稳定性。这也是 ZooKeeper 设计的一个缺点。

3.1.2 与learner保持心跳

3.1.3 崩溃恢复时负责恢复数据以及同步数据到Learner。

3.2 Follower节点事务处理与调度分析

ZooKeeper 集群重新选举 Leader 的过程本质上只有 Follow 服务器参与工作。而在 ZooKeeper 集群重新选举 Leader 节点的过程中,如下图所示。主要可以分为 Leader 失效发现、重新选举 Leader 、Follow 服务器角色变更、集群同步这几个步骤。

3.2.1 Leader 失效发现

在 ZooKeeper 集群中,当 Leader 服务器失效时,ZooKeeper 集群会重新选举出新的 Leader 服务器。也就是说,Leader 服务器的失效会触发 ZooKeeper 开始新 Leader 服务器的选举,那么在 ZooKeeper 集群中,又是如何发现 Leader 服务器失效的呢?

我们之前介绍的保持客户端活跃性的方法,它是通过客户端定期向服务器发送 Ping 请求来实现的。在 ZooKeeper 集群中,探测 Leader 服务器是否存活的方式与保持客户端活跃性的方法非常相似。首先,Follow 服务器会定期向 Leader 服务器发送 网络请求,在接收到请求后,Leader 服务器会返回响应数据包给 Follow 服务器,而在 Follow 服务器接收到 Leader 服务器的响应后,如果判断 Leader 服务器运行正常,则继续进行数据同步和服务转发等工作,反之,则进行 Leader 服务器的重新选举操作。

3.2.2 Leader 重新选举

当 Follow 服务器向 Leader 服务器发送状态请求包后,如果没有得到 Leader 服务器的返回信息,这时,如果是集群中个别的 Follow 服务器发现返回错误,并不会导致 ZooKeeper 集群立刻重新选举 Leader 服务器,而是将该 Follow 服务器的状态变更为 LOOKING 状态,并向网络中发起投票,当 ZooKeeper 集群中有更多的机器发起投票,最后当投票结果满足多数原则的情况下。ZooKeeper 会重新选举出 Leader 服务器。

3.2.3 Follow 角色变更

在 ZooKeeper 集群中,Follow 服务器作为 Leader 服务器的候选者,当被选举为 Leader 服务器之后,其在 ZooKeeper 集群中的 Follow 角色,也随之发生改变。也就是要转变为 Leader 服务器,并作为 ZooKeeper 集群中的 Leader 角色服务器对外提供服务。

3.2.4 集群同步数据

在 ZooKeeper 集群成功选举 Leader 服务器,并且候选 Follow 服务器的角色变更后。为避免在这期间导致的数据不一致问题,ZooKeeper 集群在对外提供服务之前,会通过 Leader 角色服务器管理同步其他角色服务器。

3.2.5 follower源码分析

ZooKeeper 集群会先判断 Leader 服务器是否失效,而判断的方式就是 Follow 服务器向 Leader 服务器发送请求包,之后 Follow 服务器接收到响应数据后,进行解析,如下面的代码所示,Follow 服务器会根据返回的数据,判断 Leader 服务器的运行状态,如果返回的是 LOOKING 关键字,表明与集群中 Leader 服务器无法正常通信。

switch (rstate) {
case 0:ackstate = QuorumPeer.ServerState.LOOKING;break;
case 1:ackstate = QuorumPeer.ServerState.FOLLOWING;break;
case 2:ackstate = QuorumPeer.ServerState.LEADING;break;
case 3:ackstate = QuorumPeer.ServerState.OBSERVING;break;
default:continue;

之后,在 ZooKeeper 集群选举 Leader 服务器时,是通过 FastLeaderElection 类实现的。该类实现了 TCP 方式的通信连接,用于在 ZooKeeper 集群中与其他 Follow 服务器进行协调沟通。FastLeaderElection 类继承了 Election 接口,定义其是用来进行选举的实现类。而在其内部,又定义了选举通信相关的一些配置参数,比如 finalizeWait 最终等待时间、最大通知间隔时间 maxNotificationInterval 等。

在选举的过程中,首先调用 ToSend 函数向 ZooKeeper 集群中的其他角色服务器发送本机的投票信息,其他服务器在接收投票信息后,会对投票信息进行有效性验证等操作,之后 ZooKeeper 集群统计投票信息,如果过半数的机器投票信息一致,则集群就重新选出新的 Leader 服务器。

static public class ToSend {static enum mType {crequest, challenge, notification, ack}ToSend(mType type,long leader,long zxid,long electionEpoch,ServerState state,long sid,long peerEpoch,byte[] configData) {this.leader = leader;this.zxid = zxid;this.electionEpoch = electionEpoch;this.state = state;this.sid = sid;this.peerEpoch = peerEpoch;this.configData = configData;}

3.2.6 非事务性请求处理过程

在 ZooKeeper 集群接收到来自客户端的请求后,会首先判断该会话请求的类型,如是否是事务性请求。所谓事务性请求,是指 ZooKeeper 服务器执行完该条会话请求后,是否会导致执行该条会话请求的服务器的数据或状态发生改变,进而导致与其他集群中的服务器出现数据不一致的情况

当 ZooKeeper 集群接收到来自客户端发送的查询会话请求后,会将该客户端请求分配给 Follow 服务器进行处理。而在 Follow 服务器的内部,也采用了责任链的处理模式来处理来自客户端的每一个会话请求。

我们学习了 Leader 服务器的处理链过程,分别包含预处理器阶段、Proposal 提交处理器阶段以及 final 处理器阶段。与 Leader 处理流程不同的是,在 Follow 角色服务器的处理链执行过程中,FollowerRequestProcessor 作为第一个处理器,主要负责筛选该条会话请求是否是事务性的会话请求。如果是事务性的会话请求,则转发给 Leader 服务器进行操作。如果不是事务性的会话请求,则交由 Follow 服务器处理链上的下一个处理器进行处理。

而下一个处理器是 CommitProcessor ,该处理器的作用是对来自集群中其他服务器的非事务性请求和本地服务器的提交请求操作进行匹配。匹配的方式是,将本地执行的 sumbit 提交请求,与集群中其他服务器接收到的 Commit 会话请求进行匹配,匹配完成后再交由 Follow 处理链上的下一个处理器进行处理。最终,当一个客户端会话经过 Final 处理器操作后,就完成了整个 Follow 服务器的会话处理过程,并将结果响应给客户端。

3.2.7 非事务请求源码分析

ZooKeeper 集群在接收到来自客户端的请求后,会将非请求交给 Follow 服务器进行处理。而 Follow 服务器内部首先调用的是 FollowerZooKeeperServer 类,该类的作用是封装 Follow 服务器的属性和行为,你可以把该类当作一台 Follow 服务器的代码抽象。

如下图所示,该 FollowerZooKeeperServer 类继承了 LearnerZooKeeperServer 。在一个 FollowerZooKeeperServer 类内部,定义了一个核心的 ConcurrentLinkedQueue 类型的队列字段,用于存放接收到的会话请求。

在定义了 FollowerZooKeeperServer 类之后,在该类的 setupRequestProcessors 函数中,定义了我们之前一直反复提到的处理责任链,指定了该处理链上的各个处理器。如下面的代码所示,分别按顺序定义了起始处理器 FollowerRequestProcessor 、提交处理器 CommitProcessor、同步处理器 SendAckRequestProcessor 以及最终处理器 FinalProcessor。

protected void setupRequestProcessors() {RequestProcessor finalProcessor = new FinalRequestProcessor(this);commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());commitProcessor.start();firstProcessor = new FollowerRequestProcessor(this, commitProcessor);((FollowerRequestProcessor) firstProcessor).start();syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));syncProcessor.start();
}

博文参考

Zookeeper——选举机制原理与Leader和Follower作用相关推荐

  1. Zookeeper选举算法原理

    Zookeeper选举算法原理 Leader选举 Leader选举是保证分布式数据一致性的关键所在.当Zookeeper集群中的一台服务器出现以下两种情况之一时,需要进入Leader选举. (1) 服 ...

  2. 学习笔记:Zookeeper选举机制

    1.Zookeeper选举机制 Zookeeper虽然在配置文件中并没有指定master和slave 但是,zookeeper工作时,是有一个节点为leader,其他则为follower Leader ...

  3. 2021年大数据ZooKeeper(六):ZooKeeper选举机制

    目录 ​​​​​​ZooKeeper选举机制 概念 全新集群选举 非全新集群选举 ZooKeeper选举机制 zookeeper默认的算法是FastLeaderElection,采用投票数大于半数则胜 ...

  4. Zookeeper watch机制原理

    Zookeeper watch机制原理 准备工作 watch示例 源码解析 总结 准备工作 经过上一小节的学习我们知道ZookeeperServerMain是单机版的服务器主类 我们可以自己写一个Zk ...

  5. ZooKeeper学习总结(4)——Zookeeper选举机制总结

    Zookeeper 是一个分布式服务框架,主要是用来解决分布式应用中遇到的一些数据管理问题如:统一命名服务.状态同步服务.集群管理.分布式应用配置项的管理等.我们可以简单把 Zookeeper 理解为 ...

  6. 理解zookeeper选举机制

    转载:https://www.cnblogs.com/shuaiandjun/p/9383655.html 一.zookeeper集群 配置多个实例共同构成一个集群对外提供服务以达到水平扩展的目的,每 ...

  7. zookeeper选举机制及相关概念

    一.zookeeper的一些概念 server和client: server 指集群的每一台机器 client 指每一个向server请求服务的机器 zookeeper角色: leader:为客户端提 ...

  8. Kafka Partition Leader选举机制原理详解

    1 大数据常用的选主机制 Leader选举算法非常多,大数据领域常用的有以下两种: 1.1 Zab(zookeeper使用) Zab协议有四个阶段 Leader election Discovery ...

  9. Zookeeper选举机制测试

    集群分布式 三台物理机为112.113.114 112,为leader 113,为follower 114,为follower 宕机测试 比如,112宕机,关闭112服务器 检测到主节点宕机,113. ...

最新文章

  1. Spring Annotation(@Autowire、@Qualifier)
  2. 跨浏览器开发:CSS代码的金科玉律
  3. 如何将 Linq 的查询结果转为 HashSet ?
  4. 系统垃圾清理.cmd
  5. Java8 方法引用
  6. linux内核mtd分区,linux-kernel – 在运行时调整MTD分区大小
  7. 中国移动老功臣退休致辞:工作结束了 人生没结束
  8. 【异常】No suitable driver
  9. CRM 权限设置 ss
  10. [Java] 蓝桥杯BASIC-22 基础练习 FJ的字符串
  11. SQLConnect
  12. 数据库信息查询(作者不是我)
  13. VMware 8.02虚拟机安装MAC lion 10.7.3教程 附送原版提取镜像InstallESD.iso!
  14. 第108章 属性关键字 - Required
  15. 照着这本“书”,3年量产自动驾驶卡车
  16. 基于微信小程序的教学评价平台设计与实现
  17. Hibernate征途(四)之映射 序
  18. 我开发过程中遇到的Echarts地图立体描边问题解决方式
  19. Opera 11.01的Bug
  20. 开启定位权限还是定位失败

热门文章

  1. 把onnx模型转TensorRT模型的trt模型报错:Your ONNX model has been generated with INT64 weights. while TensorRT
  2. java applet启动调试,java applet 运行环境调试记要
  3. 计算机通信电气电子适合女生学,工科中适合女生的专业和不适合女生的专业有哪些...
  4. setAttribute、getAttribute、removeAttribute
  5. ITIL知识管理分析及如何实施
  6. Python数据分析案例03——天气K均值聚类分析
  7. 甄别野鸡大学-中国大学查询
  8. 韦东山鸿蒙开发教程02 - 资料下载方法
  9. [计算机网络笔记01] Packet Tracer的简单使用
  10. Android音频管理总结(个人笔记)