主题的管理
主题的管理包括创建主题、查看主题信息、修改主题、删除主题等操作,可以通过kafka的kafka-topics.sh脚本来执行这些操作,这个脚本在$KAFKA_HOME/bin/目录下。
该脚本正文仅有一行:
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
实际上调用的是kafka.admin.TopicCommand
类来执行主题管理的操作。
创建主题
如果broker端配置了auto.create.topics.enable
设置为true。那么当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为num.partitions(默认为1)、副本因子为default.replication.factor(默认为1)的主题。此外,当一个消费者从未知主题读取消息时,或当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数num.partitions
和default.replication.factor
的值创建一个相应的主题。但是不建议将auto.create.topics.enable
设置为true,这回增加主题的管理和维护的难度。
使用kafka-topics.sh脚本一个分区数为4,副本因子为1的主题:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic topic-create --partitions 4 --replication-factor 1
执行完成后,kafka会在log.dir参数所配置目录下创建对应的主题分区。
ls -al logs | grep topic-create
创建了四个文件夹,对应topic-create主题的四个分区编号,命名方式概括为<topic>-<partition>。这里的文件夹对应的不是分区,分区是一个逻辑概念而没有物理存在。
主题、分区、副本和日志关系如下图:
主题和分区都是提供给上层用户的抽象,而在副本层面或更确切地说是Log层面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的broker中,这样才能提供有效的数据冗余。
此外我们还可以从zookeeper客户端中获取。当创建一个主题时,会在/brokers/topics/
目录下创建一个同名节点,该节点记录了该主题的分区副本分配方案:
[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/topic-create
图中的"1":[0]表示分区1分配一个副本,在brokerId为0的broker节点中。
使用--describe查看分区副本的分配细节:
bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --describe --topic topic-create
此外还可以通过replica-assignmemt参数来手动指定分区副本的分配方案:这种方式根据分区号的数值大小按照从小到大的顺序进行排列,分区与分区之间用逗号","隔开,分区内多个副本用":"隔开,并且在使用replica-assignment参数创建主题时不需要原本必备的partitions和replication-factor这两个参数。
--replica-assignmemt <String:
broker_id_for_part1_replica1 : broker_id_for_part1_replica2 : ...,
broker_id_for_part2_replica1 : broker_id_for_part2_replica2 : ...,
...
>
下面演示创建一个与主题topic-create相同的分配方案的主题topic-create-assign:
bin/kafka-topics.sh --zookeeper 192.168.1.51:2181 --create --topic topic-create-assign --replica-assignment 0,0,0,0
-
注意同一个分区内的副本不能有重复,比如指定了0:0,1:1这种,就会报出异常。
-
分区之间指定的副本数不同,比如0:1,0,1:0这种,也会报出异常。
-
这种0:1,,0:1,1:0,企图跳过一个分区的行为也是不允许的。
创建主题时,我们还可以通过config参数来设置所要创建主题的相关参数,以覆盖默认配置
--config <String:name1=value1> --config <String:name2=value2>
默认情况下,如果创建一个已存在的主题,会直接抛出异常。但是可以添加一个参数--if-not-exists
,那么发生命名冲突后将不做任何处理(不报错,也不创建主题),如果没有发生命名冲突,那么和不带--if-not-exists
参数的行为一样正常创建主题。
注意:使用kafka-topics.sh创建主题时还会检测是否包含"."或""字符。kafka的内部做埋点时会根据主题的名称来命名metrics的名称,并且会将"."改成下划线"",如果先命名了topic_1.2,再次创建topic.1_2主题就会抛出InvalidTopicException异常,如下图
kafka从0.10.x版本开始支持broker的机架信息,通过broker端broker.rack
参数配置,如果指定了机架信息,则在分区副本分配时会尽可能让分区副本分配到不同的机架上。
如果一个集群中有部分broker指定了机架信息,并且其余broker没有指定broker机架信息,会直接抛出异常。这时想要创建主题,要么集群所有broker都加上机架信息或者都去掉机架信息,要么使用--disable-rack-aware
参数来忽略机架信息。
代码创建主题(可能缺少一部分依赖,自己补全就行):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
java的代码:
public static void main(String[] args) {
String[] options = new String[]{
"--zookeeper", "192.168.1.51:2181",
"--create",
"--topic", "topicCommand-test",
"--partitions", "2",
"--replication-factor", "1"
};
TopicCommand.main(options);
}
查看主题
kafka-topics.sh脚本有5中指令类型:create、list、describe、alter、delete。
list和describe可以用来方便查看主题信息。
通过list指令可以查看当前所有可用的主题
bin/kafka-topics.sh --zookeeper localhost:2181 -list
通过describe指令查看单个(多个,全部)主题信息
#单个
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicName
#多个
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topicName1,topicName
#全部
bin/kafka-topics.sh --zookeeper localhost:2181 --describe
使用describe指令查看主题信息时还可以额外指定参数增加一些附加功能
- --topics-with-overrides 找出所有包含覆盖配置的主题
- --under-replicated-partitions 可以找出所有包含失效副本的分区
- --unavailable-partitions 可以查看主题中没有leader副本的分区
修改主题
当主题被创建之后,依然允许我们对其做一定修改,例如修改分区个数、修改配置等。使用alter指令。
增加主题的分区数:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --partitions 3
修改成功的警告信息:当主题中的消息包含key时,根据key计算分区的行为就会收到影响。所以增加分区需要三思而后行,建议一开始就设置好分区数。
如果修改的主题不存在,可以使用if-exists
参数来忽略异常。
kafka不支持减少分区数,会直接抛出异常。
alter指令配合--config
参数可以达到修改主题配置值
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --config max.message.bytes=20000
我们可以通过--delete-config
参数来删除之前覆盖的配置,使其恢复原有的默认值
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic wj --delete-config max.message.bytes
配置管理
kafka-configs.sh脚本是专门用来对配置进行操作的(推荐使用,不推荐使用kafka-topics.sh,已过时)。kafka-configs.sh脚本包含变更配置alter和查看配置describe两种指令类型。此外,该脚本还支持操作broker、用户和客户端的配置。
该脚本使用--entity-type
参数来指定操作配置的类型,并使用--entity-name
来指定操作配置的名称。例如查看主题wj配置:
bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --describe --entity-type topics --entity-name wj
entity-type只可以配置4个值:topics,brokers,clients,users
entity-type和entity-name的对应关系如下表:
entity-type | entity-name |
---|---|
topics | 指定主题的名称 |
brokers | 指定brokerId值,broker中的broker.id值 |
clients | 指定clientId值,即Product或Consumer的client.id值 |
users | 指定用户名 |
使用alter指令变更配置时,需要配合--add-config
和--delete-config
这两个参数一起使用。--add-config
用来实现配置的增改,即覆盖原有配置;--delete-config用于删除配置,恢复默认值。
bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --alter --entity-type topics --entity-name wj --add-config k1=v1,k2=v2
bin/kafka-configs.sh --bootstrap-server 192.168.1.51:9092 --alter --entity-type topics --entity-name wj --delete-config k1,k2
删除主题
kafka-topics.sh脚本中的delete指令可以用于删除主题。
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicName
我们直接执行删除指令,控制台提示必须将delete.topic.enable
设置为true才能删除主题,这个参数默认值就是true,如果为false,那么删除主题的操作将会被忽略。
如果删除的主体是kafka的内部主题,删除会直接报错。
删除一个不存在主体也会报错。同alter指令一样,设置--if-exists
参数可以忽略异常。
删除主体是一个不可逆的过程,一旦删除,与其相关的所以消息都会被删除
KafkaAdminClient
前面我们用TopicCommand创建了一个主体,当然我们可以用它实现主体的删除、修改、查看等操作,实质上与使用kafka-config.sh脚本的方式无异,交互性非常差。
所以kafka提供了KafkaAdminClient作为替代方案,继承于AdminClient,方法都是见名知意的。
下面演示少部分用法:
1.创建一个主题
String brokerList = "192.168.1.51:9092";
String topic = "topic-admin-test";
Properties prop = new Properties();
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
//构建KafkaAdminClient实例
AdminClient client = AdminClient.create(prop);
//设定所创建主题的具体信息
NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
//创建主题
CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
try {
//等待服务端返回
result.all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
client.close();//释放资源
2.查看主题配置
private static String brokerList = "192.168.1.51:9092";
private static String topic = "topic-admin-test";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties prop = new Properties();
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
//构建KafkaAdminClient实例
AdminClient client = AdminClient.create(prop);
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
DescribeConfigsResult result = client.describeConfigs(Collections.singleton(resource));
Config config = result.all().get().get(resource);
System.out.println(config);
client.close();
}
最终的输出结果不会只列出被覆盖的配置信息,而是会列出主题中所有的配置信息。
3.增加分区
private static String brokerList = "192.168.1.51:9092";
private static String topic = "topic-admin-test";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties prop = new Properties();
prop.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
prop.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
//构建KafkaAdminClient实例
AdminClient client = AdminClient.create(prop);
NewPartitions newPartitions = NewPartitions.increaseTo(5);
Map<String, NewPartitions> newPartitionsMap = new HashMap<>();
newPartitionsMap.put(topic, newPartitions);
CreatePartitionsResult result = client.createPartitions(newPartitionsMap);
result.all().get();
client.close();
}