zoukankan      html  css  js  c++  java
  • Kafka:主题管理

    主题的管理

    主题的管理包括创建主题、查看主题信息、修改主题、删除主题等操作,可以通过kafka的kafka-topics.sh脚本来执行这些操作,这个脚本在$KAFKA_HOME/bin/目录下。

    该脚本正文仅有一行:

    #!/bin/bash
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
    

    实际上调用的是kafka.admin.TopicCommand类来执行主题管理的操作。

    创建主题

    如果broker端配置了auto.create.topics.enable设置为true。那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认为1)、副本因子为default.replication.factor(默认为1)的主题。此外,当一个消费者从未知主题读取消息时,或当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数num.partitionsdefault.replication.factor的值创建一个相应的主题。但是不建议将auto.create.topics.enable设置为true,这回增加主题的管理和维护的难度。

    使用kafka-topics.sh脚本一个分区数为4,副本因子为1的主题:

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-create --partitions 4 --replication-factor 1
    

    执行完成后,kafka会在log.dir参数所配置目录下创建对应的主题分区。

    ls -al logs | grep topic-create
    

    image-20210209092810943

    创建了四个文件夹,对应topic-create主题的四个分区编号,命名方式概括为<topic>-<partition>。这里的文件夹对应的不是分区,分区是一个逻辑概念而没有物理存在。

    主题、分区、副本和日志关系如下图:

    image-20210209093714473

    主题和分区都是提供给上层用户的抽象,而在副本层面或更确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余。

    此外我们还可以从zookeeper客户端中获取。当创建一个主题时,会在/brokers/topics/目录下创建一个同名节点,该节点记录了该主题的分区副本分配方案:

    [zk: localhost:2181(CONNECTED) 0] get /brokers/topics/topic-create
    

    image-20210209094436703

    图中的"1":[0]表示分区1分配一个副本,在brokerId为0的broker节点中。

    使用--describe查看分区副本的分配细节:

    bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --describe --topic topic-create
    

    image-20210209095059600

    此外还可以通过replica-assignmemt参数来手动指定分区副本的分配方案:这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用逗号","隔开,分区内多个副本用":"隔开,并且在使用replica-assignment参数创建主题时不需要原本必备的partitions和replication-factor这两个参数。

    --replica-assignmemt <String:

    broker_id_for_part1_replica1 : broker_id_for_part1_replica2 : ...,

    broker_id_for_part2_replica1 : broker_id_for_part2_replica2 : ...,

    ...

    >

    下面演示创建一个与主题topic-create相同的分配方案的主题topic-create-assign:

    bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --create --topic topic-create-assign --replica-assignment 0,0,0,0
    
    • 注意同一个分区内的副本不能有重复,比如指定了0:0,1:1这种,就会报出异常。

    • 分区之间指定的副本数不同,比如0:1,0,1:0这种,也会报出异常。

    • 这种0:1,,0:1,1:0,企图跳过一个分区的行为也是不允许的。

    创建主题时,我们还可以通过config参数来设置所要创建主题的相关参数,以覆盖默认配置

    --config <String:name1=value1> --config <String:name2=value2>
    

    默认情况下,如果创建一个已存在的主题,会直接抛出异常。但是可以添加一个参数--if-not-exists,那么发生命名冲突后将不做任何处理(不报错,也不创建主题),如果没有发生命名冲突,那么和不带--if-not-exists参数的行为一样正常创建主题。

    注意:使用kafka-topics.sh创建主题时还会检测是否包含"."或""字符。kafka的内部做埋点时会根据主题的名称来命名metrics的名称,并且会将"."改成下划线"",如果先命名了topic_1.2,再次创建topic.1_2主题就会抛出InvalidTopicException异常,如下图

    image-20210209104711942

    kafka从0.10.x版本开始支持broker的机架信息,通过broker端broker.rack参数配置,如果指定了机架信息,则在分区副本分配时会尽可能让分区副本分配到不同的机架上。

    如果一个集群中有部分broker指定了机架信息,并且其余broker没有指定broker机架信息,会直接抛出异常。这时想要创建主题,要么集群所有broker都加上机架信息或者都去掉机架信息,要么使用--disable-rack-aware参数来忽略机架信息。

    代码创建主题(可能缺少一部分依赖,自己补全就行):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.9.5</version>
    </dependency>
    
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.5</version>
    </dependency>
    

    java的代码:

        public static void main(String[] args) {
            String[] options = new String[]{
                    "--zookeeper", "192.168.1.51:2181",
                    "--create",
                    "--topic", "topicCommand-test",
                    "--partitions", "2",
                    "--replication-factor", "1"
            };
            TopicCommand.main(options);
        }
    

    image-20210209111128113

    查看主题

    kafka-topics.sh脚本有5中指令类型:create、list、describe、alter、delete。

    list和describe可以用来方便查看主题信息。

    通过list指令可以查看当前所有可用的主题

    bin/kafka-topics.sh --zookeeper localhost:2181 -list
    

    通过describe指令查看单个(多个,全部)主题信息

    #单个
    bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicName
    #多个
    bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicName1,topicName
    #全部
    bin/kafka-topics.sh --zookeeper localhost:2181 --describe 
    

    使用describe指令查看主题信息时还可以额外指定参数增加一些附加功能

    • --topics-with-overrides 找出所有包含覆盖配置的主题
    • --under-replicated-partitions 可以找出所有包含失效副本的分区
    • --unavailable-partitions 可以查看主题中没有leader副本的分区

    修改主题

    当主题被创建之后,依然允许我们对其做一定修改,例如修改分区个数、修改配置等。使用alter指令。

    增加主题的分区数:

    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --partitions 3
    

    image-20210209161510889

    修改成功的警告信息:当主题中的消息包含key时,根据key计算分区的行为就会收到影响。所以增加分区需要三思而后行,建议一开始就设置好分区数。

    如果修改的主题不存在,可以使用if-exists参数来忽略异常。

    kafka不支持减少分区数,会直接抛出异常。

    alter指令配合--config参数可以达到修改主题配置值

    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --config max.message.bytes=20000
    

    image-20210209162505742

    我们可以通过--delete-config参数来删除之前覆盖的配置,使其恢复原有的默认值

    bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --delete-config max.message.bytes
    

    image-20210209162723230

    配置管理

    kafka-configs.sh脚本是专门用来对配置进行操作的(推荐使用,不推荐使用kafka-topics.sh,已过时)。kafka-configs.sh脚本包含变更配置alter和查看配置describe两种指令类型。此外,该脚本还支持操作broker、用户和客户端的配置。

    该脚本使用--entity-type参数来指定操作配置的类型,并使用--entity-name来指定操作配置的名称。例如查看主题wj配置:

    bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --describe --entity-type topics --entity-name wj
    

    entity-type只可以配置4个值:topics,brokers,clients,users

    entity-type和entity-name的对应关系如下表:

    entity-type entity-name
    topics 指定主题的名称
    brokers 指定brokerId值,broker中的broker.id值
    clients 指定clientId值,即Product或Consumer的client.id值
    users 指定用户名

    使用alter指令变更配置时,需要配合--add-config--delete-config这两个参数一起使用。--add-config用来实现配置的增改,即覆盖原有配置;--delete-config用于删除配置,恢复默认值。

    bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --alter --entity-type topics --entity-name wj --add-config k1=v1,k2=v2
    bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --alter --entity-type topics --entity-name wj --delete-config k1,k2
    

    删除主题

    kafka-topics.sh脚本中的delete指令可以用于删除主题。

    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicName
    

    image-20210209165752123

    我们直接执行删除指令,控制台提示必须将delete.topic.enable设置为true才能删除主题,这个参数默认值就是true,如果为false,那么删除主题的操作将会被忽略。

    如果删除的主体是kafka的内部主题,删除会直接报错。

    删除一个不存在主体也会报错。同alter指令一样,设置--if-exists参数可以忽略异常。

    删除主体是一个不可逆的过程,一旦删除,与其相关的所以消息都会被删除

    KafkaAdminClient

    前面我们用TopicCommand创建了一个主体,当然我们可以用它实现主体的删除、修改、查看等操作,实质上与使用kafka-config.sh脚本的方式无异,交互性非常差。

    所以kafka提供了KafkaAdminClient作为替代方案,继承于AdminClient,方法都是见名知意的。

    下面演示少部分用法:

    1.创建一个主题

    String brokerList = "192.168.1.51:9092";
    String topic = "topic-admin-test";
    
    Properties prop = new Properties();
    prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    //构建KafkaAdminClient实例
    AdminClient client = AdminClient.create(prop);
    //设定所创建主题的具体信息
    NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
    //创建主题
    CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
    try {
        //等待服务端返回
        result.all().get();
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    client.close();//释放资源
    

    2.查看主题配置

    private static String brokerList = "192.168.1.51:9092";
    private static String topic = "topic-admin-test";
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties prop = new Properties();
        prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        //构建KafkaAdminClient实例
        AdminClient client = AdminClient.create(prop);
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
        DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
        Config config = result.all().get().get(resource);
        System.out.println(config);
        client.close();
    }
    

    最终的输出结果不会只列出被覆盖的配置信息,而是会列出主题中所有的配置信息。

    3.增加分区

        private static String brokerList = "192.168.1.51:9092";
        private static String topic = "topic-admin-test";
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Properties prop = new Properties();
            prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
            prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
            //构建KafkaAdminClient实例
            AdminClient client = AdminClient.create(prop);
            NewPartitions newPartitions = NewPartitions.increaseTo(5);
            Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
            newPartitionsMap.put(topic, newPartitions);
            CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
            result.all().get();
            client.close();
        }
    
  • 相关阅读:
    【LeetCode】【动态规划】Edit Distance
    【LeetCode】最大子阵列 Maximum Subarray(贪婪&分治)
    【LeetCode】【矩阵旋转】Rotate Image
    解决Torch.load()错误信息: UnicodeDecodeError: 'ascii' codec can't decode byte 0x8d in position 0: ordinal not in range(128)
    使用VS Code配合Remote Development插件连接远程服务器(Mac/Linux+Windows) | Using VS Code with Remote Development Connect to Remote Server (Mac/Linux+Windows)
    Leaflet入门:添加点线面并导入GeoJSON数据|Tutorial of Leaflet: Adding Points, Lines, Polygons and Import GeoJSON File
    使用Adobe Illustrator + ArcGIS绘制地图 | Map Design Using ArcGIS + Adobe Illustrator
    PostgreSQL 速查、备忘手册 | PostgreSQL Quick Find and Tutorial
    LIRE教程之源码分析 | LIRE Tutorial of Analysis of the Source Code
    解决Tomcat错误信息:No 'Access-Control-Allow-Origin' header is present on the requested resource | Solving Tomcat Error: No 'Access-Control-Allow-Origin' header is present on the requested resource
  • 原文地址:https://www.cnblogs.com/wwjj4811/p/14393906.html
Copyright © 2011-2022 走看看