一、Producer发送
1、Producer发送模式
同步发送
异步发送
异步回调发送
2、异步发送
/**
* Producer异步发送
*/
public static void producerSend(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"118.xx.xx.101:9092");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
// 消息对象
for(int i = 0; i< 10; i++)
{
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,"key-" + i,"value-" + i);
producer.send(record);
}
//关闭通道
producer.close();
}
配置说明
//消息保障策略
properties.put(ProducerConfig.ACKS_CONFIG,"all");
//重试
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
//批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
//多长时间发送一个批次
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
//最大缓存
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
// key序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//value序列化
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
// partition负载均衡
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.kafkademo.producer.PartitionDemo");
3、Producer异步阻塞发送
/**
* Producer异步阻塞发送
*/
public static void producerSyncSend() throws Exception{
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"118.xx.xx.101:9092");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
// 消息对象
for(int i = 0; i< 10; i++)
{
String key = "key-" + i;
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME,key,"value-" + i);
Future<RecordMetadata> send = producer.send(record);
RecordMetadata recordMetadata = send.get();
System.out.println("key:" + key + " , recordMetadata ,partition:" + recordMetadata.partition()
+",offset: " + recordMetadata.offset());
}
//关闭通道
producer.close();
}
这里RecordMetadata recordMetadata = send.get(); 会等待发送结束。
4、异步回调

二、Producer源码解析
包括构建kafkaProducer和发送消息producer.send(record)
1、构建kafkaProducer
Producer并不是接到一条发一条,Producer是批量发送的
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
主要有以下步骤:
1、初始化MetricConfig,用于监控使用
2、加载负载均衡器
3.1初始化keySerializer
3.2 初始化valueSerializer
4、初始化RecordAccumulator,类似于计数器。
5、启动newSender,是一个守护线程。所以每次new KafkaProducer的时候是一个新的线程,Producer是线程安全的。
KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors<K, V> interceptors, Time time) {
ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer));
try {
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = time;
String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;
//设置clientId
this.clientId = config.getString("client.id");
LogContext logContext;
if (transactionalId == null) {
logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
} else {
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
}
this.log = logContext.logger(KafkaProducer.class);
this.log.trace("Starting the Kafka producer");
Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
//1、初始化MetricConfig,用于监控使用
MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(userProvidedConfigs);
reporters.add(jmxReporter);
MetricsContext metricsContext = new KafkaMetricsContext("kafka.producer", config.originalsWithPrefix("metrics.context."));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
//2、加载负载均衡器
this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
long retryBackoffMs = config.getLong("retry.backoff.ms");
//3.1初始化keySerializer
if (keySerializer == null) {
this.keySerializer = (Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore("key.serializer");
this.keySerializer = keySerializer;
}
//3.2 初始化valueSerializer
if (valueSerializer == null) {
this.valueSerializer = (Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore("value.serializer");
this.valueSerializer = valueSerializer;
}
userProvidedConfigs.put("client.id", this.clientId);
ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
List<ProducerInterceptor<K, V>> interceptorList = configWithClientId.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
if (interceptors != null) {
this.interceptors = interceptors;
} else {
this.interceptors = new ProducerInterceptors(interceptorList);
}
ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt("max.request.size");
this.totalMemorySize = config.getLong("buffer.memory");
this.compressionType = CompressionType.forName(config.getString("compression.type"));
this.maxBlockTimeMs = config.getLong("max.block.ms");
int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
this.apiVersions = new ApiVersions();
this.transactionManager = this.configureTransactionState(config, logContext);
//4、初始化RecordAccumulator,类似于计数器。
this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), config.getLong("metadata.max.idle.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
// 5、启动newSender,是一个守护线程。所以每次new KafkaProducer的时候是一个新的线程,Producer是线程安全的。
this.sender = this.newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
this.log.debug("Kafka producer started");
} catch (Throwable var25) {
this.close(Duration.ofMillis(0L), true);
throw new KafkaException("Failed to construct kafka producer", var25);
}
}
2、发送消息
主要内容: 1、创建批次。 2、向批次中追加消息。
producer.send(record)
主要调用了doSend方法
1、 计算分区: 消息具体进入哪一个partition
2、accumulator.append 计算批次。每次发送往this.accumulator append一条记录。一批发送多少数据。
3、达到一定的记录进行消息发送
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
this.throwIfProducerClosed();
long nowMs = this.time.milliseconds();
KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
} catch (KafkaException var22) {
if (this.metadata.isClosed()) {
throw new KafkaException("Producer closed while send in progress", var22);
}
throw var22;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException var21) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
}
byte[] serializedValue;
try {
serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException var20) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
}
//计算分区: 消息具体进入哪一个partition
int partition = this.partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
this.setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
this.ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (this.log.isTraceEnabled()) {
this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
}
Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
if (this.transactionManager != null && this.transactionManager.isTransactional()) {
this.transactionManager.failIfNotReadyForSend();
}
//accumulator.append 计算批次。每次发送往this.accumulator append一条记录。一批发送多少数据。
RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
if (result.abortForNewBatch) {
int prevPartition = partition;
this.partitioner.onNewBatch(record.topic(), cluster, partition);
partition = this.partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (this.log.isTraceEnabled()) {
this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
}
interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
if (this.transactionManager != null && this.transactionManager.isTransactional()) {
this.transactionManager.maybeAddPartitionToTransaction(tp);
}
if (result.batchIsFull || result.newBatchCreated) {
this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
//达到一定的记录进行消息发送
this.sender.wakeup();
}
return result.future;
} catch (ApiException var23) {
this.log.debug("Exception occurred during message send:", var23);
if (callback != null) {
callback.onCompletion((RecordMetadata)null, var23);
}
this.errors.record();
this.interceptors.onSendError(record, tp, var23);
return new KafkaProducer.FutureFailure(var23);
} catch (InterruptedException var24) {
this.errors.record();
this.interceptors.onSendError(record, tp, var24);
throw new InterruptException(var24);
} catch (KafkaException var25) {
this.errors.record();
this.interceptors.onSendError(record, tp, var25);
throw var25;
} catch (Exception var26) {
this.interceptors.onSendError(record, tp, var26);
throw var26;
}
}
三、Producer发送原理
1、直接发送
2、负载均衡
3、异步发送
Producer业务流程图

