女主宣言

今天小编为大家分享一篇关于Golang实现Raft的文章,本篇文章为系列中的第三篇,对Raft中的命令和日志复制进行介绍并使用go进行实现。希望能对大家有所帮助。

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!

本篇文章为Raft系列文章中的第三篇,命令和日志复制。在该篇中,我们将增强Raft的相关功能实现,实际的处理客户端提交的命令并在Raft集群中进行复制。

1

客户端交互

在第一篇文章中我们简要讨论了客户端交互,如果不清晰建议可以再回顾一下。这里我们先不关注客户端如何找到领导者,将重点讨论当找到一个领导者时会发生什么。

  1. 首先,客户端将命令提交给领导者。在Raft集群中,命令通常只提交给单个节点。

  2. 领导者将命令复制到其跟随者。

  3. 最后,如果大多数集群节点都承认在其日志中有该命令,该命令将被提交,并向所有客户端通知新的提交。

注意提交和提交命令之间的不对称性 - 在检查我们即将讨论的实现决策时,这一点很重要。命令被提交到单个Raft节点,但是多个节点(特别是所有已连接/活动的节点啊)会在一段时间后将其提交并通知其客户端。

回顾此图:

状态机代表使用Raft进行复制的任意服务。

然后我们在Raft ConsensusModule模块的上下文中讨论客户端,我们通常指的是此服务,因为这是将提交报告到的地方。换句话说,从Consensus模块到服务状态机的黑色箭头就是该通知。

2

实现:提交管道

在我们的实现中,当一个 ConsensusModule 被创建时,它接受一个提交管道 - 一个用来向调用者发送提交命令的通道:commitChan chan<-CommitEntry。定义如下:

// CommitEntry is the data reported by Raft to the commit channel. Each commit
// entry notifies the client that consensus was reached on a command and it can
// be applied to the client's state machine.
type CommitEntry struct {// Command is the client command being committed.Command interface{}// Index is the log index at which the client command is committed.Index int// Term is the Raft term at which the client command is committed.Term int
}

使用通道是一种设计选择,但不是唯一方式。也可以改用回调。创建ConsensusModule时,调用者将注册一个回调函数,只要有要提交的命令,就会调用该回调函数。

在实现通道上发送条目的功能之前。我们需要先讨论Raft服务器如何复制命令并确定命令是否已提交。

3

Raft日志

在文章中多次提到Raft日志,但还没有详细介绍。日志只是应该应用于状态机的线性命令序列;如果有需要,日志应该足以从某个开始状态“重放”状态机。在正常运行期间,所有Raft节点的日志都是相同的;当领导者收到新命令时,将其存放在自己的日志中,然后复制到跟随者。跟随者将命令放在日志中,并确认给领导者,领导者将保留已安全复制到群集中大多数服务器的最新日志索引的计数。

每个框都是一个日志条目;框顶部的数字是将其添加到日志中的任期。底部是此日志包含的键值命令。每个日志条目都有一个线性索引。框的颜色是任期的另一种表示形式。

如果将此日志应用于空键值存储,则最终结果将具有值x = 4,y = 7。

在我们的实现中,日志条目由以下形式表示:

type LogEntry struct {Command interface{}Term    int
}

每个ConsensusModule的日志都只是log []LogEntry。用户端通常不在乎任期。任期对Raft的正确性至关重要,在阅读代码时务必牢记。

4

提交新的命令

新的Submit方法,使客户端可以提交新命令:

func (cm *ConsensusModule) Submit(command interface{}) bool {cm.mu.Lock()defer cm.mu.Unlock()cm.dlog("Submit received by %v: %v", cm.state, command)if cm.state == Leader {cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})cm.dlog("... log=%v", cm.log)return true}return false
}

很简单,如果此CM是领导者,则将新命令附加到日志中并返回true。否则,将被忽略并返回false。

问:“提交”返回的真实值是否表明客户端已向领导者提交了命令?

答:在极少数情况下,领导者可能会与其他Raft服务器分开,而后者在一段时间后会继续选举新的领导者。但是,客户可能仍在与旧的领导者通信。客户端应等待一段合理的时间,以使其提交的命令出现在提交通道上;如果不是,则表示它联系了错误的领导者,应与其他领导者重试。

5

