zoukankan      html  css  js  c++  java
  • Kafka 如何读取offset topic内容 (__consumer_offsets)

      众所周知,由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

      不过依然有很多用户希望了解__consumer_offsets topic内部到底保存了什么信息,特别是想查询某些consumer group的位移是如何在该topic中保存的。针对这些问题,本文将结合一个实例探讨如何使用kafka-simple-consumer-shell脚本来查询该内部topic。

    1. 创建topic “test”

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 3 --partitions 3

    2. 使用kafka-console-producer.sh脚本生产消息

      由于默认没有指定key,所以根据round-robin方式,消息分布到不同的分区上。 (本例中生产了64条消息)

    3. 验证消息生产成功

    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test --time -1

    结果输出表明64条消息全部生产成功!

    test:2:21

    test:1:21

    test:0:22

    4. 创建一个console consumer group

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test --from-beginning --new-consumer

    5. 获取该consumer group的group id(后面需要根据该id查询它的位移信息)

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --list --new-consumer

    输出: console-consumer-46965  (记住这个id!)

    6. 查询__consumer_offsets topic所有内容

    注意:运行下面命令前先要在consumer.properties中设置exclude.internal.topics=false

    0.11.0.0之前版本

    bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

    0.11.0.0之后版本(含)

    bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

    默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。

    7. 计算指定consumer group在__consumer_offsets topic中分区信息

    这时候就用到了第5步获取的group.id(本例中是console-consumer-46965)。Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:

    Math.abs(groupID.hashCode()) % numPartitions

    所以在本例中,对应的分区=Math.abs("console-consumer-46965".hashCode()) % 50 = 11,即__consumer_offsets的分区11保存了这个consumer group的位移信息,下面让我们验证一下。

    8. 获取指定consumer group的位移信息 

    0.11.0.0版本之前

    bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager$OffsetsMessageFormatter"

    0.11.0.0版本以后(含)

    bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"

    下面是输出结果:

    ...
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434]
    [console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
    [console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
    [console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
    [console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
     ...

      上图可见,该consumer group果然保存在分区11上,且位移信息都是对的(这里的位移信息是已消费的位移,严格来说不是第3步中的位移。由于我的consumer已经消费完了所有的消息,所以这里的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

     

      okay,写到此你应该已经知道如何查询__consumer_offsets topic的内容了吧。希望本文对你有所帮助。(Kafka当然还提供了Java APIs用于查询,具体使用方法不在这里赘述了,有兴趣的可以看这里。)

  • 相关阅读:
    How to Create a site at the specified URL and new database (CommandLine Operation)
    Using Wppackager to Package and Deploy Web Parts for Microsoft SharePoint Products and Technologies
    SQL Server Monitor v0.5 [Free tool]
    How to build Web Part
    Deploy web part in a virtual server by developing a Web Part Package file(.cab)
    How to recreate "sites" link if you delete it accidentally
    SharePoint Portal Server管理匿名访问设置
    Monitor sql connection from .Net SqlClient Data Provider
    Brief installation instruction of Sharepoint Portal Server
    How to Use SharePoint Alternate URL Access
  • 原文地址:https://www.cnblogs.com/huxi2b/p/6061110.html
Copyright © 2011-2022 走看看