在paxos算法中,主要包含了三个角色,proposer、accepter和learner。learner在paxos算法的论文中提及不是很详细,但是在phxpaxos实现是最为复杂的,之后独立一篇文章分析。
在微信的文档中,通过推导,一步一步得出simple-paxos正式算法的流程,推导的过程可以反复看看原先的文档(Paxos理论介绍(1): 朴素Paxos算法理论推导与证明),理解透了收益非浅。
这里主要分析phxpaxos的proposer和Acceptor代码实现算法的过程。
先看看算法的主要过程,给出的一页PPT如下:
    在phxpaxos的代码中,整理出主要流程如下图,
    在图中,左边是propoer的流程,右边是accepter的流程。其实就是最简单的方式实现了文档中paxos算法的流程。这里按照代码的主要流程来进行一步步分析。
首先,Propose的处理:
int Proposer :: NewValue(const std::string & sValue)
{
BP->GetProposerBP()->NewProposal(sValue);
if (m_oProposerState.GetValue().size() == 0)
{
m_oProposerState.SetValue(sValue);
}
m_iLastPrepareTimeoutMs = START_PREPARE_TIMEOUTMS;
m_iLastAcceptTimeoutMs = START_ACCEPT_TIMEOUTMS;
if (m_bCanSkipPrepare && !m_bWasRejectBySomeone)
{
//本节点之前已经执行过Prepare阶段,并且Prepare阶段或者Accept阶段没有被人拒绝过。。
BP->GetProposerBP()->NewProposalSkipPrepare();
PLGHead("skip prepare, directly start accept");
Accept();
}
else
{
//这里被人拒绝过就增加proposalID,否则,沿用之前的proposalID
//if not reject by someone, no need to increase ballot
Prepare(m_bWasRejectBySomeone);
}
return 0;
}
Propose的代码就是一些初始化并调用Prepare,这里有一个Multi-paxos的处理,就是有选择性的跳过Prepare,当前的proposer已经进行过了提交,并且在Prepare阶段或者Accept阶段没有被拒绝过,则跳过prepare阶段,具体的推导过程可以见(Paxos理论介绍(2): Multi-Paxos与Leader)。
接着进入Prepare,代码如下:
void Proposer :: Prepare(const bool bNeedNewBallot)
{
PLGHead("START Now.InstanceID %lu MyNodeID %lu State.ProposalID %lu State.ValueLen %zu",
GetInstanceID(), m_poConfig->GetMyNodeID(), m_oProposerState.GetProposalID(),
m_oProposerState.GetValue().size());
BP->GetProposerBP()->Prepare();
m_oTimeStat.Point();
//重置proposer的状态,退出Accept状态,进入Prepare状态
ExitAccept();
m_bIsPreparing = true;
m_bCanSkipPrepare = false;
m_bWasRejectBySomeone = false;
//是否需要重新分配ballot,被人拒绝过就需要重新分配
m_oProposerState.ResetHighestOtherPreAcceptBallot();
if (bNeedNewBallot)
{
m_oProposerState.NewPrepare();
}
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosPrepare);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
m_oMsgCounter.StartNewRound();
//设置Prepare超时定时器
AddPrepareTimer();
PLGHead("END OK");
//发送Prepare消息。BroadcastMessage默认采用UDP方式发送,自己先执行,再发送给其他的节点
BroadcastMessage(oPaxosMsg);
}
Prepare主要判断是否需要重新分配ballot,并且增加一个Prepare定时器,超时重新进行Prepare。这里有一行代码:
m_oMsgCounter.StartNewRound();
m_oMsgCounter主要负责统计是否通过投票,每次prepare或者accept都会清空,表示新一轮投票。
接下来我们看看Accepter通过OnPrepare函数来处理Prepare的消息的代码:
int Acceptor :: OnPrepare(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid());
BP->GetAcceptorBP()->OnPrepare();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosPrepareReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
//Ballot大于承诺的ballot, 则触发Promise操作
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//设置承认的ballot
oReplyPaxosMsg.set_preacceptid(m_oAcceptorState.GetAcceptedBallot().m_llProposalID);
oReplyPaxosMsg.set_preacceptnodeid(m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
if (m_oAcceptorState.GetAcceptedBallot().m_llProposalID > 0)
{
oReplyPaxosMsg.set_value(m_oAcceptorState.GetAcceptedValue());
}
//设置promise ballot 的值
m_oAcceptorState.SetPromiseBallot(oBallot);
//持久化
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnPreparePersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return -1;
}
BP->GetAcceptorBP()->OnPreparePass();
}
else
{
//Reject,并且告诉proposer拒绝他的promiseID的值
BP->GetAcceptorBP()->OnPrepareReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());;
//通过UDP回消息
SendMessage(iReplyNodeID, oReplyPaxosMsg);
return 0;
}
可以对比看看算法的过程,是不是和代码的流程几乎一样,只是一个对PromiseID的判断,如果大于等于PromiseID,则Promise,否则Reject。
    接下来,Proposer收到每个Accepter发回的OnPrepareReply代码如下:
