zoukankan      html  css  js  c++  java
  • Kafka API: TopicMetadata

    Jusfr 原创,转载请注明来自博客园

    TopicMetadataRequest/TopicMetadataResponse

    前文简单说过“Kafka是自描述的”,是指其broker、topic、partition 信息可以通过 TopicMetadata API 获取。

    TopicMetadataRequest 的内容非常简单,是一个包含 TopicName 的数组,TopicMetadataResponse 则告诉使用者 Broker、Topic、Partition 的分布情况。

    使用空数组可以获取完整数据。

    在 Chuye.Kafka 里,使用 Connection/Router 对应的发起一个请求:

        var section = new KafkaConfigurationSection("jusfr.redis", 9092);
        var demoTopics = new String[0];
        var connection = new Router(section);    
        connection.TopicMetadata(demoTopics).Dump("Metadata");
    

    Connection.TopicMetadata() 使用 TopicName 数组作为参数构造了一个 TopicMetadataRequest 实例,将其序列化,发送 KafkaConfigurationSection 指向的主机和端口,读取响应再解析为 TopicMetadataResponse 对象,单机部署的 TopicMetadataResponse 可能有如下结构:

    TopicMetadata

    当 Kafka 服务的启动参数auto.create.topics.enable设置为true的时候,TopicMetadataRequest 传递的 TopicName 不存在时将被自动创建;

    集群模式下 Topic 的自动创建复杂一些,Kafka 携带的 bin/kafka-topics.sh 提供了再多参数。


    Zookeeper

    • 如何使用程序查询、删除 Topic? 如何彻底删除 Topic ?
    • 如何在集群模式下管理 Topic

    源码阅读得知,Kafka 对 TopicMetadataRequest 的响应是通过引用 Zookeeper 来完成的。Zookeeper 在 .Net 上的实现有 ZooKeeperNet, NuGet 上是3.4.6.2 版本。

    Zookeeper 编程又是一大块内容,这里只是略加提及。

    ZooKeeper 的两个方法最重要:GetChildren()GetData(),前者提供了路径查询,后者提供了节点数据获取,可以使用以下代码递归访问:

    void Main() {
    	ZooKeeper zk = new ZooKeeper("jusfr.mac", TimeSpan.FromSeconds(10), null);
    	var paths = zk.GetChildren("/", false).ToArray();
    	foreach (var path in paths) {
    		GetChildren(zk, "/" + path);
    	}
    }
    
    void GetChildren(ZooKeeper zk, String path) {
        var data = zk.GetData(path, null, null); 
    	var paths = zk.GetChildren(path, false).ToArray();
    	if (paths.Length > 0) {
    		foreach (var p in paths) {
    			GetChildren(zk, path + "/" + p);
    		}
    	}
    }
    

    在集群环境下部分响应示例

    topics

    // /brokers/topics/demoTopic1
    {"version":1,"partitions":{"0":[2]}}
    
    
    // /brokers/topics/demoTopic1/partitions/0/state
    {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} 
    
    // /brokers/ids/1
    {"jmx_port":-1,"timestamp":"1457431238732","endpoints":["PLAINTEXT://jusfr.kafka-1:9093"],"host":"jusfr.kafka-1","version":2,"port":9093} 
    
    

    路径 /brokers/topics 存储了topic 信息,/admin/delete_topics 存储了被删除的 topic,这只是一个标记,由于 Kafka 是基于文件系统的,你需要等待 Kafka 在某个时机真正移除它们。部分参考

    delete_topics

    由于 Kafka 通过 Zookeeper 返回元数据,故任何 Broker 节点都能应答 TopicMetadataRequest 并提供完整响应;

    TopicMeatadata in cluster

    可以看到 demoTopic3 的 PartitionId=0 分区所在 Leader=1,即 Broker NodeId=1 的节点 jusfr.kafka-1:9093 ,PartitionId=1 分区所在 Leader=2,即 Broker NodeId=2 的节点 jusfr.kafka-2:9094。读写 demoTopic3 的分区0 需要连接到主机 jusfr.kafka-1、端口9093,读写 demoTopic3 的分区1 需要连接到主机 jusfr.kafka-2、端口9094,此过程我称为 Broker route。错误的 Broker 访问、不正确的 server.properties 配置可能触发状态码为 UnknownTopicOrPartition 的响应。

    Chuye.Kafka 的 Router 对象从 IRouter 定义,继续自Connection,重写了 Route 方法,内部便是 Partition-Broker 检查逻辑。集群模式下涉及到 Zookeeper 编程,Chuye.Kafka 可能未能给予支持。

    Jusfr 原创,转载请注明来自博客园

  • 相关阅读:
    Verilog非阻塞赋值的仿真/综合问题 (Nonblocking Assignments in Verilog Synthesis)上
    异步FIFO结构及FPGA设计 跨时钟域设计
    FPGA管脚分配需要考虑的因素
    An Introduction to Delta Sigma Converters (DeltaSigma转换器 上篇)
    An Introduction to Delta Sigma Converters (DeltaSigma转换器 下篇)
    中国通信简史 (下)
    谈谈德国大学的电子专业
    中国通信简史 (上)
    Verilog学习笔记
    Verilog非阻塞赋值的仿真/综合问题(Nonblocking Assignments in Verilog Synthesis) 下
  • 原文地址:https://www.cnblogs.com/Jusfr/p/5257258.html
Copyright © 2011-2022 走看看