- Hyperledger Fabric Orderer部分源码分析
Hyperledger Fabric Orderer部分源码分析
笔记
Orderer的数据结构
前言
Orderer负责接收交易,把交易打包成区块,然后区块在所有Orderer节点之间达成一致,再分发给Peer的功能,这涉及了:
- 网络:gRPC接受交易Brodcast,向peer发送区块
- 切块:把交易按一定规则打包成区块
- 共识:所有Orderer节点达成一致
简单来讲Orderer和Peer有点像发布订阅,生产消费的关系
Registrar
| |-- multichannel
| | |-- blockwriter.go
| | |-- blockwriter_test.go
| | |-- chainsupport.go
| | |-- chainsupport_test.go
| | |-- registrar.go
| | |-- registrar_test.go
| | `-- util_test.go
Registrar serves as a point of access and control for the individual channel resources.
它负责了每个channel资源的访问和控制点,要对某个通道(链)怎么样,得从这入手。(通道和链其实都是区块链)
type Registrar struct {
lock sync.RWMutex
// 保存了每一条链,每一条链在Orderer中都以ChainSupport代表,也就是说根据一个链的chainID来查询进而进行后续操作
chains map[string]*ChainSupport
config localconfig.TopLevel
// 保存了所有的共识插件,每个共识插件都是一个Consenter,Fabric 1.4中共识插件有Solo、Kafka、EtcdRaft
consenters map[string]consensus.Consenter
// 用来读取和创建链的账本
ledgerFactory blockledger.Factory
// 用来对Orderer中的数据进行签名,以及创建SignatureHeader
signer crypto.LocalSigner
blockcutterMetrics *blockcutter.Metrics
// 系统链ID
systemChannelID string
// 系统链实例
systemChannel *ChainSupport
templator msgprocessor.ChannelConfigTemplator
callbacks []channelconfig.BundleActor
}
ChainSupport
ChainSupport汇集了一条通道所需要的所有资源,所以说一个ChainSupport代表了一条链。
// ChainSupport holds the resources for a particular channel.
type ChainSupport struct {
// 账本读写
*ledgerResources
// 处理交易信息,根据不同类型有不同的处理
msgprocessor.Processor
// 把区块写入到账本,看源码这里是只允许一个thread操作,因此也加入了协程的操作
*BlockWriter
// Orderer的共识实例,比如每条链都有自己的Raft共识实例,它们互不干扰
consensus.Chain
// 交易切块
cutter blockcutter.Receiver
crypto.LocalSigner
}
Chain
Chain是接口,它的实现并不一条链,而是一条链的共识实例,可以是Solo、Kafka和EtcdRaft,它运行在单独的协程,使用Channel和ChainSupport通信,它调用其它接口完成切块,以及让所有的Orderer节点对交易达成一致。
type Chain interface {
// 普通消息/交易排序
Order(env *cb.Envelope, configSeq uint64) error
// 配置消息/交易排序
Configure(config *cb.Envelope, configSeq uint64) error
// 等待排序集群可用
WaitReady() error
// 当排序集群发送错误时,会关闭返回的通道
Errored() <-chan struct{}
// 启动当前链
Start()
// 停止当前链,并释放资源
Halt()
}
Consenter
type Consenter interface {
HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}
Consenter也是接口,它只有1个功能用来创建Chain。每种共识插件,都有自己单独的consenter实现,分别用来创建solo实例、kafka实例或etcdraft实例。
ConsenterSupport
type ConsenterSupport interface {
crypto.LocalSigner
msgprocessor.Processor
// VerifyBlockSignature verifies a signature of a block with a given optional
// configuration (can be nil).
VerifyBlockSignature([]*cb.SignedData, *cb.ConfigEnvelope) error
// BlockCutter returns the block cutting helper for this channel.
// 把消息切成区块
BlockCutter() blockcutter.Receiver
// SharedConfig provides the shared config from the channel's current config block.
// 当前链的Orderer配置
SharedConfig() channelconfig.Orderer
// ChannelConfig provides the channel config from the channel's current config block.
// 当前链的通道配置
ChannelConfig() channelconfig.Channel
// CreateNextBlock takes a list of messages and creates the next block based on the block with highest block number committed to the ledger
// Note that either WriteBlock or WriteConfigBlock must be called before invoking this method a second time.
// 生成区块
CreateNextBlock(messages []*cb.Envelope) *cb.Block
// Block returns a block with the given number,
// or nil if such a block doesn't exist.
// 读区块
Block(number uint64) *cb.Block
// WriteBlock commits a block to the ledger.
// 写区块
WriteBlock(block *cb.Block, encodedMetadataValue []byte)
// WriteConfigBlock commits a block to the ledger, and applies the config update inside.
// 写区块并更新配置
WriteConfigBlock(block *cb.Block, encodedMetadataValue []byte)
// Sequence returns the current config squence.
Sequence() uint64
// ChainID returns the channel ID this support is associated with.
ChainID() string
// Height returns the number of blocks in the chain this channel is associated with.
Height() uint64
// Append appends a new block to the ledger in its raw form,
// unlike WriteBlock that also mutates its metadata.
// 以原始数据的格式追加区块,不像WriteBlock那样修改元数据
Append(block *cb.Block) error
}
小结
- Registrar 包容万象,主要是ChainSupport和Consenter,Consenter是可插拔的
- ChainSupport 代表了一条链,能够指向属于本条链的共识实例,该共识实例由对应共识类型的Consenter创建
- 共识实例使用ConsenterSupport访问共识外部资源
Order主要模块
orderer
├── README.md
├── common
│ ├── blockcutter 缓存待打包的交易,切块
│ ├── bootstrap 启动时替换通道创世块
│ ├── broadcast orderer的Broadcast接口
│ ├── cluster (Raft)集群服务
│ ├── localconfig 解析orderer配置文件orderer.yaml
│ ├── metadata 区块元数据填写
│ ├── msgprocessor 交易检查
│ ├── multichannel 多通道支持:Registrar、chainSupport、写区块
│ └── server Orderer节点的服务端程序
├── consensus 共识插件
│ ├── consensus.go 共识插件需要实现的接口等定义
│ ├── etcdraft raft共识插件
│ ├── inactive 未激活时的raft
│ ├── kafka kafka共识插件
│ ├── mocks 测试用的共识插件
│ └── solo solo共识插件
├── main.go orderer程序入口
├── mocks
│ ├── common
│ └── util
└── sample_clients orderer的客户端程序样例
├── broadcast_config
├── broadcast_msg
└── deliver_stdout
Orderer启动过程
步骤
- 加载配置文件
- 设置Logger
- 设置本地MSP
- 核心启动部分:
- 加载创世块
- 创建账本工厂
- 创建本机gRPCServer
- 如果共识需要集群(raft),创建集群gRPCServer
- 创建Registrar:设置好共识插件,启动各通道,如果共识是raft,还会设置集群的gRPC接口处理函数Step
- 创建本机server:它是原子广播的处理服务,融合了Broadcast处理函数、deliver处理函数和registrar
- 开启profile
- 启动集群gRPC服务
- 启动本机gRPC服务
启动入口main
github.comhyperledgerfabricorderercommonservermain.go
// Main is the entry point of orderer process
func Main() {
fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))
// "version" command
if fullCmd == version.FullCommand() {
fmt.Println(metadata.GetVersionInfo())
return
}
// 从本地配置文件和环境变量中读取配置信息,构建配置树结构
conf, err := localconfig.Load()
if err != nil {
logger.Error("failed to parse config: ", err)
os.Exit(1)
}
// 配置日志级别
initializeLogging()
// 配置 MSP 结构
initializeLocalMsp(conf)
prettyPrintStruct(conf)
// 完成启动后的核心工作
Start(fullCmd, conf)
}
Start
github.comhyperledgerfabricorderercommonservermain.go
// Start provides a layer of abstraction for benchmark test
func Start(cmd string, conf *localconfig.TopLevel) {
// 加载/创建创世块
bootstrapBlock := extractBootstrapBlock(conf)
if err := ValidateBootstrapBlock(bootstrapBlock); err != nil {
logger.Panicf("Failed validating bootstrap block: %v", err)
}
opsSystem := newOperationsSystem(conf.Operations, conf.Metrics)
err := opsSystem.Start()
if err != nil {
logger.Panicf("failed to initialize operations subsystem: %s", err)
}
defer opsSystem.Stop()
metricsProvider := opsSystem.Provider
// 创建操作的账本工厂
lf, _ := createLedgerFactory(conf, metricsProvider)
sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)
// 初始化签名结构
signer := localmsp.NewSigner()
logObserver := floggingmetrics.NewObserver(metricsProvider)
flogging.Global.SetObserver(logObserver)
serverConfig := initializeServerConfig(conf, metricsProvider)
// 创建本机gRPC服务连接
grpcServer := initializeGrpcServer(conf, serverConfig)
caSupport := &comm.CredentialSupport{
AppRootCAsByChain: make(map[string]comm.CertificateBundle),
OrdererRootCAsByChainAndOrg: make(comm.OrgRootCAs),
ClientRootCAs: serverConfig.SecOpts.ClientRootCAs,
}
var r *replicationInitiator
clusterServerConfig := serverConfig
clusterGRPCServer := grpcServer // by default, cluster shares the same grpc server
clusterClientConfig := comm.ClientConfig{SecOpts: &comm.SecureOptions{}, KaOpts: &comm.KeepaliveOptions{}}
var clusterDialer *cluster.PredicateDialer
var reuseGrpcListener bool
typ := consensusType(bootstrapBlock)
var serversToUpdate []*comm.GRPCServer
clusterType := isClusterType(clusterBootBlock)
// 如果共识需要集群(raft),创建集群gRPCServer
if clusterType {
logger.Infof("Setting up cluster for orderer type %s", typ)
clusterClientConfig = initializeClusterClientConfig(conf)
clusterDialer = &cluster.PredicateDialer{
ClientConfig: clusterClientConfig,
}
r = createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer)
// Only clusters that are equipped with a recent config block can replicate.
if conf.General.GenesisMethod == "file" {
r.replicateIfNeeded(bootstrapBlock)
}
if reuseGrpcListener = reuseListener(conf, typ); !reuseGrpcListener {
clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, ioutil.ReadFile)
}
// If we have a separate gRPC server for the cluster,
// we need to update its TLS CA certificate pool.
serversToUpdate = append(serversToUpdate, clusterGRPCServer)
}
// if cluster is reusing client-facing server, then it is already
// appended to serversToUpdate at this point.
if grpcServer.MutualTLSRequired() && !reuseGrpcListener {
serversToUpdate = append(serversToUpdate, grpcServer)
}
tlsCallback := func(bundle *channelconfig.Bundle) {
logger.Debug("Executing callback to update root CAs")
updateTrustedRoots(caSupport, bundle, serversToUpdate...)
if clusterType {
updateClusterDialer(caSupport, clusterDialer, clusterClientConfig.SecOpts.ServerRootCAs)
}
}
sigHdr, err := signer.NewSignatureHeader()
if err != nil {
logger.Panicf("Failed creating a signature header: %v", err)
}
expirationLogger := flogging.MustGetLogger("certmonitor")
crypto.TrackExpiration(
serverConfig.SecOpts.UseTLS,
serverConfig.SecOpts.Certificate,
[][]byte{clusterClientConfig.SecOpts.Certificate},
sigHdr.Creator,
expirationLogger.Warnf, // This can be used to piggyback a metric event in the future
time.Now(),
time.AfterFunc)
// 初始化Registrar
manager := initializeMultichannelRegistrar(clusterBootBlock, r, clusterDialer, clusterServerConfig, clusterGRPCServer, conf, signer, metricsProvider, opsSystem, lf, tlsCallback)
mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
expiration := conf.General.Authentication.NoExpirationChecks
// 初始化gRPC服务端结构
// 分别响应 Deliver() 和 Broadcast() 两个 gRPC 调用。
server := NewServer(manager, metricsProvider, &conf.Debug, conf.General.Authentication.TimeWindow, mutualTLS, expiration)
logger.Infof("Starting %s", metadata.GetVersionInfo())
go handleSignals(addPlatformSignals(map[os.Signal]func(){
syscall.SIGTERM: func() {
grpcServer.Stop()
if clusterGRPCServer != grpcServer {
clusterGRPCServer.Stop()
}
},
}))
if !reuseGrpcListener && clusterType {
logger.Info("Starting cluster listener on", clusterGRPCServer.Address())
go clusterGRPCServer.Start()
}
// 开启profile服务
initializeProfilingService(conf)
// 绑定gRPC服务并启动
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
logger.Info("Beginning to serve requests")
grpcServer.Start()
}
extractSysChanLastConfig
github.comhyperledgerfabricorderercommonservermain.go
// Extract system channel last config block
func extractSysChanLastConfig(lf blockledger.Factory, bootstrapBlock *cb.Block) *cb.Block {
// Are we bootstrapping?
chainCount := len(lf.ChainIDs())
// 如果是首次启动,默认先创建系统通道的本地账本结构
if chainCount == 0 {
logger.Info("Bootstrapping because no existing channels")
return nil
}
...
}
initializeMultichannelRegistrar
github.comhyperledgerfabricorderercommonservermain.go
func initializeMultichannelRegistrar(
bootstrapBlock *cb.Block,
ri *replicationInitiator,
clusterDialer *cluster.PredicateDialer,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
conf *localconfig.TopLevel,
signer crypto.LocalSigner,
metricsProvider metrics.Provider,
healthChecker healthChecker,
lf blockledger.Factory,
callbacks ...channelconfig.BundleActor,
) *multichannel.Registrar {
genesisBlock := extractBootstrapBlock(conf)
// Are we bootstrapping?
if len(lf.ChainIDs()) == 0 {
initializeBootstrapChannel(genesisBlock, lf)
} else {
logger.Info("Not bootstrapping because of existing channels")
}
consenters := make(map[string]consensus.Consenter)
// 创建Registrar
registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, callbacks...)
var icr etcdraft.InactiveChainRegistry
if isClusterType(bootstrapBlock) {
etcdConsenter := initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider)
icr = etcdConsenter.InactiveChainRegistry
}
// 初始化共识插件
consenters["solo"] = solo.New()
var kafkaMetrics *kafka.Metrics
consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker, icr, registrar.CreateChain)
// Note, we pass a 'nil' channel here, we could pass a channel that
// closes if we wished to cleanup this routine on exit.
go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
registrar.Initialize(consenters)
return registrar
}
Initialize
github.comhyperledgerfabricorderercommonmultichannel egistrar.go
// 启动共识
func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
r.consenters = consenters
existingChains := r.ledgerFactory.ChainIDs()
// 启动本地所有的账本结构的共识过程
for _, chainID := range existingChains {
rl, err := r.ledgerFactory.GetOrCreate(chainID)
if err != nil {
logger.Panicf("Ledger factory reported chainID %s but could not retrieve it: %s", chainID, err)
}
configTx := configTx(rl)
if configTx == nil {
logger.Panic("Programming error, configTx should never be nil here")
}
ledgerResources := r.newLedgerResources(configTx)
chainID := ledgerResources.ConfigtxValidator().ChainID()
// 如果是系统账本(默认在首次启动时会自动创建)
// 启动共识过程
if _, ok := ledgerResources.ConsortiumsConfig(); ok {
if r.systemChannelID != "" {
logger.Panicf("There appear to be two system chains %s and %s", r.systemChannelID, chainID)
}
chain := newChainSupport(
r,
ledgerResources,
r.consenters,
r.signer,
r.blockcutterMetrics,
)
r.templator = msgprocessor.NewDefaultTemplator(chain)
chain.Processor = msgprocessor.NewSystemChannel(chain, r.templator, msgprocessor.CreateSystemChannelFilters(r, chain, r.config))
// Retrieve genesis block to log its hash. See FAB-5450 for the purpose
iter, pos := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}})
defer iter.Close()
if pos != uint64(0) {
logger.Panicf("Error iterating over system channel: '%s', expected position 0, got %d", chainID, pos)
}
genesisBlock, status := iter.Next()
if status != cb.Status_SUCCESS {
logger.Panicf("Error reading genesis block of system channel '%s'", chainID)
}
logger.Infof("Starting system channel '%s' with genesis block hash %x and orderer type %s",
chainID, genesisBlock.Header.Hash(), chain.SharedConfig().ConsensusType())
r.chains[chainID] = chain
r.systemChannelID = chainID
r.systemChannel = chain
// We delay starting this chain, as it might try to copy and replace the chains map via newChain before the map is fully built
// 启动共识过程
defer chain.start()
} else { // 应用账本
logger.Debugf("Starting chain: %s", chainID)
chain := newChainSupport(
r,
ledgerResources,
r.consenters,
r.signer,
r.blockcutterMetrics,
)
r.chains[chainID] = chain
chain.start()
}
}
if r.systemChannelID == "" {
logger.Panicf("No system chain found. If bootstrapping, does your system channel contain a consortiums group definition?")
}
}
chain.start()
1.0是如此,以协程的方式
func (chain *chainImpl) Start() {
go startThread(chain)
}
1.4看起来是根据不同的公式插件不同处理
// TODO待研究是如何进行插拔的
// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
c.logger.Infof("Starting Raft node")
if err := c.configureComm(); err != nil {
c.logger.Errorf("Failed to start chain, aborting: +%v", err)
close(c.doneC)
return
}
isJoin := c.support.Height() > 1
if isJoin && c.opts.MigrationInit {
isJoin = false
c.logger.Infof("Consensus-type migration detected, starting new raft node on an existing channel; height=%d", c.support.Height())
}
c.Node.start(c.fresh, isJoin)
close(c.startC)
close(c.errorC)
go c.gc()
go c.serveRequest()
es := c.newEvictionSuspector()
interval := DefaultLeaderlessCheckInterval
if c.opts.LeaderCheckInterval != 0 {
interval = c.opts.LeaderCheckInterval
}
c.periodicChecker = &PeriodicCheck{
Logger: c.logger,
Report: es.confirmSuspicion,
CheckInterval: interval,
Condition: c.suspectEviction,
}
c.periodicChecker.Run()
}
小结
主要分为两个部分:初始化配置,核心启动
Orderer整体架构
客户端通过Broadcast接口向Orderer提交背书过的交易,客户端(cli,peer)通过Deliver接口订阅区块事件,从Orderer获取区块
架构图
多通道
Fabric 支持多通道特性,而Orderer是多通道的核心组成部分。多通道由Registrar、ChainSupport、BlockWriter等一些重要部件组成
- Registrar是所有通道资源的汇总,访问每一条通道,都要经由Registrar
- ChainSupport代表了每一条通道,它融合了一条通道所有的资源
- BlockWriter 是区块达成共识后,Orderer写入区块到账本需要使用的接口
共识插件
Fabric的共识是插件化的,抽象出了Orderer所使用的共识接口,任何一种共识插件,只要满足给定的接口,就可以配合Fabric Orderer使用。
gRPC通信
- Broadcast:用来接受客户端提交的待排序的交易
- Deliver:客户端用来接受Orderer节点发送的共识后的区块
Local Config
用来解析orderer节点的配置文件: orderer.yaml
// Load parses the orderer YAML file and environment, producing
// a struct suitable for config use, returning error on failure.
func Load() (*TopLevel, error) {
config := viper.New()
coreconfig.InitViper(config, "orderer")
...
}
本地配置,不需要节点之间的统一配置,相关配置有:
- 网络相关配置
- 账本类型、位置
- raft文件位置
- ...
通道配置/上链配置:
- 共识类型
- 区块大小
- 切块时间
- 区块交易数
- 各种共识的配置
- ...
Metadata
//TODO 暂未搞懂它干嘛的,生成区块后还可以继续修改,不知道存在意义是什么。
MsgProcessor
orderer收到交易后需要对交易进行多项检查,不同的通道可以设置不同的MsgProcessor,也就可以进行不同的检查
当前Processor分为两个:
- 应用通道的叫StandardChannel
- 系统通道的叫SystemChannel
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
oc, ok := s.support.OrdererConfig()
if !ok {
logger.Panicf("Missing orderer config")
}
if oc.Capabilities().ConsensusTypeMigration() {
if oc.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
return 0, errors.WithMessage(
ErrMaintenanceMode, "normal transactions are rejected")
}
}
configSeq = s.support.Sequence()
err = s.filters.Apply(env)
return
}
这里主要是进行各种验证相关的操作
err = s.filters.Apply(env)
func (rs *RuleSet) Apply(message *ab.Envelope) error {
for _, rule := range rs.rules {
err := rule.Apply(message)
if err != nil {
return err
}
}
return nil
}
BlockCutter
BlockCutter用来把收到的交易分成多个组,每组交易会打包到一个区块中。而分组的过程,就是切块,每组交易被称为一个Batch,它有一个缓冲区用来存放待切块交易。
type receiver struct {
// 缓冲区收到配置交易,配置交易要放到单独区块,如果缓冲区有交易,缓冲区已有交易会先切到1个区块
sharedConfigFetcher OrdererConfigFetcher
// 缓冲区内交易数,达到区块包含的交易上限(默认500)
pendingBatch []*cb.Envelope
// 缓冲区内交易总大小,达到区块大小上限(默认10MB)
pendingBatchSizeBytes uint32
// 缓冲区存在交易,并且未出块的时间,达到切块超时时间(默认2s)
PendingBatchStartTime time.Time
ChannelID string
Metrics *Metrics
}
这块更像是一个单纯的打包机器,不涉及排序,哪个交易先写入BlockCutter的缓冲区,哪个交易就在前面。
BlockWriter
写区块
这一部分主要是区块的生成,写,落盘
小结
Orderer的功能,组成
Orderer处理交易
接受消息的数据结构
// Envelope wraps a Payload with a signature so that the message may be authenticated
type Envelope struct {
// A marshaled Payload
// 一个信息的body
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
// A signature by the creator specified in the Payload header
// 一个发送者的签名
Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
普通交易信息在Orderer中的流程
- Orderer 的 Broadcast 接口收到来自客户端提交的交易,会获取交易所在的链的资源,并进行首次检查,然后提交给该链的共识,对交易进行排序,最后向客户端发送响应。
// broadcast.go
// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
// 获取发送者的addr
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new broadcast loop for %s", addr)
for {
// 循环等待请求,其实是通过Orderer 的 Broadcast 接口收到来自客户端提交的交易
msg, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}
// 验证处理信息
resp := bh.ProcessMessage(msg, addr)
err = srv.Send(resp)
if resp.Status != cb.Status_SUCCESS {
return err
}
if err != nil {
logger.Warningf("Error sending to %s: %s", addr, err)
return err
}
}
}
配置交易在Orderer中的流程
基本同上
节点接受和广播区块
核心过程
Kafka作为共识插件情况
- 客户端通过gRPC发送交易信息给orderer节点的Broadcast()接口
- Orderer接口收到请求后,提取消息,解析检查消息,通过验证后封装发给Kafka
- Orderer同时不断从Kafka拉去排序好的信息,然后会打包成区块
Broadcast
流程图
Broadcast,意味着客户端将请求消息(例如完成背书后的交易)通过 gRPC 接口发送给 Ordering 服务。Orderer 进行本地验证处理后,会转到共识模块处理。
Orderer接收的主要有那些消息:链码的实例化,调用;通道的创建和更新。
代码流程
解析消息
- 来自客户端的消息,首先交给server处理
// Broadcast receives a stream of messages from a client for ordering
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
logger.Debugf("Starting new Broadcast handler")
defer func() {
if r := recover(); r != nil {
logger.Criticalf("Broadcast client triggered panic: %s
%s", r, debug.Stack())
}
logger.Debugf("Closing Broadcast stream")
}()
return s.bh.Handle(&broadcastMsgTracer{
AtomicBroadcast_BroadcastServer: srv,
msgTracer: msgTracer{
debug: s.debug,
function: "Broadcast",
},
})
}
对应的grpc服务是broadcast里的recv
type AtomicBroadcast_BroadcastServer interface {
Send(*BroadcastResponse) error
Recv() (*common.Envelope, error)
grpc.ServerStream
}
其实client到orderer的信息是通过Envelope包装
type Envelope struct {
// A marshaled Payload
Payload []byte `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"`
// A signature by the creator specified in the Payload header
Signature []byte `protobuf:"bytes,2,opt,name=signature,proto3" json:"signature,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
到此,orderer的rpc部分的接受结束,接下来就是对信息的解析验证排序处理,然后返回给client一个响应
type AtomicBroadcast_BroadcastServer interface {
Send(*BroadcastResponse) error
Recv() (*common.Envelope, error)
grpc.ServerStream
}
其中的信息
- broadcas.Handler()
- 解析消息:判断是否为配置消息,决定消息应由哪个通道结构进行处理,注意对于创建应用通道消息,处理器指定为系统的通道结构;
- 处理消息:选用对应的通道结构对消息进行处理,包括普通消息和配置消息;
- 返回响应消息给请求方。
// Handler is designed to handle connections from Broadcast AB gRPC service
type Handler struct {
SupportRegistrar ChannelSupportRegistrar
Metrics *Metrics
}
// Handle reads requests from a Broadcast stream, processes them, and returns the responses to the stream
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
// 获取发送者的addr
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new broadcast loop for %s", addr)
for {
// 循环等待请求,其实是通过Orderer 的 Broadcast 接口收到来自客户端提交的交易
msg, err := srv.Recv()
if err == io.EOF {
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}
// 验证处理接受信息
resp := bh.ProcessMessage(msg, addr)
// 将信息广播出去
err = srv.Send(resp)
if resp.Status != cb.Status_SUCCESS {
return err
}
if err != nil {
logger.Warningf("Error sending to %s: %s", addr, err)
return err
}
}
}
-
Registrar.BroadcastChannelSupport()
- channel 头部从消息信封结构中解析出来
- 是否为配置信息根据消息头中通道类型进行判断(是否为 cb.HeaderType_CONFIG_UPDATE)
- 通过字典结构查到对应的 ChainSupport 结构(应用通道、系统通道)作为处理器。
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, *ChainSupport, error) {
// channel 头部从消息信封结构中解析出来;是否为配置信息根据消息头中通道类型进行判断(是否为 cb.HeaderType_CONFIG_UPDATE)
chdr, err := utils.ChannelHeader(msg)
if err != nil {
return nil, false, nil, fmt.Errorf("could not determine channel ID: %s", err)
}
// 获取通道
cs := r.GetChain(chdr.ChannelId)
// New channel creation
// 如果为空则默认系统通道。如收到新建应用通道请求时,Orderer 本地并没有该应用通道对应结构,因此也为空。
if cs == nil {
cs = r.systemChannel
}
isConfig := false
switch cs.ClassifyMsg(chdr) {
// 只有 CONFIG_UPDATE 会返回 ConfigUpdateMsg
case msgprocessor.ConfigUpdateMsg:
isConfig = true
case msgprocessor.ConfigMsg:
return chdr, false, nil, errors.New("message is of type that cannot be processed directly")
default:
}
return chdr, isConfig, cs, nil
}
- commonutils.ChannelHeader()
// ChannelHeader returns the *cb.ChannelHeader for a given *cb.Envelope.
func ChannelHeader(env *cb.Envelope) (*cb.ChannelHeader, error) {
// 解析消息体
envPayload, err := UnmarshalPayload(env.Payload)
if err != nil {
return nil, err
}
if envPayload.Header == nil {
return nil, errors.New("header not set")
}
if envPayload.Header.ChannelHeader == nil {
return nil, errors.New("channel header not set")
}
// 解析真正的通道头
chdr, err := UnmarshalChannelHeader(envPayload.Header.ChannelHeader)
if err != nil {
return nil, errors.WithMessage(err, "error unmarshaling channel header")
}
return chdr, nil
}
根据解析结果,处理交易信息
普通交易信息
// ProcessMessage validates and enqueues a single message
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
...
消息检查
configSeq, err := processor.ProcessNormalMsg(msg)
// 这里主要看客户端和peer节点发来的消息类型,前面已经判断过了
// 假设创建通道的信息,消息类型则为配置信息
if !isConfig {
...
共识排序
err = processor.Order(msg, configSeq)
...
- 消息检查
// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current
// configuration sequence number and nil on success, or an error if the message is not valid
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
oc, ok := s.support.OrdererConfig()
if !ok {
logger.Panicf("Missing orderer config")
}
if oc.Capabilities().ConsensusTypeMigration() {
if oc.ConsensusState() != orderer.ConsensusType_STATE_NORMAL {
return 0, errors.WithMessage(
ErrMaintenanceMode, "normal transactions are rejected")
}
}
// 获取配置的序列号,映射到 common.configtx 包中 configManager 结构体的对应方法
configSeq = s.support.Sequence()
// 进行过滤检查,实现为 orderer.common.msgprocessor 包中 RuleSet 结构体的对应方法。
err = s.filters.Apply(env)
return
}
过滤器会在创建 ChainSupport 结构时候初始化:
应用通道:orderer.common.mspprocessor 包中的
CreateStandardChannelFilters(filterSupport channelconfig.Resources) *RuleSet
系统通道:orderer.common.mspprocessor 包中的
CreateSystemChannelFilters(chainCreator ChainCreator, ledgerResources channelconfig.Resources) *RuleSet
- 共识排序
根据不同的共识插件,具体逻辑不一致
// TODO 待补充
配置交易信息
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg) // 合并配置更新,生成新的配置信封结构
processor.Configure(config, configSeq) //入队列操作,将生成的配置信封结构消息扔给后端队列(如 Kafka)
- standardchannel.ProcessConfigUpdateMsg()
// orderer/common/msgprocessor/standardchannel.go
func (s *StandardChannel) ProcessConfigUpdateMsg(env *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
logger.Debugf("Processing config update message for channel %s", s.support.ChainID())
seq := s.support.Sequence() // 获取当前配置的版本号
err = s.filters.Apply(env) // 校验权限,是否可以更新配置
if err != nil {
return nil, 0, err
}
// 根据输入的更新配置交易消息生成配置信封结构:Config 为更新后配置字典;LastUpdate 为输入的更新配置交易
// 最终调用 `common/configtx` 包下 `ValidatorImpl.ProposeConfigUpdate()` 方法。
configEnvelope, err := s.support.ProposeConfigUpdate(env)
if err != nil {
return nil, 0, err
}
// 生成签名的配置信封结构,通道头类型为 HeaderType_CONFIG。即排序后消息类型将由 CONFIG_UPDATE 变更为 CONFIG
config, err = utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChainID(), s.support.Signer(), configEnvelope, msgVersion, epoch)
if err != nil {
return nil, 0, err
}
err = s.filters.Apply(config) // 校验生成的配置消息是否合法
if err != nil {
return nil, 0, err
}
return config, seq, nil
}
- standardchannel.ProcessConfigUpdateMsg()
对于系统通道情况,除了调用普通通道结构的对应方法来处理普通的更新配置交易外,还会负责新建应用通道请求。
func (s *SystemChannel) ProcessConfigUpdateMsg(envConfigUpdate *cb.Envelope) (config *cb.Envelope, configSeq uint64, err error) {
// 首先从消息体获取通道ID
channelID, err := utils.ChannelID(envConfigUpdate)
// 判断获取到的通道ID是否为已经存在的用户通道ID,如果是的话转到StandardChannel中的ProcessConfigUpdateMsg()方法进行处理
if channelID == s.support.ChainID() {
return s.StandardChannel.ProcessConfigUpdateMsg(envConfigUpdate)
}
// 从系统通道中获取当前最新的配置,例如创建通道则会走进这个方法
// orderer/common/msgprocessor/systemchannel.go#DefaultTemplator.NewChannelConfig()
bundle, err := s.templator.NewChannelConfig(envConfigUpdate)
// 合并来自客户端的配置更新信封结构,创建配置信封结构 ConfigEnvelope
newChannelConfigEnv, err := bundle.ConfigtxValidator().ProposeConfigUpdate(envConfigUpdate)
// 封装新的签名信封结构,其 Payload.Data 是 newChannelConfigEnv
newChannelEnvConfig, err := utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, channelID, s.support.Signer(), newChannelConfigEnv, msgVersion, epoch)
// 处理新建应用通道请求,封装为 ORDERER_TRANSACTION 类型消息
wrappedOrdererTransaction, err := utils.CreateSignedEnvelope(cb.HeaderType_ORDERER_TRANSACTION, s.support.ChainID(), s.support.Signer(), newChannelEnvConfig, msgVersion, epoch)
s.StandardChannel.filters.Apply(wrappedOrdererTransaction) // 再次校验配置
// 返回封装后的签名信封结构
return wrappedOrdererTransaction, s.support.Sequence(), nil
}
- DefaultTemplator.NewChannelConfig()
// NewChannelConfig creates a new template channel configuration based on the current config in the ordering system channel.
func (dt *DefaultTemplator) NewChannelConfig(envConfigUpdate *cb.Envelope) (channelconfig.Resources, error) {
// 反序列化payload
configUpdatePayload, err := utils.UnmarshalPayload(envConfigUpdate.Payload)
if err != nil {
return nil, fmt.Errorf("Failing initial channel config creation because of payload unmarshaling error: %s", err)
}
// 反序列化配置信息
configUpdateEnv, err := configtx.UnmarshalConfigUpdateEnvelope(configUpdatePayload.Data)
if err != nil {
return nil, fmt.Errorf("Failing initial channel config creation because of config update envelope unmarshaling error: %s", err)
}
if configUpdatePayload.Header == nil {
return nil, fmt.Errorf("Failed initial channel config creation because config update header was missing")
}
// 获取通道头信息
channelHeader, err := utils.UnmarshalChannelHeader(configUpdatePayload.Header.ChannelHeader)
if err != nil {
return nil, fmt.Errorf("Failed initial channel config creation because channel header was malformed: %s", err)
}
// 反序列化配置更新信息
configUpdate, err := configtx.UnmarshalConfigUpdate(configUpdateEnv.ConfigUpdate)
if err != nil {
return nil, fmt.Errorf("Failing initial channel config creation because of config update unmarshaling error: %s", err)
}
if configUpdate.ChannelId != channelHeader.ChannelId {
return nil, fmt.Errorf("Failing initial channel config creation: mismatched channel IDs: '%s' != '%s'", configUpdate.ChannelId, channelHeader.ChannelId)
}
if configUpdate.WriteSet == nil {
return nil, fmt.Errorf("Config update has an empty writeset")
}
if configUpdate.WriteSet.Groups == nil || configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey] == nil {
return nil, fmt.Errorf("Config update has missing application group")
}
if uv := configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Version; uv != 1 {
return nil, fmt.Errorf("Config update for channel creation does not set application group version to 1, was %d", uv)
}
// 根据之前定义的各项策略对通道进行配置,具体的策略可以看configtx.yaml文件
consortiumConfigValue, ok := configUpdate.WriteSet.Values[channelconfig.ConsortiumKey]
if !ok {
return nil, fmt.Errorf("Consortium config value missing")
}
consortium := &cb.Consortium{}
err = proto.Unmarshal(consortiumConfigValue.Value, consortium)
if err != nil {
return nil, fmt.Errorf("Error reading unmarshaling consortium name: %s", err)
}
applicationGroup := cb.NewConfigGroup()
consortiumsConfig, ok := dt.support.ConsortiumsConfig()
if !ok {
return nil, fmt.Errorf("The ordering system channel does not appear to support creating channels")
}
consortiumConf, ok := consortiumsConfig.Consortiums()[consortium.Name]
if !ok {
return nil, fmt.Errorf("Unknown consortium name: %s", consortium.Name)
}
applicationGroup.Policies[channelconfig.ChannelCreationPolicyKey] = &cb.ConfigPolicy{
Policy: consortiumConf.ChannelCreationPolicy(),
}
applicationGroup.ModPolicy = channelconfig.ChannelCreationPolicyKey
// Get the current system channel config
// 获取当前系统通道的配置信息
systemChannelGroup := dt.support.ConfigtxValidator().ConfigProto().ChannelGroup
// If the consortium group has no members, allow the source request to have no members. However,
// if the consortium group has any members, there must be at least one member in the source request
if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 &&
len(configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups) == 0 {
return nil, fmt.Errorf("Proposed configuration has no application group members, but consortium contains members")
}
// If the consortium has no members, allow the source request to contain arbitrary members
// Otherwise, require that the supplied members are a subset of the consortium members
if len(systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups) > 0 {
for orgName := range configUpdate.WriteSet.Groups[channelconfig.ApplicationGroupKey].Groups {
consortiumGroup, ok := systemChannelGroup.Groups[channelconfig.ConsortiumsGroupKey].Groups[consortium.Name].Groups[orgName]
if !ok {
return nil, fmt.Errorf("Attempted to include a member which is not in the consortium")
}
applicationGroup.Groups[orgName] = proto.Clone(consortiumGroup).(*cb.ConfigGroup)
}
}
channelGroup := cb.NewConfigGroup()
// Copy the system channel Channel level config to the new config
// 将系统通道的配置信息复制
for key, value := range systemChannelGroup.Values {
channelGroup.Values[key] = proto.Clone(value).(*cb.ConfigValue)
if key == channelconfig.ConsortiumKey {
// Do not set the consortium name, we do this later
continue
}
}
for key, policy := range systemChannelGroup.Policies {
channelGroup.Policies[key] = proto.Clone(policy).(*cb.ConfigPolicy)
}
// Set the new config orderer group to the system channel orderer group and the application group to the new application group
// 新的配置信息中Order组配置使用系统通道的配置,同时将定义的application组配置赋值到新的配置信息
channelGroup.Groups[channelconfig.OrdererGroupKey] = proto.Clone(systemChannelGroup.Groups[channelconfig.OrdererGroupKey]).(*cb.ConfigGroup)
channelGroup.Groups[channelconfig.ApplicationGroupKey] = applicationGroup
channelGroup.Values[channelconfig.ConsortiumKey] = &cb.ConfigValue{
Value: utils.MarshalOrPanic(channelconfig.ConsortiumValue(consortium.Name).Value()),
ModPolicy: channelconfig.AdminsPolicyKey,
}
// Non-backwards compatible bugfix introduced in v1.1
// The capability check should be removed once v1.0 is deprecated
if oc, ok := dt.support.OrdererConfig(); ok && oc.Capabilities().PredictableChannelTemplate() {
channelGroup.ModPolicy = systemChannelGroup.ModPolicy
zeroVersions(channelGroup)
}
// 将创建的新的配置打包为Bundle
bundle, err := channelconfig.NewBundle(channelHeader.ChannelId, &cb.Config{
ChannelGroup: channelGroup,
})
if err != nil {
return nil, err
}
return bundle, nil
}
-
共识部分
// TODO 待补充 -
返回响应成功,这里会是对信息接受成功的后对client的一个状态响应
return &ab.BroadcastResponse{Status: cb.Status_SUCCESS}
Deliver
Deliver部分主要是将生成的区块发送给peer,这部分类似生产消费模型,orderer就是一个区块的生产者,peer就是消费区块的地方。
orderer部分的主要逻辑在deliverBlocks,其中部分逻辑如下:
for {
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > chain.Reader().Height()-1 {
return cb.Status_NOT_FOUND, nil
}
}
var block *cb.Block
var status cb.Status
iterCh := make(chan struct{})
// 这里开启一个协程的作用是避免不必要的阻塞?
go func() {
block, status = cursor.Next()
close(iterCh)
}()
select {
case <-ctx.Done():
logger.Debugf("Context canceled, aborting wait for next block")
return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
case <-erroredChan:
// TODO, today, the only user of the errorChan is the orderer consensus implementations. If the peer ever reports
// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
return cb.Status_SERVICE_UNAVAILABLE, nil
case <-iterCh:
// Iterator has set the block and status vars
}
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return status, nil
}
// increment block number to support FAIL_IF_NOT_READY deliver behavior
number++
if err := accessControl.Evaluate(); err != nil {
logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
return cb.Status_FORBIDDEN, nil
}
// 返回peer的区块请求
logger.Infof("Start delivering block [channel: %s, txid: %s, type: %s, from: %s, blockNum: %d, prevHash: %s] at (%s)", chdr.ChannelId, chdr.TxId, cb.HeaderType_name[chdr.Type], addr, number, base64.StdEncoding.EncodeToString(block.Header.DataHash[:]), strconv.FormatInt(time.Now().UnixNano(), 10))
if err := srv.SendBlockResponse(block); err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return cb.Status_INTERNAL_SERVER_ERROR, err
}
logger.Infof("End delivering block [channel: %s, txid: %s, type: %s, from: %s, blockNum: %d, prevHash: %s] at (%s)", chdr.ChannelId, chdr.TxId, cb.HeaderType_name[chdr.Type], addr, number, base64.StdEncoding.EncodeToString(block.Header.DataHash[:]), strconv.FormatInt(time.Now().UnixNano(), 10))
h.Metrics.BlocksSent.With(labels...).Add(1)
// 如果读到了事先在请求消息中的结束的位置,跳出循环,因为peer第一次请求发送的stopNum很大,所以这个for循环会/// 不断的迭代
if stopNum == block.Header.Number {
break
}
}
这个是一个并发的异步阻塞模型,可以看到每个区块的获取都是一个协程,select部分决定了是否可以继续执行下一步。
func (bh *Handler) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
addr := util.ExtractRemoteAddress(srv.Context())
logger.Debugf("Starting new broadcast loop for %s", addr)
for {
msg, err := srv.Recv()
//logger.Infof("Recving a new message at %d", time.Now().UnixNano())
if err == io.EOF {
logger.Debugf("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
logger.Warningf("Error reading from %s: %s", addr, err)
return err
}
logger.Infof("Start Processing message at %d", time.Now().UnixNano())
resp := bh.ProcessMessage(msg, addr)
logger.Debugf("End Processing message at %d", time.Now().UnixNano())
err = srv.Send(resp)
if resp.Status != cb.Status_SUCCESS {
return err
}
if err != nil {
logger.Warningf("Error sending to %s: %s", addr, err)
return err
}
}
}
orderer接受到peer的区块请求rpc服务,orderer会通过SendBlockResponse服务来发送区块
// ResponseSender defines the interface a handler must implement to send
// responses.
type ResponseSender interface {
SendStatusResponse(status cb.Status) error
SendBlockResponse(block *cb.Block) error
}
type AtomicBroadcast_DeliverServer interface {
Send(*DeliverResponse) error
Recv() (*common.Envelope, error)
grpc.ServerStream
}
流程图
下图展示了Orderer向Peer传递区块的宏观视角,能够展示多个通道在Orderer和Peer间传递区块的情况:
单通道区块同步
生成区块流程
总体步骤
- 接收client的消息
- 处理验证消息
- 交给共识部分排序打包
- 将块发送给peer
详细内容可以对照前面来查看
fabric-samples中的并发版本
这是fabric官方的一个并发测试版本,优点是快速部署完整的网络加完善的脚本,缺点是目前只能在单机部署
步骤
- cd into the first-network folder within fabric-samples, e.g. cd ~/fabric-samples/first-network
- Open docker-compose-cli.yaml in your favorite editor, and edit the following lines:
- In the volumes section of the cli container, edit the second line which refers to the chaincode folder to point to the chaincode folder within the high-throughput folder,
- e.g../../chaincode/:/opt/gopath/src/github.com/hyperledger/fabric/examples/chaincode/go --> ./../high-throughput/chaincode/:/opt/gopath/src/github.com/hyperledger/fabric/examples/chaincode/go
- Again in the volumes section, edit the fourth line which refers to the scripts folder so it points to the scripts folder within the high-throughput folder, e.g.
- ./scripts:/opt/gopath/src/github.com/hyperledger/fabric/peer/scripts/ --> ./../high-throughput/scripts/:/opt/gopath/src/github.com/hyperledger/fabric/peer/scripts/
- Finally, comment out the docker exec cli scripts/script.sh command from the byfn.sh script by placing a # before it so that the standard BYFN end to end script doesn't run, e.g.
- docker exec cli scripts/script.sh $CHANNEL_NAME $CLI_DELAY $LANGUAGE $CLI_TIMEOUT $VERBOSE
- We can now bring our network up by typing in ./byfn.sh -m up -c mychannel
- Open a new terminal window and enter the CLI container using docker exec -it cli bash, all operations on the network will happen within this container from now on.
- In the volumes section of the cli container, edit the second line which refers to the chaincode folder to point to the chaincode folder within the high-throughput folder,
如果没有问题则可以进行对网络的操作,详情可参考官方文档
如何利用wrk对这个网络进行测试
- 下载编译wrk
- 启动一个网络监听
简单示例
r := gin.Default()
r.GET("/update", func(c *gin.Context) {
//读取配置文件,创建SDK对象
configProvider := config.FromFile("/root/fabric-samples/first-network/test/config.yaml")
sdk, err := fabsdk.New(configProvider)
if err != nil {
log.Fatalf("create sdk fail: %s
", err.Error())
}else{
fmt.Println("create a new fabsdk")
}
//调用合约
channelProvider := sdk.ChannelContext("mychannel",
fabsdk.WithUser("Admin"),
fabsdk.WithOrg("org1"))
channelClient, err := channel.New(channelProvider)
if err != nil {
log.Fatalf("create channel client fail: %s
", err.Error())
} else {
fmt.Println("create channelClient succeed")
}
println("---------------ok-----------")
//构建函数请求
var args1 [][]byte
args1 = append(args1, []byte("myvar"), []byte("10"), []byte("+"))
var target =[]string{"peer0.org1.example.com","peer1.org2.example.com"}
//调用channelClient执行请求
//response, err := channelClient.Execute(request,channel.WithTargetEndpoints(target...))
request := channel.Request{
ChaincodeID: "bigdatacc",
Fcn: "update",
Args: args1,
}
response, err := channelClient.Execute(request,channel.WithTargetEndpoints(target...))
//response, err := channelClient.Execute(request)
if err != nil {
log.Fatal("query fail: ", err.Error())
} else {
fmt.Printf("response is [%s]
", string(response.Payload))
}
})
r.Run(":8080")
- 利用wrk对这个网络进行请求测试
wrk -t10 -c150 -d10 --timeout 100 http://0.0.0.0:8080/update
fabric源码中的protobuf对象
大致是orderer和common这块的protobuf的分析
orderer
ab.proto
- 广播响应
这里指client段发送给orderer信息,然后orderer返回响应
message BroadcastResponse {
// Status code, which may be used to programatically respond to success/failure
common.Status status = 1;
// Info string which may contain additional information about the status returned
string info = 2;
}
- 区块请求信息
这里指peer段向orderer请求区块的一些信息
message SeekNewest { }
message SeekOldest { }
message SeekPosition {
oneof Type {
SeekNewest newest = 1;
SeekOldest oldest = 2;
SeekSpecified specified = 3;
}
}
message SeekInfo {
// If BLOCK_UNTIL_READY is specified, the reply will block until the requested blocks are available,
// if FAIL_IF_NOT_READY is specified, the reply will return an error indicating that the block is not
// found. To request that all blocks be returned indefinitely as they are created, behavior should be
// set to BLOCK_UNTIL_READY and the stop should be set to specified with a number of MAX_UINT64
enum SeekBehavior {
BLOCK_UNTIL_READY = 0;
FAIL_IF_NOT_READY = 1;
}
// SeekErrorTolerance indicates to the server how block provider errors should be tolerated. By default,
// if the deliver service detects a problem in the underlying block source (typically, in the orderer,
// a consenter error), it will begin to reject deliver requests. This is to prevent a client from waiting
// for blocks from an orderer which is stuck in an errored state. This is almost always the desired behavior
// and clients should stick with the default STRICT checking behavior. However, in some scenarios, particularly
// when attempting to recover from a crash or other corruption, it's desirable to force an orderer to respond
// with blocks on a best effort basis, even if the backing consensus implementation is in an errored state.
// In this case, set the SeekErrorResponse to BEST_EFFORT to ignore the consenter errors.
enum SeekErrorResponse {
STRICT = 0;
BEST_EFFORT = 1;
}
SeekPosition start = 1; // The position to start the deliver from
SeekPosition stop = 2; // The position to stop the deliver
SeekBehavior behavior = 3; // The behavior when a missing block is encountered
SeekErrorResponse error_response = 4; // How to respond to errors reported to the deliver service
}
- 分发响应
这里指orderer响应peer的区块请求
message DeliverResponse {
oneof Type {
common.Status status = 1;
common.Block block = 2;
}
}
- 定义广播和分发rpc服务
service AtomicBroadcast {
// broadcast receives a reply of Acknowledgement for each common.Envelope in order, indicating success or type of failure
rpc Broadcast(stream common.Envelope) returns (stream BroadcastResponse) {}
// deliver first requires an Envelope of type DELIVER_SEEK_INFO with Payload data as a mashaled SeekInfo message, then a stream of block replies is received.
rpc Deliver(stream common.Envelope) returns (stream DeliverResponse) {}
}
configuration.proto
- 共识的类型定义
message ConsensusType {
// The consensus type: "solo", "kafka" or "etcdraft".
string type = 1;
// Opaque metadata, dependent on the consensus type.
bytes metadata = 2;
// State defines the orderer mode of operation, typically for consensus-type migration.
// NORMAL is during normal operation, when consensus-type migration is not, and can not, take place.
// MAINTENANCE is when the consensus-type can be changed.
enum State {
STATE_NORMAL = 0;
STATE_MAINTENANCE = 1;
}
// The state signals the ordering service to go into maintenance mode, typically for consensus-type migration.
State state = 3;
}
- 切块的一些配置
message BatchSize {
// Simply specified as number of messages for now, in the future
// we may want to allow this to be specified by size in bytes
uint32 max_message_count = 1;
// The byte count of the serialized messages in a batch cannot
// exceed this value.
uint32 absolute_max_bytes = 2;
// The byte count of the serialized messages in a batch should not
// exceed this value.
uint32 preferred_max_bytes = 3;
}
common.proto
- HTTP的状态信息
enum Status {
UNKNOWN = 0;
SUCCESS = 200;
BAD_REQUEST = 400;
FORBIDDEN = 403;
NOT_FOUND = 404;
REQUEST_ENTITY_TOO_LARGE = 413;
INTERNAL_SERVER_ERROR = 500;
NOT_IMPLEMENTED = 501;
SERVICE_UNAVAILABLE = 503;
}
- HeaderType
比如channelheader的类型就是从这检索
enum HeaderType {
// Prevent removed tag re-use
// Uncomment after fabric-baseimage moves to 3.5.1
// reserved 7;
// reserved "PEER_RESOURCE_UPDATE";
MESSAGE = 0; // Used for messages which are signed but opaque
CONFIG = 1; // Used for messages which express the channel config
CONFIG_UPDATE = 2; // Used for transactions which update the channel config
ENDORSER_TRANSACTION = 3; // Used by the SDK to submit endorser based transactions
ORDERER_TRANSACTION = 4; // Used internally by the orderer for management
DELIVER_SEEK_INFO = 5; // Used as the type for Envelope messages submitted to instruct the Deliver API to seek
CHAINCODE_PACKAGE = 6; // Used for packaging chaincode artifacts for install
PEER_ADMIN_OPERATION = 8; // Used for invoking an administrative operation on a peer
TOKEN_TRANSACTION = 9; // Used to denote transactions that invoke token management operations
}
- Header
签名头,通道头
message Header {
bytes channel_header = 1;
bytes signature_header = 2;
}
- ChannelHeader
通道头,channelid,txid,version等信息
message ChannelHeader {
int32 type = 1; // Header types 0-10000 are reserved and defined by HeaderType
// Version indicates message protocol version
int32 version = 2;
// Timestamp is the local time when the message was created
// by the sender
google.protobuf.Timestamp timestamp = 3;
// Identifier of the channel this message is bound for
string channel_id = 4;
// An unique identifier that is used end-to-end.
// - set by higher layers such as end user or SDK
// - passed to the endorser (which will check for uniqueness)
// - as the header is passed along unchanged, it will be
// be retrieved by the committer (uniqueness check here as well)
// - to be stored in the ledger
string tx_id = 5;
// The epoch in which this header was generated, where epoch is defined based on block height
// Epoch in which the response has been generated. This field identifies a
// logical window of time. A proposal response is accepted by a peer only if
// two conditions hold:
// 1. the epoch specified in the message is the current epoch
// 2. this message has been only seen once during this epoch (i.e. it hasn't
// been replayed)
uint64 epoch = 6;
// Extension that may be attached based on the header type
bytes extension = 7;
// If mutual TLS is employed, this represents
// the hash of the client's TLS certificate
bytes tls_cert_hash = 8;
}
- SignatureHeader
签名头
message SignatureHeader {
// Creator of the message, a marshaled msp.SerializedIdentity
bytes creator = 1;
// Arbitrary number that may only be used once. Can be used to detect replay attacks.
bytes nonce = 2;
}
- Playload
消息内容
message Payload {
// Header is included to provide identity and prevent replay
Header header = 1;
// Data, the encoding of which is defined by the type in the header
bytes data = 2;
}
- Envelope
信封用签名包裹有效负载,以便可以对消息进行身份验证
message Envelope {
// A marshaled Payload
bytes payload = 1;
// A signature by the creator specified in the Payload header
bytes signature = 2;
}
- Block
区块分区块头,区块内容,区块元数据
message Block {
BlockHeader header = 1;
BlockData data = 2;
BlockMetadata metadata = 3;
}
message BlockHeader {
uint64 number = 1; // The position in the blockchain
bytes previous_hash = 2; // The hash of the previous block header
bytes data_hash = 3; // The hash of the BlockData, by MerkleTree
}
- BlockData
可以看到它是一个二维字节数组,符合我们的多条交易打包的需求
message BlockData {
repeated bytes data = 1;
}
message BlockMetadata {
repeated bytes metadata = 1;
}
ledger.proto
账本的结构比较简单,区块高度,hash
message BlockchainInfo {
uint64 height = 1;
bytes currentBlockHash = 2;
bytes previousBlockHash = 3;
}
主要参考
- 杨保华《区块链原理、设计与应用》
- https://lessisbetter.site/