zoukankan      html  css  js  c++  java
  • 菜鸟系列Fabric源码学习 — peer节点启动

    Fabric 1.4 源码分析peer节点启动

    peer模块采用cobra库来实现cli命令。

    Cobra提供简单的接口来创建强大的现代化CLI接口,比如git与go工具。Cobra同时也是一个程序, 用于创建CLI程序

    peer支持的命令如下所示:

    Usage:
      peer [command]
    
    Available Commands:
      chaincode   Operate a chaincode: install|instantiate|invoke|package|query|signpackage|upgrade|list.
      channel     Operate a channel: create|fetch|join|list|update|signconfigtx|getinfo.
      help        Help about any command
      logging     Log levels: getlevel|setlevel|revertlevels.
      node        Operate a peer node: start|status.
      version     Print fabric peer version.
    
    Flags:
      -h, --help                   help for peer
          --logging-level string   Default logging level and overrides, see core.yaml for full syntax
    

    通过peer 的docker-compose文件可知,peer启动命令为peer node start。从下列代码可知,peer启动时调用serve()接口。

    var nodeStartCmd = &cobra.Command{
    	Use:   "start",
    	Short: "Starts the node.",
    	Long:  `Starts a node that interacts with the network.`,
    	RunE: func(cmd *cobra.Command, args []string) error {
    		if len(args) != 0 {
    			return fmt.Errorf("trailing args detected")
    		}
    		// Parsing of the command line is done so silence cmd usage
    		cmd.SilenceUsage = true
    		return serve(args)
    	},
    }
    

    接下来深入分析serve()接口。

    func serve(args []string) error {
    	// currently the peer only works with the standard MSP
    	// because in certain scenarios the MSP has to make sure
    	// that from a single credential you only have a single 'identity'.
    	// Idemix does not support this *YET* but it can be easily
    	// fixed to support it. For now, we just make sure that
    	// the peer only comes up with the standard MSP
      // 当前peer启动时只支持标准MSP即Fabric。
    	mspType := mgmt.GetLocalMSP().GetType()
    	if mspType != msp.FABRIC {
    		panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
    	}
    
    	// Trace RPCs with the golang.org/x/net/trace package. This was moved out of
    	// the deliver service connection factory as it has process wide implications
    	// and was racy with respect to initialization of gRPC clients and servers.
    	grpc.EnableTracing = true
    
    	logger.Infof("Starting %s", version.GetInfo())
    
    	//startup aclmgmt with default ACL providers (resource based and default 1.0 policies based).
    	//Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this)
      // 创建ACL提供者 ACL访问控制列表
    	aclProvider := aclmgmt.NewACLProvider(
    		aclmgmt.ResourceGetter(peer.GetStableChannelConfig),
    	)
    	// 平台注册 
    	pr := platforms.NewRegistry(
    		&golang.Platform{},
    		&node.Platform{},
    		&java.Platform{},
    		&car.Platform{},
    	)
    	// 定义部署链码提供者
    	deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}
    

    DeployedCCInfoProvider实现了DeployedChaincodeInfoProvider。

    DeployedChaincodeInfoProvider是ledger用于构建集合配置历史记录的依赖项
    LSCC模块应该为这个依赖项提供实现

    type DeployedChaincodeInfoProvider interface {
    	Namespaces() []string //命名空间
    	UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error) // 保存更新的链码
    	ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) // 保存链码信息
    	CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error) // 链码集合信息
    }
    

    初始化账本资源ledgermgmt.Initialize

    	// 获取通道MSP管理员。如果不存在则创建	
    	identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
    		return mgmt.GetManagerForChain(chainID)
    	}
    	// peer 初始化
    	// 保存 peer 一些基本信息 ListenAddress TLS
    	opsSystem := newOperationsSystem()
    	// 监听 ListenAddress
    	err := opsSystem.Start()
    	if err != nil {
    		return errors.WithMessage(err, "failed to initialize operations subystems")
    	}
    	defer opsSystem.Stop()
    	metricsProvider := opsSystem.Provider
    	logObserver := floggingmetrics.NewObserver(metricsProvider)
    	flogging.Global.SetObserver(logObserver)
    	// 实例化私密数据成员membershipInfoProvider 用来判断peer是否在某个私密数据的集合中
    	membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)
    	//initialize resource management exit
    
    	// 初始化账本资源 将前面实例化的对象都进行赋值
    	ledgermgmt.Initialize(
    		&ledgermgmt.Initializer{
    			CustomTxProcessors:            peer.ConfigTxProcessors,
    			PlatformRegistry:              pr,
    			DeployedChaincodeInfoProvider: deployedCCInfoProvider,
    			MembershipInfoProvider:        membershipInfoProvider,
    			MetricsProvider:               metricsProvider,
    			HealthCheckRegistry:           opsSystem,
    		},
    	)
    

    初始化peer GRPC服务配置

    	// Parameter overrides must be processed before any parameters are
    	// cached. Failures to cache cause the server to terminate immediately.
    	// 判断链码是否时开发者模式
    	if chaincodeDevMode {
    		logger.Info("Running in chaincode development mode")
    		logger.Info("Disable loading validity system chaincode")
    
    		viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
    	}
    	// 缓存peer地址getLocalAddress address:port
    	if err := peer.CacheConfiguration(); err != nil {
    		return err
    	}
    	// 获取peer endpoint,没有则调用CacheConfiguration接口
    	peerEndpoint, err := peer.GetPeerEndpoint()
    	if err != nil {
    		err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
    		return err
    	}
    	// 获取peer Host
    	peerHost, _, err := net.SplitHostPort(peerEndpoint.Address)
    	if err != nil {
    		return fmt.Errorf("peer address is not in the format of host:port: %v", err)
    	}
    
    	listenAddr := viper.GetString("peer.listenAddress")
    	// 获取peer grpc相关配置 主要是TLS设置和心跳设置
    	serverConfig, err := peer.GetServerConfig()
    	if err != nil {
    		logger.Fatalf("Error loading secure config for peer (%s)", err)
    	}
    	// 设置GRPC最大并发2500
    	throttle := comm.NewThrottle(grpcMaxConcurrency)
    	// GRPC server的一些配置
    	serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
    	serverConfig.MetricsProvider = metricsProvider
    	serverConfig.UnaryInterceptors = append(
    		serverConfig.UnaryInterceptors,
    		grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
    		grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
    		throttle.UnaryServerIntercptor,
    	)
    	serverConfig.StreamInterceptors = append(
    		serverConfig.StreamInterceptors,
    		grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
    		grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
    		throttle.StreamServerInterceptor,
    	)
    

    将GRPC相关配置及Address传入创建GRPC服务器

      peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
    	if err != nil {
    		logger.Fatalf("Failed to create peer server (%s)", err)
    	}
    

    TLS及策略相关

    	// TLS相关配置  
      if serverConfig.SecOpts.UseTLS {
    		logger.Info("Starting peer with TLS enabled")
    		// set up credential support
    		cs := comm.GetCredentialSupport()
    		roots, err := peer.GetServerRootCAs()
    		if err != nil {
    			logger.Fatalf("Failed to set TLS server root CAs: %s", err)
    		}
    		cs.ServerRootCAs = roots
    
    		// set the cert to use if client auth is requested by remote endpoints
    		clientCert, err := peer.GetClientCertificate()
    		if err != nil {
    			logger.Fatalf("Failed to set TLS client certificate: %s", err)
    		}
    		comm.GetCredentialSupport().SetClientCertificate(clientCert)
    	}
    
    	mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
    	// 策略校验
    	policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
    		return func(env *cb.Envelope, channelID string) error {
    			return aclProvider.CheckACL(resourceName, channelID, env)
    		}
    	}
    

    创建deliver server 传输区块及过滤区块

    	abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    	pb.RegisterDeliverServer(peerServer.Server(), abServer)
    

    初始化链码服务

    startChaincodeServer将完成与链代码相关的初始化,包括:
    1)设置本地链代码安装路径
    2)创建特定链代码的CA
    3)启动特定链代码的gRPC监听服务

    	// Initialize chaincode service
    	chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
    
    	logger.Debugf("Running peer")
    

    注册背书服务,gossip组件初始化等操作

     // Start the Admin server
    	startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
    	
    	privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
        // 分发私有数据到其他节点
    		return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
    	}
    	// 获取本地签名
    	signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()
    	serializedIdentity, err := signingIdentity.Serialize()
    	if err != nil {
    		logger.Panicf("Failed serializing self identity: %v", err)
    	}
    
    	libConf := library.Config{}
    	if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {
    		return errors.WithMessage(err, "could not load YAML config")
    	}
    	reg := library.InitRegistry(libConf)
    	// 背书 验证相关配置
    	authFilters := reg.Lookup(library.Auth).([]authHandler.Filter)
    	endorserSupport := &endorser.SupportImpl{
    		SignerSupport:    signingIdentity,
    		Peer:             peer.Default,
    		PeerSupport:      peer.DefaultSupport,
    		ChaincodeSupport: chaincodeSupport,
    		SysCCProvider:    sccp,
    		ACLProvider:      aclProvider,
    	}
    	endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory)
    	validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory)
    	signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport)
    	channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport)
    	pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName)
    	pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{
    		ChannelStateRetriever:   channelStateRetriever,
    		TransientStoreRetriever: peer.TransientStoreFactory,
    		PluginMapper:            pluginMapper,
    		SigningIdentityFetcher:  signingIdentityFetcher,
    	})
    	endorserSupport.PluginEndorser = pluginEndorser
    	serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider)
    	auth := authHandler.ChainFilters(serverEndorser, authFilters...)
    	// Register the Endorser server
    	pb.RegisterEndorserServer(peerServer.Server(), auth)
    
    	policyMgr := peer.NewChannelPolicyManagerGetter()
    
    	// Initialize gossip component
    	err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
    	if err != nil {
    		return err
    	}
    	defer service.GetGossipService().Stop()
    
    	// register prover grpc service
    	// FAB-12971 disable prover service before v1.4 cut. Will uncomment after v1.4 cut
    	// err = registerProverService(peerServer, aclProvider, signingIdentity)
    	// if err != nil {
    	// 	return err
    	// }
    

    初始化系统链码。

    	// initialize system chaincodes
    	// deploy system chaincodes
    	// 部署系统链码
    	sccp.DeploySysCCs("", ccp)
    	logger.Infof("Deployed system chaincodes")
    	// 查看已经安装等链码
    	installedCCs := func() ([]ccdef.InstalledChaincode, error) {
    		return packageProvider.ListInstalledChaincodes()
    	}
    	// 创建链码的生命周期
    	lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))
    	if err != nil {
    		logger.Panicf("Failed creating lifecycle: +%v", err)
    	}
    	// HandleMetadataUpdate在链代码生命周期更改发生变化时触发
    	onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {
    		// 更新链码
        service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))
    	})
    	// 监听器 监听链码更新
    	lifecycle.AddListener(onUpdate)
    

    通道相关配置

    
    	// this brings up all the channels
    	peer.Initialize(func(cid string) {
    		logger.Debugf("Deploying system CC, for channel <%s>", cid)
    		sccp.DeploySysCCs(cid, ccp)
    		sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
          // 返回通道的查询器
    			return peer.GetLedger(cid).NewQueryExecutor()
    		}))
    		if err != nil {
    			logger.Panicf("Failed subscribing to chaincode lifecycle updates")
    		}
        // 注册该通道ChaincodeLifecycleEventListener
    		cceventmgmt.GetMgr().Register(cid, sub)
    	}, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),
    		pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)
    	// 获取peer一些配置
    	if viper.GetBool("peer.discovery.enabled") {
    		registerDiscoveryService(peerServer, policyMgr, lifecycle)
    	}
    
    	networkID := viper.GetString("peer.networkId")
    
    	logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)
    
    	// Get configuration before starting go routines to avoid
    	// racing in tests
    	profileEnabled := viper.GetBool("peer.profile.enabled")
    	profileListenAddress := viper.GetString("peer.profile.listenAddress")
    
    	// Start the grpc server. Done in a goroutine so we can deploy the
    	// genesis block if needed.
    	serve := make(chan error)
    	// 开启peer grpc服务
    	go func() {
    		var grpcErr error
    		if grpcErr = peerServer.Start(); grpcErr != nil {
    			grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
    		} else {
    			logger.Info("peer server exited")
    		}
    		serve <- grpcErr
    	}()
    
    	// Start profiling http endpoint if enabled
    	if profileEnabled {
    		go func() {
    			logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
    			if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
    				logger.Errorf("Error starting profiler: %s", profileErr)
    			}
    		}()
    	}
    
    	go handleSignals(addPlatformSignals(map[os.Signal]func(){
    		syscall.SIGINT:  func() { serve <- nil },
    		syscall.SIGTERM: func() { serve <- nil },
    	}))
    
    	// peer启动区块归档任务
    	if ledgerconfig.IsDataDumpEnabled() {
    		logger.Debugf("DataDump:{DumpDir:%s,LoadDir:%s,MaxFileLimit:%d,DumpCron:%v,DumpInterval:%d,LoadRetryTimes:%d}",
    			ledgerconfig.GetDataDumpPath(), ledgerconfig.GetDataLoadPath(), ledgerconfig.GetDataDumpFileLimit(),
    			ledgerconfig.GetDataDumpCron(), ledgerconfig.GetDataDumpInterval(), ledgerconfig.GetDataLoadRetryTimes())
    		go func() {
    			cronList := ledgerconfig.GetDataDumpCron()
    
    			if cronList != nil && len(cronList) > 0 {
    				cronTask := cron.New()
    				cronTask.Start()
    				for _, crontab := range cronList {
    					logger.Debugf("Crontab addFunc for %s", crontab)
    					err := cronTask.AddFunc(crontab, func() {
    						chainInfoArray := peer.GetChannelsInfo()
    						for _, chainInfo := range chainInfoArray {
    							chainId := chainInfo.ChannelId
    							l := peer.GetLedger(chainId)
    							if err := l.DataDump(datadump.DumpForCronTab); err != nil {
    								logger.Errorf("Failed to datadump for [%s]", err)
    							}
    						}
    					})
    					if err != nil {
    						logger.Errorf("Failed to add crontab task for %s", err)
    					}
    				}
    			}
    		}()
    	}
    
    	logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)
    
    	if viper.GetBool("peer.enBlkrouter") {
    		go func() {
    			startBlockServer()
    		}()
    	}
    	// Block until grpc server exits
    	// 阻塞 直到grpc服务退出
    	return <-serve
    }
    

    到这里Peer节点已经启动完成了,过程还是很复杂的,这里总结一下整体的过程:

    1. 首先就是读取配置信息,创建Cache结构,以及检测其他Peer节点的信息。

      CacheConfiguration(),主要保存其他Peer节点的相关信息。

    2. 创建PeerServer

      peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)

    3. 创建DeliverEventsServer

      1. abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
      2. pb.RegisterDeliverServer(peerServer.Server(), abServer)
      3. fabric/core/peer/deliverevents.go,该服务主要用于区块的交付与过滤,主要方法:Deliver(),DeliverFiltered()
    4. 启动ChaincodeServer

      1. chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
      2. core/chaincode/chaincode_support.go,返回了ChaincodeSupport:为Peer提供执行链码的接口,主要功能有Launch():启动一个停止运行的链码,Stop():停止链码的运行,HandleChaincodeStream():处理链码流信息,Register():将链码注册到当前Peer节点 ,createCCMessage():创建一个交易,ExecuteLegacyInit():链码的实例化,Execute():执行链码并返回回原始的响应,processChaincodeExecutionResult():处理链码的执行结果,InvokeInit():调用链码的Init方法,Invoke():调用链码,execute():执行一个交易
    5. 启动AdminServer

      1. startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
      2. core/protos/peer/admin.go文件,具有GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()等方法
    6. 创建EndorserServer

      1. pb.RegisterEndorserServer(peerServer.Server(), auth)
      2. core/endorser/endorser.go文件,注册背书服务器,提供了一个很重要的方法:ProcessProposal(),这个方法值得看一下。
    7. 创建GossipService

      1. err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
      2. gossip/service/gossip_service.go,具有InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()等方法
    8. 部署系统链码。

    9. 初始化通道。

    10. 启动gRPC服务。

    11. 如果启用了profile,还会启动监听服务。

    参考:https://www.cnblogs.com/cbkj-xd/p/11141717.html

    如果你觉得写的不错,请移步www.itkezhan.top或者关注公众号IT程序员客栈
  • 相关阅读:
    高可用Redis服务架构分析与搭建
    Java 程序性能问题
    限流、熔断、服务降级理解
    设计模式-享元设计
    设计模式-原型设计
    java8 Stream原理
    SQL语句性能优化策略
    OAuth2和JWT
    5种常见的Docker Compose错误
    leetcode_699. 掉落的方块
  • 原文地址:https://www.cnblogs.com/i-dandan/p/11959658.html
Copyright © 2011-2022 走看看