zoukankan      html  css  js  c++  java
  • Kafka系列之-Kafka Protocol实例分析

      本文基于A Guide To The Kafka Protocol文档,以及Spark Streaming中实现的org.apache.spark.streaming.kafka.KafkaCluster类。整理出Kafka中有关

    • Metadata API
    • Produce API
    • Fetch API
    • Offset API(Aka ListOffset)
    • Offset Commit/Fetch API
    • Group Membership API
    • Administrative API
        

    零、准备工作

      需要运行以下部分的示例代码时,需要提前建好需要的topic,写入一些message,再用consumer消费一下。

    1、新建topic

    [hadoop@kafka001 kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --create --topic kafka_protocol_test --replication-factor 3 --partitions 4
    Created topic "kafka_protocol_test".
    [hadoop@kafka001 kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --describe --topic kafka_protocol_test
    Topic:kafka_protocol_test   PartitionCount:4    ReplicationFactor:3 Configs:
        Topic: kafka_protocol_test  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
        Topic: kafka_protocol_test  Partition: 1    Leader: 2   Replicas: 2,3,4 Isr: 2,3,4
        Topic: kafka_protocol_test  Partition: 2    Leader: 3   Replicas: 3,4,1 Isr: 3,4,1
        Topic: kafka_protocol_test  Partition: 3    Leader: 4   Replicas: 4,1,2 Isr: 4,1,2

    2、produce message

      使用Kafka系列之-自定义Producer中提到的KafkaProducerTool往指定kafka_protocol_test中发送消息,

    public class ProducerTest2 {
        public static void main(String[] args) throws InterruptedException {
            KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl("D:\Files\test\kafkaconfig.properties");
            int i = 1;
            while(true) {
                kafkaProducerTool.publishMessage("message" + (i++));
            }
        }
    }

      运行一段时间后停止写入。运行一个console-consumerkafka_protocol_test消费message

    3、consume message

      运行一个console-consumerkafka_protocol_test消费。注意观察该topic每个partition中的messages数。

    [hadoop@kafka001 kafka]$ bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning

      这里写图片描述

    一、Metadata API

      这个API是通过向Kafka集群发送一个TopicMetadaaRequest请求,得到MetadataResponse响应后从MetadataResponse中解析出Metadata相关信息。
      TopicMetadataRequest的结构和示例如下
      

    case class TopicMetadataRequest(val versionId: Short,
                                    val correlationId: Int,
                                    val clientId: String,
                                    val topics: Seq[String])
    
    TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)

      得到的MetadataResponse包含的信息如下,可以从PartitionMetadata中获取到Partition相关信息,从TopicMetadata中获取到Topic相关信息,Broker中记录了Brokerip和端口号等。

    MetadataResponse => [Broker][TopicMetadata]
      Broker => NodeId Host Port  (any number of brokers may be returned)
        NodeId => int32
        Host => string
        Port => int32
      TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
        TopicErrorCode => int16
      PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
        PartitionErrorCode => int16
        PartitionId => int32
        Leader => int32
        Replicas => [int32]
        Isr => [int32]

    1、所包含的信息

    可以查询指定Topic是否存在,
    指定topic有多少个partition,
    每个partition当前哪个broker处于leader状态,
    每个broker的host和port是什么

      如果设置了auto.create.topics.enable参数,遇到不存在的topic时,就会按默认replicationpartition新建该不存在的topic
      

    2、示例

      生成一个TopicMetadataRequest对象

    // 封装一个TopicMetadataRequest类型的请求对象
    val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
    // 发送该请求
    val resp: TopicMetadataResponse = consumer.send(req)
    // 其中consumer对象是SimpleConsumer类型的
    new SimpleConsumer(host, port, config.socketTimeoutMs,
          config.socketReceiveBufferBytes, config.clientId)

    (1)查询topic是否存在
      由于在TopicMetadataRequest中可以发送一组Seq[String]类型的topics,所以获取到的TopicMetadataResponse.topicsMetadataSet[TopicMetadata]类型的。
      对每个TopicMetadata对象,如果其errorCode不为ErrorMapping.NoError即表示该Topic不正常。
      

    topicMetadatas.foreach { topic =>
      if (topic.errorCode == ErrorMapping.NoError)
        println(s"topic: ${topic.topic}存在")
      else
        println(s"topic: ${topic.topic}不存在")
    }

    (2)获取Topic的Partition个数
      首先将所有TopicMetadata中正常的Topic过滤出来,然后遍历每一个TopicMetadata对象,获取其partitionsMetadata信息,其长度即Partition的个数

    val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
    existsTopicMetadatas.foreach { topic =>
       val numPartitions = topic.partitionsMetadata.length
       println(s"topic: ${topic.topic} 有${numPartitions}个partition")
    }

    (3)获取Partition具体情况
      以下代码可以获取到Topic的每个Partition中的Leader Partition以及replication节点的信息。

    existsTopicMetadatas.foreach { topic =>
        println(s"topic:${topic.topic}的Partition信息:")
        topic.partitionsMetadata.foreach { pm =>
        val leaderPartition = pm.leader
        println(s"	partition: ${pm.partitionId}")
        println(s"	leader节点:$leaderPartition")
        val replicas = pm.replicas
        println(s"	replicas节点:$replicas")
      }
    }

    3、运行结果

      传入上面新建的kafka_protocol_test以及一个不存在的topic kafka_protocol_test1,以上代码的运行结果如下:

    =============Topic相关信息===========
    topic: kafka_protocol_test存在
    topic: kafka_protocol_test1不存在
    topic: kafka_protocol_test 有4个partition
    =============Partition相关信息===========
    topic:kafka_protocol_test的Partition信息:
        partition: 0
        leader节点:Some(id:1,host:kafka001,port:9092)
        replicas节点:Vector(id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092)
        partition: 1
        leader节点:Some(id:2,host:kafka002,port:9092)
        replicas节点:Vector(id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092)
        partition: 2
        leader节点:Some(id:3,host:kafka003,port:9092)
        replicas节点:Vector(id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092)
        partition: 3
        leader节点:Some(id:4,host:kafka004,port:9092)
        replicas节点:Vector(id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092)

    二、Produce API

    三、Fetch API

    四、Offset API(Aka ListOffset)

      这个API通过向Kafka集群发送一个OffsetRequest对象,从返回的OffsetResponse对象中获取Offset相关信息。
      OffsetRequest对象描述如下

    OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
      ReplicaId => int32
      TopicName => string
      Partition => int32
      Time => int64
      MaxNumberOfOffsets => int32

      上面Time的作用是,获取特定时间(单位为ms)之前的所有messages。如果设置为-1则获取最新的offset,即下一条messagesoffset位置;如果设置为-2则获取第一条messageoffset位置,即当前partition中的offset起始位置。

      OffsetResponse对象描述如下

    OffsetResponse => [TopicName [PartitionOffsets]]
      PartitionOffsets => Partition ErrorCode [Offset]
      Partition => int32
      ErrorCode => int16
      Offset => int64

    1、所包含的信息

      通过该API可以获取指定topic-partition集合的合法offset的范围,需要直接连接到PartitionLeader节点。

    2、示例

      获取指定topic下所有partitionoffset范围
      封装一个getLeaderOffsets方法,在此方法的基础上分别封装一个getEarliestLeaderOffsets方法用于获取最小offsetgetLatestLeaderOffsets用于获取最大offset
      分别传入的关键参数是前面提到的Time

    def getLatestLeaderOffsets(
           topicAndPartitions: Set[TopicAndPartition]
           ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
        getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // -1L
    def getEarliestLeaderOffsets(
           topicAndPartitions: Set[TopicAndPartition]
           ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
        getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // -2L
    
      在getLeaderOffsets中,查询到当前partition的leader节点,
      def findLeaders(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
        // 获取当前topicAndPartitions中的所有topic
        val topics = topicAndPartitions.map(_.topic)
        // 获取topic对应的MetadataResp对象,之前已过滤不存在的topic,所以这里无需进一步过滤
        val topicMetadatas = getMetadataResp(topics.toSeq).left.get
    
        val leaderMap = topicMetadatas.flatMap { topic =>
          topic.partitionsMetadata.flatMap { pm =>
            val tp = TopicAndPartition(topic.topic, pm.partitionId)
            // 获取对应PartitionMedatada的leader节点信息
            pm.leader.map { l =>
              tp -> (l.host -> l.port)
            }
          }
        }.toMap
        Right(leaderMap)
      }

      然后在这些节点中,封装一个OffsetRequest对象,向Kafka集群获得OffsetResponse对象。

    val resp = consumer.getOffsetsBefore(req)
    val respMap = resp.partitionErrorAndOffsets

      最后从OffsetResponse对象中获取offset范围,

    val resp = getMetadataResp(topics.toSeq)
        // 如果获取的resp是left,则处理返回的Set[TopicMetadata]
    val topicAndPartitions = processRespInfo(resp) { resp =>
    val topicMetadatas = resp.left.get.asInstanceOf[Set[TopicMetadata]]
    val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
       getPartitions(existsTopicMetadatas)
    }.asInstanceOf[Set[TopicAndPartition]]
    
    // 获取指定topic-partition最早的offset
    val offsetBegin = getEarliestLeaderOffsets(topicAndPartitions).right.get
    // 获取指定topic-partition最晚的offset
    val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
    
    print("=============Offset范围信息===========")
    topicAndPartitions.foreach { tp =>
       println(s"topic: ${tp.topic}, Partition: ${tp.partition} 的Offset范围:")
       println(s"	${offsetBegin(tp).offset} ~ ${offsetEnd(tp).offset}")
    }

    3、运行结果

      连接到kafka_protocol_test,运行结果如下

    topic: kafka_protocol_test, Partition: 0Offset范围:
        0 ~ 9000
    topic: kafka_protocol_test, Partition: 1Offset范围:
        0 ~ 598134
    topic: kafka_protocol_test, Partition: 2Offset范围:
        0 ~ 0
    topic: kafka_protocol_test, Partition: 3Offset范围:
        0 ~ 91000

      和第零节中图片显示结果一致。

    五、Offset Commit/Fetch API

      首先参考Offset Management文档中的描述,分析一下Kafka中有关Offset管理的文档。
      在这篇文档中主要提供了OffsetFetchOffsetCommit两个API,其中
      

    1、OffsetFetch API

      这个API可以获取一个Consumer读取messageoffset信息。发送的请求是OffsetFetchRequest类型的对象,接收到的是OffsetFetchResponse类型的响应。具体offset信息可以从OffsetFetchResponse对象中解析。
      发送的Request请求为,需要指定consumer所属的group,以及需要获取offset的所有TopicAndPartitions

    val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, 0)
    
      或得到的响应为OffsetFetchResponse类型的对象。
    val resp = consumer.fetchOffsets(req)
      其中consumer对象是SimpleConsumer类型的
    new SimpleConsumer(host, port, config.socketTimeoutMs,
          config.socketReceiveBufferBytes, config.clientId)
    
      具体获取offset的逻辑如下,
    withBrokers(Random.shuffle(config.seedBrokers)) { consumer =>
      // 连接consumer,发送该OffsetFetchRequest请求
      val resp = consumer.fetchOffsets(req)
      val respMap = resp.requestInfo
      // 从传入的topicAndPartitions中取出不包含在result中的topicAndPartition
      val needed = topicAndPartitions.diff(result.keySet)
      // 遍历每一个需要获取offset的topic-partition
      needed.foreach { tp: TopicAndPartition =>
        respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
          // 如果没有错误
          if (ome.error == ErrorMapping.NoError) {
            result += tp -> ome
          } else {
            errs.append(ErrorMapping.exceptionFor(ome.error))
          }
        }
      }
      if (result.keys.size == topicAndPartitions.size) {
        return Right(result)
      }
    }

    2、OffsetCommit API

      当最终调用commit()方法,或者如果启用了autocommit参数时,这个API可以使consumer保存其消费的offset信息。
      发送的Request请求为OffsetCommitRequest类型。

      OffsetCommitRequest需要传入的参数如下,

    val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
    val resetOffsets = offsetsFetch.right.get.map { offsetInfo =>
    val plus10Offset = offsetInfo._2.offset + 10
       offsetInfo._1 -> OffsetAndMetadata(if (offsetEnd(offsetInfo._1).offset >= plus10Offset) plus10Offset else offsetEnd(offsetInfo._1).offset)
        }
    // resetOffsets类型为Map[TopicAndPartition, OffsetAndMetadata]
    val req = OffsetCommitRequest(groupId, resetOffsets, 0)
    // 发送该请求的方式如下
    val resp = consumer.commitOffsets(req)

    3、GroupCoordinator API

      需要注意的是这个API在Kafka-0.9以后的版本中才提供。指定Consumer Groupoffsets数据保存在某个特定的Broker中。
      向Kafka集群发送一个GroupCoordinatorRequest类型的请求参数,该request对象中只需要指定一个groupId即可。如下所示,

    val req = new GroupCoordinatorRequest(groupId)
    val resp = consumer.send(req)

      获取到的Response对象是GroupCoordinatorResponse类型的,在resp.coordinatorOpt中返回一个BrokerEndpoint对象,可以获取该Broker对应的Id, Ip, Port等信息。

    4、示例

    (1) 运行OffsetFetch API
    (a) 获取kafka_protocol_test的consumer group消费状态
      启动一个console-consumerkafka_protocol_test topic消费messages。需要指定一个特定的group.id参数,如下所示,使用默认的consumer.properties配置文件即可。

    bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning --consumer.config ./config/consumer.properties

      运行后,将其停止,查看当前console-consumer的消费状态

    [hadoop@kafka001 kafka]$  bin/kafka-consumer-offset-checker.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --group test-consumer-group
    Group           Topic                          Pid Offset          logSize         Lag             Owner
    test-consumer-group kafka_protocol_test            0   9000            9000            0               none
    test-consumer-group kafka_protocol_test            1   26886           598134          571248          none
    test-consumer-group kafka_protocol_test            2   0               0               0               none
    test-consumer-group kafka_protocol_test            3   18296           91000           72704           none

    (b) 运行OffsetFetch代码,查看运行结果

      运行时仍然传入test-consumer-group,运行结果如下

    Topic: kafka_protocol_test, Partition: 0
        Offset: 9000
    Topic: kafka_protocol_test, Partition: 1
        Offset: 26886
    Topic: kafka_protocol_test, Partition: 2
        Offset: 0
    Topic: kafka_protocol_test, Partition: 3
        Offset: 18296

      对比后发现,两个offset信息保持一致。

    (2)运行OffsetCommit API
      在这里,将OffsetFetch获取到的每个TopicAndPartition对应的Offset10,如果加10后超过其最大Offset,则取最大Offset
      在Commit前后,两次调用OffsetFetch API的代码,前后运行结果如下,
    更新前的offset

    Topic: kafka_protocol_test, Partition: 0
        Offset: 9000
    Topic: kafka_protocol_test, Partition: 1
        Offset: 26886
    Topic: kafka_protocol_test, Partition: 2
        Offset: 0
    Topic: kafka_protocol_test, Partition: 3
        Offset: 18296
    更新后的offset:(partition 0和partition 2没有变化是由于加10后超过了该partition的offset范围最大值)
    Topic: kafka_protocol_test, Partition: 0
        Offset: 9000
    Topic: kafka_protocol_test, Partition: 1
        Offset: 26896
    Topic: kafka_protocol_test, Partition: 2
        Offset: 0
    Topic: kafka_protocol_test, Partition: 3
        Offset: 18306

    (3)运行Group Coordinator API
      传入一个consumer group后,查看其运行结果

    Comsuner Group : test-consumer-group, coordinator broker is:
        id: 1, host: kafka001, port: 9092

    六、Group Membership API

      这个API从Kafka-0.9.0.0版本开始出现。
      在0.9以前的client api中,consumer是要依赖Zookeeper的。因为同一个consumer group中的所有consumer需要进行协同,进行下面所讲的rebalance。但是因为zookeeper的“herd”与“split brain”,导致一个group里面,不同的consumer拥有了同一个partition,进而会引起消息的消费错乱。为此,在0.9中,不再用zookeeper,而是Kafka集群本身来进行consumer之间的同步。下面引自kafka设计的原文:
    https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Failuredetectionprotocol

      相关知识点可以参考Kafka源码深度解析-序列7 -Consumer -coordinator协议与heartbeat实现原理

    七、Administrative API

      注意,这个API也是从Kafka-0.9之后的client版本中才提供。通过这个API可以对Kafka集群进行一些管理方面的操作,比如获取所有的Consumer Groups信息。想要获取集群中所有Consumer Groups信息,需要发送一个ListGroupRequest请求到所有的Brokers节点。
      还可以通过发送一个DescribeGroupsRequest类型的请求对象,获取对特定Consumer Group的描述。

      在Kafka-0.9之后的client中,提供了一个kafka.admin.AdminClient类,调用createSimplePlaintext方法,传入一个broker list字val client = AdminClient.createSimplePlaintext(“kafka001:9092,kafka002:9092,kafka003:9092,kafka004:9092”)AdminClient`提供了很多方法,比如

    def findCoordinator(groupId: String): Node
    def findAllBrokers(): List[Node]
    def listAllGroups(): Map[Node, List[GroupOverview]]
    def listAllConsumerGroups(): Map[Node, List[GroupOverview]]

      等等。

  • 相关阅读:
    C# 反射机制(转)
    asp.net 控件开发(三)处理标签间内容
    WCF学习(三)数据契约1
    个人的一点私事
    Asp.net 控件开发(四) 数据回传
    WCF学习 (四) 数据契约的事件
    wcf学习(一):预览
    WCF学习(二)服务契约
    css3图片3D翻转
    Javascript面向对象(一)(共有方法,私有方法,特权方法)
  • 原文地址:https://www.cnblogs.com/wuyida/p/6300206.html
Copyright © 2011-2022 走看看