1. Fabric Data Storage

In Fabric, data storage is abstracted behind a ledger object, so the underlying databases and files are isolated from the layers above. The advantage is that changes in the upper data layers have minimal impact on the underlying databases, and in most cases no impact at all. This abstraction layer comprises the ledger data storage object (block data files, the private data database and the block index database), the state database, the history database, and the transient private data database. Let's look at the related class diagram:

The diagram above shows the class relationships behind Fabric's data storage. The concrete types are as follows.
First, the lowest-level DB wrapper and its related types; directly beneath it sits the third-party LevelDB library:

// Database wrapper type
// common/ledger/util/leveldbhelper/leveldb_helper.go
type DB struct {
	conf *Conf
	// The underlying open-source LevelDB handle; below this is the
	// third-party DB type itself, worth a look if you are interested
	db              *leveldb.DB
	dbState         dbState
	mux             sync.Mutex
	readOpts        *opt.ReadOptions
	writeOptsNoSync *opt.WriteOptions
	writeOptsSync   *opt.WriteOptions
}

// Conf configuration for `DB`
type Conf struct {
	DBPath string
}
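
Before moving up the stack, here is a minimal sketch of what this third-party layer looks like when used directly, assuming the syndtr/goleveldb package that Fabric uses; the path and keys are illustrative only:

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
)

func main() {
	// Open (or create) a LevelDB instance at an illustrative path.
	db, err := leveldb.OpenFile("/tmp/demo-leveldb", nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Put and Get are the primitives that leveldbhelper.DB wraps with
	// locking, open/closed state tracking and the sync write options.
	if err := db.Put([]byte("key"), []byte("value"), nil); err != nil {
		panic(err)
	}
	val, err := db.Get([]byte("key"), nil)
	if err != nil {
		panic(err)
	}
	fmt.Printf("key => %s\n", val)
}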

The DB type, together with Conf, is mostly a thin wrapper over basic types. leveldb.DB is not expanded here; see the third-party package if interested (RocksDB has similar third-party bindings). Next, let's see which types sit on top of it:

// core/ledger/kvledger/kv_ledger_provider.go
type idStore struct {
	db *leveldbhelper.DB
}

// common/ledger/util/leveldbhelper/leveldb_provider.go
type DBHandle struct {
	dbName string
	db     *DB
}

type Provider struct {
	db        *DB
	dbHandles map[string]*DBHandle
	mux       sync.Mutex
}

On top of this thin abstraction another layer is built. Here are the storage-related types one level up:

// core/ledger/kvledger/kv_ledger_provider.go -- kvledger package
// It implements the Peer ledger provider interface shown below.
type Provider struct {
	// ID store: serves two purposes -- it stores the ledger ID together
	// with the genesis block, and it keeps an "under construction" flag
	// that records whether ledger creation completed successfully
	idStore *idStore
	// Ledger storage (block store plus private data store)
	ledgerStoreProvider *ledgerstorage.Provider
	// State database
	vdbProvider privacyenabledstate.DBProvider
	// History database
	historydbProvider historydb.HistoryDBProvider
	// Config history manager
	configHistoryMgr confighistory.Mgr
	stateListeners   []ledger.StateListener
	// Bookkeeping database -- handles private-data policy concerns such
	// as expiry dates and block numbers of private data
	bookkeepingProvider bookkeeping.Provider
	initializer         *ledger.Initializer
	collElgNotifier     *collElgNotifier
	stats               *stats
	fileLock            *leveldbhelper.FileLock
}

// core/ledger/ledger_interface.go
// PeerLedgerProvider provides handle to ledger instances
type PeerLedgerProvider interface {
	Initialize(initializer *Initializer) error
	// Create creates a new ledger with the given genesis block.
	// This function guarantees that the creation of ledger and committing
	// the genesis block would be an atomic action.
	// The chain id retrieved from the genesis block is treated as a ledger id
	Create(genesisBlock *common.Block) (PeerLedger, error)
	// Open opens an already created ledger
	Open(ledgerID string) (PeerLedger, error)
	// Exists tells whether the ledger with given id exists
	Exists(ledgerID string) (bool, error)
	// List lists the ids of the existing ledgers
	List() ([]string, error)
	// Close closes the PeerLedgerProvider
	Close()
}

This forms an upward chain of composition. At the kvledger.Provider level we have reached the top of the Peer node's ledger-handling abstraction. Looking back: besides database access types, what else does this type contain, and how do those members relate to the second layer? Here are the definitions:

// core/ledger/ledgerstorage/store.go
type Provider struct {
	blkStoreProvider     blkstorage.BlockStoreProvider
	pvtdataStoreProvider pvtdatastorage.Provider
}

// core/ledger/kvledger/txmgmt/privacyenabledstate/db.go
// DBProvider provides handle to a PvtVersionedDB
type DBProvider interface {
	// GetDBHandle returns a handle to a PvtVersionedDB
	GetDBHandle(id string) (DB, error)
	// Close closes all the PvtVersionedDB instances and releases any
	// resources held by VersionedDBProvider
	Close()
}

// core/ledger/kvledger/history/historydb/historydb.go
// HistoryDBProvider provides an instance of a history DB
type HistoryDBProvider interface {
	// GetDBHandle returns a handle to a HistoryDB
	GetDBHandle(id string) (HistoryDB, error)
	// Close closes all the HistoryDB instances and releases any
	// resources held by HistoryDBProvider
	Close()
}

// core/ledger/kvledger/bookkeeping/provider.go
// Provider provides handle to different bookkeepers for the given ledger
type Provider interface {
	// GetDBHandle returns a db handle that can be used for maintaining
	// the bookkeeping of a given category
	GetDBHandle(ledgerID string, cat Category) *leveldbhelper.DBHandle
	// Close closes the BookkeeperProvider
	Close()
}

Except for the first, these are all interfaces. For now let's stick to the data structures (the interface implementations are collected later), i.e. the third layer of composition (dependency or association):

// common/ledger/blkstorage/blockstorage.go
// BlockStoreProvider provides a handle to a BlockStore
type BlockStoreProvider interface {
	CreateBlockStore(ledgerid string) (BlockStore, error)
	OpenBlockStore(ledgerid string) (BlockStore, error)
	Exists(ledgerid string) (bool, error)
	List() ([]string, error)
	Close()
}

// core/ledger/pvtdatastorage/store.go
// Provider provides handle to specific 'Store' that in turn manages
// private write sets for a ledger
type Provider interface {
	OpenStore(id string) (Store, error)
	Close()
}

So who implements these interfaces (including the ones from the previous layer)?

// common/ledger/blkstorage/fsblkstorage/fs_blockstore_provider.go
type FsBlockstoreProvider struct {
	conf            *Conf
	indexConfig     *blkstorage.IndexConfig
	leveldbProvider *leveldbhelper.Provider
	stats           *stats
}

// core/ledger/pvtdatastorage/store_impl.go
type provider struct {
	dbProvider *leveldbhelper.Provider
}

// core/ledger/kvledger/history/historydb/historyleveldb/historyleveldb.go
// HistoryDBProvider implements interface HistoryDBProvider
type HistoryDBProvider struct {
	dbProvider *leveldbhelper.Provider
}

// core/ledger/kvledger/bookkeeping/provider.go
type provider struct {
	dbProvider *leveldbhelper.Provider
}

// core/ledger/kvledger/txmgmt/privacyenabledstate/common_storage_db.go
type CommonStorageDBProvider struct {
	statedb.VersionedDBProvider
	HealthCheckRegistry ledger.HealthCheckRegistry
	bookkeepingProvider bookkeeping.Provider
}

// core/ledger/kvledger/txmgmt/statedb/stateleveldb/stateleveldb.go
// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
	dbProvider *leveldbhelper.Provider
}

// core/ledger/kvledger/txmgmt/statedb/statecouchdb/statecouchdb.go
// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
	couchInstance *couchdb.CouchInstance
	databases     map[string]*VersionedDB
	mux           sync.Mutex
	openCounts    uint64
}

Notice that these types (FsBlockstoreProvider, provider, and so on) all contain the leveldbhelper.Provider wrapper from the beginning, which leads back to DB and DBHandle. The first one, FsBlockstoreProvider, also contains blkstorage.IndexConfig; here is its definition:

type IndexConfig struct {
	// IndexableAttr is just an alias for string
	AttrsToIndex []IndexableAttr
}
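
For reference, the indexable attributes that this config can list are string constants defined in the blkstorage package. In the 1.4-era codebase this article follows they look roughly like this (reproduced from memory, so treat the exact set as indicative):

const (
	IndexableAttrBlockNum         = IndexableAttr("BlockNum")
	IndexableAttrBlockHash        = IndexableAttr("BlockHash")
	IndexableAttrTxID             = IndexableAttr("TxID")
	IndexableAttrBlockNumTranNum  = IndexableAttr("BlockNumTranNum")
	IndexableAttrBlockTxID        = IndexableAttr("BlockTxID")
	IndexableAttrTxValidationCode = IndexableAttr("TxValidationCode")
)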

Next, the history database's interface and the related wrapper code:

// core/ledger/kvledger/history/historydb/historydb.go
// HistoryDB - an interface that a history database should implement
type HistoryDB interface {
	NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error)
	Commit(block *common.Block) error
	GetLastSavepoint() (*version.Height, error)
	ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error)
	CommitLostBlock(blockAndPvtdata *ledger.BlockAndPvtData) error
	Name() string
}

// historyDB implements HistoryDB interface
type historyDB struct {
	db     *leveldbhelper.DBHandle
	dbName string
}
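
The history DB is essentially an index of "which block/transaction touched which key". A hypothetical illustration of the composite key layout it relies on (not the exact Fabric code; the helper name and separators here are assumptions): the key is ns, key, block number and transaction number concatenated, with order-preserving varints so a range scan returns a key's history in commit order.

import "github.com/hyperledger/fabric/common/ledger/util"

// Hypothetical sketch of a composite history key:
//   ns 0x00 key 0x00 blockNum tranNum
func constructCompositeHistoryKey(ns, key string, blockNum, tranNum uint64) []byte {
	var ck []byte
	ck = append(ck, []byte(ns)...)
	ck = append(ck, 0x00) // separator between namespace and key
	ck = append(ck, []byte(key)...)
	ck = append(ck, 0x00) // separator between key and the numbers
	ck = append(ck, util.EncodeOrderPreservingVarUint64(blockNum)...)
	ck = append(ck, util.EncodeOrderPreservingVarUint64(tranNum)...)
	return ck
}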

With that, the storage hierarchy is essentially complete, as the class diagram above shows. There is one more logical split in the storage layer, from overall ledger control down to per-channel ledgers. Here is the defining code:

type PeerLedger interface {
	commonledger.Ledger
	// GetTransactionByID retrieves a transaction by id
	GetTransactionByID(txID string) (*peer.ProcessedTransaction, error)
	// GetBlockByHash returns a block given its hash
	GetBlockByHash(blockHash []byte) (*common.Block, error)
	// GetBlockByTxID returns a block which contains a transaction
	GetBlockByTxID(txID string) (*common.Block, error)
	// GetTxValidationCodeByTxID returns reason code of transaction validation
	GetTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
	// NewTxSimulator gives handle to a transaction simulator.
	// A client can obtain more than one 'TxSimulator's for parallel execution.
	// Any snapshoting/synchronization should be performed at the implementation level if required
	NewTxSimulator(txid string) (TxSimulator, error)
	// NewQueryExecutor gives handle to a query executor.
	// A client can obtain more than one 'QueryExecutor's for parallel execution.
	// Any synchronization should be performed at the implementation level if required
	NewQueryExecutor() (QueryExecutor, error)
	// NewHistoryQueryExecutor gives handle to a history query executor.
	// A client can obtain more than one 'HistoryQueryExecutor's for parallel execution.
	// Any synchronization should be performed at the implementation level if required
	NewHistoryQueryExecutor() (HistoryQueryExecutor, error)
	// GetPvtDataAndBlockByNum returns the block and the corresponding pvt data.
	// The pvt data is filtered by the list of 'ns/collections' supplied.
	// A nil filter does not filter any results and causes retrieving all the pvt data for the given blockNum
	GetPvtDataAndBlockByNum(blockNum uint64, filter PvtNsCollFilter) (*BlockAndPvtData, error)
	// GetPvtDataByNum returns only the pvt data corresponding to the given block number.
	// The pvt data is filtered by the list of 'ns/collections' supplied in the filter.
	// A nil filter does not filter any results and causes retrieving all the pvt data for the given blockNum
	GetPvtDataByNum(blockNum uint64, filter PvtNsCollFilter) ([]*TxPvtData, error)
	// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
	CommitWithPvtData(blockAndPvtdata *BlockAndPvtData, commitOpts *CommitOptions) error
	// PurgePrivateData removes private read-write sets generated by endorsers at block height lesser than
	// a given maxBlockNumToRetain. In other words, Purge only retains private read-write sets
	// that were generated at block height of maxBlockNumToRetain or higher.
	PurgePrivateData(maxBlockNumToRetain uint64) error
	// PrivateDataMinBlockNum returns the lowest retained endorsement block height
	PrivateDataMinBlockNum() (uint64, error)
	// Prune prunes the blocks/transactions that satisfy the given policy
	Prune(policy commonledger.PrunePolicy) error
	// GetConfigHistoryRetriever returns the ConfigHistoryRetriever
	GetConfigHistoryRetriever() (ConfigHistoryRetriever, error)
	// CommitPvtDataOfOldBlocks commits the private data corresponding to already committed block.
	// If hashes for some of the private data supplied in this function does not match
	// the corresponding hash present in the block, the unmatched private data is not
	// committed and instead the mismatch information is returned back
	CommitPvtDataOfOldBlocks(blockPvtData []*BlockPvtData) ([]*PvtdataHashMismatch, error)
	// GetMissingPvtDataTracker return the MissingPvtDataTracker
	GetMissingPvtDataTracker() (MissingPvtDataTracker, error)
	// DoesPvtDataInfoExist returns true when
	// (1) the ledger has pvtdata associated with the given block number (or)
	// (2) a few or all pvtdata associated with the given block number is missing but the
	//     missing info is recorded in the ledger (or)
	// (3) the block is committed and does not contain any pvtData.
	DoesPvtDataInfoExist(blockNum uint64) (bool, error)
}

// common/ledger/ledger_interface.go
type Ledger interface {
	// GetBlockchainInfo returns basic info about blockchain
	GetBlockchainInfo() (*common.BlockchainInfo, error)
	// GetBlockByNumber returns block at a given height;
	// blockNumber of math.MaxUint64 will return last block
	GetBlockByNumber(blockNumber uint64) (*common.Block, error)
	// GetBlocksIterator returns an iterator that starts from `startBlockNumber`(inclusive).
	// The iterator is a blocking iterator i.e., it blocks till the next block gets available in the ledger.
	// ResultsIterator contains type BlockHolder
	GetBlocksIterator(startBlockNumber uint64) (ResultsIterator, error)
	// Close closes the ledger
	Close()
}

// core/ledger/kvledger/kv_ledger.go
type kvLedger struct {
	ledgerID               string
	blockStore             *ledgerstorage.Store
	txtmgmt                txmgr.TxMgr
	historyDB              historydb.HistoryDB
	configHistoryRetriever ledger.ConfigHistoryRetriever
	blockAPIsRWLock        *sync.RWMutex
	stats                  *ledgerStats
	commitHash             []byte
}

Now let's look at their internal members:


// Store encapsulates two stores 1) block store and pvt data store
type Store struct {
	blkstorage.BlockStore
	// The store below implements the pvtdatastorage.Store interface
	pvtdataStore                pvtdatastorage.Store
	rwlock                      sync.RWMutex
	isPvtstoreAheadOfBlockstore atomic.Value
}

type store struct {
	db       *leveldbhelper.DBHandle
	ledgerid string
	// Expiry policy. The config option it cares about is blockToLive
	// (BTL): how long private data lives in the sideDB. A value of 0
	// means keep it forever.
	btlPolicy          pvtdatapolicy.BTLPolicy
	isEmpty            bool
	lastCommittedBlock uint64
	batchPending       bool
	purgerLock         sync.Mutex
	collElgProcSync    *collElgProcSync
	// After committing the pvtdata of old blocks,
	// the `isLastUpdatedOldBlocksSet` is set to true.
	// Once the stateDB is updated with these pvtdata,
	// the `isLastUpdatedOldBlocksSet` is set to false.
	// isLastUpdatedOldBlocksSet is mainly used during the
	// recovery process. During the peer startup, if the
	// isLastUpdatedOldBlocksSet is set to true, the pvtdata
	// in the stateDB needs to be updated before finishing the
	// recovery operation.
	isLastUpdatedOldBlocksSet bool
}

With that, the storage skeleton of a complete node is essentially clear; anything else a node stores falls somewhere within these layers, and this is the heart of the matter. The pattern that emerges from the analysis above is the pairing of Provider and DB (each with both interfaces and implementation types). The recurring relationship between some xxxDB (e.g. historyDB) and its xxxProvider ultimately maps onto leveldbhelper.Provider and leveldbhelper.DB: a Provider hands out DB instances. A Provider can expose one or more physical or logical databases, and GetDBHandle returns the handle for operating on a specific one. Note that these concrete databases also have identically named interfaces, so be careful to keep them apart.
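
To make the pattern tangible, here is a minimal usage sketch of the leveldbhelper pairing, assuming the NewProvider/GetDBHandle/Put APIs quoted above; the path and key names are illustrative only:

package main

import "github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"

func main() {
	// One physical LevelDB, many logical databases: each GetDBHandle
	// call returns a handle whose name is folded into the physical
	// keys, so identical keys under different handles do not collide.
	provider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: "/tmp/demo-ledger"})
	defer provider.Close()

	ch1 := provider.GetDBHandle("mychannel")
	ch2 := provider.GetDBHandle("otherchannel")

	// Same key, different logical databases.
	_ = ch1.Put([]byte("height"), []byte("10"), true)
	_ = ch2.Put([]byte("height"), []byte("42"), true)
}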
At the Peer level, kvledger.Provider is the abstraction over all of the node's ledgers, while kvLedger corresponds to one channel's ledger. What we usually call "the ledger" is really the latter, not the former; the former is the aggregate from which any particular ledger (or database) can be looked up.
These two threads make the data storage picture clear. One piece that cost a lot of time here is how the bookkeeping provider flows through the system; a first summary:
In peer's start.go, the serve function initializes twice, with once.Do guaranteeing a single execution. Then, when joining a channel, CreateChainFromBlock in peer.go calls Create, which passes the bookkeeping provider down to the expiry keeper, where it is finally used:
func newExpiryKeeper(ledgerid string, provider bookkeeping.Provider) expiryKeeper {
	return &expKeeper{provider.GetDBHandle(ledgerid, bookkeeping.PvtdataExpiry)}
}
This obtains, by ledger ID, the bookkeeping database that controls private data expiry. For the concrete code, see the startup section below.

2. Peer Node Startup and Initialization Code

As usual, we start from the startup path, which connects smoothly with the earlier analysis; at the same time, some parts not expanded before are explained in detail.
1. Entry point
As mentioned above, Peer startup initialization runs as follows: the serve() function calls ledgermgmt.Initialize(), which in turn calls initialize() in the same file:

// core/ledger/ledgermgmt/ledger_mgmt.go
// Initialize initializes ledgermgmt
func Initialize(initializer *Initializer) {
	once.Do(func() {
		initialize(initializer)
	})
}

func initialize(initializer *Initializer) {
	logger.Info("Initializing ledger mgmt")
	lock.Lock()
	defer lock.Unlock()
	initialized = true
	// Currently opened ledgers
	openedLedgers = make(map[string]ledger.PeerLedger)
	customtx.Initialize(initializer.CustomTxProcessors)
	cceventmgmt.Initialize(&chaincodeInfoProviderImpl{
		initializer.PlatformRegistry,
		initializer.DeployedChaincodeInfoProvider,
	})
	finalStateListeners := addListenerForCCEventsHandler(initializer.DeployedChaincodeInfoProvider, []ledger.StateListener{})
	// Create a new ledger provider -- see the function definition below
	provider, err := kvledger.NewProvider()
	if err != nil {
		panic(errors.WithMessage(err, "Error in instantiating ledger provider"))
	}
	// Initialize the ledger provider -- this calls the Initialize
	// implementation below; also note how the ledger.Initializer
	// object is constructed
	err = provider.Initialize(&ledger.Initializer{
		// State listeners
		StateListeners: finalStateListeners,
		// Chaincode deployment info provider
		DeployedChaincodeInfoProvider: initializer.DeployedChaincodeInfoProvider,
		// MSP membership info provider
		MembershipInfoProvider: initializer.MembershipInfoProvider,
		// Metrics provider
		MetricsProvider: initializer.MetricsProvider,
		// Health checker
		HealthCheckRegistry: initializer.HealthCheckRegistry,
	})
	if err != nil {
		panic(errors.WithMessage(err, "Error initializing ledger provider"))
	}
	ledgerProvider = provider
	logger.Info("ledger mgmt initialized")
}

// fabric/core/ledger/kvledger/kv_ledger_provider.go
// Initialize implements the corresponding method from interface ledger.PeerLedgerProvider
func (provider *Provider) Initialize(initializer *ledger.Initializer) error {
	var err error
	configHistoryMgr := confighistory.NewMgr(initializer.DeployedChaincodeInfoProvider)
	collElgNotifier := &collElgNotifier{
		initializer.DeployedChaincodeInfoProvider,
		initializer.MembershipInfoProvider,
		make(map[string]collElgListener),
	}
	stateListeners := initializer.StateListeners
	stateListeners = append(stateListeners, collElgNotifier)
	stateListeners = append(stateListeners, configHistoryMgr)
	provider.initializer = initializer
	// Create the ledger storage provider
	provider.ledgerStoreProvider = ledgerstorage.NewProvider(initializer.MetricsProvider)
	provider.configHistoryMgr = configHistoryMgr
	provider.stateListeners = stateListeners
	provider.collElgNotifier = collElgNotifier
	// Create the bookkeeping (private data policy) provider
	provider.bookkeepingProvider = bookkeeping.NewProvider()
	// Create the state database provider
	provider.vdbProvider, err = privacyenabledstate.NewCommonStorageDBProvider(provider.bookkeepingProvider, initializer.MetricsProvider, initializer.HealthCheckRegistry)
	if err != nil {
		return err
	}
	provider.stats = newStats(initializer.MetricsProvider)
	// Recover any Peer ledger whose construction did not complete,
	// e.g. because of a crash while the ledger was being created
	provider.recoverUnderConstructionLedger()
	return nil
}

// fabric/core/ledger/kvledger/kv_ledger_provider.go
// NewProvider instantiates a new Provider.
// This is not thread-safe and assumed to be synchronized be the caller
// Note: this is the provider in `provider, err := kvledger.NewProvider()`;
// do not confuse it with the other NewProvider() functions.
func NewProvider() (ledger.PeerLedgerProvider, error) {
	logger.Info("Initializing ledger provider")
	// Initialize the ID store (inventory of chainIds/ledgerIds)
	idStore := openIDStore(ledgerconfig.GetLedgerProviderPath())
	// Initialize the history database (index for history of values by key)
	historydbProvider := historyleveldb.NewHistoryDBProvider()
	fileLock := leveldbhelper.NewFileLock(ledgerconfig.GetFileLockPath())
	if err := fileLock.Lock(); err != nil {
		return nil, errors.Wrap(err, "as another peer node command is executing,"+
			" wait for that command to complete its execution or terminate it before retrying")
	}
	logger.Info("ledger provider Initialized")
	provider := &Provider{idStore, nil, nil, historydbProvider, nil, nil, nil, nil, nil, nil, fileLock}
	return provider, nil
}

A ledger is created in two places. The first, analyzed earlier, is when joining a channel:

// core/scc/cscc/configure.go
func joinChain(chainID string, block *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) pb.Response {
	if err := peer.CreateChainFromBlock(block, ccp, sccp); err != nil {
		return shim.Error(err.Error())
	}
	peer.InitChain(chainID)
	return shim.Success(nil)
}

// core/peer/peer.go
// CreateChainFromBlock creates a new chain from config block
func CreateChainFromBlock(cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) error {
	cid, err := utils.GetChainIDFromBlock(cb)
	if err != nil {
		return err
	}
	var l ledger.PeerLedger
	if l, err = ledgermgmt.CreateLedger(cb); err != nil {
		return errors.WithMessage(err, "cannot create ledger from genesis block")
	}
	return createChain(cid, l, cb, ccp, sccp, pluginMapper)
}

// CreateLedger creates a new ledger with the given genesis block.
// This function guarantees that the creation of ledger and committing the genesis block would be an atomic action.
// The chain id retrieved from the genesis block is treated as a ledger id
func CreateLedger(genesisBlock *common.Block) (ledger.PeerLedger, error) {
	lock.Lock()
	defer lock.Unlock()
	if !initialized {
		return nil, ErrLedgerMgmtNotInitialized
	}
	id, err := utils.GetChainIDFromBlock(genesisBlock)
	if err != nil {
		return nil, err
	}
	logger.Infof("Creating ledger [%s] with genesis block", id)
	// Create the ledger
	l, err := ledgerProvider.Create(genesisBlock)
	if err != nil {
		return nil, err
	}
	l = wrapLedger(id, l)
	openedLedgers[id] = l
	logger.Infof("Created ledger [%s] with genesis block", id)
	return l, nil
}

func (provider *Provider) Create(genesisBlock *common.Block) (ledger.PeerLedger, error) {
	// Take the chain ID from the genesis block as the ledger ID
	ledgerID, err := utils.GetChainIDFromBlock(genesisBlock)
	if err != nil {
		return nil, err
	}
	// Check whether this ID already exists; if so, the ledger was already created
	exists, err := provider.idStore.ledgerIDExists(ledgerID)
	if err != nil {
		return nil, err
	}
	if exists {
		return nil, ErrLedgerIDExists
	}
	// Set the flag marking the ledger as under construction
	if err = provider.idStore.setUnderConstructionFlag(ledgerID); err != nil {
		return nil, err
	}
	// Get a handle to the ledger for this ID
	lgr, err := provider.openInternal(ledgerID)
	if err != nil {
		logger.Errorf("Error opening a new empty ledger. Unsetting under construction flag. Error: %+v", err)
		panicOnErr(provider.runCleanup(ledgerID), "Error running cleanup for ledger id [%s]", ledgerID)
		panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag")
		return nil, err
	}
	// Commit the block and the private data -- analyzed elsewhere
	if err := lgr.CommitWithPvtData(&ledger.BlockAndPvtData{Block: genesisBlock}, &ledger.CommitOptions{}); err != nil {
		lgr.Close()
		return nil, err
	}
	// Create the ledger's ID key-value pair
	panicOnErr(provider.idStore.createLedgerID(ledgerID, genesisBlock), "Error while marking ledger as created")
	return lgr, nil
}

Earlier we saw "ledgerProvider = provider"; this global variable is what calls Create(genesisBlock *common.Block), using the genesis block of the specified channel to create the Peer node's ledger object, i.e. the Create function above. Be careful to distinguish the different Provider constructors; a moment's inattention and you will misread the code.
The second place is the default startup path. Recall the serve function in the Peer node's start.go:

func serve(args []string) error {
	......
	endorserSupport := &endorser.SupportImpl{
		SignerSupport: signingIdentity,
		// The defaults defined below
		Peer:             peer.Default,
		PeerSupport:      peer.DefaultSupport,
		ChaincodeSupport: chaincodeSupport,
		SysCCProvider:    sccp,
		ACLProvider:      aclProvider,
	}
	......
}

// Default provides an implementation of the Peer interface that provides
// access to the package level state.
var Default Operations = &peerImpl{
	// The first entry, CreateChainFromBlock, is the package-level
	// function in peer.go
	createChainFromBlock: CreateChainFromBlock,
	getChannelConfig:     GetChannelConfig,
	getChannelsInfo:      GetChannelsInfo,
	getCurrConfigBlock:   GetCurrConfigBlock,
	getLedger:            GetLedger,
	getMSPIDs:            GetMSPIDs,
	getPolicyManager:     GetPolicyManager,
	initChain:            InitChain,
	initialize:           Initialize,
}

var DefaultSupport Support = &supportImpl{operations: Default}

// This is the CreateChainFromBlock method on peerImpl
func (p *peerImpl) CreateChainFromBlock(cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) error {
	// This calls the first entry in Default above, i.e. the
	// package-level CreateChainFromBlock in peer.go
	return p.createChainFromBlock(cb, ccp, sccp)
}

Now look more closely at kvledger.NewProvider() and the initialization around it. Of these calls, provider.recoverUnderConstructionLedger() (invoked at the end of Provider.Initialize) is the complicated one; the rest are basically plain initialization:

// fabric/core/ledger/kvledger/kv_ledger_provider.go
func (provider *Provider) recoverUnderConstructionLedger() {
	logger.Debugf("Recovering under construction ledger")
	// Read the under-construction flag from the idStore database and
	// obtain the ledgerID from it
	ledgerID, err := provider.idStore.getUnderConstructionFlag()
	panicOnErr(err, "Error while checking whether the under construction flag is set")
	if ledgerID == "" {
		logger.Debugf("No under construction ledger found. Quitting recovery")
		return
	}
	logger.Infof("ledger [%s] found as under construction", ledgerID)
	// Open (or create) the channel ledger identified by ledgerID
	ledger, err := provider.openInternal(ledgerID)
	panicOnErr(err, "Error while opening under construction ledger [%s]", ledgerID)
	// Get the blockchain info
	bcInfo, err := ledger.GetBlockchainInfo()
	panicOnErr(err, "Error while getting blockchain info for the under construction ledger [%s]", ledgerID)
	ledger.Close()
	// Check the ledger height
	switch bcInfo.Height {
	case 0:
		// The genesis block was not committed, i.e. there are no blocks
		logger.Infof("Genesis block was not committed. Hence, the peer ledger not created. unsetting the under construction flag")
		panicOnErr(provider.runCleanup(ledgerID), "Error while running cleanup for ledger id [%s]", ledgerID)
		panicOnErr(provider.idStore.unsetUnderConstructionFlag(), "Error while unsetting under construction flag")
	case 1:
		// The genesis block was committed, so the ledger was created
		logger.Infof("Genesis block was committed. Hence, marking the peer ledger as created")
		genesisBlock, err := ledger.GetBlockByNumber(0)
		panicOnErr(err, "Error while retrieving genesis block from blockchain for ledger [%s]", ledgerID)
		// Build the ledger's ID key-value pair
		panicOnErr(provider.idStore.createLedgerID(ledgerID, genesisBlock), "Error while adding ledgerID [%s] to created list", ledgerID)
	default:
		panic(errors.Errorf("data inconsistency: under construction flag is set for ledger [%s] while the height of the blockchain is [%d]",
			ledgerID, bcInfo.Height))
	}
	return
}

func (s *idStore) getUnderConstructionFlag() (string, error) {
	// The value stored under underConstructionLedgerKey is the ledger ID
	val, err := s.db.Get(underConstructionLedgerKey)
	if err != nil {
		return "", err
	}
	return string(val), nil
}

// core/ledger/kvledger/kv_ledger_provider.go
func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, error) {
	// Get the block store for a chain/ledger:
	// opens the local ledger data storage object
	blockStore, err := provider.ledgerStoreProvider.Open(ledgerID)
	if err != nil {
		return nil, err
	}
	provider.collElgNotifier.registerListener(ledgerID, blockStore)
	// Get the versioned database (state database) for a chain/ledger
	vDB, err := provider.vdbProvider.GetDBHandle(ledgerID)
	if err != nil {
		return nil, err
	}
	// Get the history database (index for history of values by key) for a chain/ledger
	historyDB, err := provider.historydbProvider.GetDBHandle(ledgerID)
	if err != nil {
		return nil, err
	}
	// Create a kvLedger for this chain/ledger, which encapsulates the underlying data stores
	// (id store, blockstore, state database, history database)
	l, err := newKVLedger(
		ledgerID, blockStore, vDB, historyDB, provider.configHistoryMgr,
		provider.stateListeners, provider.bookkeepingProvider,
		provider.initializer.DeployedChaincodeInfoProvider,
		provider.stats.ledgerStats(ledgerID),
	)
	if err != nil {
		return nil, err
	}
	return l, nil
}

// core/ledger/kvledger/kv_ledger.go
// NewKVLedger constructs new `KVLedger`
func newKVLedger(
	ledgerID string,
	blockStore *ledgerstorage.Store,
	versionedDB privacyenabledstate.DB,
	historyDB historydb.HistoryDB,
	configHistoryMgr confighistory.Mgr,
	stateListeners []ledger.StateListener,
	bookkeeperProvider bookkeeping.Provider,
	ccInfoProvider ledger.DeployedChaincodeInfoProvider,
	stats *ledgerStats,
) (*kvLedger, error) {
	logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID)
	// Create a kvLedger for this chain/ledger, which encapsulates the underlying
	// id store, blockstore, txmgr (state database), history database
	l := &kvLedger{ledgerID: ledgerID, blockStore: blockStore, historyDB: historyDB, blockAPIsRWLock: &sync.RWMutex{}}
	// Retrieve the current commit hash from the blockstore
	var err error
	l.commitHash, err = l.lastPersistedCommitHash()
	if err != nil {
		return nil, err
	}
	// TODO Move the function `GetChaincodeEventListener` to ledger interface and
	// this functionality of regiserting for events to ledgermgmt package so that this
	// is reused across other future ledger implementations
	ccEventListener := versionedDB.GetChaincodeEventListener()
	logger.Debugf("Register state db for chaincode lifecycle events: %t", ccEventListener != nil)
	if ccEventListener != nil {
		cceventmgmt.GetMgr().Register(ledgerID, ccEventListener)
	}
	// Build the private data policy (expiry and the like)
	btlPolicy := pvtdatapolicy.ConstructBTLPolicy(&collectionInfoRetriever{l, ccInfoProvider})
	// Initialize the transaction manager
	if err := l.initTxMgr(versionedDB, stateListeners, btlPolicy, bookkeeperProvider, ccInfoProvider); err != nil {
		return nil, err
	}
	// Initialize the block store with the policy
	l.initBlockStore(btlPolicy)
	// Recover both state DB and history DB if they are out of sync with block storage
	if err := l.recoverDBs(); err != nil {
		return nil, err
	}
	l.configHistoryRetriever = configHistoryMgr.GetRetriever(ledgerID, l)
	l.stats = stats
	return l, nil
}

// core/ledger/kvledger/kv_ledger.go
func (l *kvLedger) initTxMgr(versionedDB privacyenabledstate.DB, stateListeners []ledger.StateListener,
	btlPolicy pvtdatapolicy.BTLPolicy, bookkeeperProvider bookkeeping.Provider, ccInfoProvider ledger.DeployedChaincodeInfoProvider) error {
	var err error
	l.txtmgmt, err = lockbasedtxmgr.NewLockBasedTxMgr(l.ledgerID, versionedDB, stateListeners, btlPolicy, bookkeeperProvider, ccInfoProvider)
	return err
}

// Recover the state database and history database (if exist)
// by recommitting last valid blocks
// core/ledger/kvledger/kv_ledger.go
func (l *kvLedger) recoverDBs() error {
	logger.Debugf("Entering recoverDB()")
	if err := l.syncStateAndHistoryDBWithBlockstore(); err != nil {
		return err
	}
	if err := l.syncStateDBWithPvtdatastore(); err != nil {
		return err
	}
	return nil
}

func (l *kvLedger) syncStateAndHistoryDBWithBlockstore() error {
	// If there is no block in blockstorage, nothing to recover
	info, _ := l.blockStore.GetBlockchainInfo()
	if info.Height == 0 {
		logger.Debug("Block storage is empty.")
		return nil
	}
	// The currently valid block height; height 0 means no data
	lastAvailableBlockNum := info.Height - 1
	// Build the list of recoverable objects
	recoverables := []recoverable{l.txtmgmt, l.historyDB}
	recoverers := []*recoverer{}
	for _, recoverable := range recoverables {
		// Check each database's sync state
		recoverFlag, firstBlockNum, err := recoverable.ShouldRecover(lastAvailableBlockNum)
		if err != nil {
			return err
		}
		// During ledger reset/rollback, the state database must be dropped. If the state database
		// uses goleveldb, the reset/rollback code itself drops the DB. If it uses couchDB, the
		// DB must be dropped manually. Hence, we compare (only for the stateDB) the height
		// of the state DB and block store to ensure that the state DB is dropped.
		// firstBlockNum is nothing but the nextBlockNum expected by the state DB.
		// In other words, the firstBlockNum is nothing but the height of stateDB.
		if firstBlockNum > lastAvailableBlockNum+1 {
			dbName := recoverable.Name()
			return fmt.Errorf("the %s database [height=%d] is ahead of the block store [height=%d]. "+
				"This is possible when the %s database is not dropped after a ledger reset/rollback. "+
				"The %s database can safely be dropped and will be rebuilt up to block store height upon the next peer start.",
				dbName, firstBlockNum, lastAvailableBlockNum+1, dbName, dbName)
		}
		if recoverFlag {
			// Needs recovery: add it to the recoverer list
			recoverers = append(recoverers, &recoverer{firstBlockNum, recoverable})
		}
	}
	// Decide, by the number of recoverers, whether recovery is needed
	if len(recoverers) == 0 {
		return nil
	}
	if len(recoverers) == 1 {
		return l.recommitLostBlocks(recoverers[0].firstBlockNum, lastAvailableBlockNum, recoverers[0].recoverable)
	}
	// Both dbs need to be recovered; handle the one with the lowest
	// height (i.e. missing the most blocks) first
	if recoverers[0].firstBlockNum > recoverers[1].firstBlockNum {
		// swap (put the lagger db at 0 index): whichever is missing
		// more blocks goes to index 0
		recoverers[0], recoverers[1] = recoverers[1], recoverers[0]
	}
	if recoverers[0].firstBlockNum != recoverers[1].firstBlockNum {
		// bring the lagger db equal to the other db first
		if err := l.recommitLostBlocks(recoverers[0].firstBlockNum, recoverers[1].firstBlockNum-1,
			recoverers[0].recoverable); err != nil {
			return err
		}
	}
	// get both the dbs up to block storage height
	return l.recommitLostBlocks(recoverers[1].firstBlockNum, lastAvailableBlockNum,
		recoverers[0].recoverable, recoverers[1].recoverable)
}

func (l *kvLedger) syncStateDBWithPvtdatastore() error {
	// TODO: So far, the design philosophy was that the scope of block storage is
	// limited to storing and retrieving blocks data with certain guarantees and statedb is
	// for the state management. The higher layer, 'kvledger', coordinates the acts between
	// the two. However, with maintaining the state of the consumption of blocks (i.e,
	// lastUpdatedOldBlockList for pvtstore reconciliation) within private data block storage
	// breaks that assumption. The knowledge of what blocks have been consumed for the purpose
	// of state update should not lie with the source (i.e., pvtdatastorage). A potential fix
	// is mentioned in FAB-12731
	// Get the most recently updated private block data
	blocksPvtData, err := l.blockStore.GetLastUpdatedOldBlocksPvtData()
	if err != nil {
		return err
	}
	// as the pvtdataStore can contain pvtData of yet to be committed blocks,
	// we need to filter them before passing it to the transaction manager for
	// stateDB updates
	if err := l.filterYetToCommitBlocks(blocksPvtData); err != nil {
		return err
	}
	// Commit
	if err = l.applyValidTxPvtDataOfOldBlocks(blocksPvtData); err != nil {
		return err
	}
	return nil
}

During ledger creation, failures can occur for all kinds of reasons, such as a system crash or hardware fault. recoverUnderConstructionLedger handles this: it takes the ledgerID, obtains the corresponding ledger object, and then checks and repairs the relevant channel ledger. By inspecting the height in the blockchain info (bcInfo) it determines the ledger's actual state, and based on that state decides whether the next step is to recreate the ledger or to finish creating it. In newKVLedger, l.recoverDBs() then recovers any databases that are out of sync.
Note that, compared with the old 1.0 code, private data is now added into this flow; it is worth stepping through. Having shown the database class diagram earlier, here is the ledger class diagram to match:

3. idStore

First of all, this database is a LevelDB. In the NewProvider function, the first line of interest is exactly this:

// fabric/core/ledger/kvledger/kv_ledger_provider.go
func NewProvider() (ledger.PeerLedgerProvider, error) {
	......
	// Initialize the ID store (inventory of chainIds/ledgerIds)
	idStore := openIDStore(ledgerconfig.GetLedgerProviderPath())
	......
	return provider, nil
}

func openIDStore(path string) *idStore {
	db := leveldbhelper.CreateDB(&leveldbhelper.Conf{DBPath: path})
	db.Open()
	return &idStore{db}
}

Going further down leads to the data-access implementation; in effect this simply creates an idStore database and returns it. Both recoverUnderConstructionLedger and Create, shown above, call createLedgerID:

// fabric/core/ledger/kvledger/kv_ledger_provider.go
func (s *idStore) createLedgerID(ledgerID string, gb *common.Block) error {
	key := s.encodeLedgerKey(ledgerID)
	var val []byte
	var err error
	if val, err = s.db.Get(key); err != nil {
		return err
	}
	if val != nil {
		return ErrLedgerIDExists
	}
	if val, err = proto.Marshal(gb); err != nil {
		return err
	}
	batch := &leveldb.Batch{}
	batch.Put(key, val)
	batch.Delete(underConstructionLedgerKey)
	return s.db.WriteBatch(batch, true)
}

This function stores the ledger ID as a key-value pair: the key is []byte("l") followed by the ledger ID, and the value is the serialized bytes of the ledger's genesis block. As mentioned earlier, this database also maintains the ledger's construction-state flag, i.e. whether the ledger is created or still under construction; its key is []byte("underConstructionLedgerKey") and its value is []byte(ledgerID). Used this way, it resembles a checkpoint mechanism that guards safe database creation.
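
For reference, the encoding helpers look roughly like this, as inferred from the 1.4 code; treat the exact constants as indicative:

var (
	// Value stored under this key is the ledgerID of the ledger
	// currently under construction
	underConstructionLedgerKey = []byte("underConstructionLedgerKey")
	// Created-ledger keys are the prefix "l" plus the ledgerID
	ledgerKeyPrefix = []byte("l")
)

func (s *idStore) encodeLedgerKey(ledgerID string) []byte {
	return append(ledgerKeyPrefix, []byte(ledgerID)...)
}

func (s *idStore) decodeLedgerID(key []byte) []byte {
	return key[len(ledgerKeyPrefix):]
}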
As seen earlier, before creating, ledgerIDExists first checks whether the ID already exists, and getAllLedgerIds can list the created IDs:

func (s *idStore) ledgerIDExists(ledgerID string) (bool, error) {
	key := s.encodeLedgerKey(ledgerID)
	val := []byte{}
	err := error(nil)
	if val, err = s.db.Get(key); err != nil {
		return false, err
	}
	return val != nil, nil
}

func (s *idStore) getAllLedgerIds() ([]string, error) {
	var ids []string
	itr := s.db.GetIterator(nil, nil)
	defer itr.Release()
	itr.First()
	for itr.Valid() {
		// Skip the under-construction flag entry (the iterator must
		// still be advanced here, or the loop would never terminate)
		if bytes.Equal(itr.Key(), underConstructionLedgerKey) {
			itr.Next()
			continue
		}
		id := string(s.decodeLedgerID(itr.Key()))
		ids = append(ids, id)
		itr.Next()
	}
	return ids, nil
}

4. Block Data Storage and Private Data Storage

In the database creation process above, you will find the following code:


// Initialize implements the corresponding method from interface ledger.PeerLedgerProvider
func (provider *Provider) Initialize(initializer *ledger.Initializer) error {
	var err error
	......
	provider.ledgerStoreProvider = ledgerstorage.NewProvider(initializer.MetricsProvider)
	......
	return nil
}

The relevant struct definition appeared earlier; this line creates the ledgerStoreProvider (a ledgerstorage.Provider). Its constructor:

// fabric/core/ledger/ledgerstorage/store.go
func NewProvider(metricsProvider metrics.Provider) *Provider {
	// Initialize the block storage
	indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
	blockStoreProvider := fsblkstorage.NewProvider(
		fsblkstorage.NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()),
		indexConfig,
		metricsProvider)
	pvtStoreProvider := pvtdatastorage.NewProvider()
	return &Provider{blockStoreProvider, pvtStoreProvider}
}

This simply builds the struct's two members, blkStoreProvider and pvtdataStoreProvider: the block data storage and private data storage we want to analyze. The default maximum block file size here is 64MB. Section 5, "Block Index Database", covers the block side in detail; first, the creation of the private data database:

// fabric/core/ledger/pvtdatastorage/store_impl.go
func NewProvider() Provider {
	dbPath := ledgerconfig.GetPvtdataStorePath()
	dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: dbPath})
	return &provider{dbProvider: dbProvider}
}

Next, opening the related stores:

// fabric/core/ledger/kvledger/kv_ledger_provider.go
func (provider *Provider) recoverUnderConstructionLedger() {
	......
	logger.Infof("ledger [%s] found as under construction", ledgerID)
	// Open (or create) the channel ledger identified by ledgerID
	ledger, err := provider.openInternal(ledgerID)
	......
}

// fabric/core/ledger/kvledger/kv_ledger_provider.go
func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, error) {
	// Get the block store for a chain/ledger
	blockStore, err := provider.ledgerStoreProvider.Open(ledgerID)
	......
	return l, nil
}

// fabric/core/ledger/ledgerstorage/store.go
// Open opens the store
func (p *Provider) Open(ledgerid string) (*Store, error) {
	var blockStore blkstorage.BlockStore
	var pvtdataStore pvtdatastorage.Store
	var err error
	// Create the block storage object for this ledger -- detailed in
	// the "Block Index Database" section below
	if blockStore, err = p.blkStoreProvider.OpenBlockStore(ledgerid); err != nil {
		return nil, err
	}
	// Create the private data storage object for this ledger
	if pvtdataStore, err = p.pvtdataStoreProvider.OpenStore(ledgerid); err != nil {
		return nil, err
	}
	// Create and initialize the ledger's storage object
	store := &Store{
		BlockStore:   blockStore,
		pvtdataStore: pvtdataStore,
	}
	if err := store.init(); err != nil {
		return nil, err
	}
	info, err := blockStore.GetBlockchainInfo()
	if err != nil {
		return nil, err
	}
	pvtstoreHeight, err := pvtdataStore.LastCommittedBlockHeight()
	if err != nil {
		return nil, err
	}
	store.isPvtstoreAheadOfBlockstore.Store(pvtstoreHeight > info.Height)
	return store, nil
}

// init first invokes function `initFromExistingBlockchain`
// in order to check whether the pvtdata store is present because of an upgrade
// of peer from 1.0 and need to be updated with the existing blockchain. If, this is
// not the case then this init will invoke function `syncPvtdataStoreWithBlockStore`
// to follow the normal course
func (s *Store) init() error {
	var initialized bool
	var err error
	// Initialize the private data storage object
	if initialized, err = s.initPvtdataStoreFromExistingBlockchain(); err != nil || initialized {
		return err
	}
	// Commit pending data -- keeps block data and private data consistent
	return s.commitPendingBatchInPvtdataStore()
}

func (s *Store) initPvtdataStoreFromExistingBlockchain() (bool, error) {
	var bcInfo *common.BlockchainInfo
	var pvtdataStoreEmpty bool
	var err error
	// Get the current blockchain info
	if bcInfo, err = s.BlockStore.GetBlockchainInfo(); err != nil {
		return false, err
	}
	// Check whether the private data store is empty
	if pvtdataStoreEmpty, err = s.pvtdataStore.IsEmpty(); err != nil {
		return false, err
	}
	// If the private data store is empty and the block height is greater
	// than 0, call InitLastCommittedBlock to initialize the private data
	// store with the latest ledger block number and clear isEmpty
	if pvtdataStoreEmpty && bcInfo.Height > 0 {
		if err = s.pvtdataStore.InitLastCommittedBlock(bcInfo.Height - 1); err != nil {
			return false, err
		}
		return true, nil
	}
	return false, nil
}

// InitLastCommittedBlock implements the function in the interface `Store`
func (s *store) InitLastCommittedBlock(blockNum uint64) error {
	if !(s.isEmpty && !s.batchPending) {
		return &ErrIllegalCall{"The private data store is not empty. InitLastCommittedBlock() function call is not allowed"}
	}
	batch := leveldbhelper.NewUpdateBatch()
	batch.Put(lastCommittedBlkkey, encodeLastCommittedBlockVal(blockNum))
	if err := s.db.WriteBatch(batch, true); err != nil {
		return err
	}
	s.isEmpty = false
	s.lastCommittedBlock = blockNum
	logger.Debugf("InitLastCommittedBlock set to block [%d]", blockNum)
	return nil
}

// Committing pending data:
// commitPendingBatchInPvtdataStore checks whether there is a pending batch
// (possibly from a previous system crash) of pvt data that was not committed.
// If a pending batch exists, the batch is committed.
func (s *Store) commitPendingBatchInPvtdataStore() error {
	var pendingPvtbatch bool
	var err error
	if pendingPvtbatch, err = s.pvtdataStore.HasPendingBatch(); err != nil {
		return err
	}
	if !pendingPvtbatch {
		return nil
	}
	// we can safely commit the pending batch as gossip would avoid
	// fetching pvtData if already exist in the local pvtdataStore.
	// when the pvtdataStore height is greater than the blockstore,
	// pvtdata reconciler will not fetch any missing pvtData.
	// Confirm and commit the pending private data
	return s.pvtdataStore.Commit()
}

This reuses the flow from the earlier Committer analysis: when committing to the ledger, the ledger storage object first stages the private data into the private data database via Prepare(), together with the pending state flag and the associated key-value pairs, and only then commits the block data. On success, the staging key-value pairs are removed and a lastCommittedBlkkey entry is written for the committed block number; otherwise the private data must be rolled back.
The consistency check between block data and private data above covers two cases: either block data exists but private data does not, in which case the private data store is simply initialized in place; or both exist, in which case the pending private data is simply committed. Earlier versions compared the two heights (block height versus private data height) and handled each situation separately; the current version simplifies this part of the flow.
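
Condensed, the protocol just described has a prepare/commit/rollback shape. The following is a schematic sketch under the interfaces quoted above, not the literal Fabric code; error handling is simplified:

import (
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/core/ledger"
	"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
	"github.com/hyperledger/fabric/protos/common"
)

// Schematic two-phase commit of a block plus its private data.
func commitBlockWithPvtData(pvtStore pvtdatastorage.Store, blkStore blkstorage.BlockStore,
	block *common.Block, pvtData []*ledger.TxPvtData) error {
	// Phase 1: stage the private data (persisted with a pending marker).
	if err := pvtStore.Prepare(block.Header.Number, pvtData, nil); err != nil {
		return err
	}
	// Phase 2: commit the block itself to the block file store.
	if err := blkStore.AddBlock(block); err != nil {
		// Block commit failed: discard the staged private data.
		_ = pvtStore.Rollback()
		return err
	}
	// Block committed: confirm the staged data (clears the pending
	// marker and writes lastCommittedBlkkey).
	return pvtStore.Commit()
}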
Now let's see how the two kinds of storage objects are opened, starting with the block store:


// fabric/common/ledger/blkstorage/fsblkstorage/fs_blockstore_provider.go
func (p *FsBlockstoreProvider) OpenBlockStore(ledgerid string) (blkstorage.BlockStore, error) {
	indexStoreHandle := p.leveldbProvider.GetDBHandle(ledgerid)
	return newFsBlockStore(ledgerid, p.conf, p.indexConfig, indexStoreHandle, p.stats), nil
}

// NewFsBlockStore constructs a `FsBlockStore`
func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
	dbHandle *leveldbhelper.DBHandle, stats *stats) *fsBlockStore {
	// Create the block file manager -- blocks are stored as files,
	// with lookups driven by the index database
	fileMgr := newBlockfileMgr(id, conf, indexConfig, dbHandle)
	// create ledgerStats and initialize blockchain_height stat
	ledgerStats := stats.ledgerStats(id)
	info := fileMgr.getBlockchainInfo()
	ledgerStats.updateBlockchainHeight(info.Height)
	return &fsBlockStore{id, conf, fileMgr, ledgerStats}
}

func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
	logger.Debugf("newBlockfileMgr() initializing file-based block storage for ledger: %s ", id)
	// Determine the root directory for the blockfile storage, if it does not exist create it:
	// the ledger's file root directory
	rootDir := conf.getLedgerBlockDir(id)
	_, err := util.CreateDirIfMissing(rootDir)
	if err != nil {
		panic(fmt.Sprintf("Error creating block storage root dir [%s]: %s", rootDir, err))
	}
	// Instantiate the manager, i.e. blockFileMgr structure
	mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: indexStore}
	// cp = checkpointInfo, retrieve from the database the file suffix or number of where blocks were stored.
	// It also retrieves the current size of that file and the last block number that was written to that file.
	// At init checkpointInfo: latestFileChunkSuffixNum=[0], latestFileChunksize=[0], lastBlockNumber=[0]
	cpInfo, err := mgr.loadCurrentInfo()
	if err != nil {
		panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
	}
	if cpInfo == nil {
		logger.Info(`Getting block information from block storage`)
		// Construct new checkpoint info
		if cpInfo, err = constructCheckpointInfoFromBlockFiles(rootDir); err != nil {
			panic(fmt.Sprintf("Could not build checkpoint info from block files: %s", err))
		}
		logger.Debugf("Info constructed by scanning the blocks dir = %s", spew.Sdump(cpInfo))
	} else {
		logger.Debug(`Synching block information from block storage (if needed)`)
		// Sync checkpoint info from the file system
		syncCPInfoFromFS(rootDir, cpInfo)
	}
	// Save the checkpoint info
	err = mgr.saveCurrentInfo(cpInfo, true)
	if err != nil {
		panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
	}
	// Open a writer to the file identified by the number and truncate it to only contain the latest block
	// that was completely saved (file system, index, cpinfo, etc):
	// create a read/write handle for the given file
	currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
	if err != nil {
		panic(fmt.Sprintf("Could not open writer to current file: %s", err))
	}
	// Truncate the file to remove excess past last block,
	// i.e. drop the invalid (unverified) tail
	err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize)
	if err != nil {
		panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
	}
	// Create a new KeyValue store database handler for the blocks index in the keyvalue database:
	// the block index database, explained in detail below
	if mgr.index, err = newBlockIndex(indexConfig, indexStore); err != nil {
		panic(fmt.Sprintf("error in block index: %s", err))
	}
	// Update the manager with the checkpoint info and the file writer
	mgr.cpInfo = cpInfo
	mgr.currentFileWriter = currentFileWriter
	// Create a checkpoint condition (event) variable, for the goroutine waiting for
	// or announcing the occurrence of an event
	mgr.cpInfoCond = sync.NewCond(&sync.Mutex{})
	// init BlockchainInfo for external API's
	bcInfo := &common.BlockchainInfo{
		Height:            0,
		CurrentBlockHash:  nil,
		PreviousBlockHash: nil}
	if !cpInfo.isChainEmpty {
		// If start up is a restart of an existing storage, sync the index from block
		// storage and update BlockchainInfo for external API's
		mgr.syncIndex()
		// Build the block index info
		lastBlockHeader, err := mgr.retrieveBlockHeaderByNumber(cpInfo.lastBlockNumber)
		if err != nil {
			panic(fmt.Sprintf("Could not retrieve header of the last block from file: %s", err))
		}
		// Read the related values: hashes etc.
		lastBlockHash := lastBlockHeader.Hash()
		previousBlockHash := lastBlockHeader.PreviousHash
		bcInfo = &common.BlockchainInfo{
			Height:            cpInfo.lastBlockNumber + 1,
			CurrentBlockHash:  lastBlockHash,
			PreviousBlockHash: previousBlockHash}
	}
	mgr.bcInfo.Store(bcInfo)
	return mgr
}

This creates and opens the database for the given channel ID and maps its handle to the ID, allowing fast lookups into different channels' ledgers. In newFsBlockStore, newBlockfileMgr is called first to create the block file manager, which manages the ledger's block data files and the block index database and is responsible for storing and querying blocks, transactions and so on. It fetches the latest checkpoint info from the index database; if none exists, it scans the default ledger directory and reconstructs it. If the current channel's ledger has new block data to reflect, index and checkpoint info are built for it and saved back to the block index database.
The block checkpoint info records data about the last block committed to the ledger, including the latest block file name suffix number, the block file byte size, and the latest block number; a sketch of that structure follows below.
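
Inferred from the fields used in the code above (fsblkstorage package), the checkpoint structure looks roughly like this; the real type also carries marshal/unmarshal helpers:

type checkpointInfo struct {
	latestFileChunkSuffixNum int    // suffix number of the newest block file
	latestFileChunksize      int    // bytes already written to that file
	isChainEmpty             bool   // true until the first block is committed
	lastBlockNumber          uint64 // number of the last block written
}

With that in mind, we continue with index synchronization in syncIndex: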

// fabric/common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
func (mgr *blockfileMgr) syncIndex() error {
	var lastBlockIndexed uint64
	var indexEmpty bool
	var err error
	// from the database, get the last block that was indexed
	if lastBlockIndexed, err = mgr.index.getLastBlockIndexed(); err != nil {
		if err != errIndexEmpty {
			return err
		}
		// No index exists yet: mark it empty
		indexEmpty = true
	}
	// initialize index to file number:zero, offset:zero and blockNum:0
	startFileNum := 0
	startOffset := 0
	skipFirstBlock := false
	// get the last file that blocks were added to using the checkpoint info:
	// the latest block file suffix number
	endFileNum := mgr.cpInfo.latestFileChunkSuffixNum
	startingBlockNum := uint64(0)
	// if the index stored in the db has value, update the index information with those values;
	// check whether index and block files agree, and sync if not
	if !indexEmpty {
		if lastBlockIndexed == mgr.cpInfo.lastBlockNumber {
			logger.Debug("Both the block files and indices are in sync.")
			return nil
		}
		logger.Debugf("Last block indexed [%d], Last block present in block files [%d]", lastBlockIndexed, mgr.cpInfo.lastBlockNumber)
		var flp *fileLocPointer
		// Get the block's file location pointer
		if flp, err = mgr.index.getBlockLocByBlockNum(lastBlockIndexed); err != nil {
			return err
		}
		startFileNum = flp.fileSuffixNum
		startOffset = flp.locPointer.offset
		// Skip the first (already indexed) block
		skipFirstBlock = true
		startingBlockNum = lastBlockIndexed + 1
	} else {
		logger.Debugf("No block indexed, Last block present in block files=[%d]", mgr.cpInfo.lastBlockNumber)
	}
	logger.Infof("Start building index from block [%d] to last block [%d]", startingBlockNum, mgr.cpInfo.lastBlockNumber)
	// open a blockstream to the file location that was stored in the index
	var stream *blockStream
	// Create and open a file stream
	if stream, err = newBlockStream(mgr.rootDir, startFileNum, int64(startOffset), endFileNum); err != nil {
		return err
	}
	var blockBytes []byte
	var blockPlacementInfo *blockPlacementInfo
	if skipFirstBlock {
		// The skip-first-block flag is set: consume the next block
		// without processing its data
		if blockBytes, _, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
			return errors.Errorf("block bytes for block num = [%d] should not be nil here. The indexes for the block are already present",
				lastBlockIndexed)
		}
	}
	// Should be at the last block already, but go ahead and loop looking for next blockBytes.
	// If there is another block, add it to the index.
	// This will ensure block indexes are correct, for example if peer had crashed before indexes got updated.
	blockIdxInfo := &blockIdxInfo{}
	for {
		if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
			return err
		}
		if blockBytes == nil {
			break
		}
		info, err := extractSerializedBlockInfo(blockBytes)
		if err != nil {
			return err
		}
		// The blockStartOffset will get applied to the txOffsets prior to indexing within indexBlock(),
		// therefore just shift by the difference between blockBytesOffset and blockStartOffset
		numBytesToShift := int(blockPlacementInfo.blockBytesOffset - blockPlacementInfo.blockStartOffset)
		for _, offset := range info.txOffsets {
			offset.loc.offset += numBytesToShift
		}
		// Update the blockIndexInfo with what was actually stored in the file system,
		// then refresh the index database accordingly
		blockIdxInfo.blockHash = info.blockHeader.Hash()
		blockIdxInfo.blockNum = info.blockHeader.Number
		blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
			locPointer: locPointer{offset: int(blockPlacementInfo.blockStartOffset)}}
		blockIdxInfo.txOffsets = info.txOffsets
		blockIdxInfo.metadata = info.metadata
		logger.Debugf("syncIndex() indexing block [%d]", blockIdxInfo.blockNum)
		// Write to the index database
		if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
			return err
		}
		if blockIdxInfo.blockNum%10000 == 0 {
			logger.Infof("Indexed block number [%d]", blockIdxInfo.blockNum)
		}
	}
	logger.Infof("Finished building index. Last block indexed [%d]", blockIdxInfo.blockNum)
	return nil
}

The block file data structures below make the code above easier to follow:

// fabric/common/ledger/blkstorage/fsblkstorage/blockindex.go
type blockIdxInfo struct {
	blockNum  uint64
	blockHash []byte
	flp       *fileLocPointer
	txOffsets []*txindexInfo
	metadata  *common.BlockMetadata
}

type fileLocPointer struct {
	fileSuffixNum int
	locPointer
}

type locPointer struct {
	offset      int
	bytesLength int
}

type BlockMetadata struct {
	Metadata             [][]byte `protobuf:"bytes,1,rep,name=metadata,proto3" json:"metadata,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

// fabric/common/ledger/blkstorage/fsblkstorage/block_serialization.go
type serializedBlockInfo struct {
	blockHeader *common.BlockHeader
	txOffsets   []*txindexInfo
	metadata    *common.BlockMetadata
}

// The order of the transactions must be maintained for history
type txindexInfo struct {
	txID        string
	loc         *locPointer
	isDuplicate bool
}
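
To make the index concrete, here is a hypothetical sketch of how a block-number index entry could be written with these structures. The 'n' key prefix and the varint value encoding are illustrative only, not Fabric's exact scheme (the real implementation marshals fileLocPointer with its own protobuf-style helpers):

import (
	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
)

// Hypothetical: persist "block number -> file location" in the index DB.
func indexBlockNum(indexDB *leveldbhelper.DBHandle, blockNum uint64, flp *fileLocPointer) error {
	key := append([]byte{'n'}, proto.EncodeVarint(blockNum)...)
	val := proto.EncodeVarint(uint64(flp.fileSuffixNum))
	val = append(val, proto.EncodeVarint(uint64(flp.offset))...)
	val = append(val, proto.EncodeVarint(uint64(flp.bytesLength))...)
	return indexDB.Put(key, val, true)
}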

With that, the storage side of initialization is essentially covered. After initialization, the next step in practice is storing, i.e. writing, block data. There are several paths, mentioned in the earlier committer analysis; here we only analyze the data commit part:

// fabric/common/ledger/blkstorage/fsblkstorage/fs_blockstore.go
// AddBlock adds a new block
func (store *fsBlockStore) AddBlock(block *common.Block) error {
	// track elapsed time to collect block commit time
	startBlockCommit := time.Now()
	result := store.fileMgr.addBlock(block)
	elapsedBlockCommit := time.Since(startBlockCommit)
	store.updateBlockStats(block.Header.Number, elapsedBlockCommit)
	return result
}

// fabric/common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
func (mgr *blockfileMgr) addBlock(block *common.Block) error {
	bcInfo := mgr.getBlockchainInfo()
	// Compare the incoming block number with the current ledger height
	if block.Header.Number != bcInfo.Height {
		return errors.Errorf("block number should have been %d but was %d",
			mgr.getBlockchainInfo().Height, block.Header.Number,
		)
	}
	// Add the previous hash check - Though, not essential but may not be a bad idea to
	// verify the field `block.Header.PreviousHash` present in the block.
	// This check is a simple bytes comparison and hence does not cause any observable performance penalty
	// and may help in detecting a rare scenario if there is any bug in the ordering service.
	if !bytes.Equal(block.Header.PreviousHash, bcInfo.CurrentBlockHash) {
		return errors.Errorf("unexpected Previous block hash. Expected PreviousHash = [%x], PreviousHash referred in the latest block= [%x]",
			bcInfo.CurrentBlockHash, block.Header.PreviousHash,
		)
	}
	// Serialize the block into a byte array
	blockBytes, info, err := serializeBlock(block)
	if err != nil {
		return errors.WithMessage(err, "error serializing block")
	}
	// Get the block header hash
	blockHash := block.Header.Hash()
	// Get the location / offset where each transaction starts in the block and where the block ends:
	// the transaction index info
	txOffsets := info.txOffsets
	// Current offset into the newest block file
	currentOffset := mgr.cpInfo.latestFileChunksize
	// Compute the block byte length
	blockBytesLen := len(blockBytes)
	// Serialize the block byte length itself
	blockBytesEncodedLen := proto.EncodeVarint(uint64(blockBytesLen))
	totalBytesToAppend := blockBytesLen + len(blockBytesEncodedLen)
	// Determine if we need to start a new file since the size of this block
	// exceeds the amount of space left in the current file
	if currentOffset+totalBytesToAppend > mgr.conf.maxBlockfileSize {
		mgr.moveToNextFile()
		currentOffset = 0
	}
	// append blockBytesEncodedLen to the file:
	// write the encoded block length into the block file
	err = mgr.currentFileWriter.append(blockBytesEncodedLen, false)
	if err == nil {
		// append the actual block bytes to the file
		err = mgr.currentFileWriter.append(blockBytes, true)
	}
	if err != nil {
		truncateErr := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
		if truncateErr != nil {
			panic(fmt.Sprintf("Could not truncate current file to known size after an error during block append: %s", err))
		}
		return errors.WithMessage(err, "error appending block to file")
	}
	// Update the checkpoint info with the results of adding the new block
	currentCPInfo := mgr.cpInfo
	newCPInfo := &checkpointInfo{
		latestFileChunkSuffixNum: currentCPInfo.latestFileChunkSuffixNum,
		latestFileChunksize:      currentCPInfo.latestFileChunksize + totalBytesToAppend,
		isChainEmpty:             false,
		lastBlockNumber:          block.Header.Number}
	// save the checkpoint information in the database
	if err = mgr.saveCurrentInfo(newCPInfo, false); err != nil {
		truncateErr := mgr.currentFileWriter.truncateFile(currentCPInfo.latestFileChunksize)
		if truncateErr != nil {
			panic(fmt.Sprintf("Error in truncating current file to known size after an error in saving checkpoint info: %s", err))
		}
		return errors.WithMessage(err, "error saving current file info to db")
	}
	// Index block file location pointer updated with file suffix and offset for the new block
	blockFLP := &fileLocPointer{fileSuffixNum: newCPInfo.latestFileChunkSuffixNum}
	blockFLP.offset = currentOffset
	// shift the txoffsets because we prepend length of bytes before block bytes:
	// every transaction's file offset gains the length of the encoded block length
	for _, txOffset := range txOffsets {
		txOffset.loc.offset += len(blockBytesEncodedLen)
	}
	// save the index in the database:
	// build the block index info and save it into the block index database
	if err = mgr.index.indexBlock(&blockIdxInfo{
		blockNum: block.Header.Number, blockHash: blockHash,
		flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata}); err != nil {
		return err
	}
	// update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
	mgr.updateCheckpoint(newCPInfo)
	mgr.updateBlockchainInfo(blockHash, block)
	return nil
}

AddBlock is not the only such function; look at the interface it implements:

// fabric/common/ledger/blkstorage/blockstorage.go
type BlockStore interface {
	// Add a block
	AddBlock(block *common.Block) error
	// Get blockchain info
	GetBlockchainInfo() (*common.BlockchainInfo, error)
	// Get blocks starting from the given block number
	RetrieveBlocks(startNum uint64) (ledger.ResultsIterator, error)
	// Get a block by its hash
	RetrieveBlockByHash(blockHash []byte) (*common.Block, error)
	// Get a block by its number
	RetrieveBlockByNumber(blockNum uint64) (*common.Block, error) // blockNum of math.MaxUint64 will return last block
	// Get a transaction by its transaction ID
	RetrieveTxByID(txID string) (*common.Envelope, error)
	// Get a transaction by block number and transaction number
	RetrieveTxByBlockNumTranNum(blockNum uint64, tranNum uint64) (*common.Envelope, error)
	// Get a block by transaction ID
	RetrieveBlockByTxID(txID string) (*common.Block, error)
	// Get a transaction validation code by transaction ID
	RetrieveTxValidationCodeByTxID(txID string) (peer.TxValidationCode, error)
	// Shut down the store
	Shutdown()
}

The implementation of this block storage interface lives mainly in the fsBlockStore struct:

// fabric/common/ledger/blkstorage/fsblkstorage/fs_blockstore.go
// fsBlockStore - filesystem based implementation for `BlockStore`
type fsBlockStore struct {
	id      string
	conf    *Conf
	fileMgr *blockfileMgr
	stats   *ledgerStats
}

Tracing inside, though, you will find it does not implement the functionality itself; like AddBlock above, it delegates to fsblkstorage.blockfileMgr (fabric/common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go), which takes us right back to the earlier analysis. Further details are omitted here.

Next, the flow around the private data database:

// Opening the private data database
// OpenStore returns a handle to a store
func (p *provider) OpenStore(ledgerid string) (Store, error) {
	dbHandle := p.dbProvider.GetDBHandle(ledgerid)
	s := &store{db: dbHandle, ledgerid: ledgerid,
		collElgProcSync: &collElgProcSync{
			notification: make(chan bool, 1),
			procComplete: make(chan bool, 1),
		},
	}
	if err := s.initState(); err != nil {
		return nil, err
	}
	s.launchCollElgProc()
	logger.Debugf("Pvtdata store opened. Initial state: isEmpty [%t], lastCommittedBlock [%d], batchPending [%t]",
		s.isEmpty, s.lastCommittedBlock, s.batchPending)
	return s, nil
}

// fabric/core/ledger/pvtdatastorage/store_impl.go
func (s *store) initState() error {
	var err error
	var blist lastUpdatedOldBlocksList
	if s.isEmpty, s.lastCommittedBlock, err = s.getLastCommittedBlockNum(); err != nil {
		return err
	}
	if s.batchPending, err = s.hasPendingCommit(); err != nil {
		return err
	}
	if blist, err = s.getLastUpdatedOldBlocksList(); err != nil {
		return err
	}
	if len(blist) > 0 {
		s.isLastUpdatedOldBlocksSet = true
	} // false if not set
	return nil
}

Now the private data commit, also touched upon in the earlier committer analysis:

// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
	blockNum := blockAndPvtdata.Block.Header.Number
	s.rwlock.Lock()
	defer s.rwlock.Unlock()
	pvtBlkStoreHt, err := s.pvtdataStore.LastCommittedBlockHeight()
	if err != nil {
		return err
	}
	writtenToPvtStore := false
	if pvtBlkStoreHt < blockNum+1 {
		// The pvt data store sanity check does not allow rewriting the pvt data.
		// when re-processing blocks (rejoin the channel or re-fetching last few block),
		// skip the pvt data commit to the pvtdata blockstore
		logger.Debugf("Writing block [%d] to pvt block store", blockNum)
		// If a state fork occurs during a regular block commit,
		// we have a mechanism to drop all blocks followed by refetching of blocks
		// and re-processing them. In the current way of doing this, we only drop
		// the block files (and related artifacts) but we do not drop/overwrite the
		// pvtdatastorage as it might leads to data loss.
		// During block reprocessing, as there is a possibility of an invalid pvtdata
		// transaction to become valid, we store the pvtdata of invalid transactions
		// too in the pvtdataStore as we do for the publicdata in the case of blockStore.
		pvtData, missingPvtData := constructPvtDataAndMissingData(blockAndPvtdata)
		// Prepare first
		if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtData, missingPvtData); err != nil {
			return err
		}
		writtenToPvtStore = true
	} else {
		logger.Debugf("Skipping writing block [%d] to pvt block store as the store height is [%d]", blockNum, pvtBlkStoreHt)
	}
	if err := s.AddBlock(blockAndPvtdata.Block); err != nil {
		s.pvtdataStore.Rollback()
		return err
	}
	if pvtBlkStoreHt == blockNum+1 {
		// we reach here only when the pvtdataStore was ahead
		// of blockStore during the store opening time (would
		// occur after a peer rollback/reset).
		s.isPvtstoreAheadOfBlockstore.Store(false)
	}
	if writtenToPvtStore {
		// Then confirm
		return s.pvtdataStore.Commit()
	}
	return nil
}

Private data is committed in two steps: first Prepare, then the block commit; once that succeeds, Commit confirms and clears the staging data, and on error the private data is rolled back.
Next, querying private data:


// fabric/core/ledger/pvtdatastorage/store_impl.go
func (s *store) GetPvtDataByBlockNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
	logger.Debugf("Get private data for block [%d], filter=%#v", blockNum, filter)
	if s.isEmpty {
		return nil, &ErrOutOfRange{"The store is empty"}
	}
	lastCommittedBlock := atomic.LoadUint64(&s.lastCommittedBlock)
	// Compare the requested block number against the last committed one
	if blockNum > lastCommittedBlock {
		return nil, &ErrOutOfRange{fmt.Sprintf("Last committed block=%d, block requested=%d", lastCommittedBlock, blockNum)}
	}
	// Compute the start and end keys for the range scan
	startKey, endKey := getDataKeysForRangeScanByBlockNum(blockNum)
	logger.Debugf("Querying private data storage for write sets using startKey=%#v, endKey=%#v", startKey, endKey)
	// Build the query iterator
	itr := s.db.GetIterator(startKey, endKey)
	defer itr.Release()
	var blockPvtdata []*ledger.TxPvtData
	var currentTxNum uint64
	var currentTxWsetAssember *txPvtdataAssembler
	firstItr := true
	// Iterate over the results
	for itr.Next() {
		dataKeyBytes := itr.Key()
		v11Fmt, err := v11Format(dataKeyBytes)
		if err != nil {
			return nil, err
		}
		if v11Fmt {
			return v11RetrievePvtdata(itr, filter)
		}
		dataValueBytes := itr.Value()
		dataKey, err := decodeDatakey(dataKeyBytes)
		if err != nil {
			return nil, err
		}
		expired, err := isExpired(dataKey.nsCollBlk, s.btlPolicy, lastCommittedBlock)
		if err != nil {
			return nil, err
		}
		if expired || !passesFilter(dataKey, filter) {
			continue
		}
		dataValue, err := decodeDataValue(dataValueBytes)
		if err != nil {
			return nil, err
		}
		if firstItr {
			currentTxNum = dataKey.txNum
			currentTxWsetAssember = newTxPvtdataAssembler(blockNum, currentTxNum)
			firstItr = false
		}
		if dataKey.txNum != currentTxNum {
			blockPvtdata = append(blockPvtdata, currentTxWsetAssember.getTxPvtdata())
			currentTxNum = dataKey.txNum
			currentTxWsetAssember = newTxPvtdataAssembler(blockNum, currentTxNum)
		}
		currentTxWsetAssember.add(dataKey.ns, dataValue)
	}
	if currentTxWsetAssember != nil {
		blockPvtdata = append(blockPvtdata, currentTxWsetAssember.getTxPvtdata())
	}
	return blockPvtdata, nil
}

Note that newer versions filter out expired private data here: each entry is subject to a block-to-live (BTL) policy, and isExpired drops entries whose expiry block has already been passed by the last committed block.
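To make the expiry rule concrete, here is a minimal sketch of the check that isExpired performs. This is not Fabric's actual code; the +1 arithmetic follows its BTL policy as I understand it, so treat the exact boundary as an assumption:

// isExpiredSketch: data written at committingBlk with a block-to-live of btl
// stops being served once the chain reaches committingBlk+btl+1 (the
// "expiring block" is the first block at which the data is gone).
func isExpiredSketch(committingBlk, btl, lastCommittedBlock uint64) bool {
    expiringBlk := committingBlk + btl + 1
    return lastCommittedBlock >= expiringBlk
}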
Fabric also provides an interface function for fetching a block together with its private data:

func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common.SignedData) (*common.Block, util.PvtDataCollections, error) {
    blockAndPvtData, err := c.Committer.GetPvtDataAndBlockByNum(seqNum)
    if err != nil {
        return nil, nil, err
    }

    seqs2Namespaces := aggregatedCollections(make(map[seqAndDataModel]map[string][]*rwset.CollectionPvtReadWriteSet))
    data := blockData(blockAndPvtData.Block.Data.Data)
    storePvtDataOfInvalidTx := c.Support.CapabilityProvider.Capabilities().StorePvtDataOfInvalidTx()
    data.forEachTxn(storePvtDataOfInvalidTx, make(txValidationFlags, len(data)),
        func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, _ []*peer.Endorsement) error {
            item, exists := blockAndPvtData.PvtData[seqInBlock]
            if !exists {
                return nil
            }

            for _, ns := range item.WriteSet.NsPvtRwset {
                for _, col := range ns.CollectionPvtRwset {
                    cc := common.CollectionCriteria{
                        Channel:    chdr.ChannelId,
                        TxId:       chdr.TxId,
                        Namespace:  ns.Namespace,
                        Collection: col.CollectionName,
                    }
                    sp, err := c.CollectionStore.RetrieveCollectionAccessPolicy(cc)
                    if err != nil {
                        logger.Warning("Failed obtaining policy for", cc, ":", err)
                        continue
                    }
                    isAuthorized := sp.AccessFilter()
                    if isAuthorized == nil {
                        logger.Warning("Failed obtaining filter for", cc)
                        continue
                    }
                    if !isAuthorized(peerAuthInfo) {
                        logger.Debug("Skipping", cc, "because peer isn't authorized")
                        continue
                    }
                    seqs2Namespaces.addCollection(seqInBlock, item.WriteSet.DataModel, ns.Namespace, col)
                }
            }
            return nil
        })

    return blockAndPvtData.Block, seqs2Namespaces.asPrivateData(), nil
}

The function first calls c.Committer.GetPvtDataAndBlockByNum(seqNum), which goes through the ledgerstorage.Store on the kvLedger to fetch the block and private data for the given block number, and uses them to build fresh block and private data objects. It then walks the transaction data, filtering by the valid transaction sequence numbers and channel headers. For each collection it retrieves the collection access policy (backed by LSCC) and calls sp.AccessFilter() to produce a peer filter that checks the requesting peer against that access policy. If the check passes, the data is added to seqs2Namespaces. Finally, the block data and the transactions' private data are returned.

五、Block Index Database

As seen earlier, Initialize (fabric\core\ledger\kvledger\kv_ledger_provider.go) leads to recoverUnderConstructionLedger, which calls the openInternal function; following that call chain reveals where the index database gets created:

func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, error) {
    // Get the block store for a chain/ledger
    blockStore, err := provider.ledgerStoreProvider.Open(ledgerID)
    ......
    return l, nil
}

func (p *Provider) Open(ledgerid string) (*Store, error) {
    ......
    // the index database is created here, while opening the block store
    if blockStore, err = p.blkStoreProvider.OpenBlockStore(ledgerid); err != nil {
        return nil, err
    }
    if pvtdataStore, err = p.pvtdataStoreProvider.OpenStore(ledgerid); err != nil {
        return nil, err
    }
    store := &Store{
        BlockStore:   blockStore,
        pvtdataStore: pvtdataStore,
    }
    ......
    return store, nil
}

func (p *FsBlockstoreProvider) OpenBlockStore(ledgerid string) (blkstorage.BlockStore, error) {
    indexStoreHandle := p.leveldbProvider.GetDBHandle(ledgerid)
    return newFsBlockStore(ledgerid, p.conf, p.indexConfig, indexStoreHandle, p.stats), nil
}

func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
    dbHandle *leveldbhelper.DBHandle, stats *stats) *fsBlockStore {
    // create the block file manager
    fileMgr := newBlockfileMgr(id, conf, indexConfig, dbHandle)
    // create ledgerStats and initialize blockchain_height stat
    ledgerStats := stats.ledgerStats(id)
    info := fileMgr.getBlockchainInfo()
    ledgerStats.updateBlockchainHeight(info.Height)
    return &fsBlockStore{id, conf, fileMgr, ledgerStats}
}

func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
    logger.Debugf("newBlockfileMgr() initializing file-based block storage for ledger: %s ", id)
    ......
    // Create a new KeyValue store database handler for the blocks index in the keyvalue database
    if mgr.index, err = newBlockIndex(indexConfig, indexStore); err != nil {
        panic(fmt.Sprintf("error in block index: %s", err))
    }
    ......
    mgr.bcInfo.Store(bcInfo)
    return mgr
}

Now look at the definition and creation of the block index database itself:

//\fabric\common\ledger\blkstorage\fsblkstorage\blockindex.go
type blockIndex struct {
    indexItemsMap map[blkstorage.IndexableAttr]bool
    db            *leveldbhelper.DBHandle
}

func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) (*blockIndex, error) {
    indexItems := indexConfig.AttrsToIndex
    logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
    indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
    for _, indexItem := range indexItems {
        indexItemsMap[indexItem] = true
    }
    // This dependency is needed because the index 'IndexableAttrTxID' is used for detecting the duplicate txid
    // and the results are reused in the other two indexes. Ideally, all three indexes should be merged into one
    // for efficiency purpose - [FAB-10587]
    if (indexItemsMap[blkstorage.IndexableAttrTxValidationCode] || indexItemsMap[blkstorage.IndexableAttrBlockTxID]) &&
        !indexItemsMap[blkstorage.IndexableAttrTxID] {
        return nil, errors.Errorf("dependent index [%s] is not enabled for [%s] or [%s]",
            blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrTxValidationCode, blkstorage.IndexableAttrBlockTxID)
    }
    return &blockIndex{indexItemsMap, db}, nil
}

Looking back, we have landed on DBHandle again — back at leveldbhelper's DB. While on the subject of the index database, another data structure has to be mentioned:

//fabric\common\ledger\blkstorage\fsblkstorage\blockfile_mgr.go
type blockfileMgr struct {
    rootDir           string
    conf              *Conf
    db                *leveldbhelper.DBHandle
    index             index
    cpInfo            *checkpointInfo
    cpInfoCond        *sync.Cond
    currentFileWriter *blockfileWriter
    bcInfo            atomic.Value
}

// checkpointInfo holds the block checkpoint information
type checkpointInfo struct {
    latestFileChunkSuffixNum int
    latestFileChunksize      int
    isChainEmpty             bool
    lastBlockNumber          uint64
}

// the six index modes supported by the block index database
const (
    IndexableAttrBlockNum         = IndexableAttr("BlockNum")
    IndexableAttrBlockHash        = IndexableAttr("BlockHash")
    IndexableAttrTxID             = IndexableAttr("TxID")
    IndexableAttrBlockNumTranNum  = IndexableAttr("BlockNumTranNum")
    IndexableAttrBlockTxID        = IndexableAttr("BlockTxID")
    IndexableAttrTxValidationCode = IndexableAttr("TxValidationCode")
)

// the index checkpoint key
var indexCheckpointKey = []byte(indexCheckpointKeyStr)

The db in this block file manager and the db inside its index are one and the same handle. That means the block file manager's own bookkeeping also lives in the index database: the block checkpoint information, the checks against the indexable-attribute constants above, and the index checkpoint data.
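As an illustration, here is a hypothetical sketch of what an index entry conceptually stores: an indexable attribute as the key, and a pointer into a block file as the value. The fileLocPointer layout, the 'h' prefix and the JSON encoding are all assumptions of this sketch, not Fabric's actual byte format:

package main

import (
    "encoding/json"
    "fmt"
)

// fileLocPointer locates a serialized block inside a numbered block file.
type fileLocPointer struct {
    FileSuffixNum int // which blockfile_NNNNNN the block lives in
    Offset        int // byte offset of the block within that file
    BytesLength   int // serialized length of the block
}

func main() {
    // key: a one-byte attribute prefix plus the attribute value (here a block hash)
    key := append([]byte{'h'}, []byte("block-hash-bytes")...)
    // value: the encoded file location
    value, _ := json.Marshal(fileLocPointer{FileSuffixNum: 0, Offset: 4096, BytesLength: 2048})
    fmt.Printf("index entry: %x -> %s\n", key, value)
}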

六、State Database

The state database is likewise created inside the Initialize function:

func (provider *Provider) Initialize(initializer *ledger.Initializer) error {
    var err error
    ......
    provider.vdbProvider, err = privacyenabledstate.NewCommonStorageDBProvider(provider.bookkeepingProvider, initializer.MetricsProvider, initializer.HealthCheckRegistry)
    if err != nil {
        return err
    }
    ......
    return nil
}

The state database holds the read-write sets produced by the latest valid transactions (in practice only the write sets are persisted; the read sets exist to filter out invalid transactions). This state data is the latest value of every key on the current channel — the so-called World State. Both LevelDB and CouchDB are supported as backends. A state entry is generally represented as (key, version, value): the key, its current version, and the value bound to that key, where the version is a Height composed of the block number and the transaction number within the block. The state database offers three query styles — single key, multiple keys, and range scans; with CouchDB as the backend, richer query forms are available as well.
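A small sketch may make the version notion concrete. The field names mirror Fabric's version.Height, but the byte encoding below is a simplification (Fabric uses order-preserving varints), so take it as illustrative only:

package main

import (
    "encoding/binary"
    "fmt"
)

// Height identifies the transaction that last wrote a key.
type Height struct {
    BlockNum uint64 // block that last wrote the key
    TxNum    uint64 // position of that transaction within the block
}

// ToBytes produces a fixed-width encoding whose byte order matches numeric order.
func (h Height) ToBytes() []byte {
    b := make([]byte, 16)
    binary.BigEndian.PutUint64(b[:8], h.BlockNum)
    binary.BigEndian.PutUint64(b[8:], h.TxNum)
    return b
}

func main() {
    v := Height{BlockNum: 12, TxNum: 3}
    fmt.Printf("version bytes: %x\n", v.ToBytes()) // comparable byte-wise for MVCC checks
}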
In the code above, the database creation is carried out by the following calls:

//fabric\core\ledger\kvledger\txmgmt\privacyenabledstate\common_storage_db.go
// NewCommonStorageDBProvider constructs an instance of DBProvider
func NewCommonStorageDBProvider(bookkeeperProvider bookkeeping.Provider, metricsProvider metrics.Provider, healthCheckRegistry ledger.HealthCheckRegistry) (DBProvider, error) {
    var vdbProvider statedb.VersionedDBProvider
    var err error
    if ledgerconfig.IsCouchDBEnabled() {
        if vdbProvider, err = statecouchdb.NewVersionedDBProvider(metricsProvider); err != nil {
            return nil, err
        }
    } else {
        vdbProvider = stateleveldb.NewVersionedDBProvider()
    }
    dbProvider := &CommonStorageDBProvider{vdbProvider, healthCheckRegistry, bookkeeperProvider}
    err = dbProvider.RegisterHealthChecker()
    if err != nil {
        return nil, err
    }
    return dbProvider, nil
}
// NewVersionedDBProvider instantiates VersionedDBProvider
func NewVersionedDBProvider() *VersionedDBProvider {
    dbPath := ledgerconfig.GetStateLevelDBPath()
    logger.Debugf("constructing VersionedDBProvider dbPath=%s", dbPath)
    dbProvider := leveldbhelper.NewProvider(&leveldbhelper.Conf{DBPath: dbPath})
    return &VersionedDBProvider{dbProvider}
}

CouchDB is set aside for now, so that branch is skipped. Seen this way, the creation is not complicated: build a leveldbhelper.NewProvider and assign it as the state database's provider. After that comes a familiar face — the recoverUnderConstructionLedger function mentioned many times already. It calls openInternal, which obtains or creates the ledger database for the corresponding channel via GetDBHandle and then passes it as a parameter into newKVLedger. The code:


func (provider *Provider) openInternal(ledgerID string) (ledger.PeerLedger, error) {
    ......
    // Get the versioned database (state database) for a chain/ledger
    vDB, err := provider.vdbProvider.GetDBHandle(ledgerID)
    if err != nil {
        return nil, err
    }
    ......
    l, err := newKVLedger(
        ledgerID, blockStore, vDB, historyDB, provider.configHistoryMgr,
        provider.stateListeners, provider.bookkeepingProvider,
        provider.initializer.DeployedChaincodeInfoProvider,
        provider.stats.ledgerStats(ledgerID),
    )
    ......
}

Inside newKVLedger the handle is in turn wrapped into the transaction manager txtmgmt, as analyzed earlier. The code:

func newKVLedger(
    ledgerID string,
    blockStore *ledgerstorage.Store,
    versionedDB privacyenabledstate.DB,
    historyDB historydb.HistoryDB,
    configHistoryMgr confighistory.Mgr,
    stateListeners []ledger.StateListener,
    bookkeeperProvider bookkeeping.Provider,
    ccInfoProvider ledger.DeployedChaincodeInfoProvider,
    stats *ledgerStats,
) (*kvLedger, error) {
    ......
    if err := l.initTxMgr(versionedDB, stateListeners, btlPolicy, bookkeeperProvider, ccInfoProvider); err != nil {
        return nil, err
    }
    ......
    return l, nil
}

func (l *kvLedger) initTxMgr(versionedDB privacyenabledstate.DB, stateListeners []ledger.StateListener,
    btlPolicy pvtdatapolicy.BTLPolicy, bookkeeperProvider bookkeeping.Provider, ccInfoProvider ledger.DeployedChaincodeInfoProvider) error {
    var err error
    l.txtmgmt, err = lockbasedtxmgr.NewLockBasedTxMgr(l.ledgerID, versionedDB, stateListeners, btlPolicy, bookkeeperProvider, ccInfoProvider)
    return err
}

// NewLockBasedTxMgr constructs a new instance of NewLockBasedTxMgr
func NewLockBasedTxMgr(ledgerid string, db privacyenabledstate.DB, stateListeners []ledger.StateListener,
    btlPolicy pvtdatapolicy.BTLPolicy, bookkeepingProvider bookkeeping.Provider, ccInfoProvider ledger.DeployedChaincodeInfoProvider) (*LockBasedTxMgr, error) {
    db.Open()
    txmgr := &LockBasedTxMgr{
        ledgerid: ledgerid,
        // the state database is handed to the transaction manager here
        db:             db,
        stateListeners: stateListeners,
        ccInfoProvider: ccInfoProvider,
    }
    pvtstatePurgeMgr, err := pvtstatepurgemgmt.InstantiatePurgeMgr(ledgerid, db, btlPolicy, bookkeepingProvider)
    if err != nil {
        return nil, err
    }
    txmgr.pvtdataPurgeMgr = &pvtdataPurgeMgr{pvtstatePurgeMgr, false}
    txmgr.validator = valimpl.NewStatebasedValidator(txmgr, db)
    return txmgr, nil
}

How state data gets added was covered earlier in the discussion of the committing node, so it is not repeated here. On the endorsing node a transaction simulator, lockBasedTxSimulator, is created; it contains a query executor and a read-write-set builder whose data structures are fairly involved (see the code below). The simulator keeps simulation results in the builder — for private data it records hashes of the values — and commits nothing to the state database. Neither adds nor deletes touch the stored data directly: a delete only sets the delete marker and carries no value at all.

//fabric\core\ledger\kvledger\txmgmt\txmgr\lockbasedtxmgr\lockbased_tx_simulator.go
type lockBasedTxSimulator struct {
    // the query executor
    lockBasedQueryExecutor
    // the transaction read-write-set builder
    rwsetBuilder              *rwsetutil.RWSetBuilder
    writePerformed            bool
    pvtdataQueriesPerformed   bool
    simulationResultsComputed bool
    paginatedQueriesPerformed bool
}

//fabric\core\ledger\kvledger\txmgmt\rwsetutil\rwset_builder.go
// RWSetBuilder helps building the read-write set
type RWSetBuilder struct {
    pubRwBuilderMap map[string]*nsPubRwBuilder
    pvtRwBuilderMap map[string]*nsPvtRwBuilder
}

type nsPubRwBuilder struct {
    namespace         string
    readMap           map[string]*kvrwset.KVRead // for mvcc validation
    writeMap          map[string]*kvrwset.KVWrite
    metadataWriteMap  map[string]*kvrwset.KVMetadataWrite
    rangeQueriesMap   map[rangeQueryKey]*kvrwset.RangeQueryInfo // for phantom read validation
    rangeQueriesKeys  []rangeQueryKey
    collHashRwBuilder map[string]*collHashRwBuilder
}

type nsPvtRwBuilder struct {
    namespace         string
    collPvtRwBuilders map[string]*collPvtRwBuilder
}

type collPvtRwBuilder struct {
    collectionName   string
    writeMap         map[string]*kvrwset.KVWrite
    metadataWriteMap map[string]*kvrwset.KVMetadataWrite
}

Now the creation of the transaction simulator:

//fabric\core\ledger\kvledger\txmgmt\txmgr\lockbasedtxmgr\lockbased_query_executer.go
func newLockBasedTxSimulator(txmgr *LockBasedTxMgr, txid string) (*lockBasedTxSimulator, error) {
    // create the read-write-set builder
    rwsetBuilder := rwsetutil.NewRWSetBuilder()
    // note how this relates to the builder created above
    helper := newQueryHelper(txmgr, rwsetBuilder)
    logger.Debugf("constructing new tx simulator txid = [%s]", txid)
    return &lockBasedTxSimulator{lockBasedQueryExecutor{helper, txid}, rwsetBuilder, false, false, false, false}, nil
}
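To see how the pieces fit together, here is a hedged usage sketch of a simulator on the endorsing side. The ledger.TxSimulator methods are the ones listed below; the setup around them (obtaining `lgr`) is assumed:

// A minimal sketch: simulate one transaction and collect its read-write set.
// Assumes Fabric 1.4-style packages; lgr is a ledger.PeerLedger obtained elsewhere.
package example

import (
    "github.com/hyperledger/fabric/core/ledger"
)

func simulateOnce(lgr ledger.PeerLedger, txid string) (*ledger.TxSimulationResults, error) {
    sim, err := lgr.NewTxSimulator(txid)
    if err != nil {
        return nil, err
    }
    defer sim.Done() // releases the simulator; the state DB is never written here

    // a read is recorded in the read set together with the version it observed
    if _, err := sim.GetState("mycc", "a"); err != nil {
        return nil, err
    }
    // a write only lands in the write set held by the RWSetBuilder
    if err := sim.SetState("mycc", "a", []byte("100")); err != nil {
        return nil, err
    }
    // the assembled public/private simulation results, ready for endorsement
    return sim.GetTxSimulationResults()
}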

Whether handling transaction messages in Handle.go, simulating execution of system chaincode, or processing transactions on the endorsing node, this is the function called to produce the transaction simulator. Here are the query functions the query executor provides:

// GetState implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetState(ns string, key string) (val []byte, err error) {
    val, _, err = q.helper.getState(ns, key)
    return
}

// GetStateMetadata implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetStateMetadata(namespace, key string) (map[string][]byte, error) {
    return q.helper.getStateMetadata(namespace, key)
}

// GetStateMultipleKeys implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetStateMultipleKeys(namespace string, keys []string) ([][]byte, error) {
    return q.helper.getStateMultipleKeys(namespace, keys)
}

// GetStateRangeScanIterator implements method in interface `ledger.QueryExecutor`
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan should be used judiciously for performance reasons.
func (q *lockBasedQueryExecutor) GetStateRangeScanIterator(namespace string, startKey string, endKey string) (commonledger.ResultsIterator, error) {
    return q.helper.getStateRangeScanIterator(namespace, startKey, endKey)
}

// GetStateRangeScanIteratorWithMetadata implements method in interface `ledger.QueryExecutor`
// startKey is included in the results and endKey is excluded. An empty startKey refers to the first available key
// and an empty endKey refers to the last available key. For scanning all the keys, both the startKey and the endKey
// can be supplied as empty strings. However, a full scan should be used judiciously for performance reasons.
// metadata is a map of additional query parameters
func (q *lockBasedQueryExecutor) GetStateRangeScanIteratorWithMetadata(namespace string, startKey string, endKey string, metadata map[string]interface{}) (ledger.QueryResultsIterator, error) {
    return q.helper.getStateRangeScanIteratorWithMetadata(namespace, startKey, endKey, metadata)
}

// ExecuteQuery implements method in interface `ledger.QueryExecutor`
// rich query over transaction data, supported only on CouchDB
func (q *lockBasedQueryExecutor) ExecuteQuery(namespace, query string) (commonledger.ResultsIterator, error) {
    return q.helper.executeQuery(namespace, query)
}

// ExecuteQueryWithMetadata implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) ExecuteQueryWithMetadata(namespace, query string, metadata map[string]interface{}) (ledger.QueryResultsIterator, error) {
    return q.helper.executeQueryWithMetadata(namespace, query, metadata)
}

// GetPrivateData implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetPrivateData(namespace, collection, key string) ([]byte, error) {
    return q.helper.getPrivateData(namespace, collection, key)
}

func (q *lockBasedQueryExecutor) GetPrivateDataHash(namespace, collection, key string) ([]byte, error) {
    valueHash, _, err := q.helper.getPrivateDataValueHash(namespace, collection, key)
    return valueHash, err
}

// GetPrivateDataMetadata implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetPrivateDataMetadata(namespace, collection, key string) (map[string][]byte, error) {
    return q.helper.getPrivateDataMetadata(namespace, collection, key)
}

// GetPrivateDataMetadataByHash implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetPrivateDataMetadataByHash(namespace, collection string, keyhash []byte) (map[string][]byte, error) {
    return q.helper.getPrivateDataMetadataByHash(namespace, collection, keyhash)
}

// GetPrivateDataMultipleKeys implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([][]byte, error) {
    return q.helper.getPrivateDataMultipleKeys(namespace, collection, keys)
}

// GetPrivateDataRangeScanIterator implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (commonledger.ResultsIterator, error) {
    return q.helper.getPrivateDataRangeScanIterator(namespace, collection, startKey, endKey)
}

// ExecuteQueryOnPrivateData implements method in interface `ledger.QueryExecutor`
// rich query over private data, supported only on CouchDB
func (q *lockBasedQueryExecutor) ExecuteQueryOnPrivateData(namespace, collection, query string) (commonledger.ResultsIterator, error) {
    return q.helper.executeQueryOnPrivateData(namespace, collection, query)
}

// Done implements method in interface `ledger.QueryExecutor`
func (q *lockBasedQueryExecutor) Done() {
    logger.Debugf("Done with transaction simulation / query execution [%s]", q.txid)
    q.helper.done()
}
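As a usage illustration, a range scan could look like the following sketch. It assumes an iterator obtained from GetStateRangeScanIterator and the queryresult.KV result type from Fabric's protos; the surrounding error handling is simplified:

package example

import (
    "fmt"

    commonledger "github.com/hyperledger/fabric/common/ledger"
    "github.com/hyperledger/fabric/protos/ledger/queryresult"
)

// scanRange walks keys in [startKey, endKey) and stops when Next() returns nil.
func scanRange(itr commonledger.ResultsIterator) error {
    defer itr.Close()
    for {
        res, err := itr.Next()
        if err != nil {
            return err
        }
        if res == nil { // iterator exhausted
            return nil
        }
        kv := res.(*queryresult.KV) // each state query result is a key/value pair
        fmt.Printf("%s -> %s\n", kv.Key, kv.Value)
    }
}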

Next, the simulator's state-modifying functions:

// SetState implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetState(ns string, key string, value []byte) error {
    if err := s.checkWritePrecondition(key, value); err != nil {
        return err
    }
    s.rwsetBuilder.AddToWriteSet(ns, key, value)
    return nil
}

// DeleteState implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) DeleteState(ns string, key string) error {
    return s.SetState(ns, key, nil)
}

// SetStateMultipleKeys implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetStateMultipleKeys(namespace string, kvs map[string][]byte) error {
    for k, v := range kvs {
        if err := s.SetState(namespace, k, v); err != nil {
            return err
        }
    }
    return nil
}

// SetStateMetadata implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetStateMetadata(namespace, key string, metadata map[string][]byte) error {
    if err := s.checkWritePrecondition(key, nil); err != nil {
        return err
    }
    s.rwsetBuilder.AddToMetadataWriteSet(namespace, key, metadata)
    return nil
}

// DeleteStateMetadata implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) DeleteStateMetadata(namespace, key string) error {
    return s.SetStateMetadata(namespace, key, nil)
}

// SetPrivateData implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetPrivateData(ns, coll, key string, value []byte) error {
    if err := s.helper.validateCollName(ns, coll); err != nil {
        return err
    }
    if err := s.checkWritePrecondition(key, value); err != nil {
        return err
    }
    s.writePerformed = true
    s.rwsetBuilder.AddToPvtAndHashedWriteSet(ns, coll, key, value)
    return nil
}

// DeletePrivateData implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) DeletePrivateData(ns, coll, key string) error {
    return s.SetPrivateData(ns, coll, key, nil)
}

// SetPrivateDataMultipleKeys implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetPrivateDataMultipleKeys(ns, coll string, kvs map[string][]byte) error {
    for k, v := range kvs {
        if err := s.SetPrivateData(ns, coll, k, v); err != nil {
            return err
        }
    }
    return nil
}

// GetPrivateDataRangeScanIterator implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (commonledger.ResultsIterator, error) {
    if err := s.checkBeforePvtdataQueries(); err != nil {
        return nil, err
    }
    return s.lockBasedQueryExecutor.GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey)
}

// SetPrivateDataMetadata implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) SetPrivateDataMetadata(namespace, collection, key string, metadata map[string][]byte) error {
    if err := s.helper.validateCollName(namespace, collection); err != nil {
        return err
    }
    if err := s.checkWritePrecondition(key, nil); err != nil {
        return err
    }
    s.rwsetBuilder.AddToHashedMetadataWriteSet(namespace, collection, key, metadata)
    return nil
}

// DeletePrivateDataMetadata implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) DeletePrivateDataMetadata(namespace, collection, key string) error {
    return s.SetPrivateDataMetadata(namespace, collection, key, nil)
}

// ExecuteQueryOnPrivateData implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) ExecuteQueryOnPrivateData(namespace, collection, query string) (commonledger.ResultsIterator, error) {
    if err := s.checkBeforePvtdataQueries(); err != nil {
        return nil, err
    }
    return s.lockBasedQueryExecutor.ExecuteQueryOnPrivateData(namespace, collection, query)
}

// GetStateRangeScanIteratorWithMetadata implements method in interface `ledger.QueryExecutor`
func (s *lockBasedTxSimulator) GetStateRangeScanIteratorWithMetadata(namespace string, startKey string, endKey string, metadata map[string]interface{}) (ledger.QueryResultsIterator, error) {
    if err := s.checkBeforePaginatedQueries(); err != nil {
        return nil, err
    }
    return s.lockBasedQueryExecutor.GetStateRangeScanIteratorWithMetadata(namespace, startKey, endKey, metadata)
}

// ExecuteQueryWithMetadata implements method in interface `ledger.QueryExecutor`
func (s *lockBasedTxSimulator) ExecuteQueryWithMetadata(namespace, query string, metadata map[string]interface{}) (ledger.QueryResultsIterator, error) {
    if err := s.checkBeforePaginatedQueries(); err != nil {
        return nil, err
    }
    return s.lockBasedQueryExecutor.ExecuteQueryWithMetadata(namespace, query, metadata)
}

// GetTxSimulationResults implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) GetTxSimulationResults() (*ledger.TxSimulationResults, error) {
    if s.simulationResultsComputed {
        return nil, errors.New("this function should only be called once on a transaction simulator instance")
    }
    defer func() { s.simulationResultsComputed = true }()
    logger.Debugf("Simulation completed, getting simulation results")
    if s.helper.err != nil {
        return nil, s.helper.err
    }
    s.helper.addRangeQueryInfo()
    return s.rwsetBuilder.GetTxSimulationResults()
}

// ExecuteUpdate implements method in interface `ledger.TxSimulator`
func (s *lockBasedTxSimulator) ExecuteUpdate(query string) error {
    return errors.New("not supported")
}

The comments speak for themselves, so there is no need to gild the lily here.
With this brief analysis the state database is essentially covered. The handling of the data structures during queries — in particular the commit and update of Merkle-tree-like structures — is not examined here; it holds no surprises and is already commonplace in public chains, so read the related code if you are interested.

七、History Database

In the NewProvider function, after the idStore is created, the history database gets created too:


func NewProvider() (ledger.PeerLedgerProvider, error) {
    ......
    // Initialize the history database (index for history of values by key)
    historydbProvider := historyleveldb.NewHistoryDBProvider()
    ......
    return provider, nil
}

The history database records, for every state key in a transaction, the history of writes to it. It is kept in LevelDB, and each entry is keyed by {ns, key, blocknum, trannum}: the namespace, the written state key, the block number, and the transaction number within the block. The stored value is an empty byte array — the key itself carries all the information.
Its creation mirrors the state database: historyleveldb.NewHistoryDBProvider() is created first, and then openInternal obtains the actual database handle via provider.historydbProvider.GetDBHandle(ledgerID).
Data is committed to the history database through the kvLedger's l.historyDB.Commit(block) method, which walks all transactions in the block, filters out the valid endorsed ones, and commits them. The rule: the composite key compositeHistoryKey is {ns, writeKey, blockNo+tranNo}, with []byte{0x00} used as the separator between the parts, and the value is emptyValue ([]byte{}).
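A minimal sketch of such a composite key follows. The 0x00 separator matches the description above, while the big-endian number encoding is a simplification of Fabric's order-preserving varints, so treat the exact bytes as illustrative:

package main

import (
    "encoding/binary"
    "fmt"
)

var compositeKeySep = []byte{0x00} // separator between key parts

func constructHistoryKey(ns, key string, blockNum, tranNum uint64) []byte {
    k := append([]byte(ns), compositeKeySep...)
    k = append(k, []byte(key)...)
    k = append(k, compositeKeySep...)
    num := make([]byte, 16)
    binary.BigEndian.PutUint64(num[:8], blockNum)
    binary.BigEndian.PutUint64(num[8:], tranNum)
    return append(k, num...) // the value stored under this key is emptyValue ([]byte{})
}

func main() {
    fmt.Printf("%x\n", constructHistoryKey("mycc", "a", 7, 2))
}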

八、Transient Private Data Store

The transient private data store can be thought of as a temporary database, kept in the local LevelDB. Once the private data has actually been committed to the ledger, its entries here can be deleted. A timeout and a height can also be configured: when those thresholds are reached, the store removes orphan records left by unsuccessful commits and drops private data below the given height, retaining only the private read-write sets within the configured range of the current ledger height. In the Fabric configuration the timeout can also be set to 0, meaning never delete. The relevant data structure definitions:

//fabric\gossip\privdata\coordinator.go
// TransientStore holds private data that the corresponding blocks haven't been committed yet into the ledger
type TransientStore interface {
    // PersistWithConfig stores the private write set of a transaction along with the collection config
    // in the transient store based on txid and the block height the private data was received at
    PersistWithConfig(txid string, blockHeight uint64, privateSimulationResultsWithConfig *transientstore2.TxPvtReadWriteSetWithConfigInfo) error

    // Persist stores the private write set of a transaction in the transient store
    Persist(txid string, blockHeight uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error

    // GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
    // write sets persisted from different endorsers (via Gossip)
    GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (transientstore.RWSetScanner, error)

    // PurgeByTxids removes private read-write sets for a given set of transactions from the
    // transient store
    PurgeByTxids(txids []string) error

    // PurgeByHeight removes private write sets at block height lesser than
    // a given maxBlockNumToRetain. In other words, Purge only retains private write sets
    // that were persisted at block height of maxBlockNumToRetain or higher. Though the private
    // write sets stored in transient store is removed by coordinator using PurgebyTxids()
    // after successful block commit, PurgeByHeight() is still required to remove orphan entries (as
    // transaction that gets endorsed may not be submitted by the client for commit)
    PurgeByHeight(maxBlockNumToRetain uint64) error
}

//fabric\core\transientstore\store.go
type Store interface {
    // Persist stores the private write set of a transaction in the transient store
    // based on txid and the block height the private data was received at
    Persist(txid string, blockHeight uint64, privateSimulationResults *rwset.TxPvtReadWriteSet) error
    // TODO: Once the related gossip changes are made as per FAB-5096, remove the above function
    // and rename the below function to Persist from PersistWithConfig.

    // PersistWithConfig stores the private write set of a transaction along with the collection config
    // in the transient store based on txid and the block height the private data was received at
    PersistWithConfig(txid string, blockHeight uint64, privateSimulationResultsWithConfig *transientstore.TxPvtReadWriteSetWithConfigInfo) error

    // GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
    // write sets persisted from different endorsers (via Gossip)
    GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, error)

    // PurgeByTxids removes private write sets of a given set of transactions from the
    // transient store
    PurgeByTxids(txids []string) error

    // PurgeByHeight removes private write sets at block height lesser than
    // a given maxBlockNumToRetain. In other words, Purge only retains private write sets
    // that were persisted at block height of maxBlockNumToRetain or higher. Though the private
    // write sets stored in transient store is removed by coordinator using PurgebyTxids()
    // after successful block commit, PurgeByHeight() is still required to remove orphan entries (as
    // transaction that gets endorsed may not be submitted by the client for commit)
    PurgeByHeight(maxBlockNumToRetain uint64) error

    // GetMinTransientBlkHt returns the lowest block height remaining in transient store
    GetMinTransientBlkHt() (uint64, error)

    Shutdown()
}

// store holds an instance of a levelDB.
type store struct {
    db       *leveldbhelper.DBHandle
    ledgerID string
}

Now, where is this transient store created? The code:

// createChain creates a new chain object and insert it into the chains
func createChain(
    cid string,
    ledger ledger.PeerLedger,
    cb *common.Block,
    ccp ccprovider.ChaincodeProvider,
    sccp sysccprovider.SystemChaincodeProvider,
    pm txvalidator.PluginMapper,
) error {
    chanConf, err := retrievePersistedChannelConfig(ledger)
    ......
    // TODO: does someone need to call Close() on the transientStoreFactory at shutdown of the peer?
    // create the transient private data store
    store, err := TransientStoreFactory.OpenStore(bundle.ConfigtxValidator().ChainID())
    if err != nil {
        return errors.Wrapf(err, "[channel %s] failed opening transient store", bundle.ConfigtxValidator().ChainID())
    }
    csStoreSupport := &CollectionSupport{
        PeerLedger: ledger,
    }
    // create the private data collection store
    simpleCollectionStore := privdata.NewSimpleCollectionStore(csStoreSupport)
    oac := service.OrdererAddressConfig{
        Addresses:        ordererAddresses,
        AddressesByOrg:   ordererAddressesByOrg,
        Organizations:    ordererOrganizations,
        AddressOverrides: ordererAddressOverrides,
    }
    service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), oac, service.Support{
        // validator and committer
        Validator: validator,
        Committer: c,
        // the transient private data store
        Store: store,
        // the private data collection store
        Cs:                   simpleCollectionStore,
        IdDeserializeFactory: csStoreSupport,
        CapabilityProvider:   cp,
    })
    ......
    return nil
}

The joinChannel function calls CreateChainFromBlock, which calls createChain — and that is where the transient store gets created. The other path is the one mentioned repeatedly before: when the peer starts, the serve function in start.go calls Initialize, which also ends up in this same createChain.

//core/peer/peer.go
func Initialize(init func(string), ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider,
    pm txvalidator.PluginMapper, pr *platforms.Registry, deployedCCInfoProvider ledger.DeployedChaincodeInfoProvider,
    membershipProvider ledger.MembershipInfoProvider, metricsProvider metrics.Provider) {
    ......
    // Create a chain if we get a valid ledger with config block
    if err = createChain(cid, ledger, cb, ccp, sccp, pm); err != nil {
        peerLogger.Errorf("Failed to load chain %s(%s)", cid, err)
        peerLogger.Debugf("Error reloading chain %s with message %s. We continue to the next chain rather than abort.", cid, err)
        continue
    }
    InitChain(cid)
    }
}

// CreateChainFromBlock creates a new chain from config block
func CreateChainFromBlock(cb *common.Block, ccp ccprovider.ChaincodeProvider, sccp sysccprovider.SystemChaincodeProvider) error {
    ......
    return createChain(cid, l, cb, ccp, sccp, pluginMapper)
}

Now on with the creation process:

//core/peer/peer.go
func (sp *storeProvider) OpenStore(ledgerID string) (transientstore.Store, error) {
    sp.Lock()
    defer sp.Unlock()
    if sp.StoreProvider == nil {
        sp.StoreProvider = transientstore.NewStoreProvider()
    }
    store, err := sp.StoreProvider.OpenStore(ledgerID)
    if err == nil {
        sp.stores[ledgerID] = store
    }
    return store, err
}

// OpenStore returns a handle to a ledgerId in Store
func (provider *storeProvider) OpenStore(ledgerID string) (Store, error) {
    dbHandle := provider.dbProvider.GetDBHandle(ledgerID)
    return &store{db: dbHandle, ledgerID: ledgerID}, nil
}

The creation turns out to be no different from the other databases: set up a provider, then let GetDBHandle create or fetch a database handle for the given ledgerID. A simpleCollectionStore then supplies the private data collections, and both are wrapped in the Support object that is passed into the channel initialization function:

// InitializeChannel allocates the state provider and should be invoked once per channel per execution
func (g *gossipServiceImpl) InitializeChannel(chainID string, oac OrdererAddressConfig, support Support) {
    g.lock.Lock()
    defer g.lock.Unlock()
    ......
    storeSupport := &DataStoreSupport{
        TransientStore: support.Store,
        Committer:      support.Committer,
    }
    // Initialize private data fetcher
    dataRetriever := privdata2.NewDataRetriever(storeSupport)
    collectionAccessFactory := privdata2.NewCollectionAccessFactory(support.IdDeserializeFactory)
    fetcher := privdata2.NewPuller(g.metrics.PrivdataMetrics, support.Cs, g.gossipSvc, dataRetriever,
        collectionAccessFactory, chainID, privdata2.GetBtlPullMargin())
    coordinatorConfig := privdata2.CoordinatorConfig{
        TransientBlockRetention:        privdata2.GetTransientBlockRetention(),
        PullRetryThreshold:             viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold"),
        SkipPullingInvalidTransactions: viper.GetBool("peer.gossip.pvtData.skipPullingInvalidTransactionsDuringCommit"),
    }
    coordinator := privdata2.NewCoordinator(privdata2.Support{
        ChainID:            chainID,
        CollectionStore:    support.Cs,
        Validator:          support.Validator,
        TransientStore:     support.Store,
        Committer:          support.Committer,
        Fetcher:            fetcher,
        CapabilityProvider: support.CapabilityProvider,
    }, g.createSelfSignedData(), g.metrics.PrivdataMetrics, coordinatorConfig)
    reconcilerConfig := privdata2.GetReconcilerConfig()
    var reconciler privdata2.PvtDataReconciler
    if reconcilerConfig.IsEnabled {
        reconciler = privdata2.NewReconciler(chainID, g.metrics.PrivdataMetrics,
            support.Committer, fetcher, reconcilerConfig)
    } else {
        reconciler = &privdata2.NoOpReconciler{}
    }
    pushAckTimeout := viper.GetDuration("peer.gossip.pvtData.pushAckTimeout")
    g.privateHandlers[chainID] = privateHandler{
        support:     support,
        coordinator: coordinator,
        distributor: privdata2.NewDistributor(chainID, g, collectionAccessFactory, g.metrics.PrivdataMetrics, pushAckTimeout),
        reconciler:  reconciler,
    }
    g.privateHandlers[chainID].reconciler.Start()
    g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator,
        g.metrics.StateMetrics, getStateConfiguration())
    ......
}

With the transient store in place, how does data get added to it?
In the serve function of start.go:

privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
    return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
}
......
serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)

On the endorsing node the following functions are then called:

// NewEndorserServer creates and returns a new Endorser server instance.
func NewEndorserServer(privDist privateDataDistributor, s Support, pr *platforms.Registry, metricsProv metrics.Provider) *Endorser {
    e := &Endorser{
        // the function literal above is assigned here and invoked below
        distributePrivateData: privDist,
        s:                     s,
        PlatformRegistry:      pr,
        PvtRWSetAssembler:     &rwSetAssembler{},
        Metrics:               NewEndorserMetrics(metricsProv),
    }
    return e
}

func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) {
    ......
    if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {
        return nil, nil, nil, nil, err
    }
    ......
}

Here is that function:

//fabric\gossip\service\gossip_service.go
// DistributePrivateData distribute private read write set inside the channel based on the collections policies
func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
    g.lock.RLock()
    handler, exists := g.privateHandlers[chainID]
    g.lock.RUnlock()
    if !exists {
        return errors.Errorf("No private data handler for %s", chainID)
    }
    if err := handler.distributor.Distribute(txID, privData, blkHt); err != nil {
        logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
        return err
    }
    if err := handler.coordinator.StorePvtData(txID, privData, blkHt); err != nil {
        logger.Error("Failed to store private data into transient store, txID", txID, "channel", chainID, "due to", err)
        return err
    }
    return nil
}

// StorePvtData used to persist private data into transient store
func (c *coordinator) StorePvtData(txID string, privData *transientstore2.TxPvtReadWriteSetWithConfigInfo, blkHeight uint64) error {
    return c.TransientStore.PersistWithConfig(txID, blkHeight, privData)
}

func (s *store) PersistWithConfig(txid string, blockHeight uint64,
    privateSimulationResultsWithConfig *transientstore.TxPvtReadWriteSetWithConfigInfo) error {
    logger.Debugf("Persisting private data to transient store for txid [%s] at block height [%d]", txid, blockHeight)
    dbBatch := leveldbhelper.NewUpdateBatch()
    // Create compositeKey with appropriate prefix, txid, uuid and blockHeight
    // Due to the fact that the txid may have multiple private write sets persisted from different
    // endorsers (via Gossip), we postfix an uuid with the txid to avoid collision.
    uuid := util.GenerateUUID()
    compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
    privateSimulationResultsWithConfigBytes, err := proto.Marshal(privateSimulationResultsWithConfig)
    if err != nil {
        return err
    }
    // Note that some rwset.TxPvtReadWriteSet may exist in the transient store immediately after
    // upgrading the peer to v1.2. In order to differentiate between new proto and old proto while
    // retrieving, a nil byte is prepended to the new proto, i.e., privateSimulationResultsWithConfigBytes,
    // as a marshaled message can never start with a nil byte. In v1.3, we can avoid prepending the
    // nil byte.
    value := append([]byte{nilByte}, privateSimulationResultsWithConfigBytes...)
    dbBatch.Put(compositeKeyPvtRWSet, value)

    // Create two index: (i) by txid, and (ii) by height

    // Create compositeKey for purge index by height with appropriate prefix, blockHeight,
    // txid, uuid and store the compositeKey (purge index) with a nil byte as value. Note that
    // the purge index is used to remove orphan entries in the transient store (which are not removed
    // by PurgeTxids()) using BTL policy by PurgeByHeight(). Note that orphan entries are due to transaction
    // that gets endorsed but not submitted by the client for commit)
    compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(blockHeight, txid, uuid)
    dbBatch.Put(compositeKeyPurgeIndexByHeight, emptyValue)

    // Create compositeKey for purge index by txid with appropriate prefix, txid, uuid,
    // blockHeight and store the compositeKey (purge index) with a nil byte as value.
    // Though compositeKeyPvtRWSet itself can be used to purge private write set by txid,
    // we create a separate composite key with a nil byte as value. The reason is that
    // if we use compositeKeyPvtRWSet, we unnecessarily read (potentially large) private write
    // set associated with the key from db. Note that this purge index is used to remove non-orphan
    // entries in the transient store and is used by PurgeTxids()
    // Note: We can create compositeKeyPurgeIndexByTxid by just replacing the prefix of compositeKeyPvtRWSet
    // with purgeIndexByTxidPrefix. For code readability and to be expressive, we use a
    // createCompositeKeyForPurgeIndexByTxid() instead.
    compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, blockHeight)
    dbBatch.Put(compositeKeyPurgeIndexByTxid, emptyValue)

    return s.db.WriteBatch(dbBatch, true)
}

Note that the old Persist function is now deprecated in favor of PersistWithConfig.
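To visualize the three keys written per entry, here is a simplified sketch with an assumed layout — the prefix bytes and encoding are illustrative, not Fabric's exact format. Note how the purge-by-height key puts the height first, so PurgeByHeight can later remove everything below a height with a single range scan:

package main

import (
    "encoding/binary"
    "fmt"
)

var sep = []byte{0x00}

// compositeKey joins a prefix byte with separator-delimited parts.
func compositeKey(prefix byte, parts ...[]byte) []byte {
    k := []byte{prefix}
    for _, p := range parts {
        k = append(k, sep...)
        k = append(k, p...)
    }
    return k
}

func main() {
    txid, uuid := []byte("tx1"), []byte("uuid-1")
    height := make([]byte, 8)
    binary.BigEndian.PutUint64(height, 42)

    fmt.Printf("pvt rwset key:       %x\n", compositeKey('P', txid, uuid, height))
    fmt.Printf("purge-by-height key: %x\n", compositeKey('H', height, txid, uuid)) // height first enables range purge
    fmt.Printf("purge-by-txid key:   %x\n", compositeKey('T', txid, uuid, height))
}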
That covers writes; what about reads? The first option is lookup by transaction ID:


//fabric\core\transientstore\store.go
// GetTxPvtRWSetByTxid returns an iterator due to the fact that the txid may have multiple private
// write sets persisted from different endorsers.
func (s *store) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (RWSetScanner, error) {
    logger.Debugf("Getting private data from transient store for transaction %s", txid)
    // Construct startKey and endKey to do a range query
    startKey := createTxidRangeStartKey(txid)
    endKey := createTxidRangeEndKey(txid)
    iter := s.db.GetIterator(startKey, endKey)
    return &RwsetScanner{txid, iter, filter}, nil
}

// Next moves the iterator to the next key/value pair.
// It returns whether the iterator is exhausted.
// TODO: Once the related gossip changes are made as per FAB-5096, rename this function to Next
func (scanner *RwsetScanner) NextWithConfig() (*EndorserPvtSimulationResultsWithConfig, error) {
    if !scanner.dbItr.Next() {
        return nil, nil
    }
    dbKey := scanner.dbItr.Key()
    dbVal := scanner.dbItr.Value()
    _, blockHeight, err := splitCompositeKeyOfPvtRWSet(dbKey)
    if err != nil {
        return nil, err
    }

    txPvtRWSet := &rwset.TxPvtReadWriteSet{}
    filteredTxPvtRWSet := &rwset.TxPvtReadWriteSet{}
    txPvtRWSetWithConfig := &transientstore.TxPvtReadWriteSetWithConfigInfo{}

    if dbVal[0] == nilByte {
        // new proto, i.e., TxPvtReadWriteSetWithConfigInfo
        if err := proto.Unmarshal(dbVal[1:], txPvtRWSetWithConfig); err != nil {
            return nil, err
        }
        filteredTxPvtRWSet = trimPvtWSet(txPvtRWSetWithConfig.GetPvtRwset(), scanner.filter)
        configs, err := trimPvtCollectionConfigs(txPvtRWSetWithConfig.CollectionConfigs, scanner.filter)
        if err != nil {
            return nil, err
        }
        txPvtRWSetWithConfig.CollectionConfigs = configs
    } else {
        // old proto, i.e., TxPvtReadWriteSet
        if err := proto.Unmarshal(dbVal, txPvtRWSet); err != nil {
            return nil, err
        }
        filteredTxPvtRWSet = trimPvtWSet(txPvtRWSet, scanner.filter)
    }
    txPvtRWSetWithConfig.PvtRwset = filteredTxPvtRWSet

    return &EndorserPvtSimulationResultsWithConfig{
        ReceivedAtBlockHeight:          blockHeight,
        PvtSimulationResultsWithConfig: txPvtRWSetWithConfig,
    }, nil
}

As with the write path, there is a deprecated plain Next alongside NextWithConfig, working much the same way.
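The nilByte trick above deserves a note: a non-empty marshaled protobuf message never starts with a 0x00 byte (the first byte encodes a field tag, which is at least 8), so prepending one unambiguously marks the new format while old-format values remain readable. A tiny sketch:

const nilByte = byte(0)

// tagNew marks a value as the new (with-config) format.
func tagNew(marshaled []byte) []byte {
    return append([]byte{nilByte}, marshaled...)
}

// isNewFormat decides which proto to unmarshal when reading back.
func isNewFormat(dbVal []byte) bool {
    return len(dbVal) > 0 && dbVal[0] == nilByte
}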
The store also provides a way to obtain private data collections from digest information:

//fabric\gossip\privdata\dataretriever.go
// CollectionRWSet retrieves for give digest relevant private data if
// available otherwise returns nil, bool which is true if data fetched from ledger and false if was fetched from transient store, and an error
func (dr *dataRetriever) CollectionRWSet(digests []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error) {
    height, err := dr.store.LedgerHeight()
    if err != nil {
        // if there is an error getting info from the ledger, we need to try to read from transient store
        return nil, false, errors.Wrap(err, "wasn't able to read ledger height")
    }
    // check the ledger height against the requested block number
    if height <= blockNum {
        logger.Debug("Current ledger height ", height, "is below requested block sequence number",
            blockNum, "retrieving private data from transient store")
    }

    if height <= blockNum { // Check whenever current ledger height is equal or below block sequence num.
        results := make(Dig2PvtRWSetWithConfig)
        for _, dig := range digests {
            filter := map[string]ledger.PvtCollFilter{
                dig.Namespace: map[string]bool{
                    dig.Collection: true,
                },
            }
            pvtRWSet, err := dr.fromTransientStore(dig, filter)
            if err != nil {
                logger.Errorf("couldn't read from transient store private read-write set, "+
                    "digest %+v, because of %s", dig, err)
                continue
            }
            results[common.DigKey{
                Namespace:  dig.Namespace,
                Collection: dig.Collection,
                TxId:       dig.TxId,
                BlockSeq:   dig.BlockSeq,
                SeqInBlock: dig.SeqInBlock,
            }] = pvtRWSet
        }
        return results, false, nil
    }
    // Since ledger height is above block sequence number private data is might be available in the ledger
    results, err := dr.fromLedger(digests, blockNum)
    return results, true, err
}

func (dr *dataRetriever) fromTransientStore(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) (*util.PrivateRWSetWithConfig, error) {
    results := &util.PrivateRWSetWithConfig{}
    // fetch the private write sets for this txid
    it, err := dr.store.GetTxPvtRWSetByTxid(dig.TxId, filter)
    if err != nil {
        return nil, errors.Errorf("was not able to retrieve private data from transient store, namespace <%s>"+
            ", collection name %s, txID <%s>, due to <%s>", dig.Namespace, dig.Collection, dig.TxId, err)
    }
    defer it.Close()

    maxEndorsedAt := uint64(0)
    for {
        res, err := it.NextWithConfig()
        if err != nil {
            return nil, errors.Errorf("error getting next element out of private data iterator, namespace <%s>"+
                ", collection name <%s>, txID <%s>, due to <%s>", dig.Namespace, dig.Collection, dig.TxId, err)
        }
        if res == nil {
            return results, nil
        }
        rws := res.PvtSimulationResultsWithConfig
        if rws == nil {
            logger.Debug("Skipping nil PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
            continue
        }
        txPvtRWSet := rws.PvtRwset
        if txPvtRWSet == nil {
            logger.Debug("Skipping empty PvtRwset of PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
            continue
        }
        colConfigs, found := rws.CollectionConfigs[dig.Namespace]
        if !found {
            logger.Error("No collection config was found for chaincode", dig.Namespace, "collection name",
                dig.Namespace, "txID", dig.TxId)
            continue
        }
        configs := extractCollectionConfig(colConfigs, dig.Collection)
        if configs == nil {
            logger.Error("No collection config was found for collection", dig.Collection,
                "namespace", dig.Namespace, "txID", dig.TxId)
            continue
        }
        // filter out the relevant write sets
        pvtRWSet := dr.extractPvtRWsets(txPvtRWSet.NsPvtRwset, dig.Namespace, dig.Collection)
        if rws.EndorsedAt >= maxEndorsedAt {
            maxEndorsedAt = rws.EndorsedAt
            results.CollectionConfig = configs
        }
        results.RWSet = append(results.RWSet, pvtRWSet...)
    }
}

With queries done, look at how this private data is deleted after commit. As mentioned before, entries that have timed out or fall outside the block-height range are all purged, as follows:

//fabric\gossip\privdata\coordinator.go
// StoreBlock stores block with private data into the ledger
func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error {
    ......
    if len(blockAndPvtData.PvtData) > 0 {
        // Finally, purge all transactions in block - valid or not valid.
        if err := c.PurgeByTxids(privateInfo.txns); err != nil {
            logger.Error("Purging transactions", privateInfo.txns, "failed:", err)
        }
    }
    seq := block.Header.Number
    if seq%c.transientBlockRetention == 0 && seq > c.transientBlockRetention {
        err := c.PurgeByHeight(seq - c.transientBlockRetention)
        if err != nil {
            logger.Error("Failed purging data from transient store at block", seq, ":", err)
        }
    }
    c.reportPurgeDuration(time.Since(purgeStart))
    return nil
}
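A worked example of the retention check: with TransientBlockRetention at 1000 (its usual default — treat that as an assumption, since it is configurable), block 3000 satisfies 3000%1000 == 0 && 3000 > 1000, so PurgeByHeight(2000) runs and drops everything persisted below height 2000. The predicate, extracted as a sketch:

// shouldPurge mirrors the retention condition above: purge only at retention
// boundaries, and only once the chain has passed the retention window.
func shouldPurge(seq, retention uint64) (bool, uint64) {
    if seq%retention == 0 && seq > retention {
        return true, seq - retention
    }
    return false, 0
}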

Purging by transaction ID:

// PurgeByTxids removes private write sets of a given set of transactions from the
// transient store. PurgeByTxids() is expected to be called by coordinator after
// committing a block to ledger.
func (s *store) PurgeByTxids(txids []string) error {
    logger.Debug("Purging private data from transient store for committed txids")
    dbBatch := leveldbhelper.NewUpdateBatch()
    for _, txid := range txids {
        // Construct startKey and endKey to do a range query
        startKey := createPurgeIndexByTxidRangeStartKey(txid)
        endKey := createPurgeIndexByTxidRangeEndKey(txid)
        iter := s.db.GetIterator(startKey, endKey)

        // Get all txid and uuid from above result and remove it from transient store (both
        // write set and the corresponding indexes).
        for iter.Next() {
            // For each entry, remove the private read-write set and corresponding indexes

            // Remove private write set
            compositeKeyPurgeIndexByTxid := iter.Key()
            // Note: We can create compositeKeyPvtRWSet by just replacing the prefix of compositeKeyPurgeIndexByTxid
            // with prwsetPrefix. For code readability and to be expressive, we split and create again.
            uuid, blockHeight, err := splitCompositeKeyOfPurgeIndexByTxid(compositeKeyPurgeIndexByTxid)
            if err != nil {
                return err
            }
            compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
            dbBatch.Delete(compositeKeyPvtRWSet)
            // Remove purge index -- purgeIndexByHeight
            compositeKeyPurgeIndexByHeight := createCompositeKeyForPurgeIndexByHeight(blockHeight, txid, uuid)
            dbBatch.Delete(compositeKeyPurgeIndexByHeight)
            // Remove purge index -- purgeIndexByTxid
            dbBatch.Delete(compositeKeyPurgeIndexByTxid)
        }
        iter.Release()
    }
    // If peer fails before/while writing the batch to golevelDB, these entries will be
    // removed as per BTL policy later by PurgeByHeight()
    return s.db.WriteBatch(dbBatch, true)
}

Purging by height:


// PurgeByHeight removes private write sets at block height lesser than
// a given maxBlockNumToRetain. In other words, Purge only retains private write sets
// that were persisted at block height of maxBlockNumToRetain or higher. Though the private
// write sets stored in transient store is removed by coordinator using PurgebyTxids()
// after successful block commit, PurgeByHeight() is still required to remove orphan entries (as
// transaction that gets endorsed may not be submitted by the client for commit)
func (s *store) PurgeByHeight(maxBlockNumToRetain uint64) error {
    logger.Debugf("Purging orphaned private data from transient store received prior to block [%d]", maxBlockNumToRetain)
    // Do a range query with 0 as startKey and maxBlockNumToRetain-1 as endKey
    startKey := createPurgeIndexByHeightRangeStartKey(0)
    endKey := createPurgeIndexByHeightRangeEndKey(maxBlockNumToRetain - 1)
    iter := s.db.GetIterator(startKey, endKey)

    dbBatch := leveldbhelper.NewUpdateBatch()
    // Get all txid and uuid from above result and remove it from transient store (both
    // write set and the corresponding indexes).
    for iter.Next() {
        // For each entry, remove the private read-write set and corresponding indexes

        // Remove private write set
        compositeKeyPurgeIndexByHeight := iter.Key()
        txid, uuid, blockHeight, err := splitCompositeKeyOfPurgeIndexByHeight(compositeKeyPurgeIndexByHeight)
        if err != nil {
            return err
        }
        logger.Debugf("Purging from transient store private data simulated at block [%d]: txid [%s] uuid [%s]", blockHeight, txid, uuid)

        compositeKeyPvtRWSet := createCompositeKeyForPvtRWSet(txid, uuid, blockHeight)
        dbBatch.Delete(compositeKeyPvtRWSet)
        // Remove purge index -- purgeIndexByTxid
        compositeKeyPurgeIndexByTxid := createCompositeKeyForPurgeIndexByTxid(txid, uuid, blockHeight)
        dbBatch.Delete(compositeKeyPurgeIndexByTxid)
        // Remove purge index -- purgeIndexByHeight
        dbBatch.Delete(compositeKeyPurgeIndexByHeight)
    }
    iter.Release()

    return s.db.WriteBatch(dbBatch, true)
}

As mentioned earlier, missing private data can be pulled from remote peers. How does that code work? Again in the StoreBlock function (fabric\gossip\privdata\coordinator.go):

// StoreBlock stores block with private data into the ledger
func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error {
    ......
    // gather the private data info (owned vs missing)
    privateInfo, err := c.listMissingPrivateData(block, ownedRWsets)
    if err != nil {
        logger.Warning(err)
        return err
    }
    ......
    if c.skipPullingInvalidTransactions {
        txsFilter := txValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
        for missingRWS := range privateInfo.missingKeys {
            if txsFilter[missingRWS.seqInBlock] != uint8(peer.TxValidationCode_VALID) {
                blockAndPvtData.MissingPvtData.Add(missingRWS.seqInBlock, missingRWS.namespace, missingRWS.collection, true)
                delete(privateInfo.missingKeys, missingRWS)
            }
        }
    }
    c.reportListMissingPrivateDataDuration(time.Since(listMissingStart))
    ......
    limit := startPull.Add(retryThresh)
    for len(privateInfo.missingKeys) > 0 && time.Now().Before(limit) {
        // fetch the missing private data from other peers
        c.fetchFromPeers(block.Header.Number, ownedRWsets, privateInfo)
        // If succeeded to fetch everything, no need to sleep before
        // retry
        if len(privateInfo.missingKeys) == 0 {
            break
        }
        time.Sleep(pullRetrySleepInterval)
    }
    ......
    // populate the private RWSets passed to the ledger
    for seqInBlock, nsRWS := range ownedRWsets.bySeqsInBlock() {
        rwsets := nsRWS.toRWSet()
        logger.Debugf("[%s] Added %d namespace private write sets for block [%d], tran [%d]", c.ChainID, len(rwsets.NsPvtRwset), block.Header.Number, seqInBlock)
        blockAndPvtData.PvtData[seqInBlock] = &ledger.TxPvtData{
            SeqInBlock: seqInBlock,
            WriteSet:   rwsets,
        }
    }
    // populate missing RWSets to be passed to the ledger
    for missingRWS := range privateInfo.missingKeys {
        blockAndPvtData.MissingPvtData.Add(missingRWS.seqInBlock, missingRWS.namespace, missingRWS.collection, true)
    }
    // populate missing RWSets for ineligible collections to be passed to the ledger
    for _, missingRWS := range privateInfo.missingRWSButIneligible {
        blockAndPvtData.MissingPvtData.Add(missingRWS.seqInBlock, missingRWS.namespace, missingRWS.collection, false)
    }
    // commit block and private data
    commitStart := time.Now()
    err = c.CommitWithPvtData(blockAndPvtData, &ledger.CommitOptions{})
    c.reportCommitDuration(time.Since(commitStart))
    if err != nil {
        return errors.Wrap(err, "commit failed")
    }
    ......
}

The implementation details:

// listMissingPrivateData identifies missing private write sets and attempts to retrieve them from local transient store
func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets map[rwSetKey][]byte) (*privateDataInfo, error) {if block.Metadata == nil || len(block.Metadata.Metadata) <= int(common.BlockMetadataIndex_TRANSACTIONS_FILTER) {return nil, errors.New("Block.Metadata is nil or Block.Metadata lacks a Tx filter bitmap")}txsFilter := txValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])if len(txsFilter) != len(block.Data.Data) {return nil, errors.Errorf("Block data size(%d) is different from Tx filter size(%d)", len(block.Data.Data), len(txsFilter))}sources := make(map[rwSetKey][]*peer.Endorsement)privateRWsetsInBlock := make(map[rwSetKey]struct{})missing := make(rwSetKeysByTxIDs)data := blockData(block.Data.Data)bi := &transactionInspector{sources:              sources,missingKeys:          missing,ownedRWsets:          ownedRWsets,privateRWsetsInBlock: privateRWsetsInBlock,coordinator:          c,}storePvtDataOfInvalidTx := c.Support.CapabilityProvider.Capabilities().StorePvtDataOfInvalidTx()txList, err := data.forEachTxn(storePvtDataOfInvalidTx, txsFilter, bi.inspectTransaction)if err != nil {return nil, err}privateInfo := &privateDataInfo{sources:                 sources,missingKeysByTxIDs:      missing,txns:                    txList,missingRWSButIneligible: bi.missingRWSButIneligible,}logger.Debug("Retrieving private write sets for", len(privateInfo.missingKeysByTxIDs), "transactions from transient store")// Put into ownedRWsets RW sets that are missing and found in the transient storec.fetchMissingFromTransientStore(privateInfo.missingKeysByTxIDs, ownedRWsets)// In the end, iterate over the ownedRWsets, and if the key doesn't exist in// the privateRWsetsInBlock - delete it from the ownedRWsetsfor k := range ownedRWsets {if _, exists := privateRWsetsInBlock[k]; !exists {logger.Warning("Removed", k.namespace, k.collection, "hash", k.hash, "from the data passed to the ledger")delete(ownedRWsets, k)}}privateInfo.missingKeys = privateInfo.missingKeysByTxIDs.flatten()// Remove all keys we already ownprivateInfo.missingKeys.exclude(func(key rwSetKey) bool {_, exists := ownedRWsets[key]return exists})return privateInfo, nil
}func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) {dig2src := make(map[privdatacommon.DigKey][]*peer.Endorsement)privateInfo.missingKeys.foreach(func(k rwSetKey) {logger.Debug("Fetching", k, "from peers")dig := privdatacommon.DigKey{TxId:       k.txID,SeqInBlock: k.seqInBlock,Collection: k.collection,Namespace:  k.namespace,BlockSeq:   blockSeq,}dig2src[dig] = privateInfo.sources[k]})//拉取函数fetchedData, err := c.fetch(dig2src)if err != nil {logger.Warning("Failed fetching private data for block", blockSeq, "from peers:", err)return}// Iterate over data fetched from peersfor _, element := range fetchedData.AvailableElements {dig := element.Digestfor _, rws := range element.Payload {hash := hex.EncodeToString(util2.ComputeSHA256(rws))key := rwSetKey{txID:       dig.TxId,namespace:  dig.Namespace,collection: dig.Collection,seqInBlock: dig.SeqInBlock,hash:       hash,}if _, isMissing := privateInfo.missingKeys[key]; !isMissing {logger.Debug("Ignoring", key, "because it wasn't found in the block")continue}ownedRWsets[key] = rwsdelete(privateInfo.missingKeys, key)// If we fetch private data that is associated to block i, then our last block persisted must be i-1// so our ledger height is i, since blocks start from 0.c.TransientStore.Persist(dig.TxId, blockSeq, key.toTxPvtReadWriteSet(rws))logger.Debug("Fetched", key)}}// Iterate over purged datafor _, dig := range fetchedData.PurgedElements {// delete purged key from missing keysfor missingPvtRWKey := range privateInfo.missingKeys {if missingPvtRWKey.namespace == dig.Namespace &&missingPvtRWKey.collection == dig.Collection &&missingPvtRWKey.txID == dig.TxId {delete(privateInfo.missingKeys, missingPvtRWKey)logger.Warningf("Missing key because was purged or will soon be purged, "+"continue block commit without [%+v] in private rwset", missingPvtRWKey)}}}
}
func (p *puller) fetch(dig2src dig2sources) (*privdatacommon.FetchedPvtDataContainer, error) {// computeFilters returns a map from a digest to a routing filterdig2Filter, err := p.computeFilters(dig2src)if err != nil {return nil, errors.WithStack(err)}return p.fetchPrivateData(dig2Filter)
}
func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, error) {filters := make(map[privdatacommon.DigKey]collectionRoutingFilter)for digest, sources := range dig2src {//得到任意的PeeranyPeerInCollection, err := p.getLatestCollectionConfigRoutingFilter(digest.Namespace, digest.Collection)if err != nil {return nil, errors.WithStack(err)}sources := sources//过滤相关节点endorserPeer, err := p.PeerFilter(common.ChainID(p.channel), func(peerSignature api.PeerSignature) bool {for _, endorsement := range sources {if bytes.Equal(endorsement.Endorser, []byte(peerSignature.PeerIdentity)) {return true}}return false})if err != nil {return nil, errors.WithStack(err)}filters[digest] = collectionRoutingFilter{anyPeer:       anyPeerInCollection,preferredPeer: endorserPeer,}}return filters, nil
}
func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdatacommon.FetchedPvtDataContainer, error) {
    // Get a list of peers per channel, keeping only those that pass some filter
    allFilters := dig2Filter.flattenFilterValues()
    members := p.waitForMembership()
    logger.Debug("Total members in channel:", members)
    members = filter.AnyMatch(members, allFilters...)
    logger.Debug("Total members that fit some digest:", members)
    if len(members) == 0 {
        logger.Warning("Do not know any peer in the channel(", p.channel, ") that matches the policies, aborting")
        return nil, errors.New("Empty membership")
    }
    members = randomizeMemberList(members)
    res := &privdatacommon.FetchedPvtDataContainer{}
    // Distribute requests to peers, and obtain subscriptions for all their messages
    // matchDigestToPeer returns a map from a peer to the digests which we would ask it for
    var peer2digests peer2Digests
    // We expect all private RWSets represented as digests to be collected
    itemsLeftToCollect := len(dig2Filter)
    // As long as we still have some data to collect and new members to ask the data for:
    for itemsLeftToCollect > 0 && len(members) > 0 {
        purgedPvt := p.getPurgedCollections(members, dig2Filter)
        // Need to remove purged digests from the mapping
        for _, dig := range purgedPvt {
            res.PurgedElements = append(res.PurgedElements, &proto.PvtDataDigest{
                TxId:       dig.TxId,
                BlockSeq:   dig.BlockSeq,
                SeqInBlock: dig.SeqInBlock,
                Namespace:  dig.Namespace,
                Collection: dig.Collection,
            })
            // remove digest so we won't even try to pull purged data
            delete(dig2Filter, dig)
            itemsLeftToCollect--
        }
        if itemsLeftToCollect == 0 {
            logger.Debug("No items left to collect")
            return res, nil
        }
        peer2digests, members = p.assignDigestsToPeers(members, dig2Filter)
        if len(peer2digests) == 0 {
            logger.Warning("No available peers for digests request, "+
                "cannot pull missing private data for following digests [%+v], peer membership: [%+v]",
                dig2Filter.digests(), members)
            return res, nil
        }
        logger.Debug("Matched", len(dig2Filter), "digests to", len(peer2digests), "peer(s)")
        subscriptions := p.scatterRequests(peer2digests)
        responses := p.gatherResponses(subscriptions)
        for _, resp := range responses {
            if len(resp.Payload) == 0 {
                logger.Debug("Got empty response for", resp.Digest)
                continue
            }
            delete(dig2Filter, privdatacommon.DigKey{
                TxId:       resp.Digest.TxId,
                BlockSeq:   resp.Digest.BlockSeq,
                SeqInBlock: resp.Digest.SeqInBlock,
                Namespace:  resp.Digest.Namespace,
                Collection: resp.Digest.Collection,
            })
            itemsLeftToCollect--
        }
        res.AvailableElements = append(res.AvailableElements, responses...)
    }
    return res, nil
}
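Stripped of the gossip machinery, fetchPrivateData is a "collect until done or out of peers" loop: assign the missing digests to peers, ask, cross off whatever came back, repeat. A minimal sketch of that control flow under simplified assumptions (digest, peer and ask are hypothetical stand-ins; ask replaces a whole scatter/gather round against the network):

package main

import "fmt"

// digest and peer are simplified stand-ins for the real gossip types.
type digest string
type peer string

// ask simulates one scatter/gather round: it returns the digests the
// peer actually answered (here from a canned table instead of the network).
func ask(p peer, digs []digest) []digest {
    answers := map[peer]map[digest]bool{
        "peerA": {"d1": true},
        "peerB": {"d2": true, "d3": true},
    }
    var got []digest
    for _, d := range digs {
        if answers[p][d] {
            got = append(got, d)
        }
    }
    return got
}

func main() {
    wanted := map[digest]bool{"d1": true, "d2": true, "d3": true, "d4": true}
    members := []peer{"peerA", "peerB"}

    // Loop while data is still missing and there are peers left to ask,
    // mirroring: for itemsLeftToCollect > 0 && len(members) > 0 { ... }
    for len(wanted) > 0 && len(members) > 0 {
        p := members[0]
        members = members[1:] // consume peers so the loop terminates
        var digs []digest
        for d := range wanted {
            digs = append(digs, d)
        }
        for _, d := range ask(p, digs) {
            delete(wanted, d) // collected: stop asking anyone else for it
        }
    }
    fmt.Println("still missing:", wanted) // d4 is never answered
}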
// Get the list of qualifying members; poll with a backoff until the
// membership view is non-empty, or give up after a bounded number of polls
func (p *puller) waitForMembership() []discovery.NetworkMember {
    polIteration := 0
    for {
        members := p.PeersOfChannel(common.ChainID(p.channel))
        if len(members) != 0 {
            return members
        }
        polIteration++
        if polIteration == maxMembershipPollIterations {
            return nil
        }
        time.Sleep(membershipPollingBackoff)
    }
}
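waitForMembership is a standard bounded poll-with-backoff. The same shape, written generically (pollUntil, the poll callback and the constants here are placeholders, not Fabric's):

package main

import (
    "fmt"
    "time"
)

// pollUntil mirrors waitForMembership: retry poll() until it returns a
// non-empty result, give up (return nil) after maxIterations attempts,
// and sleep for the backoff between attempts.
func pollUntil(poll func() []string, maxIterations int, backoff time.Duration) []string {
    for i := 1; ; i++ {
        if res := poll(); len(res) != 0 {
            return res
        }
        if i == maxIterations {
            return nil
        }
        time.Sleep(backoff)
    }
}

func main() {
    calls := 0
    poll := func() []string {
        calls++
        if calls < 3 { // the membership view is empty for the first two polls
            return nil
        }
        return []string{"peer0:7051"}
    }
    fmt.Println(pollUntil(poll, 5, 10*time.Millisecond)) // [peer0:7051]
}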
func (p *puller) scatterRequests(peersDigestMapping peer2Digests) []util.Subscription {
    var subscriptions []util.Subscription
    for peer, digests := range peersDigestMapping {
        msg := &proto.GossipMessage{
            Tag:     proto.GossipMessage_CHAN_ONLY,
            Channel: []byte(p.channel),
            Nonce:   util.RandomUInt64(),
            Content: &proto.GossipMessage_PrivateReq{
                PrivateReq: &proto.RemotePvtDataRequest{
                    Digests: digestsAsPointerSlice(digests),
                },
            },
        }
        // Subscribe to all digests prior to sending them
        for _, dig := range msg.GetPrivateReq().Digests {
            hash, err := dig.Hash()
            if err != nil {
                // Shouldn't happen as we just built this message ourselves
                logger.Warning("Failed creating digest", err)
                continue
            }
            sub := p.pubSub.Subscribe(hash, responseWaitTime)
            subscriptions = append(subscriptions, sub)
        }
        logger.Debug("Sending", peer.endpoint, "request", msg.GetPrivateReq().Digests)
        // Send the request message to the selected peer
        p.Send(msg, peer.AsRemotePeer())
    }
    return subscriptions
}
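One detail worth calling out: the subscription for each digest is registered before the request goes out, so even an instantaneous reply cannot slip through a gap between "send" and "subscribe". A tiny sketch of that subscribe-then-send ordering using a plain channel (illustrative only, not gossip's util.PubSub):

package main

import (
    "fmt"
    "time"
)

func main() {
    // Subscribe first: create the channel the response will land on...
    responses := make(chan string, 1)

    // ...then "send" the request; the responder may answer immediately.
    go func(reply chan<- string) {
        reply <- "pvt-rwset-for-d1" // even an instant reply is captured
    }(responses)

    // Gather with a timeout, like gatherResponses draining subscriptions.
    select {
    case r := <-responses:
        fmt.Println("got response:", r)
    case <-time.After(time.Second):
        fmt.Println("timed out waiting for response")
    }
}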
// Send sends a message to remote peers
func (g *gossipServiceImpl) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
    m, err := msg.NoopSign()
    if err != nil {
        g.logger.Warningf("Failed creating SignedGossipMessage: %+v", errors.WithStack(err))
        return
    }
    g.comm.Send(m, peers...)
}

This part is fairly involved, but it all falls into place if you walk through the code one function at a time.

九、Summary

The data read/write path involves a tangle of data structures, but once you grasp the flow of the data, understand how the structures relate to one another, and then sort out the corresponding logic, you have essentially mastered the key points of the whole read/write process. With the main thread and the overall outline clear, the smaller details only need careful attention as they come up in practice, and they will pose no real difficulty.
