文章目录

  • 一、Lab3A背景
  • 二、client端
    • 2.1、Clerk结构体与初始化
    • 2.2 OP RPC
  • 三、server端
    • 3.1、server结构体及初始化
    • 3.2、信息转接及RPC定义
  • 四、Lab3B
    • 4.1、lab3B结构
    • 4.2、重写loop
    • 4.3、序列化与反序列化、初始化
  • 五、Debug杂谈
  • 总结

一、Lab3A背景

对于3A来说的话,整体实现并不是很难,在paper中主要对应的是 section8。这次的实验就是实现在lab2中raft服务层的上一层service与client的交互。

这是总体的交互,而对于本次实验我也简单画了个图:

  • 我们需要进行在client中去编写make,put/get/append等关于RPC又或者clerk初始化的函数。
  • 然后这个函数的RPC会传到server中对应的put/get/append函数中,再由这些函数调用raft服务层,在raft进行共识。
  • 最后由raft服务层apply到server中的applyCh中,但是这里返回的msg为raft封装好的command我们需要用自定义的Loop将command封装回传进来的op结构体给server,最后再返回回去。

二、client端

2.1、Clerk结构体与初始化

  • clerk主要保存的是client信息
type Clerk struct {servers []*labrpc.ClientEnd// You will have to modify this struct.seqId    intleaderId int // 确定哪个服务器是leader,下次直接发送给该服务器clientId int64
}
  • 其中对于seqId其实是为了这种情况:

if the leader crashes after committing the log entry but before responding to the client, the client will retry the command with a new leader, causing it to be executed a second time。

  • 如果这个leader在commit log后crash了,但是还没响应给client,client就会重发这条command给新的leader,这样就会导致这个op执行两次。
  • 而这种解决办法就是每次发送操作时附加一个唯一的序列号去为了标识操作,避免op被执行两次。

If it receives a command whose serial number has already been executed, it responds immediately without re-executing the request.

  • 而leaderId其实是为了下次能够直接发给正确leader(在hint中也有提到)

You will probably have to modify your Clerk to remember which server turned out to be the leader for the last RPC, and send the next RPC to that server first. This will avoid wasting time searching for the leader on every RPC, which may help you pass some of the tests quickly enough.

  • 初始化
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {ck := new(Clerk)ck.servers = servers// You'll have to add code here.ck.clientId = nrand()ck.leaderId = mathrand.Intn(len(ck.servers))return ck
}

nrand()就是为了随机生成clientId,提供好的,mathrand这里是直接用的库随机生成一个开头LeaderId。

2.2 OP RPC

  • get操作Rpc
