声明:链码开发语言是golang,源码分析是基于fabric 1.4.0版本

用户链码与peer的关系

用户链码是一个独立的进程,使用docker封装(非dev模式下)。

链码容器由peer创建,在启动容器时指定了peer的地址,所以链码容器启动后能够找到peer,并建立tcp长连接,其中peer为服务端,协议是:grpc->http2->tcp。

switch ccType {case pb.ChaincodeSpec_GOLANG.String(), pb.ChaincodeSpec_CAR.String():lc.Args = []string{"chaincode", fmt.Sprintf("-peer.address=%s", c.PeerAddress)}case pb.ChaincodeSpec_JAVA.String():lc.Args = []string{"/root/chaincode-java/start", "--peerAddress", c.PeerAddress}case pb.ChaincodeSpec_NODE.String():lc.Args = []string{"/bin/sh", "-c", fmt.Sprintf("cd /usr/local/src; npm start -- --peer.address %s", c.PeerAddress)}default:return nil, errors.Errorf("unknown chaincodeType: %s", ccType)}

链码进程在和peer建立长连接之后,会有一个注册的动作,注册完成之后,链码处于ready状态,开始接收背书请求。
注:由于链码容器只会和一个peer建立长连接,所以一个链码容器不可能被多个peer使用,但可以被同一个peer的多个通道共用。

背书过程分析

peer在同时收到多个client的交易提案时(接收动作是并行,因为是不同的tcp长连接),

而把提案信息发送给链码,这个发送动作必然是串行的,因为只有一个tcp长连接!不过这些都不影响我们的链码编写。

接下来看链码侧逻辑。

链码进程启动之后,核心代码:

负责与peer交互的
函数:func chatWithPeer(chaincodename string, stream PeerChaincodeStream, cc Chaincode) error {}
位置:core/chaincode/shim/chaincode.go: 321函数:func (handler *Handler) handleReady(msg *pb.ChaincodeMessage, errc chan error) error {}
位置:core/chaincode/shim/handler.go: 774函数:func (handler *Handler) handleTransaction(msg *pb.ChaincodeMessage, errc chan error) {}
位置:core/chaincode/shim/handler.go: 238

在chatWithPeer函数里面,使用一个无限for循环,不停的接收peer发送的消息。

如果peer是ready状态,调用handleTransaction函数,这个函数里启动了一个goroutine(关键点),在这个goroutine函数里调用了我们的Invoke函数,并把invoke的结果返回给peer!

如果极短的时间内,链码收到了多个背书请求,就会创建多个goroutine,那么这些goroutine就可能在同一时刻被执行。更多细节可以了解goroutine调度相关资料。
现在确认了多次调用Invoke是并行的(至少开始执行是并行的),不管是否是同一个接口!

在我们的业务逻辑中,不可避免的会读写世界状态,两个goroutine中同时调用GetState/PutState,是否会互相影响,最终导致串行呢?
分析源码,以GetState为例,调用堆栈如下:

func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {}func (handler *Handler) handleGetState(collection string, key string, channelId string, txid string) ([]byte, error) {}func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {}func (handler *Handler) createChannel(channelID, txid string) (chan pb.ChaincodeMessage, error) {}func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {}func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) {}

callPeerWithChaincodeMsg源码如下:

func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {// Create the channel on which to communicate the response from the peervar respChan chan pb.ChaincodeMessagevar err errorif respChan, err = handler.createChannel(channelID, txid); err != nil { // 创建管道return pb.ChaincodeMessage{}, err}defer handler.deleteChannel(channelID, txid)  // 删除管道return handler.sendReceive(msg, respChan)
}

createChannel会创建一个管道,并以channelID+txid为key,缓存到Handler对象中,在收到peer返回的数据后,根据channelID+txid找到对应的管道,将数据放入管道中。

注意:创建管道时,如果channelID+txid对应的管道已存在,就返回失败,这意味着Invoke中无法通过创建goroutine来调用GetState等涉及与peer交互的接口。

sendReceive的源码如下:

func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {errc := make(chan error, 1)handler.serialSendAsync(msg, errc) // 异步发送,发送动作的结果通过errc管道返回for { // 为了处理serialSendAsync发送失败的情况,才使用forselect {case err := <-errc:if err == nil {continue}//would have been logged, return falsereturn pb.ChaincodeMessage{}, errcase outmsg, val := <-c: // 接收peer的应答if !val {return pb.ChaincodeMessage{}, errors.New("unexpected failure on receive")}return outmsg, nil}}
}

因为链码进程和peer之间只有一个tcp长连接,所以,发给peer的消息必然是串行的!

