女主宣言

今天小编为大家分享一篇关于Golang实现Raft的文章,本篇文章为系列中的第四篇,对Raft中通过添加持久性和一些优化来完成Raft的基本实现。希望能对大家有所帮助。

PS:丰富的一线技术、多元化的表现形式,尽在“360云计算”,点关注哦!

本篇文章为Raft系列文章中的第四篇,在该部分中,我们将通过添加持久性和一些优化来完成Raft的基本实现。

  • Go实现Raft第一篇:介绍

  • Go实现Raft第二篇:选举

  • Go实现Raft第三篇:命令和日志复制

1

持久化

像Raft这样的共识算法的目标是通过在隔离的服务器之间复制任务来创建一个比其各个部分具有更高可用性的系统。到目前为止,我们一直专注于网络分区的故障情况,其中群集中的某些服务器与其他服务器(或与客户端)断开连接。 失败的另一种模式是崩溃,其中服务器停止工作并重新启动。

对于其他服务器,它看起来像一个网络分区-服务器暂时断开连接,而对于崩溃的服务器本身,情况则大不相同,因为重新启动后,其所有内存状态都会丢失。

正是由于这个原因,Raft论文中的图2清楚地标记了哪个状态应该保持不变;持久状态将在每次更新时写入并刷新到持久化存储中。在服务器发出下一个RPC或答复正在进行的RPC之前,服务器必须保留的任何状态都将保留。

Raft只能通过保留其状态的子集来实现,即:

  • currentTerm - 此服务器观察到的最新任期

  • votedFor - 此服务器在最新任期为其投票的节点ID

  • log - Raft日志条目

2

命令传递语义

在Raft中,视不同情况,一个命令可以多次传递给客户端。有几种可能发生这种情况的场景,包括崩溃导致重新启动(再次重播日志时)。

就消息传递语义而言,Raft选择的是”至少一次”。提交命令后,它将最终复制到所有客户端,但是某些客户端可能多次看到同一命令。因此,建议命令带有唯一的ID,并且客户端应忽略已交付的命令。这在Raft论文中的第8节有更详细的描述。

3

存储接口

为了实现持久性,我们在代码中添加了以下接口:

type Storage interface {Set(key string, value []byte)Get(key string) ([]byte, bool)// HasData returns true iff any Sets were made on this Storage.HasData() bool
}

可以将它看作是一个映射,从字符串映射到一个由持久存储支持的通用字节切片。

4

恢复和保存状态

CM构造函数现在将接受一个 Storage 作为参数并调用:

if cm.storage.HasData() {cm.restoreFromStorage(cm.storage)
}

restoreFromStorage 方法也是新增。它从存储中加载持久状态变量,使用标准的 encoding/gob 包对它们进行反序列化:

func (cm *ConsensusModule) restoreFromStorage(storage Storage) {if termData, found := cm.storage.Get("currentTerm"); found {d := gob.NewDecoder(bytes.NewBuffer(termData))if err := d.Decode(&cm.currentTerm); err != nil {log.Fatal(err)}} else {log.Fatal("currentTerm not found in storage")}if votedData, found := cm.storage.Get("votedFor"); found {d := gob.NewDecoder(bytes.NewBuffer(votedData))if err := d.Decode(&cm.votedFor); err != nil {log.Fatal(err)}} else {log.Fatal("votedFor not found in storage")}if logData, found := cm.storage.Get("log"); found {d := gob.NewDecoder(bytes.NewBuffer(logData))if err := d.Decode(&cm.log); err != nil {log.Fatal(err)}} else {log.Fatal("log not found in storage")}
}

镜像方法为 persistToStorage - 将所有这些状态变量编码并保存到提供的 Storage 中:

func (cm *ConsensusModule) persistToStorage() {var termData bytes.Bufferif err := gob.NewEncoder(&termData).Encode(cm.currentTerm); err != nil {log.Fatal(err)}cm.storage.Set("currentTerm", termData.Bytes())var votedData bytes.Bufferif err := gob.NewEncoder(&votedData).Encode(cm.votedFor); err != nil {log.Fatal(err)}cm.storage.Set("votedFor", votedData.Bytes())var logData bytes.Bufferif err := gob.NewEncoder(&logData).Encode(cm.log); err != nil {log.Fatal(err)}cm.storage.Set("log", logData.Bytes())
}

