Leader Election基本设计

  • 按照rank表示优先级解决冲突问题,为每个monitor预先分配了一个rank
  • 只会接受优先级(rank)比自己高、epoch比上次已接受的epoch大的选举请求
  • 当选的leader,不一定有最新的数据。所以在phase 1中,会根据已经commit的数据,进行leader和peon之间的同步
  • 用奇数的epoch表示选举状态,偶数表示稳定状态
  • 一旦选举成功,会形成一个quorum,在该leader当选期间,所有提议,必须quorum中全部成员同意

Leader Election主要过程和函数

  • Elector::init()
    初始化处理,从kv读取之前持久化的信息
  • Elector::shutdown()
    退出处理
  • Elector::start()
    推选自己(自荐)
  • Elector::defer()

    接受别人选举,延迟推选自己
  • Elector::handle_propose()

    处理别人的自荐消息
  • Elector::handle_ack()
    处理别人的ack
  • Elector::victory()
    宣布自己当选
  • Elector::handle_victory()
    处理别人当选消息
  • Elector::expire()
    选举超时处理
  • Elector::reset_timer()
    设置选举timer
  • Elector::cancel_timer()
    取消timer
  • Elector::dispatch()
    选举消息的分发函数

代码

初始化

/*Ceph的monitor使用leveldb作为持久化存储,下面的mon->store
就是leveldb操作的封装,由于很多数据共用同一leveldb,所以对
key的空间做了分级,Monitor::MONITOR_NAME可以认为是第一级key*/
void Elector::init()
{//选举的epoch,递增分配。每次修改都做了持久化,这是从kv db读取。epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch"); if (!epoch)//首次使用 epoch = 1; }

bump_epoch

/*将自己的epoch修改为参数 e,需要持久化到 kv 存储。*/
void Elector::bump_epoch(epoch_t e)
{dout(10) << "bump_epoch " << epoch << " to " << e << dendl; assert(epoch <= e); epoch = e; //使用一个事务,写kv存储 MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); t->put(Monitor::MONITOR_NAME, "election_epoch", epoch); mon->store->apply_transaction(t);//持久化收到的epoch。 //join election会使 monitor 进入 STATE_ELECTING 状态 mon->join_election(); // clear up some state electing_me = false; //因为别人的epoch比自己大,放弃选自己 acked_me.clear(); //这个acked_m,是选自己才有意义,在此清空。 classic_mons.clear(); }

推选自己为leader的函数(自荐)

//在启动后或者leader超时等场合,会发起自荐
void Elector::start()//推选自己
{if (!participating) {return;}//清空,表示还没人响应自己acked_me.clear();classic_mons.clear();//从store获取持久化的epoch. init(); if (epoch % 2 == 0) //epoch是偶数表明是稳定态 bump_epoch(epoch+1); //odd == election cycle 选举都是用的奇数epoch start_stamp = ceph_clock_now(g_ceph_context); electing_me = true; //设置为true,前面的bump_epoch可能设置为false了 //填写map的key是自己的rank,表明自己先同意了自己 acked_me[mon->rank] = CEPH_FEATURES_ALL; leader_acked = -1;//无效值,表明我没有ack别人 //给monmap中每个成员发送消息。可以认为monmap成员是预先配置的,且配置了rank for (unsigned i=0; i<mon->monmap->size(); ++i) { if ((int)i == mon->rank) continue; //消息中带有自己的epoch Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap); mon->messenger->send_message(m, mon->monmap->get_inst(i)); } reset_timer();//设置选举用的timer,参见expire()函数 }

响应别人自荐

//defer就是暂时放弃推选自己
void Elector::defer(int who)
{if (electing_me) {//放弃选自己acked_me.clear();classic_mons.clear();electing_me = false;}//表明我支持了"who"当leader. leader_acked不需要持久化,因为任何一个monitor在 //reboot后都会重新发起election。 leader_acked = who; ack_stamp = ceph_clock_now(g_ceph_context); //返回OP_ACK消息,即赞成对方当leader MMonElection *m = new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap); m->sharing_bl = mon->get_supported_commands_bl(); mon->messenger->send_message(m, mon->monmap->get_inst(who)); // set a timer 对方在一定时间内,应该宣布自己当选才对 reset_timer(1.0); // give the leader some extra time to declare victory }

timer相关工具函数

设置timer的工具函数

void Elector::reset_timer(double plus)
{// set the timercancel_timer();expire_event = new C_ElectionExpire(this); mon->timer.add_event_after(g_conf->mon_lease + plus, expire_event); }

取消timer的工具函数

void Elector::cancel_timer()
{if (expire_event) {mon->timer.cancel_event(expire_event);expire_event = 0;}
}

超时处理

void Elector::expire()
{// 如果是自荐,只要超过半数同意,就认为成功if (electing_me &&acked_me.size() > (unsigned)(mon->monmap->size() / 2)) { //注意,expire判断的是 > monmap->size()/2,而handle_ack里面是等待全部ack。 // i win victory(); } else {//没有推选自己 // whoever i deferred to didn't declare victory quickly enough. if (mon->has_ever_joined) start();//之前我加入过quorum,直接重新发动选举。因为monmap中会包含我。 else mon->bootstrap();//否则,走bootstrap } }

成功当选leader处理

void Elector::victory()
{leader_acked = -1;electing_me = false;uint64_t features = CEPH_FEATURES_ALL;set<int> quorum;for (map<int, uint64_t>::iterator p = acked_me.begin(); p != acked_me.end(); ++p) {//如果是从expire()调用的victory(),则不是monmap的记录的所有node, //但是肯定是超过半数。 quorum.insert(p->first);//ack过我的,全部进入quorum。 features &= p->second; } // decide what command set we're supporting bool use_classic_commands = !classic_mons.empty(); // keep a copy to share with the monitor; we clear classic_mons in bump_epoch set<int> copy_classic_mons = classic_mons; cancel_timer(); assert(epoch % 2 == 1); // 选举期间用的奇数epoch bump_epoch(epoch+1); // 选举完成,epoch变成偶数 // decide my supported commands for peons to advertise const bufferlist *cmds_bl = NULL; const MonCommand *cmds; int cmdsize; if (use_classic_commands) { mon->get_classic_monitor_commands(&cmds, &cmdsize); cmds_bl = &mon->get_classic_commands_bl(); } else { mon->get_locally_supported_monitor_commands(&cmds, &cmdsize); cmds_bl = &mon->get_supported_commands_bl(); } //通知大家自己当选 for (set<int>::iterator p = quorum.begin(); p != quorum.end(); ++p) { if (*p == mon->rank) continue; MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap); m->quorum = quorum; m->quorum_features = features; m->sharing_bl = *cmds_bl; mon->messenger->send_message(m, mon->monmap->get_inst(*p)); } //调用monitor的函数,它会发起paxos的propose mon->win_election(epoch, quorum, features, cmds, cmdsize, &copy_classic_mons); }

处理别人的自荐消息

根据情况决定是否支持,或者决定该推荐自己

void Elector::handle_propose(MonOpRequestRef op)
{MMonElection *m = static_cast<MMonElection*>(op->get_req());dout(5) << "handle_propose from " << m->get_source() << dendl;int from = m->get_source().num();//获取对方的rankassert(m->epoch % 2 == 1); // election uint64_t required_features = mon->get_required_features(); if ((required_features ^ m->get_connection()->get_features()) & required_features) {//要求对方的feature,覆盖required_features nak_old_peer(op); return; } else if (m->epoch > epoch) {//对方epoch比我大,放弃选自己,追随它。 bump_epoch(m->epoch); } else if (m->epoch < epoch) {//对方epoch太小,否决 // got an "old" propose, //发送消息的peer可能是刚刚加进来的,以前不在quorum里面。所以epoch比较小 if (epoch % 2 == 0 && // in a non-election cycle mon->quorum.count(from) == 0) { // from someone outside the quorum // a mon just started up, call a new election so they can rejoin! //为什么我要start_election? 因为其epoch太旧,不可能当选。 mon->start_election(); } else {//认为收到了旧消息,忽略 dout(5) << " ignoring old propose" << dendl; return; } } //我比发送方的rank高。如果我没有响应过其他比我rank高的,就推选自己 if (mon->rank < from) { // i would win over them. if (leader_acked >= 0) { // we already acked someone assert(leader_acked < from); // and they still win, of course 否则不可能ack它 } else {//如果没有acked过,比我优先级低的在推选自己,那么我应该选自己才对。 // wait, i should win! if (!electing_me) { mon->start_election(); } } } else {//发送方rank比我高 // they would win over me //之前我没有赞成谁,或者之前那个优先级没现在这个高,则赞成现在这个 if (leader_acked < 0 || // haven't acked anyone yet, or leader_acked > from || // they would win over who you did ack, or leader_acked == from) { // this is the guy we're already deferring to defer(from);//这个函数内部会发送 OP_ACK,支持发送方 } else {//我之前响应过别人,坚持之前的选择 // ignore them! dout(5) << "no, we already acked " << leader_acked << dendl; } } }

收到 ack之后的处理

//收到别人响应后的处理
void Elector::handle_ack(MonOpRequestRef op)
{op->mark_event("elector:handle_ack");MMonElection *m = static_cast<MMonElection*>(op->get_req());dout(5) << "handle_ack from " << m->get_source() << dendl;int from = m->get_source().num(); assert(m->epoch % 2 == 1); // election状态,必须是奇数 //下面dout解释了出现这个现象的原因,即我在自己重启,out了。 if (m->epoch > epoch) { dout(5) << "woah, that's a newer epoch, i must have rebooted. bumping and re-starting!" << dendl; bump_epoch(m->epoch);//必须用新的epoch才能引起有效选举,否则被忽略了 start(); return; } assert(m->epoch == epoch); uint64_t required_features = mon->get_required_features(); if ((required_features ^ m->get_connection()->get_features()) & required_features) { dout(5) << " ignoring ack from mon" << from << " without required features" << dendl; return; } //如果正在推选自己 if (electing_me) { // thanks //acked_me是个map acked_me[from] = m->get_connection()->get_features(); if (!m->sharing_bl.length()) classic_mons.insert(from); dout(5) << " so far i have " << acked_me << dendl; //所有人都赞成我 if (acked_me.size() == mon->monmap->size()) { // if yes, shortcut to election finish victory(); } } else {//以前我曾经推选过自己,但是现在我已经投别人了 // ignore, i'm deferring already. assert(leader_acked >= 0); } }

在别人当选后的处理

//收到别人宣告选举胜利的消息后的处理
void Elector::handle_victory(MonOpRequestRef op)
{op->mark_event("elector:handle_victory");MMonElection *m = static_cast<MMonElection*>(op->get_req());int from = m->get_source().num();assert(from < mon->rank);assert(m->epoch % 2 == 0); leader_acked = -1; //之前我一定选举了它,所以epoch必须match,否则有问题。 // i should have seen this election if i'm getting the victory. if (m->epoch != epoch + 1) { //在victory()中,已经加1,所以是偶数,且比peon看到的大1 dout(5) << "woah, that's a funny epoch, i must have rebooted. bumping and re-starting!" << dendl; bump_epoch(m->epoch); start(); return; } bump_epoch(m->epoch);//我也变成偶数epoch // they win mon->lose_election(epoch, m->quorum, from, m->quorum_features); // cancel my timer cancel_timer();//选举timer没用了 // stash leader's commands if (m->sharing_bl.length()) { MonCommand *new_cmds; int cmdsize; bufferlist::iterator bi = m->sharing_bl.begin(); MonCommand::decode_array(&new_cmds, &cmdsize, bi); mon->set_leader_supported_commands(new_cmds, cmdsize); } else { // they are a legacy monitor; use known legacy command set const MonCommand *new_cmds; int cmdsize; mon->get_classic_monitor_commands(&new_cmds, &cmdsize); mon->set_leader_supported_commands(new_cmds, cmdsize); } }

消息分发函数

消息的dispatch,里面有些关于monmap的处理。

/*monmap,是集群当前配置的所有monitor的集合。*monmap在bootstrp过程中会在montior间同步,这里没仔细讨论。*monmap中的各个monitor,只有参与选举投票的,才会进入quorum。*/
void Elector::dispatch(MonOpRequestRef op)
{op->mark_event("elector:dispatch");assert(op->is_type_election());switch (op->get_req()->get_type()) {case MSG_MON_ELECTION://elector只收election这个类别的消息 { if (!participating) { return; } if (op->get_req()->get_source().num() >= mon->monmap->size()) { dout(5) << " ignoring bogus election message with bad mon rank " << op->get_req()->get_source() << dendl; return; } MMonElection *em = static_cast<MMonElection*>(op->get_req()); // assume an old message encoding would have matched if (em->fsid != mon->monmap->fsid) { dout(0) << " ignoring election msg fsid " << em->fsid << " != " << mon->monmap->fsid << dendl; return; } //选举是根据monmap干活的。monmap在bootstrap阶段大家已经同步了。 if (!mon->monmap->contains(em->get_source_addr())) { dout(1) << "discarding election message: " << em->get_source_addr() << " not in my monmap " << *mon->monmap << dendl; return; } MonMap *peermap = new MonMap; peermap->decode(em->monmap_bl); //比较二者的monmap的epoch,即二者看到的monitor配置应该相同。 if (peermap->epoch > mon->monmap->epoch) { dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap->epoch << " > my epoch " << mon->monmap->epoch << ", taking it" << dendl; mon->monmap->decode(em->monmap_bl); MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction); //更新了自己的monmap,并且写盘。实际上信任了对方的monmap。 t->put("monmap", mon->monmap->epoch, em->monmap_bl); t->put("monmap", "last_committed", mon->monmap->epoch); mon->store->apply_transaction(t); //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl); cancel_timer(); mon->bootstrap();//重新做一次自举。自举后会重新选举 delete peermap; return; } if (peermap->epoch < mon->monmap->epoch) { //这种情况下,会用我的map去同步对方的。 dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap->epoch << " < my epoch " << mon->monmap->epoch << dendl; } delete peermap; switch (em->op) { case MMonElection::OP_PROPOSE: handle_propose(op); return; } if (em->epoch < epoch) www.rcsx.org {//为什么又比较这个epoch? dout(5) << "old epoch, dropping" << dendl; break; } switch (em->op) { case MMonElection::OP_ACK: handle_ack(op); return; case MMonElection::OP_VICTORY: handle_victory(op); return; case MMonElection::OP_NAK: handle_nak(op); return; default: assert(0); } } break; default: assert(0); } }
//admin command处理,触发选举
void Elector::start_participating()
{if (!participating) {participating = true; call_election(); } }

转载于:https://www.cnblogs.com/tianshifu/p/7336039.html

根据已经commit的数据,进行leader和peon之间的同步相关推荐

  1. 共享首选项中commit()和apply()之间的区别是什么

    我在我的Android应用程序中使用共享首选项. 我正在使用共享首选项中的commit()和apply()方法. 当我使用AVD 2.3时它没有显示错误,但是当我在AVD 2.1中运行代码时, app ...

  2. vue+vuex+axios从后台获取数据存入vuex,组件之间共享数据

    vue+vuex+axios从后台获取数据存入vuex,组件之间共享数据 在vue项目中组件间相互传值或者后台获取的数据需要供多个组件使用的情况很多的话,有必要考虑引入vuex来管理这些凌乱的状态,今 ...

  3. Redis配置主从数据,实现主从库之间数据同步

    一.背景 需求来源:在现代网络时代,随着用户访问量增加,网站并发量增加,数据库面临压力倍增,当并发到达一个巅峰值,服务器会宕机,那么如何避免这种现象出现呢? 下面是个人总结的几点解决方案: 1.增加服 ...

  4. 他用几个公式解释了现金贷业务的风控与运营 (下) 2017-09-18 22:04 风控/运营/违约 “金额如此小的业务,成本极度敏感,刚开始的时候我们在数据成本和坏账成本之间特别纠结。” 以上是许

    他用几个公式解释了现金贷业务的风控与运营 (下) 2017-09-18 22:04风控/运营/违约 "金额如此小的业务,成本极度敏感,刚开始的时候我们在数据成本和坏账成本之间特别纠结.&qu ...

  5. 阿里巴巴天池大数据竞赛黄金联赛全面开战,全球同步报名,只为寻找最聪明的你!...

    阿里巴巴天池大数据竞赛黄金联赛全面开战,全球同步报名,只为寻找最聪明的你!          天池大数据竞赛是由阿里巴巴集团主办,面向全球新生代力量的高端算法竞赛.通过开放海量数据和"天池& ...

  6. a人工智能b大数据c云计算_你清楚5G物联网、大数据、云计算、人工智能之间的关联吗?...

    同属于高新技术发展领域的物联网.大数据.云计算.人工智能之间有着割舍不开的联系,成其一都离不开其他技术的支撑辅佐,特别是落地应用的时候,在不同的场景中,这个几个技术之间不同的方式配合,成就了现如今的高 ...

  7. 物联网、大数据、云计算、人工智能之间的关系如何?

    物联网.大数据.云计算.人工智能之间的关系如何? 物联网是指通过 各种信息传感器.射频识别技术.全球定位系统.红外感应器.激光扫描器等各种装置与技术,实时采集任何需要监控. 连接.互动的物体或过程,采 ...

  8. 数据建模实战:方寸之间玩转购物篮分析

    购物篮分析是零售行业里非常重要经典的一个模型,曾经被大家津津乐道的啤酒与尿布的故事,相信大家都还记忆犹新,这个故事很好地诠释了商品关联性对销售额的提升作用,时至今日,仍有很强的现实指导意义.这种通过研 ...

  9. vue根据url获取内容axios_vue+vuex+axios从后台获取数据存入vuex,组件之间共享数据...

    在vue项目中组件间相互传值或者后台获取的数据需要供多个组件使用的情况很多的话,有必要考虑引入vuex来管理这些凌乱的状态,今天这边博文用来记录这一整个的过程,后台api接口是使用webpack-se ...

最新文章

  1. SQL Server 远程无法连接
  2. 作为一名准程序员,谈一下现实和未来
  3. 命令点亮硬盘灯_macOS下移动硬盘无法挂载且硬盘灯一直闪烁的解决方法
  4. hdu 1251 统计难题(求前缀出现了多少次)
  5. linux win10 时间同步服务器,windows和linux下服务器时间如何校正?
  6. 获取进程或线程的ID以及句柄信息
  7. 前端倒计时不准的问题
  8. LAMP_PHP配置
  9. JavaScript:indexOf()方法
  10. python点图为什么显示不出来_matplotlib图只显示点而不是lin
  11. Atitit 长距离无线通信法 LoRa NB-IoT NB-CIoT LoRa是Semtech公司的创新发明,该技术向用户提供显著的长距离、低功耗、安全数据传输机制。使用LoRa技术构建的公用网
  12. 领域词汇知识库的类型、可用资源与构建技术漫谈
  13. pdf论文在线翻译网站
  14. 华为手机安装GMS服务
  15. 计算机主板自动重启,我电脑关机后总是自动重启,主板换了
  16. 谈谈软件开发模式:瀑布与敏捷
  17. sybase datediff mysql_Sybase中的日期时间函数_龙的天空
  18. U盘用作启动盘后空间变为原来的一半
  19. hdu 2222 AC 自动机 模版(数组实现)
  20. Visual Studio 2019安装与配置

热门文章

  1. javamail调用阿里企业邮箱实现推送包括多个附件
  2. 移动硬盘\U盘在使用过程中0x80070570 文件或目录损坏且无法读取 CHKDSK 修复方法
  3. 强柱的治疗目标_24周疗效预测因素
  4. 读《当众讲话诀窍》-殷亚敏 (2)
  5. 网站接入服务器必须备案吗,服务器和域名必须备案吗
  6. 【9秒实验室自研】FLA文件资源导出工具 源码开放
  7. QTableWidget实现复制粘贴
  8. 白鹭引擎开发微信小游戏新手教程文档
  9. 用户商家对刷脸支付好评不断普及指日可待
  10. Norgen痰液液化缓冲液解决方案