四、Producer自定义partition负载均衡
1、创建类PartitionDemo
key的结构中带有数字,数字%2, 分别负载在partition 0和1
public class PartitionDemo implements Partitioner {
@Override
public int partition(String s, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
/*
key结构
key-1
key-2
key-3
*/
String keyStr = key + "";
String keyInt = keyStr.substring(4);
System.out.println("keyStr:" + keyStr + ",keyInt:" + keyInt);
int i = Integer.parseInt(keyInt);
return i % 2 ;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
2、发送消息
/**
* Producer异步发送带回调函数和partition负载均衡
*/
public static void producerSendWithCallbackAndPartition(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"118.xx.xx.101:9092");
properties.put(ProducerConfig.ACKS_CONFIG,"all");
properties.put(ProducerConfig.RETRIES_CONFIG,"0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.kafkademo.producer.PartitionDemo");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
// 消息对象
for(int i = 0; i< 10; i++) {
String key = "key-" + i;
ProducerRecord<String,String> record =
new ProducerRecord<>(TOPIC_NAME, key,"value-" + i);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("key:" + key + " , recordMetadata ,partition:" + recordMetadata.partition()
+",offset: " + recordMetadata.offset());
}
});
}
//关闭通道
producer.close();
}
3、返回结果
key:key-1 , recordMetadata ,partition:1,offset: 0 key:key-3 , recordMetadata ,partition:1,offset: 1 key:key-5 , recordMetadata ,partition:1,offset: 2 key:key-7 , recordMetadata ,partition:1,offset: 3 key:key-9 , recordMetadata ,partition:1,offset: 4 key:key-0 , recordMetadata ,partition:0,offset: 38 key:key-2 , recordMetadata ,partition:0,offset: 39 key:key-4 , recordMetadata ,partition:0,offset: 40 key:key-6 , recordMetadata ,partition:0,offset: 41 key:key-8 , recordMetadata ,partition:0,offset: 42
五、消息传递保障
1、kafka提供了三种传递保障
1、最多一次(性能最好): 收到0到1次。消息发送出去后不会去确认,要么收到一次,要么就没有收到。
2、至少一次: 收到1到多次。消息发出去了,一定要等待响应。如果没有响应,则会进行重发。
没有响应有的情况: 消息已经存起来了。但是返回的途中,或某个环节出了问题,然后又发了一次。
3、正好一次(性能最差): 在生产者分配了一个tranceId, 然后加上消息一起进行发送。
如果遇到没有响应,则会带上tranceId和消息,再发送一次。Broker这边会做一个去重。
如下代码,配置了all是最严格的,只有一次。
properties.put(ProducerConfig.ACKS_CONFIG,"all");
2、传递保障依赖于Producer和Consuer共同实现
3、传递保障主要依赖于Producer