我们只需在这些状态变量发生变化的每个点调用 pesistToStorage 来实现持久化。如果看一下第2部分中CM的代码与本部分之间的区别,会发现它们散布在少数地方。

当然,这不是实现持久性的最有效的方法,但是简单有效,所以足以满足我们的需要。效率最低的是保存整个日志,这在实际应用中可能很大。为了真正解决这个问题,Raft有一个日志压缩机制,该机制在本文的第7节中进行了描述。我们不打算实现压缩,但是可以将其作为练习添加到我们的实现中。

5

崩溃伸缩

实施持久性后,我们的Raft集群在一定程度上可以应对崩溃。只要集群中的少数节点崩溃并在以后的某个时间点重新启动,集群就将对客户端保持可用。具有2N + 1个服务器的Raft群集将容忍N台故障服务器,并且只要其他N + 1台服务器仍保持相互连接,便会保持可用。

如果查看此部分的测试,会注意到添加了许多新测试。崩溃伸缩可以测试更大范围的人为情况组合,本文中也对此进行了一定程度的描述。

6

不可靠的RPC交付

需要注意的另一个方面是不可靠的RPC交付。到目前为止,我们已经假设在连接的服务器之间发送的RPC可能到达目的地的时间很短。如果查看server.go,会注意到它使用了一种称为 RPCProxy 的类型来实现这些延迟。每个RPC都会延迟1-5毫秒,以模拟位于同一数据中心的节点的真实性。

RPCProxy让我们实现的另一件事是可选的不可靠交付。启用 RAFT_UNRELIABLE_RPC 环境变量后,RPC有时会明显延迟(延迟75毫秒)或完全中断。模拟了实际的网络故障。

我们可以在 RAFT_UNRELIABLE_RPC 开启的情况下重新运行所有测试,并观察Raft群集在出现这些故障时的行为。如果有兴趣,可以尝试调整 RPCProxy,不仅让RPC请求延迟,还可以让RPC答复延迟。

7

优化发送AppendEntries

正如在第2部分中简要提到的,当前的领导者执行效率很低。领导者在 LeaderSendHeartbeats 中发送AE,定时器每隔50毫秒调用一次。假设提交了一条新命令;领导者将等到下一个50毫秒的边界,而不是立即通知跟随者。更糟的是,因为需要两次AE往返来通知跟随者命令已提交。如图:

在时间(1),领导者将心跳AE发送给跟随者,并在几毫秒内获得响应。例如,在35毫秒后提交了新命令。领导者一直等到下一个50毫秒边界(2)才将更新的日志发送给跟随者。跟随者答复该命令已成功添加到日志(3)。此时,领导者已经提高了提交索引(假设它获得了多数),可以立即通知跟随者,但是它一直等到下一个50毫秒边界(4)为止。最后,当跟随者收到更新的 leaderCommit时,它可以将新提交的命令通知其自己的客户端。

在领导者的 Submit(X) 和跟随者的 commitChan <-X 之间经过的大部分时间对于实现来讲都是不必要的。

真正想要的是使序列看起来像这样:

看一下实现的新部分,从startLeader开始。

func (cm *ConsensusModule) startLeader() {cm.state = Leaderfor _, peerId := range cm.peerIds {cm.nextIndex[peerId] = len(cm.log)cm.matchIndex[peerId] = -1}cm.dlog("becomes Leader; term=%d, nextIndex=%v, matchIndex=%v; log=%v", cm.currentTerm, cm.nextIndex, cm.matchIndex, cm.log)// This goroutine runs in the background and sends AEs to peers:// * Whenever something is sent on triggerAEChan// * ... Or every 50 ms, if no events occur on triggerAEChango func(heartbeatTimeout time.Duration) {// Immediately send AEs to peers.cm.leaderSendAEs()t := time.NewTimer(heartbeatTimeout)defer t.Stop()for {doSend := falseselect {case <-t.C:doSend = true// Reset timer to fire again after heartbeatTimeout.t.Stop()t.Reset(heartbeatTimeout)case _, ok := <-cm.triggerAEChan:if ok {doSend = true} else {return}// Reset timer for heartbeatTimeout.if !t.Stop() {<-t.C}t.Reset(heartbeatTimeout)}if doSend {cm.mu.Lock()if cm.state != Leader {cm.mu.Unlock()return}cm.mu.Unlock()cm.leaderSendAEs()}}}(50 * time.Millisecond)
}

