用 Go 语言实现 Raft 选主

选主模块主要包括三大功能:

  • candidate状态下的选主功能
  • leader状态下的心跳广播功能
  • follower状态下的确认功能

candidate状态下的选主功能

candidate状态下的选主功能需要关注两个方面:

  • 何时进入candidate状态,进行选主?
  • 选主的逻辑是怎样的?

首先,来讨论何时进入candidate状态,进行选主

在一定时间内没有收到来自leader或者其他candidate的有效RPC时,将会触发选主。这里需要关注的是有效两个字,要么是leader发的有效的心跳信息,要么是candidate发的是有效的选主信息,即server本身确认这些信息是有效的后,才会重新更新超时时间,超时时间根据raft论文中推荐设置为[150ms,300ms],并且每次是随机生成的值。

其次,来讨论选主的逻辑。

server首先会进行选主的初始化操作,即server会增加其term,把状态改成candidate,然后选举自己为主,并把选主的RPC并行地发送给集群中其他的server,根据返回的RPC的情况的不同,做不同的处理:

  • 该server被选为leader
  • 其他的server选为leader
  • 一段时间后,没有server被选为leader

针对情况一,该server被选为leader,当前仅当在大多数的server投票给该server时。当其被选为主时,会立马发送心跳消息给其他的server,来表明其已经是leader,防止发生新的选举。

针对情况二,其他的server被选为leader,它会收到leader发送的心跳信息,此时,该server应该转为follower,然后退出选举。

针对情况三,一段时间后,没有server被选为leader,这种情况发生在没有server获得了大多数的server的投票情况下,此时,应该发起新一轮的选举。

leader状态下的心跳广播功能

当某个server被选为leader后,需要广播心跳信息,表明其是leader,主要在以下两个场景触发:

  • server刚当选为leader
  • server周期性的发送心跳消息,防止其他的server进入candidate选举状态

leader广播心跳的逻辑为,如果广播的心跳信息得到了大多数的server的确认,那么更新leader自身的选举超时时间,防止发生重新选举。

follower状态下的确认功能

主要包括对candidate发的选举RPC以及leader发来的心跳RPC的确认功能。

对于选举RPC,假设candidate c发送选举RPC到该follower,由于follower每个term只能选举一个server,因此,只有当一个follower没有选举其他server的时候,并且选举RPC中的candidate c的term大于或等于follower的term时,才会返回选举当前candidate c为主,否则,则返回拒绝选举当前candidate c为主。

对于leader的心跳RPC,如果leader的心跳的term大于或等于follower的term,则认可该leader的心跳,否则,不认可该leader的心跳。

代码实现

candidate状态下的选主功能

根据前面描述,主要的逻辑为

  • 等待选举超时
  • 增加term,置状态为follower,并且选举自己为leader
  • 向其他的server并行地发送选举RPC,直到碰到上述描述的三种情况退出
func (rf *Raft) election_one_round() bool {// begin electionvar timeout int64var done intvar triggerHeartbeat booltimeout = 100last := milliseconds()success := falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()printTime()rpcTimeout := 20fmt.Printf("candidate=%d start electing leader\n", rf.me)for {for i := 0; i < len(rf.peers); i++ {if i != rf.me {var args RequestVoteArgsserver := iargs.Term = rf.currentTermargs.CandidateId = rf.mevar reply RequestVoteReplyprintTime()fmt.Printf("candidate=%d send request vote to server=%d\n", rf.me, i)go rf.sendRequestVoteAndTrigger(server, args, &reply, rpcTimeout)}}done = 0triggerHeartbeat = falsefor i := 0; i < len(rf.peers)-1; i++ {printTime()fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.electCh:if ok {done++success = done >= len(rf.peers)/2 || rf.currentLeader > -1success = success && rf.votedFor == rf.meif success && !triggerHeartbeat {triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()rf.mu.Unlock()rf.heartbeat <- trueprintTime()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)}}}printTime()fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (done >= len(rf.peers)/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(10) * time.Millisecond):}}}printTime()fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success
}

首先等待选举超时,超时后,会进入真正的选举逻辑election_one_round()

首先,进入candidate状态,增加其term,然后,选举自己。

func (rf *Raft) becomeCandidate() {                                                                                                                                                     rf.state = 1   rf.setTerm(rf.currentTerm + 1)rf.votedFor = rf.merf.currentLeader = -1
}

接着,向除自己外的server发送选举RPC,等待server的回复

