Fabric1.4源码解析:Peer节点启动过程
看一下Peer
节点的启动过程,通常在Fabric网络中,Peer
节点的启动方式有两种,通过Docker容器启动,或者是通过执行命令直接启动。
一般情况下,我们都是执行docker-compose -f docker-*.yaml up
命令通过容器启动了Peer
节点,而如果直接启动Peer
节点则是执行了peer node start
这条命令。看起来,这两种方式所使用的命令毫无关系,但事实上,在Docker容器中启动Peer
节点也是通过执行了peer node start
这条命令来启动Peer
节点,只不过是Docker替我们执行了,这条命令就在之前通过启动Docker容器的那个文件中写到。所以说,无论是哪种方式启动Peer
节点,都是通过peer node start
这条命令,接下来,我们就分析一下执行完这条命令后,Peer
节点的启动过程。
和之前一样,首先找到切入点,在/fabric/peer/main.go
文件中,第46行:
mainCmd.AddCommand(node.Cmd())
这里包含了与对Peer
节点进行相关操作的命令集合,其中就有启动Peer
节点的命令,我们点进行看一下:
func Cmd() *cobra.Command {nodeCmd.AddCommand(startCmd())nodeCmd.AddCommand(statusCmd())return nodeCmd
}
共有两条命令:启动Peer
节点,以及查看节点的状态,我们看一下启动Peer
节点这条命令,首先调用了peer/node/start.go
文件中的startCmd()
,之后转到了nodeStartCmd
,以及serve(args)
这个方法。其中,serve(args)
这个方法就是本文要说明了主要方法,我们就从这里开始分析,在peer/node/start.go
文件中第125行:
func serve(args []string) error {#首先获取MSP的类型,msp指的是成员关系服务提供者,相当于许可证mspType := mgmt.GetLocalMSP().GetType()#如果MSP的类型不是FABRIC,返回错误信息if mspType != msp.FABRIC {panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))}...#创建ACL提供者,access control list访问控制列表aclProvider := aclmgmt.NewACLProvider(aclmgmt.ResourceGetter(peer.GetStableChannelConfig),)#平台注册,可以使用的语言类型,最后一个car不太理解,可能和官方的一个例子有关pr := platforms.NewRegistry(&golang.Platform{},&node.Platform{},&java.Platform{},&car.Platform{},)
定义一个用于部署链码的Provider结构体:
deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}
==========================DeployedCCInfoProvider==========================
type DeployedChaincodeInfoProvider interface {Namespaces() []string #命名空间UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error) #保存更新的链码ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) #保存链码信息CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error)
} #保存链码数据信息
==========================DeployedCCInfoProvider==========================
下面是对Peer节点的一些属性的设置了:
identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {#获取通道管理者return mgmt.GetManagerForChain(chainID)}#相当于配置Peer节点的运行环境了,主要就是保存Peer节点的IP地址,端口,证书等相关基本信息opsSystem := newOperationsSystem()err := opsSystem.Start()if err != nil {return errors.WithMessage(err, "failed to initialize operations subystems")}defer opsSystem.Stop()metricsProvider := opsSystem.Provider#创建观察者,对Peer节点进行记录logObserver := floggingmetrics.NewObserver(metricsProvider)flogging.Global.SetObserver(logObserver)#创建成员关系信息Provider,简单来说就是保存其他Peer节点的信息,以便通信等等membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)#账本管理器初始化,主要就是之前所定义的一些属性ledgermgmt.Initialize(&ledgermgmt.Initializer{#与Tx处理相关CustomTxProcessors: peer.ConfigTxProcessors,#之前定义的所使用的语言PlatformRegistry: pr,#与链码相关DeployedChaincodeInfoProvider: deployedCCInfoProvider,#与Peer节点交互相关MembershipInfoProvider: membershipInfoProvider,#这个不太清楚,与Peer节点的属性相关?MetricsProvider: metricsProvider,#健康检查HealthCheckRegistry: opsSystem,},)#判断是否处于开发模式下if chaincodeDevMode {logger.Info("Running in chaincode development mode")logger.Info("Disable loading validity system chaincode")viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)}#里面有两个方法,分别是获取本地地址与获取当前Peer节点实例地址,将地址进行缓存if err := peer.CacheConfiguration(); err != nil {return err}#获取当前Peer节点实例地址,如果没有进行缓存,则会执行上一步的CacheConfiguration()方法peerEndpoint, err := peer.GetPeerEndpoint()if err != nil {err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)return err}#简单的字符串操作,获取HostpeerHost, _, err := net.SplitHostPort(peerEndpoint.Address)if err != nil {return fmt.Errorf("peer address is not in the format of host:port: %v", err)}#获取监听地址,该属性在opsSystem中定义过listenAddr := viper.GetString("peer.listenAddress")#返回当前Peer节点的gRPC服务器配置,该方法主要就是设置TLS与心跳信息,在/core/peer/config.go文件中第128行。serverConfig, err := peer.GetServerConfig()if err != nil {logger.Fatalf("Error loading secure config for peer (%s)", err)}#设置gRPC最大并发 grpcMaxConcurrency=2500throttle := comm.NewThrottle(grpcMaxConcurrency)#设置日志信息serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")serverConfig.MetricsProvider = metricsProvider#设置拦截器,不再细说serverConfig.UnaryInterceptors = append(serverConfig.UnaryInterceptors,grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),throttle.UnaryServerIntercptor,)serverConfig.StreamInterceptors = append(serverConfig.StreamInterceptors,grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),throttle.StreamServerInterceptor,)
到这里创建了Peer节点的gRPC服务器,将之前的监听地址与服务器配置传了进去:
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)if err != nil {logger.Fatalf("Failed to create peer server (%s)", err)}
关于权限的一些配置:
#TLS的相关设置if serverConfig.SecOpts.UseTLS {logger.Info("Starting peer with TLS enabled")// set up credential supportcs := comm.GetCredentialSupport()roots, err := peer.GetServerRootCAs()if err != nil {logger.Fatalf("Failed to set TLS server root CAs: %s", err)}cs.ServerRootCAs = roots// set the cert to use if client auth is requested by remote endpointsclientCert, err := peer.GetClientCertificate()if err != nil {logger.Fatalf("Failed to set TLS client certificate: %s", err)}comm.GetCredentialSupport().SetClientCertificate(clientCert)}mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert#策略检查Provider,看传入的参数就比较清楚了,Envelope,通道ID,环境变量policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {return func(env *cb.Envelope, channelID string) error {return aclProvider.CheckACL(resourceName, channelID, env)}}
创建了另一个服务器,与上面的权限设置相关,用于交付与过滤区块的事件服务器:
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)#将之前创建的gRPC服务器与用于交付与过滤区块的事件服务器注册到这里pb.RegisterDeliverServer(peerServer.Server(), abServer)
接下来是与链码相关的操作:
#启动与链码相关的服务器,看传入的值 Peer节点的主机名,访问控制列表Provider,pr是之前提到与语言相关的,以及之前的运行环境#主要完成三个操作:1.设置本地链码安装路径,2.创建自签名CA,3,启动链码gRPC监听服务,该方法在本文件中第709行chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)logger.Debugf("Running peer")#启动管理员服务,这个不太懂干嘛的startAdminServer(listenAddr, peerServer.Server(), metricsProvider)privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {#看这个方法是分发私有数据到其他节点return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)}
========================TxPvtReadWriteSetWithConfigInfo==========================
#看这里,主要是私有的读写集以及配置信息
type TxPvtReadWriteSetWithConfigInfo struct {EndorsedAt uint64 `protobuf:"varint,1,opt,name=endorsed_at,json=endorsedAt,proto3" json:"endorsed_at,omitempty"`PvtRwset *rwset.TxPvtReadWriteSet `protobuf:"bytes,2,opt,name=pvt_rwset,json=pvtRwset,proto3" json:"pvt_rwset,omitempty"`CollectionConfigs map[string]*common.CollectionConfigPackage `protobuf:"bytes,3,rep,name=collection_configs,json=collectionConfigs,proto3" json:"collection_configs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`XXX_NoUnkeyedLiteral struct{} `json:"-"`XXX_unrecognized []byte `json:"-"`XXX_sizecache int32 `json:"-"`
}
============================TxPvtReadWriteSetWithConfigInfo==========================#获取本地的已签名的身份信息,主要是看当前节点具有的功能,比如背书,验证signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()serializedIdentity, err := signingIdentity.Serialize()if err != nil {logger.Panicf("Failed serializing self identity: %v", err)}#libConf := library.Config{}
================================Config=============================
type Config struct {#权限过滤AuthFilters []*HandlerConfig `mapstructure:"authFilters" yaml:"authFilters"`#这个不清楚Decorators []*HandlerConfig `mapstructure:"decorators" yaml:"decorators"`#背书Endorsers PluginMapping `mapstructure:"endorsers" yaml:"endorsers"`#验证Validators PluginMapping `mapstructure:"validators" yaml:"validators"`
}
==================================Config=============================if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {return errors.WithMessage(err, "could not load YAML config")}#创建一个Registry实例,将上面的配置注册到这里reg := library.InitRegistry(libConf)#这一部分是背书操作的相关设置,不贴出来了...#设置完之后注册背书服务pb.RegisterEndorserServer(peerServer.Server(), auth)#创建通道策略管理者,比如哪些节点或用户具有可读,可写,可操作的权限,都是由它管理policyMgr := peer.NewChannelPolicyManagerGetter()#创建用于广播的服务,就是区块链中用于向其他节点发送消息的服务err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
到这里,链码的相关配置已经差不多了,到了部署系统链码的地方了:
#这一行代码就是将系统链码部署上去sccp.DeploySysCCs("", ccp)logger.Infof("Deployed system chaincodes")installedCCs := func() ([]ccdef.InstalledChaincode, error) {#查看已经安装的链码return packageProvider.ListInstalledChaincodes()}#与链码的生命周期相关lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))if err != nil {logger.Panicf("Failed creating lifecycle: +%v", err)}#处理链码的元数据更新,由其他节点广播onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))})#添加监听器监听链码元数据更新lifecycle.AddListener(onUpdate)
这一部分是与通道的初始化相关的内容:
peer.Initialize(func(cid string) {logger.Debugf("Deploying system CC, for channel <%s>", cid)sccp.DeploySysCCs(cid, ccp)#获取通道的描述信息,就是通道的基本属性sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {#根据通道ID获取账本的查询执行器return peer.GetLedger(cid).NewQueryExecutor()}))if err != nil {logger.Panicf("Failed subscribing to chaincode lifecycle updates")}#为通道注册监听器cceventmgmt.GetMgr().Register(cid, sub)}, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)#当前节点状态改变后是否可以被发现if viper.GetBool("peer.discovery.enabled") {registerDiscoveryService(peerServer, policyMgr, lifecycle)}#获取Peer节点加入的网络IDnetworkID := viper.GetString("peer.networkId")logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)#查看是否已经定义了配置文件profileEnabled := viper.GetBool("peer.profile.enabled")profileListenAddress := viper.GetString("peer.profile.listenAddress")#创建进程启动gRPC服务器serve := make(chan error)go func() {var grpcErr errorif grpcErr = peerServer.Start(); grpcErr != nil {grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)} else {logger.Info("peer server exited")}serve <- grpcErr}()#如果已经定义了配置文件,则启动监听服务if profileEnabled {go func() {logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {logger.Errorf("Error starting profiler: %s", profileErr)}}()}#开始处理接收到的消息了go handleSignals(addPlatformSignals(map[os.Signal]func(){syscall.SIGINT: func() { serve <- nil },syscall.SIGTERM: func() { serve <- nil },}))logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)#阻塞在这里,除非gRPC服务停止return <-serve
}
到这里Peer
节点已经启动完成了,过程还是很复杂的,这里总结一下整体的过程:
- 首先就是读取配置信息,创建Cache结构,以及检测其他
Peer
节点的信息。CacheConfiguration()
,主要保存其他Peer
节点的相关信息。
- 创建
PeerServer
。peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
- 创建
DeliverEventsServer
。abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
pb.RegisterDeliverServer(peerServer.Server(), abServer)
fabric/core/peer/deliverevents.go
,该服务主要用于区块的交付与过滤,主要方法:Deliver(),DeliverFiltered()
- 启动
ChaincodeServer
。chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
core/chaincode/chaincode_support.go
,返回了ChaincodeSupport:为Peer提供执行链码的接口,主要功能有Launch():启动一个停止运行的链码,Stop():停止链码的运行,HandleChaincodeStream():处理链码流信息,Register():将链码注册到当前Peer节点 ,createCCMessage():创建一个交易,ExecuteLegacyInit():链码的实例化,Execute():执行链码并返回回原始的响应,processChaincodeExecutionResult():处理链码的执行结果,InvokeInit():调用链码的Init方法,Invoke():调用链码,execute():执行一个交易
- 启动
AdminServer
。startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
core/protos/peer/admin.go
文件,具有GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()
等方法
- 创建
EndorserServer
。pb.RegisterEndorserServer(peerServer.Server(), auth)
core/endorser/endorser.go
文件,注册背书服务器,提供了一个很重要的方法:ProcessProposal()
,这个方法值得看一下。
- 创建
GossipService
。err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
gossip/service/gossip_service.go
,具有InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()
等方法
- 部署系统链码。
- 初始化通道。
- 启动gRPC服务。
- 如果启用了profile,还会启动监听服务。
流程图:,由于Fabric在不断更新,所以代码和图中还是有一些不同的。
参考:这里
转载于:https://www.cnblogs.com/cbkj-xd/p/11141717.html
Fabric1.4源码解析:Peer节点启动过程相关推荐
- btcd源码解析——peer节点之间的区块数据同步 (3) —— 非headersFirstMode模式
文章目录 1. 写在前面 2. 非headersFirstMode模式下的数据同步过程 2.1 peer A 发送"获取区块哈希"的请求 2.2 peer B 响应"获取 ...
- 基于8.0源码解析:startService 启动过程
基于8.0源码解析:startService 启动过程 首先看一张startService的图,心里有个大概的预估,跟Activity启动流程比,Service的启动稍微简单点,并且我把Service ...
- Android8.0源码解析——Activity的启动过程
前言 Activity是Android的四大组件,关于Activity 的启动过程是怎么样的昵,下面我们主要通过Android8.0的源码来分析一下. 1.Activity的生命周期: Activit ...
- dubbo(5) Dubbo源码解析之服务调用过程
来源:https://juejin.im/post/5ca4a1286fb9a05e731fc042 Dubbo源码解析之服务调用过程 简介 在前面的文章中,我们分析了 Dubbo SPI.服务导出与 ...
- rocketmq源码解析之name启动(一)
2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...
- 6、RocketMQ 源码解析之 Broker 启动(上)
上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...
- 源码解析 React Hook 构建过程
2018 年的 React Conf 上 Dan Abramov 正式对外介绍了React Hook,这是一种让函数组件支持状态和其他 React 特性的全新方式,并被官方解读为这是下一个 5 年 R ...
- kotlin coroutine源码解析之Job启动流程
目录 Job启动流程 launch流程分析 父子Job关联分析 结论 Job启动流程 job启动流程,我们先从一段最简单使用协程的代码开始,进行代码跟跟踪,顺便引出几个关键的概念,在后面章节里面去单独 ...
- 【java】spring-boot源码解析之应用启动
spring boot 项目使用默认配置的思想,极大的简化了 spring 项目的开发.下面的代码就是一个最简单的 spring 项目: @SpringBootApplication public c ...
最新文章
- 统计学派的18种经典「数据分析方法」
- Pat乙级1084 外观数列
- ASP.NET Core Web 应用程序系列(四)- ASP.NET Core 异步编程之async await
- 嵌入式linux系统中设备驱动程序
- 前端防xss攻击(去掉空格等能影响和攻击数据库的字段)
- 中国移动IM-飞信-0802上线新版本 试用手记
- JQuery Datatables 服务端分页简单应用学习
- 编程语言_C++_Java_面试题006
- jsp中的九大内置对象和四大作用域
- String s = new String(“abc“)创建了几个对象
- OpenG数组讲解之Filter 1D Array。
- elasticsearch单机版安装及安装过程踩的坑整理
- 445端口是什么,怎么关闭?
- Win11系统一些功能修改并不令人满意,盘点不尽人意之处
- 关于医院精确套打发票的实现
- android 智能家居 物联网 声纹开锁
- 自动生成数据库设计文档利器
- flv格式的视频怎么转mp4?
- 西门子PLC编程,西门子PLC远程下载
- Google免费虚拟主机空间