看一下Peer节点的启动过程,通常在Fabric网络中,Peer节点的启动方式有两种,通过Docker容器启动,或者是通过执行命令直接启动。
一般情况下,我们都是执行docker-compose -f docker-*.yaml up命令通过容器启动了Peer节点,而如果直接启动Peer节点则是执行了peer node start这条命令。看起来,这两种方式所使用的命令毫无关系,但事实上,在Docker容器中启动Peer节点也是通过执行了peer node start这条命令来启动Peer节点,只不过是Docker替我们执行了,这条命令就在之前通过启动Docker容器的那个文件中写到。所以说,无论是哪种方式启动Peer节点,都是通过peer node start这条命令,接下来,我们就分析一下执行完这条命令后,Peer节点的启动过程。
和之前一样,首先找到切入点,在/fabric/peer/main.go文件中,第46行:

mainCmd.AddCommand(node.Cmd())

这里包含了与对Peer节点进行相关操作的命令集合,其中就有启动Peer节点的命令,我们点进行看一下:

func Cmd() *cobra.Command {nodeCmd.AddCommand(startCmd())nodeCmd.AddCommand(statusCmd())return nodeCmd
}

共有两条命令:启动Peer节点,以及查看节点的状态,我们看一下启动Peer节点这条命令,首先调用了peer/node/start.go文件中的startCmd(),之后转到了nodeStartCmd,以及serve(args)这个方法。其中,serve(args)这个方法就是本文要说明了主要方法,我们就从这里开始分析,在peer/node/start.go文件中第125行:

func serve(args []string) error {#首先获取MSP的类型,msp指的是成员关系服务提供者,相当于许可证mspType := mgmt.GetLocalMSP().GetType()#如果MSP的类型不是FABRIC,返回错误信息if mspType != msp.FABRIC {panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))}...#创建ACL提供者,access control list访问控制列表aclProvider := aclmgmt.NewACLProvider(aclmgmt.ResourceGetter(peer.GetStableChannelConfig),)#平台注册,可以使用的语言类型,最后一个car不太理解,可能和官方的一个例子有关pr := platforms.NewRegistry(&golang.Platform{},&node.Platform{},&java.Platform{},&car.Platform{},)

定义一个用于部署链码的Provider结构体:

    deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}
==========================DeployedCCInfoProvider==========================
type DeployedChaincodeInfoProvider interface {Namespaces() []string   #命名空间UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error)   #保存更新的链码ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) #保存链码信息CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error)
}   #保存链码数据信息
==========================DeployedCCInfoProvider==========================

下面是对Peer节点的一些属性的设置了:

    identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {#获取通道管理者return mgmt.GetManagerForChain(chainID)}#相当于配置Peer节点的运行环境了,主要就是保存Peer节点的IP地址,端口,证书等相关基本信息opsSystem := newOperationsSystem()err := opsSystem.Start()if err != nil {return errors.WithMessage(err, "failed to initialize operations subystems")}defer opsSystem.Stop()metricsProvider := opsSystem.Provider#创建观察者,对Peer节点进行记录logObserver := floggingmetrics.NewObserver(metricsProvider)flogging.Global.SetObserver(logObserver)#创建成员关系信息Provider,简单来说就是保存其他Peer节点的信息,以便通信等等membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)#账本管理器初始化,主要就是之前所定义的一些属性ledgermgmt.Initialize(&ledgermgmt.Initializer{#与Tx处理相关CustomTxProcessors:            peer.ConfigTxProcessors,#之前定义的所使用的语言PlatformRegistry:              pr,#与链码相关DeployedChaincodeInfoProvider: deployedCCInfoProvider,#与Peer节点交互相关MembershipInfoProvider:        membershipInfoProvider,#这个不太清楚,与Peer节点的属性相关?MetricsProvider:               metricsProvider,#健康检查HealthCheckRegistry:           opsSystem,},)#判断是否处于开发模式下if chaincodeDevMode {logger.Info("Running in chaincode development mode")logger.Info("Disable loading validity system chaincode")viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)}#里面有两个方法,分别是获取本地地址与获取当前Peer节点实例地址,将地址进行缓存if err := peer.CacheConfiguration(); err != nil {return err}#获取当前Peer节点实例地址,如果没有进行缓存,则会执行上一步的CacheConfiguration()方法peerEndpoint, err := peer.GetPeerEndpoint()if err != nil {err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)return err}#简单的字符串操作,获取HostpeerHost, _, err := net.SplitHostPort(peerEndpoint.Address)if err != nil {return fmt.Errorf("peer address is not in the format of host:port: %v", err)}#获取监听地址,该属性在opsSystem中定义过listenAddr := viper.GetString("peer.listenAddress")#返回当前Peer节点的gRPC服务器配置,该方法主要就是设置TLS与心跳信息,在/core/peer/config.go文件中第128行。serverConfig, err := peer.GetServerConfig()if err != nil {logger.Fatalf("Error loading secure config for peer (%s)", err)}#设置gRPC最大并发 grpcMaxConcurrency=2500throttle := comm.NewThrottle(grpcMaxConcurrency)#设置日志信息serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")serverConfig.MetricsProvider = metricsProvider#设置拦截器,不再细说serverConfig.UnaryInterceptors = append(serverConfig.UnaryInterceptors,grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),throttle.UnaryServerIntercptor,)serverConfig.StreamInterceptors = append(serverConfig.StreamInterceptors,grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),throttle.StreamServerInterceptor,)

到这里创建了Peer节点的gRPC服务器,将之前的监听地址与服务器配置传了进去:

    peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)if err != nil {logger.Fatalf("Failed to create peer server (%s)", err)}

