zoukankan      html  css  js  c++  java
  • 菜鸟系列Fabric源码学习 — orderer服务启动

    Fabric 1.4 orderer 服务启动流程

    1.提要

    orderer提供broadcast和deliver两个服务接口。orderer节点与各个peer节点通过grpc连接,orderer将所有peer节点通过broadcast发来的交易(Envelope格式,比如peer部署后的数据)按照配置的大小依次封装到一个个block中并写入orderer自己的账本中,然后供各个peer节点的gossip服务通过deliver来消费这个账本中的数据进行自身结点账本的同步。

    2.初始化过程

    先看看main函数。

    // Main is the entry point of orderer process
    func Main() {
    	fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
    
    	// "version" command
    	if fullCmd == version.FullCommand() {
    		fmt.Println(metadata.GetVersionInfo())
    		return
    	}
    
    	conf, err := localconfig.Load()
    	if err != nil {
    		logger.Error("failed to parse config: ", err)
    		os.Exit(1)
    	}
    	initializeLogging()
    	initializeLocalMsp(conf)
    
    	prettyPrintStruct(conf)
    	Start(fullCmd, conf)
    }
    

    从中可知 orderer服务命令行是通过kingpin来实现的,基本上只是简单使用了下,也只实现了3个命令:

      start*
        Start the orderer node
      version
        Show version information
      benchmark
        Run orderer in benchmark mode
    

    并且从上述main函数可知,仅version有对应操作,而orderer 默认为orderer start。

    启动流程为:

    1. 加载配置(orderer.yaml/Defaults/环境变量)
    2. 初始化log(log级别和log格式)
    3. 初始化本地msp
    4. 启动服务Start()

    接下来主要关注第4步。前面基本上是配置初始化第过程。
    查看一下start函数:

    1. 从配置文件启动块路径获取配置块及验证是否可作为配置块(系统通道第一个块)
    2. 集群相关初始化配置
    3. 判断是否是raft共识及使用的是最新配置块,如果是,则进行下列流程:
      1. 获取所有应用链及其创世区块块(discoverChannels)
      2. 根据orderer是否在应用链配置块的raft节点中分类(channelsToPull topull/nottopull)
      3. 创建所有的应用通道账本
      4. 获取topull应用通道的账本(从orderer处获取)
      5. 获取系统通道账本
    if clusterType && conf.General.GenesisMethod == "file" {
    		r.replicateIfNeeded(bootstrapBlock)
    	}
    	
    func (r *Replicator) ReplicateChains() []string {
    	var replicatedChains []string
    	channels := r.discoverChannels()
    	pullHints := r.channelsToPull(channels)
    	totalChannelCount := len(pullHints.channelsToPull) + len(pullHints.channelsNotToPull)
    
    	for _, channels := range [][]ChannelGenesisBlock{pullHints.channelsToPull, pullHints.channelsNotToPull} {
    		for _, channel := range channels {
    			...
    			r.appendBlock(gb, ledger, channel.ChannelName)
    		}
    	}
    
    	for _, channel := range pullHints.channelsToPull {
    		err := r.PullChannel(channel.ChannelName)
    		...
    	}
    
    	// Last, pull the system chain.
    	if err := r.PullChannel(r.SystemChannel); err != nil && err != ErrSkipped {
    		r.Logger.Panicf("Failed pulling system channel: %v", err)
    	}
    	return replicatedChains
    }
    
    1. 启动及初始化必要模块

      1. 创建系统链
      // Are we bootstrapping?
      if len(lf.ChainIDs()) == 0 {
      	initializeBootstrapChannel(genesisBlock, lf)
      } else {
      	logger.Info("Not bootstrapping because of existing chains")
      }
      
      1. 多通道初始化(initializeMultichannelRegistrar)

        • 初始化registrar实例
        registrar := multichannel.NewRegistrar(lf, signer, metricsProvider, callbacks...)
        
        // Registrar serves as a point of access and control for the individual channel resources.
        type Registrar struct {
        	lock   sync.RWMutex
        	//当前所有通道的chain对象
        	chains map[string]*ChainSupport
            //不同共识类型的consenter
        	consenters         map[string]consensus.Consenter
        	//Factory通过chainID检索或创建新的分类帐
        	ledgerFactory      blockledger.Factory
            //签名相关
        	signer             crypto.LocalSigner
        	blockcutterMetrics *blockcutter.Metrics
        	//系统链id
        	systemChannelID    string
        	//系统链chainSupport
        	systemChannel      *ChainSupport
        	//通道配置模版
        	templator          msgprocessor.ChannelConfigTemplator
        	callbacks          []channelconfig.BundleActor
        }
        
        • 初始化共识机制
        consenters["solo"] = solo.New()
        var kafkaMetrics *kafka.Metrics
        consenters["kafka"], kafkaMetrics = kafka.New(conf, metricsProvider, healthChecker, registrar)
        go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
        if isClusterType(bootstrapBlock) {
        	initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
        }
        
        • 启动orderer现存的链(系统链/应用链,通过读取链的目录查看现存链),为每条链实例化了ChainSupport对象,然后启动
        chain := newChainSupport(
           	r,
           	ledgerResources,
           	r.consenters,
           	r.signer,
           	r.blockcutterMetrics,
           )
        
        for _, chainID := range existingChains {
                ...
        		chain.start()
        		...
        }
        
      2. 启动GRPC服务
        server.go中的服务端对象实例server在main.go的main()中由server := NewServer(manager, signer)生成,使用ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)进行了注册,随后grpcServer.Start()启动起来。
        其主要的两个接口为:

      type AtomicBroadcastServer interface {
          Broadcast(AtomicBroadcast_BroadcastServer) error
          Deliver(AtomicBroadcast_DeliverServer) error
      }
      

      其接口的实现在:orderer/common/server/server.go

    3.相关模块介绍

    3.1 ChainSupport

    提供支持chain相关操作的资源,既包含账本本身,也包含了账本用到的各种工具对象,如分割工具cutter,签名工具signer,最新配置在chain中的位置信息(lastConfig的值代表当前链中最新配置所在的block的编号,lastConfigSeq的值则代表当前链中最新配置消息自身的编号)等

    type ChainSupport struct {
        // 账本相关资源 包括账本的读写及配置的获取
    	*ledgerResources
    	// 提供从客户端获取消息分类处理接口
    	msgprocessor.Processor
    	// 将区块写入磁盘
    	*BlockWriter
        // 链 提供对messages对处理方法
        //This design allows for two primary flows
        // 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
        // 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
    	consensus.Chain
        // 广播消息接收器 提供切块方法	
    	cutter blockcutter.Receiver
    	//签名
    	crypto.LocalSigner
    	// chains need to know if they are system or standard channel.
    	systemChannel bool
    }
    

    3.2 blockcutter模块

    • 块分割工具,用于分割block,具体为orderer/common/blockcutter/blockcutter.go中定义的receiver。一条一条消息数据在流水线上被传送到cutter处,cutter按照configtx.yaml中的配置,把一条条消息打包成一批消息,同时返回整理好的这批消息对应的committer集合,至此cutter的任务完成。
    MaxMessageCount指定了block中最多存储的消息数量
    AbsoluteMaxBytes指定了block最大的字节数
    PreferredMaxBytes指定了一条消息的最优的最大字节数(blockcutter处理消息的过程中会努力使每一批消息尽量保持在这个值上)。
    
    1. 若一个Envelope的数据大小(Payload+签名)大于PreferredMaxBytes时,无论当前缓存如何,立即Cut;
    2. 若一个Envelope被要求单纯存储在一个block(即该消息对应的committer的Isolated()返回为true),要立即Cut
    3. 若一个Envelope的大小加上blockcutter已有的消息的大小之和大于PreferredMaxBytes时,要立即Cut;
    4. 若blockcutter当前缓存的消息数量大于MaxMessageCount了,要立即Cut。
    5. 由configtx.yaml中BatchTimeout配置项(默认2s)控制,当时间超过此值,chain启动的处理消息的流程中主动触发的Cut。

    3.3 chain start()模块

    主要是对消息进行处理,将交易消息传输给block cutter切成块及写入账本。不同的共识机制操作不同。(后续结合consenter模块一起详细介绍)

    chain := newChainSupport(
    				r,
    				ledgerResources,
    				r.consenters,
    				r.signer,
    				r.blockcutterMetrics,
    			)
    r.chains[chainID] = chain
    chain.start()
    

    3.4 consenter模块:

    solo/kafka/etcdraft三种共识类型,用于序列化生产(即各个peer点传送来的Envelope)出来的消息。

    参考:
    https://blog.csdn.net/idsuf698987/article/details/78639203

    有兴趣的关注IT程序员客栈哦

    如果你觉得写的不错,请移步www.itkezhan.top或者关注公众号IT程序员客栈
  • 相关阅读:
    MFC常用控件使用方法
    用CImage类来显示PNG、JPG等图片
    javascript
    gSoap学习笔记
    Linux增加Swap分区
    nagios 监控shell脚本
    新机器连接老机器遇到的ERROR
    linux下PS1命令提示符设置
    python基础篇之进阶
    mysql不能使用localhost登录
  • 原文地址:https://www.cnblogs.com/i-dandan/p/11953059.html
Copyright © 2011-2022 走看看