zoukankan      html  css  js  c++  java
  • Kafka 如何读取offset topic内容 (__consumer_offsets)

      众所周知,由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

      不过依然有很多用户希望了解__consumer_offsets topic内部到底保存了什么信息,特别是想查询某些consumer group的位移是如何在该topic中保存的。针对这些问题,本文将结合一个实例探讨如何使用kafka-simple-consumer-shell脚本来查询该内部topic。

    1. 创建topic “test”

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3

    2. 使用kafka-console-producer.sh脚本生产消息

      由于默认没有指定key,所以根据round-robin方式,消息分布到不同的分区上。 (本例中生产了64条消息)

    3. 验证消息生产成功

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test --time -1

    结果输出表明64条消息全部生产成功!

    test:2:21

    test:1:21

    test:0:22

    4. 创建一个console consumer group

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test --from-beginning --new-consumer

    5. 获取该consumer group的group id(后面需要根据该id查询它的位移信息)

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer

    输出: console-consumer-46965  (记住这个id!)

    6. 查询__consumer_offsets topic所有内容

    注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false

    0.11.0.0之前版本

    bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

    0.11.0.0之后版本(含)

    bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

    默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

    7. 计算指定consumer group在__consumer_offsets topic中分区信息

    这时候就用到了第5步获取的group.id(本例中是console-consumer-46965)。Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:

    Math.abs(groupID.hashCode()) % numPartitions

    所以在本例中,对应的分区=Math.abs("console-consumer-46965".hashCode()) % 50 = 11,即__consumer_offsets的分区11保存了这个consumer group的位移信息,下面让我们验证一下。

    8. 获取指定consumer group的位移信息 

    0.11.0.0版本之前

    bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter"

    0.11.0.0版本以后(含)

    bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

    下面是输出结果:

    ...
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434]
    [console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
    [console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
     ...

      上图可见,该consumer group果然保存在分区11上,且位移信息都是对的(这里的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这里的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

     

      okay,写到此你应该已经知道如何查询__consumer_offsets topic的内容了吧。希望本文对你有所帮助。(Kafka当然还提供了Java APIs用于查询,具体使用方法不在这里赘述了,有兴趣的可以看这里。)

  • 相关阅读:
    AcWing356 次小生成树(lca)
    牛客 Rinne Loves Edges(dp)
    软件的生命周期和测试流程
    软件测试的学习经历回顾-第一天
    java List集合
    c#Socket通信
    c#线程2
    c#线程1
    c#Linq联合查询
    c#拓展方法
  • 原文地址:https://www.cnblogs.com/huxi2b/p/6061110.html
Copyright © 2011-2022 走看看