不仅要等待50 ms的计时,startLeader中的循环还要等待两个可能的事件之一:

  • 在cm.triggerAEChan上发送

  • 计时器计数50毫秒

我们将很快看到触发 cm.triggerAEChan 的原因。这是现在应该发送AE的信号。每当触发通道时,计时器都会重置,并执行心跳逻辑-如果领导者没有新的要报告的内容,则最多等待50毫秒。

还要注意,实际发送AE的方法已从 leaderSendHeartbeats 重命名为 leaderSendAE,可以更好地在新代码中反映其目的。

我们所期望的,触发cm.triggerAEChan的方法之一是Submit:

func (cm *ConsensusModule) Submit(command interface{}) bool {cm.mu.Lock()cm.dlog("Submit received by %v: %v", cm.state, command)if cm.state == Leader {cm.log = append(cm.log, LogEntry{Command: command, Term: cm.currentTerm})cm.persistToStorage()cm.dlog("... log=%v", cm.log)cm.mu.Unlock()cm.triggerAEChan <- struct{}{}return true}cm.mu.Unlock()return false
}

修改成:

  • 每当提交新命令时,都会调用cm.persistToStorage来保留新的日志条目。

  • 一个空结构在 cm.triggerAEChan 上发送。将通知领导者goroutine中的循环。

  • 锁定处理将重新排序;在发送cm.triggerAEChan时不想保持锁定,因为在某些情况下可能导致死锁。

在领导者中处理AE答复并推进提交索引的代码中 cm.triggerAEChan 将被通知。

if cm.commitIndex != savedCommitIndex {cm.dlog("leader sets commitIndex := %d", cm.commitIndex)// Commit index changed: the leader considers new entries to be// committed. Send new entries on the commit channel to this// leader's clients, and notify followers by sending them AEs.cm.newCommitReadyChan <- struct{}{}cm.triggerAEChan <- struct{}{}
}

这个优化很重要,它使实现比以前对新命令的响应速度更快。

8

批量处理命令提交

现在,每次调用 Submit 都会触发很多活动 - 领导者立即向所有跟随者广播RPC。如果想一次提交多个命令,连接Raft群集的网络可能会被RPC淹没。

尽管它看起来效率低,但是安全。Raft的RPC都是幂等的,也就是说多次获得具有基本相同信息的RPC不会造成任何危害。

如果担心一次要频繁提交许多命令时的网络流量,那么批处理应该很容易实现。最简单的方法是提供一种将整个命令片段传递到Submit的方法。这样Raft实现中的代码改动会很小,并且客户端将能够提交整个命令组,而不会产生太多的RPC通信。有兴趣的可以尝试一下!

9

总结

到此,我们结束了有关Raft分布式共识算法的系列文章就结束了。

如果对文章或代码有任何疑问或意见,可以留言。

如果有兴趣学习成熟的Go实现的Raft项目代码,可以参考:

https://github.com/etcd-io/etcd/tree/master/raft 是etcd的Raft部分,它是一个分布式键值数据库。

https://github.com/hashicorp/raft 是一个独立的Raft共识模块,可以绑定到不同的客户端。

360云计算

由360云平台团队打造的技术分享公众号,内容涉及数据库、大数据、微服务、容器、AIOps、IoT等众多技术领域,通过夯实的技术积累和丰富的一线实战经验,为你带来最有料的技术分享

