文章目录

  • Raft源码阅读
    • 代码结构
    • 消息结构
      • Entry
      • Message
    • 日志模块
      • log_unstable.go
      • Storage
      • log.go
    • 状态机
      • progress.go
    • 核心算法
      • raft.go
      • node.go
    • 执行流程
      • 选举流程
      • 写入流程

Raft源码阅读

etcd 是 coreOS 使用 golang 开发的分布式,一致性的 kv 存储系统,因其易用性和高可靠性被广泛运用于服务发现、消息发布和订阅、分布式锁和共享配置等方面,也被认为是 zookeeper 的强有力的竞争者。

作为分布式 kv,其底层使用 raft 算法实现多副本数据的强一致性。etcd 作为 raft 开源实现的标杆,在设计上,将 raft 算法逻辑和持久化、网络、线程等完全抽离出来单独实现,充分解耦,在工程上,实现了诸多性能优化,是 raft 开源实践中较早的工业级的实现,很多后来的 raft 实践者都直接或者间接的参考了 ectd-raft 的设计和实现,例如 kubernetes,TIdb 等。其广泛的影响力和优雅的 golang 代码实践也使得 ectd 成为 golang 的明星项目。

在我们实际的分布式存储系统的项目开发中,raft 也被应用于元信息管理和数据存储等多个模块,因此熟悉和理解 etcd-raft 的实现具有重大意义,本文的主要内容就是分析 raft 在 ectd 中的具体实现。

源码链接:etcd

中文注释:etcd-中文注释

代码结构

首先给出代码结构,然后按照各个模块依次介绍

$ tree --dirsfirst -L 1 -I '*test*' -P '*.go'
.
├── raftpb
├── doc.go
├── log.go
├── log_unstable.go
├── logger.go
├── node.go
├── progress.go
├── raft.go
├── rawnode.go
├── read_only.go
├── status.go
├── storage.go
└── util.go

消息结构

Raft 的序列化是基于 Protocol Buffer 实现的,因此在该目录下就定义了几个需要序列化的数据结构。

Entry

从整体上来说,一个集群中的每个节点都是一个状态机,而 raft 管理的就是对这个状态机进行更改的一些操作,这些操作在代码中被封装为一个个 Entry

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raftpb/raft.pb.gotype Entry struct {Term             uint64    //选举任期,每次选举之后递增1。用于标记信息的时效性Index            uint64    //当前这个entry在整个raft日志中的位置索引Type             EntryType //当前entry的类型,目前etcd支持两种类型:EntryNormal和EntryConfChange,EntryNormal代表当前Entry是对状态机的操作,EntryConfChange则代表对当前集群配置进行更改的操作Data             []byte    //被序列化后的byte数组,代表当前entry真正要执行的操作。当Type为EntryNormal时为key-value pair;当Type为EntryConfChange时为ConfChange
}

Message

Raft 集群中节点之间通过传递不同的 Message 来完成通讯,这个 Message 结构涵盖了各种消息所需的字段。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raftpb/raft.pb.gotype Message struct {Type             MessageType //消息类型(心跳MsgHeartbeat、日志MsgApp、投票MsgVote等)To               uint64      //这个消息的接受者From             uint64      //这个消息的发送者Term             uint64      //这个消息发出时整个集群所处的任期,即逻辑时钟LogTerm          uint64      //消息发出者所保存的日志中最后一条的任期号Index            uint64      //日志索引号。如果当前消息是MsgVote的话,代表这个候选人最后一条日志的索引号Entries          []Entry     //需要存储的日志Commit           uint64      //已经提交的日志的索引值,用来向别人同步日志的提交信息Snapshot         Snapshot    //快照Reject           bool        //对方节点拒绝了当前节点的请求(MsgVote/MsgApp/MsgSnap…)RejectHint       uint64      //对方节点拒绝了当前节点的请求(MsgVote/MsgApp/MsgSnap…)Context          []byte      //上下文信息
}

日志模块

log_unstable.go

unstable 数据结构用于还没有被用户层持久化的数据,它维护了两部分内容 snapshotentries 。由于日志信息不可能无休止的增长,所以超过一定期限后就会被删除。而为了能保证将数据完整的同步到其他节点上,每个一段时间就会将当前的状态存储为快照的形式,此时新节点只需要以快照为基础,再执行快照之后的日志(通过offset确定),就可以完成状态同步。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/log_unstable.gotype unstable struct {// 保存还没有持久化的快照数据snapshot *pb.Snapshot// 还未持久化的数据entries []pb.Entry// offset用于保存entries数组中的数据的起始indexoffset  uint64logger Logger
}

