zoukankan      html  css  js  c++  java
  • 菜鸟系列Fabric源码学习 — kafka共识机制

    有兴趣的关注IT程序员客栈哦

    Fabric 1.4源码分析 kafka共识机制

    本文档主要介绍kafka共识机制流程。在查看文档之前可以先阅览raft共识流程以及orderer服务启动流程。

    1. kafka 简介

    Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统,一种高吞吐量的分布式发布订阅消息系统。kafka详细介绍可以参考这一篇博客。kafka介绍

    2. kafka共识

    kafka共识当中,每个orderer节点即是生产者Producer也是消费者Consumer,在具体设计当中,每个channel对应一个topic,并且为了保证顺序性,只设置了一个patition。(参见orderer启动初始化kafka共识代码kafka.New(conf, metricsProvider, healthChecker, registrar)),关于kafka共识,这里推荐一篇博客,可以看看设计思路以及实现流程。The ABCs of Kafka in Hyperledger Fabric

    实现共识算法需要实现的接口。

    type Consenter interface {
        // 处理普通交易
    	Order(env *cb.Envelope, configSeq uint64) error
        // 处理配置交易
    	Configure(config *cb.Envelope, configSeq uint64) error
    	WaitReady() error
    }
    

    而接口chain在Consenter接口基础上增加来部分接口

    type Chain interface {
    	Order(env *cb.Envelope, configSeq uint64) error
    	Configure(config *cb.Envelope, configSeq uint64) error
    	WaitReady() error
    	Errored() <-chan struct{}
    	// 分配资源
    	Start()
    	// 释放资源
    	Halt()
    	MigrationStatus() migration.Status
    }
    

    kafka共识实现代码路径为:orderer/consensus/kafka/chain.go;首先,在创建chain时会调用start()方法分配资源,在kafka共识中,会初始化生产者producer、消费者Consumer以及一些配置。后续重点通过源码来介绍producer和consumer模块实现以及kafka共识整个交易的流程,即主要介绍交易排序整个流程。在此基础上,解决个人的一些疑问。

    3. 交易排序处理

    3.1 orderer作为生产者

    首先,当发送一个交易给orderer时,会调用orderer模块的broadcast()服务,其中会调用bh.ProcessMessage(msg, addr)方法根据交易类型调用不同的方法处理。

    其中无论是配置交易还是普通交易都会调用chain.enqueue()方法,通过chain.producer.SendMessage(message)方法将交易写入kafka。从而orderer作为生产者角色功能就是将客户端发过来的交易写入kafka。再次强调一下,每个通道对应一个topic,每个topic只有一个patition。

    func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
    	logger.Debugf("[channel: %s] Enqueueing envelope...", chain.ChainID())
    	select {
    	case <-chain.startChan: // The Start phase has completed
    		select {
    		case <-chain.haltChan: // The chain has been halted, stop here
    			logger.Warningf("[channel: %s] consenter for this channel has been halted", chain.ChainID())
    			return false
    		default: // The post path
    			payload, err := utils.Marshal(kafkaMsg)
    			if err != nil {
    				logger.Errorf("[channel: %s] unable to marshal Kafka message because = %s", chain.ChainID(), err)
    				return false
    			}
    			message := newProducerMessage(chain.channel, payload)
    			if _, _, err = chain.producer.SendMessage(message); err != nil {
    				logger.Errorf("[channel: %s] cannot enqueue envelope because = %s", chain.ChainID(), err)
    				return false
    			}
    			logger.Debugf("[channel: %s] Envelope enqueued successfully", chain.ChainID())
    			return true
    		}
    	default: // Not ready yet
    		logger.Warningf("[channel: %s] Will not enqueue, consenter for this channel hasn't started yet", chain.ChainID())
    		return false
    	}
    }
    

    3.2 orderer作为消费者

    orderer作为消费者的功能为:将kafka对应topic里面的交易打包成区块,并写入账本。

    其中,在orderer创建对应chain的时候调用chain.start()。

    func (chain *chainImpl) Start() {
    	go startThread(chain)
    }
    

    kafka会开启协程go startThread(chain),其中会对kafka进行一系列初始化工作。最后会调用chain.processMessagesToBlocks()方法,生成对应区块。

    func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
        ...
    	for {
    		select {
    	    ...
    	   	case in, ok := <-chain.channelConsumer.Messages():
                ...
    			switch msg.Type.(type) {
                ...
    			case *ab.KafkaMessage_Regular:
    				if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil {
    					logger.Warningf("[channel: %s] Error when processing incoming message of type REGULAR = %s", chain.ChainID(), err)
    					counts[indexProcessRegularError]++
    				} else {
    					counts[indexProcessRegularPass]++
    				}
    			}
                ...
    	}
    }
    

    其中,会对chain.processRegular(msg.GetRegular(), in.Offset)消息进行处理。

    其中,会针对配置交易和普通交易进行分别处理。普通交易会调用chain.BlockCutter().Ordered(message)生成对应的batchs,配置交易会一个交易一个区块,直接调用chain.BlockCutter().Cut()生成batch。然后再生成区块、写入账本。

    4. 问题思考

    1. kafka共识模式下动态更新系统通道配置添加orderer,是否就可提供排序服务。
      经查看代码,在orderer启动过程中,只有raft共识会判断该orderer(raft节点)是否在对应对consenter集群中。如果不在则会创建inactivechain,无法提供排序服务(必须更新consenter才行)。但是kafka不存在上述过程,在orderer启动后,会从kafka同步系统通道区块,当区块包括创建通道交易时,会创建应用通道,同步应用通道区块(该流程类似raft共识写账本流程)。因此,该orderer可以提供服务,但是如果是每个组织提供orderer的场景、由于没有更新应用通道排序组织,从而导致无法通过服务发现获取该orderer信息。
    如果你觉得写的不错,请移步www.itkezhan.top或者关注公众号IT程序员客栈
  • 相关阅读:
    C
    A
    枚举子集的几种方法
    Codeforces Round #476 (Div. 2) [Thanks, Telegram!] ABCDE
    wannafly挑战赛14
    2018西安电子科大同步赛
    概率dp学习记录
    xcoj 1103 插线板(树链刨分求最大子段和)
    bzoj 2286(虚树+树形dp) 虚树模板
    bzoj3012(Trie)
  • 原文地址:https://www.cnblogs.com/i-dandan/p/12216983.html
Copyright © 2011-2022 走看看