PBFT算法源码详解
真的好久没有写博客了,正好最近在研究PBFT,那就从PBFT开始写起吧!
先奉上大佬@byron1st写的PBFT代码:https://github.com/bigpicturelabs/simple_pbft
我会带着大家鹿一遍源代码!因为看到网上好多的博客都是互相抄袭,对大家一点帮助没有
整个代码逻辑都是围绕这张图来的!
PBFT算法基础理论部分:https://www.jianshu.com/p/cf1010f39b84
文章目录
- 1.服务的启动主函数
- 2.NewServer函数
- 2.1 NewNode(nodeID)
- 2.1.1 协程1:dispatchMsg
- 2.1.2 协程2:resolveMsg
- 2.2 setRoute函数
- 3.Request共识过程
- 3.0 共识流程图
- 3.1 创建状态createStateForNewConsensus
- 3.2 开始共识StartConsensus
- 3.3 广播消息Broadcast
- 4.PrePrepare共识过程
- 4.1 创建状态createStateForNewConsensus
- 4.2 开始共识PrePrepare
- 4.2.1 验证过程
- 4.3 广播Broadcast
1.服务的启动主函数
func main() {nodeID := os.Args[1] // 这边就是传进来的公司的名称server := network.NewServer(nodeID)server.Start()
}
这是整个PBFT的启动主函数,可以看到会获取用户输入的数值作为nodeID
,然后调用network包中的NewServer函数定义一个服务(可以理解成一个节点的client窗口),然后开始启动server.Start()
2.NewServer函数
这里会给完整的流程图,等我写完所有的部分会重新画的
func NewServer(nodeID string) *Server {// 根据传进来的NodeID新建了一个节点,节点的默认视图是1000000,并且该节点启动了三个协程:dispatchMsg、alarmToDispatcher、resolveMsgnode := NewNode(nodeID)// 启动服务server := &Server{node.NodeTable[nodeID], node}// 设置路由server.setRoute()return server
}
我们可以看到NewServer函数主要干了三件事:
- 根据传进来的nodeID创建一个节点
- 创建一个server服务
- 设置路由
2.1 NewNode(nodeID)
我们先来看第一件事,就是创建一个node,进入node类,看一下node到底在干嘛!
func NewNode(nodeID string) *Node {const viewID = 10000000000 // temporary.node := &Node{// Hard-coded for test.NodeID: nodeID,NodeTable: map[string]string{"Apple": "localhost:1111","MS": "localhost:1112","Google": "localhost:1113","IBM": "localhost:1114",},View: &View{ID: viewID,Primary: "Apple", // 主节点是Apple公司},// Consensus-related structCurrentState: nil,CommittedMsgs: make([]*consensus.RequestMsg, 0), // 被提交的信息MsgBuffer: &MsgBuffer{ReqMsgs: make([]*consensus.RequestMsg, 0),PrePrepareMsgs: make([]*consensus.PrePrepareMsg, 0),PrepareMsgs: make([]*consensus.VoteMsg, 0),CommitMsgs: make([]*consensus.VoteMsg, 0),},// ChannelsMsgEntrance: make(chan interface{}), // 无缓冲的信息接收通道MsgDelivery: make(chan interface{}), // 无缓冲的信息发送通道Alarm: make(chan bool),}// 启动消息调度器go node.dispatchMsg()// Start alarm triggergo node.alarmToDispatcher()// 开始信息表决go node.resolveMsg()return node
}
这边的NewNode其实就是新建了一个节点,并且定义了node结构体中的一些属性信息,node结构体如下:
// 节点
type Node struct {NodeID stringNodeTable map[string]string // key=nodeID, value=urlView *ViewCurrentState *consensus.StateCommittedMsgs []*consensus.RequestMsg // kinda block.MsgBuffer *MsgBufferMsgEntrance chan interface{}MsgDelivery chan interface{}Alarm chan bool
}
我们一步步剖析NewNode
- 初始化视图编号
const viewID = 10000000000
- 定义了node结构体中的一些属性信息
属性 | 值 | 说明 |
---|---|---|
NodeID | nodeID | 节点ID |
NodeTable | map[string]string | 节点索引表 |
View | &View | 设置视图编号和主节点 |
CurrentState | nil | 默认当前节点的状态为nil |
CommittedMsgs | make([]*consensus.RequestMsg, 0) | 被提交的信息 |
MsgBuffer | &MsgBuffer | 四种消息类型缓冲列表 |
MsgEntrance | make(chan interface{}) | 无缓冲的信息接收通道 |
MsgDelivery | make(chan interface{}) | 无缓冲的信息发送通道 |
Alarm | make(chan bool) | 警告通道 |
- 紧接着我们可以看到每一个node都开启了3个
goroutine
- dispatchMsg
- alarmToDispatcher
- resolveMsg
我们先重点关注一下两个协程goroutine
2.1.1 协程1:dispatchMsg
func (node *Node) dispatchMsg() {for {select {case msg := <-node.MsgEntrance: // 如果MsgEntrance通道有消息传送过来,拿到msgerr := node.routeMsg(msg) // 进行routeMsgif err != nil {fmt.Println(err)// TODO: send err to ErrorChannel}case <-node.Alarm:err := node.routeMsgWhenAlarmed()if err != nil {fmt.Println(err)// TODO: send err to ErrorChannel}}}
}
这里需要你对
for select
多路复用有所了解!
我们可以从dispatchMsg
的代码中看到,只要MsgEnrance
通道中有值,就会传递给一个中间变量msg
,然后进行消息路由转发routeMsg
。
func (node *Node) routeMsg(msg interface{}) []error {switch msg.(type) {case *consensus.RequestMsg:if node.CurrentState == nil {// Copy buffered messages first.msgs := make([]*consensus.RequestMsg, len(node.MsgBuffer.ReqMsgs))copy(msgs, node.MsgBuffer.ReqMsgs)// Append a newly arrived message.msgs = append(msgs, msg.(*consensus.RequestMsg))// Empty the buffer.node.MsgBuffer.ReqMsgs = make([]*consensus.RequestMsg, 0)// Send messages.node.MsgDelivery <- msgs} else {node.MsgBuffer.ReqMsgs = append(node.MsgBuffer.ReqMsgs, msg.(*consensus.RequestMsg))}... 其他类型的信息数据return nil
}
我们刚开始先以RequestMsg为例,因为其他类型的消息数据的执行逻辑与RequestMsg几乎是一致的!这是节点会面临两种选择:
- 当CurrentState 为 nil 时,将consensus.RequestMsg的信息复制给中间变量msgs,然后清空重置!并将msgs中的信息发送给MsgDelivery通道!(我们还记得node一直开了3个goroutine嘛,这边传出去,另外一个goroutine
resolveMsg
会立马接受这个通道中的信息) - 当CurrentState 不为 nil 时,直接往MsgBuffer缓冲通道中进行添加,这边会涉及到数组扩容的问题(可以了解一下)
2.1.2 协程2:resolveMsg
func (node *Node) resolveMsg() {for {// 从调度器中获取缓存信息msgs := <-node.MsgDeliveryswitch msgs.(type) {case []*consensus.RequestMsg:// 节点表决决策信息errs := node.resolveRequestMsg(msgs.([]*consensus.RequestMsg))if len(errs) != 0 {for _, err := range errs {fmt.Println(err)}// TODO: send err to ErrorChannel}... 替他类型的信息数据}}
}
这边我也先只以RequestMsg进行讲解!我们可以清楚的看到node.MsgDelivery这边在等着调度器dispatchMsg那边传递消息过来,因为两个缓冲通道都是无缓冲的,没有消息会一直阻塞在这边!
下面就是执行resolveRequestMsg
函数了:
// 节点表决请求阶段的信息
func (node *Node) resolveRequestMsg(msgs []*consensus.RequestMsg) []error {errs := make([]error, 0)// 表决信息for _, reqMsg := range msgs {err := node.GetReq(reqMsg)if err != nil {errs = append(errs, err)}}if len(errs) != 0 {return errs}return nil
}
在resolveRequestMsg代码块中,我们可以看到主要的执行逻辑就是调用了GetReq
函数
// 主节点开始全局的共识
func (node *Node) GetReq(reqMsg *consensus.RequestMsg) error {LogMsg(reqMsg)// Create a new state for the new consensus.err := node.createStateForNewConsensus()if err != nil {return err}// 开始共识程序prePrepareMsg, err := node.CurrentState.StartConsensus(reqMsg)if err != nil {return err}LogStage(fmt.Sprintf("Consensus Process (ViewID:%d)", node.CurrentState.ViewID), false)// 发现getPrePrepare信息// 这边主节点开始向其他节点发送预准备消息了if prePrepareMsg != nil {node.Broadcast(prePrepareMsg, "/preprepare")LogStage("Pre-prepare", true)}return nil
}
- 创建一个新的共识状态
createStateForNewConsensus
- 对节点当前状态开始共识
StartConsensus
- 共识完成之后,主节点向其他节点广播
prePrepareMsg
阶段的信息!
这边先去看第3章共识过程
2.2 setRoute函数
func (server *Server) setRoute() {http.HandleFunc("/req", server.getReq)http.HandleFunc("/preprepare", server.getPrePrepare)http.HandleFunc("/prepare", server.getPrepare)http.HandleFunc("/commit", server.getCommit)http.HandleFunc("/reply", server.getReply)
}
3.Request共识过程
3.0 共识流程图
这张流程图是为了方便大家理解,等你梳理完整个流程,再回过头来重新看我画的这个流程图,思路应该会更清晰一点!
一只安慕嘻是我的B站账号,大家也可以关注 o(´^`)o
3.1 创建状态createStateForNewConsensus
func (node *Node) createStateForNewConsensus() error {// Check if there is an ongoing consensus process.if node.CurrentState != nil {return errors.New("another consensus is ongoing")}// Get the last sequence IDvar lastSequenceID int64if len(node.CommittedMsgs) == 0 {lastSequenceID = -1} else {lastSequenceID = node.CommittedMsgs[len(node.CommittedMsgs) - 1].SequenceID}// Create a new state for this new consensus process in the Primarynode.CurrentState = consensus.CreateState(node.View.ID, lastSequenceID)LogStage("Create the replica status", true)return nil
}
- 首选判断当前节点的状态是不是为nil,也就是说判断当前节点是不是处于其他阶段(预准备阶段或者准备阶段等等)
- 判断当前阶段是否已经发送过消息,如果是首次进行共识,则上一个序列号
lastSequenceID
设置为-1,否则我们取出上一个序列号 - 创建状态
CreateState
,主要是初始化State之后返回
func CreateState(viewID int64, lastSequenceID int64) *State {return &State{ViewID: viewID,MsgLogs: &MsgLogs{ReqMsg:nil,PrepareMsgs:make(map[string]*VoteMsg),CommitMsgs:make(map[string]*VoteMsg),},LastSequenceID: lastSequenceID,CurrentStage: Idle,}
}
3.2 开始共识StartConsensus
// 主节点开始共识
func (state *State) StartConsensus(request *RequestMsg) (*PrePrepareMsg, error) {// 消息的序号为sequenceID,也就是当前时间sequenceID := time.Now().UnixNano()// Find the unique and largest number for the sequence IDif state.LastSequenceID != -1 {for state.LastSequenceID >= sequenceID {sequenceID += 1}}// Assign a new sequence ID to the request message object.request.SequenceID = sequenceID// Save ReqMsgs to its logs.state.MsgLogs.ReqMsg = request// 得到客户端请求消息requestMsg的摘要digest, err := digest(request)if err != nil {fmt.Println(err)return nil, err}// 将当前阶段转换到PrePrepared阶段state.CurrentStage = PrePrepared// 这边其实就是主节点向其他节点发送消息的格式:视图ID,请求的序列号,请求信息和请求信息的摘要return &PrePrepareMsg{ViewID: state.ViewID,SequenceID: sequenceID,Digest: digest,RequestMsg: request,}, nil
}
- 首先使用当前的时间戳作为序列号
- 如果上一个序列号 >= 当前序列号,当前序列号+1,目的很简单,主节点每开始一次共识,序列号+1
- 进行序号号
SequenceID
的更新,获取请求信息的摘要digest(request)
,将当前状态转化为PrePrepared
3.3 广播消息Broadcast
// 节点广播函数
func (node *Node) Broadcast(msg interface{}, path string) map[string]error {errorMap := make(map[string]error)for nodeID, url := range node.NodeTable {// 因为不需要向自己进行广播了,所以就直接跳过if nodeID == node.NodeID {continue}// 将msg信息编码成json格式jsonMsg, err := json.Marshal(msg)if err != nil {errorMap[nodeID] = errcontinue}// 将json格式传送给其他的节点send(url + path, jsonMsg) // url localhost:1111 path:/prepare等等// send函数:http.Post("http://"+url, "application/json", buff)}if len(errorMap) == 0 {return nil} else {return errorMap}
}
- 遍历node节点初始化时自带的节点列表
node.NodeTable
- 将msg信息编码成json格式
json.Marshal(msg)
,然后发送给其他节点send(url + path, jsonMsg)
func send(url string, msg []byte) {buff := bytes.NewBuffer(msg)_, _ = http.Post("http://"+url, "application/json", buff)
}
4.PrePrepare共识过程
顺便帮助大家回忆一下,在resolveMsg
相应的goroutine中会判断传进来的
信息类型,前面一直说的是request类型,现在我们开始鹿PrePrepare的代码
case []*consensus.PrePrepareMsg:errs := node.resolvePrePrepareMsg(msgs.([]*consensus.PrePrepareMsg))if len(errs) != 0 {for _, err := range errs {fmt.Println(err)}// TODO: send err to ErrorChannel}
调用resolvePrePrepareMsg
函数,开启prePrepareMsg的共识
func (node *Node) resolvePrePrepareMsg(msgs []*consensus.PrePrepareMsg) []error {errs := make([]error, 0)// Resolve messagesfor _, prePrepareMsg := range msgs {err := node.GetPrePrepare(prePrepareMsg)if err != nil {errs = append(errs, err)}}if len(errs) != 0 {return errs}return nil
}
对于每一个prePrepareMsg 信息会调用GetPrePrepare
函数,下面正式开始共识!
// 到了预准备阶段,所有节点开始参与共识
func (node *Node) GetPrePrepare(prePrepareMsg *consensus.PrePrepareMsg) error {LogMsg(prePrepareMsg)// Create a new state for the new consensus.err := node.createStateForNewConsensus()if err != nil {return err}prePareMsg, err := node.CurrentState.PrePrepare(prePrepareMsg)if err != nil {return err}if prePareMsg != nil {// Attach node ID to the messageprePareMsg.NodeID = node.NodeIDLogStage("Pre-prepare", true)node.Broadcast(prePareMsg, "/prepare")LogStage("Prepare", false)}return nil
}
4.1 创建状态createStateForNewConsensus
这边跟request阶段是一样的,所以就省略啦!
4.2 开始共识PrePrepare
// 预准备阶段的共识
func (state *State) PrePrepare(prePrepareMsg *PrePrepareMsg) (*VoteMsg, error) {// Get ReqMsgs and save it to its logs like the primary.state.MsgLogs.ReqMsg = prePrepareMsg.RequestMsg// 验证视图ID、消息序列号和信息摘要是否正确if !state.verifyMsg(prePrepareMsg.ViewID, prePrepareMsg.SequenceID, prePrepareMsg.Digest) {return nil, errors.New("当前节点预准备阶段消息是错误的")}// Change the stage to pre-prepared.state.CurrentStage = PrePreparedreturn &VoteMsg{ViewID: state.ViewID,SequenceID: prePrepareMsg.SequenceID,Digest: prePrepareMsg.Digest,MsgType: PrepareMsg,}, nil
}
- 进行状态验证
- 将当前阶段改为
PrePrepared
4.2.1 验证过程
// 验证
func (state *State) verifyMsg(viewID int64, sequenceID int64, digestGot string) bool {// Wrong view. That is, wrong configurations of peers to start the consensus.if state.ViewID != viewID {return false}// Check if the Primary sent fault sequence number. => Faulty primary.// TODO: adopt upper/lower bound check.if state.LastSequenceID != -1 {if state.LastSequenceID >= sequenceID {return false}}digest, err := digest(state.MsgLogs.ReqMsg)if err != nil {fmt.Println(err)return false}// Check digest.if digestGot != digest {return false}return true
}
- 验证传进来的视图ID是否与当前状态的视图ID一致
- 验证序号号是否为当前最大值
- 验证当前的摘要是否与原始的request信息摘要是一致的
4.3 广播Broadcast
广播和request阶段也是一样的,所以也忽略啦!
PBFT算法源码详解相关推荐
- 【5G/4G】加/解密+完整性保护/校验算法源码详解
文章目录 加/解密+完整性保护/校验算法源码详解 一.加解密算法 二.完整性保护/校验算法 本人就职于国际知名终端厂商,负责modem芯片研发. 在5G早期负责终端数据业务层.核心网相关的开发工作,目 ...
- faiss hnsw 算法源码详解 - train
hnswlib 代码分析 hnswlib 源码分析 train过程说明 主要是生成hnsw模型 Hnsw中,storage中存储的原始的中心点向量 生成hnsw 为每层分配空间 每层的中心点在当前层及 ...
- fdct算法 java_ImageSharp源码详解之JPEG压缩原理(3)DCT变换
DCT变换可谓是JPEG编码原理里面数学难度最高的一环,我也是因为DCT变换的算法才对JPEG编码感兴趣(真是不自量力).这一章我就把我对DCT的研究心得体会分享出来,希望各位大神也不吝赐教. 1.离 ...
- 【 反向传播算法 Back-Propagation 数学推导以及源码详解 深度学习 Pytorch笔记 B站刘二大人(3/10)】
反向传播算法 Back-Propagation 数学推导以及源码详解 深度学习 Pytorch笔记 B站刘二大人(3/10) 数学推导 BP算法 BP神经网络可以说机器学习的最基础网络.对于普通的简单 ...
- java的数组与Arrays类源码详解
java的数组与Arrays类源码详解 java.util.Arrays 类是 JDK 提供的一个工具类,用来处理数组的各种方法,而且每个方法基本上都是静态方法,能直接通过类名Arrays调用. 类的 ...
- AidLux“换脸”案例源码详解 (Python)
"换脸"案例源码详解 (Python) faceswap_gui.py用于换脸,可与facemovie_gui.py身体互换源码(上一篇文章)对照观看 打开faceswap_gui ...
- 【 卷积神经网络CNN 数学原理分析与源码详解 深度学习 Pytorch笔记 B站刘二大人(9/10)】
卷积神经网络CNN 数学原理分析与源码详解 深度学习 Pytorch笔记 B站刘二大人(9/10) 本章主要进行卷积神经网络的相关数学原理和pytorch的对应模块进行推导分析 代码也是通过demo实 ...
- 【多输入模型 Multiple-Dimension 数学原理分析以及源码详解 深度学习 Pytorch笔记 B站刘二大人 (6/10)】
多输入模型 Multiple-Dimension 数学原理分析以及源码源码详解 深度学习 Pytorch笔记 B站刘二大人(6/10) 数学推导 在之前实现的模型普遍都是单输入单输出模型,显然,在现实 ...
- Integer源码详解
尊重原创,转载请标明出处 http://blog.csdn.net/abcdef314159 对于Integer这个类估计大家也都非常熟悉了,以前看过他的源码,但也只是粗略的看了一下,最近有时间 ...
- java的String类源码详解
java的String类源码详解 类的定义 public final class Stringimplements java.io.Serializable, Comparable<String ...
最新文章
- Bert时代的创新:Bert应用模式比较及其它
- php百分比乘加,用php简单实现加减乘除计算器
- shell脚本——expect命令
- MySql 创建索引原则
- Sublime Text 3在ubuntu12.10下无法中文输入的解决方案
- 【312】◀▶ arcpy 常用函数说明
- 高可用+负载均衡 方案
- MaxCompute 挑战使用SQL进行序列数据处理
- layui父页面调用子页面的渲染_layUI ajax加载html页面后重新渲染的方法
- nginx(4、缓存)
- 4.分布式服务架构:原理、设计与实战 --- 大数据日志系统的构建
- zepto 自定义打包
- Coca语料库的使用方法
- BROTHER 废墨清零教学
- kodi资源_安装Kodi展示播放NAS电影
- 微信小程序 渲染层网络错误_渲染层网络层错误 微信小程序开发 - 云计算资讯 - 服务器之家...
- Visio方向键无法移动对象的解决办法[笔记本版]
- 一次Ajax报错:“存储空间不足,无法完成此操作”的解决经验
- TensorFlow搭建LSTM实现多变量时间序列预测(负荷预测)
- 【我的OpenGL学习进阶之旅】介绍一下 绘制图元
热门文章
- 最优秀的数据可视化案例欣赏
- 幸运抽奖java_Java 幸运抽奖项目
- it职位简称_IT行业常见职位英文缩写
- 机器学习 | MATLAB实现GLM广义线性模型参数设定
- linux服务器查sn,命令查看服务器SN号
- STM32读取HMC5883L的偏航角数据
- 渗透测试工程师常见面试33题——应届生
- icom对讲机写频线定义_ICOM对讲机的常见故障和使用中的问题
- 小米air耳机重新配对_小米air耳机重新配对_「小三爷出品」不错的新年礼物,小米蓝牙耳机Air体验...
- linux ubuntu版本选择,如何选择一个合适的Ubuntu版本