zoukankan      html  css  js  c++  java
  • Kafka 核心 API ==> AdminClient

    一、Kafka 核心 API

      上文中对 Kafka 做了一些简单的介绍,那么在开发过程中我们如何去访问 Kafka 呢?这就需要使用到本文将要介绍的Kafka客户端API。下图是官方文档中的一个图,形象的描述了能与 Kafka集成的客户端类型

    Kafka的五类客户端API类型如下:

    • AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似
    • Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API
    • Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的API
    • Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景
    • Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB

    本文中,我们将主要介绍 AdminClient API。

    二、AdminClient API

    显然,操作AdminClient API的前提是需要创建一个 AdminClient 实例。代码示例:

    /**
     * 创建AdminClient客户端对象
     */
    public static AdminClient createAdminClientByProperties() {
    
      Properties prop = new Properties();
    
      // 配置Kafka服务的访问地址及端口号
      prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092");
    
      // 创建AdminClient实例
      return AdminClient.create(prop);
    }
    
    /**
     * 创建AdminClient的第二种方式
     */
    public static AdminClient createAdminClientByMap(){
    
      Map<String, Object> conf = Maps.newHashMap();
    
      // 配置Kafka服务的访问地址及端口号
      conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092");
    
      // 创建AdminClient实例
      return AdminClient.create(conf);
    }

    创建了 AdminClient 的实例对象后,我们就可以通过它提供的方法操作 Kafka,常用的方法如下:

    方法名称 作用
    createTopics 创建一个或多个Topic
    listTopics  查询Topic列表 
    deleteTopics  删除一个或多个Topic 
    describeTopics  查询Topic的描述信息 
    describeConfigs  查询Topic、Broker等的所有配置项信息 
    alterConfigs  用于修改Topic、Broker等的配置项信息(该方法在新版本中被标记为已过期)
    incrementalAlterConfigs  同样也是用于修改Topic、Broker等的配置项信息,但功能更多、更灵活,用于代替alterConfigs 
    createPartitions  用于调整Topic的Partition数量,只能增加不能减少或删除,也就是说新设置的Partition数量必须大于等于之前的Partition数量 

    Tips:

    • describeTopics describeConfigs 的意义主要是在监控上,很多用于监控Kafka的组件都会使用到这两个API,因为通过这两个API可以获取到Topic自身和周边的详细信息

    三、创建 Topic

    使用createTopics方法可以创建Topic,传入的参数也与kafka-topics.sh命令脚本的参数一样。代码示例: 

    /**
     * 创建一个或多个topic
     *
     * @param topicNames topic名称的集合
     */
    public static void createTopic(List<String> topicNames) throws Exception {
      // 创建AdminClient客户端对象
      AdminClient adminClient = BuildAdminClient.createAdminClientByMap();
    
      List<NewTopic> topicList = Lists.newArrayList();
      /**
       * 定义topic信息
       * String name                topic名
       * int numPartitions          分区数
       * short replicationFactor    副本数,必须不能大于broker数量
       */
      topicNames.forEach(topicName -> topicList.add(
              new NewTopic(topicName, 1, Short.parseShort("1"))));
    
      // 创建topic
      CreateTopicsResult result = adminClient.createTopics(topicList);
    
      // get方法是一个阻塞方法,一定要等到createTopics完成之后才进行下一步操作
      result.all().get();
    
      // 打印新创建的topic名
      result.values().forEach((name, future) -> System.out.println("topicName:" + name));
    
      // 关闭资源
      adminClient.close();
    }

    四、删除 Topic

    deleteTopics 方法可以删除一个或多个Topic,代码示例:

    /**
     * 删除一个或多个topic
     *
     * @param topicNames topic名称的集合
     */
    public static void removeTopic(List<String> topicNames) throws Exception {
    
        // 创建AdminClient客户端对象
        AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();
    
        // 删除topic集合
        DeleteTopicsResult result = adminClient.deleteTopics(topicNames);
    
        // get方法是一个阻塞方法,一定要等到deleteTopics完成之后才进行下一步操作
        result.all().get();
    
        // 关闭资源
        adminClient.close();
    }

    五、查看 Topics 列表

    listTopics 方法用于查询Topic列表,通过传入 ListTopicsOptions 参数可以设置一些可选项。代码示例:

    /**
     * 获取所有的topic信息,包括Kafka内部的topic
     * 如:__consumer_offsets,internal=true
     */
    public static void listTopicsWithOptions() throws Exception {
        // 创建AdminClient客户端对象
        AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();
    
        ListTopicsOptions options = new ListTopicsOptions();
        // 列出内部的Topic
        options.listInternal(true);
    
        // 列出所有的topic
        ListTopicsResult result = adminClient.listTopics(options);
    
        // 获取所有topic的名字,返回的是一个Set集合
        Set<String> topicNames = result.names().get();
    
        // 打印所有topic的名字
        topicNames.forEach(System.out::println);
    
        // 获取所有topic的信息,返回的是一个Collection集合
        // (name=hello-kafka, internal=false),internal代表是否为内部的topic
        Collection<TopicListing> topicListings = result.listings().get();
    
        // 打印所有topic的信息
        topicListings.forEach(System.out::println);
    
        // 关闭资源
        adminClient.close();
    }

    六、查看 Topic 的描述信息

    一个 Topic 会有自身的描述信息,例如:partition 的数量,副本集的数量,是否为 internal 等等。AdminClient 中提供了 describeTopics 方法来查询这些描述信息。代码示例:

    /**
     * 获取topic的描述信息
     * 
     * topic name = a-topic, desc = (name=a-topic, internal=false, partitions=(partition=0, leader=192.168.182.128:9092 (id: 0 rack: null), replicas=192.168.182.128:9092 (id: 0 rack: null), isr=192.168.182.128:9092 (id: 0 rack: null)), authorizedOperations=null)
     * topic name = b-topic, desc = (name=b-topic, internal=false, partitions=(partition=0, leader=192.168.182.128:9092 (id: 0 rack: null), replicas=192.168.182.128:9092 (id: 0 rack: null), isr=192.168.182.128:9092 (id: 0 rack: null)), authorizedOperations=null)
     */
    public static void describeTopics(List<String> topics) throws Exception {
        // 创建AdminClient客户端对象
        AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();
    
        // 获取Topic的描述信息
        DescribeTopicsResult result = adminClient.describeTopics(topics);
    
        // 解析描述信息结果, Map<String, TopicDescription> ==> topicName:topicDescription
        Map<String, TopicDescription> topicDescriptionMap = result.all().get();
        topicDescriptionMap.forEach((topicName, description) -> System.out.printf("topic name = %s, desc = %s 
    ", topicName, description));
    
        // 关闭资源
        adminClient.close();
    }

    七、查看 Topic 的配置信息

    除了Kafka自身的配置项外,其内部的Topic也会有非常多的配置项,我们可以通过describeConfigs方法来获取某个Topic中的配置项信息。代码示例:

    /**
     * 获取topic的配置信息
     */
    public static void describeConfigTopics(List<String> topicNames) throws Exception {
        // 创建AdminClient客户端对象
        AdminClient adminClient = BuildAdminClient.createAdminClientByMap();
    
        List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
        topicNames.forEach(topicName -> configResources.add(
                // 指定要获取的源
                new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
    
        // 获取topic的配置信息
        DescribeConfigsResult result = adminClient.describeConfigs(configResources);
    
        // 解析topic的配置信息
        Map<ConfigResource, Config> resourceConfigMap = result.all().get();
        resourceConfigMap.forEach((configResource, config) -> System.out.printf("topic config ConfigResource = %s, Config = %s 
    ", configResource, config));
    
        // 关闭资源
        adminClient.close();
    }

    八、修改 Topic 的分区数量

    在创建Topic时我们需要设定Partition的数量,但如果觉得初始设置的Partition数量太少了,那么就可以使用createPartitions方法来调整Topic的Partition数量,但是需要注意在Kafka中Partition只能增加不能减少。代码示例:
    /**
     * 修改topic的分区数量
     * 只能增加不能减少
     */
    public static void updateTopicPartition(List<String> topicNames, Integer partitionNum) throws Exception {
        // 创建AdminClient客户端对象
        AdminClient adminClient = BuildAdminClient.createAdminClientByMap();
    
        // 构建修改分区的topic请求参数
        Map<String, NewPartitions> newPartitions = Maps.newHashMap();
        topicNames.forEach(topicName -> newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum)));
    
        // 执行修改
        CreatePartitionsResult result = adminClient.createPartitions(newPartitions);
    
        // get方法是一个阻塞方法,一定要等到createPartitions完成之后才进行下一步操作
        result.all().get();
    
        // 关闭资源
        adminClient.close();
    }

     Tips:

    • Partition的索引从0开始,所以第一个partition=0,第二个partition=1

    九、修改 Topic 配置信息

    除了可以查看Topic的配置项信息外,AdminClient还提供了相关方法来修改Topic配置项的值。在早期版本中,使用alterConfigs方法来修改配置项。代码示例:

    /**
     * 修改topic的配置信息
     * 使用旧版api:alterConfigs
     */
    public static void updateTopicConfigOld(List<String> topicNames) throws Exception {
        // 创建AdminClient客户端对象
        AdminClient adminClient = BuildAdminClient.createAdminClientByMap();
    
        List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
        // 指定要修改的ConfigResource类型及名称
        topicNames.forEach(topicName -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
    
        // 建立修改的配置项,配置项以ConfigEntry形式存在
        Config config = new Config(Collections.singletonList(new ConfigEntry("preallocate", "true")));
    
        // 参数构造
        Map<ConfigResource, Config> configMap = Maps.newHashMap();
        configResources.forEach(configResource -> configMap.put(configResource, config));
    
        // 修改topic 配置,用的是老api,已经过时
        AlterConfigsResult result = adminClient.alterConfigs(configMap);
    
        // get方法是一个阻塞方法,一定要等到alterConfigs完成之后才进行下一步操作
        result.all().get();
    
        // 关闭资源
        adminClient.close();
    }

    执行以上代码,成功将 topic 的配置项 preallocate 的值改为了 true。

    在新版本中则是使用 incrementalAlterConfigs 方法来修改Topic的配置项,该方法使用起来相对于 alterConfigs 要略微复杂一些,但因此功能更多、更灵活。代码示例:

    /**
     * 修改topic的配置信息
     * 使用新版api:incrementalAlterConfigs
     */
    public static void updateTopicConfigNew(List<String> topicNames) throws Exception {
        // 创建AdminClient客户端对象
        AdminClient adminClient = BuildAdminClient.createAdminClientByMap();
    
        List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
        // 指定要修改的ConfigResource类型及名称
        topicNames.forEach(topicName -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
    
        // 配置项同样以ConfigEntry形式存在,只不过增加了操作类型
        // 以及能够支持操作多个配置项,相对来说功能更多、更灵活
        Collection<AlterConfigOp> configs = Lists.newArrayList(
                new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET),
                new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET));
        // 参数构造
        Map<ConfigResource, Collection<AlterConfigOp>> configMaps = Maps.newHashMap();
        configResources.forEach(configResource -> configMaps.put(configResource, configs));
    
        // 下面这个是新api.但是有些麻烦
        // 在某些版本中,incrementalAlterConfigs方法可能会存在些问题,对单实例的Kafka支持得不是很好,会出现无法成功修改配置项的情况
        AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps);
    
        // get方法是一个阻塞方法,一定要等到incrementalAlterConfigs完成之后才进行下一步操作
        result.all().get();
    
        // 关闭资源
        adminClient.close();
    }
    Tips:
    • 在某些版本中,incrementalAlterConfigs 方法可能会存在些问题,对单实例的 Kafka 支持得不是很好,会出现无法成功修改配置项的情况,此时就可以使用alterConfigs方法来代替。

    执行以上代码,修改了三个配置项的值:preallocate、min.cleanable.dirty.ratio 和 unclean.leader.election.enable。

  • 相关阅读:
    lincode 题目记录5
    lintcode题目记录4
    lintcode 题目记录3
    lintcode 题目记录2
    剖析ASP.NET Core(Part 2)- AddMvc(译)
    剖析ASP.NET Core MVC(Part 1)- AddMvcCore(译)
    如何 RESTFul 你的服务(译)
    Windows + IIS 环境部署Asp.Net Core App
    Asp.Net Core 缓存的使用(译)
    [转载].NET Core 轻量级模板引擎 Mustachio
  • 原文地址:https://www.cnblogs.com/L-Test/p/13439049.html
Copyright © 2011-2022 走看看