麻省理工6.824 分布式课程 Raft选主实现笔记
Raft解决什么问题
解决数据一致性的问题, 允许一组机器像一台整体一样工作, 只要写入成功, 那么就不用担心这一组机器中一定数量机器的故障导致数据丢失或者不一致的情况.因为这一组机器上面存储着完全一致的用户数据,为了叙述方便, 把这一组机器叫做一个Raft集群.一个Raft集群最少需要3台机器.
但是瓶颈也是显而易见的, 这一组机器的吞吐, 比这一组机器中任意一台机器的吞吐都还要低, 这是因为请求进入到集群中需要进行分发,确认等协调工作. 所以在大型系统中, 我们需要把用户数据按照用户UID进行切片, 调整到适合一个Raft集群存储的大小. 如果有需要Raft集群之间的协调, 事务工作, 需要解耦, 把事务包装成事务消息, 再通过消息中间件的方式完成.
Raft简介
Raft集群中角色有三种, 分为Leader, Candidate,Follower. 其中Candidate是一个中间状态, 只发生在发生领导人选举的过程中.
对于写请求来说
- Leader负责接受用户的请求(如 set key = value),给请求包装成一条日志, 并且分配一个单调递增编号Index, 并且分发这个请求日志到集群中所有其他机器上, 等到集群中的大多数机器确认收到了这个请求后,Raft就会调整自己的commitIndex(代表可以提交到状态机中的序号), 异步向本地状态机内提交这个请求(执行set key = value),并且向集群中其他机器发送commitIndex, 要求其他机器提交这个请求.
- 需要注意的是, 这里并不是一个完整的两阶段提交.
对于读请求来说
- Leader在收到读请求后, 记录下此时的commitIndex, 并且向集群中其他机器发送心跳广播, 如果确认多数派相应成功, 证明自己还是Leader, 再确定已经执行的Index(applyIndex) >= commitIndex, 然后在从状态机中取出结果返回给客户端.
- 如果是Follower收到了读请求, 需要向Leader请求当前的commitIndex(Leader收到请求后, 也会和上述行为一样,发送心跳确定自己的Leader地位), 并且保证Follower上的applyIndex > commitIndex.再返回读请求.
- 这仅仅是ReadIndex的一种读实现, 你可以看到, 步骤很繁琐, 但是是一致性最高的读实现. 另外还有基于时间的LeaseRead实现, 效率更高, 但是依赖时间.
如果集群中的一台机器在一段时间内没有收到心跳, 就会把自己的Term加一, 并且向其他机器发送选举请求.所以任期的增加来自于选主.如果没有发生选主, 那么集群内并不会发生任期升级.
Raft协议能够保证在一个Term内, 只有一个Leader, 手段就是.
- 成为Leader需要经过集群中大多数机器赞成
- 任何一台机器, 在一个Term内, 只能给一台机器投票
Raft实现
首先我们来考虑如何实现选主.我们先以时间序列来看,选主的过程中发生了什么.
- 在新集群启动的时候, 所有机器A, B, C的默认状态都是Follower, 所有机器地址endpoint作为初始化参数传入进程.
- 如果收到心跳, 就开始作为Follower搬砖,选主结束, 如果在经过一段随机(大于心跳数倍)时间后, 开始发起Election.随机的目的是为了保证不要同时发起Election,当然, 并不能保证一定不会同时Election, 但是绝大多数情况下不会同时发生.
- 集群初始化时是没有Leader, 所以没有机器收到心跳,机器A开始Election, 首先, 机器A把自己的身份置为Candidate, 把Term自增一位, 并且把当前Term的票投给自己.
- 随后, 机器A会向其他机器发起vote请求, 要求投票给自己.
- 对于接受到这个vote请求的机器来说, 如果机器发现Candidate的Term小于自己, 就否决,并且返回自己的Term
- 如果机器发现Candidate的Term大于自己, 就更新term, 并且重置votedFor
- 如果机器发现在这个term已经给别人vote过了, 就否决
- 如果Candidate的日志没有自己新, 就否决(这是后面实现的逻辑, 本次暂不实现)
- 赞成
复制代码
机器A在发送vote请求后, 如果发现集群中的大多数赞成, 就跳到步骤7, 成为Leader
如果发现返回的Term大于自己, 就放弃本次Election, 更新自己的term到返回的term, 重置自己状态为Follower,回到步骤3
如果机器A在随机一段时间后(数倍心跳时间), 没有收到大多数赞成,就回到步骤3
机器A选主成功, 调整自身状态为Leader, 并且立即发送一次心跳(避免其他机器超时进入选主状态)
周期性发送心跳, 保证Leader地位
代码实现
首选来定义一下一个Raft Peer
type Raft struct {mu sync.RWMutex // Lock to protect shared access to this peer's statepeers []*labrpc.ClientEnd // RPC end points of all peerspersister *Persister // Object to hold this peer's persisted stateme int // this peer's index into peers[]nextIndex []int // matchIndex []inttimeoutDuration time.Duration // 基础超时时间currentTerm int // 当前任期votedFor int // 投给谁了logEntries []RLog // log entrisrole Role // Leader Candidate Followerleader int // 谁是LeadercommitIndex int lastAppliedIndex intheartbeatTime time.Duration // 心跳周期recvPeerMsgCh chan interface{}// 收到集群内请求的chan, 和收到外部请求的chan区分开}
复制代码
说实话, 第一次看到这些互相依赖的超时, 并行处理, 还是有点懵逼的..
不过仔细想了下, 需要把这些行为抽象成几个线程.
第一个线程, 主要负责成为Leader之后,调整自身身份,并且周期性发送心跳
func (rf *Raft) stepLeader() {rf.mu.Lock()rf.leader = rf.merf.role = Leaderrf.logWithoutLockSprint("now i am the leader")rf.mu.Unlock()// 立即发送心跳go rf.broadcast()for {_, isLeader := rf.GetState()if isLeader {select {case <-time.Tick(rf.heartbeatTime):// broadcast heartbeatgo rf.broadcast()case logEntry := <-rf.reqCh:switch e := logEntry.(type) {case RLog:rf.mu.Lock()rf.reqBufNum --rf.logEntries = append(rf.logEntries, e)rf.mu.Unlock()go rf.broadcast()}}} }
}
复制代码
第二个线程, 来负责处理集群间的RPC, 统计超时情况, 以及选主.
这里看到, 在选主的时候, 是不会接受其他的投票以及追加日志以及心跳请求的.这是我的实现, 这样处理更清晰.
即便造成了VoteRequest以及AppendEntry的超时也没关系.VoteRequest超时是可以接受的,因为即使Term不对劲的话, 会让Candidate通过投票返回的结果放弃选主.
因为Candidate本身就不会给其他人投票.AppendEntry超时就更没关系了, 不应该让AppendEntry打断选主的流程
func (rf *Raft) loop() {for {select {case msg := <-rf.recvPeerMsgCh:switch m := msg.(type) {case VoteRequest:rf.VoteRequestHandle(m)case AppendEntriesRequest:rf.AppendEntriesHandle(m)}case <-time.After(rf.jitterTimeoutDuration()):role, _ := rf.GetRoleAndTerm()if role == Follower {success := rf.raiseElection()if success {go rf.stepLeader()}}}}
}
复制代码
主要的线程就是上面的两个, 第一个线程也是第二个线程派生出来的, 但是可能会同时运行.
下面看看发起选举的实现,尽可能对变量加锁, 并且避免锁中的外部调用. 可以的话, 用原子操作代替锁的使用
func (rf *Raft) raiseElection() bool {rf.logWithRLock("Start to raise election")rf.mu.Lock()rf.role = Candidaterf.votedFor = rf.merf.currentTerm += 1rf.logWithoutLock(fmt.Sprintf("Term upgrade to %d due to raise election",rf.currentTerm))rf.mu.Unlock()_, currentTerm := rf.GetRoleAndTerm()lastLogIndex := rf.getLastLogIndex()lastLogTerm := rf.getLastLogTerm()peerNum := rf.getPeersNum()// 自带Candidate一张选票winThreshold := int64(peerNum/2 + 1)counter := int64(1)// 只有写没有读的ch会被GC, 所以不用担心resultCh := make(chan bool)arg := &RequestVoteArgs{Term: currentTerm,CandidateId: rf.me,LastLogIndex: lastLogIndex,LastLogTerm: lastLogTerm,}wg := sync.WaitGroup{}wg.Add(peerNum - 1)go func() {for i := 0; i < len(rf.peers); i++ {i := iif i == rf.me {continue}go func() {defer wg.Done()reply := &RequestVoteReply{}ok := rf.sendRequestVote(i, arg, reply)if ok {if reply.VoteGranted {atomic.AddInt64(&counter, 1)if atomic.LoadInt64(&counter) >= winThreshold {resultCh <- true}} else {if reply.Term > currentTerm {// 这轮已经输了rf.mu.Lock()rf.upgradeTermWithoutLock(reply.Term)rf.mu.Unlock()resultCh <- false}}}}()}wg.Wait()if atomic.LoadInt64(&counter) < winThreshold {resultCh <- false}}()// 实现选举超时select {case result := <-resultCh:return resultcase <-time.After(rf.jitterTimeoutDuration()):rf.mu.Lock()rf.role = Followerrf.logWithoutLockSprint("Lost this term election because of timeout")rf.mu.Unlock()return false}
}
复制代码
同步转异步
外部RPC调用都是同步调用, 而我们在mainLoop中, 希望异步的来处理这个流程, 所以这个地方做了一点小小的改造.
在处理请求时,把请求带上一个done chan, 包装成一个wrap request, 然后把wrap request发送给mainLoop, 最后监听done chan是否返回.
在真正进行请求处理是, 通过defer函数来向done chan发送结束消息
另外一点是, 由于处理请求完全都是内部变量, 状态的操作, 并且和其他线程抢占的时间很少(因为我们都是监听chan 顺序处理请求, 并且不会和发起选举这类请求同时处理), 我可以直接进入函数就Lock上锁, 最后defer Unlock.
func (rf *Raft) sendAppendEntries(server int, entries *AppendEntries, reply *AppendEntriesReply) bool {ok := rf.peers[server].Call("Raft.AppendEntries", entries, reply)return ok
}// append entries
func (rf *Raft) AppendEntries(entries *AppendEntries, reply *AppendEntriesReply) {done := make(chan struct{})entriesReq := AppendEntriesRequest{entries, reply, done}rf.recvPeerMsgCh <- entriesReq<-done
}// append entry handler
func (rf *Raft) AppendEntriesHandle(request AppendEntriesRequest) {entries := request.entriesreply := request.replydefer func() {rf.mu.Unlock()request.done <- struct{}{}close(request.done)}()rf.mu.Lock()//...
}
复制代码
最后, 执行一次go -test run 2A就ok了
➜ raft git:(master) go test -run 2A
复制代码
记得多执行几次, 因为这种多线程的bug不是必然复现的, 提高执行次数有助于更早的发现bug
转载于:https://juejin.im/post/5b56f135f265da0f8c02bd07
麻省理工6.824 分布式课程 Raft选主实现笔记相关推荐
- Zookeeper,etcd,consul内部机制和分布式锁和选主实现的比较
我的另外3篇文章分别介绍了Zookeeper,etcd,consul是如何实现分布式锁和选主的.本文想比较一下Zookeeper.etcd.consul内部机制有哪些不同,他们实现锁和选主的方式相同和 ...
- 用 Go 语言实现 Raft 选主
用 Go 语言实现 Raft 选主 选主模块主要包括三大功能: candidate状态下的选主功能 leader状态下的心跳广播功能 follower状态下的确认功能 candidate状态下的选主功 ...
- 如何高效学习?一年学完麻省理工4年计算机课程
斯科特.杨用用10天拿下线性代数,用1年时间学完麻省理工大学4年的计算机课程,他是如何做到的?他在这本书<如何高效学习>中做了具体阐述. 斯科特.杨很早就发现,在美国有一半的学生在死记硬背 ...
- 麻省理工教授良心总结,Python的学习方法 学习笔记教程都在这里 ,一学就会
现在学习Python的人越来越多,但是能学成并且挣钱到的人并不多,往往从入门到放弃,但我从一个什么都不会的小白成功转变成行业大佬,想必大家对我的学习方法是很感兴趣的吧! 本文纯手工码字!我梳理了一上午 ...
- raft协议 MySQL 切换_Raft 协议实战系列(二)—— 选主
注:本文原创,转载请标明出处. 欢迎转发.关注微信公众号:Q的博客. 不定期发送干货,实践经验.系统总结.源码解读.技术原理. 本文目的 笔者期望通过系列文章帮助读者深入理解Raft协议并能付诸于工程 ...
- 麻省理工免费在线课程首次可以获取学位
(风景美如画,请转到原文看图) 马萨诸塞州,剑桥.(美联社)-麻省理工在过去四年里提供的免费在线课程有一个重大的不足:它们不能用于获取学位.现在是时候改变了. 在周三将要宣布的一个试点项目中,学生们将 ...
- CAP原理,分布式一致性算法,两阶段提交,三阶段提交,Paxos,Raft,zookeeper的选主过程,zab协议,顺序一致性,数据写入流程,节点状态,节点的角色
我们知道,一般在分布式应用都有CAP准则: C Consistency, 一致性,分布式中的各个节点数据保持一致 A availability 可用性,任何时刻,分布式集群总是能够提供服务 P par ...
- 在麻省理工读计算机专业,看美国的计算机教育(转载)
在麻省理工读计算机专业,看美国的计算机教育 2010-02-06 12:21 | 22228 次阅读 | [已有179 条评论]发表评论 关键词:新闻资讯 Google | 感谢jiangt ...
- 【全军覆没】麻省理工把中国学生拉入黑名单,斯坦福取消中国大陆面试! 这是怎么了?...
来源 | 列文虎克网 日前,美国顶尖名校麻省理工学院(MIT)公布了提前录取榜单,虽然校方并未公布新生的国籍背景,但细心的媒体在研究名单后却发现了一个"令人震惊"的事实: 在今年E ...
- 麻省理工计算机科学录取条件,美国麻省理工学院录取条件
麻省理工学院录取条件是什么?谈论起美国的大学,很多人都可以讲出好几所,其中就包括麻省理工学院.作为世界上顶尖的理工类学校,麻省理工学院在世界上享有盛名.也因此,很多学生想要报考麻省理工学院.那么,下面 ...
最新文章
- ui设计师要养成哪些职场习惯呢?
- 使用迭代查找一个list中最小和最大值,并返回一个tuple。
- 阿里开发者们的第18个感悟:每次困难出现时,就是成长的机会出现了
- flink 写入到es_《从0到1学习Flink》—— Flink 写入数据到 Kafka
- sqlalchemy 基操,勿6
- spring异常 java.lang.ClassNotFoundException: org.springframework.web.context.ContextLoaderServlet
- 无向图:计算亏格(环的孔洞)
- 终端I/O之行控制函数
- 选防晒霜 要看四个要点 - 健康程序员,至尚生活!
- 仿淘宝分页按钮效果简单美观易使用的JS分页控件
- laravel5.5表单验证
- 看看ConcurrentLinkedQueue源码 in Java 9
- 软件测试订单测试用例,测试用例 - 进销存软件测试.doc
- 斑马打印机链接数据库实现自动打印
- 收藏级干货——Auto CAD历史版本功能大盘点(上)
- 关于美图秀秀的flash在线版
- 为什么要ROS2而不是对ROS1修修补补?
- css 模拟手机充电水滴效果
- BUUCTF Cipher writeup
- 对于模糊人脸图片和原图的清晰度评估——sobel算子
热门文章
- Atitit 分布式之道 之常见的分布式技术 1. 第十二章基于对象的分布式系统	1 1.1. Corba dcom	2 2. 第11章 分布式文件系统 -	2 2.1. 常见的分布式文件系统有,G
- atitit 数据库mysq启动不起来解决方案.docx
- 新特性AAtitti css3 新特性attilax总结titti css
- paip.提升用户体验-----c++ 宏的使用...替换从在的地张儿复制过来的代码.
- paip.提升安全性------登录地区变换后进行验证
- 稀缺的“稳定”--业绩导向的基金筛选困局
- 企业GDPR安全隐私合规性指南
- (转)2017 年最流行的 15 个数据科学 Python 库
- 【语音隐写】基于matlab GUI LSB语音信号数字水印嵌入提取(带面板)【含Matlab源码 1676期】
- 【TWVRP】基于matlab遗传算法求解带时间窗的外卖配送车辆路径规划问题【含Matlab源码 1416期】