具体结构关系如下图

unstable 结构关系图

Storage

这个文件定义了一个 Storage 接口,因为 etcd 中的 raft 实现并不负责数据的持久化,所以它希望上面的应用层能实现这个接口,以便提供给它查询 log 的能力。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/storage.gotype Storage interface {// 返回保存的初始状态InitialState() (pb.HardState, pb.ConfState, error)// 返回索引范围在[lo,hi)之内并且不大于maxSize的entries数组Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)// 传入一个索引值,返回这个索引值对应的任期号,如果不存在则error不为空,其中:// ErrCompacted:表示传入的索引数据已经找不到,说明已经被压缩成快照数据了。// ErrUnavailable:表示传入的索引值大于当前的最大索引Term(i uint64) (uint64, error)// 获得最后一条数据的索引值LastIndex() (uint64, error)// 返回第一条数据的索引值FirstIndex() (uint64, error)// 返回最新的快照数据Snapshot() (pb.Snapshot, error)
}

另外,这个文件也提供了 Storage 接口的一个内存版本的实现 MemoryStorage,这个实现同样也维护了 snapshotentries 这两部分,他们的排列跟 unstable 中的类似,也是 snapshot 在前,entries 在后。从代码中看来 etcdserverraftexample 都是直接用的这个实现来提供 log 的查询功能的。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/storage.go// 使用在内存中的数组来实现 Storage 接口的结构体,具体实现参考上面的链接
type MemoryStorage struct {sync.MutexhardState pb.HardStatesnapshot  pb.Snapshotents []pb.Entry
}

log.go

这个结构体承担了 raft 日志相关的操作。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/log.gotype raftLog struct {// 用于保存自从最后一次snapshot之后提交的数据storage Storage// 用于保存还没有持久化的数据和快照,这些数据最终都会保存到storage中unstable unstable// committed数据索引committed uint64// committed保存是写入持久化存储中的最高index,而applied保存的是传入状态机中的最高index// 即一条日志首先要提交成功(即committed),才能被applied到状态机中// 因此以下不等式一直成立:applied <= committedapplied uint64logger Logger
}

一条日志数据,首先需要被 committed 成功,然后才能被应用 applied 到状态机中。因此,以下不等式一直成立:applied <= committed

raftLog 结构图

