一、Kafka 核心 API
上文中对 Kafka 做了一些简单的介绍,那么在开发过程中我们如何去访问 Kafka 呢?这就需要使用到本文将要介绍的Kafka客户端API。下图是官方文档中的一个图,形象的描述了能与 Kafka集成的客户端类型
Kafka的五类客户端API类型如下:
- AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似
- Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API
- Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的API
- Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景
- Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB
本文中,我们将主要介绍 AdminClient API。
二、AdminClient API
显然,操作AdminClient API的前提是需要创建一个 AdminClient
实例。代码示例:
/** * 创建AdminClient客户端对象 */ public static AdminClient createAdminClientByProperties() { Properties prop = new Properties(); // 配置Kafka服务的访问地址及端口号 prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092"); // 创建AdminClient实例 return AdminClient.create(prop); } /** * 创建AdminClient的第二种方式 */ public static AdminClient createAdminClientByMap(){ Map<String, Object> conf = Maps.newHashMap(); // 配置Kafka服务的访问地址及端口号 conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092"); // 创建AdminClient实例 return AdminClient.create(conf); }
创建了 AdminClient
的实例对象后,我们就可以通过它提供的方法操作 Kafka,常用的方法如下:
方法名称 | 作用 |
createTopics | 创建一个或多个Topic |
listTopics | 查询Topic列表 |
deleteTopics | 删除一个或多个Topic |
describeTopics | 查询Topic的描述信息 |
describeConfigs | 查询Topic、Broker等的所有配置项信息 |
alterConfigs | 用于修改Topic、Broker等的配置项信息(该方法在新版本中被标记为已过期) |
incrementalAlterConfigs | 同样也是用于修改Topic、Broker等的配置项信息,但功能更多、更灵活,用于代替alterConfigs |
createPartitions | 用于调整Topic的Partition数量,只能增加不能减少或删除,也就是说新设置的Partition数量必须大于等于之前的Partition数量 |
Tips:
describeTopics
和describeConfigs
的意义主要是在监控上,很多用于监控Kafka的组件都会使用到这两个API,因为通过这两个API可以获取到Topic自身和周边的详细信息
三、创建 Topic
使用createTopics
方法可以创建Topic,传入的参数也与kafka-topics.sh
命令脚本的参数一样。代码示例:
/** * 创建一个或多个topic * * @param topicNames topic名称的集合 */ public static void createTopic(List<String> topicNames) throws Exception { // 创建AdminClient客户端对象 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); List<NewTopic> topicList = Lists.newArrayList(); /** * 定义topic信息 * String name topic名 * int numPartitions 分区数 * short replicationFactor 副本数,必须不能大于broker数量 */ topicNames.forEach(topicName -> topicList.add( new NewTopic(topicName, 1, Short.parseShort("1")))); // 创建topic CreateTopicsResult result = adminClient.createTopics(topicList); // get方法是一个阻塞方法,一定要等到createTopics完成之后才进行下一步操作 result.all().get(); // 打印新创建的topic名 result.values().forEach((name, future) -> System.out.println("topicName:" + name)); // 关闭资源 adminClient.close(); }
四、删除 Topic
deleteTopics
方法可以删除一个或多个Topic,代码示例:
/** * 删除一个或多个topic * * @param topicNames topic名称的集合 */ public static void removeTopic(List<String> topicNames) throws Exception { // 创建AdminClient客户端对象 AdminClient adminClient = BuildAdminClient.createAdminClientByProperties(); // 删除topic集合 DeleteTopicsResult result = adminClient.deleteTopics(topicNames); // get方法是一个阻塞方法,一定要等到deleteTopics完成之后才进行下一步操作 result.all().get(); // 关闭资源 adminClient.close(); }
五、查看 Topics 列表
listTopics
方法用于查询Topic列表,通过传入 ListTopicsOptions
参数可以设置一些可选项。代码示例:
/** * 获取所有的topic信息,包括Kafka内部的topic * 如:__consumer_offsets,internal=true */ public static void listTopicsWithOptions() throws Exception { // 创建AdminClient客户端对象 AdminClient adminClient = BuildAdminClient.createAdminClientByProperties(); ListTopicsOptions options = new ListTopicsOptions(); // 列出内部的Topic options.listInternal(true); // 列出所有的topic ListTopicsResult result = adminClient.listTopics(options); // 获取所有topic的名字,返回的是一个Set集合 Set<String> topicNames = result.names().get(); // 打印所有topic的名字 topicNames.forEach(System.out::println); // 获取所有topic的信息,返回的是一个Collection集合 // (name=hello-kafka, internal=false),internal代表是否为内部的topic Collection<TopicListing> topicListings = result.listings().get(); // 打印所有topic的信息 topicListings.forEach(System.out::println); // 关闭资源 adminClient.close(); }
六、查看 Topic 的描述信息
一个 Topic 会有自身的描述信息,例如:partition
的数量,副本集的数量,是否为 internal
等等。AdminClient
中提供了 describeTopics
方法来查询这些描述信息。代码示例:
/** * 获取topic的描述信息 * * topic name = a-topic, desc = (name=a-topic, internal=false, partitions=(partition=0, leader=192.168.182.128:9092 (id: 0 rack: null), replicas=192.168.182.128:9092 (id: 0 rack: null), isr=192.168.182.128:9092 (id: 0 rack: null)), authorizedOperations=null) * topic name = b-topic, desc = (name=b-topic, internal=false, partitions=(partition=0, leader=192.168.182.128:9092 (id: 0 rack: null), replicas=192.168.182.128:9092 (id: 0 rack: null), isr=192.168.182.128:9092 (id: 0 rack: null)), authorizedOperations=null) */ public static void describeTopics(List<String> topics) throws Exception { // 创建AdminClient客户端对象 AdminClient adminClient = BuildAdminClient.createAdminClientByProperties(); // 获取Topic的描述信息 DescribeTopicsResult result = adminClient.describeTopics(topics); // 解析描述信息结果, Map<String, TopicDescription> ==> topicName:topicDescription Map<String, TopicDescription> topicDescriptionMap = result.all().get(); topicDescriptionMap.forEach((topicName, description) -> System.out.printf("topic name = %s, desc = %s ", topicName, description)); // 关闭资源 adminClient.close(); }
七、查看 Topic 的配置信息
除了Kafka自身的配置项外,其内部的Topic也会有非常多的配置项,我们可以通过describeConfigs
方法来获取某个Topic中的配置项信息。代码示例:
/** * 获取topic的配置信息 */ public static void describeConfigTopics(List<String> topicNames) throws Exception { // 创建AdminClient客户端对象 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64); topicNames.forEach(topicName -> configResources.add( // 指定要获取的源 new ConfigResource(ConfigResource.Type.TOPIC, topicName))); // 获取topic的配置信息 DescribeConfigsResult result = adminClient.describeConfigs(configResources); // 解析topic的配置信息 Map<ConfigResource, Config> resourceConfigMap = result.all().get(); resourceConfigMap.forEach((configResource, config) -> System.out.printf("topic config ConfigResource = %s, Config = %s ", configResource, config)); // 关闭资源 adminClient.close(); }
八、修改 Topic 的分区数量
createPartitions
方法来调整Topic的Partition数量,但是需要注意在Kafka中Partition只能增加不能减少。代码示例:/** * 修改topic的分区数量 * 只能增加不能减少 */ public static void updateTopicPartition(List<String> topicNames, Integer partitionNum) throws Exception { // 创建AdminClient客户端对象 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); // 构建修改分区的topic请求参数 Map<String, NewPartitions> newPartitions = Maps.newHashMap(); topicNames.forEach(topicName -> newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum))); // 执行修改 CreatePartitionsResult result = adminClient.createPartitions(newPartitions); // get方法是一个阻塞方法,一定要等到createPartitions完成之后才进行下一步操作 result.all().get(); // 关闭资源 adminClient.close(); }
Tips:
- Partition的索引从0开始,所以第一个
partition=0
,第二个partition=1
九、修改 Topic 配置信息
除了可以查看Topic的配置项信息外,AdminClient
还提供了相关方法来修改Topic配置项的值。在早期版本中,使用alterConfigs
方法来修改配置项。代码示例:
/** * 修改topic的配置信息 * 使用旧版api:alterConfigs */ public static void updateTopicConfigOld(List<String> topicNames) throws Exception { // 创建AdminClient客户端对象 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64); // 指定要修改的ConfigResource类型及名称 topicNames.forEach(topicName -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, topicName))); // 建立修改的配置项,配置项以ConfigEntry形式存在 Config config = new Config(Collections.singletonList(new ConfigEntry("preallocate", "true"))); // 参数构造 Map<ConfigResource, Config> configMap = Maps.newHashMap(); configResources.forEach(configResource -> configMap.put(configResource, config)); // 修改topic 配置,用的是老api,已经过时 AlterConfigsResult result = adminClient.alterConfigs(configMap); // get方法是一个阻塞方法,一定要等到alterConfigs完成之后才进行下一步操作 result.all().get(); // 关闭资源 adminClient.close(); }
执行以上代码,成功将 topic 的配置项 preallocate
的值改为了 true。
在新版本中则是使用 incrementalAlterConfigs
方法来修改Topic的配置项,该方法使用起来相对于 alterConfigs
要略微复杂一些,但因此功能更多、更灵活。代码示例:
/** * 修改topic的配置信息 * 使用新版api:incrementalAlterConfigs */ public static void updateTopicConfigNew(List<String> topicNames) throws Exception { // 创建AdminClient客户端对象 AdminClient adminClient = BuildAdminClient.createAdminClientByMap(); List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64); // 指定要修改的ConfigResource类型及名称 topicNames.forEach(topicName -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, topicName))); // 配置项同样以ConfigEntry形式存在,只不过增加了操作类型 // 以及能够支持操作多个配置项,相对来说功能更多、更灵活 Collection<AlterConfigOp> configs = Lists.newArrayList( new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET), new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET)); // 参数构造 Map<ConfigResource, Collection<AlterConfigOp>> configMaps = Maps.newHashMap(); configResources.forEach(configResource -> configMaps.put(configResource, configs)); // 下面这个是新api.但是有些麻烦 // 在某些版本中,incrementalAlterConfigs方法可能会存在些问题,对单实例的Kafka支持得不是很好,会出现无法成功修改配置项的情况 AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps); // get方法是一个阻塞方法,一定要等到incrementalAlterConfigs完成之后才进行下一步操作 result.all().get(); // 关闭资源 adminClient.close(); }
- 在某些版本中,
incrementalAlterConfigs
方法可能会存在些问题,对单实例的 Kafka 支持得不是很好,会出现无法成功修改配置项的情况,此时就可以使用alterConfigs
方法来代替。
执行以上代码,修改了三个配置项的值:preallocate、min.cleanable.dirty.ratio 和 unclean.leader.election.enable。