zoukankan      html  css  js  c++  java
  • IBM openblockchain学习(三)--Ledger源码分析

    Ledger是总账簿的意思,也就是blockchain中存储交易记录的部分。其代码包含如下,这块代码量大,可能分析时间会很长,希望读者耐心等待。

    blockchain

    先看下Blockchain在内存中保存的基本信息,Blockchain中的操作不是线程安全的

    type blockchain struct {
        size               uint64  //块大小
        previousBlockHash  []byte   //上一个块的哈希
        indexer            blockchainIndexer //块索引
        lastProcessedBlock *lastProcessedBlock  //最后处理的块
    }

    最后处理的块的结构

    type lastProcessedBlock struct {
        block       *protos.Block
        blockNumber uint64  块数
        blockHash   []byte 块哈希值
    }

    newBlockchain

    newBlockchain()用于创建一个区块

    func newBlockchain() (*blockchain, error) {
        size, err := fetchBlockchainSizeFromDB()//从数据库中读取blockchain的大小
        if err != nil {
            return nil, err
        }
        blockchain := &blockchain{0, nil, nil, nil}
        blockchain.size = size
        if size > 0 {
            previousBlock, err := fetchBlockFromDB(size - 1)
            //如果为创世区块,则上一个块是创世区块的大小减一
            if err != nil {
                return nil, err
            }
            previousBlockHash, err := previousBlock.GetHash()
            //获取前驱块的哈希
            if err != nil {
                return nil, err
            }
            blockchain.previousBlockHash = previousBlockHash
        }
    
        err = blockchain.startIndexer()
        //开始创建索引
        if err != nil {
            return nil, err
        }
        return blockchain, nil
    }
    

    startIndexer()

    创建索引

    func (blockchain *blockchain) startIndexer() (err error) {
        if indexBlockDataSynchronously {
            blockchain.indexer = newBlockchainIndexerSync()
            //同步创建区块链索引
        } else {
            blockchain.indexer = newBlockchainIndexerAsync()
        }
        err = blockchain.indexer.start(blockchain)
        return
    }

    getLastBlock

    getLastBlock创建最后区块

    func (blockchain *blockchain) getLastBlock() (*protos.Block, error) {
        if blockchain.size == 0 {
            return nil, nil
        }
        return blockchain.getBlock(blockchain.size - 1)
    }
    

    getSize

    getSize用于返回块大小

    func (blockchain *blockchain) getSize() uint64 {
        return blockchain.size
    }

    getBlock

    在blockchain中通过任意高度获取块

    
    func (blockchain *blockchain) getBlock(blockNumber uint64) (*protos.Block, error) {
        return fetchBlockFromDB(blockNumber)
    }

    getBlockByHash

    通过块的哈希获取块

    func (blockchain *blockchain) getBlockByHash(blockHash []byte) (*protos.Block, error) {
        blockNumber, err := blockchain.indexer.fetchBlockNumberByBlockHash(blockHash)
        if err != nil {
            return nil, err
        }
        return blockchain.getBlock(blockNumber)
    }

    getTransactionByUUID

    通过UUID获取交易记录

    func (blockchain *blockchain) getTransactionByUUID(txUUID string) (*protos.Transaction, error) {
        blockNumber, txIndex, err := blockchain.indexer.fetchTransactionIndexByUUID(txUUID)
        if err != nil {
            return nil, err
        }
        block, err := blockchain.getBlock(blockNumber)
        if err != nil {
            return nil, err
        }
        transaction := block.GetTransactions()[txIndex]
        return transaction, nil
    }
    

    getTransactions

    通过有块号标识的块获取所有的交易

    func (blockchain *blockchain) getTransactions(blockNumber uint64) ([]*protos.Transaction, error) {
        block, err := blockchain.getBlock(blockNumber)
        if err != nil {
            return nil, err
        }
        return block.GetTransactions(), nil
    }

    getTransactionsByBlockHash

    通过块的哈希获取所有的交易

    func (blockchain *blockchain) getTransactionsByBlockHash(blockHash []byte) ([]*protos.Transaction, error) {
        block, err := blockchain.getBlockByHash(blockHash)
        if err != nil {
            return nil, err
        }
        return block.GetTransactions(), nil
    }
    

    getTransaction

    通过数块和确定块内索引获取交易

    func (blockchain *blockchain) getTransaction(blockNumber uint64, txIndex uint64) (*protos.Transaction, error) {
        block, err := blockchain.getBlock(blockNumber)
        if err != nil {
            return nil, err
        }
        return block.GetTransactions()[txIndex], nil
    }

    getTransactionByBlockHash

    通过块内块的哈希和标识索引获取交易

    func (blockchain *blockchain) getTransactionByBlockHash(blockHash []byte, txIndex uint64) (*protos.Transaction, error) {
        block, err := blockchain.getBlockByHash(blockHash)
        if err != nil {
            return nil, err
        }
        return block.GetTransactions()[txIndex], nil
    }

    getBlockchainInfo

    获取区块链的信息

    func (blockchain *blockchain) getBlockchainInfo() (*protos.BlockchainInfo, error) {
        if blockchain.getSize() == 0 {
            return &protos.BlockchainInfo{Height: 0}, nil
        }
    
        lastBlock, err := blockchain.getLastBlock()
        if err != nil {
            return nil, err
        }
    
        info := &protos.BlockchainInfo{
            Height:            blockchain.getSize(),
            CurrentBlockHash:  blockchain.previousBlockHash,
            PreviousBlockHash: lastBlock.PreviousBlockHash}
    
        return info, nil
    }

    buildBlock

    创建块

    func (blockchain *blockchain) buildBlock(block *protos.Block, stateHash []byte) *protos.Block {
        block.SetPreviousBlockHash(blockchain.previousBlockHash)
        block.StateHash = stateHash
        return block
    }
    

    addPersistenceChangesForNewBlock

    对于新块添加持久性的更改

    func (blockchain *blockchain) addPersistenceChangesForNewBlock(ctx context.Context,
        block *protos.Block, stateHash []byte, writeBatch *gorocksdb.WriteBatch) (uint64, error) {
        block = blockchain.buildBlock(block, stateHash)
        if block.NonHashData == nil {
            block.NonHashData = &protos.NonHashData{LocalLedgerCommitTimestamp: util.CreateUtcTimestamp()}
        } else {
            block.NonHashData.LocalLedgerCommitTimestamp = util.CreateUtcTimestamp()
        }
        blockNumber := blockchain.size
        blockHash, err := block.GetHash()
        if err != nil {
            return 0, err
        }
        blockBytes, blockBytesErr := block.Bytes()
        if blockBytesErr != nil {
            return 0, blockBytesErr
        }
        writeBatch.PutCF(db.GetDBHandle().BlockchainCF, encodeBlockNumberDBKey(blockNumber), blockBytes)
        writeBatch.PutCF(db.GetDBHandle().BlockchainCF, blockCountKey, encodeUint64(blockNumber+1))
        if blockchain.indexer.isSynchronous() {
            blockchain.indexer.createIndexesSync(block, blockNumber, blockHash, writeBatch)
        }
        blockchain.lastProcessedBlock = &lastProcessedBlock{block, blockNumber, blockHash}
        return blockNumber, nil
    }
    

    blockPersistenceStatus

    块持久状态

    func (blockchain *blockchain) blockPersistenceStatus(success bool) {
        if success {
            blockchain.size++
            blockchain.previousBlockHash = blockchain.lastProcessedBlock.blockHash
            if !blockchain.indexer.isSynchronous() {
                blockchain.indexer.createIndexesAsync(blockchain.lastProcessedBlock.block,
                    blockchain.lastProcessedBlock.blockNumber, blockchain.lastProcessedBlock.blockHash)
            }
        }
        blockchain.lastProcessedBlock = nil
    }

    persistRawBlock

    持久化原始块

    func (blockchain *blockchain) persistRawBlock(block *protos.Block, blockNumber uint64) error {
        blockBytes, blockBytesErr := block.Bytes()
        if blockBytesErr != nil {
            return blockBytesErr
        }
        writeBatch := gorocksdb.NewWriteBatch()
        defer writeBatch.Destroy()
        writeBatch.PutCF(db.GetDBHandle().BlockchainCF, encodeBlockNumberDBKey(blockNumber), blockBytes)
    
        // 它需要检查,因为我们在这样的情况下块/状态同步支持乱序块。其真正意义区块链的高度,而不是规模。
        if blockchain.getSize() < blockNumber+1 {
            sizeBytes := encodeUint64(blockNumber + 1)
            writeBatch.PutCF(db.GetDBHandle().BlockchainCF, blockCountKey, sizeBytes)
            blockchain.size = blockNumber + 1
        }
        blockHash, err := block.GetHash()
        if err != nil {
            return err
        }
    
        if blockchain.indexer.isSynchronous() {
            blockchain.indexer.createIndexesSync(block, blockNumber, blockHash, writeBatch)
        }
    
        opt := gorocksdb.NewDefaultWriteOptions()
        defer opt.Destroy()
        err = db.GetDBHandle().DB.Write(opt, writeBatch)
        if err != nil {
            return err
        }
        return nil
    }
    

    fetchBlockFromDB

    从数据库中获取块

    func fetchBlockFromDB(blockNumber uint64) (*protos.Block, error) {
        blockBytes, err := db.GetDBHandle().GetFromBlockchainCF(encodeBlockNumberDBKey(blockNumber))
        if err != nil {
            return nil, err
        }
        if blockBytes == nil {
            return nil, nil
        }
        return protos.UnmarshallBlock(blockBytes)
    }

    fetchTransactionFromDB

    从数据库中获取交易记录

    func fetchTransactionFromDB(blockNum uint64, txIndex uint64) (*protos.Transaction, error) {
        block, err := fetchBlockFromDB(blockNum)
        if err != nil {
            return nil, err
        }
        return block.GetTransactions()[txIndex], nil
    }

    fetchBlockchainSizeFromDB

    从数据库中获取区块链的大小

    func fetchBlockchainSizeFromDB() (uint64, error) {
        bytes, err := db.GetDBHandle().GetFromBlockchainCF(blockCountKey)
        if err != nil {
            return 0, err
        }
        if bytes == nil {
            return 0, nil
        }
        return decodeToUint64(bytes), nil
    }

    fetchBlockchainSizeFromSnapshot

    从快照中获取区块链大小

    func fetchBlockchainSizeFromSnapshot(snapshot *gorocksdb.Snapshot) (uint64, error) {
        blockNumberBytes, err := db.GetDBHandle().GetFromBlockchainCFSnapshot(snapshot, blockCountKey)
        if err != nil {
            return 0, err
        }
        var blockNumber uint64
        if blockNumberBytes != nil {
            blockNumber = decodeToUint64(blockNumberBytes)
        }
        return blockNumber, nil
    }

    String

    将区块链的信息以字符串形式输出

    func (blockchain *blockchain) String() string {
        var buffer bytes.Buffer
        size := blockchain.getSize()
        for i := uint64(0); i < size; i++ {
            block, blockErr := blockchain.getBlock(i)
            if blockErr != nil {
                return ""
            }
            buffer.WriteString("
    ----------<block #")
            buffer.WriteString(strconv.FormatUint(i, 10))
            buffer.WriteString(">----------
    ")
            buffer.WriteString(block.String())
            buffer.WriteString("
    ----------<\block #")
            buffer.WriteString(strconv.FormatUint(i, 10))
            buffer.WriteString(">----------
    ")
        }
        return buffer.String()
    }

    blockchain_indexes

    blockchainIndexer定义了以下几个接口

    type blockchainIndexer interface {
        //同步标识
        isSynchronous() bool
        //开始创建
        start(blockchain *blockchain) error
        //同步创建索引
        createIndexesSync(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error
        //异步创建索引
        createIndexesAsync(block *protos.Block, blockNumber uint64, blockHash []byte) error
        //通过块哈希获取块号
        fetchBlockNumberByBlockHash(blockHash []byte) (uint64, error)
        //通过UUID获取块号
        fetchTransactionIndexByUUID(txUUID string) (uint64, uint64, error)
        //停止创建
        stop()
    }

    addIndexDataForPersistence

    持久化并且检索索引数据

    func addIndexDataForPersistence(block *protos.Block, blockNumber uint64, blockHash []byte, writeBatch *gorocksdb.WriteBatch) error {
        openchainDB := db.GetDBHandle()
        cf := openchainDB.IndexesCF
    
        // 块号映射成块哈希值
        indexLogger.Debug("Indexing block number [%d] by hash = [%x]", blockNumber, blockHash)
        writeBatch.PutCF(cf, encodeBlockHashKey(blockHash), encodeBlockNumber(blockNumber))
    
        addressToTxIndexesMap := make(map[string][]uint64)
        addressToChaincodeIDsMap := make(map[string][]*protos.ChaincodeID)
    
        transactions := block.GetTransactions()
        for txIndex, tx := range transactions {
            // 添加TXT UUID - >(块号,索引中块)
            writeBatch.PutCF(cf, encodeTxUUIDKey(tx.Uuid), encodeBlockNumTxIndex(blockNumber, uint64(txIndex)))
    
            txExecutingAddress := getTxExecutingAddress(tx)
            addressToTxIndexesMap[txExecutingAddress] = append(addressToTxIndexesMap[txExecutingAddress], uint64(txIndex))
    
            switch tx.Type {
            case protos.Transaction_CHAINCODE_NEW, protos.Transaction_CHAINCODE_UPDATE:
                authroizedAddresses, chaincodeID := getAuthorisedAddresses(tx)
                for _, authroizedAddress := range authroizedAddresses {
                    addressToChaincodeIDsMap[authroizedAddress] = append(addressToChaincodeIDsMap[authroizedAddress], chaincodeID)
                }
            }
        }
        for address, txsIndexes := range addressToTxIndexesMap {
            writeBatch.PutCF(cf, encodeAddressBlockNumCompositeKey(address, blockNumber), encodeListTxIndexes(txsIndexes))
        }
        return nil
    }
    

    getAuthorisedAddresses

    获得授权地址

    func getAuthorisedAddresses(tx *protos.Transaction) ([]string, *protos.ChaincodeID) {
        // 从chaincode的部署TX中获取取地址
        // 这个方法也会返回错误
        data := tx.ChaincodeID
        cID := &protos.ChaincodeID{}
        err := proto.Unmarshal(data, cID)
        if err != nil {
            return nil, nil
        }
        return []string{"address1", "address2"}, cID
    }
    

    encodeBlockNumber

    编码/解码数据库键/值函数,索引数据编码/解码块数

    
    func encodeBlockNumber(blockNumber uint64) []byte {
        return proto.EncodeVarint(blockNumber)
    }
    func decodeBlockNumber(blockNumberBytes []byte) (blockNumber uint64) {
        blockNumber, _ = proto.DecodeVarint(blockNumberBytes)
        return
    }

    encodeBlockNumTxIndex

    对 块号的Tx索引进行编码/解码

    func encodeBlockNumTxIndex(blockNumber uint64, txIndexInBlock uint64) []byte {
        b := proto.NewBuffer([]byte{})
        b.EncodeVarint(blockNumber)
        b.EncodeVarint(txIndexInBlock)
        return b.Bytes()
    }
    
    func decodeBlockNumTxIndex(bytes []byte) (blockNum uint64, txIndex uint64, err error) {
        b := proto.NewBuffer(bytes)
        blockNum, err = b.DecodeVarint()
        if err != nil {
            return
        }
        txIndex, err = b.DecodeVarint()
        if err != nil {
            return
        }
        return
    }
    

    对区块哈希的键值进行编码

    
    func encodeBlockHashKey(blockHash []byte) []byte {
        return prependKeyPrefix(prefixBlockHashKey, blockHash)
    }

    对TxUUID的键值进行编码

    func encodeTxUUIDKey(txUUID string) []byte {
        return prependKeyPrefix(prefixTxUUIDKey, []byte(txUUID))
    }

    对区块号地址的复合键值进行编码

    func encodeAddressBlockNumCompositeKey(address string, blockNumber uint64) []byte {
        b := proto.NewBuffer([]byte{prefixAddressBlockNumCompositeKey})
        b.EncodeRawBytes([]byte(address))
        b.EncodeVarint(blockNumber)
        return b.Bytes()
    }

    对Tx的索引清单进行编码

    func encodeListTxIndexes(listTx []uint64) []byte {
        b := proto.NewBuffer([]byte{})
        for i := range listTx {
            b.EncodeVarint(listTx[i])
        }
        return b.Bytes()
    }

    对chaincode的ID进行编码

    func encodeChaincodeID(c *protos.ChaincodeID) []byte {
        // 序列化chaincode ID
        return []byte{}
    }
    

    前置键值前缀

    func prependKeyPrefix(prefix byte, key []byte) []byte {
        modifiedKey := []byte{}
        modifiedKey = append(modifiedKey, prefix)
        modifiedKey = append(modifiedKey, key...)
        return modifiedKey
    }
    

    blockchain_indexes_async

    整个代码主要执行对blockchain的异步创建索引

    type blockchainIndexerAsync struct {
        blockchain *blockchain
        //从块链转移块索引的通道
        blockChan    chan blockWrapper
        indexerState *blockchainIndexerState
    }

    createIndexesInternal

    创建索引条目并逐步添加到数据库,用于创建各种属性的索引

    func (indexer *blockchainIndexerAsync) createIndexesInternal(block *protos.Block, blockNumber uint64, blockHash []byte) error {
        openchainDB := db.GetDBHandle()
        writeBatch := gorocksdb.NewWriteBatch()
        defer writeBatch.Destroy()
        addIndexDataForPersistence(block, blockNumber, blockHash, writeBatch)
        writeBatch.PutCF(openchainDB.IndexesCF, lastIndexedBlockKey, encodeBlockNumber(blockNumber))
        opt := gorocksdb.NewDefaultWriteOptions()
        defer opt.Destroy()
        err := openchainDB.DB.Write(opt, writeBatch)
        if err != nil {
            return err
        }
        indexer.indexerState.blockIndexed(blockNumber)
        return nil
    }

    indexPendingBlocks

    待定块的索引

    func (indexer *blockchainIndexerAsync) indexPendingBlocks() error {
        blockchain := indexer.blockchain
        if blockchain.getSize() == 0 {
            // 链至今为空
            return nil
        }
    
        lastCommittedBlockNum := blockchain.getSize() - 1
        lastIndexedBlockNum := indexer.indexerState.getLastIndexedBlockNumber()
        if lastCommittedBlockNum == lastIndexedBlockNum {
            //所有块索引的提交
            return nil
        }
    
        for ; lastIndexedBlockNum < lastCommittedBlockNum; lastIndexedBlockNum++ {
            blockNumToIndex := lastIndexedBlockNum + 1
            blockToIndex, errBlockFetch := blockchain.getBlock(blockNumToIndex)
            if errBlockFetch != nil {
                return errBlockFetch
            }
    
            blockHash, errBlockHash := blockToIndex.GetHash()
            if errBlockHash != nil {
                return errBlockHash
            }
            indexer.createIndexesInternal(blockToIndex, blockNumToIndex, blockHash)
        }
        return nil
    }
    

    blockIndexed

    块索引

    func (indexerState *blockchainIndexerState) blockIndexed(blockNumber uint64) {
        indexerState.newBlockIndexed.L.Lock()
        defer indexerState.newBlockIndexed.L.Unlock()
        indexerState.lastBlockIndexed = blockNumber
        indexerState.zerothBlockIndexed = true
        indexerState.newBlockIndexed.Broadcast()
    }
    

    waitForLastCommittedBlock

    等待最后一个块的创建

    func (indexerState *blockchainIndexerState) waitForLastCommittedBlock() (err error) {
        chain := indexerState.indexer.blockchain
        if err != nil || chain.getSize() == 0 {
            return
        }
    
        lastBlockCommitted := chain.getSize() - 1
    
        indexerState.newBlockIndexed.L.Lock()
        defer indexerState.newBlockIndexed.L.Unlock()
    
        if !indexerState.zerothBlockIndexed {
            indexLogger.Debug(
                "Waiting for zeroth block to be indexed. lastBlockCommitted=[%d] and lastBlockIndexed=[%d]",
                lastBlockCommitted, indexerState.lastBlockIndexed)
            indexerState.newBlockIndexed.Wait()
        }
    
        for indexerState.lastBlockIndexed < lastBlockCommitted {
            indexLogger.Debug(
                "Waiting for index to catch up with block chain. lastBlockCommitted=[%d] and lastBlockIndexed=[%d]",
                lastBlockCommitted, indexerState.lastBlockIndexed)
            indexerState.newBlockIndexed.Wait()
        }
        return
    }

    fetchLastIndexedBlockNumFromDB

    获取从数据库中得到上一个块号的块索引

    func fetchLastIndexedBlockNumFromDB() (zerothBlockIndexed bool, lastIndexedBlockNum uint64, err error) {
        lastIndexedBlockNumberBytes, err := db.GetDBHandle().GetFromIndexesCF(lastIndexedBlockKey)
        if err != nil {
            return
        }
        if lastIndexedBlockNumberBytes == nil {
            return
        }
        lastIndexedBlockNum = decodeBlockNumber(lastIndexedBlockNumberBytes)
        zerothBlockIndexed = true
        return
    }

    ledger

    先看下ledger的结构

    type Ledger struct {
        blockchain *blockchain //区块链
        state      *state.State //状态
        currentID  interface{} //当前ID
    }

    GetLedger

    给出”单个“ledger的引用

    func GetLedger() (*Ledger, error) {
        once.Do(func() {
            ledger, ledgerError = newLedger()
        })
        return ledger, ledgerError
    }

    BeginTxBatch

    开始批量发出

    func (ledger *Ledger) BeginTxBatch(id interface{}) error {
        err := ledger.checkValidIDBegin()
        if err != nil {
            return err
        }
        ledger.currentID = id
        return nil
    }

    GetTXBatchPreviewBlock

    返回将具有相同块的哈希,如果ledger.CommitTxBatch使用相同的参数则提交到数据库。如果该状态是由一个事务这两个调用之间修改,散列将不同。该块预览不包括非散列数据,如本地时间戳。

    func (ledger *Ledger) GetTXBatchPreviewBlock(id interface{},
        transactions []*protos.Transaction, metadata []byte) (*protos.Block, error) {
        err := ledger.checkValidIDCommitORRollback(id)
        if err != nil {
            return nil, err
        }
        stateHash, err := ledger.state.GetHash()
        if err != nil {
            return nil, err
        }
        return ledger.blockchain.buildBlock(protos.NewBlock(transactions, metadata), stateHash), nil
    }

    CommitTxBatch

    CommitTxBatch被调用时,当前事务需要分批次提交,该函数成功返回了交易的细节和状态变化(可能在这个交易批量的执行过程中发生)一直致力于持久化存储

    func (ledger *Ledger) CommitTxBatch(id interface{}, transactions []*protos.Transaction, transactionResults []*protos.TransactionResult, metadata []byte) error {
        err := ledger.checkValidIDCommitORRollback(id)
        if err != nil {
            return err
        }
    
        stateHash, err := ledger.state.GetHash()
        if err != nil {
            ledger.resetForNextTxGroup(false)
            ledger.blockchain.blockPersistenceStatus(false)
            return err
        }
    
        writeBatch := gorocksdb.NewWriteBatch()
        defer writeBatch.Destroy()
        block := protos.NewBlock(transactions, metadata)
        block.NonHashData = &protos.NonHashData{TransactionResults: transactionResults}
        newBlockNumber, err := ledger.blockchain.addPersistenceChangesForNewBlock(context.TODO(), block, stateHash, writeBatch)
        if err != nil {
            ledger.resetForNextTxGroup(false)
            ledger.blockchain.blockPersistenceStatus(false)
            return err
        }
        ledger.state.AddChangesForPersistence(newBlockNumber, writeBatch)
        opt := gorocksdb.NewDefaultWriteOptions()
        defer opt.Destroy()
        dbErr := db.GetDBHandle().DB.Write(opt, writeBatch)
        if dbErr != nil {
            ledger.resetForNextTxGroup(false)
            ledger.blockchain.blockPersistenceStatus(false)
            return dbErr
        }
    
        ledger.resetForNextTxGroup(true)
        ledger.blockchain.blockPersistenceStatus(true)
    
        sendProducerBlockEvent(block)
        return nil
    }
    

    RollbackTxBatch

    批处理回滚时放弃当前事务批次执行过程中可能发生的所有状态变化

    func (ledger *Ledger) RollbackTxBatch(id interface{}) error {
        ledgerLogger.Debug("RollbackTxBatch for id = [%s]", id)
        err := ledger.checkValidIDCommitORRollback(id)
        if err != nil {
            return err
        }
        ledger.resetForNextTxGroup(false)
        return nil
    }

    TxBegin

    标志着在持续一批新的交易开始

    func (ledger *Ledger) TxBegin(txUUID string) {
        ledger.state.TxBegin(txUUID)
    }

    TxFinished

    标志着正在进行交易的完成。如果成功话设置为false,丢弃事务的状态变化

    func (ledger *Ledger) TxFinished(txUUID string, txSuccessful bool) {
        ledger.state.TxFinish(txUUID, txSuccessful)
    }

    GetTempStateHash

    计算哈希状态并考虑到当前事务批次执行过程中可能发生的状态变化

    func (ledger *Ledger) GetTempStateHash() ([]byte, error) {
        return ledger.state.GetHash()
    }

    GetTempStateHashWithTxDeltaStateHashes

    除状态散列(如在方法GetTempStateHash定义),
    此方法返回一个映射[TX的txUuid - > cryptoHash(stateChange MadeBySIx),只有TX成功,才会出现在该映射中

    func (ledger *Ledger) GetTempStateHashWithTxDeltaStateHashes() ([]byte, map[string][]byte, error) {
        stateHash, err := ledger.state.GetHash()
        return stateHash, ledger.state.GetTxStateDeltaHash(), err
    }

    GetState

    获取chaincode的id和键值。如果提交为false,它首先会在内存中查看。如果丢失的话,将从数据库中获取。如果提交为true,则仅仅只能在数据库中获取。

    func (ledger *Ledger) GetState(chaincodeID string, key string, committed bool) ([]byte, error) {
        return ledger.state.Get(chaincodeID, key, committed)
    }

    GetStateRangeScanIterator

    返回一个迭代器来获取所有startKey和endKey之间的键(和值)(假设键的词汇顺序)为chaincodeID。如果提交为true,则从数据库检索的键值是唯一。如果提交为false,从数据库被mergerd后的结果与在存储器中的结果(优先考虑在内存中的数据)在返回的迭代的键值是不同的

     guaranteed to be in any specific order
    func (ledger *Ledger) GetStateRangeScanIterator(chaincodeID string, startKey string, endKey string, committed bool) (statemgmt.RangeScanIterator, error) {
        return ledger.state.GetRangeScanIterator(chaincodeID, startKey, endKey, committed)
    }
    

    GetStateSnapshot

    返回当前块点对点全局状态。 这个是在从一个端到另一个端转化中的状态时使用。必须调用状态Snapshot.Release()方法一旦你与快照是以释放资源完成的。

    func (ledger *Ledger) GetStateSnapshot() (*state.StateSnapshot, error) {
        dbSnapshot := db.GetDBHandle().GetSnapshot()
        blockHeight, err := fetchBlockchainSizeFromSnapshot(dbSnapshot)
        if err != nil {
            dbSnapshot.Release()
            return nil, err
        }
        if 0 == blockHeight {
            dbSnapshot.Release()
            return nil, fmt.Errorf("Blockchain has no blocks, cannot determine block number")
        }
        return ledger.state.GetSnapshot(blockHeight-1, dbSnapshot)
    }
    

    GetStateDelta

    如果可用,则返回指定块的状态增量。

    func (ledger *Ledger) GetStateDelta(blockNumber uint64) (*statemgmt.StateDelta, error) {
        if blockNumber >= ledger.GetBlockchainSize() {
            return nil, ErrOutOfBounds
        }
        return ledger.state.FetchStateDeltaFromDB(blockNumber)
    }

    ApplyStateDelta

    即适用于一个当前的状态状态增量。它只在内存改变。必须调用ledger.CommitStateDelta持久化到数据库。这应该只被用来作为状态同步的一部分。状态增量可以从另一对等虽然Ledger.GetStateDelta函数检索或者与来自Ledger.GetStateshot()获取密钥创​​建的状态增量。举一个例子,在ledger_test.go定义的TestSetRawState。请注意,没有在此功能上检查它是否被调用,以确保增量是在正确的顺序中使用。例如,如果你目前正处于块8,并调用Ledger.GetStateDelta(10)的功能检索增量,您现在会是在一个糟糕的状态,因为你没有块9.申请增量这是可能的回滚状态向前或向后使用stateDelta.RollBackwards。默认情况下,块3检索的增量可以被用来从状态向前回滚在块2到状态在块3.如果
    stateDelta.RollBackwards =false,增量检索块3可用于向后滚动块3状态和块2的状态。

    func (ledger *Ledger) ApplyStateDelta(id interface{}, delta *statemgmt.StateDelta) error {
        err := ledger.checkValidIDBegin()
        if err != nil {
            return err
        }
        ledger.currentID = id
        ledger.state.ApplyStateDelta(delta)
        return nil
    }

    CommitStateDelta

    将提交ledger.ApplyState状态增量并传递到到数据库

    func (ledger *Ledger) CommitStateDelta(id interface{}) error {
        err := ledger.checkValidIDCommitORRollback(id)
        if err != nil {
            return err
        }
        defer ledger.resetForNextTxGroup(true)
        return ledger.state.CommitStateDelta()
    }

    RollbackStateDelta

    放弃到ledger.ApplyStateDelta状态增量

    func (ledger *Ledger) RollbackStateDelta(id interface{}) error {
        err := ledger.checkValidIDCommitORRollback(id)
        if err != nil {
            return err
        }
        ledger.resetForNextTxGroup(false)
        return nil
    }

    VerifyChain

    将验证blockchain的integrety。完成这一步
    通过确保存储在每个块中的前一个块的哈希链中的前块的实际散列相匹配。返回值是包含不匹配的前一个块的散列块的块号。例如,如果验证链(0,99)称为与prevous哈希值存储在块8中,32和42不相匹配各自前块42的实际的哈希值将是从该函数的返回值。 highBlock在链中高级验证。 如果你要验证的整个链条中,使用ledger.GetBlockchainsize() - 1。低块是在链中被低级验证。如果您想验证整个链条,为创世区块使用0。

    func (ledger *Ledger) VerifyChain(highBlock, lowBlock uint64) (uint64, error) {
        if highBlock >= ledger.GetBlockchainSize() {
            return highBlock, ErrOutOfBounds
        }
        if highBlock <= lowBlock {
            return lowBlock, ErrOutOfBounds
        }
    
        for i := highBlock; i > lowBlock; i-- {
            currentBlock, err := ledger.GetBlockByNumber(i)
            if err != nil {
                return i, fmt.Errorf("Error fetching block %d.", i)
            }
            if currentBlock == nil {
                return i, fmt.Errorf("Block %d is nil.", i)
            }
            previousBlock, err := ledger.GetBlockByNumber(i - 1)
            if err != nil {
                return i - 1, fmt.Errorf("Error fetching block %d.", i)
            }
            if previousBlock == nil {
                return i - 1, fmt.Errorf("Block %d is nil.", i-1)
            }
    
            previousBlockHash, err := previousBlock.GetHash()
            if err != nil {
                return i - 1, fmt.Errorf("Error calculating block hash for block %d.", i-1)
            }
            if bytes.Compare(previousBlockHash, currentBlock.PreviousBlockHash) != 0 {
                return i, nil
            }
        }
    
        return 0, nil
    }
    

    sendProducerBlockEvent

    func sendProducerBlockEvent(block *protos.Block) {
    
        // 从部署删除交易的有效载荷。这样做是为了创建块
        //这些类型的交易使事件更轻巧,有效载荷有可能非常大
        blockTransactions := block.GetTransactions()
        for _, transaction := range blockTransactions {
            if transaction.Type == protos.Transaction_CHAINCODE_NEW {
                deploymentSpec := &protos.ChaincodeDeploymentSpec{}
                err := proto.Unmarshal(transaction.Payload, deploymentSpec)
                if err != nil {
                    ledgerLogger.Error(fmt.Sprintf("Error unmarshalling deployment transaction for block event: %s", err))
                    continue
                }
                deploymentSpec.CodePackage = nil
                deploymentSpecBytes, err := proto.Marshal(deploymentSpec)
                if err != nil {
                    ledgerLogger.Error(fmt.Sprintf("Error marshalling deployment transaction for block event: %s", err))
                    continue
                }
                transaction.Payload = deploymentSpecBytes
            }
        }
    
        producer.Send(producer.CreateBlockEvent(block))
    }

    genesis

    类似于chaincode,调用go-logging中logging库的MustGetLogger函数对genesis package进行记录

    var genesisLogger = logging.MustGetLogger("genesis")

    MakeGenesis

    MakeGenesis基于在openchain.yaml中配置创建创世区块,并把它添加到blockchain。

    func MakeGenesis() error {
        once.Do(func() {
            ledger, err := ledger.GetLedger()
            if err != nil {
                makeGenesisError = err
                return
            }
    
            if ledger.GetBlockchainSize() > 0 {
                // 获取blockchain的大小,如果大于0代表创世区块已经存在
                return
            }
    
            genesisLogger.Info("Creating genesis block.")
    
            ledger.BeginTxBatch(0)
            var genesisTransactions []*protos.Transaction
            //我们现在禁用在有效期内部署,甚至不应该允许它在配置中启用,将其设置为false
            allowDeployValidityPeriod := false
    
            if(deploySystemChaincodeEnabled() && allowDeployValidityPeriod){
                vpTransaction, deployErr :=  deployUpdateValidityPeriodChaincode()
    
                if deployErr != nil {
                    genesisLogger.Error("Error deploying validity period system chaincode for genesis block.", deployErr)
                    makeGenesisError = deployErr
                    return
                }
    
                genesisTransactions = append(genesisTransactions, vpTransaction)
            }
    
            genesis := viper.GetStringMap("ledger.blockchain.genesisBlock")
    
            if genesis == nil {
                genesisLogger.Info("No genesis block chaincodes defined.")
            } else {
    
                chaincodes, chaincodesOK := genesis["chaincode"].([]interface{})
                if !chaincodesOK {
                    genesisLogger.Info("No genesis block chaincodes defined.")
                    ledger.CommitTxBatch(0, genesisTransactions, nil, nil)
                    return
                }
    
                genesisLogger.Debug("Genesis chaincodes are %s", chaincodes)
    
    
                for i := 0; i < len(chaincodes); i++ {
                    genesisLogger.Debug("Chaincode %d is %s", i, chaincodes[i])
    
                    chaincodeMap, chaincodeMapOK := chaincodes[i].(map[interface{}]interface{})
                    if !chaincodeMapOK {
                        genesisLogger.Error("Invalid chaincode defined in genesis configuration:", chaincodes[i])
                        makeGenesisError = fmt.Errorf("Invalid chaincode defined in genesis configuration: %s", chaincodes[i])
                        return
                    }
    
                    path, pathOK := chaincodeMap["path"].(string)
                    if !pathOK {
                        genesisLogger.Error("Invalid chaincode URL defined in genesis configuration:", chaincodeMap["path"])
                        makeGenesisError = fmt.Errorf("Invalid chaincode URL defined in genesis configuration: %s", chaincodeMap["path"])
                        return
                    }
    
                    chaincodeType, chaincodeTypeOK := chaincodeMap["type"].(string)
                    if !chaincodeTypeOK {
                        genesisLogger.Error("Invalid chaincode type defined in genesis configuration:", chaincodeMap["type"])
                        makeGenesisError = fmt.Errorf("Invalid chaincode type defined in genesis configuration: %s", chaincodeMap["type"])
                        return
                    }
    
                    chaincodeID := &protos.ChaincodeID{Path: path, Name: ""}
    
                    genesisLogger.Debug("Genesis chaincodeID %s", chaincodeID)
                    genesisLogger.Debug("Genesis chaincode type %s", chaincodeType)
    
                    constructorMap, constructorMapOK := chaincodeMap["constructor"].(map[interface{}]interface{})
                    if !constructorMapOK {
                        genesisLogger.Error("Invalid chaincode constructor defined in genesis configuration:", chaincodeMap["constructor"])
                        makeGenesisError = fmt.Errorf("Invalid chaincode constructor defined in genesis configuration: %s", chaincodeMap["constructor"])
                        return
                    }
    
                    var spec protos.ChaincodeSpec
                    if constructorMap == nil {
                        genesisLogger.Debug("Genesis chaincode has no constructor.")
                        spec = protos.ChaincodeSpec{Type: protos.ChaincodeSpec_Type(protos.ChaincodeSpec_Type_value[chaincodeType]), ChaincodeID: chaincodeID}
                    } else {
    
                        ctorFunc, ctorFuncOK := constructorMap["func"].(string)
                        if !ctorFuncOK {
                            genesisLogger.Error("Invalid chaincode constructor function defined in genesis configuration:", constructorMap["func"])
                            makeGenesisError = fmt.Errorf("Invalid chaincode constructor function args defined in genesis configuration: %s", constructorMap["func"])
                            return
                        }
    
                        ctorArgs, ctorArgsOK := constructorMap["args"].([]interface{})
                        if !ctorArgsOK {
                            genesisLogger.Error("Invalid chaincode constructor args defined in genesis configuration:", constructorMap["args"])
                            makeGenesisError = fmt.Errorf("Invalid chaincode constructor args defined in genesis configuration: %s", constructorMap["args"])
                            return
                        }
    
                        genesisLogger.Debug("Genesis chaincode constructor func %s", ctorFunc)
                        genesisLogger.Debug("Genesis chaincode constructor args %s", ctorArgs)
                        var ctorArgsStringArray []string
                        for j := 0; j < len(ctorArgs); j++ {
                            ctorArgsStringArray = append(ctorArgsStringArray, ctorArgs[j].(string))
                        }
    
                        spec = protos.ChaincodeSpec{Type: protos.ChaincodeSpec_Type(protos.ChaincodeSpec_Type_value[chaincodeType]), ChaincodeID: chaincodeID, CtorMsg: &protos.ChaincodeInput{Function: ctorFunc, Args: ctorArgsStringArray}}
                    }
    
                    transaction, _, deployErr := DeployLocal(context.Background(), &spec)
                    if deployErr != nil {
                        genesisLogger.Error("Error deploying chaincode for genesis block.", deployErr)
                        makeGenesisError = deployErr
                        return
                    }
    
                    genesisTransactions = append(genesisTransactions, transaction)
    
                }//for
    
            }//else
    
            genesisLogger.Info("Adding %d system chaincodes to the genesis block.", len(genesisTransactions))
            ledger.CommitTxBatch(0, genesisTransactions, nil, nil)
    
        })
        return makeGenesisError
    }
    

    BuildLocal

    构建一个指定的chaincode码

    func BuildLocal(context context.Context, spec *protos.ChaincodeSpec) (*protos.ChaincodeDeploymentSpec, error) {
        genesisLogger.Debug("Received build request for chaincode spec: %v", spec)
        mode := viper.GetString("chaincode.chaincoderunmode")
        var codePackageBytes []byte
        if mode != chaincode.DevModeUserRunsChaincode {
            if err := openchain.CheckSpec(spec); err != nil {
                genesisLogger.Debug("check spec failed: %s", err)
                return nil, err
            }
            // 规范构建
            var err error
            codePackageBytes, err = container.GetChaincodePackageBytes(spec)
            if err != nil {
                genesisLogger.Error(fmt.Sprintf("Error getting VM: %s", err))
                return nil, err
            }
        }
        chaincodeDeploymentSpec := &protos.ChaincodeDeploymentSpec{ChaincodeSpec: spec, CodePackage: codePackageBytes}
        return chaincodeDeploymentSpec, nil
    }

    DeployLocal

    部署供应链代码的映像到本地端

    func DeployLocal(ctx context.Context, spec *protos.ChaincodeSpec) (*protos.Transaction, []byte, error) {
        // 首先建立并得到部署规范
        chaincodeDeploymentSpec, err := BuildLocal(ctx, spec)
    
        if err != nil {
            genesisLogger.Error(fmt.Sprintf("Error deploying chaincode spec: %v
    
     error: %s", spec, err))
            return nil, nil, err
        }
    
        transaction, err := protos.NewChaincodeDeployTransaction(chaincodeDeploymentSpec, chaincodeDeploymentSpec.ChaincodeSpec.ChaincodeID.Name)
        if err != nil {
            return nil, nil, fmt.Errorf("Error deploying chaincode: %s ", err)
        }
        //chaincode.NewChaincodeSupport(chaincode.DefaultChain, peer.GetPeerEndpoint, false, 120000)
        // secHelper设置在ChaincodeSupport创建期间,因此我们不需要这一步
        //ctx = context.WithValue(ctx, "security", secCxt)
        result, err := chaincode.Execute(ctx, chaincode.GetChain(chaincode.DefaultChain), transaction)
        return transaction, result, err
    }
    

    设置是否部署系统chaincode

    func deploySystemChaincodeEnabled() bool {
        // 如果系统chaincode的部署配置文件中能够返回所配置的值
        if viper.IsSet("ledger.blockchain.deploy-system-chaincode") {
            return viper.GetBool("ledger.blockchain.deploy-system-chaincode")
        }
    
        // 如果没有指定配置能够启用,系统chaincode将采用默认情况部署
        return true
    } 
    

    deployUpdateValidityPeriodChaincode

    部署更新chaincode的有效期

    func deployUpdateValidityPeriodChaincode() (*protos.Transaction, error) {
        //它应该是可配置的,不采取硬编码
        vpChaincodePath := "github.com/openblockchain/obc-peer/openchain/system_chaincode/validity_period_update"
        vpFunction := "init"
    
        //这应该是负责有效期更新的组件的登录凭证。
        //该组件需要在系统中注册,以便能够调用更新chaincode的有效期
        vpToken := "system_chaincode_invoker"
    
        var vpCtorArgsStringArray []string
    
        validityPeriodSpec := &protos.ChaincodeSpec{Type: protos.ChaincodeSpec_GOLANG,
            ChaincodeID: &protos.ChaincodeID{Path: vpChaincodePath,
                Name: "",
            },
            CtorMsg: &protos.ChaincodeInput{Function: vpFunction,
                Args: vpCtorArgsStringArray,
            },
        }
    
        validityPeriodSpec.SecureContext = string(vpToken)
    
        vpTransaction, _, deployErr := DeployLocal(context.Background(), validityPeriodSpec)
    
        if deployErr != nil {
            genesisLogger.Error("Error deploying validity period chaincode for genesis block.", deployErr)
            makeGenesisError = deployErr
            return nil, deployErr
        }
    
        return vpTransaction, nil
    } 

    util

    EncodeOrderPreservingVarUint64

    返回一个字节表示要的int64数使得起始字节全零比特,以减少阵列的长度被修整,用于保存在一个缺省字节对比的顺序,第一个字节包含剩余的第一字节的bytes。存在的数量也允许使用返回的字节作为其它较大字节阵列的一部分,如以数据库复合键表示

    func EncodeOrderPreservingVarUint64(number uint64) []byte {
        bytes := make([]byte, 8)
        binary.BigEndian.PutUint64(bytes, number)
        startingIndex := 0
        size := 0
        for i, b := range bytes {
            if b != 0x00 {
                startingIndex = i
                size = 8 - i
                break
            }
        }
        sizeBytes := proto.EncodeVarint(uint64(size))
        if len(sizeBytes) > 1 {
            panic(fmt.Errorf("[]sizeBytes should not be more than one byte because the max number it needs to hold is 8. size=%d", size))
        }
        encodedBytes := make([]byte, size+1)
        encodedBytes[0] = sizeBytes[0]
        copy(encodedBytes[1:], bytes[startingIndex:])
        return encodedBytes
    }
    

    DecodeOrderPreservingVarUint64

    解码从由方法“EncodeOrderPreservingVarUint64’得到的字节数。
    此外,返回在该过程中所消耗的字节数

    func DecodeOrderPreservingVarUint64(bytes []byte) (uint64, int) {
        s, _ := proto.DecodeVarint(bytes)
        size := int(s)
        decodedBytes := make([]byte, 8)
        copy(decodedBytes[8-size:], bytes[1:size+1])
        numBytesConsumed := size + 1
        return binary.BigEndian.Uint64(decodedBytes), numBytesConsumed
    }

    buckettree

    bucket_hash

    addNextNode

    这个方法假定数据节点都按键的增加顺序添加

    func (c *bucketHashCalculator) addNextNode(dataNode *dataNode) {
        chaincodeID, _ := dataNode.getKeyElements()
        if chaincodeID != c.currentChaincodeID {
            c.appendCurrentChaincodeData()
            c.currentChaincodeID = chaincodeID
            c.dataNodes = nil
        }
        c.dataNodes = append(c.dataNodes, dataNode)
    }

    computeCryptoHash

    计算加密哈希

    func (c *bucketHashCalculator) computeCryptoHash() []byte {
        if c.currentChaincodeID != "" {
            c.appendCurrentChaincodeData()
            c.currentChaincodeID = ""
            c.dataNodes = nil
        }
        logger.Debug("Hashable content for bucket [%s]: length=%d, contentInStringForm=[%s]", c.bucketKey, len(c.hashingData), string(c.hashingData))
        if util.IsNil(c.hashingData) {
            return nil
        }
        return openchainUtil.ComputeCryptoHash(c.hashingData)
    }

    appendCurrentChaincodeData

    添加当前chaincode数据

    func (c *bucketHashCalculator) appendCurrentChaincodeData() {
        if c.currentChaincodeID == "" {
            return
        }
        c.appendSizeAndData([]byte(c.currentChaincodeID))
        c.appendSize(len(c.dataNodes))
        for _, dataNode := range c.dataNodes {
            _, key := dataNode.getKeyElements()
            value := dataNode.getValue()
            c.appendSizeAndData([]byte(key))
            c.appendSizeAndData(value)
        }
    }

    appendSizeAndData

    添加数据和容量

    func (c *bucketHashCalculator) appendSizeAndData(b []byte) {
        c.appendSize(len(b))
        c.hashingData = append(c.hashingData, b...)
    }

    appendSize

    增加容量

    func (c *bucketHashCalculator) appendSize(size int) {
        c.hashingData = append(c.hashingData, proto.EncodeVarint(uint64(size))...)
    }
    

    bucket_key

    bucket key的结构如下

    type bucketKey struct {
        level        int  //级别
        bucketNumber int  //bucket号
    }

    newBucketKey

    当level为0,bucketNumber为1时,构造bucket树根节点;
    当level为bucketKey.level-1, bucketNumber为conf.computeParentBucketNumber(bucketKey.bucketNumber)
    时构建的是父节点的bucketkey

    func newBucketKey(level int, bucketNumber int) *bucketKey {
        if level > conf.getLowestLevel() || level < 0 {
            panic(fmt.Errorf("Invalid Level [%d] for bucket key. Level can be between 0 and [%d]", level, conf.lowestLevel))
            //如果级别大于最低级别或者级别小于0,则输出当前级别以及最小级别
        }
    //如果bucket号小于1或者大于bucket级别对应的级别好,则返回bucketkey的级别和级别号
        if bucketNumber < 1 || bucketNumber > conf.getNumBuckets(level) {
            panic(fmt.Errorf("Invalid bucket number [%d]. Bucket nuber at level [%d] can be between 1 and [%d]", bucketNumber, level, conf.getNumBuckets(level)))
        }
        return &bucketKey{level, bucketNumber}
    }

    getChildIndex

    获取子节点的索引

    func (bucketKey *bucketKey) getChildIndex(childKey *bucketKey) int {
        bucketNumberOfFirstChild := ((bucketKey.bucketNumber - 1) * conf.getMaxGroupingAtEachLevel()) + 1
        bucketNumberOfLastChild := bucketKey.bucketNumber * conf.getMaxGroupingAtEachLevel()
        if childKey.bucketNumber < bucketNumberOfFirstChild || childKey.bucketNumber > bucketNumberOfLastChild {
            panic(fmt.Errorf("[%#v] is not a valid child bucket of [%#v]", childKey, bucketKey))
        }
        return childKey.bucketNumber - bucketNumberOfFirstChild
    }
    

    bucket_node

    bucketnode的结构如下

    type bucketNode struct {
        bucketKey          *bucketKey 
        childrenCryptoHash [][]byte //子节点的加密哈希
        childrenUpdated    []bool  //子节点更新
        markedForDeletion  bool  //删除标记
    }
    

    unmarshalBucketNode

    重组bucketnode

    func unmarshalBucketNode(bucketKey *bucketKey, serializedBytes []byte) *bucketNode {
        bucketNode := newBucketNode(bucketKey)
        buffer := proto.NewBuffer(serializedBytes)
        for i := 0; i < conf.getMaxGroupingAtEachLevel(); i++ {
            childCryptoHash, err := buffer.DecodeRawBytes(false)
            if err != nil {
                panic(fmt.Errorf("this error should not occur: %s", err))
            }
            if !util.IsNil(childCryptoHash) {
                bucketNode.childrenCryptoHash[i] = childCryptoHash
            }
        }
        return bucketNode
    }

    mergeBucketNode

    合并bucket节点

    func (bucketNode *bucketNode) mergeBucketNode(anotherBucketNode *bucketNode) {
        if !bucketNode.bucketKey.equals(anotherBucketNode.bucketKey) {
            panic(fmt.Errorf("Nodes with different keys can not be merged. BaseKey=[%#v], MergeKey=[%#v]", bucketNode.bucketKey, anotherBucketNode.bucketKey))
        }
        for i, childCryptoHash := range anotherBucketNode.childrenCryptoHash {
            if !bucketNode.childrenUpdated[i] && util.IsNil(bucketNode.childrenCryptoHash[i]) {
                bucketNode.childrenCryptoHash[i] = childCryptoHash
            }
        }
    }

    bucket_tree_delta

    包含的功能比较少,直接上代码

    //创建bucket树增量
    func newBucketTreeDelta() *bucketTreeDelta {
        return &bucketTreeDelta{make(map[int]byBucketNumber)}
    }
    //获取或者创建Bucket节点
    func (bucketTreeDelta *bucketTreeDelta) getOrCreateBucketNode(bucketKey *bucketKey) *bucketNode {
        byBucketNumber := bucketTreeDelta.byLevel[bucketKey.level]
        if byBucketNumber == nil {
            byBucketNumber = make(map[int]*bucketNode)
            bucketTreeDelta.byLevel[bucketKey.level] = byBucketNumber
        }
        bucketNode := byBucketNumber[bucketKey.bucketNumber]
        if bucketNode == nil {
            bucketNode = newBucketNode(bucketKey)
            byBucketNumber[bucketKey.bucketNumber] = bucketNode
        }
        return bucketNode
    }
    
    //获取某一级别下的bucket节点
    func (bucketTreeDelta *bucketTreeDelta) getBucketNodesAt(level int) []*bucketNode {
        bucketNodes := []*bucketNode{}
        byBucketNumber := bucketTreeDelta.byLevel[level]
        if byBucketNumber == nil {
            return nil
        }
        for _, bucketNode := range byBucketNumber {
            bucketNodes = append(bucketNodes, bucketNode)
        }
        return bucketNodes
    }
    //获取根节点
    func (bucketTreeDelta *bucketTreeDelta) getRootNode() *bucketNode {
        bucketNodes := bucketTreeDelta.getBucketNodesAt(0)
        if bucketNodes == nil || len(bucketNodes) == 0 {
            panic("This method should be called after processing is completed (i.e., the root node has been created)")
        }
        return bucketNodes[0]
    }
    

    config

    计算父节点的bucket数量

    func (config *config) computeParentBucketNumber(bucketNumber int) int {
        logger.Debug("Computing parent bucket number for bucketNumber [%d]", bucketNumber)
        parentBucketNumber := bucketNumber / config.getMaxGroupingAtEachLevel()
        if bucketNumber%config.getMaxGroupingAtEachLevel() != 0 {
            parentBucketNumber++
        }
        return parentBucketNumber
    }

    Datakey

    //创建datakey
    func newDataKey(chaincodeID string, key string) *dataKey {
        logger.Debug("Enter - newDataKey. chaincodeID=[%s], key=[%s]", chaincodeID, key)
        compositeKey := statemgmt.ConstructCompositeKey(chaincodeID, key)
        bucketHash := conf.computeBucketHash(compositeKey)
        // 添加一个,因为 - 我们开始启动bucket的数为1
        bucketNumber := int(bucketHash)%conf.getNumBucketsAtLowestLevel() + 1
        dataKey := &dataKey{newBucketKeyAtLowestLevel(bucketNumber), compositeKey}
        logger.Debug("Exit - newDataKey=[%s]", dataKey)
        return dataKey
    }
    //最小化DataKey可能的字节
    func minimumPossibleDataKeyBytesFor(bucketKey *bucketKey) []byte {
        min := encodeBucketNumber(bucketKey.bucketNumber)
        min = append(min, byte(0))
        return min
    }
    
    func minimumPossibleDataKeyBytes(bucketNumber int, chaincodeID string, key string) []byte {
        b := encodeBucketNumber(bucketNumber)
        b = append(b, statemgmt.ConstructCompositeKey(chaincodeID, key)...)
        return b
    }

    data_nodes_delta

    newDataNodesDelta

    创建datanode增量

    func newDataNodesDelta(stateDelta *statemgmt.StateDelta) *dataNodesDelta {
        dataNodesDelta := &dataNodesDelta{make(map[bucketKey]dataNodes)}
        chaincodeIDs := stateDelta.GetUpdatedChaincodeIds(false)
        for _, chaincodeID := range chaincodeIDs {
            updates := stateDelta.GetUpdates(chaincodeID)
            for key, updatedValue := range updates {
                if stateDelta.RollBackwards {
                    dataNodesDelta.add(chaincodeID, key, updatedValue.GetPreviousValue())
                } else {
                    dataNodesDelta.add(chaincodeID, key, updatedValue.GetValue())
                }
            }
        }
        for _, dataNodes := range dataNodesDelta.byBucket {
            sort.Sort(dataNodes)
        }
        return dataNodesDelta
    }

    getAffectedBuckets

    获取受到影响的buckets

    func (dataNodesDelta *dataNodesDelta) getAffectedBuckets() []*bucketKey {
        changedBuckets := []*bucketKey{}
        for bucketKey := range dataNodesDelta.byBucket {
            copyOfBucketKey := bucketKey.clone()
            logger.Debug("Adding changed bucket [%s]", copyOfBucketKey)
            changedBuckets = append(changedBuckets, copyOfBucketKey)
        }
        logger.Debug("Changed buckets are = [%s]", changedBuckets)
        return changedBuckets
    }

    range_scan_iterator

    RangeScanIterator实现了 ‘statemgmt.RangeScanIterator’接口

    type RangeScanIterator struct {
        dbItr               *gorocksdb.Iterator
        chaincodeID         string
        startKey            string
        endKey              string
        currentBucketNumber int
        currentKey          string
        currentValue        []byte
        done                bool
    }

    这是其中接口实现的一些细节

    func (itr *RangeScanIterator) Next() bool {
        if itr.done {
            return false
        }
    
        for itr.dbItr.Valid() {
    
            //创建键 - 值字节的副本,因为潜在的键值字节由UTR重用。关闭时没有必要为迭代器释放内存而释放切片。
            keyBytes := statemgmt.Copy(itr.dbItr.Key().Data())
            valueBytes := statemgmt.Copy(itr.dbItr.Value().Data())
    
            dataNode := unmarshalDataNodeFromBytes(keyBytes, valueBytes)
            dataKey := dataNode.dataKey
            chaincodeID, key := statemgmt.DecodeCompositeKey(dataNode.getCompositeKey())
            value := dataNode.value
            logger.Debug("Evaluating data-key = %s", dataKey)
    
            bucketNumber := dataKey.bucketKey.bucketNumber
            if bucketNumber > itr.currentBucketNumber {
                itr.seekForStartKeyWithinBucket(bucketNumber)
                continue
            }
    
            if chaincodeID == itr.chaincodeID && (itr.endKey == "" || key <= itr.endKey) {
                logger.Debug("including data-key = %s", dataKey)
                itr.currentKey = key
                itr.currentValue = value
                itr.dbItr.Next()
                return true
            }
    
            itr.seekForStartKeyWithinBucket(bucketNumber + 1)
            continue
        }
        itr.done = true
        return false
    }
    

    snapshot_iterator

    //接口实现
    type StateSnapshotIterator struct {
        dbItr *gorocksdb.Iterator
    }
    //创建迭代器
    func newStateSnapshotIterator(snapshot *gorocksdb.Snapshot) (*StateSnapshotIterator, error) {
        dbItr := db.GetDBHandle().GetStateCFSnapshotIterator(snapshot)
        dbItr.Seek([]byte{0x01})
        dbItr.Prev()
        return &StateSnapshotIterator{dbItr}, nil
    }
    
    // 接口实现细节-Next
    func (snapshotItr *StateSnapshotIterator) Next() bool {
        snapshotItr.dbItr.Next()
        return snapshotItr.dbItr.Valid()
    }
    
    // 接口实现细节-GetRawKeyValue
    func (snapshotItr *StateSnapshotIterator) GetRawKeyValue() ([]byte, []byte) {
    //创建键 - 值字节的副本,因为潜在的键值字节由UTR重用。关闭时没有必要为迭代器释放内存而释放切片。
    
        keyBytes := statemgmt.Copy(snapshotItr.dbItr.Key().Data())
        valueBytes := statemgmt.Copy(snapshotItr.dbItr.Value().Data())
        dataNode := unmarshalDataNodeFromBytes(keyBytes, valueBytes)
        return dataNode.getCompositeKey(), dataNode.getValue()
    }
    
    // 接口实现细节-Close 
    func (snapshotItr *StateSnapshotIterator) Close() {
        snapshotItr.dbItr.Close()
    }
    

    state_impl

    实现了 ‘statemgmt.HashableState’接口

    NewStateImpl

    构建一个新的StateImpl

    func NewStateImpl() *StateImpl {
        return &StateImpl{}
    }

    Initialize

    状态初始化

    func (stateImpl *StateImpl) Initialize(configs map[string]interface{}) error {
        initConfig(configs)
        rootBucketNode, err := fetchBucketNodeFromDB(constructRootBucketKey())
        if err != nil {
            return err
        }
        if rootBucketNode != nil {
            stateImpl.persistedStateHash = rootBucketNode.computeCryptoHash()
            stateImpl.lastComputedCryptoHash = stateImpl.persistedStateHash
        }
        return nil
    //我们可以创建一个高速缓存,并保持所有的bucket节点预加载。
    //因为,铲斗节点不包含实际数据和最大可能的bucket是预先确定的,所述存储器需求
    //可能不是非常高,或者可以容易地控制 - 通过保持在高速缓存中选择性bucket(bucket
    //树的最可能的前面几级 - 因为,较高的bucket的水平,
    //更是将需要散列重新计算的几率)
    
    }

    PrepareWorkingSet

    准备工作集

    func (stateImpl *StateImpl) PrepareWorkingSet(stateDelta *statemgmt.StateDelta) error {
        logger.Debug("Enter - PrepareWorkingSet()")
        if stateDelta.IsEmpty() {
            logger.Debug("Ignoring working-set as it is empty")
            return nil
        }
        stateImpl.dataNodesDelta = newDataNodesDelta(stateDelta)
        stateImpl.bucketTreeDelta = newBucketTreeDelta()
        stateImpl.recomputeCryptoHash = true
        return nil
    }
    

    computeDataNodesCryptoHash

    计算datanodes的哈希加密计算

    func computeDataNodesCryptoHash(bucketKey *bucketKey, updatedNodes dataNodes, existingNodes dataNodes) []byte {
        logger.Debug("Computing crypto-hash for bucket [%s]. numUpdatedNodes=[%d], numExistingNodes=[%d]", bucketKey, len(updatedNodes), len(existingNodes))
        bucketHashCalculator := newBucketHashCalculator(bucketKey)
        i := 0
        j := 0
        for i < len(updatedNodes) && j < len(existingNodes) {
            updatedNode := updatedNodes[i]
            existingNode := existingNodes[j]
            c := bytes.Compare(updatedNode.dataKey.compositeKey, existingNode.dataKey.compositeKey)
            var nextNode *dataNode
            switch c {
            case -1:
                nextNode = updatedNode
                i++
            case 0:
                nextNode = updatedNode
                i++
                j++
            case 1:
                nextNode = existingNode
                j++
            }
            if !nextNode.isDelete() {
                bucketHashCalculator.addNextNode(nextNode)
            }
        }
    
        var remainingNodes dataNodes
        if i < len(updatedNodes) {
            remainingNodes = updatedNodes[i:]
        } else if j < len(existingNodes) {
            remainingNodes = existingNodes[j:]
        }
    
        for _, remainingNode := range remainingNodes {
            if !remainingNode.isDelete() {
                bucketHashCalculator.addNextNode(remainingNode)
            }
        }
        return bucketHashCalculator.computeCryptoHash()
    }
    

    state

    composite_range_scan_iterator

    包装了一个以上的潜在迭代,下面是具体实施,从第一底层迭代器开始,
    耗尽第一底层迭代后,移动到第二个潜在的迭代器。实施重复这个直到已经耗尽之前的底层迭代器此外,如果键值是找到从底层迭代器的键值被跳过任一前代的迭代器

    func (itr *CompositeRangeScanIterator) Next() bool {
        currentItrNumber := itr.currentItrNumber
        currentItr := itr.itrs[currentItrNumber]
        logger.Debug("Operating on iterator number = %d", currentItrNumber)
        keyAvailable := currentItr.Next()
        for keyAvailable {
            key, _ := currentItr.GetKeyValue()
            logger.Debug("Retrieved key = %s", key)
            skipKey := false
            for i := currentItrNumber - 1; i >= 0; i-- {
                logger.Debug("Evaluating key = %s in itr number = %d. currentItrNumber = %d", key, i, currentItrNumber)
                previousItr := itr.itrs[i]
                if previousItr.(*statemgmt.StateDeltaIterator).ContainsKey(key) {
                    skipKey = true
                    break
                }
            }
            if skipKey {
                logger.Debug("Skipping key = %s", key)
                keyAvailable = currentItr.Next()
                continue
            }
            break
        }
    
        if keyAvailable || currentItrNumber == 2 {
            logger.Debug("Returning for current key")
            return keyAvailable
        }
    
        logger.Debug("Moving to next iterator")
        itr.currentItrNumber++
        return itr.Next()
    }
    

    state_snapshot

    // 按实际的实施和数据库快照封装了对状态快照的迭代
    type StateSnapshot struct {
        blockNumber  uint64
        stateImplItr statemgmt.StateSnapshotIterator
        dbSnapshot   *gorocksdb.Snapshot
    }
    
    // 创建当前块全局状态的新快照
    func newStateSnapshot(blockNumber uint64, dbSnapshot *gorocksdb.Snapshot) (*StateSnapshot, error) {
        itr, err := stateImpl.GetStateSnapshotIterator(dbSnapshot)
        if err != nil {
            return nil, err
        }
        snapshot := &StateSnapshot{blockNumber, itr, dbSnapshot}
        return snapshot, nil
    }
    
    //当您使用这个资源做这必须调用释放快照
    func (ss *StateSnapshot) Release() {
        ss.stateImplItr.Close()
        ss.dbSnapshot.Release()
    }
    
    //接下来将迭代器移动到下一个键/值对的状态
    func (ss *StateSnapshot) Next() bool {
        return ss.stateImplItr.Next()
    }
    
    //返回在当前迭代器位置的键和值的原始字节
    func (ss *StateSnapshot) GetRawKeyValue() ([]byte, []byte) {
        return ss.stateImplItr.GetRawKeyValue()
    }
    
    // 返回与此全局状态的快照相关联的块号
    func (ss *StateSnapshot) GetBlockNumber() uint64 {
        return ss.blockNumber
    }
    

    state

    构造全局状态,封装状态持久性的特定管理实现,它不是线程安全的

    NewState

    构造一个新的状态。对初始化状态的实现进行封装

    func NewState() *State {
        stateImplName := viper.GetString("ledger.state.dataStructure.name")
        stateImplConfigs := viper.GetStringMap("ledger.state.dataStructure.configs")
    
        if len(stateImplName) == 0 {
            stateImplName = detaultStateImpl
            stateImplConfigs = nil
        }
    
        switch stateImplName {
        case "buckettree":
            stateImpl = buckettree.NewStateImpl()
        case "trie":
            stateImpl = trie.NewStateTrie()
        default:
            panic(fmt.Errorf("Error during initialization of state implementation. State data structure '%s' is not valid.", stateImplName))
        }
    
        err := stateImpl.Initialize(stateImplConfigs)
        if err != nil {
            panic(fmt.Errorf("Error during initialization of state implementation: %s", err))
        }
        deltaHistorySize := viper.GetInt("ledger.state.deltaHistorySize")
        if deltaHistorySize < 0 {
            panic(fmt.Errorf("Delta history size must be greater than or equal to 0. Current value is %d.", deltaHistorySize))
        }
        return &State{stateImpl, statemgmt.NewStateDelta(), statemgmt.NewStateDelta(), "", make(map[string][]byte),
            false, uint64(deltaHistorySize)}
    }
    

    TxBegin

    标记开始新的tx。如果tx已在进行中,将调用混乱

    func (state *State) TxBegin(txUUID string) {
        logger.Debug("txBegin() for txUuid [%s]", txUUID)
        if state.txInProgress() {
            panic(fmt.Errorf("A tx [%s] is already in progress. Received call for begin of another tx [%s]", state.currentTxUUID, txUUID))
        }
        state.currentTxUUID = txUUID
    }
    

    Get

    返回chaincodeID和键的状态。如果提交为false,首先从内存中查找,如果缺失,从数据库获取。如果为true,仅仅可以从数据库中获取。

    func (state *State) Get(chaincodeID string, key string, committed bool) ([]byte, error) {
        if !committed {
            valueHolder := state.currentTxStateDelta.Get(chaincodeID, key)
            if valueHolder != nil {
                return valueHolder.GetValue(), nil
            }
            valueHolder = state.stateDelta.Get(chaincodeID, key)
            if valueHolder != nil {
                return valueHolder.GetValue(), nil
            }
        }
        return state.stateImpl.Get(chaincodeID, key)
    }

    GetRangeScanIterator

    返回一来获取所有startKey和endKey之间的键(和值)的迭代器
    对于chaincodeID(假设按照键的词汇顺序)。

    func (state *State) GetRangeScanIterator(chaincodeID string, startKey string, endKey string, committed bool) (statemgmt.RangeScanIterator, error) {
        stateImplItr, err := state.stateImpl.GetRangeScanIterator(chaincodeID, startKey, endKey)
        if err != nil {
            return nil, err
        }
    
        if committed {
            return stateImplItr, nil
        }
        return newCompositeRangeScanIterator(
            statemgmt.NewStateDeltaRangeScanIterator(state.currentTxStateDelta, chaincodeID, startKey, endKey),
            statemgmt.NewStateDeltaRangeScanIterator(state.stateDelta, chaincodeID, startKey, endKey),
            stateImplItr), nil
    }
    

    GetHash

    如果计算要应用的状态增量如果是新状态的哈希值。
    如果stateDelta已最近一次调用后,想要更改此功能只能重新计算

    func (state *State) GetHash() ([]byte, error) {
        logger.Debug("Enter - GetHash()")
        if state.updateStateImpl {
            logger.Debug("updating stateImpl with working-set")
            state.stateImpl.PrepareWorkingSet(state.stateDelta)
            state.updateStateImpl = false
        }
        hash, err := state.stateImpl.ComputeCryptoHash()
        if err != nil {
            return nil, err
        }
        logger.Debug("Exit - GetHash()")
        return hash, nil
    }

    trie

    trie,又称前缀树或字典树,是一种有序树,用于保存关联数组,其中的键通常是字符串。与二叉查找树不同,键不是直接保存在节点中,而是由节点在树中的位置决定。

    TrieKey

    如下是trie key的接口定义

    type trieKeyInterface interface {
        getLevel() int //获取级别
        getParentTrieKey() trieKeyInterface //获取父 trie key
        getIndexInParent() int  //获取索引
        getEncodedBytes() []byte
    }

    newTrieKey

    创建一个trie key

    func newTrieKey(chaincodeID string, key string) *trieKey {
        compositeKey := statemgmt.ConstructCompositeKey(chaincodeID, key)
        return newTrieKeyFromCompositeKey(compositeKey)
    }
    

    newTrieKeyFromCompositeKey

    从组合键中创建trie key

    func newTrieKeyFromCompositeKey(compositeKey []byte) *trieKey {
        return &trieKey{trieKeyEncoderImpl.newTrieKey(compositeKey)}
    }

    getIndexInParent

    获取父triekey的索引

    
    func (key *trieKey) getIndexInParent() int {
        if key.isRootKey() {
            panic(fmt.Errorf("Parent for Trie root shoould not be asked for"))
        }
        return key.trieKeyImpl.getIndexInParent()
    }

    getParentTrieKey

    获取父 trie key

    func (key *trieKey) getParentTrieKey() *trieKey {
        if key.isRootKey() {
            panic(fmt.Errorf("Parent for Trie root shoould not be asked for"))
        }
        return &trieKey{key.trieKeyImpl.getParentTrieKey()}
    }

    getEncodedBytes

    获得字节编码,如果字节编码为0,代表为根的键值

    func (key *trieKey) getEncodedBytes() []byte {
        return key.trieKeyImpl.getEncodedBytes()
    }

    assertIsChildOf

    断言是否为孩子节点的trie key

    func (key *trieKey) assertIsChildOf(parentTrieKey *trieKey) {
        if !bytes.Equal(key.getParentTrieKey().getEncodedBytes(), parentTrieKey.getEncodedBytes()) {
            panic(fmt.Errorf("trie key [%s] is not a child of trie key [%s]", key, parentTrieKey))
        }
    }
    

    trie_node

    trienode的结构如下

    type trieNode struct {
        trieKey              *trieKey
        value                []byte //值
        childrenCryptoHashes map[int][]byte//孩子节点的哈希加密,key为int,value为byte
    
        valueUpdated                bool //值是否更新
        childrenCryptoHashesUpdated map[int]bool//是否产生新的哈希加密
        markedForDeletion           bool //节店删除状态标记
    }

    setChildCryptoHash

    设置孩子节点加密哈希

    func (trieNode *trieNode) setChildCryptoHash(index int, childCryptoHash []byte) {
        if index >= trieKeyEncoderImpl.getMaxTrieWidth() {
            panic(fmt.Errorf("Index for child crypto-hash cannot be greater than [%d]. Tried to access index value [%d]", trieKeyEncoderImpl.getMaxTrieWidth(), index))
        }
        if childCryptoHash != nil {
            trieNode.childrenCryptoHashes[index] = childCryptoHash
        }
        trieNode.childrenCryptoHashesUpdated[index] = true
    }
    

    mergeMissingAttributesFrom

    合并丢失属性

    func (trieNode *trieNode) mergeMissingAttributesFrom(dbTrieNode *trieNode) {
        stateTrieLogger.Debug("Enter mergeMissingAttributesFrom() baseNode=[%s], mergeNode=[%s]", trieNode, dbTrieNode)
        if !trieNode.valueUpdated {
            trieNode.value = dbTrieNode.value
        }
        for k, v := range dbTrieNode.childrenCryptoHashes {
            if !trieNode.childrenCryptoHashesUpdated[k] {
                trieNode.childrenCryptoHashes[k] = v
            }
        }
        stateTrieLogger.Debug("Exit mergeMissingAttributesFrom() mergedNode=[%s]", trieNode)
    }

    computeCryptoHash

    哈希加密计算

    func (trieNode *trieNode) computeCryptoHash() []byte {
        stateTrieLogger.Debug("Enter computeCryptoHash() for trieNode [%s]", trieNode)
        var cryptoHashContent []byte
        if trieNode.containsValue() {
            stateTrieLogger.Debug("Adding value to hash computation for trieNode [%s]", trieNode)
            key := trieNode.trieKey.getEncodedBytes()
            cryptoHashContent = append(cryptoHashContent, proto.EncodeVarint(uint64(len(key)))...)
            cryptoHashContent = append(cryptoHashContent, key...)
            cryptoHashContent = append(cryptoHashContent, trieNode.value...)
        }
    
        sortedChildrenIndexes := trieNode.getSortedChildrenIndex()
        for _, index := range sortedChildrenIndexes {
            childCryptoHash := trieNode.childrenCryptoHashes[index]
            stateTrieLogger.Debug("Adding hash [%#v] for child number [%d] to hash computation for trieNode [%s]", childCryptoHash, index, trieNode)
            cryptoHashContent = append(cryptoHashContent, childCryptoHash...)
        }
    
        if cryptoHashContent == nil {
            // 节点没有关联值,也没有关联孩子节点。
            stateTrieLogger.Debug("Returning nil as hash for trieNode = [%s]. Also, marking this key for deletion.", trieNode)
            trieNode.markedForDeletion = true
            return nil
        }
    
        if !trieNode.containsValue() && trieNode.getNumChildren() == 1 {
            // 节点没有关联值,并且只有一个孩子节点,传递的孩子hash丢失
            stateTrieLogger.Debug("Returning hash as of a single child for trieKey = [%s]", trieNode.trieKey)
            return cryptoHashContent
        }
    
        stateTrieLogger.Debug("Recomputing hash for trieKey = [%s]", trieNode)
        return util.ComputeCryptoHash(cryptoHashContent)
    }

    marshal

    func (trieNode *trieNode) marshal() ([]byte, error) {
        buffer := proto.NewBuffer([]byte{})
    
        // 写入值
        err := buffer.EncodeRawBytes(trieNode.value)
        if err != nil {
            return nil, err
        }
    
        numCryptoHashes := trieNode.getNumChildren()
    
        //写加密哈希数
        err = buffer.EncodeVarint(uint64(numCryptoHashes))
        if err != nil {
            return nil, err
        }
    
        if numCryptoHashes == 0 {
            return buffer.Bytes(), nil
        }
    
        for i, cryptoHash := range trieNode.childrenCryptoHashes {
            //写入加密哈希索引
            err = buffer.EncodeVarint(uint64(i))
            if err != nil {
                return nil, err
            }
            // 写入加密哈希
            err = buffer.EncodeRawBytes(cryptoHash)
            if err != nil {
                return nil, err
            }
        }
        return buffer.Bytes(), nil
    }

    getSortedChildrenIndex

    获得孩子节点排序后的索引

    func (trieNode *trieNode) getSortedChildrenIndex() []int {
        keys := make([]int, trieNode.getNumChildren())
        i := 0
        for k := range trieNode.childrenCryptoHashes {
            keys[i] = k
            i++
        }
        sort.Ints(keys)
        return keys
    }

    newTrieDelta

    创建trie的增量

    func newTrieDelta(stateDelta *statemgmt.StateDelta) *trieDelta {
        trieDelta := &trieDelta{0, make(map[int]levelDeltaMap)}
        chaincodes := stateDelta.GetUpdatedChaincodeIds(false)
        for _, chaincodeID := range chaincodes {
            updates := stateDelta.GetUpdates(chaincodeID)
            for key, updatedvalue := range updates {
                if updatedvalue.IsDelete() {
                    trieDelta.delete(chaincodeID, key)
                } else {
                    if stateDelta.RollBackwards {
                        trieDelta.set(chaincodeID, key, updatedvalue.GetPreviousValue())
                    } else {
                        trieDelta.set(chaincodeID, key, updatedvalue.GetValue())
                    }
                }
            }
        }
        return trieDelta
    }

    trie_db_helper

    从数据库中获取trie节点

    func fetchTrieNodeFromDB(key *trieKey) (*trieNode, error) {
        stateTrieLogger.Debug("Enter fetchTrieNodeFromDB() for trieKey [%s]", key)
        openchainDB := db.GetDBHandle()
        trieNodeBytes, err := openchainDB.GetFromStateCF(key.getEncodedBytes())
        if err != nil {
            stateTrieLogger.Error("Error in retrieving trie node from DB for triekey [%s]. Error:%s", key, err)
            return nil, err
        }
    
        if trieNodeBytes == nil {
            return nil, nil
        }
    
        trieNode, err := unmarshalTrieNode(key, trieNodeBytes)
        if err != nil {
            stateTrieLogger.Error("Error in unmarshalling trie node for triekey [%s]. Error:%s", key, err)
            return nil, err
        }
        stateTrieLogger.Debug("Exit fetchTrieNodeFromDB() for trieKey [%s]", key)
        return trieNode, nil
    }
    

    byteTrieKey

    func (encoder *byteTrieKeyEncoder) newTrieKey(originalBytes []byte) trieKeyInterface {
        len := len(originalBytes)
        remainingBytes := len % numBytesAtEachLevel
        //剩余字节=长度和每一个级别字节数的余数
        bytesToAppend := 0
        if remainingBytes != 0 {
            bytesToAppend = numBytesAtEachLevel - remainingBytes
        }
        for i := 0; i < bytesToAppend; i++ {
            originalBytes = append(originalBytes, byte(0))
        }
        return byteTrieKey(originalBytes)
    }

    hexTrieKey

    首先定义一个对于索引的映射,类十余bytetriekey,实现了 trieKeyInterface接口

    var charIndexMap = map[hexTrieKey]int{
        "0": 0,
        "1": 1,
        "2": 2,
        "3": 3,
        "4": 4,
        "5": 5,
        "6": 6,
        "7": 7,
        "8": 8,
        "9": 9,
        "a": 10,
        "b": 11,
        "c": 12,
        "d": 13,
        "e": 14,
        "f": 15,
    }
    
    func (encoder *hexTrieKeyEncoder) newTrieKey(originalBytes []byte) trieKeyInterface {
        return hexTrieKey(hex.EncodeToString(originalBytes))
    }
    

    range_scan_iterator

    func (itr *RangeScanIterator) Next() bool {
        if itr.done {
            return false
        }
        for ; itr.dbItr.Valid(); itr.dbItr.Next() {
    
            //使得键 - 值字节的副本,以至于使得潜在的键值字节由ITR重用。
            //关闭时没有必要为迭代器释放内存释放切片。
            trieKeyBytes := statemgmt.Copy(itr.dbItr.Key().Data())
            trieNodeBytes := statemgmt.Copy(itr.dbItr.Value().Data())
            value := unmarshalTrieNodeValue(trieNodeBytes)
            if util.IsNil(value) {
                continue
            }
    
            // 找到一个实际的键值
            currentCompositeKey := trieKeyEncoderImpl.decodeTrieKeyBytes(statemgmt.Copy(trieKeyBytes))
            currentChaincodeID, currentKey := statemgmt.DecodeCompositeKey(currentCompositeKey)
            if currentChaincodeID == itr.chaincodeID && (itr.endKey == "" || currentKey <= itr.endKey) {
                itr.currentKey = currentKey
                itr.currentValue = value
                itr.dbItr.Next()
                return true
            }
    
            // 检索指定的范围内的所有的密钥
            break
        }
        itr.done = true
        return false
    }
    

    snapshot_iterator

    和range_scan_iterator的实现方式类似

    func (snapshotItr *StateSnapshotIterator) Next() bool {
        var available bool
        for ; snapshotItr.dbItr.Valid(); snapshotItr.dbItr.Next() {
    
            trieKeyBytes := statemgmt.Copy(snapshotItr.dbItr.Key().Data())
            trieNodeBytes := statemgmt.Copy(snapshotItr.dbItr.Value().Data())
            value := unmarshalTrieNodeValue(trieNodeBytes)
            if util.NotNil(value) {
                snapshotItr.currentKey = trieKeyEncoderImpl.decodeTrieKeyBytes(statemgmt.Copy(trieKeyBytes))
                snapshotItr.currentValue = value
                available = true
                snapshotItr.dbItr.Next()
                break
            }
        }
        return available
    }

    state_trie

    结构如下

    type StateTrie struct {
        trieDelta              *trieDelta
        persistedStateHash     []byte 持久化状态哈希
        lastComputedCryptoHash []byte  最后哈希加密计算
        recomputeCryptoHash    bool 重新哈希加密计算
    }

    processChangedNode

    节点改变流程

    func (stateTrie *StateTrie) processChangedNode(changedNode *trieNode) error {
        stateTrieLogger.Debug("Enter - processChangedNode() for node [%s]", changedNode)
        dbNode, err := fetchTrieNodeFromDB(changedNode.trieKey)
        if err != nil {
            return err
        }
        if dbNode != nil {
            stateTrieLogger.Debug("processChangedNode() - merging attributes from db node [%s]", dbNode)
            changedNode.mergeMissingAttributesFrom(dbNode)
        }
        newCryptoHash := changedNode.computeCryptoHash()
        parentNode := stateTrie.trieDelta.getParentOf(changedNode)
        if parentNode == nil {
            parentNode = newTrieNode(changedNode.getParentTrieKey(), nil, false)
            stateTrie.trieDelta.addTrieNode(parentNode)
        }
        parentNode.setChildCryptoHash(changedNode.getIndexInParent(), newCryptoHash)
        if logHashOfEveryNode {
            stateTrieLogger.Debug("Hash for changedNode[%s]", changedNode)
            stateTrieLogger.Debug("%#v", newCryptoHash)
        }
        stateTrieLogger.Debug("Exit - processChangedNode() for node [%s]", changedNode)
        return nil
    }
    

    AddChangesForPersistence

    为持久化添加更改

    func (stateTrie *StateTrie) AddChangesForPersistence(writeBatch *gorocksdb.WriteBatch) error {
        if stateTrie.recomputeCryptoHash {
            _, err := stateTrie.ComputeCryptoHash()
            if err != nil {
                return err
            }
        }
    
        if stateTrie.trieDelta == nil {
            stateTrieLogger.Info("trieDelta is nil. Not writing anything to DB")
            return nil
        }
    
        openchainDB := db.GetDBHandle()
        lowestLevel := stateTrie.trieDelta.getLowestLevel()
        for level := lowestLevel; level >= 0; level-- {
            changedNodes := stateTrie.trieDelta.deltaMap[level]
            for _, changedNode := range changedNodes {
                if changedNode.markedForDeletion {
                    writeBatch.DeleteCF(openchainDB.StateCF, changedNode.trieKey.getEncodedBytes())
                    continue
                }
                serializedContent, err := changedNode.marshal()
                if err != nil {
                    return err
                }
                writeBatch.PutCF(openchainDB.StateCF, changedNode.trieKey.getEncodedBytes(), serializedContent)
            }
        }
        stateTrieLogger.Debug("Added changes to DB")
        return nil
    }

    commons

    commons位于statemgmt目录下

    // 构建复合键,并返回唯一代表一个指定的chaincodeID和 []byte字节。
    //这假定chaincodeID不包含0×00字节
    //但键值的可以强制实施的限制chaincodeID或使用长度前缀,而不是这里的分隔符
    func ConstructCompositeKey(chaincodeID string, key string) []byte {
        return bytes.Join([][]byte{[]byte(chaincodeID), []byte(key)}, stateKeyDelimiter)
    }
    
    //通过解码构建了compositeKey构造复合键的方法,返回原始chaincodeID和键形式
    func DecodeCompositeKey(compositeKey []byte) (string, string) {
        split := bytes.SplitN(compositeKey, stateKeyDelimiter, 2)
        return string(split[0]), string(split[1])
    }
    
    //返回指定字节的副本
    func Copy(src []byte) []byte {
        dest := make([]byte, len(src))
        copy(dest, src)
        return dest
    }
    

    hashable_state

    由stat实现不同的状态管理来实现接口,可以高效地为不同的工作负载条件下,计算加密哈希状态。

    type HashableState interface {
    
        //提供了一个机会来初始化。例如, 可以加载数据库的一些数据来实现state
        Initialize(configs map[string]interface{}) error
    
        // 从数据库获取值
        Get(chaincodeID string, key string) ([]byte, error)
    
        // 需要施加到状态,通过捕获需要的变化的stateDelta
        PrepareWorkingSet(stateDelta *StateDelta) error
    
        //计算状态加密哈希来实现state假设状态增量适用以PrepareWorkingSet方法传递
        ComputeCryptoHash() ([]byte, error)
    
        //添加的所有键值对,它需要为数据库持续触发statedelta(在、、//PrepareWorkingSet方法传递)。
    //除了在StateDelta中的信息,实现还可能希望
    //更快进行持久化中间结果的加密哈希计算
        AddChangesForPersistence(writeBatch *gorocksdb.WriteBatch) error
    
        // ClearWorkingSet可能会清除state实现,它可能已经建立了计算哈希加密和持续变化的状态增量的数据结构
        ClearWorkingSet(changesPersisted bool)
    
    
        GetStateSnapshotIterator(snapshot *gorocksdb.Snapshot) (StateSnapshotIterator, error)
    
        //提供一种应该给一个给定的chaincodeID使得返回键应该词法更大//所有键值迭代大于或等于startKey且小于或等于endKey。如果startKey参//数的值是假设一个空字符串startKey是在DB的chaincodeID可用最小的关//键。同样,对于endKey参数为空字符串假定endKey可用的分贝为chaincodeID//的最大键
        GetRangeScanIterator(chaincodeID string, startKey string, endKey string) (RangeScanIterator, error)
    
        //与StateDelta之前一些提示制备和PrepareWorkingSet方法通//过提供了可能。一个state的实现可以使用这个提示的预取相关数据,因此,//如果这可以提高哈希加密计算方法的性能(当被调用在以后的时间)
        PerfHintKeyChanged(chaincodeID string, key string)
    }
    
    
    

    state_delta_iterator

    state增量迭代器结构如下

    type StateDeltaIterator struct {
        updates         map[string]*UpdatedValue //更新
        relevantKeys    []string   // 关联键
        currentKeyIndex int  // 当前键的索引
        done            bool //完成标识
    }

    state_delta

    控制变现有的状态。这个结构被用于TX-batchAlso的执行期间中保持未提交的变化,以用于以块的状态转移到另一peer

    type StateDelta struct {
        ChaincodeStateDeltas map[string]*ChaincodeStateDelta
        //允许一个控制此增量是否会向前或向后回滚的状态
        RollBackwards bool
    }

    IsUpdatedValueSet

    如果更新值已经设置为给定的chaincode ID和密钥,则为true

    func (stateDelta *StateDelta) IsUpdatedValueSet(chaincodeID, key string) bool {
        chaincodeStateDelta, ok := stateDelta.ChaincodeStateDeltas[chaincodeID]
        if !ok {
            return false
        }
        if _, ok := chaincodeStateDelta.UpdatedKVs[key]; ok {
            return true
        }
        return false
    }

    ApplyChanges

    合并另一增量- 如果一个键值存在,则现有键的值被覆盖

    func (stateDelta *StateDelta) ApplyChanges(anotherStateDelta *StateDelta) {
        for chaincodeID, chaincodeStateDelta := range anotherStateDelta.ChaincodeStateDeltas {
            existingChaincodeStateDelta, existingChaincode := stateDelta.ChaincodeStateDeltas[chaincodeID]
            for key, valueHolder := range chaincodeStateDelta.UpdatedKVs {
                var previousValue []byte
                if existingChaincode {
                    existingUpdateValue, existingUpdate := existingChaincodeStateDelta.UpdatedKVs[key]
                    if existingUpdate {
                        // 现有的状态增量已经为这个键值的更新值
                        previousValue = existingUpdateValue.PreviousValue
                    } else {
                        //使用以前的值在新的状态增量的设置
                        previousValue = valueHolder.PreviousValue
                    }
                } else {
                    //使用之前值的状态增量
                    previousValue = valueHolder.PreviousValue
                }
    
                if valueHolder.IsDelete() {
                    stateDelta.Delete(chaincodeID, key, previousValue)
                } else {
                    stateDelta.Set(chaincodeID, key, valueHolder.Value, previousValue)
                }
            }
        }
    }
    

    GetUpdatedChaincodeIds

    返回在存在于chaincodeIDs的状态增量
    如果排序为true,方法按照字典顺序返回在排序之前的chaincodeIDs的排序顺序

    func (stateDelta *StateDelta) GetUpdatedChaincodeIds(sorted bool) []string {
        updatedChaincodeIds := make([]string, len(stateDelta.ChaincodeStateDeltas))
        i := 0
        for k := range stateDelta.ChaincodeStateDeltas {
            updatedChaincodeIds[i] = k
            i++
        }
        if sorted {
            sort.Strings(updatedChaincodeIds)
        }
        return updatedChaincodeIds
    }

    好了,ledger源码就分析到这里,其实仔细观察,会发现只要有一个iterrator,就会重写对应的next、getvalue方法等,各个模块之间相互协作,比较难的就是hash加密算法,以及状态增量的计算。小编接下来将对各个模块之间进行结构功能图解,敬请期待

  • 相关阅读:
    LINQ学习系列-----1.3 扩展方法
    表单重复提交的三种情况及解决办法
    JDBC的简单封装
    Java学习路线图
    成为一名Java高级工程师你需要学什么
    站在烦恼里仰望幸福
    如何发布Web项目到互联网
    用户管理的设计--2.新增用户信息实现
    MD5加密工具
    springMvc注解之@ResponseBody和@RequestBody
  • 原文地址:https://www.cnblogs.com/ainima/p/6331786.html
Copyright © 2011-2022 走看看