MIT6.824(lab2A-领导人选举)
枯木逢春不在茂
年少且惜镜边人
写在前面
说好的今天写2A,就得写2A,这种回忆式的写总结是非常痛苦的,这个过程中有大量的遗忘,写的过程就是弥补的过程。今天早上的计组考试直接GG,因为没复习的缘故,前面的大部分知识直接g了。奉劝各位考研小伙伴还是得把专业知识(408)的东西搞好,不要像我一样,一事无成。
实现过程
在写LAB2之前,查阅了好多资料,每次写不出来,都让我怀疑自己是不是能干计算机这个行业,LAB2写了两遍,第一遍勉强过来些测试点,但是多次运行后直接挂掉,找错直接给我整的人都傻了,一度怀疑人生,所以选择重开,当然写的过程中也借鉴了一些大神的思路,导致我不用走那么多的弯路。文章最后会放上借鉴的大神的博客,各位大佬请直接跳过弟弟的解析,去看大神的。
个人认为在lab2的难度 2B>2A>2C
在写lab2之前需要学习什么,raft的论文得看raft论文中午翻译,英文版也得看,这个到处都能找到,还有MIT的学生指导手册,和官方的一些资料。B站上的视频也得看看。
好了废话不多说,因为3A和3B会修改raft代码,还好机智的我,给代码做了“snapshot”。
2A 主要的任务就是领导人选举,本人对lab的描述可能不能说是全面,所以借鉴某乎大佬的话(cola)
简单说下领导选举,raft使用一种心跳机制来触发领导选举,当服务器程序启动的时候,它们的身份都是follower,而follower是被动的接受信息,不会主动的发送信息。leader节点只需要周期性的向所有follower发送心跳包(即不包含日志项内容的附加日志项 RPCs)来维护自己当前的leader状态。
但是如果一个follow节点没有接收到心跳包,也就是选举超时,这个时候它认为没有leader节点,于是从follow变成了candidate节点,follow节点首先给当前的term自增一下,然后就转换为candidate节点,之后并行的向集群中其他的服务器节点发起投票请求,当它获取到大部分follow节点的选票时,就会变成leader,随后立即向其他节点发送心跳包来保持自己的leader状态。大致过程是这样,但是也会有一些其他情况比如:
第一种,candidate赢得选举,成为leader节点,这个没什么好说的,很顺利。
第二种,其他的服务器成为领导者。
第三种,一段时间之后没有任何一个获胜的人。在做的过程也想过几个问题:如果有多个follow节点都没有收到心跳包,那是不是都要变成candidate节点,然后发起选举?或者说都是follow节点,但是只有一个变成candidate节点,那那个可以成为candidate节点?如果某个follow节点没有收到心跳包,然后它发起选举,最后变成leader节点,那原先的leader节点怎么办?
一个raft的动画模拟–good
type Raft struct {mu sync.Mutex // Lock to protect shared access to this peer's statepeers []*labrpc.ClientEnd // RPC end points of all peerspersister *Persister // Object to hold this peer's persisted stateme int // this peer's index into peers[]dead int32 // set by Kill()// Your data here (2A, 2B, 2C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.// 所有服务器,持久化状态currentTerm int //最大任期votedFor int // 记录在currentTerm任期投票给谁了log []LogEntrylastIncludedIndex int // snapshot最后1个logEntry的index,没有snapshot则为0lastIncludedTerm int // snapthost最后1个logEntry的term,没有snaphost则无意义// 所有服务器,易失状态commitIndex int //已知的最大已提交索引lastApplied int // 当前应用到状态机的索引// 仅Leader,易失状态(成为leader时重置)nextIndex []int // 每个follower的log同步起点索引(初始为leader log的最后一项)matchIndex []int // 每个follower的log同步进度(初始为0),和nextIndex强关联// 所有服务器,选举相关状态role stringleaderId intlastActiveTime time.Time//上次活跃时间(刷新时机: 1.收到leader心跳// 2.给其他candidates投票 3.请求其他节点投票)lastBroadcastTime time.Time//作为leader,上次的广播时间applyCh chan ApplyMsg // 应用层的提交队列
}
raft的结构体,raft结构体的每一项都可以在论文中的figure2中找到。
type LogEntry struct {Command interface{}Term int
}// 当前角色
const ROLE_LEADER = "Leader"
const ROLE_FOLLOWER = "Follower"
const ROLE_CANDIDATES = "Candidates"
这里是日志结构体,和角色设定
//
//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {// Your data here (2A, 2B).Term intCandidateId intLastLogIndex intLastLogTerm int
}//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {// Your data here (2A).Term intVoteGranted bool
}type AppendEntriesArgs struct {Term intLeaderId intPrevLogIndex intPrevLogTerm intEntries []LogEntryLeaderCommit int
}
type AppendEntriesReply struct {Term intSuccess boolConflictIndex intConflictTerm int
}
这里是请求投票和心跳RPC参数和返回结果,其中有部分是2B 2C的,没有用到的可以跳过,写上也没关系
func Make(peers []*labrpc.ClientEnd, me int,persister *Persister, applyCh chan ApplyMsg) *Raft {rf := &Raft{}rf.peers = peersrf.persister = persisterrf.me = me// Your initialization code here (2A, 2B, 2C).rf.role = ROLE_FOLLOWERrf.leaderId = -1rf.votedFor = -1rf.lastActiveTime = time.Now()rf.lastIncludedIndex = 0rf.lastIncludedTerm = 0rf.applyCh = applyCh// rf.nextIndex = make([]int, len(rf.peers))// rf.matchIndex = make([]int, len(rf.peers))// initialize from state persisted before a crashrf.readPersist(persister.ReadRaftState())rf.installSnapshotToApplication()// start ticker goroutine to start electionsgo rf.electionLoop()go rf.appendEntriesLoop()go rf.applyLogLoop(applyCh)//go rf.ticker()DPrintf("Raftnode[%d]启动", me)return rf
}
这是make函数(程序入口函数)做一些初始化工作,和开启raft
// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {rf.mu.Lock()defer rf.mu.Unlock()var term intvar isleader bool// Your code here (2A).term = rf.currentTermisleader = rf.role == ROLE_LEADERreturn term, isleader
}
这个函数test会调用
由于我的代码是最终的结果,对2A的改动比较大,所以从这往下都采用lab2A这个大佬的代码,想看原版的同志,直接可以跳进去,本文只作为个人复习所用。
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {// Your code here (2A, 2B).rf.mu.Lock()defer rf.mu.Unlock()reply.Term = rf.currentTermreply.VoteGranted = falseDPrintf("RaftNode[%d] Handle RequestVote, CandidatesId[%d] Term[%d] CurrentTerm[%d] LastLogIndex[%d] LastLogTerm[%d] votedFor[%d]",rf.me, args.CandidateId, args.Term, rf.currentTerm, args.LastLogIndex, args.LastLogTerm, rf.votedFor)defer func() {DPrintf("RaftNode[%d] Return RequestVote, CandidatesId[%d] VoteGranted[%v] ", rf.me, args.CandidateId, reply.VoteGranted)}()// 任期不如我大,拒绝投票if args.Term < rf.currentTerm {return}// 发现更大的任期,则转为该任期的followerif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.role = ROLE_FOLLOWERrf.votedFor = -1rf.leaderId = -1// 继续向下走,进行投票}// 每个任期,只能投票给1人if rf.votedFor == -1 || rf.votedFor == args.CandidateId {// candidate的日志必须比我的新// 1, 最后一条log,任期大的更新// 2,更长的log则更新lastLogTerm := 0if len(rf.log) != 0 {lastLogTerm = rf.log[len(rf.log)-1].Term}if args.LastLogTerm < lastLogTerm || args.LastLogIndex < len(rf.log) {return}rf.votedFor = args.CandidateIdreply.VoteGranted = truerf.lastActiveTime = time.Now() // 为其他人投票,那么重置自己的下次投票时间}rf.persist()
}func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {ok := rf.peers[server].Call("Raft.RequestVote", args, reply)return ok
}func (rf *Raft) electionLoop() {for !rf.killed() {time.Sleep(1 * time.Millisecond)func() {rf.mu.Lock()defer rf.mu.Unlock()now := time.Now()timeout := time.Duration(200+rand.Int31n(150)) * time.Millisecond // 超时随机化elapses := now.Sub(rf.lastActiveTime)// follower -> candidatesif rf.role == ROLE_FOLLOWER {if elapses >= timeout {rf.role = ROLE_CANDIDATESDPrintf("RaftNode[%d] Follower -> Candidate", rf.me)}}// 请求voteif rf.role == ROLE_CANDIDATES && elapses >= timeout {rf.lastActiveTime = now // 重置下次选举时间rf.currentTerm += 1 // 发起新任期rf.votedFor = rf.me // 该任期投了自己rf.persist()// 请求投票reqargs := RequestVoteArgs{Term: rf.currentTerm,CandidateId: rf.me,LastLogIndex: len(rf.log),}if len(rf.log) != 0 {args.LastLogTerm = rf.log[len(rf.log)-1].Term}rf.mu.Unlock()DPrintf("RaftNode[%d] RequestVote starts, Term[%d] LastLogIndex[%d] LastLogTerm[%d]", rf.me, args.Term,args.LastLogIndex, args.LastLogTerm)// 并发RPC请求votetype VoteResult struct {peerId intresp *RequestVoteReply}voteCount := 1 // 收到投票个数(先给自己投1票)finishCount := 1 // 收到应答个数voteResultChan := make(chan *VoteResult, len(rf.peers))for peerId := 0; peerId < len(rf.peers); peerId++ {go func(id int) {if id == rf.me {return}resp := RequestVoteReply{}if ok := rf.sendRequestVote(id, &args, &resp); ok {voteResultChan <- &VoteResult{peerId: id, resp: &resp}} else {voteResultChan <- &VoteResult{peerId: id, resp: nil}}}(peerId)}maxTerm := 0for {select {case voteResult := <-voteResultChan:finishCount += 1if voteResult.resp != nil {if voteResult.resp.VoteGranted {voteCount += 1}if voteResult.resp.Term > maxTerm {maxTerm = voteResult.resp.Term}}// 得到大多数vote后,立即离开if finishCount == len(rf.peers) || voteCount > len(rf.peers)/2 {goto VOTE_END}}}VOTE_END:rf.mu.Lock()defer func() {DPrintf("RaftNode[%d] RequestVote ends, finishCount[%d] voteCount[%d] Role[%s] maxTerm[%d] currentTerm[%d]", rf.me, finishCount, voteCount,rf.role, maxTerm, rf.currentTerm)}()// 如果角色改变了,则忽略本轮投票结果if rf.role != ROLE_CANDIDATES {return}// 发现了更高的任期,切回followerif maxTerm > rf.currentTerm {rf.role = ROLE_FOLLOWERrf.leaderId = -1rf.currentTerm = maxTermrf.votedFor = -1rf.persist()return}// 赢得大多数选票,则成为leaderif voteCount > len(rf.peers)/2 {rf.role = ROLE_LEADERrf.leaderId = rf.merf.lastBroadcastTime = time.Unix(0, 0) // 令appendEntries广播立即执行return}}}()}
}
这个包括请求投票PRC的处理函数 ,RPC调用,以及选举loop
先说选举loop做了什么
- 首先这里检查选举超时,这里的超时每次都不一样,减少了多个candidate同时出现的几率,若超时则转为candidate,并且开启选举
- 准备投票参数以及reply,向每个peer发送请求投票的RPC
- 处理RPC响应,这里是用了通道通知,一旦接受了所有peer的响应,或者投票结果大于人数的二分之一则跳到处理响应
- 然后继续循环
如何处理?
- 投票结果大于人数的二分之一则当选,当选后,同一周期其他候选人转为follow状态
- 如果返回的周期比现有周期大则,转为follow
- 如果返回后角色发生变化则直接返回
然后说说请求投票做了什么?
- 如果请求的周期大于本身的周期,不管什么状态,本身角色转为follow,并且周期变成更大的
- 如果请求周期小于自身周期,则直接返回
- rf.votedFor == -1 || rf.votedFor == args.CandidateId 这个条件缺一不可,也是论文中要求的写法,-1可以想象成可以给任何人投票的意思
- if args.LastLogTerm < lastLogTerm || args.LastLogIndex < len(rf.log) 这个判断是2B的内容(日志一致性),2A中可以直接删除
- 最后投票成功后,重新设置自身选举时间,然后投票成功
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {rf.mu.Lock()defer rf.mu.Unlock()DPrintf("RaftNode[%d] Handle AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s]",rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.role)defer func() {DPrintf("RaftNode[%d] Return AppendEntries, LeaderId[%d] Term[%d] CurrentTerm[%d] role=[%s]",rf.me, args.LeaderId, args.Term, rf.currentTerm, rf.role)}()reply.Term = rf.currentTermreply.Success = falseif args.Term < rf.currentTerm {return}// 发现更大的任期,则转为该任期的followerif args.Term > rf.currentTerm {rf.currentTerm = args.Termrf.role = ROLE_FOLLOWERrf.votedFor = -1rf.leaderId = -1// 继续向下走}// 认识新的leaderrf.leaderId = args.LeaderId// 刷新活跃时间rf.lastActiveTime = time.Now()// 日志操作lab-2A不实现rf.persist()
}func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)return ok
}// lab-2A只做心跳,不考虑log同步
func (rf *Raft) appendEntriesLoop() {for !rf.killed() {time.Sleep(1 * time.Millisecond)func() {rf.mu.Lock()defer rf.mu.Unlock()// 只有leader才向外广播心跳if rf.role != ROLE_LEADER {return}// 100ms广播1次now := time.Now()if now.Sub(rf.lastBroadcastTime) < 100*time.Millisecond {return}rf.lastBroadcastTime = time.Now()// 并发RPC心跳type AppendResult struct {peerId intresp *AppendEntriesReply}for peerId := 0; peerId < len(rf.peers); peerId++ {if peerId == rf.me {continue}args := AppendEntriesArgs{}args.Term = rf.currentTermargs.LeaderId = rf.me// log相关字段在lab-2A不处理go func(id int, args1 *AppendEntriesArgs) {DPrintf("RaftNode[%d] appendEntries starts, myTerm[%d] peerId[%d]", rf.me, args1.Term, id)reply := AppendEntriesReply{}if ok := rf.sendAppendEntries(id, args1, &reply); ok {rf.mu.Lock()defer rf.mu.Unlock()if reply.Term > rf.currentTerm { // 变成followerrf.role = ROLE_FOLLOWERrf.leaderId = -1rf.currentTerm = reply.Termrf.votedFor = -1rf.persist()}DPrintf("RaftNode[%d] appendEntries ends, peerTerm[%d] myCurrentTerm[%d] myRole[%s]", rf.me, reply.Term, rf.currentTerm, rf.role)}}(peerId, &args)}}()}
}
这三个函数分别是心跳处理,PRC,心跳发送
先看心跳发送(leader当选后,需要维持自己的权威,就相当于隔一段时间问他手下:“喂,你活着么”。手下不回答就是“g了”,回答就是“我一直都在 ღ( ´・ᴗ・` )比心”)
- 首先查看心跳超时机制,若超时则发送心跳
- 向每一个peer发送心跳,参数没有什么过多的检查,只是一个周期和通知leader的参数检查
- 处理响应,也就是如果返回周期大于当前周期则自身转为follow,这也是日志一致性的体现,保证leader一定是最新的
心跳处理,简单的检查周期,和重置领单人,重置自身的选举时间(相当于,他知道老大还活着,所以要重新更改自己的篡位(造反)时间)
- 如果参数周期大于自身,则转follow
- 若小于,则带着更大的周期返回
- 心跳发送成功,更新领导人
写在后边
这就是2A的全部了,其实中单就是4个函数,请求投票(选举),处理投票,发送心跳,心跳回复。关于一些锁的应用,用大锁保护就行,特别注意RPC期间要释放锁,不然会引起死锁。
下面列出大佬对2A的总结,我这里附上,大佬的网址在本文中已经给出
- 一把大锁保护好状态,RPC期间释放锁,RPC结束后注意状态二次判定
- request/response都要先判断term > currentTerm,转换follower
- 一个currentTerm只能voteFor其他节点1次
- 注意candidates请求vote的时间随机性
- 注意requestVote得到大多数投票后立即结束等待剩余RPC
- 注意成为leader后尽快appendEntries心跳,否则其他节点又会成为candidates
- 注意几个刷新选举超时时间的逻辑点
由于明天下午还有英语课 ,还是先写写2B的内容吧,明天早上verilog考试,不复习了,摆烂摆烂。
github地址
(mr)
MIT6.824(lab2A-领导人选举)相关推荐
- MIT 6.824 Lab2A (raft) -- Leader Election
文章目录 实验要求 Leader Election流程 及详细实现介绍 基本角色 关键超时变量 关键的两个RPC实现 RequestVote RPC AppendEntries RPC Go并发编程实 ...
- mit6.824 2022 lab2
MIT6.824 2022 Raft Raft leader election log persistence log compaction 整体测试 后面发现的问题 参考代码 汇总博客:MIT6.8 ...
- jgroups_JGroups:无需额外基础架构的领导人选举
jgroups 嗨,您好, 在本文中,我将展示如何在不使用任何其他基础架构(例如Apache Zookeeper或Consul)的情况下解决领导人选举的问题. 领导者选举是解决以下问题的一种常见方法: ...
- JGroups:无需额外基础架构的领导人选举
嗨,您好, 在这篇文章中,我将展示如何在不使用任何其他基础架构(例如Apache Zookeeper或Consul)的情况下解决领导人选举的问题. 领导者选举是解决以下问题的一种常见方法:在分布式系统 ...
- MIT6.824环境搭建:wls+vs code
MIT6.824环境搭建:wls+vs code 背景 尝试学习MIT 6.824分布式系统,他们的实验使用的是go语言,并且不支持window.打算使用wls+vs code搭建开发环境.这里做记录 ...
- MIT6.824 lab4B实验记录
Background 主要是完成一个可以根据group数量,动态调整shard所属的group的分布式kv键值引擎.其中shard->group的配置由shardctrler集群来管理,底层也是 ...
- MIT6.824 Spanner论文精读
文章目录 Introduction Implementation Spanserver Software Stack Directories and Placement Data Model True ...
- MIT6.824 Lab1 MapReduce
MapReduce分布式计算框架 MapReduce是谷歌开发的分布式计算框架.MapReduce需用户指定Map和Reduce两个函数具体操作内容.现实世界大多数计算操作都可以基于该操作完成. Ma ...
- 笔记 MIT6.824 Lecture 17: COPS, Causal Consistency
目录 前言 一.geo-replication 1.1 Spanner 1.2 Memchche 1.3 新的需求 二.预备方案One 三.预备方案Two 四.COPS 五.limitations 总 ...
最新文章
- 《JavaScript高级程序设计》阅读笔记(二):ECMAScript中的原始类型
- MongoDB学习笔记~为IMongoRepository接口添加分页取集合的方法
- ST17H26上下拉电阻设置注意事项
- python字符串使用技巧
- XP下修改IIS连接数
- linux sys存放内容,了解linux系统目录,sys,tmp,usr,var!
- RabbitMQ学习笔记:安装环境
- 快商通知识图谱工程:让信息不再零碎,构建全行业的知识库 |百万人学AI评选
- 在PreferenceScreen加入自定义布局
- cocos builder中使用九宫格
- 怎么批量删除html里的字段,shp文件怎么删除字段
- PR转场 700+抖音视频转场素材包含PR调色预设和音效素材
- 找出不大于n的最大质数
- 2019年全国大学生电子设计竞赛赛题分享与浅析
- 宝藏又小众的灯罩VRay材质球素材网站分享
- mysql研究内容_基于MySQL数据库的数据管理的研究
- 【Django 天天生鲜项目05】订单(Mysql事务、并发处理、支付宝支付、评论)
- 微信支付之微信小程序支付
- vue 移动端头像裁剪_Vue 头像裁剪控件
- CentOS6启动和内核管理
热门文章
- SEC合规审查办公室2018工作重点:加强对加密货币企业信息披露的监管
- 【PAT(甲级)】1063 Set Similarity(题目意思)
- MySQL配置文件my.ini的一般设置
- 聊聊运营活动的设计与实现逻辑
- An error occurred. Sorry, the page you are looking for is currently unavailable. Please try again
- 我们这个时代,寒门再难出贵子
- 第1章 	SQL Server基本操作
- 前端网站-文档、工具
- 搭建电话机器人或OKCC外呼中心系统实体机与云服务器哪个好(三)
- node.js把前台传来的base64码转成图片存放