Kafka manages topics through the kafka-topics.sh script, whose entire content is the following line:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
That is, the script simply delegates to the kafka.admin.TopicCommand class.
1. Creating a topic
If the broker-side parameter auto.create.topics.enable is set to true (the default), the broker automatically creates a topic with the default settings whenever a client produces to or requests metadata for a topic that does not yet exist. Auto-created topics are easy to produce by accident and hard to manage, so it is not recommended to leave this parameter set to true.
The usual way to create a topic explicitly is with the kafka-topics.sh script; the command is as follows:
bin/kafka-topics.sh --bootstrap-server hostname:9092 --create --replication-factor 1 --partitions 1 --topic xxx
2. Partition replica assignment
This refers to the plan, determined at topic creation time, for distributing partition replicas across the cluster, i.e., which brokers host the replicas of which partitions. For example, with 3 brokers (ids 0, 1, 2), a topic with 3 partitions and a replication factor of 2 might be assigned as partition 0 -> [0, 1], partition 1 -> [1, 2], partition 2 -> [2, 0].
When kafka-topics.sh creates a topic, the internal assignment logic follows one of two strategies depending on rack information: rack-aware and rack-unaware. If none of the brokers has the broker.rack parameter configured, the rack-unaware strategy is used; it is implemented by the assignReplicasToBrokersRackUnaware() method of kafka.admin.AdminUtils:
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,       // number of partitions
                                               replicationFactor: Int, // replication factor
                                               brokerList: Seq[Int],   // list of broker ids
                                               fixedStartIndex: Int,   // start index, -1 by default
                                               startPartitionId: Int   // start partition id
                                              ): Map[Int, Seq[Int]] = {
  // holds the assignment result, returned at the end
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // if the start index is negative, pick a random but valid index within the broker list
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // the start partition id must not be negative; by default, topic creation assigns partitions round-robin starting from partition 0
  var currentPartitionId = math.max(0, startPartitionId)
  // replica shift, used to spread the replicas of different partitions evenly across brokers
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // iterate over all partitions, assigning each partition's replicas to different brokers
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    // collects the brokers that hold this partition's replicas
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    // record the assignment for this partition
    ret.put(currentPartitionId, replicaBuffer)
    // move on to the next partition
    currentPartitionId += 1
  }
  ret
}
The replicaIndex() method it calls is shown below:
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
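To make the round-robin concrete, below is a minimal, self-contained Java sketch of the same rack-unaware logic (the class and method names are mine, and fixedStartIndex/nextReplicaShift are pinned to 0 instead of being randomized, so the output is deterministic):
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RackUnawareAssignmentDemo {

    // mirrors AdminUtils.replicaIndex(): offsets the first replica's index by a shift in [1, nBrokers - 1]
    private static int replicaIndex(int firstReplicaIndex, int secondReplicaShift, int replicaIndex, int nBrokers) {
        int shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1);
        return (firstReplicaIndex + shift) % nBrokers;
    }

    // mirrors assignReplicasToBrokersRackUnaware() with startIndex = nextReplicaShift = 0
    public static Map<Integer, List<Integer>> assign(int nPartitions, int replicationFactor, int[] brokers) {
        Map<Integer, List<Integer>> ret = new LinkedHashMap<>();
        int startIndex = 0;          // random in Kafka when fixedStartIndex is not given
        int nextReplicaShift = 0;    // also random in Kafka when fixedStartIndex is not given
        int currentPartitionId = 0;
        for (int i = 0; i < nPartitions; i++) {
            // after every full pass over the brokers, shift the follower positions by one
            if (currentPartitionId > 0 && currentPartitionId % brokers.length == 0) {
                nextReplicaShift++;
            }
            int firstReplicaIndex = (currentPartitionId + startIndex) % brokers.length;
            List<Integer> replicas = new ArrayList<>();
            replicas.add(brokers[firstReplicaIndex]);
            for (int j = 0; j < replicationFactor - 1; j++) {
                replicas.add(brokers[replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokers.length)]);
            }
            ret.put(currentPartitionId, replicas);
            currentPartitionId++;
        }
        return ret;
    }

    public static void main(String[] args) {
        // 3 brokers (ids 0, 1, 2), 6 partitions, replication factor 2
        assign(6, 2, new int[]{0, 1, 2}).forEach((p, r) -> System.out.println("partition " + p + " -> " + r));
    }
}
Running it prints partition 0 -> [0, 1], partition 1 -> [1, 2], partition 2 -> [2, 0], partition 3 -> [0, 2], partition 4 -> [1, 0], partition 5 -> [2, 1]: once a full pass over the brokers completes, nextReplicaShift is incremented, so the follower of each leader moves to a different broker and the replicas stay evenly spread.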
3. KafkaAdminClient
Basic usage
KafkaAdminClient extends the abstract class org.apache.kafka.clients.admin.AdminClient. The example below uses it to create a topic programmatically:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * @author: masheng
 * @description: Create a topic
 * @date: 2020/07/28 22:33
 */
public class CreateTopic {
    private static final String TOPIC = "topic_admin";
    private static final String BROKER_LIST = "localhost:9092";

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        // create a KafkaAdminClient instance
        AdminClient client = AdminClient.create(properties);
        // topic_admin: 4 partitions, replication factor 1
        NewTopic newTopic = new NewTopic(TOPIC, 4, (short) 1);
        // actually create the topic
        CreateTopicsResult result = client.createTopics(Collections.singleton(newTopic));
        try {
            // wait for the broker's response
            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        // release resources
        client.close();
    }
}
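As a follow-up, the same AdminClient API can be used to check what the broker actually created. The sketch below (the class name DescribeTopic is mine; the topic and broker address reuse the values from the example above) calls describeTopics() and prints each partition's leader and replica list:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class DescribeTopic {
    private static final String TOPIC = "topic_admin";
    private static final String BROKER_LIST = "localhost:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        try (AdminClient client = AdminClient.create(properties)) {
            // describeTopics() returns a DescribeTopicsResult; all().get() blocks until the broker responds
            TopicDescription description = client.describeTopics(Collections.singleton(TOPIC))
                    .all().get().get(TOPIC);
            // print each partition's leader and replica list
            description.partitions().forEach(p ->
                    System.out.println("partition " + p.partition()
                            + ", leader = " + p.leader().id()
                            + ", replicas = " + p.replicas()));
        }
    }
}
If the topic does not exist, all().get() throws an ExecutionException wrapping an UnknownTopicOrPartitionException.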