zoukankan      html  css  js  c++  java
  • Kafka主题管理

    Kafka对主题的管理通过Kafka-topics.sh来执行,内容如下:

    exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
    

    内部调用了kafka.admin.TopicCommand类

    1.创建主题

    如果配置了参数auto.create.topics.enable为true(默认为true),则遇到未创建主题的时候会按照默认值自动创建主题,所以不建议设置为true

    通用的创建方式是通过kafka-topics.sh脚本来创建,命令如下:

    bin/kafka-topics.sh --bootstrap-server hostname:9092 --create --replication-factor 1 --partitions 1 --topic xxx
    

    2.分区副本的分配

    指为集群指定创建主题时的分区副本分配方案,即在哪个broker中创建哪些分区的副本

    使用kafka-topics.sh脚本创建主题时内部分配逻辑按照机架信息分为两种策略:指定机架信息和未指定机架信息,如果所有broker节点没有配置broker.rack参数,就是未指定机架,具体实现为kafka.admin.AdminUtils中的assignReplicasToBrokersRackUnaware()方法:

    private def assignReplicasToBrokersRackUnaware(nPartitions: Int, //分区数
                                                     replicationFactor: Int, //副本因子
                                                     brokerList: Seq[Int], //broker列表
                                                     fixedStartIndex: Int, //起始索引,默认值为-1
                                                     startPartitionId: Int //起始分区编号): Map[Int, Seq[Int]] = {
        val ret = mutable.Map[Int, Seq[Int]]() //保存分配结果,最后返回该集合
        val brokerArray = brokerList.toArray
        //如果起始索引小于0,根据brokerid列表长度随机生成一个,保证长度有效    
        val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
        //保证起始分区号不小于0,从这里可以看出默认情况下创建主题时总是从编号为0的分区依次轮询分配
        var currentPartitionId = math.max(0, startPartitionId)
        //指定副本间隔,为了将副本均匀分布到不同的broker上面
        var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
        //轮询所有分区,将每个分区的副本分配到不同broker上面
        for (_ <- 0 until nPartitions) {
          if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
            nextReplicaShift += 1
          val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
          val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
          //保存该分区所有副本分配的broker集合
          for (j <- 0 until replicationFactor - 1)
            replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
          //保存该分区所有副本的分配信息
          ret.put(currentPartitionId, replicaBuffer)
          //为下一个分区分配副本
          currentPartitionId += 1
        }
        ret
      }
    

    replicaIndex方法如下:

    private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
        val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
        (firstReplicaIndex + shift) % nBrokers
      }
    

    3.KafkaAdminClient

    基本使用

    KafkaAdminClient继承了org.apache.kafka.clients.admin.AdminClient抽象类

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.CreateTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @author: masheng
     * @description: 创建主题
     * @date: 2020/07/28 22:33
     */
    public class CreateTopic {
        private static final String TOPIC = "topic_admin";
        private static final String BROKER_LIST = "localhost:9092";
    
        public static Properties initConfig() {
            Properties properties = new Properties();
            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
            properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
            return properties;
        }
    
        public static void main(String[] args) {
            Properties properties = initConfig();
            //创建一个KafkaAdminClient实例
            AdminClient client = AdminClient.create(properties);
            NewTopic newTopic = new NewTopic(TOPIC, 4, (short) 1);
            //真正创建主题
            CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
            try {
                //等待服务端返回
                result.all().get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            //释放资源
            client.close();
        }
    }
    
  • 相关阅读:
    [转]WebForm中使用MVC
    [转]外贸出口流程图
    [转]查看SQL Server被锁的表以及如何解锁
    [转]RDL Report in Visual Studio New page per Record
    [转]Sql Server Report Service 的部署问题
    [转]ASP.NET MVC4中@model使用多个类型实例的方法
    [转]告别写计划的烦恼!一页纸四步打造出一份牛逼的商业计划
    [转]LINQ: Using INNER JOIN, Group and SUM
    [转] 比特币『私钥』『公钥』『钱包地址』间的关系
    [转]SQL SERVER数据库删除LOG文件和清空日志的方案
  • 原文地址:https://www.cnblogs.com/jordan95225/p/13394356.html
Copyright © 2011-2022 走看看