发送动作的底层是通过调用serialSendAsync这个函数把消息发给peer,函数源码如下:

func (handler *Handler) serialSendAsync(msg *pb.ChaincodeMessage, errc chan error) {go func() { //新的goroutineerr := handler.serialSend(msg) //串行发送到peerif errc != nil {errc <- err}}()
}

可以看到,serialSendAsync创建了一个新的goroutine来处理发送,那么这个发送动作就不再影响上层的goroutine了。

考虑这个场景:此时有两笔交易,都调用了链码的test接口,test接口中调用了GetState去读取key1和key2两个世界状态
假设GetState耗时较长,这时,我们知晓存在两个goroutine,同时调用了GetState,在把请求异步发给peer后,都处于阻塞状态(在sendReceive的for循环中),当结果返回时,可能存在两个物理线程同时执行这两个goroutine,这就意味着这两笔交易存在并行的场景。

结论:链码进程中的Invoke是并行执行的

验证

分析完源码,就开始验证吧!

链码源码

package mainimport ("fmt""github.com/hyperledger/fabric/core/chaincode/shim"pb "github.com/hyperledger/fabric/protos/peer""math/rand""sync""time"
)type simplechaincode struct {txIndex inttxIndexLock sync.MutexdataDB map[int]int
}func (t *simplechaincode) Init(stub shim.ChaincodeStubInterface) pb.Response {return shim.Success(nil)
}func (t *simplechaincode) Invoke(stub shim.ChaincodeStubInterface) pb.Response {funcName, args := stub.GetFunctionAndParameters()if funcName == "test" {return t.test(stub, args)}else if funcName == "testMap" {return t.testMap(stub, args)}else if funcName == "testWithPutState" {return t.testWithPutState(stub, args)}else if funcName == "testGoroutine" {return t.testGoroutine(stub, args)}return shim.Error("Invalid Smart Contract function name, or no right!")
}func (t *simplechaincode) getIndex() int { // 这里加个锁,保证每笔交易分配的index是不同的,方便分析,不会影响Invoke是串行还是并行t.txIndexLock.Lock()defer t.txIndexLock.Unlock()t.txIndex += 1return t.txIndex
}// 不涉及操作世界状态
func (t *simplechaincode) test(stub shim.ChaincodeStubInterface, args []string) pb.Response {now := time.Now()  // 获取被调用时的当前时间index := t.getIndex() // 分配index,这里有互斥锁sleep := rand.Intn(1000)time.Sleep(time.Duration(sleep) * time.Millisecond) // 随机sleep一段时间fmt.Printf("recv tx at %s, global index %d, sleep %d millisecond\n", now.Format("2006/01/02 15:04:05.999"), index, sleep) // 打印日志return shim.Success(nil)
}// 操作世界状态
func (t *simplechaincode) testWithPutState(stub shim.ChaincodeStubInterface, args []string) pb.Response {now := time.Now()index := t.getIndex()data,_ := now.MarshalJSON()stub.PutState(fmt.Sprintf("key_%d", index), data) fmt.Printf("recv tx at %s, global index %d\n", now.Format("2006/01/02 15:04:05.999"), index)return shim.Success(nil)
}// 测试一下线程不安全对象的读写
func (t *simplechaincode) testMap(stub shim.ChaincodeStubInterface, args []string) pb.Response {now := time.Now()index := t.getIndex()data,_ := now.MarshalJSON()stub.PutState(fmt.Sprintf("key_%d", index), data)t.dataDB[index] = indexreturn shim.Success(nil)
}// 测试一下在goroutine中调用PutState
func (t *simplechaincode) testGoroutine(stub shim.ChaincodeStubInterface, args []string) pb.Response {now := time.Now()index := t.getIndex()errc := make(chan error)data,_ := now.MarshalJSON()for i := 0; i < 2; i++ {go func() {errc <- stub.PutState(fmt.Sprintf("key_%d_%d", index, i), data)}()}for i := 0; i < 2; i++ {err := <- errcif err != nil {return shim.Error(err.Error())}}fmt.Printf("recv tx at %s, global index %d\n", now.Format("2006/01/02 15:04:05.999"), index)return shim.Success(nil)
}func main() {rand.Seed(time.Now().UnixNano())pCCInstance := new(simplechaincode)pCCInstance.dataDB = make(map[int]int)err := shim.Start(pCCInstance)if err != nil {fmt.Printf("Error:%s", err)}
}

客户端逻辑

客户端使用fabric-go-sdk,利用goroutine,批量创建交易,调用链码,核心逻辑如下:

    s := sync.WaitGroup{}s.Add(executeTimesEachCycle)for j := 0; j < executeTimesEachCycle; j++ {go func() {defer s.Done()req := channel.Request{ChaincodeID: f.ChainCodeID, Fcn:funcName, Args: args}if _,err := f.client[0].userClient.Execute(req, channel.WithTargetEndpoints("192.168.117.134:7051")); err != nil {fmt.Println(err)}}()}s.Wait()

测试结果

调用100次test接口

docker日志输出如下:
recv tx at 2019/09/05 18:26:21.192, global index 52, sleep 3 millisecond
recv tx at 2019/09/05 18:26:21.191, global index 10, sleep 7 millisecond
recv tx at 2019/09/05 18:26:21.193, global index 67, sleep 18 millisecond
recv tx at 2019/09/05 18:26:21.136, global index 3, sleep 91 millisecond
recv tx at 2019/09/05 18:26:21.191, global index 15, sleep 37 millisecond
recv tx at 2019/09/05 18:26:21.193, global index 64, sleep 66 millisecond
recv tx at 2019/09/05 18:26:21.194, global index 91, sleep 78 millisecond
recv tx at 2019/09/05 18:26:21.193, global index 54, sleep 101 millisecond
recv tx at 2019/09/05 18:26:21.192, global index 44, sleep 105 millisecond
recv tx at 2019/09/05 18:26:21.193, global index 68, sleep 121 millisecond
recv tx at 2019/09/05 18:26:21.194, global index 86, sleep 126 millisecond
recv tx at 2019/09/05 18:26:21.194, global index 95, sleep 166 millisecond
recv tx at 2019/09/05 18:26:21.191, global index 27, sleep 178 millisecond
recv tx at 2019/09/05 18:26:21.194, global index 81, sleep 176 millisecond
可以看到index的输出并不是从1到100顺序输出,这说明Invoke的调用是并行的。

调用100次testWithPutState接口

docker日志输出如下:
recv tx at 2019/09/05 18:29:03.457, global index 149
recv tx at 2019/09/05 18:29:03.457, global index 150
recv tx at 2019/09/05 18:29:03.457, global index 151
recv tx at 2019/09/05 18:29:03.457, global index 152
recv tx at 2019/09/05 18:29:03.457, global index 153
recv tx at 2019/09/05 18:29:03.481, global index 164
recv tx at 2019/09/05 18:29:03.481, global index 160
recv tx at 2019/09/05 18:29:03.481, global index 156
recv tx at 2019/09/05 18:29:03.481, global index 161
recv tx at 2019/09/05 18:29:03.481, global index 162
recv tx at 2019/09/05 18:29:03.481, global index 163
recv tx at 2019/09/05 18:29:03.481, global index 154
recv tx at 2019/09/05 18:29:03.481, global index 155

可以看到index仍旧不是从小到大按顺序输出的,这说明Invoke函数在调用PutState之后,仍旧是并行的。

但是链码进程在从peer接收应答数据后,把数据丢入对应的管道,这个操作是串行的,一定程度上影响了goroutine的调度。如果是多核的机器,测试效果应该更明显。

多次调用100次testMap接口

链码进程崩溃,docker日志输出如下:
fatal error: concurrent map writes
致命错误:map并行写
进程崩溃,需要多次测试,交易量大时更容易复现!

调用一次testGoroutine

背书失败,日志如下:
Unable to submit the request: Unable to invoke testWithPutState in the blockchain Transaction processing for endorser [xxx.xxx.xxx.xxx:7051]: Chaincode status Code: (500) UNKNOWN. Description: [e256e7f251074d9a19e78cf4fdc00d6cd1677264b8cd1c4174215e6d2ab70543] error sending PUT_STATE: [titanium] channel exists

结论:

综上,链码的调用是并行的;

在链码中使用非线程安全的对象时,要谨慎;

在链码中使用goroutine进行性能优化时,要谨慎。
另外,从fabric的交易流程上思考,链码的调用限制为串行没有意义,链码只是模拟执行,并不会真实的修改世界状态,而交易的排序又是在orderer节点完成,背书的顺序并不能决定交易在区块中的顺序!背书串行除了降低出块的效率,想不到还有别的用处了~

https://blog.csdn.net/love_feng_forever/article/details/100540592

区块链Hyperledger Fabric背书过程中链码是并行还是串行?相关推荐

  1. Fabric背书过程中链码是并行还是串行?

    声明:链码开发语言是golang,源码分析是基于fabric 1.4.0版本 链码开发的时候,总是有一个疑问,链码的调用到底是并行还是串行?如果是并行,就需要我们对一些线程不安全的对象进行保护,反之则 ...

  2. 基于区块链/Hyperledger Fabric的商品交易溯源系统搭建步骤

    原项目链接:https://github.com/togettoyou/fabric-realty 此项目链接:https://gitee.com/real__cool/fabdeal 演示链接:ht ...

  3. 区块链 HyperLedger Fabric安装

    前提条件 centos 7.4 64位 安装docker并配置阿里云加速器 安装docker compose 安装golang 由于被禁的关系,是没有办法直接访问golang.org网站的, 不过国内 ...

  4. 基于区块链/Hyperledger Fabric的商品交易溯源系统开发模式搭建

    搭建该项目的开发模式主要有链码的开发模式和前后端的开发模式. 以下内容暂时没有核验,可能有bug 链码开发模式 窗口1:开启dev组件 cd /home/real/project/fabric-sam ...

  5. 区块链架构--fabric基本介绍

    本文总结自<区块链核心技术与应用>这本书,Linux基金会于2015年12月启动了名为"超级账本"(Hyperledger)的开源项目,旨在推动各方协作,共同打造基于区 ...

  6. 区块链前后端交互过程

    区块链前后端交互过程 vue application chaincode 此文章基于github中的开源项目" 基于区块链的房地产交易系统模型"进行介绍.对于新手来说,就算可以运行 ...

  7. 区块链 Hyperledger - 超级账本项目在centos7环境下的安装与部署

    Hyperledger - 超级账本项目在centos7环境下的安装与部署 Hyperledger 项目是开源界面向开放.标准区块链技术的首个重要探索,在 Linux 基金会的支持下,吸引了众多科技和 ...

  8. 区块链技术在食品溯源中的应用

    一.食品溯源机制 1.1食品溯源的研究意义 近年来,食品安全问题频发引起了社会大众的广泛关注.在当今食品贸易的大背景下,生产商和消费者之间存在着严重的信息不对称现象:生产商的有意误导.消费者的认知缺乏 ...

  9. 区块链技术在软件开发中的应用

    如果你是一名软件开发者或者IT从业者,你一定已经听说过区块链技术.区块链是一种基于密码学的分布式账本技术,被广泛应用于数字货币.金融.物联网等领域.但是,除了这些领域之外,区块链技术还可以在软件开发中 ...

  10. seo模拟点击软件_网站外链在SEO优化过程中不可或缺 - 360排名点击软件

    原出处:超级排名系统 原文链接:网站外链在SEO优化过程中不可或缺 - 超级排名系统 在网站优化的路上,要充分考虑到优质外链的含义,但是很多站长虽然近似疯狂的发外链,贴外链.几乎所到之处都可以留下外链 ...

最新文章

  1. easyswoole数据库连接池_easyswoole redis连接池:集群迁移教程
  2. 传输预编码matlab,基于MATLAB的MIMO系统预编码性能仿真教程.doc
  3. vim一些挺方便的功能
  4. 采用预取(Prefetch)来加速你的网站(转)
  5. FIR定点提高精度的trick_02
  6. consul的安装搭建
  7. 创建表结构相同的表,表结构相同的表之间复制数据,Oracle 中 insert into XXX select from 的用法...
  8. 面试官:谈谈分布式一致性机制,我一脸懵逼。。
  9. 教程-脚本之Python
  10. 金山打字通——绿色安全无捆绑下载
  11. ie11兼容性视图设置_IE11打开某些网站,F12仿真模式中文档模式默认为IE7?
  12. 手机计算机16进制,16进制计算器安装方法 16进制计算器使用技巧
  13. 宝塔linux怎么安装asp网站,宝塔面板创建网站:宝塔linux面板添加网站详细教程...
  14. 金针探底技术分析(下)
  15. Python 图_系列之纵横对比 Bellman-Ford 和 Dijkstra 最短路径算法
  16. 360极速了浏览器 HTML5的浏览器,360极速浏览器4大HTML5特性 领先全球
  17. 用pandas生成excel文件示例,并调整excel的格式或样式
  18. Python获取Win7,Win10系统缩放大小
  19. 抗混叠滤波器及其使用
  20. 基于GraphHooper的离线导航软件实现

热门文章

  1. 轻松学习理解ACL访问控制列表(转)
  2. Java 泛型完全解读
  3. python安装whl文件的注意事项(windows系统)
  4. mongo-connector导入数据到Es
  5. C语言之选择结构与循环结构
  6. Introduction to the 80386
  7. 静态成员变量.xml
  8. 基于visual Studio2013解决C语言竞赛题之1030计算函数
  9. java基础之输入语句
  10. AJAX做一个动态进度条