主osd收到副osd发送来的MSG_OSD_PG_NOTIFY消息后,会将该消息中所带的osd的日志信息合并到本地。

主osd收到MSG_OSD_PG_NOTIFY消息后,会调用ms_fast_dispatch进行处理,其调用栈如下

OSD::ms_fast_dispatch(Message *m)  case MSG_OSD_PG_NOTIFY:  //pg中其他从osd发来的MOSDPGNotify信息return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));   for (auto& p : m->get_pg_list())  //p的类型是pair<pg_notify_t,PastIntervals>enqueue_peering_evt(pgid,PGPeeringEventRef(std::make_shared<PGPeeringEvent>(p.first.epoch_sent,p.first.query_epoch,MNotifyRec(pgid, pg_shard_t(from, p.first.from),p.first,m->get_connection()->get_features(),p.second),true,new PGCreateInfo(pgid,p.first.query_epoch,p.first.info.history,p.second,false)))); op_shardedwq.queue(OpQueueItem(unique_ptr<OpQueueItem::OpQueueable>(new PGPeeringItem(pgid, evt)),10,cct->_conf->osd_peering_op_priority,utime_t(),0,evt->get_epoch_sent()));  //这里插入的Op,在ShardedOpWQ::_process中被处理,如下

ShardedOpWQ的处理函数对该op的处理如下:

OSD::ShardedOpWQ::_processOpQueueItem item = sdata->pqueue->dequeue();slot->to_process.push_back(std::move(item));auto qi = std::move(slot->to_process.front());qi.run(osd, sdata, pg, tp_handle);               PGPeeringItem::runosd->dequeue_peering_evt(sdata, pg.get(), evt, handle);pg->do_peering_event(evt, &rctx);recovery_state.handle_event(evt, rctx); //evt为上面传进去的 MNotifyRec , 创建pg后第一次执行的这里时传进来的为NullEvtstart_handle(rctx);machine.process_event(evt->get_event()); //主osd在GetInfo状态收到MNotifyRec事件后,调用PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt)PG::RecoveryState::GetInfo::react(const MNotifyRec& infoevt)set<pg_shard_t>::iterator p = peer_info_requested.find(infoevt.from);if (p != peer_info_requested.end())peer_info_requested.erase(p);   //删除已经获取到信息的osdpg->blocked_by.erase(infoevt.from.osd);pg->proc_replica_info(infoevt.from, infoevt.notify.info, infoevt.notify.epoch_sent)  map<pg_shard_t, pg_info_t>::iterator p = peer_info.find(from);if (p != peer_info.end() && p->second.last_update == oinfo.last_update)  //信息已经存在并且last_update相等,则推出dout(10) << " got dup osd." << from << " info " << oinfo << ", identical to ours" << dendl;return false if (!get_osdmap()->has_been_up_since(from.osd, send_epoch))  //这个osd现在是不是up的,且这个osd的up from 是不是小于等于send_epoch,  send_epoch为从osd发送notify时的opochreturn false;peer_info[from] = oinfo;might_have_unfound.insert(from);update_history(oinfo.history); //利用对方的history更新自己的if (info.history.merge(new_history))dirty_info = true;if (info.history.last_epoch_clean >= info.history.same_interval_since)//已经修复完成了,不需要历史信息了past_intervals.clear()dirty_big_info = true;if (!is_up(from) && !is_acting(from))  //如果from都不在这两个集合里/*对于replica osd is_up就是从up vector里判断是否有from,相似,is_acting就是从acting vector里找, ec就不太一样了*/stray_set.insert(from);if (is_clean())purge_strays();  //发消息让这个osd去除pgif (p == peer_info.end())update_heartbeat_peers();if trueif (old_start < pg->info.history.last_epoch_started)  //epoch_t old_start = pg->info.history.last_epoch_started;,即未处理replica info之前的ldout(pg->cct, 10) << " last_epoch_started moved forward, rebuilding prior" << dendl;prior_set = pg->build_prior();   //因为更改了一些变量,重新建立priorset<pg_shard_t>::iterator p = peer_info_requested.begin();while (p != peer_info_requested.end()) //从当前peer_info_requested中删除用不到的probeif (prior_set.probe.count(*p) == 0) {peer_info_requested.erase(p++);} else++p;get_infos();  //重新获取probe中的osd信息if (peer_info_requested.empty() && !prior_set.pg_down)//pg_down只有在past interval阶段osd up的数量不够,且有osd 处于down时才会被设置post_event(GotInfo());  //此时主osd处于GetInfo状态,接收到GotInfo后转入到GetLog状态,进入GetLog构造函数

