zoukankan      html  css  js  c++  java
  • 菜鸟系列Fabric源码学习 — Endorser背书节点

    Fabric 1.4 源码分析 Endorser背书节点

    本文档主要介绍fabric背书节点的主要功能及其实现。

    1. 简介

    Endorser节点是peer节点所扮演的一种角色,在peer启动时会创建Endorser背书服务器,并注册到本地gRPC服务器(7051端口)上对外提供服务,对请求的签名提案消息执行启动链码容器、模拟执行链码、背书签名等流程。所有客户端提交到账本的调用交易都需要背书节点背书,当客户端收集到足够的背书信息之后,再将签名提案消息、模拟执行的结果以及背书信息打包成交易信息发给orderer节点排序出块。

    背书者Endorser在一个交易流中充当的作用如下:

    • 客户端发送一个背书申请(SignedProposal)到Endorser。
    • Endorser对申请进行背书,发送一个申请应答(ProposalResponse)到客户端。
    • 客户端将申请应答中的背书组装到一个交易请求(SignedTransaction)中。

    2. 背书服务器初始化

    当peer节点启动时,会注册背书服务器。

    serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)
    auth := authHandler.ChainFilters(serverEndorser, authFilters...)
    // Register the Endorser server
    pb.RegisterEndorserServer(peerServer.Server(), auth)
    

    其中,背书服务最重要的接口为

    // EndorserServer is the server API for Endorser service.
    type EndorserServer interface {
    	ProcessProposal(context.Context, *SignedProposal) (*ProposalResponse, error)
    }
    

    ProcessProposal()服务接口主要功能为接收和处理签名提案消息(SignedProposal)、启动链码容器、执行调用链码以及进行签名背书。

    3. 背书服务

    在ProcessProposal()服务中,主要存在以下流程:

    1. 调用preProcess()方法检查和校验签名提案的合法性
    2. 调用SimulateProposal()方法调用链码容器并模拟执行提案
    3. 调用endorseProposal()方法对模拟执行结果签名背书,并返回提案响应消息

    源代码如下所示:

    func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
    	...
    	// 0 -- check and validate
    	vr, err := e.preProcess(signedProp)
    	if err != nil {
    		resp := vr.resp
    		return resp, err
    	}
    
    	prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid
    	txParams := &ccprovider.TransactionParams{
    		ChannelID:            chainID,
    		TxID:                 txid,
    		SignedProp:           signedProp,
    		Proposal:             prop,
    		TXSimulator:          txsim,
    		HistoryQueryExecutor: historyQueryExecutor,
    	}
    	// 1 -- simulate
    	cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)
    	if err != nil {
    		return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil
    	}
    	...
    	// 2 -- endorse and get a marshalled ProposalResponse message
    	var pResp *pb.ProposalResponse
    	if chainID == "" {
    		pResp = &pb.ProposalResponse{Response: res}
    	} else {
    		pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd)
            ...
    	}
    	pResp.Response = res
    	return pResp, nil
    }
    

    3.1 检查和校验签名提案的合法性

    preProcess()方法对签名提案消息进行预处理,主要包括验证消息格式和签名的合法性、验证提案消息对应链码检查是否是系统链码并且不为外部调用、交易的唯一性、验证是否满足对应通道的访问控制策略。

    func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, error) {
    	vr := &validateResult{}
    	// 1. 验证消息格式和签名合法性
    	prop, hdr, hdrExt, err := validation.ValidateProposalMessage(signedProp)
    	chdr, err := putils.UnmarshalChannelHeader(hdr.ChannelHeader)
    	shdr, err := putils.GetSignatureHeader(hdr.SignatureHeader)
    	// block invocations to security-sensitive system chaincodes
    	// 2. 验证链码
    	if e.s.IsSysCCAndNotInvokableExternal(hdrExt.ChaincodeId.Name) {
    		vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
    		return vr, err
    	}
    
    	chainID := chdr.ChannelId
    	txid := chdr.TxId
    
    	if chainID != "" {
    	    // 3. 验证交易唯一性
    		if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
    			err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
    			vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
    			return vr, err
    		}
    		if !e.s.IsSysCC(hdrExt.ChaincodeId.Name) {
    			// check that the proposal complies with the Channel's writers
    			// 4. 验证acl
    			if err = e.s.CheckACL(signedProp, chdr, shdr, hdrExt); err != nil {
    				e.Metrics.ProposalACLCheckFailed.With(meterLabels...).Add(1)
    				return vr, err
    			}
    		}
    	} else {
    
    	}
    	vr.prop, vr.hdrExt, vr.chainID, vr.txid = prop, hdrExt, chainID, txid
    	return vr, nil
    }
    

    3.1.1 验证消息格式和签名合法性

    preProcess()调用ValidateProposalMessage()对消息进行验证。主要针对消息的格式、签名、交易id进行验证。

    首先调用validateCommonHeader()校验Proposal.Header的合法性。

    // checks for a valid Header
    func validateCommonHeader(hdr *common.Header) (*common.ChannelHeader, *common.SignatureHeader, error) {
    	if hdr == nil {
    		return nil, nil, errors.New("nil header")
    	}
    	chdr, err := utils.UnmarshalChannelHeader(hdr.ChannelHeader)
    	shdr, err := utils.GetSignatureHeader(hdr.SignatureHeader)
    	// 校验消息类型是否属于HeaderType_ENDORSER_TRANSACTION、HeaderType_CONFIG_UPDATE、HeaderType_CONFIG、HeaderType_TOKEN_TRANSACTION,并且校验Epoch是否为0
    	err = validateChannelHeader(chdr)
    	// 校验shdr shdr.Nonce  shdr.Creator是否为nil,或长度是否为0
    	err = validateSignatureHeader(shdr)
    	return chdr, shdr, nil
    }
    

    接着调用checkSignatureFromCreator()对签名进行校验。其中,首先校验传入参数是否为nil,接着creator.Validate()对创建者creator进行验证。

    err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)
    

    然后对交易id进行验证,验证交易id是否与计算的交易id一致

    err = utils.CheckTxID(
    		chdr.TxId,
    		shdr.Nonce,
    		shdr.Creator)
    
    // 计算交易id
    func ComputeTxID(nonce, creator []byte) (string, error) {
    	digest, err := factory.GetDefault().Hash(
    		append(nonce, creator...),
    		&bccsp.SHA256Opts{})
    	if err != nil {
    		return "", err
    	}
    	return hex.EncodeToString(digest), nil
    }
    

    最后根据消息类型进行分类处理:

    switch common.HeaderType(chdr.Type) {
    case common.HeaderType_CONFIG:
    	fallthrough
    case common.HeaderType_ENDORSER_TRANSACTION:
    	chaincodeHdrExt, err := validateChaincodeProposalMessage(prop, hdr)
    	if err != nil {
    		return nil, nil, nil, err
    	}
    	return prop, hdr, chaincodeHdrExt, err
    default:
    	return nil, nil, nil, errors.Errorf("unsupported proposal type %d", common.HeaderType(chdr.Type))
    }
    

    其中,validateChaincodeProposalMessage()方法验证输入参数不为nil,调用GetChaincodeHeaderExtension()方法获取chaincodeHdrExt,并校验chaincodeHdrExt.ChaincodeId是否为nil以及chaincodeHdrExt.PayloadVisibility是否不为nil(当前为nil)

    3.1.2 检查是否是系统链码并且不为外部调用

    preProcess()调用IsSysCCAndNotInvokableExternal()方法验证提案消息头部hdrExt.ChaincodeId.Name链码名对应的链码是否为允许外部调用的系统链码。遍历所有系统链码(lscc,vscc,escc,qscc,cscc),其中vscc、escc不为外部调用

    func (p *Provider) IsSysCCAndNotInvokableExternal(name string) bool {
    	for _, sysCC := range p.SysCCs {
    		if sysCC.Name() == name {
    			return !sysCC.InvokableExternal()
    		}
    	}
    
    	if isDeprecatedSysCC(name) {
    		return true
    	}
    
    	return false
    }
    
    func isDeprecatedSysCC(name string) bool {
    	return name == "vscc" || name == "escc"
    }
    

    3.1.3 检查签名提案消息交易id的唯一性

    首先查看是否存在该账本,然后查看账本是否存在该交易id。

    func (s *SupportImpl) GetTransactionByID(chid, txID string) (*pb.ProcessedTransaction, error) {
    	lgr := s.Peer.GetLedger(chid)
    	if lgr == nil {
    		return nil, errors.Errorf("failed to look up the ledger for Channel %s", chid)
    	}
    	tx, err := lgr.GetTransactionByID(txID)
    	if err != nil {
    		return nil, errors.WithMessage(err, "GetTransactionByID failed")
    	}
    	return tx, nil
    }
    

    3.1.4 验证是否满足对应通道的访问控制策略

    背书节点在背书过程中会检查是否满足应用通道的 Writers 策略。CheckACL()方法最后会调用core/endorser/support.go CheckACL()

    func (s *SupportImpl) CheckACL(signedProp *pb.SignedProposal, chdr *common.ChannelHeader, shdr *common.SignatureHeader, hdrext *pb.ChaincodeHeaderExtension) error {
    	return s.ACLProvider.CheckACL(resources.Peer_Propose, chdr.ChannelId, signedProp)
    }
    

    其中:

    Peer_Propose = "peer/Propose"
    d.cResourcePolicyMap[resources.Peer_Propose] = CHANNELWRITERS
    

    会根据签名提案消息类型调用
    core/aclmgmt/defaultaclprovider.go CheckACL()方法

    case *pb.SignedProposal:
    		return d.policyChecker.CheckPolicy(channelID, policy, typedData)
    

    最终会调用

    func (p *policyChecker) CheckPolicyBySignedData(channelID, policyName string, sd []*common.SignedData) error {
    	if channelID == "" {
    		return errors.New("Invalid channel ID name during check policy on signed data. Name must be different from nil.")
    	}
    
    	if policyName == "" {
    		return fmt.Errorf("Invalid policy name during check policy on signed data on channel [%s]. Name must be different from nil.", channelID)
    	}
    
    	if sd == nil {
    		return fmt.Errorf("Invalid signed data during check policy on channel [%s] with policy [%s]", channelID, policyName)
    	}
    
    	// Get Policy
    	policyManager, _ := p.channelPolicyManagerGetter.Manager(channelID)
    	if policyManager == nil {
    		return fmt.Errorf("Failed to get policy manager for channel [%s]", channelID)
    	}
    
    	// Recall that get policy always returns a policy object
    	policy, _ := policyManager.GetPolicy(policyName)
    
    	// Evaluate the policy
    	err := policy.Evaluate(sd)
    	if err != nil {
    		return fmt.Errorf("Failed evaluating policy on signed data during check policy on channel [%s] with policy [%s]: [%s]", channelID, policyName, err)
    	}
    	return nil
    }
    

    其中:

    sd := []*common.SignedData{{
    	Data:      signedProp.ProposalBytes,
    	Identity:  shdr.Creator,
    	Signature: signedProp.Signature,
    }}
    

    3.2 调用链码并模拟执行提案

    首先,ProcessProposal()方法调用方法acquireTxSimulator()根据链码判断是否需要创建交易模拟器TxSimulator,如果需要则创建交易模拟器TxSimulator(无法查询历史记录)以及历史记录查询器HistoryQueryExecutor,接着再调用SimulateProposal()模拟执行交易提案消息,并返回模拟执行结果。
    其中,链码qscc、cscc不需要交易模拟器。

    unc acquireTxSimulator(chainID string, ccid *pb.ChaincodeID) bool {
    	if chainID == "" {
    		return false
    	}
    
    	// ¯\_(ツ)_/¯ locking.
    	// Don't get a simulator for the query and config system chaincode.
    	// These don't need the simulator and its read lock results in deadlocks.
    	switch ccid.Name {
    	case "qscc", "cscc":
    		return false
    	default:
    		return true
    	}
    }
    

    在SimulateProposal()方法中,首先判断调用链码是否为系统链码:

    • 是 获取链码版本GetSysCCVersion()
    • 否 检查实例化策略以及获取版本CheckInstantiationPolicy()

    然后调用callChaincode()调用链码。接着从交易模拟器中获取模拟执行结果。其中,如果私密数据模拟执行结果不为nil,则分发私密数据。最后获取模拟执行结果的公有数据。

    if simResult, err = txParams.TXSimulator.GetTxSimulationResults(); err != nil {
    	txParams.TXSimulator.Done()
    	return nil, nil, nil, nil, err
    }
    
    if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {
    	return nil, nil, nil, nil, err
    }
    			
    if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
        return nil, nil, nil, nil, err
    }		
    

    3.2.1 检查实例化策略

    CheckInstantiationPolicy()会调用GetChaincodeData()尝试从缓存或者本地文件系统获取已安装的链码包CCPackage,再解析成ChaincodeData对象ccdata。再与账本中保存的对应链码的实例化策略进行比较。

    func CheckInstantiationPolicy(name, version string, cdLedger *ChaincodeData) error {
    	ccdata, err := GetChaincodeData(name, version)
    	if err != nil {
    		return err
    	}
    	if ccdata.InstantiationPolicy != nil {
    		if !bytes.Equal(ccdata.InstantiationPolicy, cdLedger.InstantiationPolicy) {
    			return fmt.Errorf("Instantiation policy mismatch for cc %s/%s", name, version)
    		}
    	}
    
    	return nil
    }
    

    实例化策略在安装链码时指定

    3.2.2 调用链码

    在SimulateProposal()方法中,会调用callChaincode()方法调用链码。首先执行Execute()方法调用链码,然后在针对“deploy”和“upgrade”操作进行处理。

    3.2.2.1 Execute()操作

    SimulateProposal()方法调用Execute()执行链码,最终会调用core/chaincode/chaincode_support.go Execute()方法。

    func (cs *ChaincodeSupport) Execute(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
    	resp, err := cs.Invoke(txParams, cccid, input)
    	return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
    }
    
    • Invoke()方法 core/chaincode/chaincode_support.go Invoke()
      启动链码容器,调用链码
    func (cs *ChaincodeSupport) Invoke(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.ChaincodeMessage, error) {
    	h, err := cs.Launch(txParams.ChannelID, cccid.Name, cccid.Version, txParams.TXSimulator)
    	if err != nil {
    		return nil, err
    	}
    	cctype := pb.ChaincodeMessage_TRANSACTION
    	return cs.execute(cctype, txParams, cccid, input, h)
    }
    

    消息类型为:ChaincodeMessage_TRANSACTION,其中还调用了execute()方法。

    • processChaincodeExecutionResult()方法 core/chaincode/chaincode_support.go processChaincodeExecutionResult()方法
      对链码执行结果进行处理
    func processChaincodeExecutionResult(txid, ccName string, resp *pb.ChaincodeMessage, err error) (*pb.Response, *pb.ChaincodeEvent, error) {
        ...
    	if resp.ChaincodeEvent != nil {
    		resp.ChaincodeEvent.ChaincodeId = ccName
    		resp.ChaincodeEvent.TxId = txid
    	}
    
    	switch resp.Type {
    	case pb.ChaincodeMessage_COMPLETED:
    		res := &pb.Response{}
    		err := proto.Unmarshal(resp.Payload, res)
    		if err != nil {
    			return nil, nil, errors.Wrapf(err, "failed to unmarshal response for transaction %s", txid)
    		}
    		return res, resp.ChaincodeEvent, nil
    
    	case pb.ChaincodeMessage_ERROR:
    		return nil, resp.ChaincodeEvent, errors.Errorf("transaction returned with failure: %s", resp.Payload)
    
    	default:
    		return nil, nil, errors.Errorf("unexpected response type %d for transaction %s", resp.Type, txid)
    	}
    }
    
    3.2.2.2 "deploy"/"upgrade" 操作

    主要实现方法为ExecuteLegacyInit(),该流程和Execute()操作类似

    func (cs *ChaincodeSupport) ExecuteLegacyInit(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeDeploymentSpec) (*pb.Response, *pb.ChaincodeEvent, error) {
    	ccci := ccprovider.DeploymentSpecToChaincodeContainerInfo(spec)
    	ccci.Version = cccid.Version
    
    	err := cs.LaunchInit(ccci)
    	if err != nil {
    		return nil, nil, err
    	}
    
    	cname := ccci.Name + ":" + ccci.Version
    	h := cs.HandlerRegistry.Handler(cname)
    	if h == nil {
    		return nil, nil, errors.Wrapf(err, "[channel %s] claimed to start chaincode container for %s but could not find handler", txParams.ChannelID, cname)
    	}
    
    	resp, err := cs.execute(pb.ChaincodeMessage_INIT, txParams, cccid, spec.GetChaincodeSpec().Input, h)
    	return processChaincodeExecutionResult(txParams.TxID, cccid.Name, resp, err)
    }
    

    其中,消息类型为ChaincodeMessage_INIT

    3.2.3 处理模拟执行结果

    针对模拟执行的结果进行处理。对链码模拟执行以后,将模拟执行结果写入交易模拟器TXSimulator中。通过调用GetTxSimulationResults()方法可以获取模拟执行结果。TxSimulationResults包含公有数据读写集PubSimulationResults以及私有数据读写集PvtSimulationResults。

    3.2.3.1 获取模拟执行结果

    SimulateProposal()方法会调用GetTxSimulationResults()方法获取模拟执行结果。
    源码如下所示。

    func (b *RWSetBuilder) GetTxSimulationResults() (*ledger.TxSimulationResults, error) {
        // 获取交易模拟执行结果的交易私密数据读写集
    	pvtData := b.getTxPvtReadWriteSet()
    	var err error
    
    	var pubDataProto *rwset.TxReadWriteSet
    	var pvtDataProto *rwset.TxPvtReadWriteSet
    
    	// Populate the collection-level hashes into pub rwset and compute the proto bytes for pvt rwset
    	// 计算私密数据hash
    	if pvtData != nil {
    		if pvtDataProto, err = pvtData.toProtoMsg(); err != nil {
    			return nil, err
    		}
    		// 遍历计算私密数据hash值
    		for _, ns := range pvtDataProto.NsPvtRwset {
    			for _, coll := range ns.CollectionPvtRwset {
    			    // 计算并设置私密数据hash 
    				b.setPvtCollectionHash(ns.Namespace, coll.CollectionName, coll.Rwset)
    			}
    		}
    	}
    	// Compute the proto bytes for pub rwset
    	// 获取交易模拟执行结果的公有数据读写集
    	pubSet := b.GetTxReadWriteSet()
    	if pubSet != nil {
    		if pubDataProto, err = b.GetTxReadWriteSet().toProtoMsg(); err != nil {
    			return nil, err
    		}
    	}
    	// 构造交易模拟执行结果
    	return &ledger.TxSimulationResults{
    		PubSimulationResults: pubDataProto,
    		PvtSimulationResults: pvtDataProto,
    	}, nil
    }
    
    3.2.3.2 数据处理
    • 私密数据处理
      SimulateProposal()方法会检查模拟执行结果里面的PvtSimulationResults是否为nil,如果不为nil,则会通过AssemblePvtRWSet()方法将TxPvtReadWriteSet,扩充到TxPvtReadWriteSetWithConfigInfo,并添加与私有读写集相关的可用集合配置信息。再获取当前账本高度。调用gossip模块的distributePrivateData()方法(本质上是gossip/service/gossip_service.go DistributePrivateData方法)将私密数据分发到通道内符合策略的其他peer节点上。并暂时保存私密数据到本地瞬时数据库(transient store)中(交易验证和提交账本时会进行处理)。
    func (g *gossipServiceImpl) DistributePrivateData(chainID string, txID string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
    	g.lock.RLock()
    	handler, exists := g.privateHandlers[chainID]
    	g.lock.RUnlock()
    	if !exists {
    		return errors.Errorf("No private data handler for %s", chainID)
    	}
    
    	if err := handler.distributor.Distribute(txID, privData, blkHt); err != nil {
    		logger.Error("Failed to distributed private collection, txID", txID, "channel", chainID, "due to", err)
    		return err
    	}
    
    	if err := handler.coordinator.StorePvtData(txID, privData, blkHt); err != nil {
    		logger.Error("Failed to store private data into transient store, txID",
    			txID, "channel", chainID, "due to", err)
    		return err
    	}
    	return nil
    }
    
    • 公有数据处理
      序列号公有数据读写集并返回结果。
    if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
    			return nil, nil, nil, nil, err
    }
    
    func (txSim *TxSimulationResults) GetPubSimulationBytes() ([]byte, error) {
    	return proto.Marshal(txSim.PubSimulationResults)
    }
    

    3.3 签名背书

    在ProcessProposal()方法中,首先会判断通道id是否为nil,如果为nil,则直接返回响应结果(例如install操作)。如果不为nil,会调用endorseProposal()方法对模拟执行结果进行签名和背书。在endorseProposal()方法中,会构造Context对象,再调用EndorseWithPlugin()里面会调用getOrCreatePlugin()创建plugin,然后调用proposalResponsePayloadFromContext()方法,在该方法中会计算背书结果hash以及封装模拟执行结果、链码event事件以及链码响应结果等(数据结构为ProposalResponsePayload),在序列化成[]byte数组,最后调用Endorse()方法执行签名背书操作(由于escc现在是插件形式执行,里面会进行判断。默认执行escc)。

    func (e *DefaultEndorsement) Endorse(prpBytes []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error) {
    	signer, err := e.SigningIdentityForRequest(sp)
    	if err != nil {
    		return nil, nil, errors.Wrap(err, "failed fetching signing identity")
    	}
    	// serialize the signing identity
    	identityBytes, err := signer.Serialize()
    	if err != nil {
    		return nil, nil, errors.Wrapf(err, "could not serialize the signing identity")
    	}
    
    	// sign the concatenation of the proposal response and the serialized endorser identity with this endorser's key
    	signature, err := signer.Sign(append(prpBytes, identityBytes...))
    	if err != nil {
    		return nil, nil, errors.Wrapf(err, "could not sign the proposal response payload")
    	}
    	endorsement := &peer.Endorsement{Signature: signature, Endorser: identityBytes}
    	return endorsement, prpBytes, nil
    }
    
    如果你觉得写的不错,请移步www.itkezhan.top或者关注公众号IT程序员客栈
  • 相关阅读:
    动态横向(水平)合并Repeater数据行DataItem的列
    动态绑数据(Repeater控件HeaderTemplate和ItemTemplate)
    动态横向(水平)合并GridView数据行DataRow的列
    动态绑数据(GridView控件Header和ItemTemplate)
    用具体列名替代星号
    如何实现数据行转换列显示
    用LINQ获取XML节点数据
    从字符串中获取XML节点数据
    字符串创建XML文档
    根据Attribute值条件对XML文档进行修改
  • 原文地址:https://www.cnblogs.com/i-dandan/p/12163377.html
Copyright © 2011-2022 走看看