// GetValidators returns the bookkeepers that produce the next block (DBFT).
// It ignores txs and always returns genesis.GenesisBookkeepers, so the set of
// block-producing nodes never changes.
func GetValidators(txs []*types.Transaction) ([]keypair.PublicKey, error) {
	// TODO implement vote
	return genesis.GenesisBookkeepers, nil
}
// setupAPP is the CLI entry point: it creates the cli.App, fills in its
// metadata and default action (startOntology) and registers the sub-commands.
// NOTE(review): this excerpt ends before the function's closing brace.
func setupAPP() *cli.App {
	app := cli.NewApp()
	app.Usage = "Ontology CLI"
	app.Action = startOntology
	app.Version = config.Version
	app.Copyright = "Copyright in 2018 The Ontology Authors"
	app.Commands = []cli.Command{ // entry points of the individual sub-commands
		cmd.AccountCommand,
		cmd.InfoCommand,
		cmd.AssetCommand,
		cmd.ContractCommand,
		cmd.ExportCommand,
	}
// Definition of the "invoke" command; its handler is invokeContract.
// NOTE(review): excerpt of a composite literal, presumably one element of a
// []cli.Command list — confirm against the full file.
{
	Action: invokeContract,
	Name: "invoke",
	Usage: "Invoke smart contract",
	ArgsUsage: " ",
	Flags: []cli.Flag{
		utils.RPCPortFlag,
		utils.TransactionGasPriceFlag,
		utils.TransactionGasLimitFlag,
		utils.ContractAddrFlag,
		utils.ContractParamsFlag,
		utils.ContractVersionFlag,
		utils.ContractPrepareInvokeFlag,
		utils.ContractReturnTypeFlag,
		utils.WalletFileFlag,
		utils.AccountAddressFlag,
	},
},
// 可以看到处理函数是:invokeContract
func invokeContract(ctx *cli.Context) error {
SetRpcPort(ctx) // 设置rpc的端口,DEFAULT_RPC_PORT = uint(20336)
// 1、检查是否传入合约地址
// 2、解析合约地址
// 3、解析合约invoke参数
// 4、检查是否设置了预执行ContractPrepareInvokeFlag标记,如果设置了,则预执行
if ctx.IsSet(utils.GetFlagName(utils.ContractPrepareInvokeFlag)) {
preResult, err := utils.PrepareInvokeNeoVMContract(contractAddr, params) // 预执行结果
if err != nil {
return fmt.Errorf("PrepareInvokeNeoVMSmartContact error:%s", err)
}
if preResult.State == 0 {
return fmt.Errorf("Contract invoke failed
")
}
fmt.Printf("Contract invoke successfully
")
fmt.Printf("Gaslimit:%d
", preResult.Gas)
rawReturnTypes := ctx.String(utils.GetFlagName(utils.ContractReturnTypeFlag))
if rawReturnTypes == "" {
fmt.Printf(" Return:%s (raw value)
", preResult.Result)
return nil
}
values, err := utils.ParseReturnValue(preResult.Result, rawReturnTypes)
if err != nil {
return fmt.Errorf("parseReturnValue values:%+v types:%s error:%s", values, rawReturnTypes, err)
}
switch len(values) {
case 0:
fmt.Printf(" Return: nil
")
case 1:
fmt.Printf(" Return:%+v
", values[0])
default:
fmt.Printf(" Return:%+v
", values)
}
return nil
}
// 至此,预检查已经结束。。。。。。
// 接下来的步骤是本地发起的没有预检查的网络上转发来的交易
signer, err := cmdcom.GetAccount(ctx) // 获取交易签名
if err != nil {
return fmt.Errorf("Get signer account error:%s", err)
}
gasPrice := ctx.Uint64(utils.GetFlagName(utils.TransactionGasPriceFlag)) // 取出参数
gasLimit := ctx.Uint64(utils.GetFlagName(utils.TransactionGasLimitFlag))
networkId, err := utils.GetNetworkId() // 获取网络ID
if err != nil {
return err
}
if networkId == config.NETWORK_ID_SOLO_NET {
gasPrice = 0
}
txHash, err := utils.InvokeNeoVMContract(gasPrice, gasLimit, signer, contractAddr, params) // 调用VM执行交易
if err != nil {
return fmt.Errorf("Invoke NeoVM contract error:%s", err)
}
fmt.Printf(" TxHash:%s
", txHash)
fmt.Printf("
Tip:
")
fmt.Printf(" Using './ontology info status %s' to query transaction status
", txHash)
return nil
}
// Command-line context of the cli package (not a transaction context).
// Context is a type that is passed through to
// each Handler action in a cli application. Context
// can be used to retrieve context-specific Args and
// parsed command-line options.
type Context struct {
	App *App
	Command Command
	shellComplete bool
	flagSet *flag.FlagSet
	setFlags map[string]bool
	parentContext *Context
}
// PrepareInvokeNeoVMContract pre-executes a NeoVM contract call through the
// node's "sendrawtransaction" RPC method and returns the decoded result.
func PrepareInvokeNeoVMContract(
	contractAddress common.Address,
	params []interface{},
) (*cstates.PreExecResult, error) {
	// Gas price and gas limit are both zero for the pre-execution transaction.
	invokeTx, err := httpcom.NewNeovmInvokeTransaction(0, 0, contractAddress, params)
	if err != nil {
		return nil, err
	}

	// Serialize the transaction and hex-encode it for the RPC payload.
	var raw bytes.Buffer
	if err = invokeTx.Serialize(&raw); err != nil {
		return nil, fmt.Errorf("Serialize error:%s", err)
	}
	encoded := hex.EncodeToString(raw.Bytes())

	// The second RPC parameter (1) marks the request as a pre-execution.
	reply, err := sendRpcRequest("sendrawtransaction", []interface{}{encoded, 1})
	if err != nil {
		return nil, err
	}

	// Decode the pre-execution result carried in the RPC response.
	result := &cstates.PreExecResult{}
	if err = json.Unmarshal(reply, &result); err != nil {
		return nil, fmt.Errorf("json.Unmarshal PreExecResult:%s error:%s", reply, err)
	}
	return result, nil
}
// NewNeovmInvokeTransaction assembles an invoke transaction for a NeoVM
// contract: the contract address and the parameters are packed into the
// invoke code, which is then wrapped in a smart contract transaction.
func NewNeovmInvokeTransaction(gasPrice, gasLimit uint64, contractAddress common.Address, params []interface{}) (*types.Transaction, error) {
	code, buildErr := BuildNeoVMInvokeCode(contractAddress, params)
	if buildErr != nil {
		return nil, buildErr
	}
	invokeTx, txErr := NewSmartContractTransaction(gasPrice, gasLimit, code)
	return invokeTx, txErr
}
// NewSmartContractTransaction builds an unsigned invoke transaction around
// invokeCode. The nonce is the current Unix time in seconds and the signature
// list starts out empty. The returned error is always nil.
func NewSmartContractTransaction(gasPrice, gasLimit uint64, invokeCode []byte) (*types.Transaction, error) {
	nonce := uint32(time.Now().Unix())
	return &types.Transaction{
		GasPrice: gasPrice,
		GasLimit: gasLimit,
		TxType:   types.Invoke,
		Nonce:    nonce,
		// The payload carries the invoke code.
		Payload: &payload.InvokeCode{Code: invokeCode},
		// No signatures yet.
		Sigs: make([]*types.Sig, 0),
	}, nil
}
// Sig holds the signature information of a transaction.
type Sig struct {
	SigData [][]byte // signature data
	PubKeys []keypair.PublicKey // public keys
	M uint16 // NOTE(review): meaning unclear from here; presumably the number of signatures that must verify (currently 0) — confirm
}
// sendRpcRequest posts a JSON-RPC request to the node on localhost and
// returns the Result field of the response. A non-zero Error code in the
// response is turned into an error.
// NOTE(review): the code that builds the request body (data) is elided in
// this excerpt.
func sendRpcRequest(method string, params []interface{}) ([]byte, error) {
	......
	// The request always goes to the local node.
	addr := fmt.Sprintf("http://localhost:%d", config.DefConfig.Rpc.HttpJsonPort)
	// Post the request and read the whole response body.
	resp, err := http.Post(addr, "application/json", strings.NewReader(string(data)))
	if err != nil {
		return nil, fmt.Errorf("http post request:%s error:%s", data, err)
	}
	defer resp.Body.Close()
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read rpc response body error:%s", err)
	}
	rpcRsp := &JsonRpcResponse{}
	err = json.Unmarshal(body, rpcRsp)
	if err != nil {
		return nil, fmt.Errorf("json.Unmarshal JsonRpcResponse:%s error:%s", body, err)
	}
	if rpcRsp.Error != 0 {
		return nil, fmt.Errorf("error code:%d desc:%s", rpcRsp.Error, rpcRsp.Desc)
	}
	return rpcRsp.Result, nil
}
// StartRPCServer registers the JSON-RPC handlers (among them
// "sendrawtransaction", the receiving side of a pre-execution request) and
// serves HTTP on the configured JSON-RPC port.
// According to the walkthrough notes it is started from initRpc during
// startOntology. NOTE(review): other handler registrations are elided here.
func StartRPCServer() error {
	log.Debug()
	http.HandleFunc("/", rpc.Handle)
	......
	rpc.HandleFunc("sendrawtransaction", rpc.SendRawTransaction)
	......
	err := http.ListenAndServe(":"+strconv.Itoa(int(cfg.DefConfig.Rpc.HttpJsonPort)), nil)
	if err != nil {
		return fmt.Errorf("ListenAndServe error:%s", err)
	}
	return nil
}
// SendRawTransaction handles the "sendrawtransaction" RPC method.
//
// A JSON example for the sendrawtransaction method:
// {"jsonrpc": "2.0", "method": "sendrawtransaction", "params": ["raw transaction in hex"], "id": 0}
//
// params[0] is the hex-encoded serialized transaction. For invoke and deploy
// transactions an optional params[1] equal to 1 requests pre-execution: the
// result is returned and the transaction is not sent to the tx pool.
// Otherwise the transaction goes to the tx pool and its hash is returned.
func SendRawTransaction(params []interface{}) map[string]interface{} {
	if len(params) < 1 {
		return responsePack(berr.INVALID_PARAMS, nil)
	}
	str, isString := params[0].(string)
	if !isString {
		return responsePack(berr.INVALID_PARAMS, "")
	}
	raw, err := common.HexToBytes(str)
	if err != nil {
		return responsePack(berr.INVALID_PARAMS, "")
	}
	var txn types.Transaction
	if err := txn.Deserialize(bytes.NewReader(raw)); err != nil {
		return responsePack(berr.INVALID_TRANSACTION, "")
	}
	hash := txn.Hash()
	log.Debugf("SendRawTransaction recv %s", hash.ToHexString())

	if (txn.TxType == types.Invoke || txn.TxType == types.Deploy) && len(params) > 1 {
		// The flag arrives as float64 because encoding/json decodes every
		// JSON number stored in an interface{} value as float64.
		if preExec, ok := params[1].(float64); ok && preExec == 1 {
			result, err := bactor.PreExecuteContract(&txn)
			if err != nil {
				log.Infof("PreExec: %s", err)
				return responsePack(berr.SMARTCODE_ERROR, "")
			}
			return responseSuccess(result)
		}
	}

	log.Debugf("SendRawTransaction send to txpool %s", hash.ToHexString())
	if errCode, desc := bcomn.SendTxToPool(&txn); errCode != ontErrors.ErrNoError {
		log.Warnf("SendRawTransaction verified %s error: %s", hash.ToHexString(), desc)
		return responsePack(berr.INVALID_TRANSACTION, desc)
	}
	// The tx pool accepted the transaction.
	log.Debugf("SendRawTransaction verified %s", hash.ToHexString())
	return responseSuccess(hash.ToHexString())
}
// PreExecuteContract pre-executes tx by delegating to the default ledger
// (ledger.DefLedger).
func PreExecuteContract(tx *types.Transaction) (*cstate.PreExecResult, error) {
	return ledger.DefLedger.PreExecuteContract(tx)
}
// PreExecuteContract pre-executes tx by delegating to the ledger store.
func (self *Ledger) PreExecuteContract(tx *types.Transaction) (*cstate.PreExecResult, error) {
	return self.ldgStore.PreExecuteContract(tx)
}
// PreExecuteContract returns the result of a smart contract execution without
// committing anything to the store.
//
// Invoke transactions are run on a clone cache over a fresh state batch and
// report the gas they consumed (at least neovm.MIN_TRANSACTION_GAS). Deploy
// transactions are not executed; only their gas cost is computed. Any other
// transaction type is an error. On failure the result has state
// CONTRACT_STATE_FAIL and gas neovm.MIN_TRANSACTION_GAS.
func (this *LedgerStoreImp) PreExecuteContract(tx *types.Transaction) (*sstate.PreExecResult, error) {
	// failed builds the result that every failure path returns.
	failed := func(err error) (*sstate.PreExecResult, error) {
		return &sstate.PreExecResult{State: event.CONTRACT_STATE_FAIL, Gas: neovm.MIN_TRANSACTION_GAS, Result: nil}, err
	}

	// Execute against the header of the current (latest stored) block.
	header, err := this.GetHeaderByHeight(this.GetCurrentBlockHeight())
	if err != nil {
		return failed(err)
	}
	cfg := &smartcontract.Config{
		Time:   header.Timestamp,
		Height: header.Height,
		Tx:     tx,
	}
	cache := storage.NewCloneCache(this.stateStore.NewStateBatch())
	preGas, err := this.getPreGas(cfg, cache)
	if err != nil {
		return failed(err)
	}

	switch tx.TxType {
	case types.Invoke:
		invoke := tx.Payload.(*payload.InvokeCode)
		sc := smartcontract.SmartContract{
			Config:     cfg,
			Store:      this,
			CloneCache: cache,
			// Start from the maximum so the consumed gas can be read back
			// as the difference afterwards.
			Gas: math.MaxUint64 - calcGasByCodeLen(len(invoke.Code), preGas[neovm.UINT_INVOKE_CODE_LEN_NAME]),
		}
		// Do not discard this error: without an engine the Invoke call
		// below would run on an unusable value.
		engine, err := sc.NewExecuteEngine(invoke.Code)
		if err != nil {
			return failed(err)
		}
		result, err := engine.Invoke()
		if err != nil {
			return failed(err)
		}
		gasCost := math.MaxUint64 - sc.Gas
		minGas := neovm.MIN_TRANSACTION_GAS
		if gasCost < minGas {
			gasCost = minGas
		}
		return &sstate.PreExecResult{State: event.CONTRACT_STATE_SUCCESS, Gas: gasCost, Result: scommon.ConvertNeoVmTypeHexString(result)}, nil
	case types.Deploy:
		deploy := tx.Payload.(*payload.DeployCode)
		return &sstate.PreExecResult{State: event.CONTRACT_STATE_SUCCESS, Gas: preGas[neovm.CONTRACT_CREATE_NAME] + calcGasByCodeLen(len(deploy.Code), preGas[neovm.UINT_DEPLOY_CODE_LEN_NAME]), Result: nil}, nil
	default:
		return failed(errors.NewErr("transaction type error"))
	}
}
// Local ledger structure.
//LedgerStoreImp is main store struct fo ledger
type LedgerStoreImp struct {
	blockStore *BlockStore //BlockStore for saving block & transaction data (the blocks kept in the local ledger)
	stateStore *StateStore //StateStore for saving state data, like balance, smart contract execution result, and so on.
	eventStore *EventStore //EventStore for saving log those gen after smart contract executed.
	storedIndexCount uint32 //record the count of have saved block index
	currBlockHeight uint32 //Current block height (the latest one recorded)
	currBlockHash common.Uint256 //Current block hash
	headerCache map[common.Uint256]*types.Header //BlockHash => Header
	headerIndex map[uint32]common.Uint256 //Header index, Mapping header height => block hash
	savingBlock bool //is saving block now
	vbftPeerInfoheader map[string]uint32 //pubInfo save pubkey,peerindex
	vbftPeerInfoblock map[string]uint32 //pubInfo save pubkey,peerindex
	lock sync.RWMutex
}
// SendTxToPool hands txn to the tx pool. It is the step that follows
// pre-execution. On rejection the pool's error code and description are
// logged and returned; on success it returns ErrNoError and an empty string.
func SendTxToPool(txn *types.Transaction) (ontErrors.ErrCode, string) {
	code, reason := bactor.AppendTxToPool(txn)
	if code == ontErrors.ErrNoError {
		return ontErrors.ErrNoError, ""
	}
	log.Warn("TxnPool verify error:", code.Error())
	return code, reason
}
//append transaction to pool to txpool actor
func AppendTxToPool(txn *types.Transaction) (ontErrors.ErrCode, string) {
if DisableSyncVerifyTx {
txReq := &tcomn.TxReq{txn, tcomn.HttpSender, nil}
txnPid.Tell(txReq)
return ontErrors.ErrNoError, ""
}
ch := make(chan *tcomn.TxResult, 1)
txReq := &tcomn.TxReq{txn, tcomn.HttpSender, ch}
txnPid.Tell(txReq) // 向txnPid发送了一个req
if msg, ok := <-ch; ok { ? 没看明白
return msg.Err, msg.Desc
}
return ontErrors.ErrUnknown, ""
}
// Receive implements the actor interface. It logs the actor lifecycle
// messages and passes each TxReq on to handleTransaction.
// NOTE(review): the remaining cases and the closing braces are elided in
// this excerpt.
func (ta *TxActor) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		log.Info("txpool-tx actor started and be ready to receive tx msg")
	case *actor.Stopping:
		log.Warn("txpool-tx actor stopping")
	case *actor.Restarting:
		log.Warn("txpool-tx actor restarting")
	case *tc.TxReq:
		sender := msg.Sender
		log.Debugf("txpool-tx actor receives tx from %v ", sender.Sender())
		ta.handleTransaction(sender, context.Self(), msg.Tx, msg.TxResultCh)
......
// handleTransaction handles a transaction from network and http.
//
// It rejects the transaction if it is too large, already in the pool, arrives
// while the pool is full, has an overflowing gasLimit*gasPrice, is below the
// configured gas limit or price, is a deploy with too little gas, or fails
// the pre-execution check (unless that check is disabled). Accepted
// transactions are assigned to a worker. Rejections are reported only to
// HTTP senders that supplied a result channel.
func (ta *TxActor) handleTransaction(sender tc.SenderType, self *actor.PID,
	txn *tx.Transaction, txResultCh chan *tc.TxResult) {
	ta.server.increaseStats(tc.RcvStats)

	// Only an HTTP submitter that attached a result channel gets a reply.
	wantsReply := sender == tc.HttpSender && txResultCh != nil

	// Size limit (tc.MAX_TX_SIZE; the messages below call it 1M).
	if len(txn.ToArray()) > tc.MAX_TX_SIZE {
		log.Debugf("handleTransaction: reject a transaction due to size over 1M")
		if wantsReply {
			replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, "size is over 1M")
		}
		return
	}

	// Duplicate check against the transactions the server already holds.
	if ta.server.getTransaction(txn.Hash()) != nil {
		log.Debugf("handleTransaction: transaction %x already in the txn pool",
			txn.Hash())
		ta.server.increaseStats(tc.DuplicateStats)
		if wantsReply {
			replyTxResult(txResultCh, txn.Hash(), errors.ErrDuplicateInput,
				fmt.Sprintf("transaction %x is already in the tx pool", txn.Hash()))
		}
		return
	}

	// The transaction pool is full.
	if ta.server.getTransactionCount() >= tc.MAX_CAPACITY {
		log.Debugf("handleTransaction: transaction pool is full for tx %x",
			txn.Hash())
		ta.server.increaseStats(tc.FailureStats)
		if wantsReply {
			replyTxResult(txResultCh, txn.Hash(), errors.ErrTxPoolFull,
				"transaction pool is full")
		}
		return
	}

	// Overflow protection for gasLimit * gasPrice.
	if _, overflow := common.SafeMul(txn.GasLimit, txn.GasPrice); overflow {
		log.Debugf("handleTransaction: gasLimit %v, gasPrice %v overflow",
			txn.GasLimit, txn.GasPrice)
		if wantsReply {
			replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
				fmt.Sprintf("gasLimit %d * gasPrice %d overflow",
					txn.GasLimit, txn.GasPrice))
		}
		return
	}

	// Minimum gas limit (from the configuration) and gas price (from the server).
	gasLimitConfig := config.DefConfig.Common.GasLimit
	gasPriceConfig := ta.server.getGasPrice()
	if txn.GasLimit < gasLimitConfig || txn.GasPrice < gasPriceConfig {
		log.Debugf("handleTransaction: invalid gasLimit %v, gasPrice %v",
			txn.GasLimit, txn.GasPrice)
		if wantsReply {
			replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
				fmt.Sprintf("Please input gasLimit >= %d and gasPrice >= %d",
					gasLimitConfig, gasPriceConfig))
		}
		return
	}

	// A deploy transaction needs at least the contract-creation gas.
	if txn.TxType == tx.Deploy && txn.GasLimit < neovm.CONTRACT_CREATE_GAS {
		log.Debugf("handleTransaction: deploy tx invalid gasLimit %v, gasPrice %v",
			txn.GasLimit, txn.GasPrice)
		if wantsReply {
			replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown,
				fmt.Sprintf("Deploy tx gaslimit should >= %d",
					neovm.CONTRACT_CREATE_GAS))
		}
		return
	}

	// The pre-execution check runs unless it has been disabled.
	if !ta.server.disablePreExec {
		if ok, desc := preExecCheck(txn); !ok {
			log.Debugf("handleTransaction: preExecCheck tx %x failed", txn.Hash())
			if wantsReply {
				replyTxResult(txResultCh, txn.Hash(), errors.ErrUnknown, desc)
			}
			return
		}
		log.Debugf("handleTransaction: preExecCheck tx %x passed", txn.Hash())
	}

	// Receive from slots: this blocks until a slot is available.
	<-ta.server.slots
	ta.server.assignTxToWorker(txn, sender, txResultCh)
}
// assignTxToWorker assigns a new transaction to a worker by LB
// (load balancing). It returns false for a nil transaction or one that
// setPendingTx rejects as a duplicate, and true once the transaction has been
// queued on a worker's receive channel.
func (s *TXPoolServer) assignTxToWorker(tx *tx.Transaction,
	sender tc.SenderType, txResultCh chan *tc.TxResult) bool {
	if tx == nil {
		return false
	}

	// A rejected registration means the transaction is already pending.
	if registered := s.setPendingTx(tx, sender, txResultCh); !registered {
		s.increaseStats(tc.DuplicateStats)
		if sender == tc.HttpSender && txResultCh != nil {
			replyTxResult(txResultCh, tx.Hash(), errors.ErrDuplicateInput,
				"duplicated transaction input detected")
		}
		return false
	}

	// A worker's load is its queued transactions plus those it is verifying.
	loads := make(tc.LBSlice, len(s.workers))
	for idx := range s.workers {
		loads[idx] = tc.LB{
			Size:     len(s.workers[idx].rcvTXCh) + len(s.workers[idx].pendingTxList),
			WorkerID: uint8(idx),
		}
	}

	// NOTE(review): presumably LBSlice sorts ascending by Size, so the first
	// entry is the least loaded worker — confirm.
	sort.Sort(loads)
	chosen := loads[0].WorkerID
	s.workers[chosen].rcvTXCh <- tx
	return true
}
// start is the worker's main event loop. It serves the worker's channels
// until a value arrives on stopCh: received transactions are verified,
// transactions on stfTxCh are re-verified statefully, validator responses are
// handled, and the timer triggers the timeout handling.
func (worker *txPoolWorker) start() {
	worker.timer = time.NewTimer(time.Second * tc.EXPIRE_INTERVAL)
	for {
		select {
		case <-worker.stopCh:
			worker.server.wg.Done()
			return

		case incoming, open := <-worker.rcvTXCh:
			if !open {
				continue
			}
			// Verify the received transaction.
			worker.verifyTx(incoming)

		case recheck, open := <-worker.stfTxCh:
			if !open {
				continue
			}
			worker.verifyStateful(recheck)

		case <-worker.timer.C:
			worker.handleTimeoutEvent()
			worker.timer.Stop()
			worker.timer.Reset(time.Second * tc.EXPIRE_INTERVAL)

		case response, open := <-worker.rspCh:
			if !open {
				continue
			}
			// Handle the response from a validator; once all checks have
			// passed the transaction is put into the tx pool.
			worker.handleRsp(response)
		}
	}
}
// verifyTx prepares a check request and sends it to the validators.
// A transaction that is already in the tx pool is removed from the server's
// pending set as a duplicate; one that this worker is already verifying is
// skipped.
func (worker *txPoolWorker) verifyTx(tx *tx.Transaction) {
	// Already in the tx pool.
	if pooled := worker.server.getTransaction(tx.Hash()); pooled != nil {
		log.Infof("verifyTx: transaction %x already in the txn pool",
			pooled.Hash())
		worker.server.removePendingTx(pooled.Hash(), errors.ErrDuplicateInput)
		return
	}

	// Already being verified by this worker.
	if _, verifying := worker.pendingTxList[tx.Hash()]; verifying {
		log.Infof("verifyTx: transaction %x already in the verifying process",
			tx.Hash())
		return
	}

	// Build the check request and send it to the validators.
	checkReq := &types.CheckTx{WorkerId: worker.workId, Tx: *tx}
	worker.sendReq2Validator(checkReq)

	// Track the transaction in the pending list while it is being verified.
	entry := &pendingTx{tx: tx, req: checkReq, flag: 0, retries: 0}
	worker.mu.Lock()
	worker.pendingTxList[tx.Hash()] = entry
	worker.mu.Unlock()

	// Record when verification of this transaction started.
	entry.valTime = time.Now()
}
// InvokeNeoVMContract invokes a NeoVM smart contract for real, i.e. without
// pre-execution. It assembles the invoke transaction in the same way as the
// pre-execution path, then signs and submits it via InvokeSmartContract.
func InvokeNeoVMContract(
	gasPrice,
	gasLimit uint64,
	signer *account.Account,
	smartcodeAddress common.Address,
	params []interface{}) (string, error) {
	invokeTx, buildErr := httpcom.NewNeovmInvokeTransaction(gasPrice, gasLimit, smartcodeAddress, params)
	if buildErr != nil {
		return "", buildErr
	}
	txHash, sendErr := InvokeSmartContract(signer, invokeTx)
	return txHash, sendErr
}
// InvokeSmartContract is the low level method to invoke a contract: it signs
// tx with signer and submits it with SendRawTransaction, returning the
// transaction hash.
func InvokeSmartContract(signer *account.Account, tx *types.Transaction) (string, error) {
	// Sign the transaction.
	if signErr := SignTransaction(signer, tx); signErr != nil {
		return "", fmt.Errorf("SignTransaction error:%s", signErr)
	}

	// Submit it; this is the same call the pre-execution path ends in.
	hash, sendErr := SendRawTransaction(tx)
	if sendErr != nil {
		return "", fmt.Errorf("SendTransaction error:%s", sendErr)
	}
	return hash, nil
}
// assignTxToWorker assigns a new transaction to a worker by LB
// (load balancing) and reports whether the transaction was queued.
func (s *TXPoolServer) assignTxToWorker(tx *tx.Transaction,
	sender tc.SenderType, txResultCh chan *tc.TxResult) bool {
	if tx == nil {
		return false
	}

	// setPendingTx fails when the transaction already exists in the server's
	// pending set (allPendingTxs, the txs the server is processing).
	if accepted := s.setPendingTx(tx, sender, txResultCh); !accepted {
		s.increaseStats(tc.DuplicateStats)
		if sender == tc.HttpSender && txResultCh != nil {
			replyTxResult(txResultCh, tx.Hash(), errors.ErrDuplicateInput,
				"duplicated transaction input detected")
		}
		return false
	}

	// Collect each worker's load: queued plus currently verifying.
	balance := make(tc.LBSlice, len(s.workers))
	for n := range s.workers {
		queued := len(s.workers[n].rcvTXCh)
		verifying := len(s.workers[n].pendingTxList)
		balance[n] = tc.LB{Size: queued + verifying, WorkerID: uint8(n)}
	}

	// NOTE(review): the first entry after sorting is taken to be the worker
	// with the shortest queue; the ordering is defined by LBSlice — confirm.
	sort.Sort(balance)
	s.workers[balance[0].WorkerID].rcvTXCh <- tx
	return true
}
// txPoolWorker is a worker of the tx pool.
type txPoolWorker struct {
	mu sync.RWMutex
	workId uint8 // Worker ID
	rcvTXCh chan *tx.Transaction // The channel of receive transaction
	stfTxCh chan *tx.Transaction // The channel of txs to be re-verified stateful
	rspCh chan *types.CheckResponse // The channel of verified response (results coming back from the validators)
	server *TXPoolServer // The txn pool server pointer (the server this worker belongs to)
	timer *time.Timer // The timer of reverifying
	stopCh chan bool // stop routine
	pendingTxList map[common.Uint256]*pendingTx // The transaction on the verifying process (queue of txs awaiting verification)
}
// start is the worker's main event loop; shown here for the branch that
// passes a received transaction to verifyTx.
// NOTE(review): the remaining select cases and closing braces are elided in
// this excerpt.
func (worker *txPoolWorker) start() {
	worker.timer = time.NewTimer(time.Second * tc.EXPIRE_INTERVAL)
	for {
		select {
		case <-worker.stopCh:
			worker.server.wg.Done()
			return
		case rcvTx, ok := <-worker.rcvTXCh:
			if ok {
				// Verify rcvTxn
				worker.verifyTx(rcvTx)
			}
......
}
// verifyTx prepares a check request and sends it to the validators, then
// records the transaction in the worker's pending list.
func (worker *txPoolWorker) verifyTx(tx *tx.Transaction) {
	if existing := worker.server.getTransaction(tx.Hash()); existing != nil {
		log.Infof("verifyTx: transaction %x already in the txn pool",
			existing.Hash())
		worker.server.removePendingTx(existing.Hash(), errors.ErrDuplicateInput)
		return
	}

	if _, inProgress := worker.pendingTxList[tx.Hash()]; inProgress {
		log.Infof("verifyTx: transaction %x already in the verifying process",
			tx.Hash())
		return
	}

	// Assemble the request message and send it to the validators.
	request := &types.CheckTx{
		WorkerId: worker.workId,
		Tx:       *tx,
	}
	worker.sendReq2Validator(request)

	// Record the transaction in the worker's pending list.
	pending := &pendingTx{
		tx:      tx,
		req:     request,
		flag:    0,
		retries: 0,
	}
	worker.mu.Lock()
	worker.pendingTxList[tx.Hash()] = pending
	worker.mu.Unlock()

	// Timestamp the start of this transaction's verification.
	pending.valTime = time.Now()
}
// sendReq2Validator sends a check request to the validators, naming the
// verify-response actor as the address for their replies. It returns false
// when that actor or the validators are unavailable.
func (worker *txPoolWorker) sendReq2Validator(req *types.CheckTx) bool {
	replyTo := worker.server.GetPID(tc.VerifyRspActor)
	if replyTo == nil {
		log.Info("sendReq2Validator: VerifyRspActor not exist")
		return false
	}

	// Per the walkthrough notes there are two validators by default, a
	// stateless and a stateful one, each checking a different aspect.
	validators := worker.server.getNextValidatorPIDs()
	if validators == nil {
		return false
	}

	for _, validator := range validators {
		validator.Request(req, replyTo)
	}
	return true
}
// Receive of the stateless validator: for every CheckTx request it runs
// validation.VerifyTransaction and tells the sender the result. The response
// always carries Height 0.
// NOTE(review): the remaining cases are elided in this excerpt.
func (self *validator) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		log.Info("stateless-validator: started and be ready to receive txn")
	case *actor.Stopping:
		log.Info("stateless-validator: stopping")
	case *actor.Restarting:
		log.Info("stateless-validator: restarting")
	case *actor.Stopped:
		log.Info("stateless-validator: stopped")
	case *vatypes.CheckTx:
		log.Debugf("stateless-validator receive tx %x", msg.Tx.Hash())
		sender := context.Sender()
		errCode := validation.VerifyTransaction(&msg.Tx)
		response := &vatypes.CheckResponse{
			WorkerId: msg.WorkerId,
			ErrCode: errCode,
			Hash: msg.Tx.Hash(),
			Type: self.VerifyType(),
			Height: 0,
		}
		sender.Tell(response)
........
}
// NOTE(review): the walkthrough labels this excerpt as the stateful
// validator, but its body and log messages are those of the stateless
// validator (it calls validation.VerifyTransaction and reports Height 0).
// The remaining cases are elided in this excerpt.
func (self *validator) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		log.Info("stateless-validator: started and be ready to receive txn")
	case *actor.Stopping:
		log.Info("stateless-validator: stopping")
	case *actor.Restarting:
		log.Info("stateless-validator: restarting")
	case *actor.Stopped:
		log.Info("stateless-validator: stopped")
	case *vatypes.CheckTx:
		log.Debugf("stateless-validator receive tx %x", msg.Tx.Hash())
		sender := context.Sender()
		errCode := validation.VerifyTransaction(&msg.Tx)
		response := &vatypes.CheckResponse{
			WorkerId: msg.WorkerId,
			ErrCode: errCode,
			Hash: msg.Tx.Hash(),
			Type: self.VerifyType(),
			Height: 0,
		}
		// Send the verification result back to the sender.
		sender.Tell(response)
......
}
// VerifyTransaction verifies a single received transaction. This is the
// stateless check: it looks only at the transaction itself (signatures, then
// payload) and never consults ledger data.
func VerifyTransaction(tx *types.Transaction) ontErrors.ErrCode {
	// Signatures. Per the walkthrough notes this includes multi-signatures.
	sigErr := checkTransactionSignatures(tx)
	if sigErr != nil {
		log.Info("transaction verify error:", sigErr)
		return ontErrors.ErrVerifySignature
	}

	// Payload. Per the walkthrough notes this check is not fully implemented.
	payloadErr := checkTransactionPayload(tx)
	if payloadErr != nil {
		log.Warn("[VerifyTransaction],", payloadErr)
		return ontErrors.ErrTransactionPayload
	}

	return ontErrors.ErrNoError
}
// Receive of the stateful validator: for every CheckTx request it asks the
// ledger whether the transaction is already recorded and reports
// ErrDuplicatedTx if so, ErrUnknown if the query failed. The response carries
// the ledger's current block height.
// NOTE(review): the remaining cases are elided in this excerpt.
func (self *validator) Receive(context actor.Context) {
	switch msg := context.Message().(type) {
	case *actor.Started:
		log.Info("stateful-validator: started and be ready to receive txn")
	case *actor.Stopping:
		log.Info("stateful-validator: stopping")
	case *actor.Restarting:
		log.Info("stateful-validator: restarting")
	case *vatypes.CheckTx:
		log.Debugf("stateful-validator: receive tx %x", msg.Tx.Hash())
		sender := context.Sender()
		height := ledger.DefLedger.GetCurrentBlockHeight() // current block height
		errCode := errors.ErrNoError
		hash := msg.Tx.Hash()
		exist, err := ledger.DefLedger.IsContainTransaction(hash) // query the ledger
		if err != nil {
			log.Warn("query db error:", err)
			errCode = errors.ErrUnknown
		} else if exist {
			errCode = errors.ErrDuplicatedTx
		}
		response := &vatypes.CheckResponse{
			WorkerId: msg.WorkerId,
			Type: self.VerifyType(),
			Hash: msg.Tx.Hash(),
			Height: height,
			ErrCode: errCode,
		}
		sender.Tell(response)
......
}
// Receive implements the actor interface for the verify-response actor. It
// registers and unregisters validators and forwards each validator check
// response to the tx pool server, which passes it on to a worker.
func (vpa *VerifyRspActor) Receive(context actor.Context) {
	incoming := context.Message()
	switch m := incoming.(type) {
	case *actor.Started:
		log.Info("txpool-verify actor: started and be ready to receive validator's msg")

	case *actor.Stopping:
		log.Warn("txpool-verify actor: stopping")

	case *actor.Restarting:
		log.Warn("txpool-verify actor: Restarting")

	case *types.RegisterValidator:
		log.Debugf("txpool-verify actor:: validator %v connected", m.Sender)
		vpa.server.registerValidator(m)

	case *types.UnRegisterValidator:
		log.Debugf("txpool-verify actor:: validator %d:%v disconnected", m.Type, m.Id)
		vpa.server.unRegisterValidator(m.Type, m.Id)

	case *types.CheckResponse:
		// A verification result arrived from a validator.
		log.Debug("txpool-verify actor:: Receives verify rsp message")
		vpa.server.assignRspToWorker(m)

	default:
		log.Debugf("txpool-verify actor:Unknown msg %v type %v", m, reflect.TypeOf(m))
	}
}
// assignRspToWorker assigns a check response from the validator to
// the correct worker, i.e. the one named by rsp.WorkerId, and updates the
// verification statistics. It returns false only for a nil response.
func (s *TXPoolServer) assignRspToWorker(rsp *types.CheckResponse) bool {
	if rsp == nil {
		return false
	}

	// Compare in int: WorkerId is unsigned, so a ">= 0" test is always true,
	// and narrowing len(s.workers) to uint8 would wrap at 256 workers.
	if int(rsp.WorkerId) < len(s.workers) {
		s.workers[rsp.WorkerId].rspCh <- rsp
	}

	if rsp.ErrCode == errors.ErrNoError {
		s.increaseStats(tc.SuccessStats)
		return true
	}

	// Count the failure, split by which kind of validator reported it.
	s.increaseStats(tc.FailureStats)
	if rsp.Type == types.Stateless {
		s.increaseStats(tc.SigErrStats)
	} else {
		s.increaseStats(tc.StateErrStats)
	}
	return true
}
// start is the worker's main event loop; the branch of interest here is the
// one that handles validator responses arriving on rspCh.
func (worker *txPoolWorker) start() {
	worker.timer = time.NewTimer(time.Second * tc.EXPIRE_INTERVAL)
	for {
		select {
		case <-worker.stopCh:
			worker.server.wg.Done()
			return

		case newTx, alive := <-worker.rcvTXCh:
			if !alive {
				continue
			}
			// Verify the received transaction.
			worker.verifyTx(newTx)

		case statefulTx, alive := <-worker.stfTxCh:
			if !alive {
				continue
			}
			worker.verifyStateful(statefulTx)

		case <-worker.timer.C:
			worker.handleTimeoutEvent()
			worker.timer.Stop()
			worker.timer.Reset(time.Second * tc.EXPIRE_INTERVAL)

		case checkRsp, alive := <-worker.rspCh:
			if !alive {
				continue
			}
			// Handle the response from a validator; when every check has
			// passed the transaction is pushed into the local tx pool.
			worker.handleRsp(checkRsp)
		}
	}
}
// handleRsp handles one check response from a validator: if
// the tx is valid, add it to the tx pool, or remove it from the pending
// list. Responses for other workers or for unknown transactions are ignored.
// The tx goes to the pool once the accumulated flags equal tc.VERIFY_MASK.
func (worker *txPoolWorker) handleRsp(rsp *types.CheckResponse) {
	if rsp.WorkerId != worker.workId { // 1. the response is addressed to another worker
		return
	}
	worker.mu.Lock()
	defer worker.mu.Unlock()
	pt, ok := worker.pendingTxList[rsp.Hash] // 2. the tx is not in this worker's pending list
	if !ok {
		return
	}
	if rsp.ErrCode != errors.ErrNoError { // 3. the validator rejected the tx
		//Verify fail
		log.Infof("handleRsp: validator %d transaction %x invalid: %s",
			rsp.Type, rsp.Hash, rsp.ErrCode.Error())
		delete(worker.pendingTxList, rsp.Hash) // drop it from the worker's pending list
		worker.server.removePendingTx(rsp.Hash, rsp.ErrCode) // and from the server's pending set, passing on the error code
		return
	}
	if tc.STATEFUL_MASK&(0x1<<rsp.Type) != 0 && rsp.Height < worker.server.getHeight() { // 4. stateful result computed below the server's height
		// If validator's height is less than the required one, re-validate it.
		worker.sendReq2StatefulV(pt.req)
		pt.valTime = time.Now()
		return
	}
	// Record the result once per validator type; flag holds one bit per type.
	if pt.flag&(0x1<<rsp.Type) == 0 {
		retAttr := &tc.TXAttr{
			Height: rsp.Height,
			Type: rsp.Type,
			ErrCode: rsp.ErrCode,
		}
		pt.flag |= (0x1 << rsp.Type)
		pt.ret = append(pt.ret, retAttr)
	}
	// NOTE(review): presumably VERIFY_MASK has one bit set per required validator type — confirm.
	if pt.flag&0xf == tc.VERIFY_MASK {
		worker.putTxPool(pt) // verification passed: put the tx into the tx pool
		delete(worker.pendingTxList, rsp.Hash) // and drop it from the pending list
	}
}
// removePendingTx removes a transaction from the server's pending set once
// it has been handled. A valid transaction submitted over http (or received
// from the network while net broadcasting is enabled) is passed to the net
// actor. An http submitter with a result channel is sent the outcome.
// Finally it checks whether the transaction belongs to a pending block from
// consensus.
func (s *TXPoolServer) removePendingTx(hash common.Uint256,
	err errors.ErrCode) {
	s.mu.Lock()
	entry, found := s.allPendingTxs[hash]
	if !found {
		s.mu.Unlock()
		return
	}

	// Relay only transactions that passed verification.
	relayable := entry.sender == tc.HttpSender ||
		(entry.sender == tc.NetSender && s.enableBroadcastNetTx)
	if err == errors.ErrNoError && relayable {
		if netPid := s.GetPID(tc.NetActor); netPid != nil {
			netPid.Tell(entry.tx)
		}
	}

	// Report the outcome to an http submitter that is waiting for it.
	if entry.sender == tc.HttpSender && entry.ch != nil {
		replyTxResult(entry.ch, hash, err, err.Error())
	}

	delete(s.allPendingTxs, hash)

	// Hand a slot back without blocking while below the pending limit.
	if len(s.allPendingTxs) < tc.MAX_LIMITATION {
		select {
		case s.slots <- struct{}{}:
		default:
			log.Debug("removePendingTx: slots is full")
		}
	}
	s.mu.Unlock()

	// Check if the tx is in the pending block and
	// the pending block is verified
	s.checkPendingBlockOk(hash, err)
}
// putTxPool adds a valid transaction to the tx pool and removes it from
// the server's pending set. The pool entry carries the transaction together
// with the verification results collected for it. It always returns true.
func (worker *txPoolWorker) putTxPool(pt *pendingTx) bool {
	srv := worker.server
	srv.addTxList(&tc.TXEntry{Tx: pt.tx, Attrs: pt.ret})
	srv.removePendingTx(pt.tx.Hash(), errors.ErrNoError)
	return true
}
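// Summary:
// 1. A request that carries the pre-execution flag is pre-executed first: the contract code runs, but the state database is not modified.
// 2. The tx pool does the basic data and parameter checks first (not the signature), records the tx in the worker's pending list and sends it to the validators. The worker's pendingTxList holds the txs that are being verified; the server's allPendingTxs holds the txs the server is still processing.
// 3. The stateless validator checks the signatures and the payload.
// 4. The stateful validator checks the ledger database for an already recorded (duplicate) transaction.
// 5. When verification passes, the tx is added to the tx pool and, if it came in over HTTP (or from the network with broadcasting enabled), relayed to the net actor. When it fails, the tx is removed from the pending lists and the error is reported to an HTTP submitter; it is not broadcast.
// Nodes have one of two roles: bookkeeper nodes take part in network consensus, while sync nodes only synchronize the blocks produced by the bookkeeper nodes.
// Bookkeepers: the bookkeepers' public keys; four must be configured.
// SeedList: the seed nodes of the ontology network. Seed nodes are the entry point to the network: a node joining the network first asks a seed node for network information. At least one seed node must be configured.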
// initConsensus creates and starts the consensus service selected by the
// genesis configuration and publishes its PID to the network request actor
// and to hserver. It returns (nil, nil) when consensus is disabled in the
// configuration.
func initConsensus(ctx *cli.Context, p2pPid *actor.PID, txpoolSvr *proc.TXPoolServer, acc *account.Account) (consensus.ConsensusService, error) {
	if enabled := config.DefConfig.Consensus.EnableConsensus; !enabled {
		return nil, nil
	}

	txPoolPid := txpoolSvr.GetPID(tc.TxPoolActor)
	kind := strings.ToLower(config.DefConfig.Genesis.ConsensusType)

	// Build the consensus instance that matches the configured type.
	service, err := consensus.NewConsensusService(kind, acc, txPoolPid, nil, p2pPid)
	if err != nil {
		return nil, fmt.Errorf("NewConsensusService:%s error:%s", kind, err)
	}

	// Start the consensus module, then publish its PID.
	service.Start()
	netreqactor.SetConsensusPid(service.GetPID())
	hserver.SetConsensusPid(service.GetPID())

	log.Infof("Consensus init success")
	return service, nil
}
// start launches DBFT consensus: it marks the service as started, applies
// the configured block generation time when it exceeds the minimum,
// subscribes to the block-saved topic and initializes consensus for view 0.
func (ds *DbftService) start() {
	log.Debug()
	ds.started = true

	configured := config.DefConfig.Genesis.DBFT.GenBlockTime
	if configured <= config.MIN_GEN_BLOCK_TIME {
		// Too short: genesis.GenBlockTime is left unchanged.
		log.Warn("The Generate block time should be longer than 2 seconds, so set it to be default 6 seconds.")
	} else {
		genesis.GenBlockTime = time.Duration(configured) * time.Second
	}

	ds.sub.Subscribe(message.TOPIC_SAVE_BLOCK_COMPLETE)
	ds.InitializeConsensus(0)
}
// InitializeConsensus prepares the consensus context for view viewNum and
// arms the service timer. View 0 resets the context; any other view changes
// the view unless a block has already been generated. A node that is not a
// bookkeeper does nothing further. The primary's timer fires once the block
// generation time has passed since the last received block; a backup's timer
// is the block generation time shifted left by viewNum+1. It always returns
// nil.
func (ds *DbftService) InitializeConsensus(viewNum byte) error {
	log.Debug("[InitializeConsensus] Start InitializeConsensus.")
	log.Debug("[InitializeConsensus] viewNum: ", viewNum)

	switch {
	case viewNum == 0:
		ds.context.Reset(ds.Account)
	case ds.context.State.HasFlag(BlockGenerated):
		return nil
	default:
		ds.context.ChangeView(viewNum)
	}

	if ds.context.BookkeeperIndex < 0 {
		log.Info("You aren't bookkeeper")
		return nil
	}

	if ds.context.BookkeeperIndex != int(ds.context.PrimaryIndex) {
		// backup peer
		ds.context.State = Backup
		ds.timerHeight = ds.context.Height
		ds.timeView = viewNum
		ds.timer.Stop()
		ds.timer.Reset(genesis.GenBlockTime << (viewNum + 1))
		return nil
	}

	// primary peer
	ds.context.State |= Primary
	ds.timerHeight = ds.context.Height
	ds.timeView = viewNum

	// Wait out the remainder of the block interval, or fire at once when it
	// has already elapsed.
	elapsed := time.Since(ds.blockReceivedTime)
	var wait time.Duration
	if elapsed <= genesis.GenBlockTime {
		wait = genesis.GenBlockTime - elapsed
	}
	//TODO: double check the is the stop necessary
	ds.timer.Stop()
	ds.timer.Reset(wait)
	return nil
}