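For the details of the Raft algorithm itself, see my earlier article "Distributed consistency algorithms: two-phase commit, three-phase commit, Paxos, Raft, the ZooKeeper leader election process, the ZAB protocol, sequential consistency, the data write path, node states and node roles".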

Here we look at how sofa-jraft, open-sourced by Alibaba (Ant Financial), implements it.
First, sofa-jraft has a few important roles:

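  • Node represents a single server node
  • Ballot holds the information for one round of voting
  • PeerId represents one participant in a replication group
  • StateMachine: once data submitted to a Node has been committed, its onApply method is executed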

Node also has several important timers:

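  • electionTimer: the election timer. If the current leader has died, the node starts a preVote
  • voteTimer: the vote timer. When a vote times out, the node starts a new preVote
  • stepDownTimer: used by the leader to check whether it is still alive, to check whether any node in the group has gone offline, and to update the leader's timestamp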

Leader election

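JRaft's election has two phases, preVote and vote. The preVote phase prevents a specific problem. A node that cannot sync with the leader, for example because of a network partition, might otherwise keep launching elections and raising its own term. Its term would eventually exceed the leader's, and once the node reconnected, the leader would be forced to give up leadership and a new, unnecessary election would start.
With preVote, a node must first obtain votes from more than half of the group before it is allowed to start a real election.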

A JRaft election is triggered when a timer expires. NodeImpl is the implementation class of Node, and its NodeImpl.init method starts electionTimer:

this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
    TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

    protected void onTrigger() {
        handleElectionTimeout();
    }

    protected int adjustTimeout(final int timeoutMs) {
        return randomTimeout(timeoutMs);
    }
};

private void handleElectionTimeout() {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_FOLLOWER) {
            return;
        }
        if (isCurrentLeaderValid()) {
            return;
        }
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
            this.leaderId));

        // Judge whether to launch a election.
        if (!allowLaunchElection()) {
            return;
        }

        doUnlock = false;
        preVote();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}
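The adjustTimeout override above delegates to randomTimeout. Randomizing the timeout keeps followers from all timing out at the same moment and splitting the vote. The following is a minimal sketch of that idea. It assumes the timeout is drawn uniformly from [baseMs, baseMs + maxDelayMs). The class names and the maxDelayMs parameter are illustrative and are not jraft's exact code.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper (not jraft's API): spreads election timeouts across nodes.
final class ElectionTimeouts {
    private ElectionTimeouts() {
    }

    // Returns a value in [baseMs, baseMs + maxDelayMs); maxDelayMs must be > 0.
    static int randomTimeout(int baseMs, int maxDelayMs) {
        return ThreadLocalRandom.current().nextInt(baseMs, baseMs + maxDelayMs);
    }
}

class ElectionTimeoutDemo {
    public static void main(String[] args) {
        // Three draws with a 1000 ms base and up to 1000 ms of extra jitter.
        for (int i = 0; i < 3; i++) {
            System.out.println(ElectionTimeouts.randomTimeout(1000, 1000));
        }
    }
}
```

Because each node draws its own value, the node whose timer fires first usually finishes preVote and the real vote before the others wake up.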

handleElectionTimeout mainly runs preVote. The main calls in one round of JRaft voting are:

preVote ===> handlePreVoteRequest ===> electSelf ===> handleRequestVoteRequest

Let's look at preVote first:

private void preVote() {
    .....
    final LogId lastLogId = this.logManager.getLastLogId(true);

    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // pre_vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
            return;
        }
        this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(true) // it's a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm + 1) // next term
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.preVote(peer.getEndpoint(), done.request, done);
        }
        this.prevVoteCtx.grant(this.serverId);
        if (this.prevVoteCtx.isGranted()) {
            doUnlock = false;
            electSelf();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

preVote sends a RequestVoteRequest to every node except itself. The main fields it sets are:

RequestVoteRequest.newBuilder() //
    .setPreVote(true) // it's a pre-vote request.
    .setGroupId(this.groupId) //
    .setServerId(this.serverId.toString()) //
    .setPeerId(peer.toString()) //
    .setTerm(this.currTerm + 1) // next term
    .setLastLogIndex(lastLogId.getIndex()) //
    .setLastLogTerm(lastLogId.getTerm()) //
    .build();

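Note that the node does not set its own currTerm to currTerm + 1 at this point. It only sends the value currTerm + 1 in the request. This differs from the real election, where the node first increments currTerm (currTerm++).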
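The following toy simulation shows why leaving currTerm untouched matters. It makes simplified assumptions: a 3-node group, a node that receives no grants because it is partitioned, and no RPCs or timers. IsolatedNode is a hypothetical class, not part of jraft.

```java
// Hypothetical model, not jraft code: a follower whose election timer keeps firing.
// grantsFromPeers = how many other peers said yes (0 while the node is partitioned).
final class IsolatedNode {
    long currTerm;
    private final boolean usePreVote;
    private final int clusterSize;

    IsolatedNode(long currTerm, boolean usePreVote, int clusterSize) {
        this.currTerm = currTerm;
        this.usePreVote = usePreVote;
        this.clusterSize = clusterSize;
    }

    void onElectionTimeout(int grantsFromPeers) {
        int quorum = clusterSize / 2 + 1;
        int grants = 1 + grantsFromPeers; // the node always counts its own vote
        if (!usePreVote) {
            currTerm++; // a real election always raises the term, even if it cannot win
            return;
        }
        // preVote only asks "would you vote for currTerm + 1?"; currTerm itself is untouched
        if (grants >= quorum) {
            currTerm++; // a majority agreed, so the real election may now raise the term
        }
    }
}

class PreVoteDemo {
    public static void main(String[] args) {
        IsolatedNode plain = new IsolatedNode(1, false, 3);
        IsolatedNode guarded = new IsolatedNode(1, true, 3);
        for (int i = 0; i < 5; i++) {
            plain.onElectionTimeout(0);
            guarded.onElectionTimeout(0);
        }
        System.out.println(plain.currTerm + " vs " + guarded.currTerm); // prints "6 vs 1"
    }
}
```

Without preVote, the isolated node's term climbs on every timeout. With preVote, its term stays where it was until a majority is reachable again.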

Now let's see how the other nodes handle this request:

public Message handlePreVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Parse candidateId failed: %s.", request.getServerId());
        }
        boolean granted = false;
        // noinspection ConstantConditions
        do {
            if (!this.conf.contains(candidateId)) {
                LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(),
                    request.getServerId(), this.conf);
                break;
            }
            if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                LOG.info(
                    "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                    getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                break;
            }
            if (request.getTerm() < this.currTerm) {
                LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // A follower replicator may not be started when this node become leader, so we must check it.
                checkReplicator(candidateId);
                break;
            }
            // A follower replicator may not be started when this node become leader, so we must check it.
            // check replicator state
            checkReplicator(candidateId);

            doUnlock = false;
            this.writeLock.unlock();

            final LogId lastLogId = this.logManager.getLastLogId(true);

            doUnlock = true;
            this.writeLock.lock();
            final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
            granted = requestLastLogId.compareTo(lastLogId) >= 0;

            LOG.info(
                "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                lastLogId);
        } while (false);

        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            .setGranted(granted) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

When another node receives a PreVoteRequest, it runs the following checks:

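  1. If the candidate is not in the current configuration, it returns granted=false
  2. If this node's leader is still alive (its lease is still valid), it returns granted=false
  3. If the term in the preVote request is smaller than this node's term, it returns granted=false
  4. If the request's last log position (compared by term, then by index) is older than this node's last log, it returns granted=false
  5. If none of the above applies, it returns granted=true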
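The grant decision above can be sketched as a single function. This is a simplified model. It assumes plain booleans for "the candidate is in the configuration" and "the leader's lease is still valid". LogId here is a hypothetical stand-in that compares by term first and then by index, which is the standard Raft "at least as up to date" rule.

```java
// Hypothetical stand-in for a log position (index of the last entry and its term).
final class LogId implements Comparable<LogId> {
    final long index;
    final long term;

    LogId(long index, long term) {
        this.index = index;
        this.term = term;
    }

    // A log is newer if its last term is higher; on equal terms, the longer log wins.
    @Override
    public int compareTo(LogId o) {
        int c = Long.compare(term, o.term);
        return c != 0 ? c : Long.compare(index, o.index);
    }
}

final class PreVoteCheck {
    // Mirrors the checks listed above; parameter names are illustrative.
    static boolean grant(boolean candidateInConf, boolean leaderLeaseValid, long requestTerm,
                         long currTerm, LogId requestLastLogId, LogId localLastLogId) {
        if (!candidateInConf) {
            return false; // unknown peer
        }
        if (leaderLeaseValid) {
            return false; // the current leader is still alive
        }
        if (requestTerm < currTerm) {
            return false; // stale term
        }
        return requestLastLogId.compareTo(localLastLogId) >= 0; // log must be at least as new
    }
}
```

A longer log does not win by itself. A candidate whose last entry is (index 10, term 2) is refused by a node holding (index 5, term 3), because the term is compared first.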

That is how the other nodes handle a PreVoteRequest. Next, let's look at how the node that launched preVote handles their responses:

public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_FOLLOWER) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                getNodeId(), peerId, this.state);
            return;
        }
        if (term != this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                peerId, term, this.currTerm);
            return;
        }
        if (response.getTerm() > this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                response.getTerm(), this.currTerm);
            stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                "Raft node receives higher term pre_vote_response."));
            return;
        }
        LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
            response.getTerm(), response.getGranted());
        // check granted quorum?
        if (response.getGranted()) {
            this.prevVoteCtx.grant(peerId);
            if (this.prevVoteCtx.isGranted()) {
                doUnlock = false;
                electSelf();
            }
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

Here the term parameter is this node's term at the moment it sent the preVote, and peerId identifies the node that is responding. The checks are:

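  1. If this node is no longer a follower, or its current term differs from the term at which the preVote was sent, a new round has started in the meantime. The response is stale and the method returns
  2. If the term in the response (the responder's term) is larger than this node's term, the responder is in a higher round. This node calls stepDown with that term and returns
  3. If the response grants the vote (response.getGranted() == true), the node records the grant and checks whether more than half of the group has agreed to this round. If so, it calls electSelf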

NodeImpl has two Ballots: prevVoteCtx for preVote and voteCtx for the real vote. When preVote starts, prevVoteCtx is initialized:

public boolean init(final Configuration conf, final Configuration oldConf) {
    this.peers.clear();
    this.oldPeers.clear();
    this.quorum = this.oldQuorum = 0;
    int index = 0;
    if (conf != null) {
        for (final PeerId peer : conf) {
            this.peers.add(new UnfoundPeerId(peer, index++, false));
        }
    }

    this.quorum = this.peers.size() / 2 + 1;
    if (oldConf == null) {
        return true;
    }
    index = 0;
    for (final PeerId peer : oldConf) {
        this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
    }

    this.oldQuorum = this.oldPeers.size() / 2 + 1;
    return true;
}

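So init sets the Ballot's quorum to (number of peers in the configuration) / 2 + 1, using integer division. For example, 3 peers need 2 votes and 4 or 5 peers need 3. While a configuration change is in progress, oldQuorum is computed the same way for the old configuration.
Now let's see how the node decides whether to start an election once preVote requests are granted. At the end of handlePreVoteResponse, it calls this.prevVoteCtx.grant(peerId):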

public void grant(final PeerId peerId) {
    grant(peerId, new PosHint());
}

public PosHint grant(final PeerId peerId, final PosHint hint) {
    UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
    if (peer != null) {
        if (!peer.found) {
            peer.found = true;
            this.quorum--;
        }
        hint.pos0 = peer.index;
    } else {
        hint.pos0 = -1;
    }
    if (this.oldPeers.isEmpty()) {
        hint.pos1 = -1;
        return hint;
    }
    peer = findPeer(peerId, this.oldPeers, hint.pos1);
    if (peer != null) {
        if (!peer.found) {
            peer.found = true;
            this.oldQuorum--;
        }
        hint.pos1 = peer.index;
    } else {
        hint.pos1 = -1;
    }
    return hint;
}

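The logic is simple. When a responding node grants the vote, the Ballot's quorum is decremented (quorum--). Each peer is marked as found, so a node that responds more than once is counted only once. Then the node checks whether this preVote has won a majority: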

public boolean isGranted() {
    return this.quorum <= 0 && this.oldQuorum <= 0;
}
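A simplified Ballot that tracks both quorums might look like the sketch below. It rests on a few assumptions: peers are strings instead of PeerId, and a set of already-granted peers replaces the UnfoundPeerId list with its found flags. The counting follows the code above: size / 2 + 1, one grant per peer, and both quorums must reach zero.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Simplified model of Ballot (hypothetical: string peers, a HashSet instead of found flags).
final class SimpleBallot {
    private final Set<String> peers = new HashSet<>();
    private final Set<String> oldPeers = new HashSet<>();
    private final Set<String> granted = new HashSet<>();
    private int quorum;
    private int oldQuorum;

    // oldConf is null unless a configuration change (joint consensus) is in progress.
    SimpleBallot(Collection<String> conf, Collection<String> oldConf) {
        peers.addAll(conf);
        quorum = peers.size() / 2 + 1;
        if (oldConf != null) {
            oldPeers.addAll(oldConf);
            oldQuorum = oldPeers.size() / 2 + 1;
        }
    }

    void grant(String peer) {
        if (!granted.add(peer)) {
            return; // this peer was already counted
        }
        if (peers.contains(peer)) {
            quorum--;
        }
        if (oldPeers.contains(peer)) {
            oldQuorum--;
        }
    }

    boolean isGranted() {
        return quorum <= 0 && oldQuorum <= 0;
    }
}
```

During a configuration change, a majority of the new configuration alone is not enough. With conf {a, b, c} and oldConf {a, d, e}, grants from a and b satisfy the new quorum, but the ballot only passes once d (or e) also agrees.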

If a majority has agreed, the real election starts in electSelf:

private void electSelf() {
    long oldTerm;
    try {
        LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
        if (!this.conf.contains(this.serverId)) {
            LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
            return;
        }
        if (this.state == State.STATE_FOLLOWER) {
            LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
            this.electionTimer.stop();
        }
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
            "A follower's leader_id is reset to NULL as it begins to request_vote."));
        this.state = State.STATE_CANDIDATE;
        this.currTerm++;
        this.votedId = this.serverId.copy();
        LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
        this.voteTimer.start();
        this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        oldTerm = this.currTerm;
    } finally {
        this.writeLock.unlock();
    }

    final LogId lastLogId = this.logManager.getLastLogId(true);

    this.writeLock.lock();
    try {
        // vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
            return;
        }
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(false) // It's not a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm) //
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
        }

        this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
        this.voteCtx.grant(this.serverId);
        if (this.voteCtx.isGranted()) {
            becomeLeader();
        }
    } finally {
        this.writeLock.unlock();
    }
}

The real election does the following:

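  1. If the node is a follower, it stops electionTimer. It then resets leaderId, switches to STATE_CANDIDATE, increments currTerm (currTerm++), votes for itself (votedId), starts voteTimer, and initializes voteCtx
  2. It sends a RequestVoteRequest to every other node. The request is almost the same as the preVote one. The differences are that PreVote=false and the term is the already-incremented currTerm
  3. It persists the new term and its own vote through metaStorage and grants its own vote in voteCtx. If that alone is already a majority, as in a single-node group, it becomes leader immediately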

Next, how the other nodes handle the vote request:

public Message handleRequestVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcFactoryHelper //
                .responseFactory() //
                .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                    "Parse candidateId failed: %s.", request.getServerId());
        }

        // noinspection ConstantConditions
        do {
            // check term
            if (request.getTerm() >= this.currTerm) {
                LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // increase current term, change state to follower
                if (request.getTerm() > this.currTerm) {
                    stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                        "Raft node receives higher term RequestVoteRequest."));
                }
            } else {
                // ignore older term
                LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                break;
            }
            doUnlock = false;
            this.writeLock.unlock();

            final LogId lastLogId = this.logManager.getLastLogId(true);

            doUnlock = true;
            this.writeLock.lock();
            // vote need ABA check after unlock&writeLock
            if (request.getTerm() != this.currTerm) {
                LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                break;
            }

            final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
                .compareTo(lastLogId) >= 0;

            if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
                    "Raft node votes for some candidate, step down to restart election_timer."));
                this.votedId = candidateId.copy();
                this.metaStorage.setVotedFor(candidateId);
            }
        } while (false);

        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            .setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

A node handles a vote request with much the same logic as a preVote request:

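  1. It compares the request term with its current term. If its current term is larger than the request's, it ignores the request and the response carries Granted=false. If the request term is larger, it calls stepDown to adopt that term, which also clears its previous vote
  2. It checks whether the candidate's last log position is at least as new as its own. If it is, the candidate's data is at least as up to date as this node's. If this node has not yet voted for another node in this term, it votes for the candidate: it executes stepDown, so it stays in STATE_FOLLOWER, does not start an election itself and restarts electionTimer. It then sets votedId to the candidate and persists the vote
  3. The response is granted only if this node's term equals the request term and votedId equals the candidate. When both hold, this node's vote in this term went to the requesting node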
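The voter-side rules can be condensed into a small sketch. It is a simplified model with string candidate ids and no locking, RPCs or persistence. The logIsOk flag is assumed to be computed elsewhere, for example with a term-then-index comparison. Voter is a hypothetical class, not jraft code.

```java
// Hypothetical voter-side sketch (not jraft code): at most one vote per term,
// higher terms are adopted, and a vote is only given to a candidate whose log is OK.
final class Voter {
    long currTerm;
    String votedFor; // null means "has not voted in currTerm"

    Voter(long currTerm) {
        this.currTerm = currTerm;
    }

    boolean handleRequestVote(String candidate, long requestTerm, boolean logIsOk) {
        if (requestTerm < currTerm) {
            return false; // stale candidate: reject
        }
        if (requestTerm > currTerm) {
            currTerm = requestTerm; // like stepDown(): adopt the higher term and forget the old vote
            votedFor = null;
        }
        if (logIsOk && votedFor == null) {
            votedFor = candidate; // jraft additionally persists this via metaStorage.setVotedFor
        }
        return requestTerm == currTerm && candidate.equals(votedFor);
    }
}
```

A repeated request from the same candidate in the same term is granted again, because the final check compares against votedFor rather than requiring a fresh vote.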

Next, how the requesting node handles the responses:

public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_CANDIDATE) {
            LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.",
                getNodeId(), peerId, this.state);
            return;
        }
        // check stale term
        if (term != this.currTerm) {
            LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                peerId, term, this.currTerm);
            return;
        }
        // check response term
        if (response.getTerm() > this.currTerm) {
            LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(),
                peerId, response.getTerm(), this.currTerm);
            stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                "Raft node receives higher term request_vote_response."));
            return;
        }
        // check granted quorum?
        if (response.getGranted()) {
            this.voteCtx.grant(peerId);
            if (this.voteCtx.isGranted()) {
                becomeLeader();
            }
        }
    } finally {
        this.writeLock.unlock();
    }
}

The logic is roughly:

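  1. If the node is no longer a candidate, or the term at which it sent the request (the term parameter) differs from its current term, the response is stale and the method returns
  2. If the term in the response is larger than the node's current term, the node calls stepDown with that term and returns
  3. If the vote is granted, the node records it in voteCtx and checks whether it now holds votes from a majority. If so, it promotes itself to leader by running becomeLeader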
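The candidate side can be sketched in the same spirit. This hypothetical model covers both electSelf (become candidate, bump the term, vote for itself) and the response handling above. It assumes a single configuration, string peer ids, and no timers or RPCs. The sentTerm argument plays the role of jraft's term parameter.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical candidate-side sketch (not jraft code).
final class Candidate {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    State state = State.FOLLOWER;
    long currTerm;
    private final String selfId;
    private final int clusterSize;
    private final Set<String> grants = new HashSet<>();

    Candidate(String selfId, long currTerm, int clusterSize) {
        this.selfId = selfId;
        this.currTerm = currTerm;
        this.clusterSize = clusterSize;
    }

    // Like electSelf(): become candidate, bump the term, vote for itself.
    void electSelf() {
        state = State.CANDIDATE;
        currTerm++;
        grants.clear();
        recordGrant(selfId);
    }

    // sentTerm is the term captured when the request was sent.
    void handleVoteResponse(String peer, long sentTerm, long responseTerm, boolean granted) {
        if (state != State.CANDIDATE || sentTerm != currTerm) {
            return; // no longer a candidate, or a response from an older round
        }
        if (responseTerm > currTerm) {
            currTerm = responseTerm; // like stepDown(): another node is in a newer term
            state = State.FOLLOWER;
            return;
        }
        if (granted) {
            recordGrant(peer);
        }
    }

    private void recordGrant(String peer) {
        grants.add(peer);
        if (grants.size() >= clusterSize / 2 + 1) {
            state = State.LEADER; // like becomeLeader()
        }
    }
}
```

In a 3-node group, the candidate's own vote plus one granted response from the current round is enough to become leader. A granted response tagged with an older term is ignored.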

becomeLeader promotes the current node to leader. Here is its implementation:

private void becomeLeader() {
    Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
    LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
        this.conf.getConf(), this.conf.getOldConf());
    // cancel candidate vote timer
    stopVoteTimer();
    this.state = State.STATE_LEADER;
    this.leaderId = this.serverId.copy();
    this.replicatorGroup.resetTerm(this.currTerm);
    // Start follower's replicators
    for (final PeerId peer : this.conf.listPeers()) {
        if (peer.equals(this.serverId)) {
            continue;
        }
        LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
        if (!this.replicatorGroup.addReplicator(peer)) {
            LOG.error("Fail to add a replicator, peer={}.", peer);
        }
    }
    // Start learner's replicators
    for (final PeerId peer : this.conf.listLearners()) {
        LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
        if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) {
            LOG.error("Fail to add a learner replicator, peer={}.", peer);
        }
    }

    // init commit manager
    this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
    // Register _conf_ctx to reject configuration changing before the first log
    // is committed.
    if (this.confCtx.isBusy()) {
        throw new IllegalStateException();
    }
    this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
    this.stepDownTimer.start();
}

The main steps are:

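  1. Stop the vote timer
  2. Set the state to STATE_LEADER and leaderId to itself, then add every peer other than itself (the followers) to the replicator group
  3. Add every learner to the replicator group
  4. Reset the ballotBox, which tracks acknowledgements for each log entry, so that its pending index starts at lastLogIndex + 1
  5. Flush the current configuration through confCtx, which rejects configuration changes until the first log entry of the new term is committed
  6. Start stepDownTimer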

At this point the leader has been elected.

Now let's look at how data is written.

Data writing and replication

A client gets the leader of a group with RouteTable.getInstance().selectLeader(groupId) and builds a Request object containing the data to write. It then writes to the leader asynchronously by calling getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() { ... }) on a CliClientServiceImpl instance.

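On the server side, a matching RpcProcessor handles the write request. It extracts the data from the request, wraps it in a Task, and submits the task to the node with Node.apply.
Node.apply only publishes the Task to a Disruptor RingBuffer. If this part is unclear, see my article "Getting started with the high-performance queue Disruptor: usage, principles and implementation".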

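When data is written, each task is first converted into a LogEntryAndClosure. The information about Task.done is registered in the BallotBox's pendingMetaQueue and closureQueue; once the data has been written, the task's done is taken from these two queues and executed. A batch of LogEntryAndClosure objects is then persisted through logManager.
Node.apply takes a Task: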

public class Task implements Serializable {

    private static final long serialVersionUID = 2971309899898274575L;

    private ByteBuffer data = LogEntry.EMPTY_DATA;
    private Closure done;
    private long expectedTerm = -1;
}

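data is the payload being written, and done is a callback interface. Once the data has been written successfully to a majority of the group (more than half of the nodes), Closure.run(final Status status) is called.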
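The contract "run done once a majority has the data" can be modeled as a queue of callbacks keyed by log index, loosely mirroring the closureQueue mentioned above. This is a hypothetical sketch: SimpleClosure and PendingClosures are illustrative names. jraft's real Closure.run receives a Status rather than a boolean.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified stand-in for jraft's Closure; the real run() receives a Status, not a boolean.
interface SimpleClosure {
    void run(boolean ok);
}

// Hypothetical sketch: the closure for log index firstIndex + i is the i-th queued element.
final class PendingClosures {
    private final Deque<SimpleClosure> queue = new ArrayDeque<>();
    private long firstIndex;

    PendingClosures(long firstIndex) {
        this.firstIndex = firstIndex;
    }

    // A task may have no callback; store a no-op because ArrayDeque rejects null elements.
    void append(SimpleClosure done) {
        if (done == null) {
            done = ok -> { };
        }
        queue.addLast(done);
    }

    // Runs (with ok = true) every closure whose index is <= committedIndex; returns how many ran.
    int onCommitted(long committedIndex) {
        int ran = 0;
        while (firstIndex <= committedIndex && !queue.isEmpty()) {
            queue.pollFirst().run(true);
            firstIndex++;
            ran++;
        }
        return ran;
    }
}
```

Because log indices are assigned in order, a FIFO queue is enough: committing up to index N always releases a prefix of the pending callbacks.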

NodeImpl only writes the Task into the RingBuffer. The actual processing happens in LogEntryAndClosureHandler:

public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
        throws Exception {
    if (event.shutdownLatch != null) {
        if (!this.tasks.isEmpty()) {
            executeApplyingTasks(this.tasks);
            reset();
        }
        final int num = GLOBAL_NUM_NODES.decrementAndGet();
        event.shutdownLatch.countDown();
        return;
    }

    this.tasks.add(event);
    if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
        executeApplyingTasks(this.tasks);
        reset();
    }
}
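The batching rule in onEvent can be isolated in a few lines. Events are buffered and flushed either when the batch reaches applyBatch or when the Disruptor signals endOfBatch. The sketch below is hypothetical: BatchingHandler is an illustrative class, and the Consumer stands in for executeApplyingTasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the flush-on-size-or-end-of-batch rule (not jraft code).
final class BatchingHandler<T> {
    private final int applyBatch;
    private final Consumer<List<T>> executor; // stands in for executeApplyingTasks
    private final List<T> tasks = new ArrayList<>();

    BatchingHandler(int applyBatch, Consumer<List<T>> executor) {
        this.applyBatch = applyBatch;
        this.executor = executor;
    }

    void onEvent(T event, boolean endOfBatch) {
        tasks.add(event);
        if (tasks.size() >= applyBatch || endOfBatch) {
            executor.accept(new ArrayList<>(tasks)); // hand over a copy of the batch
            tasks.clear();                           // plays the role of reset()
        }
    }
}
```

Flushing on endOfBatch means a lone write is not held back waiting for the batch to fill. Under load, up to applyBatch tasks are grouped into one log append.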

The actual write happens in executeApplyingTasks. The code is long, so instead of pasting it, here is a summary:

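  1. First, check whether the current node is the leader. If it is not, run Closure.run with an error status and return
  2. If a task's expectedTerm does not match the node's current term, do the same: run its Closure with an error status and return
  3. Call BallotBox.appendPendingTask. This logic deserves attention: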
public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
    // Each written Task gets its own Ballot, which is put into pendingMetaQueue;
    // later, acknowledgements from the other nodes are counted against it
    final Ballot bl = new Ballot();
    if (!bl.init(conf, oldConf)) {
        LOG.error("Fail to init ballot.");
        return false;
    }
    final long stamp = this.stampedLock.writeLock();
    try {
        if (this.pendingIndex <= 0) {
            LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex);
            return false;
        }
        this.pendingMetaQueue.add(bl);
        this.closureQueue.appendPendingClosure(done);
        return true;
    } finally {
        this.stampedLock.unlockWrite(stamp);
    }
}

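This creates one Ballot per write task. During leader election, a Ballot decided whether a majority had voted; here it decides, in the same way, whether a majority has written the task. As the other nodes replicate the task's data from the leader, the leader updates the Ballot for that task, and the Ballot tells it whether more than half of the group has completed the write. The task's Closure, the callback to run once a majority has written the task, is kept in closureQueue. Whenever another node's successful write updates the task's Ballot, the leader checks whether a majority has now written it, and if so it calls the run method of that task's Closure.
  4. Call logManager.appendEntries to write the data
  5. Once the local write completes, the LeaderStableClosure callback runs. Its logic is: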

public void run(final Status status) {
    if (status.isOk()) {
        NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1,
            NodeImpl.this.serverId);
    } else {
        LOG.error("Node {} append [{}, {}] failed, status={}.", getNodeId(), this.firstLogIndex,
            this.firstLogIndex + this.nEntries - 1, status);
    }
}

ballotBox.commitAt works as follows:

public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
    final long stamp = this.stampedLock.writeLock();
    long lastCommittedIndex = 0;
    try {
        if (this.pendingIndex == 0) {
            return false;
        }
        if (lastLogIndex < this.pendingIndex) {
            return true;
        }
        final long startAt = Math.max(this.pendingIndex, firstLogIndex);
        Ballot.PosHint hint = new Ballot.PosHint();
        for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
            final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
            hint = bl.grant(peer, hint);
            if (bl.isGranted()) {
                lastCommittedIndex = logIndex;
            }
        }
        if (lastCommittedIndex == 0) {
            return true;
        }
        this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
        this.pendingIndex = lastCommittedIndex + 1;
        this.lastCommittedIndex = lastCommittedIndex;
    } finally {
        this.stampedLock.unlockWrite(stamp);
    }
    this.waiter.onCommitted(lastCommittedIndex);
    return true;
}

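In other words, every time a node (the leader itself or a follower) finishes a write, the leader's ballotBox.commitAt is called to update the ballots of the written entries. When bl.isGranted() is true, a majority has written the entry, so this.waiter.onCommitted is called. This eventually reaches the onApply method of the user's state machine (typically a subclass of StateMachineAdapter).
  6. When the node becomes leader, it sets up log replication with this.replicatorGroup.addReplicator(peer). A Replicator is started for every node in the group other than the leader, together with a heartbeat timeout timer. The Replicator first sends an empty AppendEntries request (EmptyEntries) to the follower to learn the follower's latest log position, and then sends the log entries that need to be synchronized
  7. The Replicator handles the follower's responses in onAppendEntriesReturned. If the follower wrote the entries successfully, it calls the leader's BallotBox commitAt, just as in step 5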
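The steps above (one ballot per pending entry, commitAt called for the leader and for each follower, pendingIndex advancing as entries commit) can be condensed into a toy model. This is a hypothetical, single-configuration sketch: MiniBallotBox is an illustrative name, a set of acknowledging peers replaces the real Ballot, and there is no locking and no waiter callback.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of BallotBox (not jraft code): one set of acks per pending log index.
final class MiniBallotBox {
    private final int quorum;
    private final List<Set<String>> pending = new ArrayList<>(); // pending.get(i) -> index pendingIndex + i
    private long pendingIndex;
    private long lastCommittedIndex;

    MiniBallotBox(int clusterSize, long pendingIndex) {
        this.quorum = clusterSize / 2 + 1;
        this.pendingIndex = pendingIndex; // like resetPendingIndex(lastLogIndex + 1)
    }

    // Like appendPendingTask(): one ballot per new log entry.
    void appendPendingTask() {
        pending.add(new HashSet<>());
    }

    // Like commitAt(): `peer` has persisted [firstLogIndex, lastLogIndex]. Returns the commit index.
    long commitAt(long firstLogIndex, long lastLogIndex, String peer) {
        if (lastLogIndex < pendingIndex) {
            return lastCommittedIndex; // everything acknowledged here is already committed
        }
        long committed = 0;
        long startAt = Math.max(pendingIndex, firstLogIndex);
        for (long i = startAt; i <= lastLogIndex; i++) {
            Set<String> acks = pending.get((int) (i - pendingIndex));
            acks.add(peer);
            if (acks.size() >= quorum) {
                committed = i;
            }
        }
        if (committed == 0) {
            return lastCommittedIndex; // no new majority yet
        }
        // Drop ballots up to the newly committed index and advance pendingIndex.
        pending.subList(0, (int) (committed - pendingIndex) + 1).clear();
        pendingIndex = committed + 1;
        lastCommittedIndex = committed;
        return committed;
    }
}
```

In a 3-node group with entries 1 to 3 pending, the leader's own commitAt commits nothing. A follower that acknowledges entries 1 and 2 commits up to 2. A second follower acknowledging entry 3 then commits 3.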

That completes the data write and log replication.
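Step 6 mentions that the Replicator first probes a follower to find its log position before sending entries. The generic Raft idea behind this can be sketched as a nextIndex search. In this simplified model, each log is an array where log[i - 1] is the term of entry i. On a mismatch, the leader jumps back to just after the follower's last entry if that is smaller, and otherwise steps back by one. This is a hypothetical illustration, not jraft's Replicator code.

```java
// Hypothetical model of a leader finding where a follower's log diverges (not jraft code).
final class NextIndexProbe {
    // Returns the first log index the leader must send so that the follower's log matches.
    static long findNextIndex(long[] leaderTerms, long[] followerTerms) {
        long nextIndex = leaderTerms.length + 1; // optimistic start: right after the leader's last entry
        while (nextIndex > 1) {
            long prevIndex = nextIndex - 1;
            if (matches(followerTerms, prevIndex, leaderTerms[(int) prevIndex - 1])) {
                break; // follower agrees up to prevIndex
            }
            long followerLast = followerTerms.length;
            // Jump back to just after the follower's last entry if that is smaller; else step back by one.
            nextIndex = Math.min(nextIndex - 1, followerLast + 1);
        }
        return nextIndex;
    }

    // True if the follower has an entry at `index` with the given term.
    private static boolean matches(long[] log, long index, long term) {
        return index <= log.length && log[(int) index - 1] == term;
    }
}
```

A follower that is simply behind, with leader terms {1, 1, 2, 3} and follower terms {1, 1}, is found in one jump (nextIndex 3). A follower with a conflicting entry at index 4 is walked back one entry at a time until the terms agree.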

In short, log replication in jraft means the leader sends log entries to the follower nodes, and each follower writes the entries it receives to its local log.