Go实现Raft第四篇:持久化和调优相关推荐

  1. Linux 调优篇:虚拟化调优(hugepage 大页内存)* 叁

    一. 大页(HugePages)概念     Hugepage的引入 二. hugepages相关概念 三.Regular Pages 与 HugePages     a.Regular Pages ...

  2. spark算子_Spark 性能优化(四)——程序开发调优

    1.4 程序开发调优 Spark 性能优化的第一步,就是要在开发 Spark 作业的过程中注意和应用一些性能优化的基本原则.开发调优,就是要让大家了解以下一些 Spark 基本开发原则,包括:RDD ...

  3. 【Spark篇】---Spark调优之代码调优,数据本地化调优,内存调优,SparkShuffle调优,Executor的堆外内存调优...

    一.前述 Spark中调优大致分为以下几种 ,代码调优,数据本地化,内存调优,SparkShuffle调优,调节Executor的堆外内存. 二.具体    1.代码调优 1.避免创建重复的RDD,尽 ...

  4. Redis系列-第四篇持久化与事务

    一.持久化 Redis是一个内存数据库,为了保证数据的持久性,它提供了两种持久化方案: RDB方式(默认) AOF方式 持久化功能有效地避免因进程退出造成的数据丢失问题, 当下次重启时利用之前持久化的 ...

  5. linux 调优篇 :硬件调优(BIOS配置)* 壹

    一. 设置内存刷新频率为Auto 二. 开启NUMA 三. 设置Stream Write Mode 四. 开启CPU预取配置 五. 开启SRIOV 六. 开启SMMU 通过在BIOS中设置一些高级选项 ...

  6. 【十四】jvm 性能调优实例

    实例1: POI Excel 导出 Excel对象很大,多人同时登录系统导出Excel的话,就会有多个大Excel对象到老年代,这是老年代需要回收,系统可能会卡顿. jvm堆内存设置的越大,Full ...

  7. mysql数据库调优 面试_面试-MySQL篇:数据库调优

    今天谈谈 MySQL 的调优问题,比较偏项目实战.面试中也喜欢问 "xxx如何优化"的问题,这类问题相对开放,一不小心就变成了送命题,但是风险与机遇并存,假如你能理论结合项目经验给 ...

  8. Spark性能调优-RDD算子调优篇

    Spark性能调优-RDD算子调优篇 RDD算子调优 1. RDD复用 在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如下图所示: 对上图中的RDD计算架构进行修改,得到 ...

  9. Spark性能优化:Shuffle调优篇

    Spark性能优化:Shuffle调优篇 一.调优概述 大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO.序列化.网络数据传输等操作.因此,如果要让作业的性 ...

最新文章

  1. jqgrid横向滚动条
  2. eclipse html自动编译,eclipse不自动编译解决方法 不编译要怎么办
  3. html标签名缩写与英文全称对照表
  4. db2 最大分区数_db2 查询表分区数据库
  5. linux php执行ci框架,PHP CI框架学习之路径访问
  6. 阶段3 1.Mybatis_07.Mybatis的连接池及事务_4 mybatis中使用unpooled配置连接池的原理分析...
  7. Unity中解析Excel表格工具
  8. Java ResourceBundle 加载外部路径资源文件方式
  9. 科特斯matlab求积公式,牛顿科特斯求积公式.ppt
  10. android+cordova+windows打包vue一条龙服务
  11. android日记app常用,只是意外 - 用这些 APP 来记录生活,再也不用担心无法坚持写日记 - Android 应用 - 【最美应用】...
  12. 空降项目经理,该如何服众?
  13. 利用吉洪若夫正则化及其西尔韦斯特方程来修复受损图像
  14. 【解决方法】iOS 开发小技巧(一)
  15. 在Swift中使用dispatch_once单例模型
  16. SAP中利用标准成本报表计算成品人工成本及组成实例
  17. 通过iis启动服务,会产生C:/inetpub/logs/logsFile产生大量的日志,定期清理
  18. win7与internet时间同步出错_win7系统同步internet时间总提示“同步时出错”的解决方法...
  19. 软键盘实例 request参数修改
  20. 3分钟带你了解微信小程序开发

热门文章

  1. 唯真才能永久--读《十年》
  2. 搭建开发环境tomcat起不来
  3. idea 提示 Cannot resolve symbol ‘log‘解决
  4. prometheus常用语法
  5. Springboot应用中线程池配置教程(2021版)
  6. python映射的主要特点_Python入门 4——字典及其映射
  7. C# 隐藏TabControl头
  8. 计算机电源插头有哪几种,盘点电连接器常见的使用类型
  9. 9个元素换6次达到排序序列_排序总结:二大种,六小种排序方式
  10. python调用bat_python windows 远程执行bat