这个数据布局从下面这段初始化函数也可以看出

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/log.gofunc newLog(storage Storage, logger Logger) *raftLog {if storage == nil {log.Panic("storage must not be nil")}log := &raftLog{storage: storage,logger:  logger,}firstIndex, err := storage.FirstIndex()if err != nil {panic(err) // TODO(bdarnell)}lastIndex, err := storage.LastIndex()if err != nil {panic(err) // TODO(bdarnell)}// offset从持久化之后的最后一个index的下一个开始log.unstable.offset = lastIndex + 1log.unstable.logger = logger// committed和applied从持久化的第一个index的前一个开始log.committed = firstIndex - 1log.applied = firstIndex - 1return log
}

持久化存储和非持久化存储的分界线其实就是 lastIndex。在此之前都是 Storage 管理的已经持久化的数据,而在此之后都是 unstable 管理的还没有持久化的数据。

状态机

progress.go

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/progress.gotype Progress struct {// Next保存的是下一次leader发送append消息时传送过来的日志索引// 当选举出新的leader时,首先初始化Next为该leader最后一条日志+1// 如果向该节点append日志失败,则递减Next回退日志,一直回退到索引匹配为止// Match保存在该节点上保存的日志的最大索引,初始化为0// 正常情况下,Next = Match + 1// 以下情况下不是上面这种情况:// 1. 切换到Probe状态时,如果上一个状态是Snapshot状态,即正在接收快照,那么Next = max(pr.Match+1, pendingSnapshot+1)// 2. 当该follower不在Replicate状态时,说明不是正常的接收副本状态。//    此时当leader与follower同步leader上的日志时,可能出现覆盖的情况,即此时follower上面假设Match为3,但是索引为3的数据会被//    leader覆盖,此时Next指针可能会一直回溯到与leader上日志匹配的位置,再开始正常同步日志,此时也会出现Next != Match + 1的情况出现Match, Next uint64// 三种状态// ProgressStateProbe:探测状态// ProgressStateReplicate:副本状态// ProgressStateSnapshot:快照状态State ProgressStateType// 在状态切换到Probe状态以后,该follower就标记为Paused,此时将暂停同步日志到该节点Paused bool// 如果向该节点发送快照消息,PendingSnapshot用于保存快照消息的索引// 当PendingSnapshot不为0时,该节点也被标记为暂停状态。// raft只有在这个正在进行中的快照同步失败以后,才会重传快照消息PendingSnapshot uint64// 如果进程最近处于活跃状态则为 true(收到来自跟随者的任意消息都认为是活动状态)。在超时后会重置重置为false RecentActive bool// 用于实现滑动窗口,用来做流量控制ins *inflights
}

对于不同的状态,其会采取不同的行为:

  • ProgressStateProbe:探测状态,当 follower 拒绝了最近的 append 消息时,那么就会进入探测状态,此时 leader 会试图继续往前追溯该 follower 的日志从哪里开始丢失的。在 probe 状态时,leader 每次最多 append 一条日志,如果收到的回应中带有 RejectHint 信息,则回退 Next 索引,以便下次重试。在初始时,leader 会把所有 follower 的状态设为 probe,因为它并不知道各个 follower 的同步状态,所以需要慢慢试探。
  • ProgressStateReplicate:当 leader 确认某个 follower 的同步状态后,它就会把这个 follower 的 state 切换到这个状态,并且用 pipeline 的方式快速复制日志。leader 在发送复制消息之后,就修改该节点的 Next 索引为发送消息的最大索引 + 1。
  • ProgressStateSnapshot:接收快照状态。当 leader 向某个 follower 发送 append 消息,试图让该 follower 状态跟上 leader 时,发现此时 leader上保存的索引数据已经对不上了,比如leader在index为10之前的数据都已经写入快照中了,但是该 follower 需要的是 10 之前的数据,此时就会切换到该状态下,发送快照给该 follower。当快照数据同步追上之后,并不是直接切换到 Replicate 状态,而是首先切换到 Probe 状态。

从上面我们可以看出 Progress 是个状态机,下面是它的状态转移图:

Progress状态转移图

核心算法

raft.go

前面介绍了消息、日志、状态机,接下来就是 raft 的核心实现。其中大部分的逻辑都在状态机函数 Step 中:

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.go// raft的状态机
func (r *raft) Step(m pb.Message) error {r.logger.Infof("from:%d, to:%d, type:%s, term:%d, state:%v", m.From, m.To, m.Type, r.Term, r.state)// Handle the message term, which may result in our stepping down to a follower.switch {case m.Term == 0:// 来自本地的消息case m.Term > r.Term:// 消息的Term大于节点当前的Termlead := m.Fromif m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {// 如果收到的是投票类消息// 当context为campaignTransfer时表示强制要求进行竞选force := bytes.Equal(m.Context, []byte(campaignTransfer))// 是否在租约期以内inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeoutif !force && inLease {// 如果非强制,而且又在租约期以内,就不做任何处理// 非强制又在租约期内可以忽略选举消息,见论文的4.2.3,这是为了阻止已经离开集群的节点再次发起投票请求r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)return nil}// 否则将lead置为空lead = None}switch {// 注意Go的switch case不做处理的话是不会默认走到default情况的case m.Type == pb.MsgPreVote:// Never change our term in response to a PreVote// 在应答一个prevote消息时不对任期term做修改case m.Type == pb.MsgPreVoteResp && !m.Reject:default:r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",r.id, r.Term, m.Type, m.From, m.Term)// 变成follower状态r.becomeFollower(m.Term, lead)}case m.Term < r.Term:// 消息的Term小于节点自身的Term,同时消息类型是心跳消息或者是append消息if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {// 收到了一个节点发送过来的更小的term消息。这种情况可能是因为消息的网络延时导致,但是也可能因为该节点由于网络分区导致了它递增了term到一个新的任期。// ,这种情况下该节点不能赢得一次选举,也不能使用旧的任期号重新再加入集群中。如果checkQurom为false,这种情况可以使用递增任期号应答来处理。// 但是如果checkQurom为True,// 此时收到了一个更小的term的节点发出的HB或者APP消息,于是应答一个appresp消息,试图纠正它的状态r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})} else {// ignore other cases// 除了上面的情况以外,忽略任何term小于当前节点所在任期号的消息r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",r.id, r.Term, m.Type, m.From, m.Term)}// 在消息的term小于当前节点的term时,不往下处理直接返回了return nil}//核心流程switch m.Type {case pb.MsgHup:// 收到HUP消息,说明准备进行选举if r.state != StateLeader {// 当前不是leader// 取出[applied+1,committed+1]之间的消息,即得到还未进行applied的日志列表ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)if err != nil {r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)}// 如果其中有config消息,并且commited > applied,说明当前还有没有apply的config消息,这种情况下不能开始投票if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)return nil}r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)// 进行选举if r.preVote {r.campaign(campaignPreElection)} else {r.campaign(campaignElection)}} else {r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)}case pb.MsgVote, pb.MsgPreVote:// 收到投票类的消息if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {// 如果当前没有给任何节点投票(r.Vote == None)或者投票的节点term大于本节点的(m.Term > r.Term)// 或者是之前已经投票的节点(r.Vote == m.From)// 同时还满足该节点的消息是最新的(r.raftLog.isUpToDate(m.Index, m.LogTerm)),那么就接收这个节点的投票r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type)})if m.Type == pb.MsgVote {// 保存下来给哪个节点投票了r.electionElapsed = 0r.Vote = m.From}} else {// 否则拒绝投票r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true})}default:// 其他情况下进入各种状态下自己定制的状态机函数r.step(r, m)}return nil
}

将具体逻辑代码去掉,其实我们可以得到以下框架

func (r *raft) Step(m pb.Message) error {//...switch m.Type {case pb.MsgHup://...case pb.MsgVote, pb.MsgPreVote://...default:r.step(r, m)}
}

Step 其实就是根据消息类型的不同(MsgHup/MsgVote/MsgPreVote)而去执行对应的逻辑(状态机)。而对于其他状态,此时则会执行 default 中的 step(函数指针,根据节点角色执行不同的函数stepLeader/stepFollower/stepCandidate)

Raft状态机

node.go

node 其实是应用层和 Raft 协议层之间的中转站,其将上层应用层的消息传递给底层协议层模块,并将协议层的结果反馈给应用层。从代码后半段中可以看到,这里通过 for-select 从 channel 中不断处理数据。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/node.gofunc (n *node) run(r *raft) {var propc chan pb.Messagevar readyc chan Readyvar advancec chan struct{}var prevLastUnstablei, prevLastUnstablet uint64var havePrevLastUnstablei boolvar prevSnapi uint64var rd Readylead := NoneprevSoftSt := r.softState()prevHardSt := emptyStatefor {if advancec != nil {// advance channel不为空,说明还在等应用调用Advance接口通知已经处理完毕了本次的ready数据readyc = nil} else {rd = newReady(r, prevSoftSt, prevHardSt)if rd.containsUpdates() {// 如果这次ready消息有包含更新,那么ready channel就不为空readyc = n.readyc} else {// 否则为空readyc = nil}}if lead != r.lead {// 如果leader发生了变化if r.hasLeader() {   // 如果原来有leaderif lead == None {// 当前没有leaderr.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)} else {// leader发生了改变r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)}// 有leader,那么可以进行数据提交,prop channel不为空propc = n.propc} else {// 否则,prop channel为空r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)propc = nil}lead = r.lead}select {// TODO: maybe buffer the config propose if there exists one (the way// described in raft dissertation)// Currently it is dropped in Step silently.case m := <-propc:// 处理本地收到的提交值m.From = r.idr.Step(m)case m := <-n.recvc:// 处理其他节点发送过来的提交值// filter out response message from unknown From.if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) {// 需要确保节点在集群中或者不是应答类消息的情况下才进行处理r.Step(m) // raft never returns an error}case cc := <-n.confc:// 接收到配置发生变化的消息if cc.NodeID == None {// NodeId为空的情况,只需要直接返回当前的nodes就好r.resetPendingConf()select {case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:case <-n.done:}break}switch cc.Type {case pb.ConfChangeAddNode:r.addNode(cc.NodeID)case pb.ConfChangeRemoveNode:// block incoming proposal when local node is// removed// 如果删除的是本节点,停止提交if cc.NodeID == r.id {propc = nil}r.removeNode(cc.NodeID)case pb.ConfChangeUpdateNode:r.resetPendingConf()default:panic("unexpected conf type")}// 返回当前nodesselect {case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:case <-n.done:}case <-n.tickc:r.tick()case readyc <- rd:// 通过channel写入ready数据// 以下先把ready的值保存下来,等待下一次循环使用,或者当advance调用完毕之后用于修改raftLog的if rd.SoftState != nil {prevSoftSt = rd.SoftState}if len(rd.Entries) > 0 {// 保存上一次还未持久化的entries的index、termprevLastUnstablei = rd.Entries[len(rd.Entries)-1].IndexprevLastUnstablet = rd.Entries[len(rd.Entries)-1].TermhavePrevLastUnstablei = true}if !IsEmptyHardState(rd.HardState) {prevHardSt = rd.HardState}if !IsEmptySnap(rd.Snapshot) {prevSnapi = rd.Snapshot.Metadata.Index}r.msgs = nilr.readStates = nil// 修改advance channel不为空,等待接收advance消息advancec = n.advanceccase <-advancec:// 收到advance channel的消息if prevHardSt.Commit != 0 {// 将committed的消息appliedr.raftLog.appliedTo(prevHardSt.Commit)}if havePrevLastUnstablei {// 将还没有持久化的数据进行持久化r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)havePrevLastUnstablei = false}r.raftLog.stableSnapTo(prevSnapi)advancec = nilcase c := <-n.status:c <- getStatus(r)case <-n.stop:close(n.done)return}}
}

从后半部分代码中可以看到,propcrecvc中 拿到的是从上层应用传进来的消息,并且这个消息会被交给 raft层的 Step 函数处理。

那么我们接着来看 readyc。上面也说了,在 etcd 的这个实现中,node 并不负责数据的持久化、网络消息的通信、以及将已经提交的 log 应用到状态机中,所以 node 使用 readyc 这个channel对外通知有数据要处理了,并将这些需要外部处理的数据打包到一个Ready结构体中,传递给下层 Raft 协议层。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/node.gotype Ready struct {// 软状态是异变的,包括:当前集群leader、当前节点状态*SoftState// 硬状态需要被保存,包括:节点当前Term、Vote、Commit// 如果当前这部分没有更新,则等于空状态pb.HardState// 保存ready状态的readindex数据信息ReadStates []ReadState// 需要在消息发送之前被写入到持久化存储中的entries数据数组Entries []pb.Entry// 需要写入到持久化存储中的快照数据Snapshot pb.Snapshot// 需要输入到状态机中的数据数组,这些数据之前已经被保存到持久化存储中了CommittedEntries []pb.Entry// 在entries被写入持久化存储中以后,需要发送出去的数据Messages []pb.Message
}

当下层得到这个 Ready 之后,其就会触发以下行为:

  1. 将 HardState, Entries, Snapshot 持久化。
  2. 将 Messages 广播给其他节点。
  3. 将 CommittedEntries(已经 commit 还没有 apply )应用到状态机。
  4. 如果发现 CommittedEntries 中有成员变更类型的 entry,调用 node.ApplyConfChange() 方法让 node 知道。
  5. 最后再调用 node.Advance() 告诉 raft,这批状态更新处理完了,状态已经演进了,可以给我下一批 Ready让我处理。

执行流程

选举流程

在 node.go 的大循环中,有一个会定时触发的 tick channel,当其触发时就会调用 raft.tick(),而当当前节点的身份为 follower 时,其就会通过函数指针调用 tickElection

https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gofunc (r *raft) tickElection() {r.electionElapsed++if r.promotable() && r.pastElectionTimeout() {// 如果可以被提升为leader,同时选举时间也到了r.electionElapsed = 0// 发送HUP消息是为了重新开始选举r.Step(pb.Message{From: r.id, Type: pb.MsgHup})}
}

从上面的代码我们可以看到,其会判断超时时间是否达到,以及是否满足晋升 leader 的条件,如果可以即调用Step 函数并传递 MsgHup 消息来进入选举状态。

https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gofunc (r *raft) Step(m pb.Message) error {//...switch m.Type {case pb.MsgHup:r.campaign(campaignElection)}
}

从上面可以看到,当接收到的消息为 MsgHup 时即调用 raft.campaign() 触发选举。

https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gofunc (r *raft) campaign(t CampaignType) {var term uint64var voteMsg pb.MessageTypeif t == campaignPreElection {r.becomePreCandidate()voteMsg = pb.MsgPreVoteterm = r.Term + 1} else {r.becomeCandidate()voteMsg = pb.MsgVoteterm = r.Term}// 调用poll函数给自己投票,同时返回当前投票给本节点的节点数量if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {// 有半数投票,说明通过,切换到下一个状态if t == campaignPreElection {r.campaign(campaignElection)} else {// 如果给自己投票之后,刚好超过半数的通过,那么就成为新的leaderr.becomeLeader()}return}// 向集群里的其他节点发送投票消息for id := range r.prs {if id == r.id {// 过滤掉自己continue}r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)var ctx []byteif t == campaignTransfer {ctx = []byte(t)}r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})}
}

这里的逻辑如下:

  1. 判断投票状态,将身份从 follower 变更为 candidate,更新任期。

    1. 调用 poll 给自己投票,并检查自己的票数,如果投票人数超过法定人数,则说明选举成功,将状态修改为 leader。
  2. 如果此时同意人数不够,则给集群中其他节点发送投票信息。

具体的检票逻辑如下

https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.go// 轮询集群中所有节点,返回一共有多少节点已经进行了投票
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {if v {r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)} else {r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)}// 如果id没有投票过,那么更新id的投票情况if _, ok := r.votes[id]; !ok {r.votes[id] = v}// 计算下都有多少节点已经投票给自己了for _, vv := range r.votes {if vv {granted++}}return granted
}

对于其他节点,当在接收到上面 voteMsg 投票信息时,就会检查接收到的 Term 是否比自己的大,以及日志信息是否比自己的新,来决定是否需要进行投票,具体的逻辑如下:

https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gofunc (r *raft) Step(m pb.Message) error {switch m.Type {case pb.MsgVote, pb.MsgPreVote:// 收到投票类的消息if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {// 如果当前没有给任何节点投票(r.Vote == None)或者投票的节点term大于本节点的(m.Term > r.Term)// 或者是之前已经投票的节点(r.Vote == m.From)// 同时还满足该节点的消息是最新的(r.raftLog.isUpToDate(m.Index, m.LogTerm)),那么就接收这个节点的投票r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type)})if m.Type == pb.MsgVote {// Only record real votes.// 保存下来给哪个节点投票了r.electionElapsed = 0r.Vote = m.From}} else {// 否则拒绝投票r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)r.send(pb.Message{To: m.From, Type: voteRespMsgType(m.Type), Reject: true})}}-
}

让我们继续回到 Candidate 的视角,当他收到投票回复后,此时就会检查票数是否超过法定人数:如果超过则将身份转变为 Leader 并向所有节点广播,如果没有则说明选举失败,将身份变回 Follower。

https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gofunc stepCandidate(r *raft, m pb.Message) {var myVoteRespType pb.MessageTypeif r.state == StatePreCandidate {myVoteRespType = pb.MsgPreVoteResp} else {myVoteRespType = pb.MsgVoteResp}// 以下转换成follower状态时,为什么不判断消息的term是否至少大于当前节点的term???switch m.Type {//...case myVoteRespType:// 计算当前集群中有多少节点给自己投了票gr := r.poll(m.From, m.Type, !m.Reject)r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)switch r.quorum() {case gr: // 如果进行投票的节点数量正好是半数以上节点数量if r.state == StatePreCandidate {r.campaign(campaignElection)} else {// 变成leaderr.becomeLeader()r.bcastAppend()}case len(r.votes) - gr:  // 如果是半数以上节点拒绝了投票// 变成followerr.becomeFollower(r.Term, None)}//...}
}

写入流程

func (n *node) Propose(ctx context.Context, data []byte) error {return n.step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

当要执行写入时,就会调用 node 中的 Propose 函数将写入请求封装到 MsgProp 消息中,发送给 step 处理。在前面也提到过,这个 step 其实是一个函数指针,会随着执行者的身份变化而指向不同的方法。

当接受消息的为 Follower 时

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/node.gofunc stepFollower(r *raft, m pb.Message) {switch m.Type {case pb.MsgProp:// 本节点提交的值if r.lead == None {// 没有leader则提交失败,忽略r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)return}// 向leader进行redirectm.To = r.leadr.send(m)//...
}

从上面可以看到,这里会将消息转发给集群中的 Leader,如果不存在 Leader 则提交失败。

当接受消息的为 Leader 时(此时不管是自己产生还是其他人转发的)

https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gotype stepFunc func(r *raft, m pb.Message)// leader的状态机
func stepLeader(r *raft, m pb.Message) {switch m.Type {case pb.MsgProp:// 不能提交空数据if len(m.Entries) == 0 {r.logger.Panicf("%x stepped empty MsgProp", r.id)}// 检查是否在集群中if _, ok := r.prs[r.id]; !ok {// 这里检查本节点是否还在集群以内,如果已经不在集群中了,不处理该消息直接返回。// 这种情况出现在本节点已经通过配置变化被移除出了集群的场景。return}// 当前正在转换leader过程中,不能提交if r.leadTransferee != None {r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)return}for i, e := range m.Entries {if e.Type == pb.EntryConfChange {if r.pendingConf {// 如果当前为还没有处理的配置变化请求,则其他配置变化的数据暂时忽略r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String())// 将这个数据置空m.Entries[i] = pb.Entry{Type: pb.EntryNormal}}// 置位有还没有处理的配置变化数据r.pendingConf = true}}// 添加数据到log中r.appendEntry(m.Entries...)// 向集群其他节点广播append消息r.bcastAppend()return}//...
}

从上面可以看出,此时的逻辑主要就是以下几步:

  1. 检查自身情况(是否可用/是否在集群中)
  2. 将这个消息添加到自己的 log 中。
  3. 向集群其他的节点广播 append 消息。

此时我们再继续看其他节点接收到这个 append 消息后会如何工作

func stepFollower(r *raft, m pb.Message) {switch m.Type {//...case pb.MsgApp:// append消息// 收到leader的app消息,重置选举tick计时器,因为这样证明leader还存活r.electionElapsed = 0r.lead = m.Fromr.handleAppendEntries(m)//...
}

此时 Follower 节点会重置 tick 计时器,接着调用 handleAppendEntries 写入消息

func (r *raft) handleAppendEntries(m pb.Message) {// 先检查消息消息的合法性if m.Index < r.raftLog.committed {    // 传入的消息索引是已经commit过的索引// 返回commit日志索引r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})return}r.logger.Infof("%x -> %x index %d", m.From, r.id, m.Index)// 尝试添加到日志模块中if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {// 添加成功,返回的index是添加成功之后的最大indexr.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})} else {// 添加失败r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)// 添加失败的时候,返回的Index是传入过来的Index,RejectHint是该节点当前日志的最后索引r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})}
}

当 Follower 接收完这个 log 的时候,就会回复一个 MsgAppResp 消息。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gofunc (r *raft) Step(m pb.Message) error {r.logger.Infof("from:%d, to:%d, type:%s, term:%d, state:%v", m.From, m.To, m.Type, r.Term, r.state)//...// Handle the message term, which may result in our stepping down to a follower.switch {//...case m.Term < r.Term:// 消息的Term小于节点自身的Term,同时消息类型是心跳消息或者是append消息if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})} else {// 除了上面的情况以外,忽略任何term小于当前节点所在任期号的消息r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",r.id, r.Term, m.Type, m.From, m.Term)}return nil}
}

当 Follower 接受到消息后,并且消息的 Term 小于节点本身,同时集群正常时(活跃节点数> 法定人数),向 Leader 回复一个 MsgAppResp 消息,否则忽略。

//https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gofunc stepLeader(r *raft, m pb.Message) error {switch m.Type {case pb.MsgAppResp://...if r.maybeCommit() {r.bcastAppend()}//...}
}

当 Leader 接收到 MsgAppResp 消息时,其首先会调用 maybeCommit 检查当前是否可以提交,如果可以则向集群广播提交的消息。

下面我们来看看 maybeCommit 检查以及提交的逻辑

https://github.com/lichuang/etcd-3.1.10-codedump/blob/master/raft/raft.gofunc (r *raft) maybeCommit() bool {// TODO(bmizerany): optimize.. Currently naivemis := make(uint64Slice, 0, len(r.prs))// 拿到当前所有节点的Match到数组中for id := range r.prs {mis = append(mis, r.prs[id].Match)}// 逆序排列sort.Sort(sort.Reverse(mis))// 排列之后拿到中位数的Match,因为如果这个位置的Match对应的Term也等于当前的Term// 说明有过半的节点至少comit了mci这个索引的数据,这样leader就可以以这个索引进行commit了mci := mis[r.quorum()-1]// raft日志尝试commitreturn r.raftLog.maybeCommit(mci, r.Term)
}//https://github.com/lichuang/etcd-3.1.10-codedump/blob/8649774a10f5a73e55601cf895749a450bdd7eea/raft/log.go#L343func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {// 只有在传入的index大于当前commit索引,以及maxIndex对应的term与传入的term匹配时,才使用这些数据进行commitif maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {l.commitTo(maxIndex)return true}return false
}

到这里,写入的流程就全部结束了。可以看出,这里的整体思路与我们常说的两阶段提交非常相像。

etcd Raft 源码剖析相关推荐

  1. 彻底理解 Raft 共识算法及 etcd/raft 源码解析

    译者序 本文翻译自 USENIX 2014 论文 In Search of an Understandable Consensus Algorithm (Extended Version)[1] ,文 ...

  2. RocketMQ NameServer源码剖析

    概述 NameServer是一个简单的 Topic 路由注册中心,支持 Topic.Broker 的动态注册与发现. 主要包括两个功能: Broker管理 ,NameServer接受Broker集群的 ...

  3. K8s基础知识学习笔记及部分源码剖析

    K8s基础知识学习笔记及部分源码剖析 在学习b站黑马k8s视频资料的基础上,查阅了配套基础知识笔记和源码剖析,仅作个人学习和回顾使用. 参考资料: 概念 | Kubernetes 四层.七层负载均衡的 ...

  4. kubernetes源码剖析读后感(一)

    注:结合书中的大概内容以及笔者自身的k8s经验 总结学到的一些新知识每一篇篇幅不会很长 书很棒强烈推荐买一本读 本次读书来自于<kubernetes源码剖析> 作者郑东旭 第一章kuber ...

  5. Nacos注册中心CP架构Raft源码分析

    @toc[] 一.CAP介绍 二.Nacos如何设置CP.AP模式 我们使用nacos的时候,有一个关于节点类型的配置: cloud:nacos:discovery:server-addr: 192. ...

  6. kubernetes的api操作和kubectl的源码剖析

    1.kubernetes的api文档的网址: https://kubernetes.io/docs/concepts/overview/kubernetes-api/ 2.kubernetes的go语 ...

  7. 老李推荐:第14章4节《MonkeyRunner源码剖析》 HierarchyViewer实现原理-装备ViewServer-端口转发 1...

    老李推荐:第14章4节<MonkeyRunner源码剖析> HierarchyViewer实现原理-装备ViewServer-端口转发 在初始化HierarchyViewer的实例过程中, ...

  8. JS魔法堂:mmDeferred源码剖析

    一.前言 avalon.js的影响力愈发强劲,而作为子模块之一的mmDeferred必然成为异步调用模式学习之旅的又一站呢!本文将记录我对mmDeferred的认识,若有纰漏请各位指正,谢谢.项目请见 ...

  9. Kafka源码剖析 —— 网络I/O篇 —— 浅析KafkaSelector

    为什么80%的码农都做不了架构师?>>>    ##NioSelector和KafkaSelector有什么区别? 先说结论,KafkaSelector(org.apache.kaf ...

最新文章

  1. chemdraw怎么画拐弯的箭头_性感皮衣皮裤的质感服装该怎么画?
  2. 求数组最小数平均值和和值
  3. 独家专访 | 从跨国投行到开源社区,IBM Spark总工程师Nick Pentreath的传奇经历
  4. flash加载flv,本地测试正常,上传至空间则失败解决办法
  5. jvm内存溢出分析实践案例:javax.crypto.JceSecurity大量BouncyCastleProvider实例无法被回收
  6. Android SoundPool 的简单使用
  7. 2018年最值得关注的15大技术趋势,区块链将得到更广泛的应用
  8. 这不仅仅是html5的HTML5问题
  9. mysql 密码过期解决图例_MYSQL 密码过期解决办法
  10. keil兼容51单片机和arm
  11. Android Studio 嵌入X5WebView
  12. c语言罗盘,风水罗盘下载_风水罗盘手机免费最新版v1.0下载_hycdc游戏网
  13. 游戏音效是用什么软件做的?
  14. 说一说Qpython3在Android手机上的应用
  15. gitkraken点击Glo出现白屏的情况,回退回去的解决办法
  16. html中美元符号$转义字符是 #36;
  17. 三种梯度下降算法的比较和几种优化算法
  18. LA 3406 Bingo *
  19. 基于泰勒级数展开求余弦函数值
  20. C++基于QT的模仿宝石迷阵游戏源码

热门文章

  1. java jsch_java – 使用JSch的多个命令
  2. 基于TOMCAT的网页地址栏图标设置
  3. 我可爱的老妹 终于回来了
  4. Github md文件换行和标签转义
  5. 12月8日——培训第16天
  6. 作业1范文杰201421410010
  7. 1024程序员日,聊聊人们对程序员的那几个偏见
  8. pageHelper分页查询pageNo大于最大页数及小于一返回数据
  9. 【Android测试技巧】01. root后adb shell默认不是root用户时,如何将文件放入手机系统中
  10. 【连载】【FPGA黑金开发板】Verilog HDL那些事儿--VGA驱动(十)