I: Outline
Be able to build a Kafka Java client
Understand the Kafka client types
Master the basic operations of the Kafka clients
II: Client Types
1. The five client types (diagram omitted)

2. Kafka client API types
AdminClient: manage and inspect topics, brokers, and other Kafka objects
Producer: publish messages to topics
Consumer: subscribe to topics and process the messages
Streams: efficiently transform input streams into output streams
Connectors: pull data from source systems or applications into Kafka
III: AdminClient API
1. Sample program
package com.jun.kafka.admin;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.jun.kafka.common.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* AdminClient demo
*/
@Slf4j
public class AdminSample {
private static final String TOPIC_NAME = "jun-topic";
public static void main(String[] args) {
// alterConfig();
describeTopic();
}
/**
* Build the AdminClient
*/
public static AdminClient adminClient(){
Properties properties = new Properties();
// Kafka broker address
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
return AdminClient.create(properties);
}
/**
* Create a topic
*/
public static void createTopic(){
AdminClient adminClient = adminClient();
// replication factor
short replicationFactor = 1;
NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, replicationFactor);
CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
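// Note: createTopics() is asynchronous; calling topics.all().get() would block until the topic is created and surface any errors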
log.info("{}", JsonUtils.toJsonString(topics));
}
/**
* List all topics
*/
public static void topicLists(){
AdminClient adminClient = adminClient();
ListTopicsResult listTopicsResult = adminClient.listTopics();
try {
Set<String> names = listTopicsResult.names().get();
names.stream().forEach(System.out::println);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* List all topics, including internal topics
*/
public static void topicOptionLists(){
AdminClient adminClient = adminClient();
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
try {
Set<String> names = listTopicsResult.names().get();
names.stream().forEach(System.out::println);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* Delete a topic
*/
public static void deleteTopic(){
AdminClient adminClient = adminClient();
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
try {
// wait for the deletion to complete so that failures are not silently swallowed
deleteTopicsResult.all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* Describe a topic
*/
public static void describeTopic(){
String NEW_TOPIC_NAME = "caojun-topic";
AdminClient adminClient = adminClient();
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(NEW_TOPIC_NAME));
try {
Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
entries.stream().forEach(entry->{
log.info("key={}", entry.getKey());
log.info("value={}", entry.getValue());
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* Describe a topic's configuration
*/
public static void describeConfig(){
String NEW_TOPIC_NAME = "caojun-topic";
AdminClient adminClient = adminClient();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, NEW_TOPIC_NAME);
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
try {
Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
configResourceConfigMap.entrySet().stream().forEach(configResourceConfigEntry -> {
log.info("key={}", configResourceConfigEntry.getKey());
log.info("value={}", configResourceConfigEntry.getValue());
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/**
* Alter a topic's configuration
*/
public static void alterConfig(){
String NEW_TOPIC_NAME = "caojun-topic";
AdminClient adminClient = adminClient();
Map<ConfigResource, Config> configMap= new HashMap<>();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, NEW_TOPIC_NAME);
Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
configMap.put(configResource, config);
AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
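// alterConfigs() is asynchronous; alterConfigsResult.all().get() would block until the change is applied
// (newer client versions deprecate alterConfigs in favor of incrementalAlterConfigs)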
}
/**
* Increase the number of partitions
*/
public static void incrPartitions(){
String NEW_TOPIC_NAME = "caojun-topic";
AdminClient adminClient = adminClient();
Map<String, NewPartitions> partitionsMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(2);
partitionsMap.put(NEW_TOPIC_NAME, newPartitions);
CreatePartitionsResult partitions = adminClient.createPartitions(partitionsMap);
try {
Void aVoid = partitions.all().get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
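Note that every method above builds a fresh AdminClient and never closes it, which leaks connections. AdminClient implements AutoCloseable, so in real code it would normally be created once and released with try-with-resources; a minimal sketch reusing the adminClient() helper from the sample above:
try (AdminClient adminClient = adminClient()) {
    // all admin calls share one client, which is closed automatically when the block exits
    Set<String> names = adminClient.listTopics().names().get();
    names.forEach(System.out::println);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}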
IV: Producer API
1. Send modes
Synchronous send
Asynchronous send (fire-and-forget)
Asynchronous send with a callback
2. Asynchronous send
/**
* Asynchronous (fire-and-forget) send
*/
public static void producerSend(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// the main Producer object
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// the message record
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", "value");
producer.send(record);
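// send() returns immediately: the record is only appended to an in-memory buffer and is transmitted later by the background I/O thread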
producer.close();
}
Result: (screenshot omitted)

3. Synchronous send
/**
* Synchronous send: an asynchronous send that then blocks on the returned Future
*/
public static void producerSyncSend(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// the main Producer object
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// the message record
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-sync", "value-sync");
Future<RecordMetadata> send = producer.send(record);
try {
// block until the broker acknowledges the record
RecordMetadata recordMetadata = send.get();
log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
producer.close();
}
4. Asynchronous send with callback
/**
* Asynchronous send with a callback
*/
public static void producerSendWithCallback(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// the main Producer object
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// the message record
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-callback", "value-callback");
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
log.error("send failed", e);
return;
}
log.info("callback invoked");
log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
}
});
producer.close();
}
Result: (screenshot omitted)

5. How it works (source walkthrough)
The KafkaProducer constructor:
KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer));
try {
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = time;
String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;
this.clientId = buildClientId(config.getString("client.id"), transactionalId);
LogContext logContext;
if (transactionalId == null) {
logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
} else {
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
}
this.log = logContext.logger(KafkaProducer.class);
this.log.trace("Starting the Kafka producer");
Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
// set up Kafka metrics / monitoring
MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
reporters.add(new JmxReporter("kafka.producer"));
this.metrics = new Metrics(metricConfig, reporters, time);
// load the partitioner
this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
long retryBackoffMs = config.getLong("retry.backoff.ms");
// initialize the serializers
if (keySerializer == null) {
this.keySerializer = (Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore("key.serializer");
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = (Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore("value.serializer");
this.valueSerializer = valueSerializer;
}
userProvidedConfigs.put("client.id", this.clientId);
ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
List<ProducerInterceptor<K, V>> interceptorList = configWithClientId.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
if (interceptors != null) {
this.interceptors = interceptors;
} else {
this.interceptors = new ProducerInterceptors(interceptorList);
}
ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt("max.request.size");
this.totalMemorySize = config.getLong("buffer.memory");
this.compressionType = CompressionType.forName(config.getString("compression.type"));
this.maxBlockTimeMs = config.getLong("max.block.ms");
this.transactionManager = configureTransactionState(config, logContext, this.log);
int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
this.apiVersions = new ApiVersions();
// initialize the record accumulator (the batching buffer)
this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
this.metadata.bootstrap(addresses, time.milliseconds());
}
this.errors = this.metrics.sensor("errors");
// the sender, which runs on a daemon I/O thread
this.sender = this.newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
// start the daemon I/O thread
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
this.log.debug("Kafka producer started");
} catch (Throwable var23) {
this.close(Duration.ofMillis(0L), true);
throw new KafkaException("Failed to construct kafka producer", var23);
}
}
How send() hands records to the sender:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return this.doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
this.throwIfProducerClosed();
KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
} catch (KafkaException var20) {
if (this.metadata.isClosed()) {
throw new KafkaException("Producer closed while send in progress", var20);
}
throw var20;
}
long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException var19) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var19);
}
byte[] serializedValue;
try {
serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException var18) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var18);
}
// compute the partition, i.e. which partition this record goes to
int partition = this.partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
this.setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
this.ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
if (this.log.isTraceEnabled()) {
this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
}
// wrap the user callback together with the interceptors
Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
if (this.transactionManager != null && this.transactionManager.isTransactional()) {
this.transactionManager.failIfNotReadyForSend();
}
// append the record to a batch in the record accumulator
RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true);
// the append was aborted so that a new batch can be created, possibly on a different partition
if (result.abortForNewBatch) {
int prevPartition = partition;
this.partitioner.onNewBatch(record.topic(), cluster, partition);
partition = this.partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (this.log.isTraceEnabled()) {
this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
}
interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false);
}
if (this.transactionManager != null && this.transactionManager.isTransactional()) {
this.transactionManager.maybeAddPartitionToTransaction(tp);
}
if (result.batchIsFull || result.newBatchCreated) {
this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
// the actual network send is done by the daemon sender thread, so wake it up
this.sender.wakeup();
}
return result.future;
} catch (ApiException var21) {
this.log.debug("Exception occurred during message send:", var21);
if (callback != null) {
callback.onCompletion((RecordMetadata)null, var21);
}
this.errors.record();
this.interceptors.onSendError(record, tp, var21);
return new KafkaProducer.FutureFailure(var21);
} catch (InterruptedException var22) {
this.errors.record();
this.interceptors.onSendError(record, tp, var22);
throw new InterruptException(var22);
} catch (BufferExhaustedException var23) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
this.interceptors.onSendError(record, tp, var23);
throw var23;
} catch (KafkaException var24) {
this.errors.record();
this.interceptors.onSendError(record, tp, var24);
throw var24;
} catch (Exception var25) {
this.interceptors.onSendError(record, tp, var25);
throw var25;
}
}
Send-path diagram (image omitted): interceptors -> serialization -> partitioning -> RecordAccumulator batching -> sender daemon thread -> broker.


6. Custom partitioner (custom load balancing)
package com.jun.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* Custom partitioner
*/
public class SamplePartition implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
/*
Assumes keys of the form key-1, key-2, key-3, ...
(a null key or a different format would make the parse below fail)
*/
String keyStr = key + "";
// strip the "key-" prefix, leaving the numeric suffix
String keyInt = keyStr.substring(4);
System.out.println("keyStr : " + keyStr + ", keyInt : " + keyInt);
int i = Integer.parseInt(keyInt);
// even suffix -> partition 0, odd suffix -> partition 1
return i % 2;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
Usage:
/**
* Send using the custom partitioner
*/
public static void producerSendWithPartition(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
// the custom partitioner
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.jun.kafka.producer.SamplePartition");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// the main Producer object
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// the message record
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-2", "value");
producer.send(record);
producer.close();
}
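For example, with the key key-2 above, keyStr.substring(4) yields "2" and 2 % 2 = 0, so the record is routed to partition 0 (keys with an odd suffix go to partition 1); the target topic therefore needs at least 2 partitions (see incrPartitions() in the AdminClient section for how to add them).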
7. Message delivery guarantees
Kafka provides three delivery guarantees:
At most once
At least once
Exactly once
The guarantee is achieved by the Producer and the Consumer together
The delivery guarantee mainly depends on the Producer
In particular it depends on this configuration item (see the sketch below):
properties.put(ProducerConfig.ACKS_CONFIG, "all");
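A minimal sketch of how the acks setting, together with retries and idempotence, maps to the three guarantee levels. The broker address and serializers are the same placeholders used above, and this covers only the producer side (end-to-end exactly-once additionally requires transactions and a read_committed consumer):
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// at most once: do not wait for any acknowledgement and never retry
// properties.put(ProducerConfig.ACKS_CONFIG, "0");

// at least once: wait for acknowledgement and retry on failure,
// which may write the same record more than once
// properties.put(ProducerConfig.ACKS_CONFIG, "all");
// properties.put(ProducerConfig.RETRIES_CONFIG, "3");

// exactly once (producer side): acks=all plus the idempotent producer,
// so retries cannot create duplicates within a partition
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

KafkaProducer<String, String> producer = new KafkaProducer<>(properties);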