zoukankan      html  css  js  c++  java
  • fabric源码学习笔记

    Hyperledger Fabric Orderer部分源码分析

    笔记

    Orderer的数据结构

    前言

    Orderer负责接收交易,把交易打包成区块,然后区块在所有Orderer节点之间达成一致,再分发给Peer的功能,这涉及了:

    • 网络:gRPC接受交易Brodcast,向peer发送区块
    • 切块:把交易按一定规则打包成区块
    • 共识:所有Orderer节点达成一致

    简单来讲Orderer和Peer有点像发布订阅,生产消费的关系

    Registrar

    |   |-- multichannel
    |   |   |-- blockwriter.go
    |   |   |-- blockwriter_test.go
    |   |   |-- chainsupport.go
    |   |   |-- chainsupport_test.go
    |   |   |-- registrar.go
    |   |   |-- registrar_test.go
    |   |   `-- util_test.go
    
    

    Registrar serves as a point of access and control for the individual channel resources.

    它负责了每个channel资源的访问和控制点,要对某个通道(链)怎么样,得从这入手。(通道和链其实都是区块链)

    type Registrar struct {
    	lock               sync.RWMutex
    	// 保存了每一条链,每一条链在Orderer中都以ChainSupport代表,也就是说根据一个链的chainID来查询进而进行后续操作
    	chains             map[string]*ChainSupport
    	config             localconfig.TopLevel
    	// 保存了所有的共识插件,每个共识插件都是一个Consenter,Fabric 1.4中共识插件有Solo、Kafka、EtcdRaft
    	consenters         map[string]consensus.Consenter
    	// 用来读取和创建链的账本
    	ledgerFactory      blockledger.Factory
    	// 用来对Orderer中的数据进行签名,以及创建SignatureHeader
    	signer             crypto.LocalSigner
    	blockcutterMetrics *blockcutter.Metrics
    	// 系统链ID
    	systemChannelID    string
    	// 系统链实例
    	systemChannel      *ChainSupport
    	templator          msgprocessor.ChannelConfigTemplator
    	callbacks          []channelconfig.BundleActor
    }
    

    ChainSupport

    ChainSupport汇集了一条通道所需要的所有资源,所以说一个ChainSupport代表了一条链。

    // ChainSupport holds the resources for a particular channel.
    type ChainSupport struct {
    	// 账本读写
    	*ledgerResources
    	// 处理交易信息,根据不同类型有不同的处理
    	msgprocessor.Processor
    	// 把区块写入到账本,看源码这里是只允许一个thread操作,因此也加入了协程的操作
    	*BlockWriter
    	// Orderer的共识实例,比如每条链都有自己的Raft共识实例,它们互不干扰
    	consensus.Chain
    	// 交易切块
    	cutter blockcutter.Receiver
    	crypto.LocalSigner
    }
    

    Chain


    Chain是接口,它的实现并不一条链,而是一条链的共识实例,可以是Solo、Kafka和EtcdRaft,它运行在单独的协程,使用Channel和ChainSupport通信,它调用其它接口完成切块,以及让所有的Orderer节点对交易达成一致。

    type Chain interface {
    	// 普通消息/交易排序
    	Order(env *cb.Envelope, configSeq uint64) error
    
    	// 配置消息/交易排序
    	Configure(config *cb.Envelope, configSeq uint64) error
    
    	// 等待排序集群可用
    	WaitReady() error
    
    	// 当排序集群发送错误时,会关闭返回的通道
    	Errored() <-chan struct{}
    
    	// 启动当前链
    	Start()
    
    	// 停止当前链,并释放资源
    	Halt()
    }
    

    Consenter

    type Consenter interface {
    	HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
    }
    

    Consenter也是接口,它只有1个功能用来创建Chain。每种共识插件,都有自己单独的consenter实现,分别用来创建solo实例、kafka实例或etcdraft实例。

    ConsenterSupport

    type ConsenterSupport interface {
    	crypto.LocalSigner
    	msgprocessor.Processor
    
    	// VerifyBlockSignature verifies a signature of a block with a given optional
    	// configuration (can be nil).
    	VerifyBlockSignature([]*cb.SignedData, *cb.ConfigEnvelope) error
    
    	// BlockCutter returns the block cutting helper for this channel.
    	// 把消息切成区块
    	BlockCutter() blockcutter.Receiver
    
    	// SharedConfig provides the shared config from the channel's current config block.
    	// 当前链的Orderer配置
    	SharedConfig() channelconfig.Orderer
    
    	// ChannelConfig provides the channel config from the channel's current config block.
    	// 当前链的通道配置
    	ChannelConfig() channelconfig.Channel
    
    	// CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger
    	// Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time.
    	// 生成区块
    	CreateNextBlock(messages []*cb.Envelope) *cb.Block
    
    	// Block returns a block with the given number,
    	// or nil if such a block doesn't exist.
    	// 读区块
    	Block(number uint64) *cb.Block
    
    	// WriteBlock commits a block to the ledger.
    	// 写区块
    	WriteBlock(block *cb.Block, encodedMetadataValue []byte)
    
    	// WriteConfigBlock commits a block to the ledger, and applies the config update inside.
    	// 写区块并更新配置
    	WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)
    
    	// Sequence returns the current config squence.
    	Sequence() uint64
    
    	// ChainID returns the channel ID this support is associated with.
    	ChainID() string
    
    	// Height returns the number of blocks in the chain this channel is associated with.
    	Height() uint64
    
    	// Append appends a new block to the ledger in its raw form,
    	// unlike WriteBlock that also mutates its metadata.
    	// 以原始数据的格式追加区块,不像WriteBlock那样修改元数据
    	Append(block *cb.Block) error
    }
    

    小结

    • Registrar 包容万象,主要是ChainSupport和Consenter,Consenter是可插拔的
    • ChainSupport 代表了一条链,能够指向属于本条链的共识实例,该共识实例由对应共识类型的Consenter创建
    • 共识实例使用ConsenterSupport访问共识外部资源

    Order主要模块

    orderer
    ├── README.md
    ├── common
    │   ├── blockcutter 缓存待打包的交易,切块
    │   ├── bootstrap 启动时替换通道创世块
    │   ├── broadcast orderer的Broadcast接口
    │   ├── cluster (Raft)集群服务
    │   ├── localconfig 解析orderer配置文件orderer.yaml
    │   ├── metadata 区块元数据填写
    │   ├── msgprocessor 交易检查
    │   ├── multichannel 多通道支持:Registrar、chainSupport、写区块
    │   └── server Orderer节点的服务端程序
    ├── consensus 共识插件
    │   ├── consensus.go 共识插件需要实现的接口等定义
    │   ├── etcdraft raft共识插件
    │   ├── inactive 未激活时的raft
    │   ├── kafka kafka共识插件
    │   ├── mocks 测试用的共识插件
    │   └── solo solo共识插件
    ├── main.go orderer程序入口
    ├── mocks
    │   ├── common
    │   └── util
    └── sample_clients orderer的客户端程序样例
        ├── broadcast_config
        ├── broadcast_msg
        └── deliver_stdout
    
    

    Orderer启动过程

    步骤

    • 加载配置文件
    • 设置Logger
    • 设置本地MSP
    • 核心启动部分:
      • 加载创世块
      • 创建账本工厂
      • 创建本机gRPCServer
      • 如果共识需要集群(raft),创建集群gRPCServer
      • 创建Registrar:设置好共识插件,启动各通道,如果共识是raft,还会设置集群的gRPC接口处理函数Step
      • 创建本机server:它是原子广播的处理服务,融合了Broadcast处理函数、deliver处理函数和registrar
      • 开启profile
      • 启动集群gRPC服务
      • 启动本机gRPC服务

    启动入口main

    github.comhyperledgerfabricorderercommonservermain.go

    // Main is the entry point of orderer process
    func Main() {
    	fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
    
    	// "version" command
    	if fullCmd == version.FullCommand() {
    		fmt.Println(metadata.GetVersionInfo())
    		return
    	}
    
    	// 从本地配置文件和环境变量中读取配置信息,构建配置树结构
    	conf, err := localconfig.Load()
    	if err != nil {
    		logger.Error("failed to parse config: ", err)
    		os.Exit(1)
    	}
    	// 配置日志级别
    	initializeLogging()
    	// 配置 MSP 结构
    	initializeLocalMsp(conf)
    
    	prettyPrintStruct(conf)
    	// 完成启动后的核心工作
    	Start(fullCmd, conf)
    }
    

    Start

    github.comhyperledgerfabricorderercommonservermain.go

    // Start provides a layer of abstraction for benchmark test
    func Start(cmd string, conf *localconfig.TopLevel) {
    	// 加载/创建创世块
    	bootstrapBlock := extractBootstrapBlock(conf)
    	if err := ValidateBootstrapBlock(bootstrapBlock); err != nil {
    		logger.Panicf("Failed validating bootstrap block: %v", err)
    	}
    
    	opsSystem := newOperationsSystem(conf.Operations, conf.Metrics)
    	err := opsSystem.Start()
    	if err != nil {
    		logger.Panicf("failed to initialize operations subsystem: %s", err)
    	}
    	defer opsSystem.Stop()
    	metricsProvider := opsSystem.Provider
    
    	// 创建操作的账本工厂
    	lf, _ := createLedgerFactory(conf, metricsProvider)
    	sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
    	clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)
    
    	// 初始化签名结构
    	signer := localmsp.NewSigner()
    
    	logObserver := floggingmetrics.NewObserver(metricsProvider)
    	flogging.Global.SetObserver(logObserver)
    
    	serverConfig := initializeServerConfig(conf, metricsProvider)
    	// 创建本机gRPC服务连接
    	grpcServer := initializeGrpcServer(conf, serverConfig)
    	caSupport := &comm.CredentialSupport{
    		AppRootCAsByChain:           make(map[string]comm.CertificateBundle),
    		OrdererRootCAsByChainAndOrg: make(comm.OrgRootCAs),
    		ClientRootCAs:               serverConfig.SecOpts.ClientRootCAs,
    	}
    
    	var r *replicationInitiator
    	clusterServerConfig := serverConfig
    	clusterGRPCServer := grpcServer // by default, cluster shares the same grpc server
    	clusterClientConfig := comm.ClientConfig{SecOpts: &comm.SecureOptions{}, KaOpts: &comm.KeepaliveOptions{}}
    	var clusterDialer *cluster.PredicateDialer
    
    	var reuseGrpcListener bool
    	typ := consensusType(bootstrapBlock)
    	var serversToUpdate []*comm.GRPCServer
    
    	clusterType := isClusterType(clusterBootBlock)
    	// 如果共识需要集群(raft),创建集群gRPCServer
    	if clusterType {
    		logger.Infof("Setting up cluster for orderer type %s", typ)
    
    		clusterClientConfig = initializeClusterClientConfig(conf)
    		clusterDialer = &cluster.PredicateDialer{
    			ClientConfig: clusterClientConfig,
    		}
    
    		r = createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer)
    		// Only clusters that are equipped with a recent config block can replicate.
    		if conf.General.GenesisMethod == "file" {
    			r.replicateIfNeeded(bootstrapBlock)
    		}
    
    		if reuseGrpcListener = reuseListener(conf, typ); !reuseGrpcListener {
    			clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, ioutil.ReadFile)
    		}
    
    		// If we have a separate gRPC server for the cluster,
    		// we need to update its TLS CA certificate pool.
    		serversToUpdate = append(serversToUpdate, clusterGRPCServer)
    	}
    
    	// if cluster is reusing client-facing server, then it is already
    	// appended to serversToUpdate at this point.
    	if grpcServer.MutualTLSRequired() && !reuseGrpcListener {
    		serversToUpdate = append(serversToUpdate, grpcServer)
    	}
    
    	tlsCallback := func(bundle *channelconfig.Bundle) {
    		logger.Debug("Executing callback to update root CAs")
    		updateTrustedRoots(caSupport, bundle, serversToUpdate...)
    		if clusterType {
    			updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
    		}
    	}
    
    	sigHdr, err := signer.NewSignatureHeader()
    	if err != nil {
    		logger.Panicf("Failed creating a signature header: %v", err)
    	}
    
    	expirationLogger := flogging.MustGetLogger("certmonitor")
    	crypto.TrackExpiration(
    		serverConfig.SecOpts.UseTLS,
    		serverConfig.SecOpts.Certificate,
    		[][]byte{clusterClientConfig.SecOpts.Certificate},
    		sigHdr.Creator,
    		expirationLogger.Warnf, // This can be used to piggyback a metric event in the future
    		time.Now(),
    		time.AfterFunc)
    
    	// 初始化Registrar
    	manager := initializeMultichannelRegistrar(clusterBootBlock, r, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
    	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
    	expiration := conf.General.Authentication.NoExpirationChecks
    	// 初始化gRPC服务端结构
    	// 分别响应 Deliver() 和 Broadcast() 两个 gRPC 调用。
    	server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS, expiration)
    
    	logger.Infof("Starting %s", metadata.GetVersionInfo())
    	go handleSignals(addPlatformSignals(map[os.Signal]func(){
    		syscall.SIGTERM: func() {
    			grpcServer.Stop()
    			if clusterGRPCServer != grpcServer {
    				clusterGRPCServer.Stop()
    			}
    		},
    	}))
    
    	if !reuseGrpcListener && clusterType {
    		logger.Info("Starting cluster listener on", clusterGRPCServer.Address())
    		go clusterGRPCServer.Start()
    	}
    	// 开启profile服务
    	initializeProfilingService(conf)
    	// 绑定gRPC服务并启动
    	ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
    	logger.Info("Beginning to serve requests")
    	grpcServer.Start()
    }
    
    

    extractSysChanLastConfig

    github.comhyperledgerfabricorderercommonservermain.go

    // Extract system channel last config block
    func extractSysChanLastConfig(lf blockledger.Factory, bootstrapBlock *cb.Block) *cb.Block {
    	// Are we bootstrapping?
    	chainCount := len(lf.ChainIDs())
    	// 如果是首次启动,默认先创建系统通道的本地账本结构
    	if chainCount == 0 {
    		logger.Info("Bootstrapping because no existing channels")
    		return nil
    	}
        ...
    }
    

    initializeMultichannelRegistrar

    github.comhyperledgerfabricorderercommonservermain.go

    func initializeMultichannelRegistrar(
    	bootstrapBlock *cb.Block,
    	ri *replicationInitiator,
    	clusterDialer *cluster.PredicateDialer,
    	srvConf comm.ServerConfig,
    	srv *comm.GRPCServer,
    	conf *localconfig.TopLevel,
    	signer crypto.LocalSigner,
    	metricsProvider metrics.Provider,
    	healthChecker healthChecker,
    	lf blockledger.Factory,
    	callbacks ...channelconfig.BundleActor,
    ) *multichannel.Registrar {
    	genesisBlock := extractBootstrapBlock(conf)
    	// Are we bootstrapping?
    	if len(lf.ChainIDs()) == 0 {
    		initializeBootstrapChannel(genesisBlock, lf)
    	} else {
    		logger.Info("Not bootstrapping because of existing channels")
    	}
    
    	consenters := make(map[string]consensus.Consenter)
    
    	// 创建Registrar
    	registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, callbacks...)
    
    	var icr etcdraft.InactiveChainRegistry
    	if isClusterType(bootstrapBlock) {
    		etcdConsenter := initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
    		icr = etcdConsenter.InactiveChainRegistry
    	}
    
    	// 初始化共识插件
    	consenters["solo"] = solo.New()
    	var kafkaMetrics *kafka.Metrics
    	consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker, icr, registrar.CreateChain)
    	// Note, we pass a 'nil' channel here, we could pass a channel that
    	// closes if we wished to cleanup this routine on exit.
    	go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
    
    	registrar.Initialize(consenters)
    	return registrar
    }
    

    Initialize

    github.comhyperledgerfabricorderercommonmultichannel egistrar.go

    // 启动共识
    func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
    	r.consenters = consenters
    	existingChains := r.ledgerFactory.ChainIDs()
    
    	// 启动本地所有的账本结构的共识过程
    	for _, chainID := range existingChains {
    		rl, err := r.ledgerFactory.GetOrCreate(chainID)
    		if err != nil {
    			logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
    		}
    		configTx := configTx(rl)
    		if configTx == nil {
    			logger.Panic("Programming error, configTx should never be nil here")
    		}
    		ledgerResources := r.newLedgerResources(configTx)
    		chainID := ledgerResources.ConfigtxValidator().ChainID()
    
    		// 如果是系统账本(默认在首次启动时会自动创建)
    		// 启动共识过程
    		if _, ok := ledgerResources.ConsortiumsConfig(); ok {
    			if r.systemChannelID != "" {
    				logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
    			}
    
    			chain := newChainSupport(
    				r,
    				ledgerResources,
    				r.consenters,
    				r.signer,
    				r.blockcutterMetrics,
    			)
    			r.templator = msgprocessor.NewDefaultTemplator(chain)
    			chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain, r.config))
    
    			// Retrieve genesis block to log its hash. See FAB-5450 for the purpose
    			iter, pos := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}})
    			defer iter.Close()
    			if pos != uint64(0) {
    				logger.Panicf("Error iterating over system channel: '%s', expected position 0, got %d", chainID, pos)
    			}
    			genesisBlock, status := iter.Next()
    			if status != cb.Status_SUCCESS {
    				logger.Panicf("Error reading genesis block of system channel '%s'", chainID)
    			}
    			logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s",
    				chainID, genesisBlock.Header.Hash(), chain.SharedConfig().ConsensusType())
    
    			r.chains[chainID] = chain
    			r.systemChannelID = chainID
    			r.systemChannel = chain
    			// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
    			// 启动共识过程
    			defer chain.start()
    		} else { // 应用账本
    			logger.Debugf("Starting chain: %s", chainID)
    			chain := newChainSupport(
    				r,
    				ledgerResources,
    				r.consenters,
    				r.signer,
    				r.blockcutterMetrics,
    			)
    			r.chains[chainID] = chain
    			chain.start()
    		}
    
    	}
    
    	if r.systemChannelID == "" {
    		logger.Panicf("No system chain found.  If bootstrapping, does your system channel contain a consortiums group definition?")
    	}
    }
    

    chain.start()

    1.0是如此,以协程的方式

    func (chain *chainImpl) Start() {
        go startThread(chain)
    }
    

    1.4看起来是根据不同的公式插件不同处理

    // TODO待研究是如何进行插拔的

    // Start instructs the orderer to begin serving the chain and keep it current.
    func (c *Chain) Start() {
    	c.logger.Infof("Starting Raft node")
    
    	if err := c.configureComm(); err != nil {
    		c.logger.Errorf("Failed to start chain, aborting: +%v", err)
    		close(c.doneC)
    		return
    	}
    
    	isJoin := c.support.Height() > 1
    	if isJoin && c.opts.MigrationInit {
    		isJoin = false
    		c.logger.Infof("Consensus-type migration detected, starting new raft node on an existing channel; height=%d", c.support.Height())
    	}
    	c.Node.start(c.fresh, isJoin)
    
    	close(c.startC)
    	close(c.errorC)
    
    	go c.gc()
    	go c.serveRequest()
    
    	es := c.newEvictionSuspector()
    
    	interval := DefaultLeaderlessCheckInterval
    	if c.opts.LeaderCheckInterval != 0 {
    		interval = c.opts.LeaderCheckInterval
    	}
    
    	c.periodicChecker = &PeriodicCheck{
    		Logger:        c.logger,
    		Report:        es.confirmSuspicion,
    		CheckInterval: interval,
    		Condition:     c.suspectEviction,
    	}
    	c.periodicChecker.Run()
    }
    

    小结

    主要分为两个部分:初始化配置,核心启动

    Orderer整体架构

    客户端通过Broadcast接口向Orderer提交背书过的交易,客户端(cli,peer)通过Deliver接口订阅区块事件,从Orderer获取区块

    架构图

    多通道

    Fabric 支持多通道特性,而Orderer是多通道的核心组成部分。多通道由Registrar、ChainSupport、BlockWriter等一些重要部件组成

    • Registrar是所有通道资源的汇总,访问每一条通道,都要经由Registrar
    • ChainSupport代表了每一条通道,它融合了一条通道所有的资源
    • BlockWriter 是区块达成共识后,Orderer写入区块到账本需要使用的接口

    共识插件

    Fabric的共识是插件化的,抽象出了Orderer所使用的共识接口,任何一种共识插件,只要满足给定的接口,就可以配合Fabric Orderer使用。

    gRPC通信

    • Broadcast:用来接受客户端提交的待排序的交易
    • Deliver:客户端用来接受Orderer节点发送的共识后的区块

    Local Config

    用来解析orderer节点的配置文件: orderer.yaml

    // Load parses the orderer YAML file and environment, producing
    // a struct suitable for config use, returning error on failure.
    func Load() (*TopLevel, error) {
    	config := viper.New()
    	coreconfig.InitViper(config, "orderer")
    	...
    }
    

    本地配置,不需要节点之间的统一配置,相关配置有:

    • 网络相关配置
    • 账本类型、位置
    • raft文件位置
    • ...

    通道配置/上链配置:

    • 共识类型
    • 区块大小
    • 切块时间
    • 区块交易数
    • 各种共识的配置
    • ...

    Metadata

    //TODO 暂未搞懂它干嘛的,生成区块后还可以继续修改,不知道存在意义是什么。

    MsgProcessor

    orderer收到交易后需要对交易进行多项检查,不同的通道可以设置不同的MsgProcessor,也就可以进行不同的检查

    当前Processor分为两个:

    • 应用通道的叫StandardChannel
    • 系统通道的叫SystemChannel
    func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
    	oc, ok := s.support.OrdererConfig()
    	if !ok {
    		logger.Panicf("Missing orderer config")
    	}
    	if oc.Capabilities().ConsensusTypeMigration() {
    		if oc.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
    			return 0, errors.WithMessage(
    				ErrMaintenanceMode, "normal transactions are rejected")
    		}
    	}
    
    	configSeq = s.support.Sequence()
    	err = s.filters.Apply(env)
    	return
    }
    

    这里主要是进行各种验证相关的操作

    	err = s.filters.Apply(env)
    
    func (rs *RuleSet) Apply(message *ab.Envelope) error {
    	for _, rule := range rs.rules {
    		err := rule.Apply(message)
    		if err != nil {
    			return err
    		}
    	}
    	return nil
    }
    

    BlockCutter

    BlockCutter用来把收到的交易分成多个组,每组交易会打包到一个区块中。而分组的过程,就是切块,每组交易被称为一个Batch,它有一个缓冲区用来存放待切块交易。

    type receiver struct {
        // 缓冲区收到配置交易,配置交易要放到单独区块,如果缓冲区有交易,缓冲区已有交易会先切到1个区块
    	sharedConfigFetcher   OrdererConfigFetcher
    	// 缓冲区内交易数,达到区块包含的交易上限(默认500)
    	pendingBatch          []*cb.Envelope
    	// 缓冲区内交易总大小,达到区块大小上限(默认10MB)
    	pendingBatchSizeBytes uint32
        // 缓冲区存在交易,并且未出块的时间,达到切块超时时间(默认2s)
    	PendingBatchStartTime time.Time
    	ChannelID             string
    	Metrics               *Metrics
    }
    

    这块更像是一个单纯的打包机器,不涉及排序,哪个交易先写入BlockCutter的缓冲区,哪个交易就在前面。

    BlockWriter

    写区块
    这一部分主要是区块的生成,写,落盘

    小结

    Orderer的功能,组成

    Orderer处理交易

    接受消息的数据结构

    // Envelope wraps a Payload with a signature so that the message may be authenticated
    type Envelope struct {
    	// A marshaled Payload
    	// 一个信息的body
    	Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
    	// A signature by the creator specified in the Payload header
    	// 一个发送者的签名
    	Signature            []byte   `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
    	XXX_NoUnkeyedLiteral struct{} `json:"-"`
    	XXX_unrecognized     []byte   `json:"-"`
    	XXX_sizecache        int32    `json:"-"`
    }
    

    普通交易信息在Orderer中的流程

    • Orderer 的 Broadcast 接口收到来自客户端提交的交易,会获取交易所在的链的资源,并进行首次检查,然后提交给该链的共识,对交易进行排序,最后向客户端发送响应。
    // broadcast.go
    
    // Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
    func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
    	// 获取发送者的addr
    	addr := util.ExtractRemoteAddress(srv.Context())
    	logger.Debugf("Starting new broadcast loop for %s", addr)
    	for {
    		// 循环等待请求,其实是通过Orderer 的 Broadcast 接口收到来自客户端提交的交易
    		msg, err := srv.Recv()
    		if err == io.EOF {
    			logger.Debugf("Received EOF from %s, hangup", addr)
    			return nil
    		}
    		if err != nil {
    			logger.Warningf("Error reading from %s: %s", addr, err)
    			return err
    		}
    
    		// 验证处理信息
    		resp := bh.ProcessMessage(msg, addr)
    		err = srv.Send(resp)
    		if resp.Status != cb.Status_SUCCESS {
    			return err
    		}
    
    		if err != nil {
    			logger.Warningf("Error sending to %s: %s", addr, err)
    			return err
    		}
    	}
    
    }
    

    配置交易在Orderer中的流程

    基本同上

    节点接受和广播区块

    核心过程

    Kafka作为共识插件情况

    • 客户端通过gRPC发送交易信息给orderer节点的Broadcast()接口
    • Orderer接口收到请求后,提取消息,解析检查消息,通过验证后封装发给Kafka
    • Orderer同时不断从Kafka拉去排序好的信息,然后会打包成区块

    Broadcast

    流程图


    Broadcast,意味着客户端将请求消息(例如完成背书后的交易)通过 gRPC 接口发送给 Ordering 服务。Orderer 进行本地验证处理后,会转到共识模块处理。

    Orderer接收的主要有那些消息:链码的实例化,调用;通道的创建和更新。

    代码流程

    解析消息
    • 来自客户端的消息,首先交给server处理
    // Broadcast receives a stream of messages from a client for ordering
    func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
    	logger.Debugf("Starting new Broadcast handler")
    	defer func() {
    		if r := recover(); r != nil {
    			logger.Criticalf("Broadcast client triggered panic: %s
    %s", r, debug.Stack())
    		}
    		logger.Debugf("Closing Broadcast stream")
    	}()
    	return s.bh.Handle(&broadcastMsgTracer{
    		AtomicBroadcast_BroadcastServer: srv,
    		msgTracer: msgTracer{
    			debug:    s.debug,
    			function: "Broadcast",
    		},
    	})
    }
    

    对应的grpc服务是broadcast里的recv

    type AtomicBroadcast_BroadcastServer interface {
    	Send(*BroadcastResponse) error
    	Recv() (*common.Envelope, error)
    	grpc.ServerStream
    }
    

    其实client到orderer的信息是通过Envelope包装

    type Envelope struct {
    	// A marshaled Payload
    	Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
    	// A signature by the creator specified in the Payload header
    	Signature            []byte   `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
    	XXX_NoUnkeyedLiteral struct{} `json:"-"`
    	XXX_unrecognized     []byte   `json:"-"`
    	XXX_sizecache        int32    `json:"-"`
    }
    

    到此,orderer的rpc部分的接受结束,接下来就是对信息的解析验证排序处理,然后返回给client一个响应

    type AtomicBroadcast_BroadcastServer interface {
    	Send(*BroadcastResponse) error
    	Recv() (*common.Envelope, error)
    	grpc.ServerStream
    }
    

    其中的信息

    • broadcas.Handler()
      • 解析消息:判断是否为配置消息,决定消息应由哪个通道结构进行处理,注意对于创建应用通道消息,处理器指定为系统的通道结构;
      • 处理消息:选用对应的通道结构对消息进行处理,包括普通消息和配置消息;
      • 返回响应消息给请求方。
    // Handler is designed to handle connections from Broadcast AB gRPC service
    type Handler struct {
    	SupportRegistrar ChannelSupportRegistrar
    	Metrics          *Metrics
    }
    
    // Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
    func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
    	// 获取发送者的addr
    	addr := util.ExtractRemoteAddress(srv.Context())
    	logger.Debugf("Starting new broadcast loop for %s", addr)
    	for {
    		// 循环等待请求,其实是通过Orderer 的 Broadcast 接口收到来自客户端提交的交易
    		msg, err := srv.Recv()
    		if err == io.EOF {
    			logger.Debugf("Received EOF from %s, hangup", addr)
    			return nil
    		}
    		if err != nil {
    			logger.Warningf("Error reading from %s: %s", addr, err)
    			return err
    		}
    
    		// 验证处理接受信息
    		resp := bh.ProcessMessage(msg, addr)
    		// 将信息广播出去
    		err = srv.Send(resp)
    		if resp.Status != cb.Status_SUCCESS {
    			return err
    		}
    
    		if err != nil {
    			logger.Warningf("Error sending to %s: %s", addr, err)
    			return err
    		}
    	}
    
    }
    
    • Registrar.BroadcastChannelSupport()

      • channel 头部从消息信封结构中解析出来
      • 是否为配置信息根据消息头中通道类型进行判断(是否为 cb.HeaderType_CONFIG_UPDATE)
      • 通过字典结构查到对应的 ChainSupport 结构(应用通道、系统通道)作为处理器。
    // BroadcastChannelSupport returns the message channel header, whether the message is a config update
    // and the channel resources for a message or an error if the message is not a message which can
    // be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
    func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
    	// channel 头部从消息信封结构中解析出来;是否为配置信息根据消息头中通道类型进行判断(是否为 cb.HeaderType_CONFIG_UPDATE)
    	chdr, err := utils.ChannelHeader(msg)
    	if err != nil {
    		return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
    	}
    
    	// 获取通道
    	cs := r.GetChain(chdr.ChannelId)
    	// New channel creation
    	// 如果为空则默认系统通道。如收到新建应用通道请求时,Orderer 本地并没有该应用通道对应结构,因此也为空。
    	if cs == nil {
    		cs = r.systemChannel
    	}
    
    	isConfig := false
    	switch cs.ClassifyMsg(chdr) {
    	// 只有 CONFIG_UPDATE 会返回 ConfigUpdateMsg
    	case msgprocessor.ConfigUpdateMsg:
    		isConfig = true
    	case msgprocessor.ConfigMsg:
    		return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
    	default:
    	}
    
    	return chdr, isConfig, cs, nil
    }
    
    • commonutils.ChannelHeader()
    // ChannelHeader returns the *cb.ChannelHeader for a given *cb.Envelope.
    func ChannelHeader(env *cb.Envelope) (*cb.ChannelHeader, error) {
        // 解析消息体
    	envPayload, err := UnmarshalPayload(env.Payload)
    	if err != nil {
    		return nil, err
    	}
    
    	if envPayload.Header == nil {
    		return nil, errors.New("header not set")
    	}
    
    	if envPayload.Header.ChannelHeader == nil {
    		return nil, errors.New("channel header not set")
    	}
        // 解析真正的通道头
    	chdr, err := UnmarshalChannelHeader(envPayload.Header.ChannelHeader)
    	if err != nil {
    		return nil, errors.WithMessage(err, "error unmarshaling channel header")
    	}
    
    	return chdr, nil
    }
    
    根据解析结果,处理交易信息
    普通交易信息
    // ProcessMessage validates and enqueues a single message
    func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
            ...
            消息检查
    		configSeq, err := processor.ProcessNormalMsg(msg)
    		
            // 这里主要看客户端和peer节点发来的消息类型,前面已经判断过了
            // 假设创建通道的信息,消息类型则为配置信息
            if !isConfig {
            ...
            共识排序
    		err = processor.Order(msg, configSeq)
            ...
    
    • 消息检查
    // ProcessNormalMsg will check the validity of a message based on the current configuration.  It returns the current
    // configuration sequence number and nil on success, or an error if the message is not valid
    func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
    	oc, ok := s.support.OrdererConfig()
    	if !ok {
    		logger.Panicf("Missing orderer config")
    	}
    	if oc.Capabilities().ConsensusTypeMigration() {
    		if oc.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
    			return 0, errors.WithMessage(
    				ErrMaintenanceMode, "normal transactions are rejected")
    		}
    	}
    	// 获取配置的序列号,映射到 common.configtx 包中 configManager 结构体的对应方法
    	configSeq = s.support.Sequence()
    	// 进行过滤检查,实现为 orderer.common.msgprocessor 包中 RuleSet 结构体的对应方法。
    	err = s.filters.Apply(env)
    	return
    }
    

    过滤器会在创建 ChainSupport 结构时候初始化:

    应用通道:orderer.common.mspprocessor 包中的

    CreateStandardChannelFilters(filterSupport channelconfig.Resources) *RuleSet
    

    系统通道:orderer.common.mspprocessor 包中的

     CreateSystemChannelFilters(chainCreator ChainCreator, ledgerResources channelconfig.Resources) *RuleSet
    
    • 共识排序

    根据不同的共识插件,具体逻辑不一致
    // TODO 待补充

    配置交易信息
    config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) // 合并配置更新,生成新的配置信封结构
    processor.Configure(config, configSeq) //入队列操作,将生成的配置信封结构消息扔给后端队列(如 Kafka)
    
    • standardchannel.ProcessConfigUpdateMsg()
    // orderer/common/msgprocessor/standardchannel.go
    func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
        logger.Debugf("Processing config update message for channel %s", s.support.ChainID())
    
        seq := s.support.Sequence() // 获取当前配置的版本号
        err = s.filters.Apply(env) // 校验权限,是否可以更新配置
        if err != nil {
            return nil, 0, err
        }
    
        // 根据输入的更新配置交易消息生成配置信封结构:Config 为更新后配置字典;LastUpdate 为输入的更新配置交易
        // 最终调用 `common/configtx` 包下 `ValidatorImpl.ProposeConfigUpdate()` 方法。
        configEnvelope, err := s.support.ProposeConfigUpdate(env)
        if err != nil {
            return nil, 0, err
        }
    
        // 生成签名的配置信封结构,通道头类型为 HeaderType_CONFIG。即排序后消息类型将由 CONFIG_UPDATE 变更为 CONFIG
        config, err = utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChainID(), s.support.Signer(), configEnvelope, msgVersion, epoch)
        if err != nil {
            return nil, 0, err
        }
    
        err = s.filters.Apply(config) // 校验生成的配置消息是否合法
        if err != nil {
            return nil, 0, err
        }
    
        return config, seq, nil
    }
    
    • standardchannel.ProcessConfigUpdateMsg()

    对于系统通道情况,除了调用普通通道结构的对应方法来处理普通的更新配置交易外,还会负责新建应用通道请求。

    func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
        // 首先从消息体获取通道ID
        channelID, err := utils.ChannelID(envConfigUpdate)
        
        // 判断获取到的通道ID是否为已经存在的用户通道ID,如果是的话转到StandardChannel中的ProcessConfigUpdateMsg()方法进行处理
        if channelID == s.support.ChainID() { 
            return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
        }
    
        // 从系统通道中获取当前最新的配置,例如创建通道则会走进这个方法
        // orderer/common/msgprocessor/systemchannel.go#DefaultTemplator.NewChannelConfig()
        bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
    
        // 合并来自客户端的配置更新信封结构,创建配置信封结构 ConfigEnvelope
        newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
    
        // 封装新的签名信封结构,其 Payload.Data 是 newChannelConfigEnv
        newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
    
        // 处理新建应用通道请求,封装为 ORDERER_TRANSACTION 类型消息
        wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
    
        s.StandardChannel.filters.Apply(wrappedOrdererTransaction) // 再次校验配置
    
        // 返回封装后的签名信封结构
        return wrappedOrdererTransaction, s.support.Sequence(), nil
    }
    
    • DefaultTemplator.NewChannelConfig()
    // NewChannelConfig creates a new template channel configuration based on the current config in the ordering system channel.
    func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
    	// 反序列化payload
    	configUpdatePayload, err := utils.UnmarshalPayload(envConfigUpdate.Payload)
    	if err != nil {
    		return nil, fmt.Errorf("Failing initial channel config creation because of payload unmarshaling error: %s", err)
    	}
    	// 反序列化配置信息
    	configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)
    	if err != nil {
    		return nil, fmt.Errorf("Failing initial channel config creation because of config update envelope unmarshaling error: %s", err)
    	}
    
    	if configUpdatePayload.Header == nil {
    		return nil, fmt.Errorf("Failed initial channel config creation because config update header was missing")
    	}
    
    	// 获取通道头信息
    	channelHeader, err := utils.UnmarshalChannelHeader(configUpdatePayload.Header.ChannelHeader)
    	if err != nil {
    		return nil, fmt.Errorf("Failed initial channel config creation because channel header was malformed: %s", err)
    	}
    
    	// 反序列化配置更新信息
    	configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
    	if err != nil {
    		return nil, fmt.Errorf("Failing initial channel config creation because of config update unmarshaling error: %s", err)
    	}
    
    	if configUpdate.ChannelId != channelHeader.ChannelId {
    		return nil, fmt.Errorf("Failing initial channel config creation: mismatched channel IDs: '%s' != '%s'", configUpdate.ChannelId, channelHeader.ChannelId)
    	}
    
    	if configUpdate.WriteSet == nil {
    		return nil, fmt.Errorf("Config update has an empty writeset")
    	}
    
    	if configUpdate.WriteSet.Groups == nil || configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey] == nil {
    		return nil, fmt.Errorf("Config update has missing application group")
    	}
    
    	if uv := configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Version; uv != 1 {
    		return nil, fmt.Errorf("Config update for channel creation does not set application group version to 1, was %d", uv)
    	}
    
    	// 根据之前定义的各项策略对通道进行配置,具体的策略可以看configtx.yaml文件
    	consortiumConfigValue, ok := configUpdate.WriteSet.Values[channelconfig.ConsortiumKey]
    	if !ok {
    		return nil, fmt.Errorf("Consortium config value missing")
    	}
    
    	consortium := &cb.Consortium{}
    	err = proto.Unmarshal(consortiumConfigValue.Value, consortium)
    	if err != nil {
    		return nil, fmt.Errorf("Error reading unmarshaling consortium name: %s", err)
    	}
    
    	applicationGroup := cb.NewConfigGroup()
    	consortiumsConfig, ok := dt.support.ConsortiumsConfig()
    	if !ok {
    		return nil, fmt.Errorf("The ordering system channel does not appear to support creating channels")
    	}
    
    	consortiumConf, ok := consortiumsConfig.Consortiums()[consortium.Name]
    	if !ok {
    		return nil, fmt.Errorf("Unknown consortium name: %s", consortium.Name)
    	}
    
    	applicationGroup.Policies[channelconfig.ChannelCreationPolicyKey] = &cb.ConfigPolicy{
    		Policy: consortiumConf.ChannelCreationPolicy(),
    	}
    	applicationGroup.ModPolicy = channelconfig.ChannelCreationPolicyKey
    
    	// Get the current system channel config
    	// 获取当前系统通道的配置信息
    	systemChannelGroup := dt.support.ConfigtxValidator().ConfigProto().ChannelGroup
    
    	// If the consortium group has no members, allow the source request to have no members.  However,
    	// if the consortium group has any members, there must be at least one member in the source request
    	if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 &&
    		len(configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups) == 0 {
    		return nil, fmt.Errorf("Proposed configuration has no application group members, but consortium contains members")
    	}
    
    	// If the consortium has no members, allow the source request to contain arbitrary members
    	// Otherwise, require that the supplied members are a subset of the consortium members
    	if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 {
    		for orgName := range configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups {
    			consortiumGroup, ok := systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups[orgName]
    			if !ok {
    				return nil, fmt.Errorf("Attempted to include a member which is not in the consortium")
    			}
    			applicationGroup.Groups[orgName] = proto.Clone(consortiumGroup).(*cb.ConfigGroup)
    		}
    	}
    
    	channelGroup := cb.NewConfigGroup()
    
    	// Copy the system channel Channel level config to the new config
    	// 将系统通道的配置信息复制
    	for key, value := range systemChannelGroup.Values {
    		channelGroup.Values[key] = proto.Clone(value).(*cb.ConfigValue)
    		if key == channelconfig.ConsortiumKey {
    			// Do not set the consortium name, we do this later
    			continue
    		}
    	}
    
    	for key, policy := range systemChannelGroup.Policies {
    		channelGroup.Policies[key] = proto.Clone(policy).(*cb.ConfigPolicy)
    	}
    
    	// Set the new config orderer group to the system channel orderer group and the application group to the new application group
    	// 新的配置信息中Order组配置使用系统通道的配置,同时将定义的application组配置赋值到新的配置信息
    	channelGroup.Groups[channelconfig.OrdererGroupKey] = proto.Clone(systemChannelGroup.Groups[channelconfig.OrdererGroupKey]).(*cb.ConfigGroup)
    	channelGroup.Groups[channelconfig.ApplicationGroupKey] = applicationGroup
    	channelGroup.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{
    		Value:     utils.MarshalOrPanic(channelconfig.ConsortiumValue(consortium.Name).Value()),
    		ModPolicy: channelconfig.AdminsPolicyKey,
    	}
    
    	// Non-backwards compatible bugfix introduced in v1.1
    	// The capability check should be removed once v1.0 is deprecated
    	if oc, ok := dt.support.OrdererConfig(); ok && oc.Capabilities().PredictableChannelTemplate() {
    		channelGroup.ModPolicy = systemChannelGroup.ModPolicy
    		zeroVersions(channelGroup)
    	}
    
    	// 将创建的新的配置打包为Bundle
    	bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{
    		ChannelGroup: channelGroup,
    	})
    
    	if err != nil {
    		return nil, err
    	}
    
    	return bundle, nil
    }
    
    • 共识部分
      // TODO 待补充

    • 返回响应成功,这里会是对信息接受成功的后对client的一个状态响应

    return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
    

    Deliver

    Deliver部分主要是将生成的区块发送给peer,这部分类似生产消费模型,orderer就是一个区块的生产者,peer就是消费区块的地方。

    orderer部分的主要逻辑在deliverBlocks,其中部分逻辑如下:

    	for {
    		if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
    			if number > chain.Reader().Height()-1 {
    				return cb.Status_NOT_FOUND, nil
    			}
    		}
    
    		var block *cb.Block
    		var status cb.Status
    
    		iterCh := make(chan struct{})
    
    		// 这里开启一个协程的作用是避免不必要的阻塞?
    		go func() {
    			block, status = cursor.Next()
    			close(iterCh)
    		}()
    
    		select {
    		case <-ctx.Done():
    			logger.Debugf("Context canceled, aborting wait for next block")
    			return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
    		case <-erroredChan:
    			// TODO, today, the only user of the errorChan is the orderer consensus implementations.  If the peer ever reports
    			// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
    			logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
    			return cb.Status_SERVICE_UNAVAILABLE, nil
    		case <-iterCh:
    			// Iterator has set the block and status vars
    		}
    
    		if status != cb.Status_SUCCESS {
    			logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
    			return status, nil
    		}
    
    		// increment block number to support FAIL_IF_NOT_READY deliver behavior
    		number++
    
    		if err := accessControl.Evaluate(); err != nil {
    			logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
    			return cb.Status_FORBIDDEN, nil
    		}
    
    		// 返回peer的区块请求
    		logger.Infof("Start delivering block [channel: %s, txid: %s, type: %s, from: %s, blockNum: %d, prevHash: %s] at (%s)", chdr.ChannelId, chdr.TxId, cb.HeaderType_name[chdr.Type], addr, number, base64.StdEncoding.EncodeToString(block.Header.DataHash[:]), strconv.FormatInt(time.Now().UnixNano(), 10))
    		if err := srv.SendBlockResponse(block); err != nil {
    			logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
    			return cb.Status_INTERNAL_SERVER_ERROR, err
    		}
    
    		logger.Infof("End delivering block [channel: %s, txid: %s, type: %s, from: %s, blockNum: %d, prevHash: %s] at (%s)", chdr.ChannelId, chdr.TxId, cb.HeaderType_name[chdr.Type], addr, number, base64.StdEncoding.EncodeToString(block.Header.DataHash[:]), strconv.FormatInt(time.Now().UnixNano(), 10))
    
    		h.Metrics.BlocksSent.With(labels...).Add(1)
    
    		// 如果读到了事先在请求消息中的结束的位置,跳出循环,因为peer第一次请求发送的stopNum很大,所以这个for循环会/// 不断的迭代
    		if stopNum == block.Header.Number {
    			break
    		}
    
    	}
    

    这个是一个并发的异步阻塞模型,可以看到每个区块的获取都是一个协程,select部分决定了是否可以继续执行下一步。

    func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
    	addr := util.ExtractRemoteAddress(srv.Context())
    	logger.Debugf("Starting new broadcast loop for %s", addr)
    	for {
    		msg, err := srv.Recv()
    		//logger.Infof("Recving a new message at %d", time.Now().UnixNano())
    		if err == io.EOF {
    			logger.Debugf("Received EOF from %s, hangup", addr)
    			return nil
    		}
    		if err != nil {
    			logger.Warningf("Error reading from %s: %s", addr, err)
    			return err
    		}
    		logger.Infof("Start Processing message at %d", time.Now().UnixNano())
    		resp := bh.ProcessMessage(msg, addr)
    		logger.Debugf("End Processing message at %d", time.Now().UnixNano())
    
    		err = srv.Send(resp)
    		if resp.Status != cb.Status_SUCCESS {
    			return err
    		}
    
    		if err != nil {
    			logger.Warningf("Error sending to %s: %s", addr, err)
    			return err
    		}
    	}
    
    }
    

    orderer接受到peer的区块请求rpc服务,orderer会通过SendBlockResponse服务来发送区块

    // ResponseSender defines the interface a handler must implement to send
    // responses.
    type ResponseSender interface {
    	SendStatusResponse(status cb.Status) error
    	SendBlockResponse(block *cb.Block) error
    }
    
    type AtomicBroadcast_DeliverServer interface {
    	Send(*DeliverResponse) error
    	Recv() (*common.Envelope, error)
    	grpc.ServerStream
    }
    

    流程图

    下图展示了Orderer向Peer传递区块的宏观视角,能够展示多个通道在Orderer和Peer间传递区块的情况:

    单通道区块同步

    生成区块流程

    总体步骤

    • 接收client的消息
    • 处理验证消息
    • 交给共识部分排序打包
    • 将块发送给peer

    详细内容可以对照前面来查看

    fabric-samples中的并发版本

    这是fabric官方的一个并发测试版本,优点是快速部署完整的网络加完善的脚本,缺点是目前只能在单机部署

    步骤

    • cd into the first-network folder within fabric-samples, e.g. cd ~/fabric-samples/first-network
    • Open docker-compose-cli.yaml in your favorite editor, and edit the following lines:
      • In the volumes section of the cli container, edit the second line which refers to the chaincode folder to point to the chaincode folder within the high-throughput folder,
        • e.g../../chaincode/:/opt/gopath/src/github.com/hyperledger/fabric/examples/chaincode/go --> ./../high-throughput/chaincode/:/opt/gopath/src/github.com/hyperledger/fabric/examples/chaincode/go
      • Again in the volumes section, edit the fourth line which refers to the scripts folder so it points to the scripts folder within the high-throughput folder, e.g.
        • ./scripts:/opt/gopath/src/github.com/hyperledger/fabric/peer/scripts/ --> ./../high-throughput/scripts/:/opt/gopath/src/github.com/hyperledger/fabric/peer/scripts/
      • Finally, comment out the docker exec cli scripts/script.sh command from the byfn.sh script by placing a # before it so that the standard BYFN end to end script doesn't run, e.g.
        • docker exec cli scripts/script.sh $CHANNEL_NAME $CLI_DELAY $LANGUAGE $CLI_TIMEOUT $VERBOSE
      • We can now bring our network up by typing in ./byfn.sh -m up -c mychannel
      • Open a new terminal window and enter the CLI container using docker exec -it cli bash, all operations on the network will happen within this container from now on.

    如果没有问题则可以进行对网络的操作,详情可参考官方文档

    如何利用wrk对这个网络进行测试

    • 下载编译wrk
    • 启动一个网络监听

    简单示例

    r := gin.Default()
            r.GET("/update", func(c *gin.Context) {
                    //读取配置文件,创建SDK对象
                    configProvider := config.FromFile("/root/fabric-samples/first-network/test/config.yaml")
                    sdk, err := fabsdk.New(configProvider)
                    if err != nil {
                            log.Fatalf("create sdk fail: %s
    ", err.Error())
                    }else{
                            fmt.Println("create a new fabsdk")
                    }
    
                    //调用合约
                    channelProvider := sdk.ChannelContext("mychannel",
                            fabsdk.WithUser("Admin"),
                            fabsdk.WithOrg("org1"))
    
                    channelClient, err := channel.New(channelProvider)
                    if err != nil {
                            log.Fatalf("create channel client fail: %s
    ", err.Error())
                    } else {
                            fmt.Println("create channelClient succeed")
                    }
    
                    println("---------------ok-----------")
    
                    //构建函数请求
                    var args1 [][]byte
    
                    args1 = append(args1, []byte("myvar"), []byte("10"), []byte("+"))
    
                    var target =[]string{"peer0.org1.example.com","peer1.org2.example.com"}
    
                    //调用channelClient执行请求
                    //response, err := channelClient.Execute(request,channel.WithTargetEndpoints(target...))
    
    
                    request := channel.Request{
                            ChaincodeID: "bigdatacc",
                            Fcn:         "update",
                            Args:        args1,
                    }
                    response, err := channelClient.Execute(request,channel.WithTargetEndpoints(target...))
                    //response, err := channelClient.Execute(request)
                    if err != nil {
                            log.Fatal("query fail: ", err.Error())
                    } else {
                            fmt.Printf("response is [%s]
    ", string(response.Payload))
                    }
            })
    
            r.Run(":8080")
    
    
    • 利用wrk对这个网络进行请求测试
    wrk -t10 -c150 -d10 --timeout 100 http://0.0.0.0:8080/update
    

    fabric源码中的protobuf对象

    大致是orderer和common这块的protobuf的分析

    orderer

    ab.proto

    • 广播响应

    这里指client段发送给orderer信息,然后orderer返回响应

    message BroadcastResponse {
        // Status code, which may be used to programatically respond to success/failure
        common.Status status = 1;
        // Info string which may contain additional information about the status returned
        string info = 2;
    }
    
    • 区块请求信息

    这里指peer段向orderer请求区块的一些信息

    message SeekNewest { }
    
    message SeekOldest { }
    
    message SeekPosition {
        oneof Type {
            SeekNewest newest = 1;
            SeekOldest oldest = 2;
            SeekSpecified specified = 3;
        }
    }
    
    message SeekInfo {
       // If BLOCK_UNTIL_READY is specified, the reply will block until the requested blocks are available,
       // if FAIL_IF_NOT_READY is specified, the reply will return an error indicating that the block is not
       // found.  To request that all blocks be returned indefinitely as they are created, behavior should be
       // set to BLOCK_UNTIL_READY and the stop should be set to specified with a number of MAX_UINT64
        enum SeekBehavior {
            BLOCK_UNTIL_READY = 0;
            FAIL_IF_NOT_READY = 1;
        }
    
        // SeekErrorTolerance indicates to the server how block provider errors should be tolerated.  By default,
        // if the deliver service detects a problem in the underlying block source (typically, in the orderer,
        // a consenter error), it will begin to reject deliver requests.  This is to prevent a client from waiting
        // for blocks from an orderer which is stuck in an errored state.  This is almost always the desired behavior
        // and clients should stick with the default STRICT checking behavior.  However, in some scenarios, particularly
        // when attempting to recover from a crash or other corruption, it's desirable to force an orderer to respond
        // with blocks on a best effort basis, even if the backing consensus implementation is in an errored state.
        // In this case, set the SeekErrorResponse to BEST_EFFORT to ignore the consenter errors.
        enum SeekErrorResponse {
            STRICT = 0;
            BEST_EFFORT = 1;
        }
        SeekPosition start = 1;               // The position to start the deliver from
        SeekPosition stop = 2;                // The position to stop the deliver
        SeekBehavior behavior = 3;            // The behavior when a missing block is encountered
        SeekErrorResponse error_response = 4; // How to respond to errors reported to the deliver service
    }
    
    • 分发响应

    这里指orderer响应peer的区块请求

    message DeliverResponse {
        oneof Type {
            common.Status status = 1;
            common.Block block = 2;
        }
    }
    
    • 定义广播和分发rpc服务
    service AtomicBroadcast {
        // broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure
        rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {}
    
        // deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
        rpc Deliver(stream common.Envelope) returns (stream DeliverResponse) {}
    }
    

    configuration.proto

    • 共识的类型定义
    message ConsensusType {
        // The consensus type: "solo", "kafka" or "etcdraft".
        string type = 1;
        // Opaque metadata, dependent on the consensus type.
        bytes metadata = 2;
    
        // State defines the orderer mode of operation, typically for consensus-type migration.
        // NORMAL is during normal operation, when consensus-type migration is not, and can not, take place.
        // MAINTENANCE is when the consensus-type can be changed.
        enum State {
            STATE_NORMAL = 0;
            STATE_MAINTENANCE = 1;
        }
        // The state signals the ordering service to go into maintenance mode, typically for consensus-type migration.
        State state = 3;
    }
    
    • 切块的一些配置
    message BatchSize {
        // Simply specified as number of messages for now, in the future
        // we may want to allow this to be specified by size in bytes
        uint32 max_message_count = 1;
        // The byte count of the serialized messages in a batch cannot
        // exceed this value.
        uint32 absolute_max_bytes = 2;
        // The byte count of the serialized messages in a batch should not
        // exceed this value.
        uint32 preferred_max_bytes = 3;
    }
    

    common.proto

    • HTTP的状态信息
    enum Status {
        UNKNOWN = 0;
        SUCCESS = 200;
        BAD_REQUEST = 400;
        FORBIDDEN = 403;
        NOT_FOUND = 404;
        REQUEST_ENTITY_TOO_LARGE = 413;
        INTERNAL_SERVER_ERROR = 500;
        NOT_IMPLEMENTED = 501;
        SERVICE_UNAVAILABLE = 503;
    }
    
    • HeaderType

    比如channelheader的类型就是从这检索

    enum HeaderType {
        // Prevent removed tag re-use
        // Uncomment after fabric-baseimage moves to 3.5.1
        // reserved 7;
        // reserved "PEER_RESOURCE_UPDATE";
    
        MESSAGE = 0;                   // Used for messages which are signed but opaque
        CONFIG = 1;                    // Used for messages which express the channel config
        CONFIG_UPDATE = 2;             // Used for transactions which update the channel config
        ENDORSER_TRANSACTION = 3;      // Used by the SDK to submit endorser based transactions
        ORDERER_TRANSACTION = 4;       // Used internally by the orderer for management
        DELIVER_SEEK_INFO = 5;         // Used as the type for Envelope messages submitted to instruct the Deliver API to seek
        CHAINCODE_PACKAGE = 6;         // Used for packaging chaincode artifacts for install
        PEER_ADMIN_OPERATION = 8;      // Used for invoking an administrative operation on a peer
        TOKEN_TRANSACTION = 9;         // Used to denote transactions that invoke token management operations
    }
    
    • Header

    签名头,通道头

    message Header {
        bytes channel_header = 1;
        bytes signature_header = 2;
    }
    
    • ChannelHeader
      通道头,channelid,txid,version等信息
    message ChannelHeader {
        int32 type = 1; // Header types 0-10000 are reserved and defined by HeaderType
    
        // Version indicates message protocol version
        int32 version = 2;
    
        // Timestamp is the local time when the message was created
        // by the sender
        google.protobuf.Timestamp timestamp = 3;
    
        // Identifier of the channel this message is bound for
        string channel_id = 4;
    
        // An unique identifier that is used end-to-end.
        //  -  set by higher layers such as end user or SDK
        //  -  passed to the endorser (which will check for uniqueness)
        //  -  as the header is passed along unchanged, it will be
        //     be retrieved by the committer (uniqueness check here as well)
        //  -  to be stored in the ledger
        string tx_id = 5;
    
        // The epoch in which this header was generated, where epoch is defined based on block height
        // Epoch in which the response has been generated. This field identifies a
        // logical window of time. A proposal response is accepted by a peer only if
        // two conditions hold:
        // 1. the epoch specified in the message is the current epoch
        // 2. this message has been only seen once during this epoch (i.e. it hasn't
        //    been replayed)
        uint64 epoch = 6;
    
        // Extension that may be attached based on the header type
        bytes extension = 7;
    
        // If mutual TLS is employed, this represents
        // the hash of the client's TLS certificate
        bytes tls_cert_hash = 8;
    }
    
    • SignatureHeader

    签名头

    message SignatureHeader {
        // Creator of the message, a marshaled msp.SerializedIdentity
        bytes creator = 1;
    
        // Arbitrary number that may only be used once. Can be used to detect replay attacks.
        bytes nonce = 2;
    }
    
    • Playload

    消息内容

    message Payload {
    
        // Header is included to provide identity and prevent replay
        Header header = 1;
    
        // Data, the encoding of which is defined by the type in the header
        bytes data = 2;
    }
    
    • Envelope

    信封用签名包裹有效负载,以便可以对消息进行身份验证

    message Envelope {
        // A marshaled Payload
        bytes payload = 1;
    
        // A signature by the creator specified in the Payload header
        bytes signature = 2;
    }
    
    • Block

    区块分区块头,区块内容,区块元数据

    message Block {
        BlockHeader header = 1;
        BlockData data = 2;
        BlockMetadata metadata = 3;
    }
    
    message BlockHeader {
        uint64 number = 1; // The position in the blockchain
        bytes previous_hash = 2; // The hash of the previous block header
        bytes data_hash = 3; // The hash of the BlockData, by MerkleTree
    }
    
    • BlockData

    可以看到它是一个二维字节数组,符合我们的多条交易打包的需求

    message BlockData {
        repeated bytes data = 1;
    }
    
    message BlockMetadata {
        repeated bytes metadata = 1;
    }
    

    ledger.proto

    账本的结构比较简单,区块高度,hash

    message BlockchainInfo {
        uint64 height = 1;
        bytes currentBlockHash = 2;
        bytes previousBlockHash = 3;
    
    }
    

    主要参考

  • 相关阅读:
    【转】React Native 关于箭头函数、普通函数与点击事件的调用
    【转】React Native Config.h not found ( glog-0.3.4 )
    微软必应Bing搜索引擎这几天无法访问!
    Beyond Compare 4 提示错误“这个授权密钥已被吊销”的解决办法
    flock
    Getting.Started.with.Unity.2018.3rd.Edition
    Joe Hocking
    Unity 2018 By Example 2nd Edition
    Unity 2017 Game Optimization 新版
    Why is it called “armature” instead of “skeleton”? or perhaps “rig”?
  • 原文地址:https://www.cnblogs.com/CherryTab/p/13796254.html
Copyright © 2011-2022 走看看