Redis Source Code Reading (25): Cluster Mode Failover
Redis cluster mode provides not only scalability but also high availability: if the master of a shard goes down, one of that shard's slaves is automatically elected to take over and keep serving requests. This election process closely resembles the leader election in the Raft algorithm.
Redis supports two kinds of failover: automatic failover and manual failover.
Automatic failover
In the periodic function clusterCron, if a node has not replied with a PONG within server.cluster_node_timeout milliseconds after we sent it a PING, it is flagged CLUSTER_NODE_PFAIL (possible failure); once a majority of masters also consider the node unreachable, the flag is upgraded to FAIL. Finally, clusterHandleSlaveFailover attempts the failover on the slave side:
1. The slave first checks whether it has been disconnected from the master for too long (its data may be too stale to fail over).
2. If the previous failover attempt timed out, schedule the next one. The timing is carefully chosen: a fixed 500 ms plus a random 0-500 ms (the random part keeps the shard's slaves from all requesting votes at the same time), plus a delay proportional to each slave's rank. The larger a slave's replication offset (i.e. the more data it has synced from the master), the lower its rank, so it can send its vote request sooner and is more likely to win the election and become the new master.
3. Wait until it is time to send the vote request.
4. Bump our currentEpoch and send the vote request.
5. If more than half of the cluster's masters grant the vote, bump our configEpoch, promote ourselves to master of the shard, and continue serving.
void clusterHandleSlaveFailover(void) {
    ...
    /* Check if our data is recent enough according to the slave validity
     * factor configured by the user.
     *
     * Check bypassed for manual failovers. */
    // Check whether we have been disconnected from the master for longer than the threshold
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }
    ...
    // Schedule the next failover attempt
    /* If the previous failover attempt timedout and the retry time has
     * elapsed, we can setup a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 500 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that have a probably
         * less updated replication offset, are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
        serverLog(LL_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll updated their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }
    ...
    /* Return ASAP if we can't still start the election. */
    // Wait until it is time to send the failover vote request
    if (mstime() < server.cluster->failover_auth_time) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
        return;
    }

    /* Return ASAP if the election is too old to be valid. */
    // If this failover attempt has timed out, return and wait for the next chance
    if (auth_age > auth_timeout) {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
        return;
    }
    ...
    // Bump our currentEpoch and send the vote request
    /* Ask for votes if needed. */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }

    // If a majority of the cluster's masters granted the vote, bump our
    // configEpoch and promote ourselves to master of this shard
    /* Check if we reached the quorum. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* We have the quorum, we can finally failover the master. */
        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");

        /* Update my configEpoch to the epoch of the election. */
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            // Bump our configEpoch
            myself->configEpoch = server.cluster->failover_auth_epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }

        /* Take responsibility for the cluster slots. */
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}
The election is initiated via clusterRequestFailoverAuth:
void clusterRequestFailoverAuth(void) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    // Build the vote-request message; note that currentEpoch has already been bumped
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
    /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
     * in the header to communicate the nodes receiving the message that
     * they should authorized the failover even if the master is working. */
    if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    // Broadcast the message to every node in the cluster
    clusterBroadcastMessage(buf,totlen);
}
When another node in the cluster receives this slave's vote request, it decides in clusterSendFailoverAuthIfNeeded whether to grant the vote:
1. The node must be a master and must serve at least one slot.
2. The currentEpoch in the vote request must be no smaller than this node's currentEpoch.
3. This node must not have voted yet in the current epoch.
4. The requesting node must be a slave, and its master must be in FAIL state from this node's point of view.
5. Enough time must have passed since this node last voted for a slave of the same master (two node timeouts).
6. For every slot the slave claims, the configEpoch of the master currently serving that slot must be no greater than the configEpoch in the request.
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    ...
    // We must be a master serving at least one slot
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    // The request's currentEpoch must not be smaller than ours
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* I already voted for this epoch? Return ASAP. */
    // We must not have voted already in this epoch
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: already voted for epoch %llu",
            node->name,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    // The peer must be a slave, and we must consider its master to be in FAIL state
    /* Node must be a slave and its master down.
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        if (nodeIsMaster(node)) {
            serverLog(LL_WARNING,
                "Failover auth denied to %.40s: it is a master node",
                node->name);
        } else if (master == NULL) {
            serverLog(LL_WARNING,
                "Failover auth denied to %.40s: I don't know its master",
                node->name);
        } else if (!nodeFailed(master)) {
            serverLog(LL_WARNING,
                "Failover auth denied to %.40s: its master is up",
                node->name);
        }
        return;
    }

    // Enough time must have passed since we last voted for a slave of this master
    /* We did not voted for a slave about this master for two
     * times the node timeout. This is not strictly needed for correctness
     * of the algorithm but makes the base case more linear. */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
    {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: "
            "can't vote about this master before %lld milliseconds",
            node->name,
            (long long) ((server.cluster_node_timeout*2)-
                         (mstime() - node->slaveof->voted_time)));
        return;
    }

    // Every claimed slot's current configEpoch must be <= the request's configEpoch
    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* If we reached this point we found a slot that in our current slots
         * is served by a master with a greater configEpoch than the one claimed
         * by the slave requesting our vote. Refuse to vote for this slave. */
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: "
            "slot %d epoch (%llu) > reqEpoch (%llu)",
            node->name, j,
            (unsigned long long) server.cluster->slots[j]->configEpoch,
            (unsigned long long) requestConfigEpoch);
        return;
    }

    // Finally grant the vote
    /* We can vote for this slave. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
    ...
}
Finally, if the slave wins the election, clusterFailoverReplaceYourMaster announces to the rest of the cluster that it is now the master, takes over its old master's slots, and continues serving:
void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) Turn this node into a master. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) Update state and save config. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so that they can update the state
     *    accordingly and detect that we switched to master role. */
    // Broadcast a PONG announcing that we are now the master
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) If there was a manual failover in progress, clear the state. */
    resetManualFailover();
}
Manual failover
A manual failover forcibly promotes a slave of a shard to master via the command CLUSTER FAILOVER [FORCE|TAKEOVER]. With TAKEOVER, the failover happens immediately: the slave replaces the master without any vote. With FORCE, the slave may start the election right away and replaces the master once a majority of the cluster's masters agree. Without either option, the slave first waits for its replication offset to catch up with the master's and only then starts the election:
void clusterCommand(client *c) {
    if (!strcasecmp(c->argv[1]->ptr,"failover") &&
        (c->argc == 2 || c->argc == 3))
    {
        if (takeover) {
            /* A takeover does not perform any initial check. It just
             * generates a new configuration epoch for this node without
             * consensus, claims the master's slots, and broadcast the new
             * configuration. */
            serverLog(LL_WARNING,"Taking over the master (user request).");
            // Bump our configEpoch without consensus
            clusterBumpConfigEpochWithoutConsensus();
            // Forcibly promote ourselves to master
            clusterFailoverReplaceYourMaster();
        } else if (force) {
            /* If this is a forced failover, we don't need to talk with our
             * master to agree about the offset. We just failover taking over
             * it without coordination. */
            serverLog(LL_WARNING,"Forced failover user request accepted.");
            // Mark the manual failover as ready to start
            server.cluster->mf_can_start = 1;
        } else {
            serverLog(LL_WARNING,"Manual failover user request accepted.");
            clusterSendMFStart(myself->slaveof);
        }
        addReply(c,shared.ok);
    }
}
After the node receives the command, the manual failover is carried out in the periodic function clusterCron:
void clusterCron(void) {
    ...
    // Check whether the manual failover has timed out
    manualFailoverCheckTimeout();

    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
            clusterHandleSlaveFailover();
        /* If there are orphaned slaves, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }
    ...
}
In clusterHandleManualFailover, once the slave's offset has caught up with the master's, the failover may proceed:
void clusterHandleManualFailover(void) {
    /* Return ASAP if no manual failover is in progress. */
    // No manual failover was requested
    if (server.cluster->mf_end == 0) return;

    /* If mf_can_start is non-zero, the failover was already triggered so the
     * next steps are performed by clusterHandleSlaveFailover(). */
    // The manual failover is already in progress
    if (server.cluster->mf_can_start) return;

    if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */

    // Once our offset catches up with the master's, set mf_can_start to 1
    // so the manual failover can proceed
    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* Our replication offset matches the master replication offset
         * announced after clients were paused. We can start the failover. */
        server.cluster->mf_can_start = 1;
        serverLog(LL_WARNING,
            "All master replication stream processed, "
            "manual failover can start.");
    }
}
The failover flow itself is again implemented in clusterHandleSlaveFailover; if a manual failover may start at this point, it proceeds:
void clusterHandleSlaveFailover(void) {
    ...
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        (server.cluster_slave_no_failover && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        /* There are no reasons to failover, so we set the reason why we
         * are returning without failing over to NONE. */
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return;
    }
    ...
}