func (ck *Clerk) Get(key string) string {// You will have to modify this function.ck.seqId++args := GetArgs{Key: key, ClientId: ck.clientId, SeqId: ck.seqId}serverId := ck.leaderIdfor {reply := GetReply{}//fmt.Printf("[ ++++Client[%v]++++] : send a Get,args:%+v,serverId[%v]\n", ck.clientId, args, serverId)ok := ck.servers[serverId].Call("KVServer.Get", &args, &reply)if ok {if reply.Err == ErrNoKey {ck.leaderId = serverIdreturn ""} else if reply.Err == OK {ck.leaderId = serverIdreturn reply.Value} else if reply.Err == ErrWrongLeader {serverId = (serverId + 1) % len(ck.servers)continue}}// 节点发生crash等原因serverId = (serverId + 1) % len(ck.servers)}}
  • Put/Append 操作RPC
func (ck *Clerk) PutAppend(key string, value string, op string) {// You will have to modify this function.ck.seqId++serverId := ck.leaderIdargs := PutAppendArgs{Key: key, Value: value, Op: op, ClientId: ck.clientId, SeqId: ck.seqId}for {reply := PutAppendReply{}//fmt.Printf("[ ++++Client[%v]++++] : send a %v,serverId[%v] : serverId:%+v\n", ck.clientId, op, args, serverId)ok := ck.servers[serverId].Call("KVServer.PutAppend", &args, &reply)if ok {if reply.Err == OK {ck.leaderId = serverIdreturn} else if reply.Err == ErrWrongLeader {serverId = (serverId + 1) % len(ck.servers)continue}}serverId = (serverId + 1) % len(ck.servers)}}

三、server端

3.1、server结构体及初始化

  • op结构体:
type Op struct {// Your definitions here.// Field names must start with capital letters,// otherwise RPC will break.SeqId    intKey      stringValue    stringClientId int64Index    int // raft服务层传来的IndexOpType   string
}

op结构体的设计要能接受上层client发来的参数,并且能够转接的raft服务层回来的command。

  • KVServer结构体:
type KVServer struct {mu      sync.Mutexme      intrf      *raft.RaftapplyCh chan raft.ApplyMsgdead    int32 // set by Kill()maxraftstate int // snapshot if log grows this big// Your definitions here.seqMap    map[int64]int     //为了确保seq只执行一次   clientId / seqIdwaitChMap map[int]chan Op   //传递由下层Raft服务的appCh传过来的command  index / chan(Op)kvPersist map[string]string // 存储持久化的KV键值对  K / V
}

主要定义的就是seqMap,waitChMap,kvPersist作为信息临时存放、处理、持久化等。

  • 初始化:
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {// call labgob.Register on structures you want// Go's RPC library to marshall/unmarshall.labgob.Register(Op{})kv := new(KVServer)kv.me = mekv.maxraftstate = maxraftstate// You may need initialization code here.kv.applyCh = make(chan raft.ApplyMsg)kv.rf = raft.Make(servers, me, persister, kv.applyCh)// You may need initialization code here.kv.seqMap = make(map[int64]int)kv.kvPersist = make(map[string]string)kv.waitChMap = make(map[int]chan Op)go kv.applyMsgHandlerLoop()return kv
}

主要定义的就是初始化几个map,并开启转接Loop;

3.2、信息转接及RPC定义

  • Get操作RPC
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {// Your code here.if kv.killed() {reply.Err = ErrWrongLeaderreturn}_, ifLeader := kv.rf.GetState()if !ifLeader {reply.Err = ErrWrongLeaderreturn}// 封装Op传到下层startop := Op{OpType: "Get", Key: args.Key, SeqId: args.SeqId, ClientId: args.ClientId}//fmt.Printf("[ ----Server[%v]----] : send a Get,op is :%+v \n", kv.me, op)lastIndex, _, _ := kv.rf.Start(op)ch := kv.getWaitCh(lastIndex)defer func() {kv.mu.Lock()delete(kv.waitChMap, op.Index)kv.mu.Unlock()}()// 设置超时tickertimer := time.NewTicker(100 * time.Millisecond)defer timer.Stop()select {case replyOp := <-ch://fmt.Printf("[ ----Server[%v]----] : receive a GetAsk :%+v,replyOp:+%v\n", kv.me, args, replyOp)if op.ClientId != replyOp.ClientId || op.SeqId != replyOp.SeqId {reply.Err = ErrWrongLeader} else {reply.Err = OKkv.mu.Lock()reply.Value = kv.kvPersist[args.Key]kv.mu.Unlock()return}case <-timer.C:reply.Err = ErrWrongLeader}}

其中的getWaitCh就是为了获取raftStart对应下标的缓冲chan。

func (kv *KVServer) getWaitCh(index int) chan Op {kv.mu.Lock()defer kv.mu.Unlock()ch, exist := kv.waitChMap[index]if !exist {kv.waitChMap[index] = make(chan Op, 1)ch = kv.waitChMap[index]}return ch
}

要注意的对raft进行start后他其实提交的其实是applyCh,主体是raft.ApplyMsg,因此还需要loop将applyMsg转接成op再返回到waitCh中。

  • 转接信息的Loop。
func (kv *KVServer) applyMsgHandlerLoop() {for {if kv.killed() {return}select {case msg := <-kv.applyCh:index := msg.CommandIndexop := msg.Command.(Op)//fmt.Printf("[ ~~~~applyMsgHandlerLoop~~~~ ]: %+v\n", msg)if !kv.ifDuplicate(op.ClientId, op.SeqId) {kv.mu.Lock()switch op.OpType {case "Put":kv.kvPersist[op.Key] = op.Valuecase "Append":kv.kvPersist[op.Key] += op.Value}kv.seqMap[op.ClientId] = op.SeqIdkv.mu.Unlock()}// 将返回的ch返回waitChkv.getWaitCh(index) <- op}}
}

而其中判断是否是重复操作的也比较简单,因为我是对seq进行递增,所以直接比大小即可。

func (kv *KVServer) ifDuplicate(clientId int64, seqId int) bool {kv.mu.Lock()defer kv.mu.Unlock()lastSeqId, exist := kv.seqMap[clientId]if !exist {return false}return seqId <= lastSeqId
}
  • put/append RPC 这个函数就与get大同小异了
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {// Your code here.if kv.killed() {reply.Err = ErrWrongLeaderreturn}_, ifLeader := kv.rf.GetState()if !ifLeader {reply.Err = ErrWrongLeaderreturn}// 封装Op传到下层startop := Op{OpType: args.Op, Key: args.Key, Value: args.Value, SeqId: args.SeqId, ClientId: args.ClientId}//fmt.Printf("[ ----Server[%v]----] : send a %v,op is :%+v \n", kv.me, args.Op, op)lastIndex, _, _ := kv.rf.Start(op)ch := kv.getWaitCh(lastIndex)defer func() {kv.mu.Lock()delete(kv.waitChMap, op.Index)kv.mu.Unlock()}()// 设置超时tickertimer := time.NewTicker(100 * time.Millisecond)select {case replyOp := <-ch://fmt.Printf("[ ----Server[%v]----] : receive a %vAsk :%+v,Op:%+v\n", kv.me, args.Op, args, replyOp)// 通过clientId、seqId确定唯一操作序列if op.ClientId != replyOp.ClientId || op.SeqId != replyOp.SeqId {reply.Err = ErrWrongLeader} else {reply.Err = OK}case <-timer.C:reply.Err = ErrWrongLeader}defer timer.Stop()
}

四、Lab3B

4.1、lab3B结构

对于lab3B来说就是要引入raft2D的快照,去尽可能的减少时间。这里重新画下加上snapshot的结构图:

其实就只要在写入操作时,判断持久化的大小需不需要进行快照存储就行。

4.2、重写loop

  • 重写loop
// 处理applyCh发送过来的ApplyMsg
func (kv *KVServer) applyMsgHandlerLoop() {for {if kv.killed() {return}select {case msg := <-kv.applyCh:if msg.CommandValid {// 传来的信息快照已经存储了if msg.CommandIndex <= kv.lastIncludeIndex {return}index := msg.CommandIndexop := msg.Command.(Op)//fmt.Printf("[ ~~~~applyMsgHandlerLoop~~~~ ]: %+v\n", msg)if !kv.ifDuplicate(op.ClientId, op.SeqId) {kv.mu.Lock()switch op.OpType {case "Put":kv.kvPersist[op.Key] = op.Valuecase "Append":kv.kvPersist[op.Key] += op.Value}kv.seqMap[op.ClientId] = op.SeqIdkv.mu.Unlock()}// 如果需要snapshot,且超出其stateSizeif kv.maxraftstate != -1 && kv.rf.GetRaftStateSize() > kv.maxraftstate {snapshot := kv.PersistSnapShot()kv.rf.Snapshot(msg.CommandIndex, snapshot)}// 将返回的ch返回waitChkv.getWaitCh(index) <- op}if msg.SnapshotValid {kv.mu.Lock()// 判断此时有没有竞争if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {// 读取快照的数据kv.DecodeSnapShot(msg.Snapshot)kv.lastIncludeIndex = msg.SnapshotIndex}kv.mu.Unlock()}}}
}

超出范围传到raft服务层,自身也要进行持久序列化:

// 如果需要snapshot,且超出其stateSizeif kv.maxraftstate != -1 && kv.rf.GetRaftStateSize() > kv.maxraftstate {snapshot := kv.PersistSnapShot()kv.rf.Snapshot(msg.CommandIndex, snapshot)}

收到snapshot返回的msg直接反序列化出来:

if msg.SnapshotValid {kv.mu.Lock()// 判断此时有没有竞争if kv.rf.CondInstallSnapshot(msg.SnapshotTerm, msg.SnapshotIndex, msg.Snapshot) {// 读取快照的数据kv.DecodeSnapShot(msg.Snapshot)kv.lastIncludeIndex = msg.SnapshotIndex}kv.mu.Unlock()}

4.3、序列化与反序列化、初始化

对于序列化和反序列化就是之前的那一套了,除了缓冲等待的chan不需要其实的都persist就行。

func (kv *KVServer) DecodeSnapShot(snapshot []byte) {if snapshot == nil || len(snapshot) < 1 {return}r := bytes.NewBuffer(snapshot)d := labgob.NewDecoder(r)var kvPersist map[string]stringvar seqMap map[int64]intif d.Decode(&kvPersist) == nil && d.Decode(&seqMap) == nil {kv.kvPersist = kvPersistkv.seqMap = seqMap} else {fmt.Printf("[Server(%v)] Failed to decode snapshot!!!", kv.me)}
}// PersistSnapShot 持久化快照对应的map
func (kv *KVServer) PersistSnapShot() []byte {kv.mu.Lock()defer kv.mu.Unlock()w := new(bytes.Buffer)e := labgob.NewEncoder(w)e.Encode(kv.kvPersist)e.Encode(kv.seqMap)data := w.Bytes()return data
}
  • 初始化也是加上快照的信息:
func StartKVServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int) *KVServer {// call labgob.Register on structures you want// Go's RPC library to marshall/unmarshall.labgob.Register(Op{})kv := new(KVServer)kv.me = mekv.maxraftstate = maxraftstate// You may need initialization code here.kv.applyCh = make(chan raft.ApplyMsg)kv.rf = raft.Make(servers, me, persister, kv.applyCh)// You may need initialization code here.kv.seqMap = make(map[int64]int)kv.kvPersist = make(map[string]string)kv.waitChMap = make(map[int]chan Op)kv.lastIncludeIndex = -1// 因为可能会crash重连snapshot := persister.ReadSnapshot()if len(snapshot) > 0 {kv.DecodeSnapShot(snapshot)}go kv.applyMsgHandlerLoop()return kv
}

五、Debug杂谈

  • 3A:
    对于本次的lab3A总体构建其实并不难,但是其中的test对lab2的实现其实要求还是挺高的。就拿TestSpeed3A来说这个test主要是进行1000次的op请求,再计算平均每次的op时间,要求在30ms以下。对比笔者来说一开始test截图是这样的:

    比要求的30ms慢了四倍,后面一部分的原因是因为我在进行发送RPC的时候,发现其实还是重新进行了初始的serverId进行发送,并没有按照原来的进行。

  • 这一部分的原因在于我在发送rpc返回成功时并没有再一次的锁定,导致被马上第二次发送的rpc给重新覆盖了,因此在发送成功后再进行锁定就行。

    然后再者就是我在raft中三个Loop之间的休眠时间太长,因为lab2中也定义了相关投票时间为150~300,也应该有笔者实现的可能不是那么的完美orz…
    在进行raft的三个ticker的休眠时间后,勉强能降到46ms。

    还差一点,不过相比当初的120ms已经算是减少了2/3,到46ms,lab2的时间也大概缩短了100s左右到400s进行,但是如果只是为了lab3A这个实验能够达到allpast其实也是可以的。

将2b的休眠改的非常短:

但是这样对于我来说这就会有一个很矛盾的点,特别是在raft的2b的testCount中,他其实是为了测试你的RPC数量是否过多,要保持在30以内,那么你就要为了减少heartbeat rpc的次数,而增大休眠时间,而这个则是跟3a的速度测试矛盾的。而且如果把各个休眠时间减少,那么在raft很容易造成容错问题,因为选举等因为时间减少很容易时间散列会到一起,造成平常可能几百份之一的特殊情况现在可能出现的概率会提高几十倍,我认为这是不值得。也因此最后的平衡的情况就是为了通过lab2的全部test保证更高的可用性,牺牲lab3A的性能保证结果的正确性。

同样对于3b来说也是如此:

  • 2022.10.30更新
    对于这里后面其实也有老哥提出了优化的策略:
    就是在start 后立马append entries,而不需要通过ticker去检测。 我自己试了下的确是可以通过的,在30s左右。


就是在并发的情况下可能改的兼容性还是不行。只能算勉强通过通过的all past?

这里也只是相当于给各位读者提供一个思路。对于我写的来说其实还是可以有几个可以优化的思路:

  • 可以在每次append后自己判断一下是否可以apply,加快ch收到的间隔。
    (但是这种优化思路和开头那种优化思路都需要导致额外的append func竞争,并发下对变量的控制也就更严格。)
  • 还有种思路就是,其实对我来说append entries的ticker可以再细分为是否是建立心跳,还是append操作。这样的话也能使ticker的效率来的更高。

总结

这次实验虽然简单但是,也很清晰的体验到了分布式系统三个特性可能有的时候并不是能全不达到,可扩展性、性能、可用性,而这三个特性其实在一开始的lab1的introduction就有提到,希望在后续做完的时候能够去进行一次总结。
gitee:lab3A

MIT6.824-lab3AB-2022(万字推导思路及代码构建)相关推荐

  1. MIT6.824-lab2A-2022篇(万字推导思路及代码构建)

    目录 前言 一.学习背景 二.实验引入 三.结构体实现 3.1 State的定义 3.2 AppendEntries RPC的定义 3.3 RequestVote RPC的定义 四.领导选举 4.1初 ...

  2. MIT6.824-lab2B-2022篇(万字推导思路及代码构建)

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言 一.整体流程思路 二.初始化,发送ticker 2.1.初始化 2.2.发送ticker 三.进行日志增量的RPC 3 ...

  3. mit6.824 2022 lab2

    MIT6.824 2022 Raft Raft leader election log persistence log compaction 整体测试 后面发现的问题 参考代码 汇总博客:MIT6.8 ...

  4. MIT6.824环境搭建:wls+vs code

    MIT6.824环境搭建:wls+vs code 背景 尝试学习MIT 6.824分布式系统,他们的实验使用的是go语言,并且不支持window.打算使用wls+vs code搭建开发环境.这里做记录 ...

  5. 卡尔曼滤波推导思路总结

    推导思路一: (1) 混合高斯 一维高斯函数形式: (1)N(x,μ,σ)=1σ2πe−(x−μ)22σ2\mathcal N(x,\mu,\sigma)=\frac{1}{\sigma\sqrt{2 ...

  6. 2022年蓝桥杯Python程序设计B组思路和代码分享

    2022年蓝桥杯Python程序设计B组比赛结束了,分享一下题目以及思路. 文章目录 A:排列字母 题目: 思路: 代码: B: 寻找整数 题目: 思路: 代码: C: 纸张尺寸 题目: 思路: 代码 ...

  7. 2022年全国大学生数学建模竞赛E题目-小批量物料生产安排详解+思路+Python代码时序预测模型(三)

    目录 前言 一.六种物料挑选 二.周数处理 三.时序预测模型 模型预测结果 建模的部分后续将会写出,想要了解更多的欢迎加博主微信,免费获取更多细化思路+模型! 点关注,防走丢,如有纰漏之处,请留言指教 ...

  8. MIT6.824 lab4B实验记录

    Background 主要是完成一个可以根据group数量,动态调整shard所属的group的分布式kv键值引擎.其中shard->group的配置由shardctrler集群来管理,底层也是 ...

  9. 如何评价2022年数维杯国际赛C题【如何使用大脑结构诊断阿尔茨海默病】【D题:拉尼娜事件】思路模型代码全套资料+高清代码结果图+完整源程序+论文

    资料获取方式我放到下面了,欢迎小伙伴一起学习交流 2022数维杯国际赛C题:https://mbd.pub/o/bread/Y5yVlp1x2022年数维杯国际赛[D题:拉尼娜事件]思路模型代码全套资 ...

  10. 【核心内容及推导思路】人类记忆系统之谜,也许就是这么回事儿

    文章目录 0. 前言 1. 推导思路 第1步(猜想的由来及核心内容): 第2步(解剖学上的"疑似证据"): 记忆输入通路示意图 记忆检出通路示意图 第3步(记忆特征上的" ...

最新文章

  1. Google发布Zipkin与Stackdriver Trace的集成功能
  2. 软件工程python就业方向-月薪2万+的Python Web岗,学到什么程度能找到工作?
  3. swt 键盘事件ctrl+c_跑Python的键盘可以很强大
  4. oracle 层次查询判断叶子和根节点
  5. 从零开始搭建Ubuntu 环境下的Android 源码开发环境
  6. 随想录(一种新的读写锁的写法)
  7. linux 更新系统命令,Linux系统自动更新时间命令的详细说明
  8. 中级软件设计师考试(软考中级)设计模式分类及其典型特点
  9. EXCEL表格-COUNTIF函数查找数据重复项
  10. 将字符串数组含有特定字符的值输出{“张三丰“,“张翠山“,“张无忌“,“宋远桥“,“莫声谷“,“俞正声“}
  11. c语言.jpg图片转成数组_如何把pdf图片转成jpg?快看高手私藏实用的技巧
  12. Windows系统下编译torch-points-kernels
  13. 人脸图像切割分离工具
  14. 微信小程序-计算器小程序《从零开始学微信小程序》
  15. 几种排序算法的稳定性分析
  16. win7更换锁屏壁纸(操作步骤)
  17. springboot访问下载/resource/static下的静态资源;下载excel文件损坏,打不开
  18. 系统集成项目管理工程师各种口诀技巧分享(3)
  19. 2022年武汉市创新型中小企业认定条件和评价指标
  20. 漫谈一条SQL语句的一生

热门文章

  1. 学习Python的第一天
  2. matlab a律非均匀量化,均匀量化与A律PCM非均匀量化实验.doc
  3. 计算机中的颗粒度(granularity)什么是颗粒度?
  4. Windows XP IIS 500内部错误 解决方案(HTTP 500内部服务器错误)
  5. 企查猫app数据解密
  6. wordpress 如何添加Canonical 标签(不通过插件)
  7. redis存储新闻列表_聊聊Redis使用场景
  8. CMD的打开方式以及一些常用的Dos命令
  9. 天行健,君子以自强不息;地势坤,群子以厚德载物。
  10. python课程改进建议_关于Python课程的思考和意见