前言

在分布式系统中,节点的增删是常见也是必须的操作,对于实用的共识算法Raft自然也提供了关于节点变更的理论基础。在Raft算法中一次变更一个节点是天然支持的,如果一次涉及多个节点的变更,对于一个稳定的系统来说就具有一定风险。例如,在前面的介绍leader选举时,leader的选举依赖集群中的大多数,如果改变了这个大多数,就会导致leader在选举出现问题。如下图(图来源于这里)所示,当新配置占据大多数,而原leader在老配置中就可能会产生新两个leader。因而本文基于Raft的理论和etcd/raft时间,探讨学习Raft的成员变更。本文主要尝试解决的问题:

  1. Raft提供的方案为什么能够解决多成员变更带来的不一致问题?
  2. etxd/raft的是如何实现成员变更的问题?

理论基础

Raft在成员变更提出联合共识方案来解决成员添加的问题。联合共识有如下要求:

  1. 日志条目会复制到两个配置的所有服务节点;
  2. 任何一个配置的任何服务节点都可以成为leader;
  3. 一致(用来选举和日志条目提交)需要在旧配置和新配置中的大多数分别达成。

    如图所示,这个配置更改过程分为两个步骤,当有收到新的配置时,首先会生成的新老联合配置,这个配置在未提交以前老配置可以独立决策,只要这个配置提交,就需要新老配置共同决策,也就是说需要两份配置中的各自的大多数,这时候就会生成新的配置,这个配置提交后,新配置才可以独立做决策。
    简要分析: 这种方式为什么能够避免数据不一致,首先数据不一致一般是集群中产生多个leader,正常情况下,只要在任意时刻避免多个leader产生就可以达到数据的一致性要求。而发生多个leader主要是当新节点的加入影响了大多数这个成为leader的条件。例如在当前配置中我们有【A,B,C】三个节点,现在新加入四个节点【D,E,F,G】,如果不采用联合配置,那么【D,E,F,G】就可能在收到leader的心跳之前选出新的leader,集群中就会出现两个leader。如果采用联合配置,就需要两个配置中的大多数,然而在老配置中【A,B,C】就会拒绝选举,这样【D,E,F,G】选举leader就不会成功,因为没有达到老配置的大多数。只要保证了在新配置应用之前没有发生多个leader的分化,配置更新就是安全地。只要在节点在等待主节点发送心跳之前没有选举leader成功,这样集群就算更新配置成功了。

etcd/raft 实现

etcd在实现中给出了两个方案:one by one 和联合共识方案。本文跟着上报消息流程来看看一次变更经历哪些步骤。
对于消息上报,etcd/Raft提供了两个方法在Node接口中

 // Propose proposes that data be appended to the log. Note that proposals can be lost without// notice, therefore it is user's job to ensure proposal retries.Propose(ctx context.Context, data []byte) error// ProposeConfChange proposes a configuration change. Like any proposal, the// configuration change may be dropped with or without an error being// returned. In particular, configuration changes are dropped unless the// leader has certainty that there is no prior unapplied configuration// change in its log.//// The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2// message. The latter allows arbitrary configuration changes via joint// consensus, notably including replacing a voter. Passing a ConfChangeV2// message is only allowed if all Nodes participating in the cluster run a// version of this library aware of the V2 API. See pb.ConfChangeV2 for// usage details and semantics.ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error

这两个方法都可以上报成员变更(confchange)的消息,我们这里看看Propose方法,经过Propose的方法最后都会来到stepLeader的处理阶段,也就是只有leader节点才能够处理上报消息的权力。这里我们着重看看处理配置改变的工作,EntryConfChange和EntryConfChangeV2分别代表第一个版本和第二个版本的配置变更,第二种主要是增加联合配置。

if e.Type == pb.EntryConfChange {var ccc pb.ConfChangeif err := ccc.Unmarshal(e.Data); err != nil {panic(err)}cc = ccc} else if e.Type == pb.EntryConfChangeV2 {var ccc pb.ConfChangeV2if err := ccc.Unmarshal(e.Data); err != nil {panic(err)}cc = ccc}

在广播消息之前,需要先做几件事情:

