Raft Algorithm Implementation - Sofa-JRaft: Leader Election, Data Writes, Log Replication
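For the details of the Raft algorithm itself, see the earlier article "Distributed consensus algorithms, two-phase commit, three-phase commit, Paxos, Raft, ZooKeeper leader election, the ZAB protocol, sequential consistency, the data write flow, node states, node roles".
Here we look at sofa-jraft, the implementation open-sourced by Ant Financial (an Alibaba affiliate).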
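First, a few important roles in sofa-jraft:
- Node: represents a single server node
- Ballot: holds the information for one round of voting
- PeerId: represents one participant in a replication group
- StateMachine: once data has been committed on the Node, its onApply method is invoked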
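Node also has several important timers:
- electionTimer: the election timer. If the current leader is down, it triggers preVote
- voteTimer: the vote timer. If a vote times out, it triggers preVote again
- stepDownTimer: used by the leader. It checks whether the current node is still alive as leader, checks whether any node in the cluster has gone offline, and updates the Leader's timestamp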
Leader election voting
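A JRaft election has two phases, preVote and vote. The preVote phase exists to handle the following case: a node that cannot sync with the leader (for example, because it is partitioned away) keeps starting elections and raising its own term until its term exceeds the Leader's, which then forces the Leader to give up leadership and starts a new round of election.
With preVote, a node must first be granted by more than half of the nodes before it is allowed to start a real election.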
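The effect on the term can be shown with a toy model. The sketch below is not JRaft code; the class and method names are made up for illustration, and it assumes a node that cannot reach any peer, so the only grant it ever collects is its own.

```java
// Toy model (hypothetical names, not JRaft code): how the term of an
// isolated node evolves across election timeouts, with and without pre-vote.
final class PreVoteSketch {

    /** Returns the node's term after the given number of election timeouts. */
    static long termAfterTimeouts(long startTerm, int timeouts, boolean usePreVote, int clusterSize) {
        long term = startTerm;
        final int quorum = clusterSize / 2 + 1;
        for (int i = 0; i < timeouts; i++) {
            if (usePreVote) {
                // Pre-vote: the request carries term + 1, but the local term is untouched.
                final int grants = 1; // only the node itself; no peer is reachable
                if (grants >= quorum) {
                    term++; // a real election starts only after a successful pre-vote
                }
            } else {
                term++; // without pre-vote every timeout starts a real election
            }
        }
        return term;
    }

    public static void main(String[] args) {
        System.out.println(termAfterTimeouts(5, 10, false, 3)); // prints 15
        System.out.println(termAfterTimeouts(5, 10, true, 3));  // prints 5
    }
}
```

Without pre-vote the isolated node would come back with term 15 and disturb a healthy leader at term 5; with pre-vote its term stays at 5.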
A JRaft election is started by a timer expiring. In NodeImpl (the concrete implementation of Node), calling NodeImpl.init starts electionTimer:
    this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(),
        TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) {

        protected void onTrigger() {
            handleElectionTimeout();
        }

        protected int adjustTimeout(final int timeoutMs) {
            return randomTimeout(timeoutMs);
        }
    };

    private void handleElectionTimeout() {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_FOLLOWER) {
                return;
            }
            if (isCurrentLeaderValid()) {
                return;
            }
            resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
                this.leaderId));
            // Judge whether to launch a election.
            if (!allowLaunchElection()) {
                return;
            }
            doUnlock = false;
            preVote();
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }
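The adjustTimeout override randomizes every election timeout so that followers do not all time out at the same moment and split the vote. A minimal sketch of that idea, assuming the timeout is drawn uniformly from a window above the base value; the maxDelayMs parameter and the class name are assumptions for illustration, not JRaft's actual signature:

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of a randomized election timeout (hypothetical helper, not JRaft's API).
final class RandomTimeoutSketch {

    /** Returns a timeout in [timeoutMs, timeoutMs + maxDelayMs). */
    static int randomTimeout(int timeoutMs, int maxDelayMs) {
        if (timeoutMs <= 0 || maxDelayMs <= 0) {
            throw new IllegalArgumentException("timeouts must be positive");
        }
        return ThreadLocalRandom.current().nextInt(timeoutMs, timeoutMs + maxDelayMs);
    }

    public static void main(String[] args) {
        // Three followers with the same base timeout get different deadlines.
        for (int i = 0; i < 3; i++) {
            System.out.println(randomTimeout(1000, 1000));
        }
    }
}
```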
handleElectionTimeout mainly performs preVote. The main steps of one round of voting in JRaft are:
preVote ===> handlePreVoteRequest ===> electSelf ===> handleRequestVoteRequest
Let's look at preVote first:
    private void preVote() {
        .....
        final LogId lastLogId = this.logManager.getLastLogId(true);

        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            // pre_vote need defense ABA after unlock&writeLock
            if (oldTerm != this.currTerm) {
                LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                return;
            }
            this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
            for (final PeerId peer : this.conf.listPeers()) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                if (!this.rpcService.connect(peer.getEndpoint())) {
                    LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                    continue;
                }
                final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
                done.request = RequestVoteRequest.newBuilder() //
                    .setPreVote(true) // it's a pre-vote request.
                    .setGroupId(this.groupId) //
                    .setServerId(this.serverId.toString()) //
                    .setPeerId(peer.toString()) //
                    .setTerm(this.currTerm + 1) // next term
                    .setLastLogIndex(lastLogId.getIndex()) //
                    .setLastLogTerm(lastLogId.getTerm()) //
                    .build();
                this.rpcService.preVote(peer.getEndpoint(), done.request, done);
            }
            this.prevVoteCtx.grant(this.serverId);
            if (this.prevVoteCtx.isGranted()) {
                doUnlock = false;
                electSelf();
            }
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }
As shown, preVote sends a RequestVoteRequest to every node other than itself. The main fields are set as follows:
    RequestVoteRequest.newBuilder() //
        .setPreVote(true) // it's a pre-vote request.
        .setGroupId(this.groupId) //
        .setServerId(this.serverId.toString()) //
        .setPeerId(peer.toString()) //
        .setTerm(this.currTerm + 1) // next term
        .setLastLogIndex(lastLogId.getIndex()) //
        .setLastLogTerm(lastLogId.getTerm()) //
        .build();
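Note that the node does not set its own currTerm to currTerm + 1 at this point. It only sends the value currTerm + 1 in the request. This differs from a real election, which first executes currTerm++.
Let's see how the other nodes handle this request: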
    public Message handlePreVoteRequest(final RequestVoteRequest request) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
                return RpcFactoryHelper //
                    .responseFactory() //
                    .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                        "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
            }
            final PeerId candidateId = new PeerId();
            if (!candidateId.parse(request.getServerId())) {
                LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                    request.getServerId());
                return RpcFactoryHelper //
                    .responseFactory() //
                    .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                        "Parse candidateId failed: %s.", request.getServerId());
            }
            boolean granted = false;
            // noinspection ConstantConditions
            do {
                if (!this.conf.contains(candidateId)) {
                    LOG.warn("Node {} ignore PreVoteRequest from {} as it is not in conf <{}>.", getNodeId(),
                        request.getServerId(), this.conf);
                    break;
                }
                if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                    LOG.info(
                        "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                        getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                    break;
                }
                if (request.getTerm() < this.currTerm) {
                    LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                        request.getServerId(), request.getTerm(), this.currTerm);
                    // A follower replicator may not be started when this node become leader, so we must check it.
                    checkReplicator(candidateId);
                    break;
                }
                // A follower replicator may not be started when this node become leader, so we must check it.
                // check replicator state
                checkReplicator(candidateId);

                doUnlock = false;
                this.writeLock.unlock();

                final LogId lastLogId = this.logManager.getLastLogId(true);

                doUnlock = true;
                this.writeLock.lock();
                final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
                granted = requestLastLogId.compareTo(lastLogId) >= 0;

                LOG.info(
                    "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                    getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                    lastLogId);
            } while (false);

            return RequestVoteResponse.newBuilder() //
                .setTerm(this.currTerm) //
                .setGranted(granted) //
                .build();
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }
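When another node receives a PreVoteRequest, it makes the following checks:
- If the candidate is not in this node's configuration, it returns granted=false
- If this node's Leader is still alive (its lease is still valid), it returns granted=false
- If the term in the preVote request is smaller than this node's term, it returns granted=false
- If the last log in the request (term and index) is older than this node's last log, it returns granted=false
- If none of the above applies, it returns granted=true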
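The checks above boil down to a small pure function. The following is a simplified sketch, not JRaft code: the names are hypothetical, the configuration-membership check is left out, and the log comparison assumes that log positions are ordered by term first and index second.

```java
// Simplified pre-vote decision (hypothetical names, not JRaft code).
final class PreVoteDecisionSketch {

    /** Orders log positions by term first, then by index (assumed ordering). */
    static int compareLog(long indexA, long termA, long indexB, long termB) {
        final int c = Long.compare(termA, termB);
        return c != 0 ? c : Long.compare(indexA, indexB);
    }

    static boolean grantPreVote(boolean leaderStillValid, long requestTerm, long currTerm,
                                long reqLastIndex, long reqLastTerm,
                                long localLastIndex, long localLastTerm) {
        if (leaderStillValid) {
            return false; // a live leader exists, no need for a new election
        }
        if (requestTerm < currTerm) {
            return false; // stale candidate
        }
        // Grant only if the candidate's log is at least as up to date as ours.
        return compareLog(reqLastIndex, reqLastTerm, localLastIndex, localLastTerm) >= 0;
    }

    public static void main(String[] args) {
        // Candidate asks with term 6; the local node is at term 5 with last log (index 10, term 5).
        System.out.println(grantPreVote(false, 6, 5, 10, 5, 10, 5)); // prints true
        System.out.println(grantPreVote(false, 6, 5, 9, 5, 10, 5));  // prints false
    }
}
```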
That is how the other nodes handle a PreVoteRequest. Now let's see how the node that started the preVote handles their responses:
    public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_FOLLOWER) {
                LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                    getNodeId(), peerId, this.state);
                return;
            }
            if (term != this.currTerm) {
                LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                    peerId, term, this.currTerm);
                return;
            }
            if (response.getTerm() > this.currTerm) {
                LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                    response.getTerm(), this.currTerm);
                stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                    "Raft node receives higher term pre_vote_response."));
                return;
            }
            LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
                response.getTerm(), response.getGranted());
            // check granted quorum?
            if (response.getGranted()) {
                this.prevVoteCtx.grant(peerId);
                if (this.prevVoteCtx.isGranted()) {
                    doUnlock = false;
                    electSelf();
                }
            }
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }
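Here the term parameter is the node's term at the time the preVote was sent, and peerId is the PeerId of the node the preVote was sent to, i.e. the responder. The logic is:
- If the node is no longer in STATE_FOLLOWER, the response is ignored
- If the current term differs from the term at the time of the preVote, a new round of voting has started in the meantime, so this response is stale and is ignored
- If the term in the response (the responder's term) is larger than the current node's term, the responder is in a later round, so the current node steps down to that term and returns
- If the response grants the vote, i.e. response.getGranted() is true, the grant is recorded and the node checks whether the grants in this round have reached a majority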
NodeImpl holds two Ballot instances: prevVoteCtx for preVote and voteCtx for vote. When a preVote starts, prevVoteCtx is initialized:
    public boolean init(final Configuration conf, final Configuration oldConf) {
        this.peers.clear();
        this.oldPeers.clear();
        this.quorum = this.oldQuorum = 0;
        int index = 0;
        if (conf != null) {
            for (final PeerId peer : conf) {
                this.peers.add(new UnfoundPeerId(peer, index++, false));
            }
        }
        this.quorum = this.peers.size() / 2 + 1;
        if (oldConf == null) {
            return true;
        }
        index = 0;
        for (final PeerId peer : oldConf) {
            this.oldPeers.add(new UnfoundPeerId(peer, index++, false));
        }
        this.oldQuorum = this.oldPeers.size() / 2 + 1;
        return true;
    }
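As shown, init sets the Ballot's quorum to (number of peers in the configuration) / 2 + 1.
Now let's see how, once a preVote request has been granted, the node decides whether to start an election. At the end of handlePreVoteResponse, this.prevVoteCtx.grant(peerId) is executed: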
    public void grant(final PeerId peerId) {
        grant(peerId, new PosHint());
    }

    public PosHint grant(final PeerId peerId, final PosHint hint) {
        UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
        if (peer != null) {
            if (!peer.found) {
                peer.found = true;
                this.quorum--;
            }
            hint.pos0 = peer.index;
        } else {
            hint.pos0 = -1;
        }
        if (this.oldPeers.isEmpty()) {
            hint.pos1 = -1;
            return hint;
        }
        peer = findPeer(peerId, this.oldPeers, hint.pos1);
        if (peer != null) {
            if (!peer.found) {
                peer.found = true;
                this.oldQuorum--;
            }
            hint.pos1 = peer.index;
        } else {
            hint.pos1 = -1;
        }
        return hint;
    }
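The logic is simple: if the responding node granted the vote, the Ballot's quorum is decremented (quorum--). To guard against a node responding more than once, each peer is marked as found so that it can only be counted once. The Ballot then tells whether this preVote has reached a majority: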
    public boolean isGranted() {
        return this.quorum <= 0 && this.oldQuorum <= 0;
    }
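To make the countdown concrete, here is a stripped-down ballot. It is a sketch under simplifying assumptions, not the JRaft class: peers are plain strings, duplicates are tracked with a single set instead of per-peer found flags, and the position hints are dropped. It keeps the two ideas that matter: each peer counts at most once, and during a configuration change both the new and the old configuration must reach their own majority.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified ballot (hypothetical class, not JRaft's Ballot).
final class BallotSketch {
    private final Set<String> peers = new HashSet<>();
    private final Set<String> oldPeers = new HashSet<>();
    private final Set<String> granted = new HashSet<>();
    private int quorum;
    private int oldQuorum; // stays 0 when there is no old configuration

    BallotSketch(List<String> conf, List<String> oldConf) {
        peers.addAll(conf);
        quorum = peers.size() / 2 + 1;
        if (oldConf != null) {
            oldPeers.addAll(oldConf);
            oldQuorum = oldPeers.size() / 2 + 1;
        }
    }

    void grant(String peer) {
        if (!granted.add(peer)) {
            return; // this peer was already counted
        }
        if (peers.contains(peer)) {
            quorum--;
        }
        if (oldPeers.contains(peer)) {
            oldQuorum--;
        }
    }

    boolean isGranted() {
        return quorum <= 0 && oldQuorum <= 0;
    }

    public static void main(String[] args) {
        BallotSketch ballot = new BallotSketch(List.of("a", "b", "c"), null);
        ballot.grant("a");
        ballot.grant("a"); // duplicate, ignored
        System.out.println(ballot.isGranted()); // prints false
        ballot.grant("b");
        System.out.println(ballot.isGranted()); // prints true
    }
}
```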
If a majority has been reached, the real election begins with electSelf:
    private void electSelf() {
        long oldTerm;
        try {
            LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
            if (!this.conf.contains(this.serverId)) {
                LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
                return;
            }
            if (this.state == State.STATE_FOLLOWER) {
                LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
                this.electionTimer.stop();
            }
            resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
                "A follower's leader_id is reset to NULL as it begins to request_vote."));
            this.state = State.STATE_CANDIDATE;
            this.currTerm++;
            this.votedId = this.serverId.copy();
            LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
            this.voteTimer.start();
            this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
            oldTerm = this.currTerm;
        } finally {
            this.writeLock.unlock();
        }

        final LogId lastLogId = this.logManager.getLastLogId(true);

        this.writeLock.lock();
        try {
            // vote need defense ABA after unlock&writeLock
            if (oldTerm != this.currTerm) {
                LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
                return;
            }
            for (final PeerId peer : this.conf.listPeers()) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                if (!this.rpcService.connect(peer.getEndpoint())) {
                    LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                    continue;
                }
                final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
                done.request = RequestVoteRequest.newBuilder() //
                    .setPreVote(false) // It's not a pre-vote request.
                    .setGroupId(this.groupId) //
                    .setServerId(this.serverId.toString()) //
                    .setPeerId(peer.toString()) //
                    .setTerm(this.currTerm) //
                    .setLastLogIndex(lastLogId.getIndex()) //
                    .setLastLogTerm(lastLogId.getTerm()) //
                    .build();
                this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
            }

            this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
            this.voteCtx.grant(this.serverId);
            if (this.voteCtx.isGranted()) {
                becomeLeader();
            }
        } finally {
            this.writeLock.unlock();
        }
    }
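The real vote does the following:
- The node becomes STATE_CANDIDATE, executes currTerm++, votes for itself, starts voteTimer and initializes voteCtx
- It sends a RequestVoteRequest to the other nodes. The request is almost the same as in preVote; the differences are that preVote is false and that term is the already incremented currTerm
Now let's see how the other nodes handle the vote request: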
    public Message handleRequestVoteRequest(final RequestVoteRequest request) {
        boolean doUnlock = true;
        this.writeLock.lock();
        try {
            if (!this.state.isActive()) {
                LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
                return RpcFactoryHelper //
                    .responseFactory() //
                    .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                        "Node %s is not in active state, state %s.", getNodeId(), this.state.name());
            }
            final PeerId candidateId = new PeerId();
            if (!candidateId.parse(request.getServerId())) {
                LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
                    request.getServerId());
                return RpcFactoryHelper //
                    .responseFactory() //
                    .newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
                        "Parse candidateId failed: %s.", request.getServerId());
            }

            // noinspection ConstantConditions
            do {
                // check term
                if (request.getTerm() >= this.currTerm) {
                    LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                        request.getServerId(), request.getTerm(), this.currTerm);
                    // increase current term, change state to follower
                    if (request.getTerm() > this.currTerm) {
                        stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                            "Raft node receives higher term RequestVoteRequest."));
                    }
                } else {
                    // ignore older term
                    LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                        request.getServerId(), request.getTerm(), this.currTerm);
                    break;
                }
                doUnlock = false;
                this.writeLock.unlock();

                final LogId lastLogId = this.logManager.getLastLogId(true);

                doUnlock = true;
                this.writeLock.lock();
                // vote need ABA check after unlock&writeLock
                if (request.getTerm() != this.currTerm) {
                    LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                    break;
                }

                final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
                    .compareTo(lastLogId) >= 0;

                if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                    stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
                        "Raft node votes for some candidate, step down to restart election_timer."));
                    this.votedId = candidateId.copy();
                    this.metaStorage.setVotedFor(candidateId);
                }
            } while (false);

            return RequestVoteResponse.newBuilder() //
                .setTerm(this.currTerm) //
                .setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
                .build();
        } finally {
            if (doUnlock) {
                this.writeLock.unlock();
            }
        }
    }
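The node that receives a vote request handles it in much the same way as a preVote request:
- It compares the request's term with its own term. If its own term is larger, it returns granted=false. If the request's term is larger, it steps down and adopts the request's term
- It checks whether its own last log position is no newer than the one in the request, which means the candidate's data is at least as up to date as its own. If so, and the node has not yet voted for anyone else, it sets its votedId to the candidate, meaning it gives its vote to the candidate. The node executes stepDown, does not start an election of its own, becomes STATE_FOLLOWER and restarts electionTimer
- The response reports whether the node's term equals the request's term and whether the node's votedId matches the candidate. If both conditions hold, the node has voted for the candidate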
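The voting rules above can be condensed into a few lines of state handling. This is a simplified sketch with hypothetical names, not the NodeImpl code: locking, persistence and timers are left out, and stepping down is reduced to adopting the higher term and clearing the recorded vote.

```java
// Simplified vote handling on the receiving node (hypothetical names, not JRaft code).
final class VoteDecisionSketch {
    long currTerm;
    String votedFor; // null means no vote cast in currTerm
    final long lastLogIndex;
    final long lastLogTerm;

    VoteDecisionSketch(long currTerm, long lastLogIndex, long lastLogTerm) {
        this.currTerm = currTerm;
        this.lastLogIndex = lastLogIndex;
        this.lastLogTerm = lastLogTerm;
    }

    /** Returns true if this node's vote in reqTerm belongs to the candidate. */
    boolean handleRequestVote(String candidate, long reqTerm, long reqLastIndex, long reqLastTerm) {
        if (reqTerm < currTerm) {
            return false; // stale request
        }
        if (reqTerm > currTerm) {
            currTerm = reqTerm; // "step down": adopt the higher term
            votedFor = null;    // and forget the vote of the old term
        }
        final boolean logIsOk = reqLastTerm > lastLogTerm
                || (reqLastTerm == lastLogTerm && reqLastIndex >= lastLogIndex);
        if (logIsOk && votedFor == null) {
            votedFor = candidate; // at most one vote per term
        }
        return candidate.equals(votedFor);
    }

    public static void main(String[] args) {
        VoteDecisionSketch node = new VoteDecisionSketch(3, 10, 3);
        System.out.println(node.handleRequestVote("A", 4, 10, 3)); // prints true
        System.out.println(node.handleRequestVote("B", 4, 12, 3)); // prints false
    }
}
```

The second call is refused even though B's log is longer, because the node already voted for A in term 4.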
Next, let's see how the requesting node handles the responses from the other nodes:
    public void handleRequestVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
        this.writeLock.lock();
        try {
            if (this.state != State.STATE_CANDIDATE) {
                LOG.warn("Node {} received invalid RequestVoteResponse from {}, state not in STATE_CANDIDATE but {}.",
                    getNodeId(), peerId, this.state);
                return;
            }
            // check stale term
            if (term != this.currTerm) {
                LOG.warn("Node {} received stale RequestVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                    peerId, term, this.currTerm);
                return;
            }
            // check response term
            if (response.getTerm() > this.currTerm) {
                LOG.warn("Node {} received invalid RequestVoteResponse from {}, term={}, expect={}.", getNodeId(),
                    peerId, response.getTerm(), this.currTerm);
                stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                    "Raft node receives higher term request_vote_response."));
                return;
            }
            // check granted quorum?
            if (response.getGranted()) {
                this.voteCtx.grant(peerId);
                if (this.voteCtx.isGranted()) {
                    becomeLeader();
                }
            }
        } finally {
            this.writeLock.unlock();
        }
    }
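The logic is roughly:
- If the node is no longer in STATE_CANDIDATE, the response is ignored
- If the term at the time the vote request was sent differs from the node's current term, the response is stale and is ignored
- If the term in the response is larger than the node's current term, the node steps down to that term and returns
- If the response grants the vote, the grant is recorded. Once the votes collected reach a majority, the node is promoted to Leader by executing becomeLeader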
becomeLeader promotes the current node to Leader. Here is its implementation:
    private void becomeLeader() {
        Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
        LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
            this.conf.getConf(), this.conf.getOldConf());
        // cancel candidate vote timer
        stopVoteTimer();
        this.state = State.STATE_LEADER;
        this.leaderId = this.serverId.copy();
        this.replicatorGroup.resetTerm(this.currTerm);
        // Start follower's replicators
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            LOG.debug("Node {} add a replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
            if (!this.replicatorGroup.addReplicator(peer)) {
                LOG.error("Fail to add a replicator, peer={}.", peer);
            }
        }

        // Start learner's replicators
        for (final PeerId peer : this.conf.listLearners()) {
            LOG.debug("Node {} add a learner replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
            if (!this.replicatorGroup.addReplicator(peer, ReplicatorType.Learner)) {
                LOG.error("Fail to add a learner replicator, peer={}.", peer);
            }
        }

        // init commit manager
        this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
        // Register _conf_ctx to reject configuration changing before the first log
        // is committed.
        if (this.confCtx.isBusy()) {
            throw new IllegalStateException();
        }
        this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
        this.stepDownTimer.start();
    }
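The main logic is:
- Stop the vote timer
- Set the state to STATE_LEADER and record itself as leaderId
- Add a replicator for every Follower peer other than the current node
- Add a replicator for every Learner peer
- Reset the pending index of ballotBox (which manages the ballots) to the last log index + 1
- Start stepDownTimer
With that, the Leader has been elected.
Next, let's look at how data is written.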
Data writes and replication
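A client obtains the Leader of a group through RouteTable.getInstance().selectLeader(groupId), builds a Request object carrying the data to write, and sends it to the cluster's Leader with CliClientServiceImpl.getRpcClient().invokeAsync(leader.getEndpoint(), request, new InvokeCallback() {}.
On the server side, the corresponding RpcProcessor handles the write request: it takes the data out of the request, wraps it in a Task, and submits the task to the node through Node.apply.
Node.apply itself only publishes the Task into the Disruptor RingBuffer. If this part is unfamiliar, see the article "An introduction to the high-performance queue Disruptor: usage, principles and code implementation".
During a write, the task is first converted into a LogEntryAndClosure. The information related to Task.done is put into the BallotBox's pendingMetaQueue and closureQueue (once the write completes, the done of the task is taken from these two queues and executed). A batch of LogEntryAndClosure is then persisted through logManager.
The argument passed to Node.apply is a Task: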
    public class Task implements Serializable {

        private static final long serialVersionUID = 2971309899898274575L;

        private ByteBuffer data = LogEntry.EMPTY_DATA;
        private Closure done;
        private long expectedTerm = -1;
    }
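Here data is the data we write, and Closure done is a callback interface: once the data has been written successfully to a majority of the cluster (n/2 + 1 nodes), Closure.run(final Status status) is called.
In NodeImpl the Task is published to the RingBuffer, and the actual processing happens in LogEntryAndClosureHandler: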
    public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
                                                                                        throws Exception {
        if (event.shutdownLatch != null) {
            if (!this.tasks.isEmpty()) {
                executeApplyingTasks(this.tasks);
                reset();
            }
            final int num = GLOBAL_NUM_NODES.decrementAndGet();
            event.shutdownLatch.countDown();
            return;
        }

        this.tasks.add(event);
        if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeApplyingTasks(this.tasks);
            reset();
        }
    }
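The handler accumulates events and flushes them in one go, either when the batch is full or when the ring buffer signals that no more events are immediately available. A self-contained sketch of that batching rule, with hypothetical names and plain strings in place of LogEntryAndClosure (the Disruptor itself is not used here):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the "flush on full batch or end of batch" rule (not JRaft code).
final class BatchingSketch {
    private final int applyBatch;
    private final List<String> pending = new ArrayList<>();
    final List<List<String>> flushed = new ArrayList<>(); // every flushed batch, in order

    BatchingSketch(int applyBatch) {
        this.applyBatch = applyBatch;
    }

    void onEvent(String task, boolean endOfBatch) {
        pending.add(task);
        if (pending.size() >= applyBatch || endOfBatch) {
            flushed.add(new ArrayList<>(pending)); // stands in for executeApplyingTasks
            pending.clear();
        }
    }

    public static void main(String[] args) {
        BatchingSketch handler = new BatchingSketch(3);
        handler.onEvent("t1", false);
        handler.onEvent("t2", false);
        handler.onEvent("t3", false); // batch is full, flush
        handler.onEvent("t4", true);  // end of batch, flush the single task
        System.out.println(handler.flushed); // prints [[t1, t2, t3], [t4]]
    }
}
```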
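The actual write finally happens in executeApplyingTasks. That code is rather long, so instead of pasting it here is a rough description:
1. Check whether the current node is the Leader. If it is not, call Closure.run with an error status and return
2. If the task's term does not match the current node's term, do the same and return
3. Call BallotBox.appendPendingTask. This logic deserves attention: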
    public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
        // Each written Task gets its own Ballot, which is put into pendingMetaQueue
        final Ballot bl = new Ballot();
        if (!bl.init(conf, oldConf)) {
            LOG.error("Fail to init ballot.");
            return false;
        }
        final long stamp = this.stampedLock.writeLock();
        try {
            if (this.pendingIndex <= 0) {
                LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex);
                return false;
            }
            this.pendingMetaQueue.add(bl);
            this.closureQueue.appendPendingClosure(done);
            return true;
        } finally {
            this.stampedLock.unlockWrite(stamp);
        }
    }
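A Ballot is created for every write task. Recall that leader election used the same class to track whether a vote had reached a majority, and the idea is the same here. As other nodes replicate the Task's data from the Leader, the Ballot for that Task on the Leader is updated, and this Ballot tells whether a majority of the cluster has finished writing the Task. The Closure, which is the callback entry point for a successful majority write, is kept in closureQueue. Whenever another node writes the Task successfully and the Ballot is updated, the Leader checks whether a majority has been reached, and if so it calls the run method of the Task's Closure.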
4. Call logManager.appendEntries to write the data
5. After the local node finishes writing, the LeaderStableClosure callback is invoked. Its logic is:
    public void run(final Status status) {
        if (status.isOk()) {
            NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1,
                NodeImpl.this.serverId);
        } else {
            LOG.error("Node {} append [{}, {}] failed, status={}.", getNodeId(), this.firstLogIndex,
                this.firstLogIndex + this.nEntries - 1, status);
        }
    }
The logic of ballotBox.commitAt is as follows:
    public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
        final long stamp = this.stampedLock.writeLock();
        long lastCommittedIndex = 0;
        try {
            if (this.pendingIndex == 0) {
                return false;
            }
            if (lastLogIndex < this.pendingIndex) {
                return true;
            }
            final long startAt = Math.max(this.pendingIndex, firstLogIndex);
            Ballot.PosHint hint = new Ballot.PosHint();
            for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
                final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
                hint = bl.grant(peer, hint);
                if (bl.isGranted()) {
                    lastCommittedIndex = logIndex;
                }
            }
            if (lastCommittedIndex == 0) {
                return true;
            }
            this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
            this.pendingIndex = lastCommittedIndex + 1;
            this.lastCommittedIndex = lastCommittedIndex;
        } finally {
            this.stampedLock.unlockWrite(stamp);
        }
        this.waiter.onCommitted(lastCommittedIndex);
        return true;
    }
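As shown, every time a node finishes a write, the Leader's ballotBox.commitAt is called to update the ballots of the corresponding entries. If bl.isGranted() holds, meaning a majority of nodes have written the entry, this.waiter.onCommitted is called, which eventually reaches StateMachineAdapter.onApply.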
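6. When a node becomes Leader, it initializes the replicator group with this.replicatorGroup.addReplicator(peer). For every node in the cluster other than itself it starts a Replicator and a heartbeat timeout timer. The Replicator first sends an empty request (EmptyEntries) to the Follower to learn the Follower's latest log position, and then sends the log entries that need to be synchronized
7. The Replicator handles the Follower's response in onAppendEntriesReturned. If the Follower wrote the entries successfully, ballotBox.commitAt is called, handled in the same way as step 5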
That completes the data write and the log replication.
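The commit rule from steps 3, 5 and 7 can be reproduced in a few lines. The sketch below is a simplification with hypothetical names, not the BallotBox code: each pending log index keeps the set of peers that acknowledged it, and the committed index advances to the highest index in the acknowledged range that a majority has written.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified per-entry commit tracking (hypothetical class, not JRaft's BallotBox).
final class CommitSketch {
    private final int quorum;
    private long pendingIndex; // log index of the first entry not yet committed
    private final List<Set<String>> acks = new ArrayList<>(); // acks.get(i) is for index pendingIndex + i
    long lastCommittedIndex;

    CommitSketch(int clusterSize, long firstPendingIndex) {
        this.quorum = clusterSize / 2 + 1;
        this.pendingIndex = firstPendingIndex;
    }

    /** Registers one more pending entry, like appendPendingTask does with a Ballot. */
    void appendPendingTask() {
        acks.add(new HashSet<>());
    }

    /** Records that peer has written [first, last]; returns the committed index afterwards. */
    long commitAt(long first, long last, String peer) {
        if (last >= pendingIndex + acks.size()) {
            throw new IllegalArgumentException("no pending entry at index " + last);
        }
        long committed = 0;
        for (long idx = Math.max(pendingIndex, first); idx <= last; idx++) {
            final Set<String> voters = acks.get((int) (idx - pendingIndex));
            voters.add(peer);
            if (voters.size() >= quorum) {
                committed = idx;
            }
        }
        if (committed != 0) {
            acks.subList(0, (int) (committed - pendingIndex) + 1).clear();
            pendingIndex = committed + 1;
            lastCommittedIndex = committed;
        }
        return lastCommittedIndex;
    }

    public static void main(String[] args) {
        CommitSketch box = new CommitSketch(3, 1);
        for (int i = 0; i < 3; i++) {
            box.appendPendingTask(); // entries 1, 2 and 3
        }
        System.out.println(box.commitAt(1, 3, "leader"));    // prints 0
        System.out.println(box.commitAt(1, 2, "follower1")); // prints 2
        System.out.println(box.commitAt(1, 3, "follower2")); // prints 3
    }
}
```

In the real code, the point where the committed index advances is where waiter.onCommitted is called and the callbacks of the committed tasks are run.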
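As we can see, log replication in jraft means the Leader sends data to the Follower nodes, and each Follower writes the received log entries to its local storage.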
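The probe-then-send pattern from step 6 can be illustrated with two in-memory logs. This is a simplified model, not the Replicator code: the names are hypothetical, a log is just a list of entry terms, and the back-off rule (jump to the Follower's last index + 1, otherwise step back one entry on a term conflict) is an assumption made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of probing a follower and then sending the missing entries (not JRaft code).
final class ReplicationSketch {

    /** Position i of the list holds the term of the entry at log index i + 1. */
    static long termAt(List<Long> log, long index) {
        return index == 0 ? 0 : log.get((int) index - 1);
    }

    /** Makes the follower's log equal to the leader's; returns the number of probes sent. */
    static int replicate(List<Long> leader, List<Long> follower) {
        long nextIndex = leader.size() + 1;
        int probes = 0;
        while (true) {
            probes++;
            final long prevIndex = nextIndex - 1;
            final boolean match = follower.size() >= prevIndex
                    && termAt(follower, prevIndex) == termAt(leader, prevIndex);
            if (match) {
                break;
            }
            if (follower.size() + 1 < nextIndex) {
                nextIndex = follower.size() + 1; // follower is behind: jump to its last index + 1
            } else {
                nextIndex--; // term conflict: step back one entry and probe again
            }
        }
        while (follower.size() >= nextIndex) {
            follower.remove(follower.size() - 1); // drop the conflicting suffix
        }
        for (long i = nextIndex; i <= leader.size(); i++) {
            follower.add(termAt(leader, i)); // send the entries the follower is missing
        }
        return probes;
    }

    public static void main(String[] args) {
        List<Long> leader = List.of(1L, 1L, 2L, 3L);
        List<Long> follower = new ArrayList<>(List.of(1L, 1L));
        System.out.println(replicate(leader, follower)); // prints 2
        System.out.println(follower);                    // prints [1, 1, 2, 3]
    }
}
```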