RocketMQ DLedger Automatic Master-Slave Failover

Starting with version 4.5, RocketMQ provides automatic failover: when the master of a master-slave cluster fails, a new master is automatically elected from the slaves and the failover completes without any manual intervention.

RocketMQ implements automatic failover with DLedger, a Raft-based CommitLog storage library whose two core functions are master election and log replication.

*********************

Master election

Node states: leader, candidate, follower

public class MemberState {
    ...
    public enum Role {
        UNKNOWN,
        CANDIDATE,
        LEADER,
        FOLLOWER
    }
}

leader: accepts client requests, writes log entries locally, and replicates them to the followers; it periodically sends heartbeats to the followers to maintain its leadership.

candidate: the intermediate state a node enters after the master fails; only nodes in the candidate state send vote requests. Once the election completes, the node becomes either leader or follower.

follower: synchronizes log entries from the leader; on receiving a leader heartbeat it resets its countdown timer to stay a follower and returns a heartbeat response to the leader.

Master election is triggered when:

the cluster starts for the first time: all nodes start as candidates and a master must be elected;

the master fails, or a network fault causes more than half of the followers to miss its heartbeats; when a follower's countdown timer expires, a new master election is triggered (a minimal sketch of this timeout check follows).
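
A minimal sketch of the follower-side timeout check, assuming illustrative names (heartBeatTimeIntervalMs and maxHeartBeatLeak mirror DLedger's configuration; everything else is made up for the example). The real logic lives in DLedgerLeaderElector:

import java.util.concurrent.ThreadLocalRandom;

public class ElectionTimerSketch {
    private final long heartBeatTimeIntervalMs = 2000;   // leader heartbeat period
    private final int maxHeartBeatLeak = 3;              // tolerated missed heartbeats
    private volatile long lastLeaderHeartBeatTime = System.currentTimeMillis();

    // Called whenever a valid leader heartbeat arrives: reset the countdown.
    public void onHeartbeat() {
        lastLeaderHeartBeatTime = System.currentTimeMillis();
    }

    // Polled periodically by the state-maintainer thread.
    public boolean shouldStartElection() {
        long elapsed = System.currentTimeMillis() - lastLeaderHeartBeatTime;
        return elapsed > maxHeartBeatLeak * heartBeatTimeIntervalMs;
    }

    // Randomized back-off before actually requesting votes, so candidates
    // do not all fire at the same instant (the usual Raft trick).
    public long nextVoteDelayMs() {
        return 300 + ThreadLocalRandom.current().nextInt(700);
    }
}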

****************

Master election process

When a follower's countdown timer expires, it becomes a candidate and sends a vote request to itself and the other nodes (it votes for itself).

When another node receives the vote request, it rejects the vote if any of the following conditions holds:

reject_already_voted: the current node has already voted in this term;

reject_already_has_leader: the cluster has already elected a leader;

reject_expired_term: the requesting node's vote term is smaller than the current node's vote term;

reject_term_not_ready: the requesting node's vote term is larger than the current node's vote term;

reject_term_small_than_ledger: the requesting node's vote term is smaller than the current node's ledger term (ledgerEndTerm);

reject_expired_ledger: the requesting node's ledger term (ledgerEndTerm) is smaller than the current node's ledger term;

reject_small_ledger_end_index: the two nodes' ledger terms (ledgerEndTerm) are equal, but the requesting node's ledger end index (ledgerEndIndex) is smaller than the current node's.

Otherwise, the current node grants its vote to the requesting node as master (accepted). A condensed sketch of this decision follows.
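
A condensed sketch of that decision, in the order the conditions are listed above (the real DLedgerLeaderElector.handleVote() interleaves the checks slightly differently and uses the library's own request/response types; every name below is illustrative):

public class VoteDecisionSketch {
    enum VoteResult {
        REJECT_ALREADY_VOTED, REJECT_ALREADY_HAS_LEADER, REJECT_EXPIRED_TERM,
        REJECT_TERM_NOT_READY, REJECT_TERM_SMALL_THAN_LEDGER,
        REJECT_EXPIRED_LEDGER, REJECT_SMALL_LEDGER_END_INDEX, ACCEPT
    }

    static VoteResult decideVote(long reqTerm, long reqLedgerEndTerm, long reqLedgerEndIndex,
                                 long currTerm, String currVoteFor, String currLeaderId,
                                 long ledgerEndTerm, long ledgerEndIndex) {
        if (currVoteFor != null && reqTerm == currTerm) {
            return VoteResult.REJECT_ALREADY_VOTED;           // already voted in this term
        }
        if (currLeaderId != null) {
            return VoteResult.REJECT_ALREADY_HAS_LEADER;      // the cluster already has a leader
        }
        if (reqTerm < currTerm) {
            return VoteResult.REJECT_EXPIRED_TERM;            // requester's vote term is stale
        }
        if (reqTerm > currTerm) {
            return VoteResult.REJECT_TERM_NOT_READY;          // this node must first raise its own term
        }
        if (reqTerm < ledgerEndTerm) {
            return VoteResult.REJECT_TERM_SMALL_THAN_LEDGER;  // vote term behind the local ledger term
        }
        if (reqLedgerEndTerm < ledgerEndTerm) {
            return VoteResult.REJECT_EXPIRED_LEDGER;          // requester's log is from an older term
        }
        if (reqLedgerEndTerm == ledgerEndTerm && reqLedgerEndIndex < ledgerEndIndex) {
            return VoteResult.REJECT_SMALL_LEDGER_END_INDEX;  // same ledger term but shorter log
        }
        return VoteResult.ACCEPT;                             // grant the vote
    }
}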

Tallying the vote responses:

From the DLedgerLeaderElector.maintainAsCandidate() method:

final AtomicInteger allNum = new AtomicInteger(0);          // total number of responses
final AtomicInteger validNum = new AtomicInteger(0);        // valid votes
final AtomicInteger acceptedNum = new AtomicInteger(0);     // accepted votes
final AtomicInteger notReadyTermNum = new AtomicInteger(0); // "not ready" votes: the requester's vote term is larger than the remote node's, which returns reject_term_not_ready
final AtomicInteger biggerLedgerNum = new AtomicInteger(0); // the requester's ledger term is smaller than the remote node's, or the terms are equal but the requester's ledgerEndIndex is smaller
final AtomicBoolean alreadyHasLeader = new AtomicBoolean(false); // the cluster already has a leader

if (knownMaxTermInGroup.get() > term) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
    nextTimeToRequestVote = getNextTimeToRequestVote();
    changeRoleToCandidate(knownMaxTermInGroup.get());
} else if (alreadyHasLeader.get()) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
    nextTimeToRequestVote = getNextTimeToRequestVote() + heartBeatTimeIntervalMs * maxHeartBeatLeak;
} else if (!memberState.isQuorum(validNum.get())) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    nextTimeToRequestVote = getNextTimeToRequestVote();
} else if (memberState.isQuorum(acceptedNum.get())) {
    parseResult = VoteResponse.ParseResult.PASSED;
} else if (memberState.isQuorum(acceptedNum.get() + notReadyTermNum.get())) {
    parseResult = VoteResponse.ParseResult.REVOTE_IMMEDIATELY;
} else if (memberState.isQuorum(acceptedNum.get() + biggerLedgerNum.get())) {
    parseResult = VoteResponse.ParseResult.WAIT_TO_REVOTE;
    nextTimeToRequestVote = getNextTimeToRequestVote();
} else {
    parseResult = VoteResponse.ParseResult.WAIT_TO_VOTE_NEXT;
    nextTimeToRequestVote = getNextTimeToRequestVote();
}
lastParseResult = parseResult;
logger.info("[{}] [PARSE_VOTE_RESULT] cost={} term={} memberNum={} allNum={} acceptedNum={} notReadyTermNum={} biggerLedgerNum={} alreadyHasLeader={} maxTerm={} result={}",
    memberState.getSelfId(), lastVoteCost, term, memberState.peerSize(), allNum, acceptedNum, notReadyTermNum, biggerLedgerNum, alreadyHasLeader, knownMaxTermInGroup.get(), parseResult);

if (parseResult == VoteResponse.ParseResult.PASSED) {
    logger.info("[{}] [VOTE_RESULT] has been elected to be the leader in term {}", memberState.getSelfId(), term);
    changeRoleToLeader(term);
}

Election succeeds (PASSED): the accepted votes (acceptedNum) reach a quorum (more than half).

Revote immediately (REVOTE_IMMEDIATELY): acceptedNum + notReadyTermNum reaches a quorum.

Revote in the same vote term (WAIT_TO_REVOTE): the valid votes (validNum) do not reach a quorum, or acceptedNum + biggerLedgerNum reaches a quorum.

Revote with an incremented vote term (WAIT_TO_VOTE_NEXT): the requester's vote term is smaller than the largest term known in the cluster, the cluster already has a leader (in that case the node becomes a follower once it receives the leader's heartbeat), or any other remaining case (a minimal sketch of the quorum check used in these branches follows).
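
All of these branches rely on the same majority test. A minimal sketch of that check (DLedger's MemberState exposes it as isQuorum(); the standalone form below is just for illustration):

public class QuorumCheck {
    // "More than half of the cluster": e.g. 2 of 3 nodes, or 3 of 5 nodes.
    static boolean isQuorum(int num, int peerSize) {
        return num >= (peerSize / 2) + 1;
    }
}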

****************

Node state transitions

Candidate state

Becomes leader: receives accepted votes from more than half of the nodes during the election.

Becomes follower: the cluster already has a leader and the candidate receives that leader's heartbeat.

Stays candidate: in all other cases it starts another round of voting. (A sketch of the state-maintainer loop that drives these transitions follows.)
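
The transitions above are driven by a single background loop that dispatches on the current role. A sketch of that loop, assuming placeholder method bodies (DLedgerLeaderElector runs a similar StateMaintainer thread):

public class StateMaintainerSketch implements Runnable {
    enum Role { LEADER, FOLLOWER, CANDIDATE }

    private volatile Role role = Role.CANDIDATE;
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            switch (role) {
                case LEADER:   maintainAsLeader();    break;  // send heartbeats, watch the responses
                case FOLLOWER: maintainAsFollower();  break;  // watch the heartbeat countdown
                default:       maintainAsCandidate(); break;  // request votes and parse the results
            }
            try {
                Thread.sleep(10);                             // the real loop also waits briefly each round
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    public void shutdown() { running = false; }

    private void maintainAsLeader()    { /* see maintainAsLeader() in the next code block */ }
    private void maintainAsFollower()  { /* see maintainAsFollower() further below */ }
    private void maintainAsCandidate() { /* see the vote-result parsing above */ }
}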

Leader state: sends heartbeats and, based on the responses, either stays leader or becomes a candidate.

    private void maintainAsLeader() throws Exception {
        if (DLedgerUtils.elapsed(lastSendHeartBeatTime) > heartBeatTimeIntervalMs) { // heartbeat interval elapsed, send heartbeats
            long term;
            String leaderId;
            synchronized (memberState) {
                if (!memberState.isLeader()) { // not the leader any more: stop sending
                    return;
                }
                term = memberState.currTerm();
                leaderId = memberState.getLeaderId();
                lastSendHeartBeatTime = System.currentTimeMillis();
            }
            sendHeartbeats(term, leaderId); // the leader sends heartbeats to all peers
        }
    }

    private void sendHeartbeats(long term, String leaderId) throws Exception { // leader-side heartbeat sending
        final AtomicInteger allNum = new AtomicInteger(1);      // number of responses received (counting self)
        final AtomicInteger succNum = new AtomicInteger(1);     // number of SUCCESS responses (counting self)
        final AtomicInteger notReadyNum = new AtomicInteger(0); // number of peers whose vote term is smaller than the heartbeat's term
        final AtomicLong maxTerm = new AtomicLong(-1);          // largest vote term seen among the peers
        final AtomicBoolean inconsistLeader = new AtomicBoolean(false); // an inconsistent leader was reported
        final CountDownLatch beatLatch = new CountDownLatch(1);
        long startHeartbeatTimeMs = System.currentTimeMillis();
        for (String id : memberState.getPeerMap().keySet()) {
            if (memberState.getSelfId().equals(id)) {
                continue;
            }
            HeartBeatRequest heartBeatRequest = new HeartBeatRequest();
            heartBeatRequest.setGroup(memberState.getGroup());
            heartBeatRequest.setLocalId(memberState.getSelfId()); // current node id
            heartBeatRequest.setRemoteId(id);                     // remote node that receives the heartbeat
            heartBeatRequest.setLeaderId(leaderId);               // current node's leaderId
            heartBeatRequest.setTerm(term);                       // current node's vote term
            CompletableFuture<HeartBeatResponse> future = dLedgerRpcService.heartBeat(heartBeatRequest);
            future.whenComplete((HeartBeatResponse x, Throwable ex) -> { // handle the heartbeat response
                try {
                    if (ex != null) {
                        throw ex;
                    }
                    switch (DLedgerResponseCode.valueOf(x.getCode())) {
                        case SUCCESS:                 // the peer accepted the heartbeat
                            succNum.incrementAndGet();
                            break;
                        case EXPIRED_TERM:            // the sender's vote term is smaller than the peer's
                            maxTerm.set(x.getTerm()); // remember the largest term seen
                            break;
                        case INCONSISTENT_LEADER:     // the peer follows a different leader
                            inconsistLeader.compareAndSet(false, true);
                            break;
                        case TERM_NOT_READY:          // the sender's vote term is larger than the peer's
                            notReadyNum.incrementAndGet();
                            break;
                        default:
                            break;
                    }
                    if (memberState.isQuorum(succNum.get())
                        || memberState.isQuorum(succNum.get() + notReadyNum.get())) {
                        // a quorum of SUCCESS responses, or of SUCCESS + TERM_NOT_READY
                        // (the latter triggers an immediate resend): release the latch
                        beatLatch.countDown();
                    }
                } catch (Throwable t) {
                    logger.error("Parse heartbeat response failed", t);
                } finally {
                    allNum.incrementAndGet();
                    if (allNum.get() == memberState.peerSize()) { // every peer has responded: release the latch
                        beatLatch.countDown();
                    }
                }
            });
        }
        // wait up to one heartbeat interval; if none of the conditions above
        // (quorum of succNum, quorum of succNum + notReadyNum, allNum == peerSize)
        // is met in time, heartbeats will simply be sent again on the next round
        beatLatch.await(heartBeatTimeIntervalMs, TimeUnit.MILLISECONDS);
        if (memberState.isQuorum(succNum.get())) {
            // a quorum of successful responses: record the time and stay leader
            lastSuccHeartBeatTime = System.currentTimeMillis();
        } else {
            logger.info("[{}] Parse heartbeat responses in cost={} term={} allNum={} succNum={} notReadyNum={} inconsistLeader={} maxTerm={} peerSize={} lastSuccHeartBeatTime={}",
                memberState.getSelfId(), DLedgerUtils.elapsed(startHeartbeatTimeMs), term, allNum.get(), succNum.get(), notReadyNum.get(), inconsistLeader.get(), maxTerm.get(), memberState.peerSize(), new Timestamp(lastSuccHeartBeatTime));
            if (memberState.isQuorum(succNum.get() + notReadyNum.get())) {
                lastSendHeartBeatTime = -1;               // resend heartbeats immediately
            } else if (maxTerm.get() > term) {
                changeRoleToCandidate(maxTerm.get());     // a peer has a larger vote term than this leader
            } else if (inconsistLeader.get()) {
                changeRoleToCandidate(term);              // an inconsistent leader was detected
            } else if (DLedgerUtils.elapsed(lastSuccHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs) {
                changeRoleToCandidate(term);              // too long since the last successful heartbeat round
            }
        }
    }

    public CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception { // handle the leader's heartbeat
        if (!memberState.isPeerMember(request.getLeaderId())) {
            // the sender is not a known member of the cluster: return UNKNOWN_MEMBER
            logger.warn("[BUG] [HandleHeartBeat] remoteId={} is an unknown member", request.getLeaderId());
            return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNKNOWN_MEMBER.getCode()));
        }
        if (memberState.getSelfId().equals(request.getLeaderId())) {
            // the leaderId in the request equals this node's own id (a leader never heartbeats itself): return UNEXPECTED_MEMBER
            logger.warn("[BUG] [HandleHeartBeat] selfId={} but remoteId={}", memberState.getSelfId(), request.getLeaderId());
            return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.UNEXPECTED_MEMBER.getCode()));
        }
        if (request.getTerm() < memberState.currTerm()) {
            // the sender's vote term is smaller than this node's: return EXPIRED_TERM
            return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
        } else if (request.getTerm() == memberState.currTerm()) {
            if (request.getLeaderId().equals(memberState.getLeaderId())) {
                // same term and same leaderId: refresh the heartbeat time and return SUCCESS
                lastLeaderHeartBeatTime = System.currentTimeMillis();
                return CompletableFuture.completedFuture(new HeartBeatResponse());
            }
        }
        //abnormal case
        //hold the lock to get the latest term and leaderId
        synchronized (memberState) {
            if (request.getTerm() < memberState.currTerm()) {
                // the sender's vote term is smaller than this node's: return EXPIRED_TERM
                return CompletableFuture.completedFuture(new HeartBeatResponse().term(memberState.currTerm()).code(DLedgerResponseCode.EXPIRED_TERM.getCode()));
            } else if (request.getTerm() == memberState.currTerm()) {
                if (memberState.getLeaderId() == null) {
                    // this node has no leader yet (it is still a candidate): become a follower of the sender and return SUCCESS
                    changeRoleToFollower(request.getTerm(), request.getLeaderId());
                    return CompletableFuture.completedFuture(new HeartBeatResponse());
                } else if (request.getLeaderId().equals(memberState.getLeaderId())) {
                    // the sender is this node's known leader: refresh the heartbeat time and return SUCCESS
                    lastLeaderHeartBeatTime = System.currentTimeMillis();
                    return CompletableFuture.completedFuture(new HeartBeatResponse());
                } else {
                    // the leaderIds differ: return INCONSISTENT_LEADER
                    //this should not happen, but if happened
                    logger.error("[{}][BUG] currTerm {} has leader {}, but received leader {}", memberState.getSelfId(), memberState.currTerm(), memberState.getLeaderId(), request.getLeaderId());
                    return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.INCONSISTENT_LEADER.getCode()));
                }
            } else {
                // the sender's vote term is larger than this node's: switch to candidate, adopt the larger
                // term, trigger a new vote round immediately, and return TERM_NOT_READY
                //To make it simple, for larger term, do not change to follower immediately
                //first change to candidate, and notify the state-maintainer thread
                changeRoleToCandidate(request.getTerm());
                needIncreaseTermImmediately = true;
                //TODO notify
                return CompletableFuture.completedFuture(new HeartBeatResponse().code(DLedgerResponseCode.TERM_NOT_READY.getCode()));
            }
        }
    }

Stays leader: the number of successful heartbeat responses (succNum) reaches a quorum of the cluster.

Becomes candidate: the largest vote term in the cluster is greater than the leader's term, an inconsistent leader is detected, or the time since the last successful heartbeat round exceeds the maximum allowed interval.

Follower state: receives the leader's heartbeats and either stays a follower or becomes a candidate.

    private void maintainAsFollower() {
        if (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > 2 * heartBeatTimeIntervalMs) {
            // no heartbeat received for more than 2 * heartBeatTimeIntervalMs
            synchronized (memberState) {
                if (memberState.isFollower() && (DLedgerUtils.elapsed(lastLeaderHeartBeatTime) > maxHeartBeatLeak * heartBeatTimeIntervalMs)) {
                    // no heartbeat for more than maxHeartBeatLeak * heartBeatTimeIntervalMs
                    // (maxHeartBeatLeak defaults to 3): the follower becomes a candidate
                    logger.info("[{}][HeartBeatTimeOut] lastLeaderHeartBeatTime: {} heartBeatTimeIntervalMs: {} lastLeader={}", memberState.getSelfId(), new Timestamp(lastLeaderHeartBeatTime), heartBeatTimeIntervalMs, memberState.getLeaderId());
                    changeRoleToCandidate(memberState.currTerm());
                }
            }
        }
    }

Stays follower: the time since the last received heartbeat <= maxHeartBeatLeak * heartBeatTimeIntervalMs.

Becomes candidate: the time since the last received heartbeat > maxHeartBeatLeak * heartBeatTimeIntervalMs.

Note: maxHeartBeatLeak defaults to 3.

*********************

Log replication

The leader receives client read/write requests and writes the log entries locally;

at the same time it sends the entries to the followers, which write them locally and return the write status;

once more than half of the nodes have written an entry successfully, the leader confirms the commit (advances committedIndex);

the committedIndex is then attached to subsequent replication requests sent to the followers, and the followers commit the entry as well.

Note: only the master accepts client reads and writes; the slaves only replicate the master's data and serve neither reads nor writes (because of master-slave replication lag).

Clients can only read data the master has already committed; entries beyond committedIndex are not readable. (A sketch of how the commit point is derived from a quorum follows.)
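
A sketch of how the commit point can be derived from per-node replication progress ("water marks"), in the spirit of DLedger's QuorumAckChecker; the Map layout and method names below are illustrative assumptions:

import java.util.Map;

public class QuorumIndexSketch {
    // peerId -> highest log index known to be written on that peer (the leader counts itself).
    static long quorumIndex(Map<String, Long> peerWaterMarks) {
        int peerSize = peerWaterMarks.size();
        long quorumIndex = -1;
        for (long index : peerWaterMarks.values()) {
            int num = 0;
            for (long other : peerWaterMarks.values()) {
                if (other >= index) {
                    num++;                       // peers that have reached at least this index
                }
            }
            if (num >= (peerSize / 2) + 1 && index > quorumIndex) {
                quorumIndex = index;             // highest index covered by a majority
            }
        }
        return quorumIndex;                      // committedIndex can be advanced to this value
    }

    public static void main(String[] args) {
        // leader at 10, one follower at 9, one at 5 -> a majority covers index 9
        System.out.println(quorumIndex(Map.of("n0", 10L, "n1", 9L, "n2", 5L)));
    }
}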

****************

Asynchronous master-slave replication

Log replication between leader and followers is asynchronous, which reduces thread blocking.

The leader receives a client write request and writes the entry locally (into the page cache);

it creates an append CompletableFuture and places the entry into a pending map awaiting acknowledgement;

the EntryDispatcher threads then push the entry to the followers, and each follower accepts the data and responds asynchronously with its write status.

Meanwhile the leader can already accept the next append request without waiting for acknowledgements. (A sketch of this pending-future pattern follows.)
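
A sketch of the "write locally, park a future in a pending map, complete it once a quorum acknowledges" pattern, in the spirit of DLedgerEntryPusher.waitAck(); the class and method names below are illustrative, not the library's API:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class PendingAckSketch {
    // log index -> future completed once that entry is committed
    private final ConcurrentMap<Long, CompletableFuture<Long>> pending = new ConcurrentHashMap<>();

    // Append path: the caller gets a future immediately and the leader
    // moves on to the next request without blocking.
    public CompletableFuture<Long> waitAck(long index) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        pending.put(index, future);
        return future;
    }

    // Called by the quorum checker once a majority has replicated up to quorumIndex.
    public void onQuorumAdvanced(long quorumIndex) {
        pending.forEach((index, future) -> {
            if (index <= quorumIndex) {
                future.complete(index);          // acknowledge the waiting client
                pending.remove(index);
            }
        });
    }
}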

****************

Independent concurrent replication

The leader starts one EntryDispatcher thread per follower to replicate the log and track that follower's replication position; a separate asynchronous thread decides whether an entry has been replicated to a majority of the nodes. (A minimal wiring sketch follows.)
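
A minimal sketch of that "one dispatcher per follower" wiring, mirroring the dispatcherMap built in DLedgerEntryPusher's constructor; the Dispatcher class here is a placeholder:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DispatcherWiringSketch {
    static class Dispatcher extends Thread {
        Dispatcher(String peerId) { super("EntryDispatcher-" + peerId); }
        @Override public void run() { /* push entries to this peer and record its water mark */ }
    }

    static Map<String, Dispatcher> buildDispatchers(String selfId, List<String> peers) {
        Map<String, Dispatcher> dispatcherMap = new HashMap<>();
        for (String peer : peers) {
            if (!peer.equals(selfId)) {          // the leader does not dispatch to itself
                dispatcherMap.put(peer, new Dispatcher(peer));
            }
        }
        dispatcherMap.values().forEach(Thread::start);
        return dispatcherMap;
    }
}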

****************

Parallel replication on the follower

The follower maintains the pending append requests keyed and ordered by log index, and its handler thread processes them strictly in index order (see the sketch below).
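
A sketch of that ordering: requests may arrive and be queued out of order, but they are applied one by one at index ledgerEndIndex + 1, in the spirit of the EntryHandler thread shown later; all names below are illustrative:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class OrderedApplySketch {
    private final ConcurrentMap<Long, byte[]> writeRequestMap = new ConcurrentHashMap<>();
    private volatile long ledgerEndIndex = -1;   // index of the last locally appended entry

    // Network threads park incoming append requests here in any order.
    public void onAppendRequest(long index, byte[] body) {
        writeRequestMap.put(index, body);
    }

    // The single handler thread drains strictly in index order.
    public void applyLoopOnce() {
        long nextIndex = ledgerEndIndex + 1;
        byte[] body = writeRequestMap.remove(nextIndex);
        if (body == null) {
            return;                              // the next expected entry has not arrived yet
        }
        appendToLocalStore(nextIndex, body);
        ledgerEndIndex = nextIndex;
    }

    private void appendToLocalStore(long index, byte[] body) { /* write to the local ledger */ }
}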

****************

Master-slave data consistency

The leader sends log entries to the follower by log index, and the follower puts them into its pending-write queue;

the follower then tries to append the entry whose index is ledgerEndIndex + 1:

if the pending-write queue contains that entry, the follower appends it locally;

if the queue is not empty but that entry is missing, data has been lost and the follower responds with an inconsistency error;

when the leader receives an inconsistent-state response, its dispatcher thread switches to the compare state and sends compare requests to the follower;

based on the difference found between master and slave, the follower re-synchronizes or truncates its surplus entries until the data converges.

In addition, the leader re-sends any entry for which no response has arrived within the timeout. (A sketch of the COMPARE/TRUNCATE/APPEND transitions follows.)
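
A sketch of the dispatcher's COMPARE -> TRUNCATE -> APPEND transitions described above, with the fall-back to COMPARE on an inconsistency report; this mirrors the behaviour of EntryDispatcher but is not its actual code:

public class DispatcherStateSketch {
    enum PushType { COMPARE, TRUNCATE, APPEND, COMMIT }

    private PushType type = PushType.COMPARE;    // a freshly (re)elected leader always starts by comparing

    // Follower's log matches up to compareIndex: switch to normal appending.
    public void onCompareMatched(long compareIndex) {
        type = PushType.APPEND;
    }

    // Follower has diverging or surplus entries: truncate it back first.
    public void onCompareDiverged(long truncateIndex) {
        type = PushType.TRUNCATE;
    }

    // Truncate acknowledged: resume appending from truncateIndex + 1.
    public void onTruncateAck(long truncateIndex) {
        type = PushType.APPEND;
    }

    // An append was answered with INCONSISTENT_STATE: go back to comparing.
    public void onInconsistentState() {
        type = PushType.COMPARE;
    }

    public PushType currentType() { return type; }
}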

*********************

Classes involved in log replication

DLedgerServer: on the leader it accepts client requests, writes entries locally, and sends the log to the followers.

public class DLedgerServer implements DLedgerProtocolHander {private static Logger logger = LoggerFactory.getLogger(DLedgerServer.class);private MemberState memberState;private DLedgerConfig dLedgerConfig;private DLedgerStore dLedgerStore;                 //本地写入日志数据private DLedgerRpcService dLedgerRpcService;       //接受客户端请求private DLedgerEntryPusher dLedgerEntryPusher;     //向follower发送日志数据private DLedgerLeaderElector dLedgerLeaderElector; //节点选主public DLedgerServer(DLedgerConfig dLedgerConfig) {this.dLedgerConfig = dLedgerConfig;this.memberState = new MemberState(dLedgerConfig);this.dLedgerStore = createDLedgerStore(dLedgerConfig.getStoreType(), this.dLedgerConfig, this.memberState);dLedgerRpcService = new DLedgerRpcNettyService(this);dLedgerEntryPusher = new DLedgerEntryPusher(dLedgerConfig, memberState, dLedgerStore, dLedgerRpcService);dLedgerLeaderElector = new DLedgerLeaderElector(dLedgerConfig, memberState, dLedgerRpcService);}public void startup() {this.dLedgerStore.startup();this.dLedgerRpcService.startup();this.dLedgerEntryPusher.startup();this.dLedgerLeaderElector.startup();}public void shutdown() {this.dLedgerLeaderElector.shutdown();this.dLedgerEntryPusher.shutdown();this.dLedgerRpcService.shutdown();this.dLedgerStore.shutdown();}private DLedgerStore createDLedgerStore(String storeType, DLedgerConfig config, MemberState memberState) {if (storeType.equals(DLedgerConfig.MEMORY)) {return new DLedgerMemoryStore(config, memberState);     //写入内存} else {return new DLedgerMmapFileStore(config, memberState);   //写入文件}}public MemberState getMemberState() {return memberState;}@Overridepublic CompletableFuture<HeartBeatResponse> handleHeartBeat(HeartBeatRequest request) throws Exception {//当前节点处理心跳try {PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());return dLedgerLeaderElector.handleHeartBeat(request);   //最终调用dLedgerLeaderElector handleHeartBeat方法进行处理} catch (DLedgerException e) {logger.error("[{}][HandleHeartBeat] failed", memberState.getSelfId(), e);HeartBeatResponse response = new HeartBeatResponse();response.copyBaseInfo(request);response.setCode(e.getCode().getCode());response.setLeaderId(memberState.getLeaderId());return CompletableFuture.completedFuture(response);}}@Overridepublic CompletableFuture<VoteResponse> handleVote(VoteRequest request) throws Exception {//当前节点处理投票请求try {PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());return dLedgerLeaderElector.handleVote(request, false);//最终调用dledgerLeaderElector handleVote方法处理} catch (DLedgerException e) {logger.error("[{}][HandleVote] failed", memberState.getSelfId(), e);VoteResponse response = new VoteResponse();response.copyBaseInfo(request);response.setCode(e.getCode().getCode());response.setLeaderId(memberState.getLeaderId());return CompletableFuture.completedFuture(response);}}/*** Handle the append requests:*  1.append the entry to local store*  2.submit the future to entry pusher and wait the quorum ack*  3.if the pending requests are full, then reject it immediately* 
@param request* @return* @throws IOException*/@Overridepublic CompletableFuture<AppendEntryResponse> handleAppend(AppendEntryRequest request) throws IOException {//处理追加日志请求(客户端的写请求)try {PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());//验证当前节点id 是否等于 请求发送id(remoteId),不等抛出异常PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());//验证当前节点group 是否等于 请求设置的group,不等抛出异常PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);//验证当前节点是否为主节点,不是主节点抛出异常(只有主节点才能接受客户端写请求)long currTerm = memberState.currTerm();             //当前节点投票termif (dLedgerEntryPusher.isPendingFull(currTerm)) {   //如果主节点等待确认的日志数量超过指定数,拒绝追加写入AppendEntryResponse appendEntryResponse = new AppendEntryResponse();appendEntryResponse.setGroup(memberState.getGroup());appendEntryResponse.setCode(DLedgerResponseCode.LEADER_PENDING_FULL.getCode());appendEntryResponse.setTerm(currTerm);appendEntryResponse.setLeaderId(memberState.getSelfId());return AppendFuture.newCompletedFuture(-1, appendEntryResponse);} else {   //如果没满,可以追加写入DLedgerEntry dLedgerEntry = new DLedgerEntry();dLedgerEntry.setBody(request.getBody());DLedgerEntry resEntry = dLedgerStore.appendAsLeader(dLedgerEntry);  //本地写入数据return dLedgerEntryPusher.waitAck(resEntry);       //当前数据等待确认}} catch (DLedgerException e) {logger.error("[{}][HandleAppend] failed", memberState.getSelfId(), e);AppendEntryResponse response = new AppendEntryResponse();response.copyBaseInfo(request);response.setCode(e.getCode().getCode());response.setLeaderId(memberState.getLeaderId());return AppendFuture.newCompletedFuture(-1, response);}}@Overridepublic CompletableFuture<GetEntriesResponse> handleGet(GetEntriesRequest request) throws IOException {//获取日志数据try {PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());PreConditions.check(memberState.isLeader(), DLedgerResponseCode.NOT_LEADER);DLedgerEntry entry = dLedgerStore.get(request.getBeginIndex());//根据index,从本地内存或者文件中读取GetEntriesResponse response = new GetEntriesResponse();response.setGroup(memberState.getGroup());if (entry != null) {response.setEntries(Collections.singletonList(entry));}return CompletableFuture.completedFuture(response);} catch (DLedgerException e) {logger.error("[{}][HandleGet] failed", memberState.getSelfId(), e);GetEntriesResponse response = new GetEntriesResponse();response.copyBaseInfo(request);response.setLeaderId(memberState.getLeaderId());response.setCode(e.getCode().getCode());return CompletableFuture.completedFuture(response);}}@Override public CompletableFuture<MetadataResponse> handleMetadata(MetadataRequest request) throws Exception {//获取元数据try {PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());MetadataResponse metadataResponse = new 
MetadataResponse();metadataResponse.setGroup(memberState.getGroup());metadataResponse.setPeers(memberState.getPeerMap());metadataResponse.setLeaderId(memberState.getLeaderId());return CompletableFuture.completedFuture(metadataResponse);} catch (DLedgerException e) {logger.error("[{}][HandleMetadata] failed", memberState.getSelfId(), e);MetadataResponse response = new MetadataResponse();response.copyBaseInfo(request);response.setCode(e.getCode().getCode());response.setLeaderId(memberState.getLeaderId());return CompletableFuture.completedFuture(response);}}@Overridepublic CompletableFuture<PullEntriesResponse> handlePull(PullEntriesRequest request) {return null;}@Override public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {//将日志数据推送给followertry {PreConditions.check(memberState.getSelfId().equals(request.getRemoteId()), DLedgerResponseCode.UNKNOWN_MEMBER, "%s != %s", request.getRemoteId(), memberState.getSelfId());PreConditions.check(memberState.getGroup().equals(request.getGroup()), DLedgerResponseCode.UNKNOWN_GROUP, "%s != %s", request.getGroup(), memberState.getGroup());return dLedgerEntryPusher.handlePush(request); //调用dledgerEntryPusher handlePush推送日志数据} catch (DLedgerException e) {logger.error("[{}][HandlePush] failed", memberState.getSelfId(), e);PushEntryResponse response = new PushEntryResponse();response.copyBaseInfo(request);response.setCode(e.getCode().getCode());response.setLeaderId(memberState.getLeaderId());return CompletableFuture.completedFuture(response);}}public DLedgerStore getdLedgerStore() {return dLedgerStore;}public DLedgerRpcService getdLedgerRpcService() {return dLedgerRpcService;}public DLedgerLeaderElector getdLedgerLeaderElector() {return dLedgerLeaderElector;}public DLedgerConfig getdLedgerConfig() {return dLedgerConfig;}
}

DLedgerEntryPusher: pushes entries to the followers, handles the follower-side writes, and counts successful writes to confirm when an entry can be committed.

public class DLedgerEntryPusher {private static Logger logger = LoggerFactory.getLogger(DLedgerEntryPusher.class);private DLedgerConfig dLedgerConfig;private DLedgerStore dLedgerStore;private final MemberState memberState;private DLedgerRpcService dLedgerRpcService;private Map<Long, ConcurrentMap<String, Long>> peerWaterMarksByTerm = new ConcurrentHashMap<>();//记录投票term下,各follower日志索引//key ==> 当前投票term//value(map) ==> key:follower id,value:日志索引private Map<Long, ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>>> pendingAppendResponsesByTerm = new ConcurrentHashMap<>();//leader节点等待确认队列//key ==> 当前投票term//value(map) ==> key:日志索引、value:写入响应,等待确认提交private EntryHandler entryHandler = new EntryHandler(logger); //follower节点写入日志private QuorumAckChecker quorumAckChecker = new QuorumAckChecker(logger);//检查主从节点日志数据写入成功数量,超过半数则可提交private Map<String, EntryDispatcher> dispatcherMap = new HashMap<>(); //key为follower id,entryDispatcher向对应follower发送数据,leader为每个follower创建一个entryDispatcherpublic DLedgerEntryPusher(DLedgerConfig dLedgerConfig, MemberState memberState, DLedgerStore dLedgerStore,DLedgerRpcService dLedgerRpcService) {this.dLedgerConfig = dLedgerConfig;this.memberState = memberState;this.dLedgerStore = dLedgerStore;this.dLedgerRpcService = dLedgerRpcService;for (String peer : memberState.getPeerMap().keySet()) {if (!peer.equals(memberState.getSelfId())) {dispatcherMap.put(peer, new EntryDispatcher(peer, logger));}}}public void startup() {entryHandler.start();quorumAckChecker.start();for (EntryDispatcher dispatcher : dispatcherMap.values()) {dispatcher.start();}}public void shutdown() {entryHandler.shutdown();quorumAckChecker.shutdown();for (EntryDispatcher dispatcher : dispatcherMap.values()) {dispatcher.shutdown();}}public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {return entryHandler.handlePush(request);}private void checkTermForWaterMark(long term, String env) { //检查投票term waterMarks,不存在则创建if (!peerWaterMarksByTerm.containsKey(term)) {logger.info("Initialize the watermark in {} for term={}", env, term);ConcurrentMap<String, Long> waterMarks = new ConcurrentHashMap<>();for (String peer : memberState.getPeerMap().keySet()) {waterMarks.put(peer, -1L);}peerWaterMarksByTerm.putIfAbsent(term, waterMarks);}}private void checkTermForPendingMap(long term, String env) {  //初始化pending map,不存在则创建if (!pendingAppendResponsesByTerm.containsKey(term)) {logger.info("Initialize the pending append map in {} for term={}", env, term);pendingAppendResponsesByTerm.putIfAbsent(term, new ConcurrentHashMap<>());}}private void updatePeerWaterMark(long term, String peerId, long index) {  //更新follower节点最新写入索引synchronized (peerWaterMarksByTerm) {checkTermForWaterMark(term, "updatePeerWaterMark");if (peerWaterMarksByTerm.get(term).get(peerId) < index) {peerWaterMarksByTerm.get(term).put(peerId, index);}}}private long getPeerWaterMark(long term, String peerId) {  //获取投票term,follower节点最新写入索引synchronized (peerWaterMarksByTerm) {checkTermForWaterMark(term, "getPeerWaterMark");return peerWaterMarksByTerm.get(term).get(peerId);}}public boolean isPendingFull(long currTerm) {  //判断leader等待确认日志数据是否超过最大值(默认10000)checkTermForPendingMap(currTerm, "isPendingFull");return pendingAppendResponsesByTerm.get(currTerm).size() > dLedgerConfig.getMaxPendingRequestsNum();}public CompletableFuture<AppendEntryResponse> waitAck(DLedgerEntry entry) {updatePeerWaterMark(entry.getTerm(), memberState.getSelfId(), entry.getIndex());if (memberState.getPeerMap().size() == 1) {AppendEntryResponse 
response = new AppendEntryResponse();response.setGroup(memberState.getGroup());response.setLeaderId(memberState.getSelfId());response.setIndex(entry.getIndex());response.setTerm(entry.getTerm());response.setPos(entry.getPos());return AppendFuture.newCompletedFuture(entry.getPos(), response);} else {checkTermForPendingMap(entry.getTerm(), "waitAck");AppendFuture<AppendEntryResponse> future = new AppendFuture<>(dLedgerConfig.getMaxWaitAckTimeMs());future.setPos(entry.getPos());CompletableFuture<AppendEntryResponse> old = pendingAppendResponsesByTerm.get(entry.getTerm()).put(entry.getIndex(), future);if (old != null) {logger.warn("[MONITOR] get old wait at index={}", entry.getIndex());}wakeUpDispatchers();return future;}}public void wakeUpDispatchers() {for (EntryDispatcher dispatcher : dispatcherMap.values()) {dispatcher.wakeup();}}/*** This thread will check the quorum index and complete the pending requests.*/private class QuorumAckChecker extends ShutdownAbleThread {  //确认是否半数节点写入日志数据线程private long lastPrintWatermarkTimeMs = System.currentTimeMillis();private long lastCheckLeakTimeMs = System.currentTimeMillis();private long lastQuorumIndex = -1;public QuorumAckChecker(Logger logger) {super("QuorumAckChecker", logger);}@Overridepublic void doWork() {try {if (DLedgerUtils.elapsed(lastPrintWatermarkTimeMs) > 3000) {logger.info("[{}][{}] term={} ledgerBegin={} ledgerEnd={} committed={} watermarks={}",memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex(), dLedgerStore.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm));lastPrintWatermarkTimeMs = System.currentTimeMillis();}if (!memberState.isLeader()) {waitForRunning(1);return;}long currTerm = memberState.currTerm();checkTermForPendingMap(currTerm, "QuorumAckChecker");checkTermForWaterMark(currTerm, "QuorumAckChecker");if (pendingAppendResponsesByTerm.size() > 1) {for (Long term : pendingAppendResponsesByTerm.keySet()) {if (term == currTerm) {continue;}for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : pendingAppendResponsesByTerm.get(term).entrySet()) {AppendEntryResponse response = new AppendEntryResponse();response.setGroup(memberState.getGroup());response.setIndex(futureEntry.getKey());response.setCode(DLedgerResponseCode.TERM_CHANGED.getCode());response.setLeaderId(memberState.getLeaderId());logger.info("[TermChange] Will clear the pending response index={} for term changed from {} to {}", futureEntry.getKey(), term, currTerm);futureEntry.getValue().complete(response);}pendingAppendResponsesByTerm.remove(term);}}if (peerWaterMarksByTerm.size() > 1) {for (Long term : peerWaterMarksByTerm.keySet()) {if (term == currTerm) {continue;}logger.info("[TermChange] Will clear the watermarks for term changed from {} to {}", term, currTerm);peerWaterMarksByTerm.remove(term);}}Map<String, Long> peerWaterMarks = peerWaterMarksByTerm.get(currTerm);long quorumIndex = -1;for (Long index : peerWaterMarks.values()) {int num = 0;for (Long another : peerWaterMarks.values()) {if (another >= index) {num++;}}if (memberState.isQuorum(num) && index > quorumIndex) {quorumIndex = index;}}dLedgerStore.updateCommittedIndex(currTerm, quorumIndex);ConcurrentMap<Long, TimeoutFuture<AppendEntryResponse>> responses = pendingAppendResponsesByTerm.get(currTerm);boolean needCheck = false;int ackNum = 0;if (quorumIndex >= 0) {for (Long i = quorumIndex; i >= 0; i--) {try {CompletableFuture<AppendEntryResponse> future = responses.remove(i);if (future == null) 
{needCheck = lastQuorumIndex != -1 && lastQuorumIndex != quorumIndex && i != lastQuorumIndex;break;} else if (!future.isDone()) {AppendEntryResponse response = new AppendEntryResponse();response.setGroup(memberState.getGroup());response.setTerm(currTerm);response.setIndex(i);response.setLeaderId(memberState.getSelfId());response.setPos(((AppendFuture) future).getPos());future.complete(response);}ackNum++;} catch (Throwable t) {logger.error("Error in ack to index={} term={}", i, currTerm, t);}}}if (ackNum == 0) {for (long i = quorumIndex + 1; i < Integer.MAX_VALUE; i++) {TimeoutFuture<AppendEntryResponse> future = responses.get(i);if (future == null) {break;} else if (future.isTimeOut()) {AppendEntryResponse response = new AppendEntryResponse();response.setGroup(memberState.getGroup());response.setCode(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT.getCode());response.setTerm(currTerm);response.setIndex(i);response.setLeaderId(memberState.getSelfId());future.complete(response);} else {break;}}waitForRunning(1);}if (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000 || needCheck) {updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex());for (Map.Entry<Long, TimeoutFuture<AppendEntryResponse>> futureEntry : responses.entrySet()) {if (futureEntry.getKey() < quorumIndex) {AppendEntryResponse response = new AppendEntryResponse();response.setGroup(memberState.getGroup());response.setTerm(currTerm);response.setIndex(futureEntry.getKey());response.setLeaderId(memberState.getSelfId());response.setPos(((AppendFuture) futureEntry.getValue()).getPos());futureEntry.getValue().complete(response);responses.remove(futureEntry.getKey());}}lastCheckLeakTimeMs = System.currentTimeMillis();}lastQuorumIndex = quorumIndex;} catch (Throwable t) {DLedgerEntryPusher.logger.error("Error in {}", getName(), t);DLedgerUtils.sleep(100);}}}/*** This thread will be activated by the leader.* This thread will push the entry to follower(identified by peerId) and update the completed pushed index to index map.* Should generate a single thread for each peer.* The push has 4 types:*   APPEND : append the entries to the follower*   COMPARE : if the leader changes, the new leader should compare its entries to follower's*   TRUNCATE : if the leader finished comparing by an index, the leader will send a request to truncate the follower's ledger*   COMMIT: usually, the leader will attach the committed index with the APPEND request, but if the append requests are few and scattered,*           the leader will send a pure request to inform the follower of committed index.**   The common transferring between these types are as following:**   COMPARE ---- TRUNCATE ---- APPEND ---- COMMIT*   ^                             |*   |---<-----<------<-------<----|**//* 主节点使用线程,用来传送日志数据,leader会为每个follower创建一个entryDispatcher* leader向follower发送4种请求类型:append、compare、truncate、commit* append:follower追加日志数据* compare:leader节点变更,需要检查主从数据一致性* truncate:检测到主从数据不一致,follower需要追加或者删除对应数据* commit:通常leader会将committed index附加在append 请求中,但是如果append request较少或者分散,leader也会单独发送committed index给follower*/private class EntryDispatcher extends ShutdownAbleThread {private AtomicReference<PushEntryRequest.Type> type = new AtomicReference<>(PushEntryRequest.Type.COMPARE);private long lastPushCommitTimeMs = -1;private String peerId;private long compareIndex = -1;private long writeIndex = -1;private int maxPendingSize = 1000;   //follower待追加日志数位默认为1000private long term = -1;private String leaderId = null;private long lastCheckLeakTimeMs = 
System.currentTimeMillis();private ConcurrentMap<Long, Long> pendingMap = new ConcurrentHashMap<>(); //记录索引为index的日志发送时间,//key位日志索引,value位leader发送append request时间戳//当超过一段时间(dLedgerConfig.getMaxPushTimeOutMs())没有收到follower响应时,主动重新发送日志private Quota quota = new Quota(dLedgerConfig.getPeerPushQuota());public EntryDispatcher(String peerId, Logger logger) {super("EntryDispatcher-" + memberState.getSelfId() + "-" + peerId, logger);this.peerId = peerId;}private boolean checkAndFreshState() { //检查当前节点是否为主节点,并更新节点状态if (!memberState.isLeader()) {     //不是主节点,返回falsereturn false;}if (term != memberState.currTerm() || leaderId == null || !leaderId.equals(memberState.getLeaderId())) {synchronized (memberState) {if (!memberState.isLeader()) {return false;}PreConditions.check(memberState.getSelfId().equals(memberState.getLeaderId()), DLedgerResponseCode.UNKNOWN);term = memberState.currTerm();           //更新投票termleaderId = memberState.getSelfId();      //更新leaderId为当前节点idchangeState(-1, PushEntryRequest.Type.COMPARE);}}return true;}private PushEntryRequest buildPushRequest(DLedgerEntry entry, PushEntryRequest.Type target) {//构建发向follower节点的请求PushEntryRequest request = new PushEntryRequest();request.setGroup(memberState.getGroup());request.setRemoteId(peerId);request.setLeaderId(leaderId);request.setTerm(term);request.setEntry(entry);request.setType(target);request.setCommitIndex(dLedgerStore.getCommittedIndex());return request;}private void checkQuotaAndWait(DLedgerEntry entry) {  //检查主从复制状况,如果follower落后太多,leader节点停止一段时间if (dLedgerStore.getLedgerEndIndex() - entry.getIndex() <= maxPendingSize) {return;}  //如果leader节点的最新日志索引 - 发送给follower节点的索引 <=1000,直接返回继续运行if (dLedgerStore instanceof DLedgerMemoryStore) {return;}  //如果leader节点在内存中写入,直接返回可继续运行DLedgerMmapFileStore mmapFileStore = (DLedgerMmapFileStore) dLedgerStore;if (mmapFileStore.getDataFileList().getMaxWrotePosition() - entry.getPos() < dLedgerConfig.getPeerPushThrottlePoint()) {return;}  //使用文件写入,leader节点文件最大写入位置 -发送给follower位置 < dLedgerConfig.getPeerPushThrottlePoint()(默认为 300 * 1024 * 1024),直接返回可继续运行quota.sample(entry.getSize());if (quota.validateNow()) {DLedgerUtils.sleep(quota.leftNow());  //不满足上述条件,leader节点停止一段时间}}private void doAppendInner(long index) throws Exception {  //leader发送append request给followerDLedgerEntry entry = dLedgerStore.get(index);PreConditions.check(entry != null, DLedgerResponseCode.UNKNOWN, "writeIndex=%d", index);checkQuotaAndWait(entry);PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.APPEND);CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);//主从异步复制pendingMap.put(index, System.currentTimeMillis());responseFuture.whenComplete((x, ex) -> {try {PreConditions.check(ex == null, DLedgerResponseCode.UNKNOWN);DLedgerResponseCode responseCode = DLedgerResponseCode.valueOf(x.getCode());switch (responseCode) {case SUCCESS:            //从节点写入成功pendingMap.remove(x.getIndex());  //删除pendingMap中key为index的数据updatePeerWaterMark(x.getTerm(), peerId, x.getIndex());  //更新follower节点最新写入索引quorumAckChecker.wakeup();  //唤醒线程,检查是否超过半数日志数据写入成功break;case INCONSISTENT_STATE:     //返回数据不一致logger.info("[Push-{}]Get INCONSISTENT_STATE when push index={} term={}", peerId, x.getIndex(), x.getTerm());changeState(-1, PushEntryRequest.Type.COMPARE);  //leader发送compare,检查主从数据一致性break;default:logger.warn("[Push-{}]Get error response code {} {}", peerId, responseCode, x.baseInfo());break;}} catch (Throwable t) {logger.error("", t);}});lastPushCommitTimeMs = System.currentTimeMillis();      
//设置最后一次提交数据时间为当前时间(committedIndex与append request一起发送)}private void doCommit() throws Exception {       //发送日志提交请求if (DLedgerUtils.elapsed(lastPushCommitTimeMs) > 1000) {  //如果最后一次发送日志commit请求间隔超过1000ms,主动推送committedIndexPushEntryRequest request = buildPushRequest(null, PushEntryRequest.Type.COMMIT);//Ignore the resultsdLedgerRpcService.push(request);      //不处理commit request的响应数据lastPushCommitTimeMs = System.currentTimeMillis();  //设置最后一次推送committedIndex时间为当前时间}}private void doCheckAppendResponse() throws Exception {  //检查append request最后发送时间,超过指定时间间隔没有收到数据响应,主动重新发送数据long peerWaterMark = getPeerWaterMark(term, peerId);Long sendTimeMs = pendingMap.get(peerWaterMark + 1);if (sendTimeMs != null && System.currentTimeMillis() - sendTimeMs > dLedgerConfig.getMaxPushTimeOutMs()) {//已经发送peerWaterMark + 1的数据,超过指定时间没有收到响应信息logger.warn("[Push-{}]Retry to push entry at {}", peerId, peerWaterMark + 1);doAppendInner(peerWaterMark + 1);  //重新发送索引为peerWaterMark + 1的数据}}private void doAppend() throws Exception {  //发送append requestwhile (true) {if (!checkAndFreshState()) {break;  //当前节点非leader 节点,退出}if (type.get() != PushEntryRequest.Type.APPEND) {break;  //发送请求不是append类型,退出}if (writeIndex > dLedgerStore.getLedgerEndIndex()) {doCommit();doCheckAppendResponse();break;  //写入索引大于leader节点日志索引,向follower发送提交请求,检查follower节点响应状况,退出}if (pendingMap.size() >= maxPendingSize || (DLedgerUtils.elapsed(lastCheckLeakTimeMs) > 1000)) {//pendingMap超过1000,lastCheckLeakTimeMs间隔超过1000mslong peerWaterMark = getPeerWaterMark(term, peerId);for (Long index : pendingMap.keySet()) {if (index < peerWaterMark) {pendingMap.remove(index);}  //遍历follower id,如果index < 最新写入索引,删除pendingMap索引为index的数据}lastCheckLeakTimeMs = System.currentTimeMillis();  //设置lastCheckLeakTimeMs}if (pendingMap.size() >= maxPendingSize) {  //如果pendingMap大小超过1000,且lastCheckLeakTimeMs间隔时间没有超过1000msdoCheckAppendResponse();break;  //检查主从响应时间有无超时,退出}doAppendInner(writeIndex);   //追加写入数据writeIndex++;                //写入索引自增}}private void doTruncate(long truncateIndex) throws Exception {  //发送truncate请求,truncateIndex为follower节点开始复制的索引PreConditions.check(type.get() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);//检查当前请求是否为truncateDLedgerEntry truncateEntry = dLedgerStore.get(truncateIndex);PreConditions.check(truncateEntry != null, DLedgerResponseCode.UNKNOWN);logger.info("[Push-{}]Will push data to truncate truncateIndex={} pos={}", peerId, truncateIndex, truncateEntry.getPos());PushEntryRequest truncateRequest = buildPushRequest(truncateEntry, PushEntryRequest.Type.TRUNCATE);//构建truncate requestPushEntryResponse truncateResponse = dLedgerRpcService.push(truncateRequest).get(3, TimeUnit.SECONDS);//发送truncate request,设置超时时间为3sPreConditions.check(truncateResponse != null, DLedgerResponseCode.UNKNOWN, "truncateIndex=%d", truncateIndex);PreConditions.check(truncateResponse.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(truncateResponse.getCode()), "truncateIndex=%d", truncateIndex);//检查truncate request的响应信息,success表示日志写入成功,否则抛出异常lastPushCommitTimeMs = System.currentTimeMillis();        //设置最新推送时间changeState(truncateIndex, PushEntryRequest.Type.APPEND); //改变请求状态为append}private synchronized void changeState(long index, PushEntryRequest.Type target) {logger.info("[Push-{}]Change state from {} to {} at {}", peerId, type.get(), target, index);switch (target) {case APPEND:compareIndex = -1;updatePeerWaterMark(term, peerId, index);     //设置follower节点最新写入索引quorumAckChecker.wakeup();                    
//唤醒检查线程判断是否超过半数节点成功写入数据writeIndex = index + 1;                       //写入索引加 1break;case COMPARE:if (this.type.compareAndSet(PushEntryRequest.Type.APPEND, PushEntryRequest.Type.COMPARE)) {compareIndex = -1;pendingMap.clear();        //如果请求类型为compare,清空pendingMap}break;case TRUNCATE:compareIndex = -1;break;default:break;}type.set(target);}private void doCompare() throws Exception {  //发送compare请求while (true) {if (!checkAndFreshState()) {break;     //检查更新节点状态,不为leader则推出}if (type.get() != PushEntryRequest.Type.COMPARE&& type.get() != PushEntryRequest.Type.TRUNCATE) {break;     //请求类型不是compare、truncate,退出}if (compareIndex == -1 && dLedgerStore.getLedgerEndIndex() == -1) {break;}//revise the compareIndexif (compareIndex == -1) {   //如果compareIndex = -1,将compareIndex设置为leader节点日志最新索引compareIndex = dLedgerStore.getLedgerEndIndex();logger.info("[Push-{}][DoCompare] compareIndex=-1 means start to compare", peerId);} else if (compareIndex > dLedgerStore.getLedgerEndIndex() || compareIndex < dLedgerStore.getLedgerBeginIndex()) {//如果compareIndex > leader日志最新索引,或者< leader日志最小索引,将compare设置为leader节点最新索引logger.info("[Push-{}][DoCompare] compareIndex={} out of range {}-{}", peerId, compareIndex, dLedgerStore.getLedgerBeginIndex(), dLedgerStore.getLedgerEndIndex());compareIndex = dLedgerStore.getLedgerEndIndex();}DLedgerEntry entry = dLedgerStore.get(compareIndex);PreConditions.check(entry != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);PushEntryRequest request = buildPushRequest(entry, PushEntryRequest.Type.COMPARE);CompletableFuture<PushEntryResponse> responseFuture = dLedgerRpcService.push(request);PushEntryResponse response = responseFuture.get(3, TimeUnit.SECONDS);PreConditions.check(response != null, DLedgerResponseCode.INTERNAL_ERROR, "compareIndex=%d", compareIndex);PreConditions.check(response.getCode() == DLedgerResponseCode.INCONSISTENT_STATE.getCode() || response.getCode() == DLedgerResponseCode.SUCCESS.getCode(), DLedgerResponseCode.valueOf(response.getCode()), "compareIndex=%d", compareIndex);long truncateIndex = -1;if (response.getCode() == DLedgerResponseCode.SUCCESS.getCode()) {//如果响应状态为success/** The comparison is successful:* 1.Just change to append state, if the follower's end index is equal the compared index.* 2.Truncate the follower, if the follower has some dirty entries.*/if (compareIndex == response.getEndIndex()) {    //leader节点最新索引等于follower节点最新索引,直接追加数据changeState(compareIndex, PushEntryRequest.Type.APPEND);break;} else {truncateIndex = compareIndex;}  //如果follower节点响应的endIndex > compareIndex,说明存在脏数据,截断数据} else if (response.getEndIndex() < dLedgerStore.getLedgerBeginIndex()|| response.getBeginIndex() > dLedgerStore.getLedgerEndIndex()) {/*The follower's entries does not intersect with the leader.This usually happened when the follower has crashed for a long time while the leader has deleted the expired entries.Just truncate the follower.*///follower节点的数据不与leader节点相交,通常是follower节点故障很长时间//follower节点全量复制leader节点数据,truncateIndex设置为主节点初始索引dLedgerStore.getLedgerBeginIndex()truncateIndex = dLedgerStore.getLedgerBeginIndex();} else if (compareIndex < response.getBeginIndex()) {/*The compared index is smaller than the follower's begin index.This happened rarely, usually means some disk damage.Just truncate the follower.*///如果comparedIndex < follower节点初始索引,follower节点全亮复制leader数据truncateIndex = dLedgerStore.getLedgerBeginIndex();} else if (compareIndex > response.getEndIndex()) {/*The compared index is bigger than the follower's end index.This happened 
frequently. For the compared index is usually starting from the end index of the leader.*///compareIndex > follower节点最新索引,这种情况经常发生,表明follower节点落后于leader节点//设置compareIndex为follower节点最新索引,重新比较compareIndex = response.getEndIndex();} else {/*Compare failed and the compared index is in the range of follower's entries.*/compareIndex--;  //比较失败,comparedIndex减1}/*The compared index is smaller than the leader's begin index, truncate the follower.*/if (compareIndex < dLedgerStore.getLedgerBeginIndex()) {truncateIndex = dLedgerStore.getLedgerBeginIndex();}  //如果comparedIndex < leader初始索引,follower节点全量复制leader数据/*If get value for truncateIndex, do it right now.*/if (truncateIndex != -1) {changeState(truncateIndex, PushEntryRequest.Type.TRUNCATE);doTruncate(truncateIndex);break;}  //获取到truncateIndex,发送truncate request}}@Overridepublic void doWork() {try {if (!checkAndFreshState()) {waitForRunning(1);return;}if (type.get() == PushEntryRequest.Type.APPEND) {doAppend();} else {doCompare();}waitForRunning(1);} catch (Throwable t) {DLedgerEntryPusher.logger.error("[Push-{}]Error in {} writeIndex={} compareIndex={}", peerId, getName(), writeIndex, compareIndex, t);DLedgerUtils.sleep(500);}}}/*** This thread will be activated by the follower.* Accept the push request and order it by the index, then append to ledger store one by one.**///follower节点开启线程(entryHandler)追加日志数据private class EntryHandler extends ShutdownAbleThread {private long lastCheckFastForwardTimeMs = System.currentTimeMillis();ConcurrentMap<Long, Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> writeRequestMap = new ConcurrentHashMap<>();//follower节点待写队列BlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>> compareOrTruncateRequests = new ArrayBlockingQueue<Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>>>(100);//follower节点接收的compare、truncate requestpublic EntryHandler(Logger logger) {super("EntryHandler", logger);}public CompletableFuture<PushEntryResponse> handlePush(PushEntryRequest request) throws Exception {//处理leader节点发送的请求//The timeout should smaller than the remoting layer's request timeoutCompletableFuture<PushEntryResponse> future = new TimeoutFuture<>(1000);switch (request.getType()) {case APPEND:              //日志追加请求PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);long index = request.getEntry().getIndex();Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> old = writeRequestMap.putIfAbsent(index, new Pair<>(request, future));if (old != null) {logger.warn("[MONITOR]The index {} has already existed with {} and curr is {}", index, old.getKey().baseInfo(), request.baseInfo());future.complete(buildResponse(request, DLedgerResponseCode.REPEATED_PUSH.getCode()));}break;case COMMIT:              //日志提交请求compareOrTruncateRequests.put(new Pair<>(request, future));break;case COMPARE:             //日志compare请求case TRUNCATE:            //日志truncate请求PreConditions.check(request.getEntry() != null, DLedgerResponseCode.UNEXPECTED_ARGUMENT);writeRequestMap.clear();compareOrTruncateRequests.put(new Pair<>(request, future));break;default:logger.error("[BUG]Unknown type {} from {}", request.getType(), request.baseInfo());future.complete(buildResponse(request, DLedgerResponseCode.UNEXPECTED_ARGUMENT.getCode()));break;}return future;}private PushEntryResponse buildResponse(PushEntryRequest request, int code) {//构件响应信息PushEntryResponse response = new 
PushEntryResponse();response.setGroup(request.getGroup());response.setCode(code);response.setTerm(request.getTerm());if (request.getType() != PushEntryRequest.Type.COMMIT) {response.setIndex(request.getEntry().getIndex());}response.setBeginIndex(dLedgerStore.getLedgerBeginIndex());response.setEndIndex(dLedgerStore.getLedgerEndIndex());return response;}private void handleDoAppend(long writeIndex, PushEntryRequest request,CompletableFuture<PushEntryResponse> future) {try {PreConditions.check(writeIndex == request.getEntry().getIndex(), DLedgerResponseCode.INCONSISTENT_STATE);DLedgerEntry entry = dLedgerStore.appendAsFollower(request.getEntry(), request.getTerm(), request.getLeaderId());//follower追加写入日志数据(日志内容、投票term、leaderId)PreConditions.check(entry.getIndex() == writeIndex, DLedgerResponseCode.INCONSISTENT_STATE);future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());//更新follower节点committedIndex} catch (Throwable t) {logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t);future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));}}private CompletableFuture<PushEntryResponse> handleDoCompare(long compareIndex, PushEntryRequest request,CompletableFuture<PushEntryResponse> future) {//处理comparerequesttry {PreConditions.check(compareIndex == request.getEntry().getIndex(), DLedgerResponseCode.UNKNOWN);PreConditions.check(request.getType() == PushEntryRequest.Type.COMPARE, DLedgerResponseCode.UNKNOWN);DLedgerEntry local = dLedgerStore.get(compareIndex);  //查找本地compareIndex索引对应的日志数据PreConditions.check(request.getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);//数据一致,返回success//数据不一致,返回inconsistent_statefuture.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));} catch (Throwable t) {logger.error("[HandleDoCompare] compareIndex={}", compareIndex, t);future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));}return future;}private CompletableFuture<PushEntryResponse> handleDoCommit(long committedIndex, PushEntryRequest request,CompletableFuture<PushEntryResponse> future) {//处理commit requesttry {PreConditions.check(committedIndex == request.getCommitIndex(), DLedgerResponseCode.UNKNOWN);PreConditions.check(request.getType() == PushEntryRequest.Type.COMMIT, DLedgerResponseCode.UNKNOWN);dLedgerStore.updateCommittedIndex(request.getTerm(), committedIndex);//更新follower节点committedIndexfuture.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));} catch (Throwable t) {logger.error("[HandleDoCommit] committedIndex={}", request.getCommitIndex(), t);future.complete(buildResponse(request, DLedgerResponseCode.UNKNOWN.getCode()));}return future;}private CompletableFuture<PushEntryResponse> handleDoTruncate(long truncateIndex, PushEntryRequest request,CompletableFuture<PushEntryResponse> future) {//处理truncate requesttry {logger.info("[HandleDoTruncate] truncateIndex={} pos={}", truncateIndex, request.getEntry().getPos());PreConditions.check(truncateIndex == request.getEntry().getIndex(), DLedgerResponseCode.UNKNOWN);PreConditions.check(request.getType() == PushEntryRequest.Type.TRUNCATE, DLedgerResponseCode.UNKNOWN);long index = dLedgerStore.truncate(request.getEntry(), request.getTerm(), request.getLeaderId());//阶段日志数据,返回最新索引PreConditions.check(index == truncateIndex, 
DLedgerResponseCode.INCONSISTENT_STATE);//如果index等于truncateIndex,返回success//否则返回inconsistent_statefuture.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode()));dLedgerStore.updateCommittedIndex(request.getTerm(), request.getCommitIndex());//follower节点更新committedIndex} catch (Throwable t) {logger.error("[HandleDoTruncate] truncateIndex={}", truncateIndex, t);future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode()));}return future;}/*** The leader does push entries to follower, and record the pushed index. But in the following conditions, the push may get stopped.*   * If the follower is abnormally shutdown, its ledger end index may be smaller than before. At this time, the leader may push fast-forward entries, and retry all the time.*   * If the last ack is missed, and no new message is coming in.The leader may retry push the last message, but the follower will ignore it.* @param endIndex*///以下情形,leader不会向follower推送日志数据:follower异常关闭、ack丢失private void checkAbnormalFuture(long endIndex) {  //follower异常检查if (DLedgerUtils.elapsed(lastCheckFastForwardTimeMs) < 1000) {return;}lastCheckFastForwardTimeMs  = System.currentTimeMillis();if (writeRequestMap.isEmpty()) {return;}long minFastForwardIndex = Long.MAX_VALUE;for (Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair : writeRequestMap.values()) {long index = pair.getKey().getEntry().getIndex();//Fall behindif (index <= endIndex) {  //读写队列落后于follower最新索引,此时发生ack丢失try {DLedgerEntry local = dLedgerStore.get(index);PreConditions.check(pair.getKey().getEntry().equals(local), DLedgerResponseCode.INCONSISTENT_STATE);//检查数据是否一致,不一致则抛出异常pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.SUCCESS.getCode()));//数据一致返回successlogger.warn("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex);} catch (Throwable t) {logger.error("[PushFallBehind]The leader pushed an entry index={} smaller than current ledgerEndIndex={}, maybe the last ack is missed", index, endIndex, t);pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));}writeRequestMap.remove(index);  //删除follower writeRequestMap 索引为index的追加请求continue;}//Just OKif (index ==  endIndex + 1) {       //ack没有丢失,执行后续追加操作//The next entry is coming, just returnreturn;}//Fast forward//writeRequestMap index > follower最新日志索引 + 1,说明append请求丢失,leader需重新推送数据TimeoutFuture<PushEntryResponse> future  = (TimeoutFuture<PushEntryResponse>) pair.getValue();if (!future.isTimeOut()) {continue;}if (index < minFastForwardIndex) {minFastForwardIndex = index;}}if (minFastForwardIndex == Long.MAX_VALUE) {return;}Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.get(minFastForwardIndex);if (pair == null) {return;}logger.warn("[PushFastForward] ledgerEndIndex={} entryIndex={}", endIndex, minFastForwardIndex);pair.getValue().complete(buildResponse(pair.getKey(), DLedgerResponseCode.INCONSISTENT_STATE.getCode()));}@Overridepublic void doWork() {try {if (!memberState.isFollower()) {       //当前节点不是follower节点,等待1ms后返回waitForRunning(1);return;}if (compareOrTruncateRequests.peek() != null) {  //如果存在compare或者truncate request,先处理compare、truncate请求Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = compareOrTruncateRequests.poll();PreConditions.check(pair != null, DLedgerResponseCode.UNKNOWN);switch (pair.getKey().getType()) {case TRUNCATE:      
//处理truncate请求handleDoTruncate(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());break;case COMPARE:       //处理compare请求handleDoCompare(pair.getKey().getEntry().getIndex(), pair.getKey(), pair.getValue());break;case COMMIT:        //处理commit请求handleDoCommit(pair.getKey().getCommitIndex(), pair.getKey(), pair.getValue());break;default:break;}} else {      //追加日志long nextIndex = dLedgerStore.getLedgerEndIndex() + 1;  //待写入的索引Pair<PushEntryRequest, CompletableFuture<PushEntryResponse>> pair = writeRequestMap.remove(nextIndex); //删除并返回nextIndex对应的append请求if (pair == null) {  //如果pair为null,检查follower节点异常checkAbnormalFuture(dLedgerStore.getLedgerEndIndex());waitForRunning(1);return;}PushEntryRequest request = pair.getKey();handleDoAppend(nextIndex, request, pair.getValue());  //追加日志数据}} catch (Throwable t) {DLedgerEntryPusher.logger.error("Error in {}", getName(), t);DLedgerUtils.sleep(100);}}}
}