if cc != nil {alreadyPending := r.pendingConfIndex > r.raftLog.appliedalreadyJoint := len(r.prs.Config.Voters[1]) > 0wantsLeaveJoint := len(cc.AsV2().Changes) == 0var refused stringif alreadyPending {refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)} else if alreadyJoint && !wantsLeaveJoint {refused = "must transition out of joint config first"} else if !alreadyJoint && wantsLeaveJoint {refused = "not in joint state; refusing empty conf change"}if refused != "" {r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)m.Entries[i] = pb.Entry{Type: pb.EntryNormal}} else {r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1}}
  1. pendingConfIndex:是否有配置正在变更,这个和切主不一样,这里如果有配置正在变更,会舍弃当前的配置变更,返回失败;
  2. wantsLeaveJoint:是否是联合配置,换句话说就是是不是涉及多个节点的变更。
  3. alreadyJoint:之前的联合配置是否已经更新为最新配置,也就是有没有清理联合配置,保留最新配置。

注意:如果alreadyJoint && !wantsLeaveJoint为真,就表示涉及多个节点变更但上次的变更还没有清理联合配置,返回失败;不是联合配置便拒绝空配置。

如果正常就将消息广播,然后返回应用层处理。
广播之后就等待消息提交然后被状态机应用。。。

two years later…

我们的confchange消息来到了这里,这里一般是由状态机应用调用,如下所示

func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {var cs pb.ConfStateselect {case n.confc <- cc.AsV2():case <-n.done:}select {case cs = <-n.confstatec:case <-n.done:}return &cs
}

这里给n.confc里面塞入消息,然后被raft监听到。也就是说应用层告诉raft说准备好了可以应用这个配置了,然后通过node就进入raft协议层

case cc := <-n.confc:_, okBefore := r.prs.Progress[r.id]// 进入raft协议层进行配置变更应用cs := r.applyConfChange(cc)// If the node was removed, block incoming proposals. Note that we// only do this if the node was in the config before. Nodes may be// a member of the group without knowing this (when they're catching// up on the log and don't have the latest config) and we don't want// to block the proposal channel in that case.//// NB: propc is reset when the leader changes, which, if we learn// about it, sort of implies that we got readded, maybe? This isn't// very sound and likely has bugs.if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {var found boolouter:for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {for _, id := range sl {if id == r.id {found = truebreak outer}}}if !found {propc = nil}}select {case n.confstatec <- cs:case <-n.done:}

正片开始。。。
下面这一段代码,看似简单,其实只是冰山一角,接下来正式撸这一段

func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {// 整个配置变更准备就在这个虚拟函数中做了cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {// 首先初始化一个Changer结构体changer := confchange.Changer{Tracker:   r.prs,LastIndex: r.raftLog.lastIndex(),}// 判断是不是来解除联合配置的,这里为什么能够判断我们暂时按下不表 ①if cc.LeaveJoint() {// 进入解除流程return changer.LeaveJoint()} else if autoLeave, ok := cc.EnterJoint(); ok { // 判断能否进入联合配置,就是来看是否有多个节点变更②// 然后进入联合配置变更 ③return changer.EnterJoint(autoLeave, cc.Changes...)}// 如果不是就施行one by one模式,就比较简单 return changer.Simple(cc.Changes...)}()if err != nil {// TODO(tbg): return the error to the caller.panic(err)}// 切换到准备好的配置 ④return r.switchToConfig(cfg, prs)
}

首先看第①点,要追述第①点我们还要看上一次配置变更的收尾工作
这段代码在raft.advance()函数中,就是当应用层把Ready结构中的数据处理完了,这是就该raft做一些收尾工作