(1)从peer_info_requested和blocked_by集合中删除该osd。
(2)调用update_history更新历史信息
(3)因为更新了一些变量,因此需要重新调用build_prior来重新计算要获取日志的osd列表,如果未新的osd,则需要向该osd get_infos。
(4)如果peer_info_requested为空且pg_down为false,则说明目前用于pg恢复的必要的osd的info都已经有了,因此抛出GotInfo事件进入GetLog状态。

GetLog的构造函数如下

PG::RecoveryState::GetLog::GetLog(my_context ctx)context< RecoveryMachine >().log_enter(state_name);  //"Started/Primary/Peering/GetLog"if (!pg->choose_acting(auth_log_shard, false, &context< Peering >().history_les_bound))   //有选择一个拥有权威日志的OSD auth_log_shardif (!pg->want_acting.empty())   //monitor中的acting需要变化post_event(NeedActingChange());elsepost_event(IsIncomplete());if (auth_log_shard == pg->pg_whoami)post_event(GotLog());  if (pg->info.last_update < best.log_tail)post_event(IsIncomplete());return;context<RecoveryMachine>().send_query(auth_log_shard,pg_query_t(pg_query_t::LOG, auth_log_shard.shard, pg->pg_whoami.shard, request_log_from, pg->info.history,  pg->get_osdmap()->get_epoch()));pg->blocked_by.insert(auth_log_shard.osd); //返回dequeue_peering_evt函数的调用栈中, 并在dispatch_context中发送query

(1)调用choose_acting去计算acting集合,在初始情况下,monitor会为每一个pg预先分配一个acting集合,但是这个集合不一定准确,因此需要重新计算acting集合。acting集合负责对接客户端读写请求,即客户端直接通信的是acting集合的第一个osd,up集合是crush规则算出来了的,最后pg恢复完成后,acting会被赋值为up集合。如果choose_acting返回false并且want_acting不为空,则说明acting集合需要改变,则抛出NeedActingChange事件。如果acting集合不需要变化,继续往下。
(2)如果当前主OSD就是含有权威日治,则抛出GotLog事件。
(3)如果当前主OSD最新日志小于权威日志的最老日志,则该主OSD恢复不了,抛出IsIncomplete。
(4)调用send_query将query消息入队,query消息是用来向权威日志所在OSD索要权威日志的。其最终会在dispatch_context中发送。

choose_acting用来计算该PG的acting集合,如下

