# Fabric 1.0源代码笔记 之 Orderer #BroadcastServer(Broadcast服务端)
## 1、BroadcastServer概述
BroadcastServer相关代码在protos/orderer、orderer目录下。
protos/orderer/ab.pb.go,AtomicBroadcastServer接口定义。
orderer/server.go,go,AtomicBroadcastServer接口实现。
## 2、AtomicBroadcastServer接口定义
### 2.1、AtomicBroadcastServer接口定义
```go
type AtomicBroadcastServer interface {
Broadcast(AtomicBroadcast_BroadcastServer) error
Deliver(AtomicBroadcast_DeliverServer) error
}
//代码在protos/orderer/ab.pb.go
···
### 2.2、gRPC相关实现
```go
var _AtomicBroadcast_serviceDesc = grpc.ServiceDesc{
    ServiceName: "orderer.AtomicBroadcast",
    HandlerType: (*AtomicBroadcastServer)(nil),
    Methods: []grpc.MethodDesc{},
    Streams: []grpc.StreamDesc{
        {
            StreamName: "Broadcast",
            Handler: _AtomicBroadcast_Broadcast_Handler,
            ServerStreams: true,
            ClientStreams: true,
        },
        {
            StreamName: "Deliver",
            Handler: _AtomicBroadcast_Deliver_Handler,
            ServerStreams: true,
            ClientStreams: true,
        },
    },
    Metadata: "orderer/ab.proto",
}
func RegisterAtomicBroadcastServer(s *grpc.Server, srv AtomicBroadcastServer) {
    s.RegisterService(&_AtomicBroadcast_serviceDesc, srv)
}
func _AtomicBroadcast_Broadcast_Handler(srv interface{}, stream grpc.ServerStream) error {
    return srv.(AtomicBroadcastServer).Broadcast(&atomicBroadcastBroadcastServer{stream})
}
func _AtomicBroadcast_Deliver_Handler(srv interface{}, stream grpc.ServerStream) error {
    return srv.(AtomicBroadcastServer).Deliver(&atomicBroadcastDeliverServer{stream})
}
//代码在protos/orderer/ab.pb.go
```
## 3、AtomicBroadcastServer接口实现
### 3.1、server结构体
server结构体:
```go
type server struct {
bh broadcast.Handler
dh deliver.Handler
}
type broadcastSupport struct {
multichain.Manager
broadcast.ConfigUpdateProcessor
}
//代码在orderer/server.go
```
broadcast.Handler:
```go
type Handler interface {
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}
type handlerImpl struct {
sm SupportManager
}
func NewHandlerImpl(sm SupportManager) Handler {
return &handlerImpl{
sm: sm,
}
}
type SupportManager interface {
ConfigUpdateProcessor
GetChain(chainID string) (Support, bool)
}
type ConfigUpdateProcessor interface { //处理通道配置更新
Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error)
}
//代码在orderer/common/broadcast/broadcast.go
```
deliver.Handler:
```go
type Handler interface {
Handle(srv ab.AtomicBroadcast_DeliverServer) error
}
type deliverServer struct {
sm SupportManager
}
type SupportManager interface {
GetChain(chainID string) (Support, bool)
}
//代码在orderer/common/deliver/deliver.go
```
### 3.2、server结构体相关方法
```go
//构建server结构体
func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer
//s.bh.Handle(srv)
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error
//s.dh.Handle(srv)
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error
//代码在orderer/server.go
```
func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer代码如下:
```go
func NewServer(ml multichain.Manager, signer crypto.LocalSigner) ab.AtomicBroadcastServer {
s := &server{
dh: deliver.NewHandlerImpl(deliverSupport{Manager: ml}),
bh: broadcast.NewHandlerImpl(broadcastSupport{
Manager: ml,
ConfigUpdateProcessor: configupdate.New(ml.SystemChannelID(), configUpdateSupport{Manager: ml}, signer),
}),
}
return s
}
//代码在orderer/server.go
```
### 3.3、Broadcast服务端Broadcast处理流程
Broadcast服务端Broadcast处理流程,即broadcast.handlerImpl.Handle方法。
#### 3.3.1、接收Envelope消息,并获取Payload和ChannelHeader
```go
msg, err := srv.Recv() //接收Envelope消息
payload, err := utils.UnmarshalPayload(msg.Payload) //反序列化获取Payload
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader) //反序列化获取ChannelHeader
//代码在orderer/common/broadcast/broadcast.go
```
#### 3.3.2、如果消息类型为channel配置或更新,则使用multichain.Manager处理消息
```
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) { //如果是channel配置或更新
msg, err = bh.sm.Process(msg) //configupdate.Processor.Process方法
}
//代码在orderer/common/broadcast/broadcast.go
```
msg, err = bh.sm.Process(msg)代码如下:
```go
func (p *Processor) Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error) {
channelID, err := channelID(envConfigUpdate) //获取ChannelHeader.ChannelId
//multichain.Manager.GetChain方法,获取chainSupport,以及chain是否存在
support, ok := p.manager.GetChain(channelID)
if ok {
//已存在的channel配置,调取multichain.Manager.ProposeConfigUpdate方法
return p.existingChannelConfig(envConfigUpdate, channelID, support)
}
//新channel配置,调取multichain.Manager.NewChannelConfig方法
return p.newChannelConfig(channelID, envConfigUpdate)
}
//代码在orderer/configupdate/configupdate.go
```
#### 3.3.3、其他消息类型或channel消息处理后,接受消息并加入排序
```go
support, ok := bh.sm.GetChain(chdr.ChannelId) //获取chainSupport
_, filterErr := support.Filters().Apply(msg) //filter.RuleSet.Apply方法
//调取Chain.Enqueue方法,接受消息,加入排序
support.Enqueue(msg)
//代码在orderer/common/broadcast/broadcast.go
```
#### 3.3.4、向客户端发送响应信息
```go
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
//代码在orderer/common/broadcast/broadcast.go
```
### 3.4、Broadcast服务端Deliver处理流程
Broadcast服务端Deliver处理流程,即deliver.deliverServer.Handle方法。
```go
func (ds *deliverServer) Handle(srv ab.AtomicBroadcast_DeliverServer) error {
for {
//接收客户端查询请求
envelope, err := srv.Recv()
payload, err := utils.UnmarshalPayload(envelope.Payload)
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
chain, ok := ds.sm.GetChain(chdr.ChannelId)
erroredChan := chain.Errored()
select {
case <-erroredChan:
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
default:
}
lastConfigSequence := chain.Sequence()
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
seekInfo := &ab.SeekInfo{}
err = proto.Unmarshal(payload.Data, seekInfo)
cursor, number := chain.Reader().Iterator(seekInfo.Start)
var stopNum uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
case *ab.SeekPosition_Newest:
stopNum = chain.Reader().Height() - 1
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
return sendStatusReply(srv, cb.Status_BAD_REQUEST)
}
}
for {
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
select {
case <-erroredChan:
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
} else {
select {
case <-cursor.ReadyChan():
default:
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}
currentConfigSequence := chain.Sequence()
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
sf := sigfilter.New(policies.ChannelReaders, chain.PolicyManager())
result, _ := sf.Apply(envelope)
}
block, status := cursor.Next()
err := sendBlockReply(srv, block)
if stopNum == block.Header.Number {
break
}
}
err := sendStatusReply(srv, cb.Status_SUCCESS)
}
}
```

网址:http://www.qukuailianxueyuan.io/

欲领取造币技术与全套虚拟机资料

区块链技术交流QQ群:756146052  备注:CSDN

尹成学院微信:备注:CSDN

网址:http://www.qukuailianxueyuan.io/

欲领取造币技术与全套虚拟机资料

区块链技术交流QQ群:756146052  备注:CSDN

尹成学院微信:备注:CSDN

Fabric 1.0源代码分析(30) Orderer #BroadcastServer(Broadcast服务端)相关推荐

  1. Fabric 1.0源代码分析(25) Orderer

    # Fabric 1.0源代码笔记 之 Orderer ## 1.Orderer概述 Orderer,为排序节点,对所有发往网络中的交易进行排序,将排序后的交易安排配置中的约定整理为块,之后提交给Co ...

  2. Fabric 1.0源代码分析(27) Orderer #configupdate(处理通道配置更新)

    # Fabric 1.0源代码笔记 之 Orderer #configupdate(处理通道配置更新) ## 1.configupdate概述 configupdate,用于接收配置交易,并处理通道配 ...

  3. Fabric 1.0源代码分析(29) Orderer #multichain(多链支持包)

    # Fabric 1.0源代码笔记 之 Orderer #multichain(多链支持包) ## 1.multichain概述 multichain代码集中在orderer/multichain目录 ...

  4. Fabric 1.0源代码分析(26)Orderer #ledger(Orderer Ledger)

    # Fabric 1.0源代码笔记 之 Orderer #ledger(Orderer Ledger) ## 1.Orderer Ledger概述 Orderer Ledger代码分布在orderer ...

  5. Fabric 1.0源代码分析(28) Orderer #localconfig(Orderer配置文件定义)

    # Fabric 1.0源代码笔记 之 Orderer #localconfig(Orderer配置文件定义) ## 1.配置文件定义 ```bash General: #通用配置 LedgerTyp ...

  6. Fabric 1.0源代码分析(32) Peer #peer node start命令实现

    # Fabric 1.0源代码笔记 之 Peer #peer node start命令实现 ## 1.peer node加载子命令start和status peer node加载子命令start和st ...

  7. Fabric 1.0源代码分析(15)gossip(流言算法)

    # Fabric 1.0源代码笔记 之 gossip(流言算法) ## 1.gossip概述 gossip,翻译为流言蜚语,即为一种可最终达到一致的算法.最终一致的另外的含义就是,不保证同时达到一致. ...

  8. Fabric 1.0源代码分析(37) Peer #DeliverClient(Deliver客户端)

    # Fabric 1.0源代码笔记 之 Peer #DeliverClient(Deliver客户端) ## 1.DeliverClient概述 DeliverClient代码分布如下: * peer ...

  9. Fabric 1.0源代码分析(45)gRPC(Fabric中注册的gRPC Service)

    # Fabric 1.0源代码笔记 之 -gRPC(Fabric中注册的gRPC Service) Peer节点中注册的gRPC Service,包括: * Events Service(事件服务): ...