if r.prs.Config.AutoLeave && oldApplied <= r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader {// If the current (and most recent, at least for this leader's term)// 这里做了一件事,就是如果AutoLeave为真,上一个配置还没收尾,就该收尾了,当然只能是leader收尾// 怎么收尾呢,就是发一个空配置ent := pb.Entry{Type: pb.EntryConfChangeV2,Data: nil,}// There's no way in which this proposal should be able to be rejected.if !r.appendEntry(ent) {panic("refused un-refusable auto-leaving ConfChangeV2")}r.pendingConfIndex = r.raftLog.lastIndex()r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)}

这个空消息在stepLeader阶段会被解析为ConfChangeV2{},然后传到applyConfChange函数中,这样就可以通过下面这个函数

func (c ConfChangeV2) LeaveJoint() bool {// NB: c is already a copy.c.Context = nilreturn proto.Equal(&c, &ConfChangeV2{})
}

来判断是否是来解除(等价于Cold,newC_{old,new}Cold,new​到CnewC_{new}Cnew​)上一次的联合配置的,这样也能看出为什么是两阶段的形式。
然后来看第②点,这一点比较简单,看代码

func (c ConfChangeV2) EnterJoint() (autoLeave bool, ok bool) {// NB: in theory, more config changes could qualify for the "simple"// protocol but it depends on the config on top of which the changes apply.// For example, adding two learners is not OK if both nodes are part of the// base config (i.e. two voters are turned into learners in the process of// applying the conf change). In practice, these distinctions should not// matter, so we keep it simple and use Joint Consensus liberally.if c.Transition != ConfChangeTransitionAuto || len(c.Changes) > 1 {// Use Joint Consensus.var autoLeave boolswitch c.Transition {// ConfChangeTransitionAuto和ConfChangeTransitionJointImplicit都会在raft的收尾工作中自动完成联合配置解除case ConfChangeTransitionAuto:autoLeave = truecase ConfChangeTransitionJointImplicit:autoLeave = true// 需要应用层显示发一个空配置告诉Raft,解除上一次的联合配置case ConfChangeTransitionJointExplicit:default:panic(fmt.Sprintf("unknown transition: %+v", c))}return autoLeave, true}return false, false
}

主要是判断是否自动解除联合配置以及是否需要应用联合共识
再来看第③点,这个应该是是整个联合配置最复杂的一段,先看代码

func (c Changer) EnterJoint(autoLeave bool, ccs ...pb.ConfChangeSingle) (tracker.Config, tracker.ProgressMap, error) {// 拷贝出要正在应用的配置和Tracker.Progressyi ③-①cfg, prs, err := c.checkAndCopy()if err != nil {return c.err(err)}// 如果已经是联合配置了,就返回错误 ③-②if joint(cfg) {err := errors.New("config is already joint")return c.err(err)}// 只是为了测试,别当真,正常运行的raft集群怎么会没有Votersif len(incoming(cfg.Voters)) == 0 {// We allow adding nodes to an empty config for convenience (testing and// bootstrap), but you can't enter a joint state.err := errors.New("can't make a zero-voter config joint")return c.err(err)}// 保留之前的配置放到joint[1]中*outgoingPtr(&cfg.Voters) = quorum.MajorityConfig{}// Copy incoming to outgoing.for id := range incoming(cfg.Voters) {// 赋值为原来配置,意思是即将不用的配置outgoing,即将走了。。。有没有字面意思的感觉哈哈哈outgoing(cfg.Voters)[id] = struct{}{}}// 应用为联合配置,这里很重要③-③if err := c.apply(&cfg, prs, ccs...); err != nil {return c.err(err)}// 这里的变更主要是让raft后面进行收尾工作cfg.AutoLeave = autoLeave//  这个我们在第①点里说一下return checkAndReturn(cfg, prs)
}

首先看第③-①点

func (c Changer) checkAndCopy() (tracker.Config, tracker.ProgressMap, error) {cfg := c.Tracker.Config.Clone()prs := tracker.ProgressMap{}for id, pr := range c.Tracker.Progress {// A shallow copy is enough because we only mutate the Learner field.ppr := *pr// 这里拷贝的是每个progress地址prs[id] = &ppr}return checkAndReturn(cfg, prs)
}

其实就是把我们在前面初始化的changer中的数据拷贝出来,初始化时传入的是当前正在应用的prs和lastIndex,因此在函数中才能拷贝出当前运行的配置。
然后看第③-②点,就更简单了,就是看看原来的配置中有没有老配置没有清理

func joint(cfg tracker.Config) bool {return len(outgoing(cfg.Voters)) > 0
}

继续看第③-③点,最重要的一点

// 这个函数主要做什么呢?就是根据ConfChangeSingle的内容,对原来的配置做一个变更
func (c Changer) apply(cfg *tracker.Config, prs tracker.ProgressMap, ccs ...pb.ConfChangeSingle) error {for _, cc := range ccs {if cc.NodeID == 0 {// etcd replaces the NodeID with zero if it decides (downstream of// raft) to not apply a change, so we have to have explicit code// here to ignore these.continue}switch cc.Type {// 如果是增加节点,就在prs中加一个,顺便清理Learners 和LearnersNext中相关的节点,下同就不啰嗦了case pb.ConfChangeAddNode:c.makeVoter(cfg, prs, cc.NodeID)case pb.ConfChangeAddLearnerNode:c.makeLearner(cfg, prs, cc.NodeID)case pb.ConfChangeRemoveNode:c.remove(cfg, prs, cc.NodeID)case pb.ConfChangeUpdateNode:default:return fmt.Errorf("unexpected conf type %d", cc.Type)}}// 这个就是如果变更后没有节点了,就是说全给删了或者变为learner节点,这必然不行啊,那干嘛不直接整一个新的集群,然后把这个集群给下了if len(incoming(cfg.Voters)) == 0 {return errors.New("removed all voters")}return nil
}

这样一看最重要但不一定难哈
总的来说,第③点做了什么事呢,就是生成联合配置,将Config中Joint的增加一个MajorityConfig,将老的命名为outgoing在joint[1],变更后为incomming在joint[0],意思为即将应用的的配置,这样就组成一个联合配置了,然后把这个配置返回给applyConfChange。现在我们就可以来看第④点了,变更切换,先看代码

// switchToConfig reconfigures this node to use the provided configuration. It
// updates the in-memory state and, when necessary, carries out additional
// actions such as reacting to the removal of nodes or changed quorum
// requirements.
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {// 把老配置应用程序新配置,以及prs也换一下r.prs.Config = cfgr.prs.Progress = prsr.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)// 拿到之前配置状态cs := r.prs.ConfState()// 检查当前节点是否在prs中,就是看看是否被移除pr, ok := r.prs.Progress[r.id]// Update whether the node itself is a learner, resetting to false when the// node is removed.// 当前节点是否被变为learner节点r.isLearner = ok && pr.IsLearner// 如果如果是主节点且被降级了或者移除了if (!ok || r.isLearner) && r.state == StateLeader {// 直接返回// This node is leader and was removed or demoted. We prevent demotions// at the time writing but hypothetically we handle them the same way as// removing the leader: stepping down into the next Term.//// TODO(tbg): step down (for sanity) and ask follower with largest Match// to TimeoutNow (to avoid interruption). This might still drop some// proposals but it's better than nothing.//// TODO(tbg): test this branch. It is untested at the time of writing.return cs}// Follower或者candidateif r.state != StateLeader || len(cs.Voters) == 0 {return cs}// 如果是主节点,就准备提交配置,看看大多数节点的Match Index是否大于当前commitIndex,如果大于就可提交到MatchIndexif r.maybeCommit() {// If the configuration change means that more entries are committed now,// broadcast/append to everyone in the updated config.// 然后广播给其他节点,我准备提交到MatchIndexr.bcastAppend()} else {// Otherwise, still probe the newly added replicas; there's no reason to// let them wait out a heartbeat interval (or the next incoming// proposal).r.prs.Visit(func(id uint64, pr *tracker.Progress) {// 或者给落后的节点发送append消息r.maybeSendAppend(id, false /* sendIfEmpty */)})}// 如果将要切主的节点被移除了,就驳回切主请求if _, tOK := r.prs.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {r.abortLeaderTransfer()}// 返回变更后的ConfStatereturn cs
}

到这里我们在applyConfChange函数中的四点就说清楚了,然后会到node.run()中

case cc := <-n.confc:_, okBefore := r.prs.Progress[r.id]cs := r.applyConfChange(cc)// If the node was removed, block incoming proposals. Note that we// only do this if the node was in the config before. Nodes may be// a member of the group without knowing this (when they're catching// up on the log and don't have the latest config) and we don't want// to block the proposal channel in that case.//// NB: propc is reset when the leader changes, which, if we learn// about it, sort of implies that we got readded, maybe? This isn't// very sound and likely has bugs.if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {var found boolouter:for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {for _, id := range sl {if id == r.id {found = truebreak outer}}}if !found {propc = nil}}select {case n.confstatec <- cs:case <-n.done:}

这里主要检查当前节点是否被移除,如果移除就停止接受上报信息,然后把变更后的配置返回给应用层。这个阶段对应的是从ColdC_{old}Cold​到Cold,newC_{old,new}Cold,new​。后面的leaveJoint就是从Cold,newC_{old,new}Cold,new​到CnewC_{new}Cnew​。后面就比较简单了,大同小异,读者可以自己去看。

总结

本文主要解决两个问题,什么是联合共识?在etcd/raft中是如何实现的?本次分享就到这里,如果有错误欢迎指正。后面准备再详细介绍一下JointConfig和MajorityConfig,这个涉及选举提交,还是很重要。好了,谢谢观看!

【Raft】学习九:成员变更ConfChangeV2相关推荐

  1. Raft 集群成员变更、日志压缩、客户端交互

    Raft 集群成员变更.日志压缩.客户端交互 集群成员变更 在集群服务器发生变化时,不能一次性的把所有的服务器配置信息从老的替换为新的,因为,每台服务器的替换进度是不一样的,可能会导致出现双主的情况, ...

  2. Raft成员变更的工程实践

    简介: 成员变更是一致性系统实现绕不开的难题,对于提升运维能力以及服务可用性都有很大的帮助. 本文从Raft成员变更理论出发,介绍了Raft成员变更和单步成员变更的问题,其中包括Raft著名的Bug. ...

  3. Joint Consensus两阶段成员变更的单步实现

    简介: Raft提出的两阶段成员变更Joint Consensus是业界主流的成员变更方法,极大的推动了成员变更的工程应用.但Joint Consensus成员变更采用两阶段,一次变更需要提议两条日志 ...

  4. Datawhale第五期组队学习团队成员

    第五期组队学习团队成员 组队学习 负责人 评优助教 星球星主 初级算法梳理 Danny 梁乾明 黑桃 高级算法梳理 居居 路建飞 黑桃 编程 LeoLRH 鲁力 黑桃 统计学 谷勇杰 ben 黑桃 L ...

  5. Datawhale第四期组队学习团队成员

    第四期组队学习团队成员 集训 负责人 评优助教 基础算法梳理 Sm1les 钱令武 高级算法梳理 于鸿飞 小雪 ML项目实践 杨冰楠 孙涛 编程 孙超 小熊 统计学 李奇锋 蓝昔 Leetcode 老 ...

  6. OpenCV与图像处理学习九——连通区域分析算法(含代码)

    OpenCV与图像处理学习九--连通区域分析算法(含代码) 一.连通区域概要 二.Two-Pass算法 三.代码实现 一.连通区域概要 连通区域(Connected Component)一般是指图像中 ...

  7. PyTorch框架学习九——网络模型的构建

    PyTorch框架学习九--网络模型的构建 一.概述 二.nn.Module 三.模型容器Container 1.nn.Sequential 2.nn.ModuleList 3.nn.ModuleDi ...

  8. 2021中国大学生喜爱雇主榜发布;调查显示九成员工正经历“职业倦怠”工作危机 | 美通企业日报...

    今日看点:2021中国大学生喜爱雇主榜发布.调查显示九成员工正经历"职业倦怠"工作危机.希尔顿携手飞猪拓宽双方会员生态体系.洲际酒店集团探索多模式校企合作.亚马逊全球开店与江苏省商 ...

  9. C1认证学习九(IP基础)

    C1认证学习九(IP基础) 任务背景 IP是Internet Protocol的缩写,是整个TCp/IP协议族的核心,也是构成互联网的基础,可以说,只要计算机在网络中存在,就一定可以找到IP地址.IP ...

最新文章

  1. 大脑如何判断该睡觉了?可能是这80种蛋白说了算
  2. ubuntu 修改或创建交换分区的大小
  3. PS菜鸟入门 -- 实战演示之磨皮
  4. 使用SAP iRPA创建一个最简单的hello world项目并部署到SAP云平台上
  5. 基于'sessionStorage'与'userData'的类session存储
  6. gz格式linux怎么打开,linux 下载解压gz文件怎么打开
  7. PAT-甲级之树遍历问题的总结
  8. CCF201903-3 损坏的RAID5(100分)【数学计算+文本处理】
  9. 校园网一直是连接认证服务器无响应,校园网常见问题解决办法
  10. 科大讯飞和neospeech tts哪个更好
  11. linux 下的字体引擎(xtt freetype xfs xft)
  12. 定位误差:基准位置公差、基准不重合误差
  13. CRM如何维护客户关系?CRM成功案例分析
  14. 腾达便携无线路由 无法建立到192.168.2.1的服务器连接,Tenda腾达路由器5G信号设置步骤...
  15. 2019年第十一届蓝桥杯国赛JavaB组第H题——“大胖子走迷宫”题目及解析
  16. css图片颜色设置为黑白
  17. 不等距双杆模型_电磁感应之双杆模型ppt课件
  18. 一、ADSP-21489开发版在VisualDSP软件下的仿真器配置
  19. Docker系列-镜像原理
  20. linux命令和vim学习

热门文章

  1. PDF文件太大怎么压缩?用这个方法能够一键瘦身
  2. 数据流图详解(DFD)
  3. 【java毕业设计】基于java+BS的QQ屏幕截图工具设计与实现(毕业论文+程序源码)——屏幕截图工具
  4. 字节跳动 面试 复盘 回顾 2021 过客局
  5. 深度学习数学基础之线性代数
  6. Android Studio从gthub上导入新项目的时候,R文件丢失的问题
  7. Windows 解决端口占用
  8. 印度尼西亚通过新区块链项目改善其航运产业
  9. AtCoder Grand Contest 012 B Splatter Painting (反向处理 + 记忆化)
  10. ARKit之路-LiDAR传感器(一)