void Proposer :: OnPrepareReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnPrepareReply();
if (!m_bIsPreparing)
{
//不处于Preparing状态,直接退出
BP->GetProposerBP()->OnPrepareReplyButNotPreparing();
//PLGErr("Not preparing, skip this msg");
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
//proposalID不匹配,也直接退出
BP->GetProposerBP()->OnPrepareReplyNotSameProposalIDMsg();
//PLGErr("ProposalID not same, skip this msg");
return;
}
//消息计数器增加收到消息的节点
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
//Promise,消息计数器中增加PromiseOrAccept的节点,
//并且将该节点的promise id、提案值更新到本地
BallotNumber oBallot(oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid());
PLGDebug("[Promise] PreAcceptedID %lu PreAcceptedNodeID %lu ValueSize %zu",
oPaxosMsg.preacceptid(), oPaxosMsg.preacceptnodeid(), oPaxosMsg.value().size());
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
m_oProposerState.AddPreAcceptValue(oBallot, oPaxosMsg.value());
}
else
{
//Reject,消息计数器中增加Reject的节点,将m_bWasRejectBySomeone置为true,表示proposalID需要增加
//增加的大小在拒绝的accepter的promiseID的大小和本机的ProposalID的最大值加1
PLGDebug("[Reject] RejectByPromiseID %lu", oPaxosMsg.rejectbypromiseid());
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{
//通过了之后,将m_bCanSkipPrepare置为true,表面可以跳过prepare状态。启动Accept流程
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->PreparePass(iUseTimeMs);
PLGImp("[Pass] start accept, usetime %dms", iUseTimeMs);
m_bCanSkipPrepare = true;
Accept();
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{
//拒绝了之后重置prepare定时器
BP->GetProposerBP()->PrepareNotPass();
PLGImp("[Not Pass] wait 30ms and restart prepare");
AddPrepareTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}
这里看看AddPreAcceptValue函数:
void ProposerState :: AddPreAcceptValue(
const BallotNumber & oOtherPreAcceptBallot,
const std::string & sOtherPreAcceptValue)
{
PLGDebug("OtherPreAcceptID %lu OtherPreAcceptNodeID %lu HighestOtherPreAcceptID %lu "
"HighestOtherPreAcceptNodeID %lu OtherPreAcceptValue %zu",
oOtherPreAcceptBallot.m_llProposalID, oOtherPreAcceptBallot.m_llNodeID,
m_oHighestOtherPreAcceptBallot.m_llProposalID, m_oHighestOtherPreAcceptBallot.m_llNodeID,
sOtherPreAcceptValue.size());
if (oOtherPreAcceptBallot.isnull())
{
//如果av为空。直接返回
return;
}
if (oOtherPreAcceptBallot > m_oHighestOtherPreAcceptBallot)
{
//如果ab大于maxb,则保存maxb和value
m_oHighestOtherPreAcceptBallot = oOtherPreAcceptBallot;
m_sValue = sOtherPreAcceptValue;
}
}
在simple paxos论文对第一阶段Accept描述如下:
If an acceptor receives a prepare request with number n greater than that of any prepare request to which it has already responded, then it responds to the request with a promise not to accept any more proposals numbered less than n and with the highest-numbered proposal (if any) that it has accepted.
如果ab>maxb且av不等于null,将maxb赋值为ab并且将value赋值为av,即上文说的,被accept的最高提案值。然后当多数派promise之后就进行accept。
Accept的代码如下:
void Proposer :: Accept()
{
PLGHead("START ProposalID %lu ValueSize %zu ValueLen %zu",
m_oProposerState.GetProposalID(), m_oProposerState.GetValue().size(), m_oProposerState.GetValue().size());
BP->GetProposerBP()->Accept();
m_oTimeStat.Point();
//退出Prepare状态,进入Accept状态
ExitPrepare();
m_bIsAccepting = true;
PaxosMsg oPaxosMsg;
oPaxosMsg.set_msgtype(MsgType_PaxosAccept);
oPaxosMsg.set_instanceid(GetInstanceID());
oPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oPaxosMsg.set_proposalid(m_oProposerState.GetProposalID());
oPaxosMsg.set_value(m_oProposerState.GetValue());
oPaxosMsg.set_lastchecksum(GetLastChecksum());
m_oMsgCounter.StartNewRound();
//增加Accept定时器
AddAcceptTimer();
PLGHead("END");
//Udp广播Accept消息,自己最后执行
BroadcastMessage(oPaxosMsg, BroadcastMessage_Type_RunSelf_Final);
}
    
    基本流程和prepare差不多,就不多说了,看看accepter接受到accept的消息的OnAccept的代码:
void Acceptor :: OnAccept(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.InstanceID %lu Msg.from_nodeid %lu Msg.ProposalID %lu Msg.ValueLen %zu",
oPaxosMsg.instanceid(), oPaxosMsg.nodeid(), oPaxosMsg.proposalid(), oPaxosMsg.value().size());
BP->GetAcceptorBP()->OnAccept();
PaxosMsg oReplyPaxosMsg;
oReplyPaxosMsg.set_instanceid(GetInstanceID());
oReplyPaxosMsg.set_nodeid(m_poConfig->GetMyNodeID());
oReplyPaxosMsg.set_proposalid(oPaxosMsg.proposalid());
oReplyPaxosMsg.set_msgtype(MsgType_PaxosAcceptReply);
BallotNumber oBallot(oPaxosMsg.proposalid(), oPaxosMsg.nodeid());
if (oBallot >= m_oAcceptorState.GetPromiseBallot())
{
//Ballot大于承诺的ballot, 则触发Accept的操作
PLGDebug("[Promise] State.PromiseID %lu State.PromiseNodeID %lu "
"State.PreAcceptedID %lu State.PreAcceptedNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID,
m_oAcceptorState.GetAcceptedBallot().m_llProposalID,
m_oAcceptorState.GetAcceptedBallot().m_llNodeID);
//设置pb, ab和av
m_oAcceptorState.SetPromiseBallot(oBallot);
m_oAcceptorState.SetAcceptedBallot(oBallot);
m_oAcceptorState.SetAcceptedValue(oPaxosMsg.value());
//持久化
int ret = m_oAcceptorState.Persist(GetInstanceID(), GetLastChecksum());
if (ret != 0)
{
BP->GetAcceptorBP()->OnAcceptPersistFail();
PLGErr("Persist fail, Now.InstanceID %lu ret %d",
GetInstanceID(), ret);
return;
}
BP->GetAcceptorBP()->OnAcceptPass();
}
else
{
//Reject,并且告诉proposer拒绝他的promiseID的值
BP->GetAcceptorBP()->OnAcceptReject();
PLGDebug("[Reject] State.PromiseID %lu State.PromiseNodeID %lu",
m_oAcceptorState.GetPromiseBallot().m_llProposalID,
m_oAcceptorState.GetPromiseBallot().m_llNodeID);
oReplyPaxosMsg.set_rejectbypromiseid(m_oAcceptorState.GetPromiseBallot().m_llProposalID);
}
nodeid_t iReplyNodeID = oPaxosMsg.nodeid();
PLGHead("END Now.InstanceID %lu ReplyNodeID %lu",
GetInstanceID(), oPaxosMsg.nodeid());
//通过UDP回消息
SendMessage(iReplyNodeID, oReplyPaxosMsg);
}
这里也没有进行特殊的处理。在accept的处理中,如果接受了,则不能改变(当然了,如果遇到同一个instance的另一个ballot更高的提案接受,则更新为proposeid更高的提案),需要将提案值持久化,通过leveldb保存至本地数据库。
接着看看OnAcceptReplay 的代码如下:
void Proposer :: OnAcceptReply(const PaxosMsg & oPaxosMsg)
{
PLGHead("START Msg.ProposalID %lu State.ProposalID %lu Msg.from_nodeid %lu RejectByPromiseID %lu",
oPaxosMsg.proposalid(), m_oProposerState.GetProposalID(),
oPaxosMsg.nodeid(), oPaxosMsg.rejectbypromiseid());
BP->GetProposerBP()->OnAcceptReply();
if (!m_bIsAccepting)
{
//不处于Accept状态,直接退出
//PLGErr("Not proposing, skip this msg");
BP->GetProposerBP()->OnAcceptReplyButNotAccepting();
return;
}
if (oPaxosMsg.proposalid() != m_oProposerState.GetProposalID())
{
//proposalID不匹配,也直接退出
//PLGErr("ProposalID not same, skip this msg");
BP->GetProposerBP()->OnAcceptReplyNotSameProposalIDMsg();
return;
}
m_oMsgCounter.AddReceive(oPaxosMsg.nodeid());
if (oPaxosMsg.rejectbypromiseid() == 0)
{
//Accept,消息计数器中增加PromiseOrAccept的节点(m_oMsgCounter在每一轮Prepare或者Accept都会重置)
PLGDebug("[Accept]");
m_oMsgCounter.AddPromiseOrAccept(oPaxosMsg.nodeid());
}
else
{
//Reject,消息计数器中增加Reject的节点,将m_bWasRejectBySomeone置为true,表示proposalID需要增加
//增加的大小在拒绝的accepter的promiseID的大小和本机的ProposalID的最大值加1
PLGDebug("[Reject]");
m_oMsgCounter.AddReject(oPaxosMsg.nodeid());
m_bWasRejectBySomeone = true;
m_oProposerState.SetOtherProposalID(oPaxosMsg.rejectbypromiseid());
}
if (m_oMsgCounter.IsPassedOnThisRound())
{
//多数派同意了提案,退出Accept状态,发送实例中提案被chosen的消息
int iUseTimeMs = m_oTimeStat.Point();
BP->GetProposerBP()->AcceptPass(iUseTimeMs);
PLGImp("[Pass] Start send learn, usetime %dms", iUseTimeMs);
ExitAccept();
m_poLearner->ProposerSendSuccess(GetInstanceID(), m_oProposerState.GetProposalID());
}
else if (m_oMsgCounter.IsRejectedOnThisRound()
|| m_oMsgCounter.IsAllReceiveOnThisRound())
{
//失败,重置Accept定时器,Accept重新从Prepare开始,如果instance增加了则忽略提案
BP->GetProposerBP()->AcceptNotPass();
PLGImp("[Not pass] wait 30ms and Restart prepare");
AddAcceptTimer(OtherUtils::FastRand() % 30 + 10);
}
PLGHead("END");
}
        
    在OnAcceptReply和OnPrepareReply的消息中,如果遇到不在预期状态的回复时,直接忽略。

phxpaxos的Proposer和Acceptor的流程相关推荐

  1. 通过 PhxPaxos 了解 Paxos 原理

    通过 PhxPaxos 了解 Paxos 原理 Prepare阶段 Prepare // src/algorithm/proposer.cppvoid Proposer :: Prepare(cons ...

  2. PhxPaxos源码分析之(3)提案发起篇(Paxos协议核心)

    更多 blog 见: https://joeylichang.github.io/ 本篇内容根据Paxos协议分五部分介绍,即发起Prpare请求.给Prepare请求投票.收集Prepare投票,接 ...

  3. Paxos和Raft的前世今生

    前言 在保证数据安全的基础上,保持服务的持续可用,是核心业务对底层数据存储系统的基本要求.业界常见的1主N备的方案面临的问题是"最大可用(Maximum Availability)" ...

  4. 跟着微信后台团队学习分布式一致性协议

    目录 什么是Paxos 一致性协议 分布式环境 提议者 Paxos是用来干什么的 确定一个值 确定多个值 有序的确定多个值 实例的对齐(Learn) 如何应用Paxos 状态机 工程化 我们需要多个角 ...

  5. 区块链共识机制:分布式系统的Paxos协议

    前言:第一次接触paxos可能很多人不理解这玩意儿有啥用,近几天一直在研究paxos,不敢说理解的多到位,但是把自己理解的记录下来,供大家参考.文章主要参考知行学社的<分布式系统与Paxos算法 ...

  6. 如何实现一个 Paxos

    Paxos 作为一个经典的分布式一致性算法(Consensus Algorithm),在各种教材中也被当做范例来讲解.但由于其抽象性,很少有人基于朴素 Paxos 开发一致性库,而 RAFT 则是工业 ...

  7. 深度介绍分布式系统原理与设计

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 1 概念 1.1 模型 1.2 副本 1.3 衡量分布式系 ...

  8. 分布式概念-分布式事务,并发处理协议

    点击上方蓝色字体,选择"设为星标" 优质文章,及时送达 提到分布式系统,分布式事务是经常被大家提起的话题,也是经常在我们编码或是系统设计时遇到的问题,很常见. 如果让大家说一种解决 ...

  9. 分布式一致性协议paxos

    Paxos协议/算法是分布式系统中比较重要的协议,它有多重要呢? <分布式系统的事务处理>: Google Chubby的作者Mike Burrows说过这个世界上只有一种一致性算法,那就 ...

最新文章

  1. 来!说说你在流量控制方面的经验!
  2. PyTorch | (1)初识PyTorch
  3. 美妙的模电2013/4/18
  4. [持续更新]UnsatisfiedLinkError常见问题及解决方案
  5. 教你如何用Python追踪快递信息!
  6. boost::set_difference相关的测试程序
  7. 什么是 SAP enhancement package
  8. 12c跨平台完成PDB的备份迁移
  9. 【深度学习】单标签多分类问题之新闻主题分类
  10. aspx反射调用方法
  11. java4android网易云_仿照网易云音乐界面 android特效
  12. c语言编码任务描述,C语言委派任务问题代码及解析
  13. x64dbg 2022 最新版编译方法
  14. 别让生气毁了你的生活(深度好文)
  15. 云集上市,短短四年时间缔造了一个新的电商神话
  16. 华硕笔记本开机自动进入bios,进不了windows系统的解决方法
  17. 跟着做react项目(至P44)
  18. Valgrind 内存管理工具Memcheck 基本使用
  19. # ABAP 1. ALV快速模板
  20. 第十三届蓝桥杯大赛软件赛省赛 Python 大学 C 组

热门文章

  1. mybatis入门基础(六)----高级映射(一对一,一对多,多对多)
  2. 数据结构实验报告——排序算法设计及分析(排序单链表)
  3. Skype新增“愤怒的小鸟”emoji表情
  4. mysql 小试牛刀
  5. 性能测试之细节决定成败
  6. 「 运动控制 」“PID控制原理及参数调整”经验
  7. 图解Hibernate中的三种状态(瞬时态,持久态,游离态|托管态)
  8. 给定年月,打印当月的月历表。
  9. !!最神奇、最实用和最精准的逃顶指标公式(通达信源码)
  10. 线性空间的一些直观感悟