fmt.Printf("candidate=%d start electing leader\n", rf.me)for {for i := 0; i < len(rf.peers); i++ {if i != rf.me {var args RequestVoteArgsserver := iargs.Term = rf.currentTermargs.CandidateId = rf.mevar reply RequestVoteReplyprintTime()fmt.Printf("candidate=%d send request vote to server=%d\n", rf.me, i)go rf.sendRequestVoteAndTrigger(server, args, &reply, rpcTimeout)}}done = 0triggerHeartbeat = falsefor i := 0; i < len(rf.peers)-1; i++ {printTime()fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.electCh:if ok {done++success = done >= len(rf.peers)/2 || rf.currentLeader > -1success = success && rf.votedFor == rf.meif success && !triggerHeartbeat {triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()rf.mu.Unlock()rf.heartbeat <- trueprintTime()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)}}}printTime()fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (done >= len(rf.peers)/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(10) * time.Millisecond):}}}printTime()fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success

当成功返回数目到多数派时(包含自己在内),则宣布自己称为leader,即becomeLeader(),如下

func (rf *Raft) becomeLeader() {rf.state = 2rf.currentLeader = rf.me
}

即,修改自身状态为leader。然后,给发送心跳的线程发送 rf.heartbeat <-true,通知心跳线程开始发心跳包。

leader状态下的广播心跳功能

首先,来看触发心跳的逻辑

func (rf *Raft) sendLeaderHeartBeat() {timeout := 20for {   select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()case <-time.After(time.Duration(timeout) * time.Millisecond):rf.sendAppendEntriesImpl()}     }
}

分为两个方面:

  • 第一个为刚当选为leader后,需要马上发送心跳信息,防止新的选举发生

  • 第二个是leader周期性的发送心跳信息,来宣布自己为主

真正的广播心跳的逻辑如下:

func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var args AppendEntriesArgsvar success_count inttimeout := 20args.LeaderId = rf.meargs.Term = rf.currentTermprintTime()fmt.Printf("broadcast heartbeat start\n")for i := 0; i < len(rf.peers); i++ {if i != rf.me {var reply AppendEntriesReplyprintTime()fmt.Printf("Leader=%d send heartbeat to server=%d\n", rf.me, i)go rf.sendHeartBeat(i, args, &reply, timeout)}}for i := 0; i < len(rf.peers)-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= len(rf.peers)/2 {rf.mu.Lock()rf.setMessageTime(milliseconds())rf.mu.Unlock()}}}}printTime()fmt.Printf("broadcast heartbeat end\n")if success_count < len(rf.peers)/2 {rf.mu.Lock()rf.currentLeader = -1rf.mu.Unlock()}}
}

先是向集群中所有的其他server广播心跳,分为两种结果:

  • 收到了大多数server的确认,则更新leader的超时时间,防止重新进入选举状态

  • 未收到大多数server的确认,则会退出发送心跳的逻辑,即置currentLeader = -1,此后,自然会有选举超时的server重新发起选举

follower状态下的确认功能

包括对选举RPC的确认已经对心跳RPC的确认。

选举RPC的确认逻辑如下

func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {// Your code here.currentTerm, _ := rf.GetState()if args.Term < currentTerm {reply.Term = currentTermreply.VoteGranted = falseprintTime() fmt.Printf("candidate=%d term = %d smaller than server = %d, currentTerm = %d\n", args.CandidateId, args.Term, rf.me, rf.currentTerm)return      }             if rf.votedFor != -1 && args.Term <= rf.currentTerm {reply.VoteGranted = falserf.mu.Lock()rf.setTerm(max(args.Term, currentTerm))reply.Term = rf.currentTermrf.mu.Unlock()printTime() fmt.Printf("rejected candidate=%d term = %d server = %d, currentTerm = %d, has_voted_for = %d\n", args.CandidateId, args.Term, rf.me, rf.currentTerm, rf.votedFor)} else {      rf.mu.Lock()rf.becomeFollower(max(args.Term, currentTerm), args.CandidateId)rf.mu.Unlock()reply.VoteGranted = truefmt.Printf("accepted server = %d voted_for candidate = %d\n", rf.me, args.CandidateId)}
}

如果当前server的term大于candidate的term,或者当前server已经选举过其他server为leader了,那么返回拒绝的RPC,否则,则返回成功的RPC,并置自身状态为follower。

心跳的RPC的逻辑如下

func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {if args.Term < rf.currentTerm {reply.Success = falsereply.Term = rf.currentTerm} else {       reply.Success = truereply.Term = rf.currentTermrf.mu.Lock() rf.currentLeader = args.LeaderIdrf.votedFor = args.LeaderIdrf.state = 0 rf.setMessageTime(milliseconds())printTime()  fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()}
}

如果follower的term大于leader的term,则返回拒绝的RPC,否则,返回成功的RPC。