关于权限的一些配置:

    #TLS的相关设置if serverConfig.SecOpts.UseTLS {logger.Info("Starting peer with TLS enabled")// set up credential supportcs := comm.GetCredentialSupport()roots, err := peer.GetServerRootCAs()if err != nil {logger.Fatalf("Failed to set TLS server root CAs: %s", err)}cs.ServerRootCAs = roots// set the cert to use if client auth is requested by remote endpointsclientCert, err := peer.GetClientCertificate()if err != nil {logger.Fatalf("Failed to set TLS client certificate: %s", err)}comm.GetCredentialSupport().SetClientCertificate(clientCert)}mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert#策略检查Provider,看传入的参数就比较清楚了,Envelope,通道ID,环境变量policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {return func(env *cb.Envelope, channelID string) error {return aclProvider.CheckACL(resourceName, channelID, env)}}

创建了另一个服务器,与上面的权限设置相关,用于交付与过滤区块的事件服务器:

    abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)#将之前创建的gRPC服务器与用于交付与过滤区块的事件服务器注册到这里pb.RegisterDeliverServer(peerServer.Server(), abServer)

接下来是与链码相关的操作:

    #启动与链码相关的服务器,看传入的值  Peer节点的主机名,访问控制列表Provider,pr是之前提到与语言相关的,以及之前的运行环境#主要完成三个操作:1.设置本地链码安装路径,2.创建自签名CA,3,启动链码gRPC监听服务,该方法在本文件中第709行chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)logger.Debugf("Running peer")#启动管理员服务,这个不太懂干嘛的startAdminServer(listenAddr, peerServer.Server(), metricsProvider)privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {#看这个方法是分发私有数据到其他节点return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)}
========================TxPvtReadWriteSetWithConfigInfo==========================
#看这里,主要是私有的读写集以及配置信息
type TxPvtReadWriteSetWithConfigInfo struct {EndorsedAt           uint64                                     `protobuf:"varint,1,opt,name=endorsed_at,json=endorsedAt,proto3" json:"endorsed_at,omitempty"`PvtRwset             *rwset.TxPvtReadWriteSet                   `protobuf:"bytes,2,opt,name=pvt_rwset,json=pvtRwset,proto3" json:"pvt_rwset,omitempty"`CollectionConfigs    map[string]*common.CollectionConfigPackage `protobuf:"bytes,3,rep,name=collection_configs,json=collectionConfigs,proto3" json:"collection_configs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`XXX_NoUnkeyedLiteral struct{}                                   `json:"-"`XXX_unrecognized     []byte                                     `json:"-"`XXX_sizecache        int32                                      `json:"-"`
}
============================TxPvtReadWriteSetWithConfigInfo==========================#获取本地的已签名的身份信息,主要是看当前节点具有的功能,比如背书,验证signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()serializedIdentity, err := signingIdentity.Serialize()if err != nil {logger.Panicf("Failed serializing self identity: %v", err)}#libConf := library.Config{}
================================Config=============================
type Config struct {#权限过滤AuthFilters []*HandlerConfig `mapstructure:"authFilters" yaml:"authFilters"`#这个不清楚Decorators  []*HandlerConfig `mapstructure:"decorators" yaml:"decorators"`#背书Endorsers   PluginMapping    `mapstructure:"endorsers" yaml:"endorsers"`#验证Validators  PluginMapping    `mapstructure:"validators" yaml:"validators"`
}
==================================Config=============================if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {return errors.WithMessage(err, "could not load YAML config")}#创建一个Registry实例,将上面的配置注册到这里reg := library.InitRegistry(libConf)#这一部分是背书操作的相关设置,不贴出来了...#设置完之后注册背书服务pb.RegisterEndorserServer(peerServer.Server(), auth)#创建通道策略管理者,比如哪些节点或用户具有可读,可写,可操作的权限,都是由它管理policyMgr := peer.NewChannelPolicyManagerGetter()#创建用于广播的服务,就是区块链中用于向其他节点发送消息的服务err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)

