zoukankan      html  css  js  c++  java
  • Kafka API: TopicMetadata

    Jusfr 原创,转载请注明来自博客园

    TopicMetadataRequest/TopicMetadataResponse

    前文简单说过“Kafka是自描述的”,是指其broker、topic、partition 信息可以通过 TopicMetadata API 获取。

    TopicMetadataRequest 的内容非常简单,是一个包含 TopicName 的数组,TopicMetadataResponse 则告诉使用者 Broker、Topic、Partition 的分布情况。

    使用空数组可以获取完整数据。

    在 Chuye.Kafka 里,使用 Connection/Router 对应的发起一个请求:

        var section = new KafkaConfigurationSection("jusfr.redis", 9092);
        var demoTopics = new String[0];
        var connection = new Router(section);    
        connection.TopicMetadata(demoTopics).Dump("Metadata");
    

    Connection.TopicMetadata() 使用 TopicName 数组作为参数构造了一个 TopicMetadataRequest 实例,将其序列化,发送 KafkaConfigurationSection 指向的主机和端口,读取响应再解析为 TopicMetadataResponse 对象,单机部署的 TopicMetadataResponse 可能有如下结构:

    TopicMetadata

    当 Kafka 服务的启动参数auto.create.topics.enable设置为true的时候,TopicMetadataRequest 传递的 TopicName 不存在时将被自动创建;

    集群模式下 Topic 的自动创建复杂一些,Kafka 携带的 bin/kafka-topics.sh 提供了再多参数。


    Zookeeper

    • 如何使用程序查询、删除 Topic? 如何彻底删除 Topic ?
    • 如何在集群模式下管理 Topic

    源码阅读得知,Kafka 对 TopicMetadataRequest 的响应是通过引用 Zookeeper 来完成的。Zookeeper 在 .Net 上的实现有 ZooKeeperNet, NuGet 上是3.4.6.2 版本。

    Zookeeper 编程又是一大块内容,这里只是略加提及。

    ZooKeeper 的两个方法最重要:GetChildren()GetData(),前者提供了路径查询,后者提供了节点数据获取,可以使用以下代码递归访问:

    void Main() {
    	ZooKeeper zk = new ZooKeeper("jusfr.mac", TimeSpan.FromSeconds(10), null);
    	var paths = zk.GetChildren("/", false).ToArray();
    	foreach (var path in paths) {
    		GetChildren(zk, "/" + path);
    	}
    }
    
    void GetChildren(ZooKeeper zk, String path) {
        var data = zk.GetData(path, null, null); 
    	var paths = zk.GetChildren(path, false).ToArray();
    	if (paths.Length > 0) {
    		foreach (var p in paths) {
    			GetChildren(zk, path + "/" + p);
    		}
    	}
    }
    

    在集群环境下部分响应示例

    topics

    // /brokers/topics/demoTopic1
    {"version":1,"partitions":{"0":[2]}}
    
    
    // /brokers/topics/demoTopic1/partitions/0/state
    {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} 
    
    // /brokers/ids/1
    {"jmx_port":-1,"timestamp":"1457431238732","endpoints":["PLAINTEXT://jusfr.kafka-1:9093"],"host":"jusfr.kafka-1","version":2,"port":9093} 
    
    

    路径 /brokers/topics 存储了topic 信息,/admin/delete_topics 存储了被删除的 topic,这只是一个标记,由于 Kafka 是基于文件系统的,你需要等待 Kafka 在某个时机真正移除它们。部分参考

    delete_topics

    由于 Kafka 通过 Zookeeper 返回元数据,故任何 Broker 节点都能应答 TopicMetadataRequest 并提供完整响应;

    TopicMeatadata in cluster

    可以看到 demoTopic3 的 PartitionId=0 分区所在 Leader=1,即 Broker NodeId=1 的节点 jusfr.kafka-1:9093 ,PartitionId=1 分区所在 Leader=2,即 Broker NodeId=2 的节点 jusfr.kafka-2:9094。读写 demoTopic3 的分区0 需要连接到主机 jusfr.kafka-1、端口9093,读写 demoTopic3 的分区1 需要连接到主机 jusfr.kafka-2、端口9094,此过程我称为 Broker route。错误的 Broker 访问、不正确的 server.properties 配置可能触发状态码为 UnknownTopicOrPartition 的响应。

    Chuye.Kafka 的 Router 对象从 IRouter 定义,继续自Connection,重写了 Route 方法,内部便是 Partition-Broker 检查逻辑。集群模式下涉及到 Zookeeper 编程,Chuye.Kafka 可能未能给予支持。

    Jusfr 原创,转载请注明来自博客园

  • 相关阅读:
    龙芯地址空间详解
    JS匿名函数 Amy
    JS正则表达式 Amy
    JS对象 Amy
    Java 位图法排序
    Java Final
    JAVA 数组
    Java shuffle 算法
    jQuery object and DOM element
    Javascript 声明时用“var”跟不用"var"的区别
  • 原文地址:https://www.cnblogs.com/Jusfr/p/5257258.html
Copyright © 2011-2022 走看看