用 Go 语言实现 Raft 选主相关推荐

  1. raft协议 MySQL 切换_Raft 协议实战系列(二)—— 选主

    注:本文原创,转载请标明出处. 欢迎转发.关注微信公众号:Q的博客. 不定期发送干货,实践经验.系统总结.源码解读.技术原理. 本文目的 笔者期望通过系列文章帮助读者深入理解Raft协议并能付诸于工程 ...

  2. Raft算法实现 - Sofa-JRaft,选主,数据写入,日志复制

    关于raft算法相关细节,可以全看之前的文章 分布式一致性算法,两阶段提交,三阶段提交,Paxos,Raft,zookeeper的选主过程,zab协议,顺序一致性,数据写入流程,节点状态,节点的角色 ...

  3. CAP原理,分布式一致性算法,两阶段提交,三阶段提交,Paxos,Raft,zookeeper的选主过程,zab协议,顺序一致性,数据写入流程,节点状态,节点的角色

    我们知道,一般在分布式应用都有CAP准则: C Consistency, 一致性,分布式中的各个节点数据保持一致 A availability 可用性,任何时刻,分布式集群总是能够提供服务 P par ...

  4. Etcd集群的介绍和选主应用

    女主宣言 ETCD作为开源.分布式.高可用.强一致性的key-value存储系统,提供了配置共享和服务发现等众多功能.目前已广泛应用在kubernetes.ROOK.CoreDNS.M3以及opens ...

  5. 分布式系统选主怎么玩

    来自:架构之美 分布式系统为了保证其可靠性,一般都会多节点提供服务,各别节点的故障不会影响系统的可用性.对于分布式的存储系统来说,在保证可用性的同时,数据的可靠性(不丢失)也是其要解决的核心问题.目前 ...

  6. redis cluster集群选主

    redis 选主过程分析  当slave发现自己的master变为FAIL状态时,便尝试进行Failover,以期成为新的master.由于挂掉的master可能会有多个slave.Failover的 ...

  7. 【Elasticsearch】留意Elasticsearch 7.x 可能无法选主的问题

    1.概述 转载:留意Elasticsearch 7.x 可能无法选主的问题 Elasticsearch 7.x 选举算法改为基于 Raft 的实现,与标准 Raft 相比,最大的区别是允许选民可以投多 ...

  8. Redis哨兵模式原理剖析,监控、选主、通知客户端你懂了吗?

    Redis 除了具有非常高的性能之外,还需要保证高可用,在故障发生时,尽可能地降低故障带来的影响,Redis提供了哨兵模式,来进行故障恢复. 哨兵主要负责做三件事: ①监控,监控主.从节点是否正常运行 ...

  9. dbproxy选主原理

    一.Sqlproxy遵循raft算法 Raft是实现分布式共识的一种算法,主要用来管理日志复制的一致性. 二.Raft的三种状态(角色) Follower(群众) 被动接收Leader发送的请求.所有 ...

最新文章

  1. 26期20180601目录管理
  2. 新闻通稿 | 2021年世界互联网大会乌镇峰会网络法治分论坛圆满举行
  3. 标杆课程采访补充问题
  4. element手机验证格式_Excel数据验证:给数据把个关,工作效率有保障。
  5. 敏友的【敏捷个人】有感(3): 有感于“敏捷个人”讨论与练习
  6. TADOStoredProc返回多个数据集
  7. python秒转换成小时分钟秒_新闻联播66分钟,康辉口播22分38秒,零失误
  8. Z-Stack通过按键中断实现长按功能
  9. 用于 Windows8 的 Wijmo Charts 图表控件
  10. Windows下部署elasticsearch和kibana
  11. 【链表相加】程序员面试金典——2.5链式A+B
  12. CSDN自定义模块内容编写
  13. 八 关于电机驱动芯片L298N使用心得
  14. C++ 解析pcap文件
  15. 电路方案分析(十三)采用 CAN 的汽车分立式 SBC 预升压、后降压参考设计方案
  16. 韩天峰(Rango)推荐书目
  17. 数据可视化之大数据平台可视化
  18. Swoole 基础入门
  19. mysql索引匹配方式
  20. 「面试必背」Redis面试题(2022最新版)

热门文章

  1. 组织可以最大限度提高数据中心性能的五个步骤
  2. UPS 异常停机案例分析
  3. 成功解决Exception “unhandled ImportError“cannot import name ‘imread‘ from ‘scipy.misc‘
  4. Python:利用原生函数count或正则表达式compile、findall、finditer实现匹配统计(包括模糊匹配的贪婪匹配、懒惰匹配)
  5. 成功解决 利用plt.plot绘图时,横坐标出现浮点小数而不是整数的情况(坐标轴刻度)
  6. DL之RNN:人工智能为你写代码——基于TF利用RNN算法实现生成编程语言代码(C++语言)、训练测试过程全记录
  7. pycharm专业版-2017.3.3 安装+anaconda3-2019.03-windows
  8. 泛型--协变与逆变(转)
  9. 接口与继承动手动脑整理
  10. HDU2102 A计划