复制日志条目

我们看到,提交给领导者的新命令被添加到日志的末尾。这个新命令如何到达跟随者?领导者遵循的步骤在Raft论文中进行了精确描述。我们在 leaderSendHeartbeats 中完成实现。

func (cm *ConsensusModule) leaderSendHeartbeats() {cm.mu.Lock()savedCurrentTerm := cm.currentTermcm.mu.Unlock()for _, peerId := range cm.peerIds {go func(peerId int) {cm.mu.Lock()ni := cm.nextIndex[peerId]prevLogIndex := ni - 1prevLogTerm := -1if prevLogIndex >= 0 {prevLogTerm = cm.log[prevLogIndex].Term}entries := cm.log[ni:]args := AppendEntriesArgs{Term:         savedCurrentTerm,LeaderId:     cm.id,PrevLogIndex: prevLogIndex,PrevLogTerm:  prevLogTerm,Entries:      entries,LeaderCommit: cm.commitIndex,}cm.mu.Unlock()cm.dlog("sending AppendEntries to %v: ni=%d, args=%+v", peerId, ni, args)var reply AppendEntriesReplyif err := cm.server.Call(peerId, "ConsensusModule.AppendEntries", args, &reply); err == nil {cm.mu.Lock()defer cm.mu.Unlock()if reply.Term > savedCurrentTerm {cm.dlog("term out of date in heartbeat reply")cm.becomeFollower(reply.Term)return}if cm.state == Leader && savedCurrentTerm == reply.Term {if reply.Success {cm.nextIndex[peerId] = ni + len(entries)cm.matchIndex[peerId] = cm.nextIndex[peerId] - 1cm.dlog("AppendEntries reply from %d success: nextIndex := %v, matchIndex := %v", peerId, cm.nextIndex, cm.matchIndex)savedCommitIndex := cm.commitIndexfor i := cm.commitIndex + 1; i < len(cm.log); i++ {if cm.log[i].Term == cm.currentTerm {matchCount := 1for _, peerId := range cm.peerIds {if cm.matchIndex[peerId] >= i {matchCount++}}if matchCount*2 > len(cm.peerIds)+1 {cm.commitIndex = i}}}if cm.commitIndex != savedCommitIndex {cm.dlog("leader sets commitIndex := %d", cm.commitIndex)cm.newCommitReadyChan <- struct{}{}}} else {cm.nextIndex[peerId] = ni - 1cm.dlog("AppendEntries reply from %d !success: nextIndex := %d", peerId, ni-1)}}}}(peerId)}
}

这比我们在上一部分中所做的要复杂得多,但实际上它仅遵循本文的图2。关于此代码的一些注意事项:

  • 现在已完全填充了AE RPC的字段:有关其含义,请参见本文中的图2。

  • AE响应有一个 success 字段,该字段告诉领导者跟随者是否看到prevLogIndex 和 prevLogTerm 匹配。领导者基于此字段更新此跟随者的nextIndex。

  • commitIndex 根据复制特定日志索引的关注者的数量进行更新。如果索引被多数复制,则 commitIndex 前进到该索引。

与我们之前讨论的用户端交互有关,这部分代码特别重要:

if cm.commitIndex != savedCommitIndex {cm.dlog("leader sets commitIndex := %d", cm.commitIndex)cm.newCommitReadyChan <- struct{}{}
}

newCommitReadyChan 是CM内部使用的通道,用于指示已准备好将新条目通过提交通道发送到客户端。它由在CM启动时在goroutine中运行的以下方法起作用:

func (cm *ConsensusModule) commitChanSender() {for range cm.newCommitReadyChan {// Find which entries we have to apply.cm.mu.Lock()savedTerm := cm.currentTermsavedLastApplied := cm.lastAppliedvar entries []LogEntryif cm.commitIndex > cm.lastApplied {entries = cm.log[cm.lastApplied+1 : cm.commitIndex+1]cm.lastApplied = cm.commitIndex}cm.mu.Unlock()cm.dlog("commitChanSender entries=%v, savedLastApplied=%d", entries, savedLastApplied)for i, entry := range entries {cm.commitChan <- CommitEntry{Command: entry.Command,Index:   savedLastApplied + i + 1,Term:    savedTerm,}}}cm.dlog("commitChanSender done")
}

此方法更新 lastApplied 状态变量以确定哪些条目已经发送到客户端,并且仅发送新条目。

6

更新跟随者的日志

我们已经看到了领导者如何处理新的日志条目。现在介绍跟随者的代码实现。特别是 AppendEntries RPC。

func (cm *ConsensusModule) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {cm.mu.Lock()defer cm.mu.Unlock()if cm.state == Dead {return nil}cm.dlog("AppendEntries: %+v", args)if args.Term > cm.currentTerm {cm.dlog("... term out of date in AppendEntries")cm.becomeFollower(args.Term)}reply.Success = falseif args.Term == cm.currentTerm {if cm.state != Follower {cm.becomeFollower(args.Term)}cm.electionResetEvent = time.Now()// Does our log contain an entry at PrevLogIndex whose term matches// PrevLogTerm? Note that in the extreme case of PrevLogIndex=-1 this is// vacuously true.if args.PrevLogIndex == -1 ||(args.PrevLogIndex < len(cm.log) && args.PrevLogTerm == cm.log[args.PrevLogIndex].Term) {reply.Success = true// Find an insertion point - where there's a term mismatch between// the existing log starting at PrevLogIndex+1 and the new entries sent// in the RPC.logInsertIndex := args.PrevLogIndex + 1newEntriesIndex := 0for {if logInsertIndex >= len(cm.log) || newEntriesIndex >= len(args.Entries) {break}if cm.log[logInsertIndex].Term != args.Entries[newEntriesIndex].Term {break}logInsertIndex++newEntriesIndex++}// At the end of this loop:// - logInsertIndex points at the end of the log, or an index where the//   term mismatches with an entry from the leader// - newEntriesIndex points at the end of Entries, or an index where the//   term mismatches with the corresponding log entryif newEntriesIndex < len(args.Entries) {cm.dlog("... inserting entries %v from index %d", args.Entries[newEntriesIndex:], logInsertIndex)cm.log = append(cm.log[:logInsertIndex], args.Entries[newEntriesIndex:]...)cm.dlog("... log is now: %v", cm.log)}// Set commit index.if args.LeaderCommit > cm.commitIndex {cm.commitIndex = intMin(args.LeaderCommit, len(cm.log)-1)cm.dlog("... setting commitIndex=%d", cm.commitIndex)cm.newCommitReadyChan <- struct{}{}}}}reply.Term = cm.currentTermcm.dlog("AppendEntries reply: %+v", *reply)return nil
}

当注意到领导者的 LeaderCommit 大于其自己的 cm.commitIndex 时,跟随者知道领导者考虑提交额外的条目时,将在 ch.newCommitReadyChan 上发送。

当领导者使用AE发送新的日志条目时,将发生以下情况:

  • 跟随者将新条目追加到其日志中,并向领导者回复success = true。

  • 结果,领导者为此跟随者更新其matchIndex。当足够的跟随者的下一个索引具有matchIndex时,领导者将更新commitIndex并将其发送给下一个AE中的所有跟随者(在leaderCommit字段中)。

  • 当跟随者收到新的LeaderCommit消息时,已经知道提交了新的日志条目,并且可以通过提交通道将其发送给其用户端。

7

选举安全

目前为止,我们已经研究了添加的新代码以支持日志复制。但是,日志也会影响Raft的选举。Raft使用选举程序来防止候选人赢得选举,除非其日志至少与集群中大多数节点的日志一样。

因此,RV包含lastLogIndex和lastLogTerm字段。当候选人发出RV时,将使用有关其最后一个日志条目的信息填充这些RV。跟随者将这些字段与自己的字段进行比较,并确定候选人是否是最新的才可以被选举。

func (cm *ConsensusModule) startElection() {cm.state = Candidatecm.currentTerm += 1savedCurrentTerm := cm.currentTermcm.electionResetEvent = time.Now()cm.votedFor = cm.idcm.dlog("becomes Candidate (currentTerm=%d); log=%v", savedCurrentTerm, cm.log)var votesReceived int32 = 1// Send RequestVote RPCs to all other servers concurrently.for _, peerId := range cm.peerIds {go func(peerId int) {cm.mu.Lock()savedLastLogIndex, savedLastLogTerm := cm.lastLogIndexAndTerm()cm.mu.Unlock()args := RequestVoteArgs{Term:         savedCurrentTerm,CandidateId:  cm.id,LastLogIndex: savedLastLogIndex,LastLogTerm:  savedLastLogTerm,}cm.dlog("sending RequestVote to %d: %+v", peerId, args)var reply RequestVoteReplyif err := cm.server.Call(peerId, "ConsensusModule.RequestVote", args, &reply); err == nil {cm.mu.Lock()defer cm.mu.Unlock()cm.dlog("received RequestVoteReply %+v", reply)if cm.state != Candidate {cm.dlog("while waiting for reply, state = %v", cm.state)return}if reply.Term > savedCurrentTerm {cm.dlog("term out of date in RequestVoteReply")cm.becomeFollower(reply.Term)return} else if reply.Term == savedCurrentTerm {if reply.VoteGranted {votes := int(atomic.AddInt32(&votesReceived, 1))if votes*2 > len(cm.peerIds)+1 {// Won the election!cm.dlog("wins election with %d votes", votes)cm.startLeader()return}}}}}(peerId)}// Run another election timer, in case this election is not successful.go cm.runElectionTimer()
}

lastLogIndexAndTerm是一个新的帮助器方法:

// lastLogIndexAndTerm returns the last log index and the last log entry's term
// (or -1 if there's no log) for this server.
// Expects cm.mu to be locked.
func (cm *ConsensusModule) lastLogIndexAndTerm() (int, int) {if len(cm.log) > 0 {lastIndex := len(cm.log) - 1return lastIndex, cm.log[lastIndex].Term} else {return -1, -1}
}

我们的实现是基于0的索引,而不是基于1的Raft索引。因此-1经常作为一个标记值。

这是一个更新的RV处理程序,实现选举安全检查:

func (cm *ConsensusModule) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {cm.mu.Lock()defer cm.mu.Unlock()if cm.state == Dead {return nil}lastLogIndex, lastLogTerm := cm.lastLogIndexAndTerm()cm.dlog("RequestVote: %+v [currentTerm=%d, votedFor=%d, log index/term=(%d, %d)]", args, cm.currentTerm, cm.votedFor, lastLogIndex, lastLogTerm)if args.Term > cm.currentTerm {cm.dlog("... term out of date in RequestVote")cm.becomeFollower(args.Term)}if cm.currentTerm == args.Term &&(cm.votedFor == -1 || cm.votedFor == args.CandidateId) &&(args.LastLogTerm > lastLogTerm ||(args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)) {reply.VoteGranted = truecm.votedFor = args.CandidateIdcm.electionResetEvent = time.Now()} else {reply.VoteGranted = false}reply.Term = cm.currentTermcm.dlog("... RequestVote reply: %+v", reply)return nil
}

8

下一步

在目前的Raft实现中,有一个问题是没有进行持久化操作。如果服务器故障重启,将会造成信息丢失。为此,我们将在下一部分增加持久化操作,以及对本篇中部分功能进行优化。敬请关注!

Raft参考:https://raft.github.io/raft.pdf

代码参考:https://github.com/eliben/raft

360云计算

由360云平台团队打造的技术分享公众号,内容涉及数据库、大数据、微服务、容器、AIOps、IoT等众多技术领域,通过夯实的技术积累和丰富的一线实战经验,为你带来最有料的技术分享

Go实现Raft第三篇:命令和日志复制相关推荐

  1. Raft算法的Leader选举和日志复制过程

    Raft 简介 Raft 是一种为了管理复制日志的一致性算法.它提供了和 Paxos 算法相同的功能和性能,但是它的算法结构和 Paxos 不同,使得Raft 算法更加容易理解并且更容易构建实际的系统 ...

  2. Go实现Raft第四篇:持久化和调优

    女主宣言 今天小编为大家分享一篇关于Golang实现Raft的文章,本篇文章为系列中的第四篇,对Raft中通过添加持久性和一些优化来完成Raft的基本实现.希望能对大家有所帮助. PS:丰富的一线技术 ...

  3. Android日志[进阶篇]三-Logcat 命令行工具

    Android日志[进阶篇]一-使用 Logcat 写入和查看日志 Android日志[进阶篇]二-分析堆栈轨迹(调试和外部堆栈) Android日志[进阶篇]三-Logcat命令行工具 Androi ...

  4. ARM开发工具软件命令详解---嵌入式回归第三篇

    先从bootloader开始,因为暂时目前这些都会是裸机程序相关! 本人这里是VMwarm10.0上安装的红帽linux虚拟机.从下面的截图中可以看出 裸机开发流程: 这里先做第三步(第一步第二步已提 ...

  5. Paxos第三篇 - Paxos成员组变更

    本文是Paxos三部曲的第三篇,在前一篇文章  Paxos第二篇 - 使用Multi-Paxos协议的日志同步与恢复  中,我们讨论了基于Multi-Paxos协议的日志同步方案,在这个方案中,我们有 ...

  6. hadoop作业初始化过程详解(源码分析第三篇)

    (一)概述 我们在上一篇blog已经详细的分析了一个作业从用户输入提交命令到到达JobTracker之前的各个过程.在作业到达JobTracker之后初始化之前,JobTracker会通过submit ...

  7. 初学Python——文件操作第三篇

    一.引言 什么?有了第二篇文件操作还不够?远远不够!而且在读完第三篇文件操作还是不够.关于文件的操作,后续的学习中将不断学习新的操作方式,使用更加合适的方法. 进入正题,上一篇讲到,Python对文件 ...

  8. 分析RAC下一个SPFILE整合的三篇文章的文件更改

    大约RAC下一个spfile分析_整理在_2014.4.17 说明:文章来源于网络 第一篇:RAC下SPFILE文件改动 在RAC下spfile位置的改动与单节点环境不全然一致,有些地方须要特别注意, ...

  9. 《Head First Java》的思考总结:第三篇

    前言: 今天要分享的是关于 <Head First Java>这本书的读后感,这本书有点小厚差不多有七百页左右,所以我花了几乎整个国庆的时间去阅读,学习.读完之后发现,受益颇多.正如书名所 ...

最新文章

  1. Unity创建登录页面(2)
  2. [爬虫]通过url获取连接地址中的数据
  3. opencv 直线检测 java_OpenCV实现图像的直线检测
  4. java中的void是什么?有什么作用?
  5. 用java程序完成从kafka队列读取消息到sparkstreaming再从sparkstreaming里把数据导入mysql中
  6. dotNetCore操作Redis(含CentOS7哨兵模式部署)
  7. 利用Azure backup备份和恢复Azure虚拟机(1)
  8. 魅蓝2 刷 android,魅蓝2全系列-解锁BootLoader完整版+刷入第三方recovery+刷入第三方ROM教程...
  9. 【Clickhosue】MySQL 没有主键导致CK不可用 The db.scene cannot be materialized, because there is no primary keys
  10. python打印多个变量名_如何在Python中打印单个和多个变量?
  11. EasyNVR摄像机网页无插件直播方案H5前端构建之:使用BootstrapPagination以分页形式展示数据信息...
  12. Typora 下载方法(windows/ linux)
  13. Java之HTTP长连接
  14. 区块链网络管理平台WeBASE双节点可视化部署
  15. B2B企业越早做网络营销会有哪些优势 由上海添力张进老师讲解
  16. 关于流浪狗社会现状的调查报告
  17. OpenGL学习笔记--配置VS环境
  18. 使用Mailgun Store():应用程序传入电子邮件的临时邮箱
  19. 用人工智能做广告,它成为第一家走上IPO的人工智能企业
  20. 基于python的停车场管理系统的设计与实现/智能停车管理系统

热门文章

  1. Windows PowerShell 语言快速参考
  2. ServletContext对象、ServletConfig对象
  3. AQS(AbstractQuenedSynchronizer)详解
  4. ❤️万字总结八大排序:冒泡排序,选择排序,插入排序,堆排序,希尔排序,归并排序,计数排序❤️
  5. html定位 浏览器兼容,IE6浏览器不支持固定定位(position:fixed)解决方案
  6. mysql exp 注入_使用exp进行SQL报错注入
  7. JVM结构与OOM问题分析
  8. 服务器iis配置 所需文件,iis服务器配置手册.pdf
  9. java中位操作_Java中使用位操作的几个小技巧
  10. MySQL无法启动服务器(1067)