zoukankan      html  css  js  c++  java
  • 代码分析

    // dbft算法返回下一个出块节点,如此看返回的就是创世块,也就是说出块节点不变化
    func GetValidators(txs []*types.Transaction) ([]keypair.PublicKey, error) {
       // TODO implement vote
       return genesis.GenesisBookkeepers, nil
    }
    
    // 入口
    func setupAPP() *cli.App {
       app := cli.NewApp()
       app.Usage = "Ontology CLI"
       app.Action = startOntology
       app.Version = config.Version
       app.Copyright = "Copyright in 2018 The Ontology Authors"
       app.Commands = []cli.Command{  // 定义各种操作命令的入口
          cmd.AccountCommand,
          cmd.InfoCommand,
          cmd.AssetCommand,
          cmd.ContractCommand,
          cmd.ExportCommand,
       }
    
    
    // 定义command
    {
       Action:    invokeContract,
       Name:      "invoke",
       Usage:     "Invoke smart contract",
       ArgsUsage: " ",
       Flags: []cli.Flag{
          utils.RPCPortFlag,
          utils.TransactionGasPriceFlag,
          utils.TransactionGasLimitFlag,
          utils.ContractAddrFlag,
          utils.ContractParamsFlag,
          utils.ContractVersionFlag,
          utils.ContractPrepareInvokeFlag,
          utils.ContractReturnTypeFlag,
          utils.WalletFileFlag,
          utils.AccountAddressFlag,
       },
    },
    // 可以看到处理函数是:invokeContract
    func invokeContract(ctx *cli.Context) error {
       SetRpcPort(ctx)  // 设置rpc的端口,DEFAULT_RPC_PORT = uint(20336)
       // 1、检查是否传入合约地址
       // 2、解析合约地址
       // 3、解析合约invoke参数
       // 4、检查是否设置了预执行ContractPrepareInvokeFlag标记,如果设置了,则预执行
       if ctx.IsSet(utils.GetFlagName(utils.ContractPrepareInvokeFlag)) {
          preResult, err := utils.PrepareInvokeNeoVMContract(contractAddr, params) // 预执行结果
          if err != nil {
             return fmt.Errorf("PrepareInvokeNeoVMSmartContact error:%s", err)
          }
          if preResult.State == 0 {
             return fmt.Errorf("Contract invoke failed
    ")
          }
          fmt.Printf("Contract invoke successfully
    ")
          fmt.Printf("Gaslimit:%d
    ", preResult.Gas)
    
          rawReturnTypes := ctx.String(utils.GetFlagName(utils.ContractReturnTypeFlag))
          if rawReturnTypes == "" {
             fmt.Printf("  Return:%s (raw value)
    ", preResult.Result)
             return nil
          }
          values, err := utils.ParseReturnValue(preResult.Result, rawReturnTypes)
          if err != nil {
             return fmt.Errorf("parseReturnValue values:%+v types:%s error:%s", values, rawReturnTypes, err)
          }
          switch len(values) {
          case 0:
             fmt.Printf("  Return: nil
    ")
          case 1:
             fmt.Printf("  Return:%+v
    ", values[0])
          default:
             fmt.Printf("  Return:%+v
    ", values)
          }
          return nil
       }
       // 至此,预检查已经结束。。。。。。
       // 接下来的步骤是本地发起的没有预检查的网络上转发来的交易
       signer, err := cmdcom.GetAccount(ctx)  // 获取交易签名
       if err != nil {
          return fmt.Errorf("Get signer account error:%s", err)
       }
       gasPrice := ctx.Uint64(utils.GetFlagName(utils.TransactionGasPriceFlag))   // 取出参数
       gasLimit := ctx.Uint64(utils.GetFlagName(utils.TransactionGasLimitFlag))
       networkId, err := utils.GetNetworkId()  // 获取网络ID
       if err != nil {
          return err
       }
       if networkId == config.NETWORK_ID_SOLO_NET {
          gasPrice = 0
       }
    
       txHash, err := utils.InvokeNeoVMContract(gasPrice, gasLimit, signer, contractAddr, params)  // 调用VM执行交易
       if err != nil {
          return fmt.Errorf("Invoke NeoVM contract error:%s", err)
       }
    
       fmt.Printf("  TxHash:%s
    ", txHash)
       fmt.Printf("
    Tip:
    ")
       fmt.Printf("  Using './ontology info status %s' to query transaction status
    ", txHash)
       return nil
    }
    
    // 交易上下文
    // Context is a type that is passed through to
    // each Handler action in a cli application. Context
    // can be used to retrieve context-specific Args and
    // parsed command-line options.
    type Context struct {
       App           *App
       Command       Command
       shellComplete bool
       flagSet       *flag.FlagSet
       setFlags      map[string]bool
       parentContext *Context
    }
    // 预执行交易
    func PrepareInvokeNeoVMContract(
       contractAddress common.Address,
       params []interface{},
    ) (*cstates.PreExecResult, error) {
       tx, err := httpcom.NewNeovmInvokeTransaction(0, 0, contractAddress, params)  // 构造填充交易结构体
       if err != nil {
          return nil, err
       }
       var buffer bytes.Buffer  // 定义buffer,取出交易
       err = tx.Serialize(&buffer)
       if err != nil {
          return nil, fmt.Errorf("Serialize error:%s", err)
       }
       txData := hex.EncodeToString(buffer.Bytes())
       data, err := sendRpcRequest("sendrawtransaction", []interface{}{txData, 1})  // 发送交易请求
       if err != nil {
          return nil, err
       }
       preResult := &cstates.PreExecResult{}  // 解析交易结果
       err = json.Unmarshal(data, &preResult)
       if err != nil {
          return nil, fmt.Errorf("json.Unmarshal PreExecResult:%s error:%s", data, err)
       }
       return preResult, nil
    }
    // 组装交易
    func NewNeovmInvokeTransaction(gasPrice, gasLimit uint64, contractAddress common.Address, params []interface{}) (*types.Transaction, error) {
       invokeCode, err := BuildNeoVMInvokeCode(contractAddress, params)  // 构造参数,构造neovm所需要的参数,包括了合约地址被打包进了invokeCode中
       if err != nil {
          return nil, err
       }
       return NewSmartContractTransaction(gasPrice, gasLimit, invokeCode)
    }
    // 构造交易
    func NewSmartContractTransaction(gasPrice, gasLimit uint64, invokeCode []byte) (*types.Transaction, error) {
       invokePayload := &payload.InvokeCode{
          Code: invokeCode,
       }
       tx := &types.Transaction{
          GasPrice: gasPrice,
          GasLimit: gasLimit,
          TxType:   types.Invoke,
          Nonce:    uint32(time.Now().Unix()),
          Payload:  invokePayload,   // 存放交易参数
          Sigs:     make([]*types.Sig, 0, 0), // 签名信息,见下面介绍:
       }
       return tx, nil
    }
    // 签名信息结构体
    type Sig struct {
       SigData [][]byte   // 签名数据
       PubKeys []keypair.PublicKey  // 公钥对
       M       uint16    // 目前不清楚,猜测是交易最终需要验证的个数,目前为0
    }
    // 发送rpc请求:注意发送地址是本地请求
    func sendRpcRequest(method string, params []interface{}) ([]byte, error) {
       ......
       // 本地请求
       addr := fmt.Sprintf("http://localhost:%d", config.DefConfig.Rpc.HttpJsonPort)
       // 读取返回结果
       resp, err := http.Post(addr, "application/json", strings.NewReader(string(data)))
       if err != nil {
          return nil, fmt.Errorf("http post request:%s error:%s", data, err)
       }
       defer resp.Body.Close()
    
       body, err := ioutil.ReadAll(resp.Body)
       if err != nil {
          return nil, fmt.Errorf("read rpc response body error:%s", err)
       }
       rpcRsp := &JsonRpcResponse{}
       err = json.Unmarshal(body, rpcRsp)
       if err != nil {
          return nil, fmt.Errorf("json.Unmarshal JsonRpcResponse:%s error:%s", body, err)
       }
       if rpcRsp.Error != 0 {
          return nil, fmt.Errorf("error code:%d desc:%s", rpcRsp.Error, rpcRsp.Desc)
       }
       return rpcRsp.Result, nil
    }
    // 接下来看交易预执行的流程
    // 接收函数通过如下注册:在startOntology时候调用initRpc启动StartRPCServer
    func StartRPCServer() error {
       log.Debug()
       http.HandleFunc("/", rpc.Handle)
       ......
       rpc.HandleFunc("sendrawtransaction", rpc.SendRawTransaction)
       ......
       err := http.ListenAndServe(":"+strconv.Itoa(int(cfg.DefConfig.Rpc.HttpJsonPort)), nil)
       if err != nil {
          return fmt.Errorf("ListenAndServe error:%s", err)
       }
       return nil
    }
    // 预执行交易
    //send raw transaction
    // A JSON example for sendrawtransaction method as following:
    //   {"jsonrpc": "2.0", "method": "sendrawtransaction", "params": ["raw transactioin in hex"], "id": 0}  接收到的RPC参数
    func SendRawTransaction(params []interface{}) map[string]interface{} {
       if len(params) < 1 {
          return responsePack(berr.INVALID_PARAMS, nil)
       }
       var hash common.Uint256
       switch params[0].(type) {
       case string:
          str := params[0].(string)
          hex, err := common.HexToBytes(str)
          if err != nil {
             return responsePack(berr.INVALID_PARAMS, "")
          }
          var txn types.Transaction
          if err := txn.Deserialize(bytes.NewReader(hex)); err != nil {
             return responsePack(berr.INVALID_TRANSACTION, "")
          }
          hash = txn.Hash()
          log.Debugf("SendRawTransaction recv %s", hash.ToHexString())
          if txn.TxType == types.Invoke || txn.TxType == types.Deploy {
             if len(params) > 1 {
                preExec, ok := params[1].(float64)  // 不明白干啥的,需要看一下上面打包过来的参数都是什么东西
                if ok && preExec == 1 {
                   result, err := bactor.PreExecuteContract(&txn) // 预执行:使用的是base/actor,下面具体分析,返回执行结果
                   if err != nil {
                      log.Infof("PreExec: ", err)
                      return responsePack(berr.SMARTCODE_ERROR, "")
                   }
                   return responseSuccess(result)
                }
             }
          }
    
          log.Debugf("SendRawTransaction send to txpool %s", hash.ToHexString()) 
          if errCode, desc := bcomn.SendTxToPool(&txn); errCode != ontErrors.ErrNoError {  // 将交易发送到交易池,见后面分析
             log.Warnf("SendRawTransaction verified %s error: %s", hash.ToHexString(), desc)
             return responsePack(berr.INVALID_TRANSACTION, desc)
          }
          log.Debugf("SendRawTransaction verified %s", hash.ToHexString())  // 即验证通过了
       default:
          return responsePack(berr.INVALID_PARAMS, "")
       }
       return responseSuccess(hash.ToHexString())
    }
    
    //PreExecuteContract from ledger
    func PreExecuteContract(tx *types.Transaction) (*cstate.PreExecResult, error) {
       return ledger.DefLedger.PreExecuteContract(tx)
    }
    
    func (self *Ledger) PreExecuteContract(tx *types.Transaction) (*cstate.PreExecResult, error) {
       return self.ldgStore.PreExecuteContract(tx)
    }
    
    
    //PreExecuteContract return the result of smart contract execution without commit to store
    func (this *LedgerStoreImp) PreExecuteContract(tx *types.Transaction) (*sstate.PreExecResult, error) {
       header, err := this.GetHeaderByHeight(this.GetCurrentBlockHeight())  // 通过本地的ledger获取区块头信息,获取的是当前记录最新的区块头Current block height
       if err != nil {                                                      // 关于this的结构,可以看下面分析
          return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, err
       }
    
       config := &smartcontract.Config{
          Time:   header.Timestamp,
          Height: header.Height,
          Tx:     tx,
       }
    
       cache := storage.NewCloneCache(this.stateStore.NewStateBatch())
       preGas, err := this.getPreGas(config, cache)
       if err != nil {
          return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, err
       }
    
       if tx.TxType == types.Invoke {
          invoke := tx.Payload.(*payload.InvokeCode)
    
          sc := smartcontract.SmartContract{
             Config:     config,
             Store:      this,
             CloneCache: cache,
             Gas:        math.MaxUint64 - calcGasByCodeLen(len(invoke.Code), preGas[neovm.UINT_INVOKE_CODE_LEN_NAME]),
          }
    
          //start the smart contract executive function
          engine, _ := sc.NewExecuteEngine(invoke.Code)  // ?没看到执行的代码啊
          result, err := engine.Invoke()
          if err != nil {
             return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, err  // 如果失败,返回定义的最小交易gas耗费值
          }
          gasCost := math.MaxUint64 - sc.Gas
          mixGas := neovm.MIN_TRANSACTION_GAS
          if gasCost < mixGas {
             gasCost = mixGas
          }
          return &sstate.PreExecResult{State: event.CONTRACT_STATE_SUCCESS, Gas: gasCost, Result: scommon.ConvertNeoVmTypeHexString(result)}, nil  // 返回成功的结果和交易的gas消耗
       } else if tx.TxType == types.Deploy {
          deploy := tx.Payload.(*payload.DeployCode)
          return &sstate.PreExecResult{State: event.CONTRACT_STATE_SUCCESS, Gas: preGas[neovm.CONTRACT_CREATE_NAME] + calcGasByCodeLen(len(deploy.Code), preGas[neovm.UINT_DEPLOY_CODE_LEN_NAME]), Result: nil}, nil
       } else {
          return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, errors.NewErr("transaction type error")
       }
    }
    // 本地账本结构体
    //LedgerStoreImp is main store struct fo ledger
    type LedgerStoreImp struct {
       blockStore         *BlockStore                      //BlockStore for saving block & transaction data  块存储,本地账本存储,记录本地的块
       stateStore         *StateStore                      //StateStore for saving state data, like balance, smart contract execution result, and so on.  // 状态存储
       eventStore         *EventStore                      //EventStore for saving log those gen after smart contract executed.  // 执行日志后的事件存储
       storedIndexCount   uint32                           //record the count of have saved block index  // 已经存储的block索引
       currBlockHeight    uint32                           //Current block height   // 当前记录的最新的块高度
       currBlockHash      common.Uint256                   //Current block hash     哈希
       headerCache        map[common.Uint256]*types.Header //BlockHash => Header    块头
       headerIndex        map[uint32]common.Uint256        //Header index, Mapping header height => block hash  索引
       savingBlock        bool                             //is saving block now  是否存储
       vbftPeerInfoheader map[string]uint32                //pubInfo save pubkey,peerindex
       vbftPeerInfoblock  map[string]uint32                //pubInfo save pubkey,peerindex
       lock               sync.RWMutex
    }
    // 上面分析了交易经过了预执行,接着分析交易进入交易池
    func SendTxToPool(txn *types.Transaction) (ontErrors.ErrCode, string) {
       if errCode, desc := bactor.AppendTxToPool(txn); errCode != ontErrors.ErrNoError {
          log.Warn("TxnPool verify error:", errCode.Error())
          return errCode, desc
       }
       return ontErrors.ErrNoError, ""
    }
    //append transaction to pool to txpool actor 
    func AppendTxToPool(txn *types.Transaction) (ontErrors.ErrCode, string) {
       if DisableSyncVerifyTx {
          txReq := &tcomn.TxReq{txn, tcomn.HttpSender, nil}
          txnPid.Tell(txReq)
          return ontErrors.ErrNoError, ""
       }
       ch := make(chan *tcomn.TxResult, 1)
       txReq := &tcomn.TxReq{txn, tcomn.HttpSender, ch}
       txnPid.Tell(txReq)  // 向txnPid发送了一个req
       if msg, ok := <-ch; ok {           ? 没看明白
          return msg.Err, msg.Desc
       }
       return ontErrors.ErrUnknown, ""
    }
    // 处理req请求
    // Receive implements the actor interface
    func (ta *TxActor) Receive(context actor.Context) {
       switch msg := context.Message().(type) {
       case *actor.Started:
          log.Info("txpool-tx actor started and be ready to receive tx msg")
    
       case *actor.Stopping:
          log.Warn("txpool-tx actor stopping")
    
       case *actor.Restarting:
          log.Warn("txpool-tx actor restarting")
    
       case *tc.TxReq:
          sender := msg.Sender
    
          log.Debugf("txpool-tx actor receives tx from %v ", sender.Sender())
    
          ta.handleTransaction(sender, context.Self(), msg.Tx, msg.TxResultCh)
       ......
    // handleTransaction handles a transaction from network and http
    func (ta *TxActor) handleTransaction(sender tc.SenderType, self *actor.PID,
       txn *tx.Transaction, txResultCh chan *tc.TxResult) {
       ta.server.increaseStats(tc.RcvStats)
       if len(txn.ToArray()) > tc.MAX_TX_SIZE {  // 交易不能超过1M
          log.Debugf("handleTransaction: reject a transaction due to size over 1M")
          if sender == tc.HttpSender && txResultCh != nil {
             replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, "size is over 1M")
          }
          return
       }
    
       if ta.server.getTransaction(txn.Hash()) != nil {   // 是否是重复的交易,注意这里是冲本地获取的已经存处理过的交易
          log.Debugf("handleTransaction: transaction %x already in the txn pool",
             txn.Hash())
    
          ta.server.increaseStats(tc.DuplicateStats)  // 记录DuplicateStats
          if sender == tc.HttpSender && txResultCh != nil {
             replyTxResult(txResultCh, txn.Hash(), errors.ErrDuplicateInput,
                fmt.Sprintf("transaction %x is already in the tx pool", txn.Hash()))
          }
       } else if ta.server.getTransactionCount() >= tc.MAX_CAPACITY {   // 交易池已满
          log.Debugf("handleTransaction: transaction pool is full for tx %x",
             txn.Hash())
    
          ta.server.increaseStats(tc.FailureStats)  // 记录FailureStats
          if sender == tc.HttpSender && txResultCh != nil {
             replyTxResult(txResultCh, txn.Hash(), errors.ErrTxPoolFull,
                "transaction pool is full")
          }
       } else {
          if _, overflow := common.SafeMul(txn.GasLimit, txn.GasPrice); overflow {  // 溢出保护
             log.Debugf("handleTransaction: gasLimit %v, gasPrice %v overflow",
                txn.GasLimit, txn.GasPrice)
             if sender == tc.HttpSender && txResultCh != nil {
                replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
                   fmt.Sprintf("gasLimit %d * gasPrice %d overflow",
                      txn.GasLimit, txn.GasPrice))
             }
             return
          }
          // 从配置文件读取gas的limit和price
          gasLimitConfig := config.DefConfig.Common.GasLimit
          gasPriceConfig := ta.server.getGasPrice()
          if txn.GasLimit < gasLimitConfig || txn.GasPrice < gasPriceConfig {  // 入参不满足配置文件设置
             log.Debugf("handleTransaction: invalid gasLimit %v, gasPrice %v",
                txn.GasLimit, txn.GasPrice)
             if sender == tc.HttpSender && txResultCh != nil {
                replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
                   fmt.Sprintf("Please input gasLimit >= %d and gasPrice >= %d",
                      gasLimitConfig, gasPriceConfig))
             }
             return
          }
    
          if txn.TxType == tx.Deploy && txn.GasLimit < neovm.CONTRACT_CREATE_GAS {  // 如果是部署合约,要求gas必须大于某个值
             log.Debugf("handleTransaction: deploy tx invalid gasLimit %v, gasPrice %v",
                txn.GasLimit, txn.GasPrice)
             if sender == tc.HttpSender && txResultCh != nil {
                replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
                   fmt.Sprintf("Deploy tx gaslimit should >= %d",
                      neovm.CONTRACT_CREATE_GAS))
             }
             return
          }
    
          if !ta.server.disablePreExec {   // 配置了禁用预先检查
             if ok, desc := preExecCheck(txn); !ok {
                log.Debugf("handleTransaction: preExecCheck tx %x failed", txn.Hash())
                if sender == tc.HttpSender && txResultCh != nil {
                   replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, desc)
                }
                return
             }
             log.Debugf("handleTransaction: preExecCheck tx %x passed", txn.Hash())
          }
          <-ta.server.slots   // 写入管道
          ta.server.assignTxToWorker(txn, sender, txResultCh)  // 将tx分配给worker
       }
    }
    
    
    // assignTxToWorker assigns a new transaction to a worker by LB
    func (s *TXPoolServer) assignTxToWorker(tx *tx.Transaction,
       sender tc.SenderType, txResultCh chan *tc.TxResult) bool {
    
       if tx == nil {
          return false
       }
    
       if ok := s.setPendingTx(tx, sender, txResultCh); !ok {  // 重复交易,即work的pending队列中已经存在该交易
          s.increaseStats(tc.DuplicateStats)
          if sender == tc.HttpSender && txResultCh != nil {
             replyTxResult(txResultCh, tx.Hash(), errors.ErrDuplicateInput,
                "duplicated transaction input detected")
          }
          return false
       }
       // Add the rcvTxn to the worker
       lb := make(tc.LBSlice, len(s.workers))
       for i := 0; i < len(s.workers); i++ {
          entry := tc.LB{Size: len(s.workers[i].rcvTXCh) +
             len(s.workers[i].pendingTxList),
             WorkerID: uint8(i),
          }
          lb[i] = entry
       }
       sort.Sort(lb)  // 排序
       s.workers[lb[0].WorkerID].rcvTXCh <- tx  // 将tx放入s.workers[lb[0].WorkerID].rcvTXCh,发送一个channel
       return true
    }
    // 处理channel
    // Start is the main event loop.
    func (worker *txPoolWorker) start() {
       worker.timer = time.NewTimer(time.Second * tc.EXPIRE_INTERVAL)
       for {
          select {
          case <-worker.stopCh:
             worker.server.wg.Done()
             return
          case rcvTx, ok := <-worker.rcvTXCh:  // 处理rcvTXCh
             if ok {
                // Verify rcvTxn
                worker.verifyTx(rcvTx)
             }
          case stfTx, ok := <-worker.stfTxCh:
             if ok {
                worker.verifyStateful(stfTx)
             }
          case <-worker.timer.C:
             worker.handleTimeoutEvent()
             worker.timer.Stop()
             worker.timer.Reset(time.Second * tc.EXPIRE_INTERVAL)
          case rsp, ok := <-worker.rspCh:
             if ok {
                /* Handle the response from validator, if all of cases
                 * are verified, put it to txnPool
                 */
                worker.handleRsp(rsp)
             }
          }
       }
    }
    // 验证rcvTXCh
    // verifyTx prepares a check request and sends it to the validators.
    func (worker *txPoolWorker) verifyTx(tx *tx.Transaction) {
       if tx := worker.server.getTransaction(tx.Hash()); tx != nil {   // 交易已经存在
          log.Infof("verifyTx: transaction %x already in the txn pool",
             tx.Hash())
          worker.server.removePendingTx(tx.Hash(), errors.ErrDuplicateInput)
          return
       }
    
       if _, ok := worker.pendingTxList[tx.Hash()]; ok {   // 
          log.Infof("verifyTx: transaction %x already in the verifying process",
             tx.Hash())
          return
       }
       // Construct the request and send it to each validator server to verify
       req := &types.CheckTx{
          WorkerId: worker.workId,
          Tx:       *tx,
       }
    
       worker.sendReq2Validator(req)
    
       // Construct the pending transaction
       pt := &pendingTx{
          tx:      tx,
          req:     req,
          flag:    0,
          retries: 0,
       }
       // Add it to the pending transaction list
       worker.mu.Lock()
       worker.pendingTxList[tx.Hash()] = pt
       worker.mu.Unlock()
       // Record the time per a txn
       pt.valTime = time.Now()
    }
    
    // 过了预执行阶段,注意这段的逻辑和预执行很像
    //Invoke neo vm smart contract. if isPreExec is true, the invoke will not really execute
    func InvokeNeoVMContract(
       gasPrice,
       gasLimit uint64,
       signer *account.Account,
       smartcodeAddress common.Address,
       params []interface{}) (string, error) {
       tx, err := httpcom.NewNeovmInvokeTransaction(gasPrice, gasLimit, smartcodeAddress, params)  // 组装交易结构体
       if err != nil {
          return "", err
       }
       return InvokeSmartContract(signer, tx)  // invoke
    }
    //InvokeSmartContract is low level method to invoke contact.
    func InvokeSmartContract(signer *account.Account, tx *types.Transaction) (string, error) {
       // 为交易签名
       err := SignTransaction(signer, tx)  
       if err != nil {
          return "", fmt.Errorf("SignTransaction error:%s", err)
       }
       // 发送SendRawTransaction,这个和上面预交易一致
       txHash, err := SendRawTransaction(tx)
       if err != nil {
          return "", fmt.Errorf("SendTransaction error:%s", err)
       }
       return txHash, nil
    }

    // assignTxToWorker assigns a new transaction to a worker by LB
    func (s *TXPoolServer) assignTxToWorker(tx *tx.Transaction,
    sender tc.SenderType, txResultCh chan *tc.TxResult) bool {

    if tx == nil {
    return false
    }
    // 判断交易是否已经存在 allPendingTxs map[common.Uint256]*serverPendingTx // The txs that server is processin
       if ok := s.setPendingTx(tx, sender, txResultCh); !ok {
    s.increaseStats(tc.DuplicateStats)
    if sender == tc.HttpSender && txResultCh != nil {
    replyTxResult(txResultCh, tx.Hash(), errors.ErrDuplicateInput,
    "duplicated transaction input detected")
    }
    return false
    }
    // Add the rcvTxn to the worker 统计所有work的pendingTxList信息,排序后选择排队最少的worker
    lb := make(tc.LBSlice, len(s.workers))
    for i := 0; i < len(s.workers); i++ {
    entry := tc.LB{Size: len(s.workers[i].rcvTXCh) +
    len(s.workers[i].pendingTxList),
    WorkerID: uint8(i),
    }
    lb[i] = entry
    }
    sort.Sort(lb)
    s.workers[lb[0].WorkerID].rcvTXCh <- tx
    return true
    }

    // worker的txpool
    type txPoolWorker struct {
    mu sync.RWMutex
    workId uint8 // Worker ID
    rcvTXCh chan *tx.Transaction // The channel of receive transaction
    stfTxCh chan *tx.Transaction // The channel of txs to be re-verified stateful ? 再次验证
    rspCh chan *types.CheckResponse // The channel of verified response 验证结果返回通道
    server *TXPoolServer // The txn pool server pointer 属于哪个txnpool ?
    timer *time.Timer // The timer of reverifying
    stopCh chan bool // stop routine
    pendingTxList map[common.Uint256]*pendingTx // The transaction on the verifying process 待验证队列
    }
    // 处理验证
    // Start is the main event loop.
    func (worker *txPoolWorker) start() {
    worker.timer = time.NewTimer(time.Second * tc.EXPIRE_INTERVAL)
    for {
    select {
    case <-worker.stopCh:
    worker.server.wg.Done()
    return
    case rcvTx, ok := <-worker.rcvTXCh:
    if ok {
    // Verify rcvTxn
    worker.verifyTx(rcvTx)
    }
    ......
    }
    // 发送给validators验证
    // verifyTx prepares a check request and sends it to the validators.
    func (worker *txPoolWorker) verifyTx(tx *tx.Transaction) {
    if tx := worker.server.getTransaction(tx.Hash()); tx != nil {
    log.Infof("verifyTx: transaction %x already in the txn pool",
    tx.Hash())
    worker.server.removePendingTx(tx.Hash(), errors.ErrDuplicateInput)
    return
    }

    if _, ok := worker.pendingTxList[tx.Hash()]; ok {
    log.Infof("verifyTx: transaction %x already in the verifying process",
    tx.Hash())
    return
    }
    // Construct the request and send it to each validator server to verify 组建请求消息
    req := &types.CheckTx{
    WorkerId: worker.workId,
    Tx: *tx,
    }

    worker.sendReq2Validator(req) // 发送给验证节点进行验证

    // Construct the pending transaction
    pt := &pendingTx{
    tx: tx,
    req: req,
    flag: 0,
    retries: 0,
    }
    // Add it to the pending transaction list
    worker.mu.Lock()
    worker.pendingTxList[tx.Hash()] = pt // 记录到txpool中
    worker.mu.Unlock()
    // Record the time per a txn
    pt.valTime = time.Now()
    }
    // sendReq2Validator sends a check request to the validators
    func (worker *txPoolWorker) sendReq2Validator(req *types.CheckTx) bool {
    rspPid := worker.server.GetPID(tc.VerifyRspActor)
    if rspPid == nil {
    log.Info("sendReq2Validator: VerifyRspActor not exist")
    return false
    }

    pids := worker.server.getNextValidatorPIDs() // 默认验证节点是2个,即stateless和full,分别验证不同的部分,通过两个goroutine实现
    if pids == nil {
    return false
    }
    for _, pid := range pids {
    pid.Request(req, rspPid) // 发送给验证节点
    }

    return true
    }
    // 将交易发送给validator节点进行验证,这个是stateless节点
    func (self *validator) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Started:
    log.Info("stateless-validator: started and be ready to receive txn")
    case *actor.Stopping:
    log.Info("stateless-validator: stopping")
    case *actor.Restarting:
    log.Info("stateless-validator: restarting")
    case *actor.Stopped:
    log.Info("stateless-validator: stopped")
    case *vatypes.CheckTx:
    log.Debugf("stateless-validator receive tx %x", msg.Tx.Hash())
    sender := context.Sender()
    errCode := validation.VerifyTransaction(&msg.Tx)

    response := &vatypes.CheckResponse{
    WorkerId: msg.WorkerId,
    ErrCode: errCode,
    Hash: msg.Tx.Hash(),
    Type: self.VerifyType(),
    Height: 0,
    }

    sender.Tell(response)
    ........
    }
    // staetfull节点
    func (self *validator) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Started:
    log.Info("stateless-validator: started and be ready to receive txn")
    case *actor.Stopping:
    log.Info("stateless-validator: stopping")
    case *actor.Restarting:
    log.Info("stateless-validator: restarting")
    case *actor.Stopped:
    log.Info("stateless-validator: stopped")
    case *vatypes.CheckTx:
    log.Debugf("stateless-validator receive tx %x", msg.Tx.Hash())
    sender := context.Sender()
    errCode := validation.VerifyTransaction(&msg.Tx)

    response := &vatypes.CheckResponse{
    WorkerId: msg.WorkerId,
    ErrCode: errCode,
    Hash: msg.Tx.Hash(),
    Type: self.VerifyType(),
    Height: 0,
    }
    // 验证结果发送response
    sender.Tell(response)
    ......
    }
    // stateless验证过程:可以发现其验证没有牵涉到任何的账本数据验证,全是对tx信息的验证
    // VerifyTransaction verifys received single transaction
    func VerifyTransaction(tx *types.Transaction) ontErrors.ErrCode {
    if err := checkTransactionSignatures(tx); err != nil { // 验证签名,包括多重签名
    log.Info("transaction verify error:", err)
    return ontErrors.ErrVerifySignature
    }
    // 检查payload,逻辑还没有实现完全
    if err := checkTransactionPayload(tx); err != nil {
    log.Warn("[VerifyTransaction],", err)
    return ontErrors.ErrTransactionPayload
    }

    return ontErrors.ErrNoError
    }
    // statefull的验证需要调用底层DB查询是否是重复交易
    func (self *validator) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Started:
    log.Info("stateful-validator: started and be ready to receive txn")
    case *actor.Stopping:
    log.Info("stateful-validator: stopping")
    case *actor.Restarting:
    log.Info("stateful-validator: restarting")
    case *vatypes.CheckTx:
    log.Debugf("stateful-validator: receive tx %x", msg.Tx.Hash())
    sender := context.Sender()
    height := ledger.DefLedger.GetCurrentBlockHeight() // 当前块高度

    errCode := errors.ErrNoError
    hash := msg.Tx.Hash()

    exist, err := ledger.DefLedger.IsContainTransaction(hash) // 查询底层数据库
    if err != nil {
    log.Warn("query db error:", err)
    errCode = errors.ErrUnknown
    } else if exist {
    errCode = errors.ErrDuplicatedTx
    }

    response := &vatypes.CheckResponse{
    WorkerId: msg.WorkerId,
    Type: self.VerifyType(),
    Hash: msg.Tx.Hash(),
    Height: height,
    ErrCode: errCode,
    }

    sender.Tell(response)
    ......
    }

    // Receive implements the actor interface
    func (vpa *VerifyRspActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *actor.Started:
    log.Info("txpool-verify actor: started and be ready to receive validator's msg")

    case *actor.Stopping:
    log.Warn("txpool-verify actor: stopping")

    case *actor.Restarting:
    log.Warn("txpool-verify actor: Restarting")

    case *types.RegisterValidator:
    log.Debugf("txpool-verify actor:: validator %v connected", msg.Sender)
    vpa.server.registerValidator(msg)

    case *types.UnRegisterValidator:
    log.Debugf("txpool-verify actor:: validator %d:%v disconnected", msg.Type, msg.Id)

    vpa.server.unRegisterValidator(msg.Type, msg.Id)

    case *types.CheckResponse:
    log.Debug("txpool-verify actor:: Receives verify rsp message") // 发送验证结果

    vpa.server.assignRspToWorker(msg)

    default:
    log.Debugf("txpool-verify actor:Unknown msg %v type %v", msg, reflect.TypeOf(msg))
    }
    }
    // 即返回给workID
    // assignRspToWorker assigns a check response from the validator to
    // the correct worker.
    func (s *TXPoolServer) assignRspToWorker(rsp *types.CheckResponse) bool {

    if rsp == nil {
    return false
    }

    if rsp.WorkerId >= 0 && rsp.WorkerId < uint8(len(s.workers)) {
    s.workers[rsp.WorkerId].rspCh <- rsp // 发送到work id
    }

    if rsp.ErrCode == errors.ErrNoError {
    s.increaseStats(tc.SuccessStats)
    } else {
    s.increaseStats(tc.FailureStats)
    if rsp.Type == types.Stateless {
    s.increaseStats(tc.SigErrStats)
    } else {
    s.increaseStats(tc.StateErrStats)
    }
    }
    return true
    }
    // work处理消息
    // Start is the main event loop.
    func (worker *txPoolWorker) start() {
    worker.timer = time.NewTimer(time.Second * tc.EXPIRE_INTERVAL)
    for {
    select {
    case <-worker.stopCh:
    worker.server.wg.Done()
    return
    case rcvTx, ok := <-worker.rcvTXCh:
    if ok {
    // Verify rcvTxn
    worker.verifyTx(rcvTx)
    }
    case stfTx, ok := <-worker.stfTxCh:
    if ok {
    worker.verifyStateful(stfTx)
    }
    case <-worker.timer.C:
    worker.handleTimeoutEvent()
    worker.timer.Stop()
    worker.timer.Reset(time.Second * tc.EXPIRE_INTERVAL)
    case rsp, ok := <-worker.rspCh:
    if ok {
    /* Handle the response from validator, if all of cases
    * are verified, put it to txnPool
    */
    worker.handleRsp(rsp) // 所有都ok了,推送到本地交易池
    }
    }
    }
    }
    // the tx is valid, add it to the tx pool, or remove it from the pending
    // list
    func (worker *txPoolWorker) handleRsp(rsp *types.CheckResponse) {
    if rsp.WorkerId != worker.workId { // 1、id不对
    return
    }

    worker.mu.Lock()
    defer worker.mu.Unlock()

    pt, ok := worker.pendingTxList[rsp.Hash] // 2、不在work的pending队列
    if !ok {
    return
    }
    if rsp.ErrCode != errors.ErrNoError { // 3、验证节点未通过验证
    //Verify fail
    log.Infof("handleRsp: validator %d transaction %x invalid: %s",
    rsp.Type, rsp.Hash, rsp.ErrCode.Error())
    delete(worker.pendingTxList, rsp.Hash) // 从work的pending队列删除
    worker.server.removePendingTx(rsp.Hash, rsp.ErrCode) // 网络同步广播
    return
    }

    if tc.STATEFUL_MASK&(0x1<<rsp.Type) != 0 && rsp.Height < worker.server.getHeight() { // 4、验证节点的块高低于work节点的,再次使用statefull节点验证
    // If validator's height is less than the required one, re-validate it.
    worker.sendReq2StatefulV(pt.req)
    pt.valTime = time.Now()
    return
    }

    if pt.flag&(0x1<<rsp.Type) == 0 {
    retAttr := &tc.TXAttr{
    Height: rsp.Height,
    Type: rsp.Type,
    ErrCode: rsp.ErrCode,
    }
    pt.flag |= (0x1 << rsp.Type)
    pt.ret = append(pt.ret, retAttr)
    }

    if pt.flag&0xf == tc.VERIFY_MASK {
    worker.putTxPool(pt) // 通过验证,放入到txpool中
    delete(worker.pendingTxList, rsp.Hash) // 从pending删除
    }
    }
    // 如果是来自http的,将该交易进行广播
    // when it is handled. And if the submitter of the valid transaction
    // is from http, broadcast it to the network. Meanwhile, check if it
    // is in the block from consensus.
    func (s *TXPoolServer) removePendingTx(hash common.Uint256,
    err errors.ErrCode) {

    s.mu.Lock()

    pt, ok := s.allPendingTxs[hash]
    if !ok {
    s.mu.Unlock()
    return
    }

    if err == errors.ErrNoError && ((pt.sender == tc.HttpSender) ||
    (pt.sender == tc.NetSender && s.enableBroadcastNetTx)) {
    pid := s.GetPID(tc.NetActor)
    if pid != nil {
    pid.Tell(pt.tx)
    }
    }

    if pt.sender == tc.HttpSender && pt.ch != nil {
    replyTxResult(pt.ch, hash, err, err.Error())
    }

    delete(s.allPendingTxs, hash)

    if len(s.allPendingTxs) < tc.MAX_LIMITATION {
    select {
    case s.slots <- struct{}{}:
    default:
    log.Debug("removePendingTx: slots is full")
    }
    }

    s.mu.Unlock()

    // Check if the tx is in the pending block and
    // the pending block is verified
    s.checkPendingBlockOk(hash, err)
    }
    // 交易如池
    // putTxPool adds a valid transaction to the tx pool and removes it from
    // the pending list.
    func (worker *txPoolWorker) putTxPool(pt *pendingTx) bool {
    txEntry := &tc.TXEntry{
    Tx: pt.tx,
    Attrs: pt.ret,
    }
    worker.server.addTxList(txEntry) // 记录的是txEntry,即加入了验证的结果信息
    worker.server.removePendingTx(pt.tx.Hash(), errors.ErrNoError)
    return true
    }
    //总结一下:
    // 1、对于带有pre请求的操作,先会预检查,会执行合约代码,但不会修改状态数据库
    // 2、work先检查,如基本数据和参数的检查,但是不检查签名,通过后放入到pending队列,然后发给validator处理;work的pending放正在验证的交易,all放未经验证的交易
    // 3、stateless检查签名和payload
    // 4、statefull检查底层数据库记录的交易
    // 5、验证通过,记录,失败,广播?

    //
    按照角色不同,节点可以分为记账节点和同步节点,记账节点参与网络共识,而同步节点只同步记账节点生成的区块。
    //Bookkeepers:记账人,用来配置记账人的公钥,需要配置四个;
    //SeedList:用来配置ontology网络的种子节点。种子节点是ontology网络的链接入口,新节点加入ontology网络时,会先向种子节点请求网络相关信息。配置文件里至少需配置一个种子节点。

    func initConsensus(ctx *cli.Context, p2pPid *actor.PID, txpoolSvr *proc.TXPoolServer, acc *account.Account) (consensus.ConsensusService, error) {
    if !config.DefConfig.Consensus.EnableConsensus {
    return nil, nil
    }
    pool := txpoolSvr.GetPID(tc.TxPoolActor)

    consensusType := strings.ToLower(config.DefConfig.Genesis.ConsensusType)
    consensusService, err := consensus.NewConsensusService(consensusType, acc, pool, nil, p2pPid) // 根据type生成共识实例,包括start和halt和getpid方法
    if err != nil {
    return nil, fmt.Errorf("NewConsensusService:%s error:%s", consensusType, err)
    }
    consensusService.Start() // 共识模块启动

    netreqactor.SetConsensusPid(consensusService.GetPID())
    hserver.SetConsensusPid(consensusService.GetPID())

    log.Infof("Consensus init success")
    return consensusService, nil
    }
    // 启动dbft共识
    func (ds *DbftService) start() {
    log.Debug()
    ds.started = true

    if config.DefConfig.Genesis.DBFT.GenBlockTime > config.MIN_GEN_BLOCK_TIME {
    genesis.GenBlockTime = time.Duration(config.DefConfig.Genesis.DBFT.GenBlockTime) * time.Second
    } else {
    log.Warn("The Generate block time should be longer than 2 seconds, so set it to be default 6 seconds.")
    }

    ds.sub.Subscribe(message.TOPIC_SAVE_BLOCK_COMPLETE)

    ds.InitializeConsensus(0)
    }
    func (ds *DbftService) InitializeConsensus(viewNum byte) error {
    log.Debug("[InitializeConsensus] Start InitializeConsensus.")
    log.Debug("[InitializeConsensus] viewNum: ", viewNum)

    if viewNum == 0 {
    ds.context.Reset(ds.Account)
    } else {
    if ds.context.State.HasFlag(BlockGenerated) {
    return nil
    }
    ds.context.ChangeView(viewNum)
    }

    if ds.context.BookkeeperIndex < 0 {
    log.Info("You aren't bookkeeper")
    return nil
    }

    if ds.context.BookkeeperIndex == int(ds.context.PrimaryIndex) {

    //primary peer
    ds.context.State |= Primary
    ds.timerHeight = ds.context.Height
    ds.timeView = viewNum
    span := time.Now().Sub(ds.blockReceivedTime)
    if span > genesis.GenBlockTime {
    //TODO: double check the is the stop necessary
    ds.timer.Stop()
    ds.timer.Reset(0)
    //go ds.Timeout()
    } else {
    ds.timer.Stop()
    ds.timer.Reset(genesis.GenBlockTime - span)
    }
    } else {
    //backup peer
    ds.context.State = Backup
    ds.timerHeight = ds.context.Height
    ds.timeView = viewNum

    ds.timer.Stop()
    ds.timer.Reset(genesis.GenBlockTime << (viewNum + 1))
    }
    return nil
    }
     


  • 相关阅读:
    洛谷P3513 [POI2011]KON-Conspiracy
    柱状图 三分法+树状数组
    CF习题集三
    CF习题集二
    CF习题集一
    单调队列总结
    SP688 SAM
    lemon使用方法
    洛谷 P2403 [SDOI2010]所驼门王的宝藏 题解
    字符串学习笔记二
  • 原文地址:https://www.cnblogs.com/yunlion/p/9379165.html
Copyright © 2011-2022 走看看