最新文章

  1. Hadoop的数据管理
  2. POJ - 1190 生日蛋糕(dfs+剪枝)
  3. 新闻发布项目——访问温馨提示
  4. 使用git和github进行协同开发流程
  5. nuxt服务端php,nuxt服务端部署上线
  6. CenterNet原理详解
  7. 数据饕餮,盛夏旋风!天善学院SVIP冰点促最后一波!
  8. 【互动出版网】央视热播BBC纪录片同名图书买二赠一活动
  9. 百度网盘python客户端——筑梦之路
  10. HDU 5296 Annoying problem LCA+树状数组
  11. 技能分享 | 麦肯锡教给我的写作武器:如何讲好一句话
  12. 数据结构二叉树后序遍历非递归算法
  13. ubuntu20 安装rtx3080 记录
  14. Vue编程的团队代码规范
  15. 《计算机是怎样跑起来的》之 体验一次手工汇编
  16. linux 剪刀石头布c语言,利用C语言编写“剪刀石头布”小游戏
  17. 911S5正式谢幕后 如何找到一个好用的替代品
  18. 第11节 三个败家子(11)——女王与甄妃
  19. 唯有自身强大才能呼风唤雨—Intel要携CXL一统互联江湖了吗?
  20. 【opencv学习笔记】003之图像像素基本操作(获取像素指针、范围处理)及掩膜操作(filter2D)详解

热门文章

  1. 4月13日—4月17日三年级课程
  2. MAMP无法启动servers问题的解决
  3. FANUC、ABB、YASKAWA、SCARA机器人入门书籍整理
  4. 视频专栏《软件测试工程师 必备 之 Jenkins / Linux / Git》基础视频
  5. Altium Designer 14 制作Mark点
  6. 模板列中不自动换行的解决方案
  7. 必须收藏的学位论文免费下载网站!!各国论文免费下载!英国 欧洲 美国 芬兰 日本 加拿大 等等20余
  8. TOGAF10®标准中文版(全文目录)
  9. 相机图像传感器参数及其对成像的影响分析
  10. ios 去掉底部状态栏_iOS状态栏隐藏及显示问题终极解决方案