map<pg_shard_t, pg_info_t>::const_iterator auth_log_shard = find_best_info(all_info, restrict_to_up_acting, history_les_bound);
if (auth_log_shard == all_info.end())if (up != acting)want_acting = up;osd->queue_want_pg_temp(info.pgid.pgid, empty);return false;
auth_log_shard_id = auth_log_shard->first;
calc_replicated_acting(auth_log_shard, get_osdmap()->get_pg_size(info.pgid.pgid), acting, up, up_primary, all_info, restrict_to_up_acting, &want, &want_backfill, &want_acting_backfill, ss);want->push_back(primary->first.osd); //将这里选举的primary作为want和acting_backfill的第一个,注意这里和monitor选举的可能不一样acting_backfill->insert(primary->first);for (vector<int>::const_iterator i = up.begin(); i != up.end(); ++i)if (cur_info.is_incomplete() || cur_info.last_update < oldest_auth_log_entry)] 其不能根据权威日志修复backfill->insert(up_cand);acting_backfill->insert(up_cand);elsewant->push_back(*i);acting_backfill->insert(up_cand);if (usable >= size) //如果当前up中的osd足够,则返回return;for (vector<int>::const_iterator i = acting.begin();i != acting.end(); ++i) //这个acting是从osdmap中获取出来的,monitor可以提前计算的acting const pg_info_t &cur_info = all_info.find(acting_cand)->second;if (cur_info.is_incomplete() || cur_info.last_update < oldest_auth_log_entry)elsecandidate_by_last_update.push_back(make_pair(cur_info.last_update, *i));for (auto &p: candidate_by_last_update)    want->push_back(p.second);pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD);acting_backfill->insert(s);if (usable >= size) return;for (map<pg_shard_t,pg_info_t>::const_iterator i = all_info.begin(); i != all_info.end();  ++i)if (i->second.is_incomplete() || i->second.last_update < oldest_auth_log_entry)elsecandidate_by_last_update.push_back(make_pair(i->second.last_update, i->first.osd));for (auto &p: candidate_by_last_update)want->push_back(p.second);pg_shard_t s = pg_shard_t(p.second, shard_id_t::NO_SHARD);acting_backfill->insert(s);usable++;if (usable >= size)return;
if (want != acting) { //需要向monitor申请acting(temp),因为这时候acting是从osdmap中的temppg里获取的,其为monitor提前计算的want_acting = want;if (want_acting == up)osd->queue_want_pg_temp(info.pgid.pgid, empty);elseosd->queue_want_pg_temp(info.pgid.pgid, want);return false;acting_recovery_backfill = want_acting_backfill;
if (backfill_targets.empty())backfill_targets = want_backfill; //want_backfill是up集合中不能根据权威日志来恢复的osd,不能依据权威日志就是其last_update小于权威日治最老的版本
return true;

(1)对于副本模式,调用calc_replicated_acting去计算want, want_backfill, want_acting_backfill。
(2)将主OSD入队到want和want_acting_backfill集合。
(3)遍历up集合的OSD,如果OSD不能根据权威日志来reocvery,则加入到want_backfill和want_acting_backfill集合,否则加入到want和want_acting_backfill集合。
(4)遍历acting集合,注意此时这个acting集合是monitor预先分配的集合,这个预先分配的集合很可能不适用,所以需要改变。如果acting集合中的OSD可以根据权威日志恢复(last_update不小于权威日志的最老日志),则加入到want和want_acting_backfill集合。
(5)从all_info中获取,其依据也是能根据权威日志恢复,如果可以则加入到want和want_acting_backfill集合。
注意以上过程中,如果加入到want中的数量达到了pg的副本大小就退出。因此可以看到want集合保存了计算得到的真正的acting集合,want_backfill保存了up集合中需要执行backfilling操作的OSD,want_acting_backfill是want和want_backfill的并集。
(6)calc_replicated_acting返回后,如果want和acting集合不相等,就需要通知monitor去修改acting集合。这是通过queue_want_pg_temp实现的,queue_want_pg_temp只讲wang加入到pg_temp_wanted中,在函数调用栈返回到dequeue_peering_evt中后,通过调用service.send_pg_temp();来发送给monitor。

在GetLog状态里,如果主OSD就权威日志所在OSD,则抛出GotLog事件,GetLog状态对GotLog事件的处理如下:

PG::RecoveryState::GetLog::react(const GotLog&)if (msg) //如果主osd不是权威osd,则需要向权威osd索要日志,此时msg不为空pg->proc_master_log(*context<RecoveryMachine>().get_cur_transaction(),msg->info, msg->log, msg->missing,auth_log_shard);pg->start_flush(context< RecoveryMachine >().get_cur_transaction());return transit< GetMissing >();  //进入GetMissing状态PG::RecoveryState::GetMissing::GetMissing(my_context ctx)context< RecoveryMachine >().log_enter(state_name);for (set<pg_shard_t>::iterator i = pg->acting_recovery_backfill.begin();i != pg->acting_recovery_backfill.end();++i)//acting_recovery_backfill就是want_acting_backfilling集合const pg_info_t& pi = pg->peer_info[*i];if (pi.last_update < pg->pg_log.get_tail()) //不需要拉取日志了,因为不能用日志来恢复了,只能执行backfillingpg->peer_missing[*i].clear();continue;if (pi.last_backfill == hobject_t())pg->peer_missing[*i].clear();continue;if (pi.last_update == pi.last_complete && pi.last_update == pg->info.last_update) pg->peer_missing[*i].clear();continue;since.epoch = pi.last_epoch_started;if (pi.log_tail <= since)context< RecoveryMachine >().send_query(*i, pg_query_t(pg_query_t::LOG, i->shard, pg->pg_whoami.shard, since, pg->info.history, pg->get_osdmap()->get_epoch()));elsecontext< RecoveryMachine >().send_query(*i, pg_query_t(pg_query_t::FULLLOG,  i->shard, pg->pg_whoami.shard,pg->info.history, pg->get_osdmap()->get_epoch()));      peer_missing_requested.insert(*i);pg->blocked_by.insert(i->osd);if (peer_missing_requested.empty())if (pg->need_up_thru)post_event(NeedUpThru());post_event(Activate(pg->get_osdmap()->get_epoch()));// all good!   当前子状态都处于Peering状态boost::statechart::transition< Activate, Active >,接收到Activate事件后,进入Active状态context< RecoveryMachine >().log_enter(state_name); //"Started/Primary/Active"pg->start_flush(context< RecoveryMachine >().get_cur_transaction());pg->activate(*context< RecoveryMachine >().get_cur_transaction(),pg->get_osdmap()->get_epoch(),*context< RecoveryMachine >().get_query_map(),context< RecoveryMachine >().get_info_map(),context< RecoveryMachine >().get_recovery_ctx());pg->publish_stats_to_osd();

(1)遍历acting_recovery_backfill集合,acting_recovery_backfill集合包括了acting集合和up中需要backfilling的osd集合,如果集合中的osd可以根据权威日志获取,则需要向这个osd发送query消息拉取日志,分为拉取全部日志和部分日志,并将此osd插入到peer_missing_requested中。
(2)如果peer_missing_requested为空,则说明不需要向其他osd拉取日志,其他osd不用恢复或者只用backfilling恢复,就抛出Activate事件。

如果拉取的日志返回来了,则抛出MLogRec事件,GetMissing对该事件处理如下

boost::statechart::result PG::RecoveryState::GetMissing::react(const MLogRec& logevt)peer_missing_requested.erase(logevt.from);pg->proc_replica_log(logevt.msg->info, logevt.msg->log, logevt.msg->missing, logevt.from);pg_log.proc_replica_log(oinfo, olog, omissing, from);if (peer_missing_requested.empty()) post_event(Activate(pg->get_osdmap()->get_epoch()));    

如果收到对方发送来的日志,就从peer_missing_requested中删除这个osd,并调用proc_replica_log处理副本日志,如果peer_missing_requested为空,则说明要reocvery的osd的日志都受到了,就抛出Activate事件。

ceph pg peering和恢复 (2)相关推荐

  1. Ceph pg状态总结

    PG( placement group)是一个放置策略组,它是对象的集合,该集合里的所有对象都具有相同的放置策略:简单点说就是相同PG内的对象都会放到相同的硬盘上: PG是 ceph的核心概念, 服务 ...

  2. ceph PG状态及部分故障(状态)模拟

    1. PG介绍 这次主要来分享Ceph中的PG各种状态详解,PG是最复杂和难于理解的概念之一,PG的复杂如下: - 在架构层次上,PG位于RADOS层的中间. a. 往上负责接收和处理来自客户端的请求 ...

  3. Ceph入门到精通-Ceph PG状态详细介绍(全)

    本文主要介绍PG的各个状态,以及ceph故障过程中PG状态的转变. Placement Group States(PG状态) creating Ceph is still creating the p ...

  4. ceph 查看是否在恢复_Ceph osd故障恢复

    1  调高osd的日志等级 加上红框那一行就可以了 osd的日志路径:/var/log/ceph/ceph-osd.3.log 注意:加上了这一行后日志会刷很多,所以要特别注意日志容量的变化,以防把v ...

  5. [置顶]Ceph源码解析:PG peering

    转载请注明出处.陈小跑 http://blog.csdn.net/a513029766 集群中的设备异常(异常OSD的添加删除操作),会导致PG的各个副本间出现数据的不一致现象,这时就需要进行数据的恢 ...

  6. ceph PG设计,状态机,peering,recovery 导图

  7. Ceph PG 归置组状态

    归置组状态 检查集群状态时(如运行 ceph -s 或 ceph -w ), Ceph 会报告归置组状态.一个归置组有一到多种状态,其最优状态为 active+clean . Creating Cep ...

  8. ceph PG和PGP调整经验

    PG和PGP调整经验 调整前准备 为了降低对业务的影响,需要调整以下参数 ceph tell osd.* injectargs '–osd-max-backfills 1' ceph tell osd ...

  9. ceph 查看是否在恢复_Ceph的最佳实践

    背景 最近在做个对象存储,基于ROOK项目,简而言之,就是使用Rook提供的K8s operator,基于K8s cluster部署Ceph服务. 部署 将Rook项目克隆到本地,使用里面的k8s/c ...

  10. ceph 查看是否在恢复_Ceph monitor故障恢复探讨

    1 问题 一般来说,在实际运行中,ceph monitor的个数是2n+1(n>=0)个,在线上至少3个,只要正常的节点数>=n+1,ceph的paxos算法能保证系统的正常运行.所以,对 ...

最新文章

  1. JSBridge深度剖析
  2. stm32跑python-STM32F4系列使用MicroPython开发
  3. 广告小程序后端开发(4.导入地区数据,修改adminx,修改models,手动添加模拟数据)...
  4. 使用Spring Initializer快速创建Spring Boot项目
  5. 结对开发四------求一维无头数组最大子数组的和
  6. CodeForces - 856B Similar Words(AC自动机+树形dp)
  7. 弄清 CSS3 的 transition 和 animation
  8. 32位JDK和64位JDK
  9. Win11锁屏快捷键是什么 Win11锁屏的方法
  10. [渝粤教育] 西南科技大学 交通工程学 在线考试复习资料
  11. 新医改背景下,民营医院如何通过绩效变革支持高质量发展?
  12. 互联网思维之大数据思维
  13. Android10.0 startService启动过程
  14. 【微信小程序】video视频(77/100)
  15. Android 图片波浪动画,Android水纹波浪动画
  16. 手机查看pcap文件_java抓包后对pcap文件解析示例
  17. 起底欧莱雅“美护”帝国:23个品牌“连轴转”,打下2119亿江山
  18. Believe in yourself
  19. 利用 imu_utils 标定 imu
  20. 升压芯片很简单(三),FSB628升压芯片大串讲

热门文章

  1. 计算机技术在生物学中的应用鲁东大学,项目学习在高中生物学教学中的应用研究...
  2. 除了赚钱,腾讯游戏还有诗和远方!
  3. 线程数据共享:pthread_getspecific和pthread_setspecific
  4. html 怎么做图标在圆圈上旋转,纯CSS3图标旋转效果代码
  5. wps/word中怎么插入各种水平分隔线
  6. 黑客的google运用技巧
  7. 使用python-docx模块解析docx文档
  8. Python for S60(pys60)介绍
  9. 传言阿里P10赵海平,被P11多隆判定3.25离职,如何评价阿里 P10 赵海平对王垠的面试?
  10. 【朝花夕拾】【编程基础】一 存储单位