到这里,链码的相关配置已经差不多了,到了部署系统链码的地方了:

    #这一行代码就是将系统链码部署上去sccp.DeploySysCCs("", ccp)logger.Infof("Deployed system chaincodes")installedCCs := func() ([]ccdef.InstalledChaincode, error) {#查看已经安装的链码return packageProvider.ListInstalledChaincodes()}#与链码的生命周期相关lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))if err != nil {logger.Panicf("Failed creating lifecycle: +%v", err)}#处理链码的元数据更新,由其他节点广播onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))})#添加监听器监听链码元数据更新lifecycle.AddListener(onUpdate)

这一部分是与通道的初始化相关的内容:

    peer.Initialize(func(cid string) {logger.Debugf("Deploying system CC, for channel <%s>", cid)sccp.DeploySysCCs(cid, ccp)#获取通道的描述信息,就是通道的基本属性sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {#根据通道ID获取账本的查询执行器return peer.GetLedger(cid).NewQueryExecutor()}))if err != nil {logger.Panicf("Failed subscribing to chaincode lifecycle updates")}#为通道注册监听器cceventmgmt.GetMgr().Register(cid, sub)}, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)#当前节点状态改变后是否可以被发现if viper.GetBool("peer.discovery.enabled") {registerDiscoveryService(peerServer, policyMgr, lifecycle)}#获取Peer节点加入的网络IDnetworkID := viper.GetString("peer.networkId")logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)#查看是否已经定义了配置文件profileEnabled := viper.GetBool("peer.profile.enabled")profileListenAddress := viper.GetString("peer.profile.listenAddress")#创建进程启动gRPC服务器serve := make(chan error)go func() {var grpcErr errorif grpcErr = peerServer.Start(); grpcErr != nil {grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)} else {logger.Info("peer server exited")}serve <- grpcErr}()#如果已经定义了配置文件,则启动监听服务if profileEnabled {go func() {logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {logger.Errorf("Error starting profiler: %s", profileErr)}}()}#开始处理接收到的消息了go handleSignals(addPlatformSignals(map[os.Signal]func(){syscall.SIGINT:  func() { serve <- nil },syscall.SIGTERM: func() { serve <- nil },}))logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)#阻塞在这里,除非gRPC服务停止return <-serve
}

到这里Peer节点已经启动完成了,过程还是很复杂的,这里总结一下整体的过程:

  1. 首先就是读取配置信息,创建Cache结构,以及检测其他Peer节点的信息。

    1. CacheConfiguration(),主要保存其他Peer节点的相关信息。
  2. 创建PeerServer
    1. peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
  3. 创建DeliverEventsServer
    1. abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    2. pb.RegisterDeliverServer(peerServer.Server(), abServer)
    3. fabric/core/peer/deliverevents.go,该服务主要用于区块的交付与过滤,主要方法:Deliver(),DeliverFiltered()
  4. 启动ChaincodeServer
    1. chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
    2. core/chaincode/chaincode_support.go,返回了ChaincodeSupport:为Peer提供执行链码的接口,主要功能有Launch():启动一个停止运行的链码,Stop():停止链码的运行,HandleChaincodeStream():处理链码流信息,Register():将链码注册到当前Peer节点 ,createCCMessage():创建一个交易,ExecuteLegacyInit():链码的实例化,Execute():执行链码并返回回原始的响应,processChaincodeExecutionResult():处理链码的执行结果,InvokeInit():调用链码的Init方法,Invoke():调用链码,execute():执行一个交易
  5. 启动AdminServer
    1. startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
    2. core/protos/peer/admin.go文件,具有GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()等方法
  6. 创建EndorserServer
    1. pb.RegisterEndorserServer(peerServer.Server(), auth)
    2. core/endorser/endorser.go文件,注册背书服务器,提供了一个很重要的方法:ProcessProposal(),这个方法值得看一下。
  7. 创建GossipService
    1. err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
    2. gossip/service/gossip_service.go,具有InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()等方法
  8. 部署系统链码。
  9. 初始化通道。
  10. 启动gRPC服务。
  11. 如果启用了profile,还会启动监听服务。

流程图:,由于Fabric在不断更新,所以代码和图中还是有一些不同的。
参考:这里

转载于:https://www.cnblogs.com/cbkj-xd/p/11141717.html

Fabric1.4源码解析:Peer节点启动过程相关推荐

  1. btcd源码解析——peer节点之间的区块数据同步 (3) —— 非headersFirstMode模式

    文章目录 1. 写在前面 2. 非headersFirstMode模式下的数据同步过程 2.1 peer A 发送"获取区块哈希"的请求 2.2 peer B 响应"获取 ...

  2. 基于8.0源码解析:startService 启动过程

    基于8.0源码解析:startService 启动过程 首先看一张startService的图,心里有个大概的预估,跟Activity启动流程比,Service的启动稍微简单点,并且我把Service ...

  3. Android8.0源码解析——Activity的启动过程

    前言 Activity是Android的四大组件,关于Activity 的启动过程是怎么样的昵,下面我们主要通过Android8.0的源码来分析一下. 1.Activity的生命周期: Activit ...

  4. dubbo(5) Dubbo源码解析之服务调用过程

    来源:https://juejin.im/post/5ca4a1286fb9a05e731fc042 Dubbo源码解析之服务调用过程 简介 在前面的文章中,我们分析了 Dubbo SPI.服务导出与 ...

  5. rocketmq源码解析之name启动(一)

    2019独角兽企业重金招聘Python工程师标准>>> 说在前面 主要解析namrsrv启动部分,namesrv配置加载.netty server创建.注册出处理器. 正文 源码解析 ...

  6. 6、RocketMQ 源码解析之 Broker 启动(上)

    上面一篇我们介绍了 RocketMQ 的元数据管理,它是通过自定义一个 KV 服务器.并且其它服务在 NameServer 注册服务信息的时候都是全量注册.如果 RocketMQ 的拓扑图当中有多台 ...

  7. 源码解析 React Hook 构建过程

    2018 年的 React Conf 上 Dan Abramov 正式对外介绍了React Hook,这是一种让函数组件支持状态和其他 React 特性的全新方式,并被官方解读为这是下一个 5 年 R ...

  8. kotlin coroutine源码解析之Job启动流程

    目录 Job启动流程 launch流程分析 父子Job关联分析 结论 Job启动流程 job启动流程,我们先从一段最简单使用协程的代码开始,进行代码跟跟踪,顺便引出几个关键的概念,在后面章节里面去单独 ...

  9. 【java】spring-boot源码解析之应用启动

    spring boot 项目使用默认配置的思想,极大的简化了 spring 项目的开发.下面的代码就是一个最简单的 spring 项目: @SpringBootApplication public c ...

最新文章

  1. 统计学派的18种经典「数据分析方法」
  2. Pat乙级1084 外观数列
  3. ASP.NET Core Web 应用程序系列(四)- ASP.NET Core 异步编程之async await
  4. 嵌入式linux系统中设备驱动程序
  5. 前端防xss攻击(去掉空格等能影响和攻击数据库的字段)
  6. 中国移动IM-飞信-0802上线新版本 试用手记
  7. JQuery Datatables 服务端分页简单应用学习
  8. 编程语言_C++_Java_面试题006
  9. jsp中的九大内置对象和四大作用域
  10. String s = new String(“abc“)创建了几个对象
  11. OpenG数组讲解之Filter 1D Array。
  12. elasticsearch单机版安装及安装过程踩的坑整理
  13. 445端口是什么,怎么关闭?
  14. Win11系统一些功能修改并不令人满意,盘点不尽人意之处
  15. 关于医院精确套打发票的实现
  16. android 智能家居 物联网 声纹开锁
  17. 自动生成数据库设计文档利器
  18. flv格式的视频怎么转mp4?
  19. 西门子PLC编程,西门子PLC远程下载
  20. Google免费虚拟主机空间

热门文章

  1. 2018/12/06 eclipse 快速加载需要的包
  2. 《SQL Server 2008从入门到精通》--20180703
  3. flex 与 后台通讯
  4. c c++ sizeof
  5. JavaScript格式化金额及格式化输出
  6. 史上最全的程序猿工具集(辅助工具、开发工具、技术栈、学习网站、博客论坛)
  7. 数据结构与算法(updating....)
  8. 【ECharts学习】—实现我的第一个图表
  9. 股票冲高回落意味着什么?
  10. 工厂打工10年,现在被工厂以能力不足为由辞退,可以去仲裁吗?