用 Go 语言实现 Raft 选主
用 Go 语言实现 Raft 选主
选主模块主要包括三大功能:
- candidate状态下的选主功能
- leader状态下的心跳广播功能
- follower状态下的确认功能
candidate状态下的选主功能
candidate状态下的选主功能需要关注两个方面:
- 何时进入candidate状态,进行选主?
- 选主的逻辑是怎样的?
首先,来讨论何时进入candidate状态,进行选主。
在一定时间内没有收到来自leader或者其他candidate的有效RPC时,将会触发选主。这里需要关注的是有效两个字,要么是leader发的有效的心跳信息,要么是candidate发的是有效的选主信息,即server本身确认这些信息是有效的后,才会重新更新超时时间,超时时间根据raft论文中推荐设置为[150ms,300ms],并且每次是随机生成的值。
其次,来讨论选主的逻辑。
server首先会进行选主的初始化操作,即server会增加其term,把状态改成candidate,然后选举自己为主,并把选主的RPC并行地发送给集群中其他的server,根据返回的RPC的情况的不同,做不同的处理:
- 该server被选为leader
- 其他的server选为leader
- 一段时间后,没有server被选为leader
针对情况一,该server被选为leader,当前仅当在大多数的server投票给该server时。当其被选为主时,会立马发送心跳消息给其他的server,来表明其已经是leader,防止发生新的选举。
针对情况二,其他的server被选为leader,它会收到leader发送的心跳信息,此时,该server应该转为follower,然后退出选举。
针对情况三,一段时间后,没有server被选为leader,这种情况发生在没有server获得了大多数的server的投票情况下,此时,应该发起新一轮的选举。
leader状态下的心跳广播功能
当某个server被选为leader后,需要广播心跳信息,表明其是leader,主要在以下两个场景触发:
- server刚当选为leader
- server周期性的发送心跳消息,防止其他的server进入candidate选举状态
leader广播心跳的逻辑为,如果广播的心跳信息得到了大多数的server的确认,那么更新leader自身的选举超时时间,防止发生重新选举。
follower状态下的确认功能
主要包括对candidate发的选举RPC以及leader发来的心跳RPC的确认功能。
对于选举RPC,假设candidate c发送选举RPC到该follower,由于follower每个term只能选举一个server,因此,只有当一个follower没有选举其他server的时候,并且选举RPC中的candidate c的term大于或等于follower的term时,才会返回选举当前candidate c为主,否则,则返回拒绝选举当前candidate c为主。
对于leader的心跳RPC,如果leader的心跳的term大于或等于follower的term,则认可该leader的心跳,否则,不认可该leader的心跳。
代码实现
candidate状态下的选主功能
根据前面描述,主要的逻辑为
- 等待选举超时
- 增加term,置状态为follower,并且选举自己为leader
- 向其他的server并行地发送选举RPC,直到碰到上述描述的三种情况退出
func (rf *Raft) election_one_round() bool {// begin electionvar timeout int64var done intvar triggerHeartbeat booltimeout = 100last := milliseconds()success := falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()printTime()rpcTimeout := 20fmt.Printf("candidate=%d start electing leader\n", rf.me)for {for i := 0; i < len(rf.peers); i++ {if i != rf.me {var args RequestVoteArgsserver := iargs.Term = rf.currentTermargs.CandidateId = rf.mevar reply RequestVoteReplyprintTime()fmt.Printf("candidate=%d send request vote to server=%d\n", rf.me, i)go rf.sendRequestVoteAndTrigger(server, args, &reply, rpcTimeout)}}done = 0triggerHeartbeat = falsefor i := 0; i < len(rf.peers)-1; i++ {printTime()fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.electCh:if ok {done++success = done >= len(rf.peers)/2 || rf.currentLeader > -1success = success && rf.votedFor == rf.meif success && !triggerHeartbeat {triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()rf.mu.Unlock()rf.heartbeat <- trueprintTime()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)}}}printTime()fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (done >= len(rf.peers)/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(10) * time.Millisecond):}}}printTime()fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success
}
首先等待选举超时,超时后,会进入真正的选举逻辑election_one_round()
首先,进入candidate状态,增加其term,然后,选举自己。
func (rf *Raft) becomeCandidate() { rf.state = 1 rf.setTerm(rf.currentTerm + 1)rf.votedFor = rf.merf.currentLeader = -1
}
接着,向除自己外的server发送选举RPC,等待server的回复
fmt.Printf("candidate=%d start electing leader\n", rf.me)for {for i := 0; i < len(rf.peers); i++ {if i != rf.me {var args RequestVoteArgsserver := iargs.Term = rf.currentTermargs.CandidateId = rf.mevar reply RequestVoteReplyprintTime()fmt.Printf("candidate=%d send request vote to server=%d\n", rf.me, i)go rf.sendRequestVoteAndTrigger(server, args, &reply, rpcTimeout)}}done = 0triggerHeartbeat = falsefor i := 0; i < len(rf.peers)-1; i++ {printTime()fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.electCh:if ok {done++success = done >= len(rf.peers)/2 || rf.currentLeader > -1success = success && rf.votedFor == rf.meif success && !triggerHeartbeat {triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()rf.mu.Unlock()rf.heartbeat <- trueprintTime()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)}}}printTime()fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (done >= len(rf.peers)/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(10) * time.Millisecond):}}}printTime()fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success
当成功返回数目到多数派时(包含自己在内),则宣布自己称为leader,即becomeLeader()
,如下
func (rf *Raft) becomeLeader() {rf.state = 2rf.currentLeader = rf.me
}
即,修改自身状态为leader。然后,给发送心跳的线程发送 rf.heartbeat <-true
,通知心跳线程开始发心跳包。
leader状态下的广播心跳功能
首先,来看触发心跳的逻辑
func (rf *Raft) sendLeaderHeartBeat() {timeout := 20for { select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()case <-time.After(time.Duration(timeout) * time.Millisecond):rf.sendAppendEntriesImpl()} }
}
分为两个方面:
第一个为刚当选为leader后,需要马上发送心跳信息,防止新的选举发生
第二个是leader周期性的发送心跳信息,来宣布自己为主
真正的广播心跳的逻辑如下:
func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var args AppendEntriesArgsvar success_count inttimeout := 20args.LeaderId = rf.meargs.Term = rf.currentTermprintTime()fmt.Printf("broadcast heartbeat start\n")for i := 0; i < len(rf.peers); i++ {if i != rf.me {var reply AppendEntriesReplyprintTime()fmt.Printf("Leader=%d send heartbeat to server=%d\n", rf.me, i)go rf.sendHeartBeat(i, args, &reply, timeout)}}for i := 0; i < len(rf.peers)-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= len(rf.peers)/2 {rf.mu.Lock()rf.setMessageTime(milliseconds())rf.mu.Unlock()}}}}printTime()fmt.Printf("broadcast heartbeat end\n")if success_count < len(rf.peers)/2 {rf.mu.Lock()rf.currentLeader = -1rf.mu.Unlock()}}
}
先是向集群中所有的其他server广播心跳,分为两种结果:
收到了大多数server的确认,则更新leader的超时时间,防止重新进入选举状态
未收到大多数server的确认,则会退出发送心跳的逻辑,即置currentLeader = -1,此后,自然会有选举超时的server重新发起选举
follower状态下的确认功能
包括对选举RPC的确认已经对心跳RPC的确认。
选举RPC的确认逻辑如下
func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {// Your code here.currentTerm, _ := rf.GetState()if args.Term < currentTerm {reply.Term = currentTermreply.VoteGranted = falseprintTime() fmt.Printf("candidate=%d term = %d smaller than server = %d, currentTerm = %d\n", args.CandidateId, args.Term, rf.me, rf.currentTerm)return } if rf.votedFor != -1 && args.Term <= rf.currentTerm {reply.VoteGranted = falserf.mu.Lock()rf.setTerm(max(args.Term, currentTerm))reply.Term = rf.currentTermrf.mu.Unlock()printTime() fmt.Printf("rejected candidate=%d term = %d server = %d, currentTerm = %d, has_voted_for = %d\n", args.CandidateId, args.Term, rf.me, rf.currentTerm, rf.votedFor)} else { rf.mu.Lock()rf.becomeFollower(max(args.Term, currentTerm), args.CandidateId)rf.mu.Unlock()reply.VoteGranted = truefmt.Printf("accepted server = %d voted_for candidate = %d\n", rf.me, args.CandidateId)}
}
如果当前server的term大于candidate的term,或者当前server已经选举过其他server为leader了,那么返回拒绝的RPC,否则,则返回成功的RPC,并置自身状态为follower。
心跳的RPC的逻辑如下
func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {if args.Term < rf.currentTerm {reply.Success = falsereply.Term = rf.currentTerm} else { reply.Success = truereply.Term = rf.currentTermrf.mu.Lock() rf.currentLeader = args.LeaderIdrf.votedFor = args.LeaderIdrf.state = 0 rf.setMessageTime(milliseconds())printTime() fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()}
}
如果follower的term大于leader的term,则返回拒绝的RPC,否则,返回成功的RPC。
用 Go 语言实现 Raft 选主相关推荐
- raft协议 MySQL 切换_Raft 协议实战系列(二)—— 选主
注:本文原创,转载请标明出处. 欢迎转发.关注微信公众号:Q的博客. 不定期发送干货,实践经验.系统总结.源码解读.技术原理. 本文目的 笔者期望通过系列文章帮助读者深入理解Raft协议并能付诸于工程 ...
- Raft算法实现 - Sofa-JRaft,选主,数据写入,日志复制
关于raft算法相关细节,可以全看之前的文章 分布式一致性算法,两阶段提交,三阶段提交,Paxos,Raft,zookeeper的选主过程,zab协议,顺序一致性,数据写入流程,节点状态,节点的角色 ...
- CAP原理,分布式一致性算法,两阶段提交,三阶段提交,Paxos,Raft,zookeeper的选主过程,zab协议,顺序一致性,数据写入流程,节点状态,节点的角色
我们知道,一般在分布式应用都有CAP准则: C Consistency, 一致性,分布式中的各个节点数据保持一致 A availability 可用性,任何时刻,分布式集群总是能够提供服务 P par ...
- Etcd集群的介绍和选主应用
女主宣言 ETCD作为开源.分布式.高可用.强一致性的key-value存储系统,提供了配置共享和服务发现等众多功能.目前已广泛应用在kubernetes.ROOK.CoreDNS.M3以及opens ...
- 分布式系统选主怎么玩
来自:架构之美 分布式系统为了保证其可靠性,一般都会多节点提供服务,各别节点的故障不会影响系统的可用性.对于分布式的存储系统来说,在保证可用性的同时,数据的可靠性(不丢失)也是其要解决的核心问题.目前 ...
- redis cluster集群选主
redis 选主过程分析 当slave发现自己的master变为FAIL状态时,便尝试进行Failover,以期成为新的master.由于挂掉的master可能会有多个slave.Failover的 ...
- 【Elasticsearch】留意Elasticsearch 7.x 可能无法选主的问题
1.概述 转载:留意Elasticsearch 7.x 可能无法选主的问题 Elasticsearch 7.x 选举算法改为基于 Raft 的实现,与标准 Raft 相比,最大的区别是允许选民可以投多 ...
- Redis哨兵模式原理剖析,监控、选主、通知客户端你懂了吗?
Redis 除了具有非常高的性能之外,还需要保证高可用,在故障发生时,尽可能地降低故障带来的影响,Redis提供了哨兵模式,来进行故障恢复. 哨兵主要负责做三件事: ①监控,监控主.从节点是否正常运行 ...
- dbproxy选主原理
一.Sqlproxy遵循raft算法 Raft是实现分布式共识的一种算法,主要用来管理日志复制的一致性. 二.Raft的三种状态(角色) Follower(群众) 被动接收Leader发送的请求.所有 ...
最新文章
- 26期20180601目录管理
- 新闻通稿 | 2021年世界互联网大会乌镇峰会网络法治分论坛圆满举行
- 标杆课程采访补充问题
- element手机验证格式_Excel数据验证:给数据把个关,工作效率有保障。
- 敏友的【敏捷个人】有感(3): 有感于“敏捷个人”讨论与练习
- TADOStoredProc返回多个数据集
- python秒转换成小时分钟秒_新闻联播66分钟,康辉口播22分38秒,零失误
- Z-Stack通过按键中断实现长按功能
- 用于 Windows8 的 Wijmo Charts 图表控件
- Windows下部署elasticsearch和kibana
- 【链表相加】程序员面试金典——2.5链式A+B
- CSDN自定义模块内容编写
- 八 关于电机驱动芯片L298N使用心得
- C++ 解析pcap文件
- 电路方案分析(十三)采用 CAN 的汽车分立式 SBC 预升压、后降压参考设计方案
- 韩天峰(Rango)推荐书目
- 数据可视化之大数据平台可视化
- Swoole 基础入门
- mysql索引匹配方式
- 「面试必背」Redis面试题(2022最新版)
热门文章
- 组织可以最大限度提高数据中心性能的五个步骤
- UPS 异常停机案例分析
- 成功解决Exception “unhandled ImportError“cannot import name ‘imread‘ from ‘scipy.misc‘
- Python:利用原生函数count或正则表达式compile、findall、finditer实现匹配统计(包括模糊匹配的贪婪匹配、懒惰匹配)
- 成功解决 利用plt.plot绘图时,横坐标出现浮点小数而不是整数的情况(坐标轴刻度)
- DL之RNN:人工智能为你写代码——基于TF利用RNN算法实现生成编程语言代码(C++语言)、训练测试过程全记录
- pycharm专业版-2017.3.3 安装+anaconda3-2019.03-windows
- 泛型--协变与逆变(转)
- 接口与继承动手动脑整理
- HDU2102 A计划