zoukankan      html  css  js  c++  java
  • IBM openblockchain学习(二)--chaincode源码分析

    openblockchain是IBM开源的blockchain项目,具体安装流程之前已经介绍过,具体请看http://blog.csdn.net/pangjiuzala/article/details/50897819

    解压后会发现在obc-peer根目录下出现一个main.go文件,其中主要功能是生成obc-peer命令,核心代码集中在openchain中的。接下来,将首先从chaincode代码分析开始,包含了如图下图所示的几个核心文件。



    chaincode.go

    chaincode.go位于shim包中,

    var chaincodeLogger = logging.MustGetLogger("chaincode")
    //调用go-logging中logging库的MustGetLogger函数对shim package进行记录,相当于日志文件
    //其中传递的参数为当前go文件

    MustGetLogger

    MustGetLogger位于logger.go文件中,具体代码如下

    // GetLogger 创建和返回一个基于模块名称的Logger对象
    func GetLogger(module string) (*Logger, error) {
        return &Logger{Module: module}, nil
    }
    
    // MustGetLogger 与GetLogger相似,但是当logger不能被创建时会出现
    //错乱,它简化了安全初始化全局记录器
    
    func MustGetLogger(module string) *Logger {
        logger, err := GetLogger(module)
        if err != nil {
            panic("logger: " + module + ": " + err.Error())
        }
        return logger
    }

    定义chaincode接口,它是一个供chaincode开发者需要实现标准回调接口

    type Chaincode interface {
        // run方法在每一笔交易初始化的时候被调用
        Run(stub *ChaincodeStub, function string, args []string) ([]byte, error)
        // Query函数以只读方式查询chaincode状态
        Query(stub *ChaincodeStub, function string, args []string) ([]byte, error)
    }

    Start(cc Chaincode)

    启动chaincode引导程序的入口节点

    func Start(cc Chaincode) error {
        viper.SetEnvPrefix("OPENCHAIN")
        viper.AutomaticEnv()
        replacer := strings.NewReplacer(".", "_")
        //替换前缀为openchain的文件中的.为_
        viper.SetEnvKeyReplacer(replacer)
    
        flag.StringVar(&peerAddress, "peer.address", "", "peer address")
    
        flag.Parse()
    
        chaincodeLogger.Debug("Peer address: %s", getPeerAddress())
    
        // 使用同步检验建立与client的连接 
        clientConn, err := newPeerClientConnection()
        if err != nil {
            chaincodeLogger.Error(fmt.Sprintf("Error trying to connect to local peer: %s", err))
            return fmt.Errorf("Error trying to connect to local peer: %s", err)
        }
    
        chaincodeLogger.Debug("os.Args returns: %s", os.Args)
    
        chaincodeSupportClient := pb.NewChaincodeSupportClient(clientConn)
    
        err = chatWithPeer(chaincodeSupportClient, cc)
        //启动chiancodeSuppportClient
    
        return err
    }

    getPeerAddress()

    获取peer地址

    func getPeerAddress() string {
        if peerAddress != "" {
            return peerAddress
        }
    
        if peerAddress = viper.GetString("peer.address"); peerAddress == "" {
            //假如被docker容器包含,返回主机地址
            peerAddress = "172.17.42.1:30303"
        }
    
        return peerAddress
    }

    newPeerClientConnection()

    创建peer 客户端连接

    func newPeerClientConnection() (*grpc.ClientConn, error) {
    //调用google.golang.org中grpc的ClinetConn方法
        var opts []grpc.DialOption
        if viper.GetBool("peer.tls.enabled") {
    //viper来源于github.com/spf13/viper,一个应用程序的配置系统
            var sn string
            if viper.GetString("peer.tls.server-host-override") != "" {
                sn = viper.GetString("peer.tls.server-host-override")
            }
            var creds credentials.TransportAuthenticator
            //credenetials包实现了各种支持g RPC库调用的凭证
            if viper.GetString("peer.tls.cert.file") != "" {
                var err error
                creds, err = credentials.NewClientTLSFromFile(viper.GetString("peer.tls.cert.file"), sn)
                if err != nil {
                    grpclog.Fatalf("Failed to create TLS credentials %v", err)
                }
            } else {
                creds = credentials.NewClientTLSFromCert(nil, sn)
    //NewClientTLSFromCert从客户端输入的证书中构造了TLS
            }
    //append中grpc调用的方法在grpc根目录下下clientconn.go文件中定义方//法
            opts = append(opts, grpc.WithTransportCredentials(creds))
        }
    ///WithTransportCredentials返回配置了一个连接级别的安全凭据(例//如,TLS/ SSL)的拨号操作。
        opts = append(opts, grpc.WithTimeout(1*time.Second))
    //WithTimeout返回配置客户端拨号超时连接的拨号操作
        opts = append(opts, grpc.WithBlock())
    //WithBlock返回一个拨号选项,它将持续调用一个拨号块直到底层的连接已经建立。没有这一点,在后台发生将会立即返回拨打和服务器连接。
        opts = append(opts, grpc.WithInsecure())
        conn, err := grpc.Dial(getPeerAddress(), opts...)
        if err != nil {
            return nil, err
        }
        return conn, err
    }

    chatWithPeer()

    使用peer进行通信

    func chatWithPeer(chaincodeSupportClient pb.ChaincodeSupportClient, cc Chaincode) error {
    
        // 通过peer验证创建流
        stream, err := chaincodeSupportClient.Register(context.Background())
        if err != nil {
            return fmt.Errorf("Error chatting with leader at address=%s:  %s", getPeerAddress(), err)
        }
    
        //创建传递给链码的链码存根
        //stub := &ChaincodeStub{}
    
        // Create the shim handler responsible for all control logic
        handler = newChaincodeHandler(getPeerAddress(), stream, cc)
    
        defer stream.CloseSend()
        // Send the ChaincodeID during register.
        chaincodeID := &pb.ChaincodeID{Name: viper.GetString("chaincode.id.name")}
        payload, err := proto.Marshal(chaincodeID)
        if err != nil {
            return fmt.Errorf("Error marshalling chaincodeID during chaincode registration: %s", err)
        }
        // 流寄存器
        chaincodeLogger.Debug("Registering.. sending %s", pb.ChaincodeMessage_REGISTER)
        handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTER, Payload: payload})
        waitc := make(chan struct{})
        go func() {
            defer close(waitc)
            msgAvail := make(chan *pb.ChaincodeMessage)
            var nsInfo *nextStateInfo
            var in *pb.ChaincodeMessage
            recv := true
            for {
                in = nil
                err = nil
                nsInfo = nil
                if recv {
                    recv = false
                    go func() {
                        var in2 *pb.ChaincodeMessage
                        in2, err = stream.Recv()
                        msgAvail <- in2
                    }()
                }
                select {
                case in = <-msgAvail:
                    if err == io.EOF {
                        chaincodeLogger.Debug("Received EOF, ending chaincode stream, %s", err)
                        return
                    } else if err != nil {
                        chaincodeLogger.Error(fmt.Sprintf("Received error from server: %s, ending chaincode stream", err))
                        return
                    } else if in == nil {
                        err = fmt.Errorf("Received nil message, ending chaincode stream")
                        chaincodeLogger.Debug("Received nil message, ending chaincode stream")
                        return
                    }
                    chaincodeLogger.Debug("[%s]Received message %s from shim", shortuuid(in.Uuid), in.Type.String())
                    recv = true
                case nsInfo = <-handler.nextState:
                    in = nsInfo.msg
                    if in == nil {
                        panic("nil msg")
                    }
                    chaincodeLogger.Debug("[%s]Move state message %s", shortuuid(in.Uuid), in.Type.String())
                }
    
                // 调用 FSM.handleMessage(),fsm即状态机
                err = handler.handleMessage(in)
                if err != nil {
                    err = fmt.Errorf("Error handling message: %s", err)
                    return
                }
                if nsInfo != nil && nsInfo.sendToCC {
                    chaincodeLogger.Debug("[%s]send state message %s", shortuuid(in.Uuid), in.Type.String())
                    if err = handler.serialSend(in); err != nil {
                        err = fmt.Errorf("Error sending %s: %s", in.Type.String(), err)
                        return
                    }
                }
            }
        }()
        <-waitc
        return err
    }
    

    GetState

    GetState被调用后,将从总帐中获取chaincode状态记录

    func (stub *ChaincodeStub) GetState(key string) ([]byte, error) {
        return handler.handleGetState(key, stub.UUID)

    PutState

    相对于GetState,PutState被调用后,将会把chaincode状态记录到总账中

    func (stub *ChaincodeStub) PutState(key string, value []byte) error {
        return handler.handlePutState(key, value, stub.UUID)
    }

    DelState

    从总账中删除chaincode状态记录

    func (stub *ChaincodeStub) DelState(key string) error {
        return handler.handleDelState(key, stub.UUID)
    }

    StateRangeQueryIterator

    StateRangeQueryIterator是一个迭代器,遍历一定范围内的以键值对形式记录的状态

    type StateRangeQueryIterator struct {
        handler    *Handler
        uuid       string
        response   *pb.RangeQueryStateResponse
        currentLoc int
    }

    StateRangeQueryIterator

    chaincode 调用RangeQueryState来查询一定范围内键的状态。假设startKey和endKey按词汇顺序排序,将返回一个迭代器,可用于遍历startKey和endKey之间的所有键,并且迭代器返回顺序是随机的。

    func (stub *ChaincodeStub) RangeQueryState(startKey, endKey string) (*StateRangeQueryIterator, error) {
        response, err := handler.handleRangeQueryState(startKey, endKey, stub.UUID)
        if err != nil {
            return nil, err
        }
        return &StateRangeQueryIterator{handler, stub.UUID, response, 0}, nil
    }

    HasNext()

    如果迭代器的查询范围包含附加键和值,hasnext将返回true

    func (iter *StateRangeQueryIterator) HasNext() bool {
        if iter.currentLoc < len(iter.response.KeysAndValues) || iter.response.HasMore {
            return true
        }
        return false
    }

    Next()

    Next将返回下一个在迭代器查询范围内的键和值

    func (iter *StateRangeQueryIterator) Next() (string, []byte, error) {
        if iter.currentLoc < len(iter.response.KeysAndValues) {
            keyValue := iter.response.KeysAndValues[iter.currentLoc]
            iter.currentLoc++
            return keyValue.Key, keyValue.Value, nil
        } else if !iter.response.HasMore {
            return "", nil, errors.New("No such key")
        } else {
            response, err := iter.handler.handleRangeQueryStateNext(iter.response.ID, iter.uuid)
    //返回迭代器的响应id和uuid
            if err != nil {
                return "", nil, err
            }
    
            iter.currentLoc = 0
            iter.response = response
            keyValue := iter.response.KeysAndValues[iter.currentLoc]
            iter.currentLoc++
            return keyValue.Key, keyValue.Value, nil
    
        }
    }

    Close()

    当读取过程完成从迭代器中释放资源后,调用Close函数关闭范围查询迭代器

    func (iter *StateRangeQueryIterator) Close() error {
        _, err := iter.handler.handleRangeQueryStateClose(iter.response.ID, iter.uuid)
        return err
    }

    InvokeChaincode()

    通过一个chaincode调用中执行对另一个chaincode的调用

    func (stub *ChaincodeStub) InvokeChaincode(chaincodeName string, function string, args []string) ([]byte, error) {
        return handler.handleInvokeChaincode(chaincodeName, function, args, stub.UUID)
    }

    QueryChaincode

    当一个chaincode调用中执行对另一个chaincode的查询操作时调用
    QueryChaincode

    
    func (stub *ChaincodeStub) QueryChaincode(chaincodeName string, function string, args []string) ([]byte, error) {
        return handler.handleQueryChaincode(chaincodeName, function, args, stub.UUID)
    }

    GetRows

    接下来的方法是对数据库建表以及查询的一些功能函数,这里就不一一赘述,以GetRows方法为例介绍

    GetRows返回基于部分key的行,例如 ,下给出下表
    | A | B | C | D |
    当A,C和D是key的时候,GetRow方法可以被|A,C|调用,来返回所有包含A,C以及其他例如D的值的行。当然,只包含A时也可以获取C以及D的值

    func (stub *ChaincodeStub) GetRows(tableName string, key []Column) (<-chan Row, error) {
    
        keyString, err := buildKeyString(tableName, key)
        if err != nil {
            return nil, err
        }
    
        iter, err := stub.RangeQueryState(keyString+"1", keyString+":")
        if err != nil {
            return nil, fmt.Errorf("Error fetching rows: %s", err)
        }
        defer iter.Close()
    
        rows := make(chan Row)
    
        go func() {
            for iter.HasNext() {
                _, rowBytes, err := iter.Next()
                if err != nil {
                    close(rows)
                }
    
                var row Row
                err = proto.Unmarshal(rowBytes, &row)
                //调用proto的Unmarshal函数,实现对数据的散集
                if err != nil {
                    close(rows)
                }
    
                rows <- row
    
            }
            close(rows)
        }()
    
        return rows, nil
    
    }

    chaincode.pb.go

    chaincode.pb.go是从chaincode.proto文件中产生的,它包含了一些顶端消息

    • 列的定义
    • 表信息
    • 行信息
    • 列信息

    ColumnDefinition

    type ColumnDefinition struct {
        Name string                `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
        Type ColumnDefinition_Type `protobuf:"varint,2,opt,name=type,enum=shim.ColumnDefinition_Type" json:"type,omitempty"`
        Key  bool                  `protobuf:"varint,3,opt,name=key" json:"key,omitempty"`
    }
    

    Table

    type Table struct {
        Name              string              `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
        //protobuf以bytes为单位,opt未操作
        //json中传递的是表名,可以忽略空表情况
        ColumnDefinitions []*ColumnDefinition `protobuf:"bytes,2,rep,name=columnDefinitions" json:"columnDefinitions,omitempty"`
    }

    Column

    type Column struct {
        // Types that are valid to be assigned to Value:
        //  *Column_String_
        //  *Column_Int32
        //  *Column_Int64
        //  *Column_Uint32
        //  *Column_Uint64
        //  *Column_Bytes
        //  *Column_Bool
        Value isColumn_Value `protobuf_oneof:"value"`
    }

    Row

    type Row struct {
        Columns []*Column `protobuf:"bytes,1,rep,name=columns" json:"columns,omitempty"`
    }
    

    shim包下handler.go

    Handler Struct

    Handler实现shim包中的一面

    
    type Handler struct {
        sync.RWMutex
        // RWMutex提供了四个方法:
    
    // func (*RWMutex) Lock  写锁定
    
    // func (*RWMutex) Unlock  写解锁
    
    // func (*RWMutex) RLock  读锁定
    
    // func (*RWMutex) RUnlock  读解锁
        To         string
        ChatStream PeerChaincodeStream
        FSM        *fsm.FSM
        cc         Chaincode
        // 多个查询(和一个事务)具有不同的UUID可以并行为执行chaincode
        // responseChannel是其上响应由shimchaincodeStub连通的通道。
        responseChannel map[string]chan pb.ChaincodeMessage
        // 跟踪哪些是的UUID交易,这是查询,以决定是否允许获取/把状态并调用chaincodeisTransaction map[string]bool
        //isTransactionkeyString类型,valuebool类型
        nextState     chan *nextStateInfo
    }

    PeerChaincodeStream接口定义了Peer和chaincode实例之间的流。

    
    type PeerChaincodeStream interface {
        Send(*pb.ChaincodeMessage) error
        Recv() (*pb.ChaincodeMessage, error)
    }

    markIsTransaction

    markIsTransaction函数标志着UUID作为交易或查询,
    为true的时候代表交易,为false的时候代表查询

    func (handler *Handler) markIsTransaction(uuid string, isTrans bool) bool {
        if handler.isTransaction == nil {
            return false
        }
        handler.Lock()
        defer handler.Unlock()
        handler.isTransaction[uuid] = isTrans
        return true
    }

    exectransaction.go

    Execute

    执行交易或查询

    func Execute(ctxt context.Context, chain *ChaincodeSupport, t *pb.Transaction) ([]byte, error) {
        var err error
    
        // 得到一个处理账本至标记TX的开始/结束
        ledger, ledgerErr := ledger.GetLedger()
        if ledgerErr != nil {
            return nil, fmt.Errorf("Failed to get handle to ledger (%s)", ledgerErr)
        }
    
        if secHelper := chain.getSecHelper(); nil != secHelper {
            var err error
            t, err = secHelper.TransactionPreExecution(t)
            // 注意,t被现在解密并且是原始输入t的深克隆
            if nil != err {
                return nil, err
            }
        }
    
        if t.Type == pb.Transaction_CHAINCODE_NEW {
            _, err := chain.DeployChaincode(ctxt, t)
            if err != nil {
                return nil, fmt.Errorf("Failed to deploy chaincode spec(%s)", err)
            }
    
            //启动并等待准备就绪
            markTxBegin(ledger, t)
            _, _, err = chain.LaunchChaincode(ctxt, t)
            if err != nil {
                markTxFinish(ledger, t, false)
                return nil, fmt.Errorf("%s", err)
            }
            markTxFinish(ledger, t, true)
        } else if t.Type == pb.Transaction_CHAINCODE_EXECUTE || t.Type == pb.Transaction_CHAINCODE_QUERY {
            //将发动(如有必要,并等待就绪)
            cID, cMsg, err := chain.LaunchChaincode(ctxt, t)
            if err != nil {
                return nil, fmt.Errorf("Failed to launch chaincode spec(%s)", err)
            }
    
            //这里应该生效,因为它上面的生效...
            chaincode := cID.Name
    
            if err != nil {
                return nil, fmt.Errorf("Failed to stablish stream to container %s", chaincode)
            }
    
            // 当getTimeout调用被创建的事务块需要注释下一行,并取消注释
            timeout := time.Duration(30000) * time.Millisecond
            //timeout, err := getTimeout(cID)
    
            if err != nil {
                return nil, fmt.Errorf("Failed to retrieve chaincode spec(%s)", err)
            }
    
            var ccMsg *pb.ChaincodeMessage
            if t.Type == pb.Transaction_CHAINCODE_EXECUTE {
                ccMsg, err = createTransactionMessage(t.Uuid, cMsg)
                if err != nil {
                    return nil, fmt.Errorf("Failed to transaction message(%s)", err)
                }
            } else {
                ccMsg, err = createQueryMessage(t.Uuid, cMsg)
                if err != nil {
                    return nil, fmt.Errorf("Failed to query message(%s)", err)
                }
            }
    
            markTxBegin(ledger, t)
            resp, err := chain.Execute(ctxt, chaincode, ccMsg, timeout, t)
            if err != nil {
                // 交易回滚
                markTxFinish(ledger, t, false)
                return nil, fmt.Errorf("Failed to execute transaction or query(%s)", err)
            } else if resp == nil {
                // 交易回滚
                markTxFinish(ledger, t, false)
                return nil, fmt.Errorf("Failed to receive a response for (%s)", t.Uuid)
            } else {
                if resp.Type == pb.ChaincodeMessage_COMPLETED || resp.Type == pb.ChaincodeMessage_QUERY_COMPLETED {
                    // Success
                    markTxFinish(ledger, t, true)
                    return resp.Payload, nil
                } else if resp.Type == pb.ChaincodeMessage_ERROR || resp.Type == pb.ChaincodeMessage_QUERY_ERROR {
                    // Rollback transaction
                    markTxFinish(ledger, t, false)
                    return nil, fmt.Errorf("Transaction or query returned with failure: %s", string(resp.Payload))
                }
                markTxFinish(ledger, t, false)
                return resp.Payload, fmt.Errorf("receive a response for (%s) but in invalid state(%d)", t.Uuid, resp.Type)
            }
    
        } else {
            err = fmt.Errorf("Invalid transaction type %s", t.Type.String())
        }
        return nil, err
    }
    

    ExecuteTransactions

    由一个执行数组中的一个上的交易将返回错误阵列之一的每个交易。如果执行成功,数组元素将是零。返回状态的哈希值

    func ExecuteTransactions(ctxt context.Context, cname ChainName, xacts []*pb.Transaction) ([]byte, []error) {
        var chain = GetChain(cname)
        if chain == nil {
            // 我们不应该到调到这里来,但在其他方面一个很好的提醒,以更好地处理
            panic(fmt.Sprintf("[ExecuteTransactions]Chain %s not found
    ", cname))
        }
        errs := make([]error, len(xacts)+1)
        for i, t := range xacts {
            _, errs[i] = Execute(ctxt, chain, t)
        }
        ledger, hasherr := ledger.GetLedger()
        var statehash []byte
        if hasherr == nil {
            statehash, hasherr = ledger.GetTempStateHash()
        }
        errs[len(errs)-1] = hasherr
        return statehash, errs
    }

    chaincode下handler.go

    //负责处理对Peer's 侧的chaincode流的管理
    
    type Handler struct {
        sync.RWMutex
        ChatStream  PeerChaincodeStream
        FSM         *fsm.FSM
        ChaincodeID *pb.ChaincodeID
    
        //此处理管理解密部署TX的副本,没有编码
        deployTXSecContext *pb.Transaction
    
        chaincodeSupport *ChaincodeSupport
        registered       bool
        readyNotify      chan bool
        //是对TX UUID的要么调用或查询TX(解密)的映射。每个TX将被添加之前执行并完成时,执行删除操作
        txCtxs map[string]*transactionContext
    
        uuidMap map[string]bool
    
        //跟踪哪些是UUID查询的;虽然shim保持包含这个,但它不能被信任
        isTransaction map[string]bool
    
        //用来发送并确保后的状态转换完成,
        nextState chan *nextStateInfo
    }

    chaincode就分析到这里,其中有不正确的地方还望读者批评指正。接下来将分析另一个重要的部分Ledger,敬请期待哦

  • 相关阅读:
    从客户端(&)中检测到有潜在危险的 Request.Path 值。
    对访问修饰关键字public, protected, internal and private的说明
    C#综合揭秘——细说多线程(下)
    iis下配置:php+mysql
    工厂模式(Factory Patter)
    HDU 1040 As Easy As A+B [补]
    HDU 1020 Encoding
    HDU 1076 An Easy Task
    UVA 100 The 3n + 1 problem
    民大OJ 1668 追杀系列第二发
  • 原文地址:https://www.cnblogs.com/ainima/p/6331787.html
Copyright © 2011-2022 走看看