正常的生产逻辑需要以下几步:
- 配置生产者相关参数
- 创建一个生产者对象
- 构建发送消息
- 发送消息
- 关闭生产者实例
示例代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* @author: masheng
* @description: kafka生产者客户端示例
* @date: 2020/07/27 17:32
*/
public class KafkaProducerAnalyze {
private static final String TOPIC = "topic_test";
private static final String BROKER_LIST = "localhost:9092";
/*
* 功能描述: 初始化配置
* @author: masheng
* @time: 2020/7/27
* @param
* @return: java.util.Properties
*/
public static Properties initConfig() {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
//所有副本都复制完返回成功,延迟最高,可以设置all、0、1三种
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return properties;
}
public static void main(String[] args) {
//1.配置相关参数
Properties properties = initConfig();
//2.初始化生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
//3.构建发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "Hello,Kafka!");
try {
//4.发送消息
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
} finally {
//5.关闭生产者实例
producer.close();
}
}
}
1.ProducerRecord
消息对象,包含多个属性,定义如下:
public class ProducerRecord<K, V> {
private final String topic; //主题
private final Integer partition; //分区
private final Headers headers; //消息头部,设置与应用相关的一些信息
private final K key; //键
private final V value; //值,消息体
//消息时间戳,包含CreateTime和LogAppendTime两种,分别表示消息创建时间和追加到日志文件的时间
private final Long timestamp;
}
2.参数配置
3个必填参数:
- bootstrap.servers
- key.serializer
- value.serializer
说明:broker端接收的消息要以字节数组(byte[])的形式存在,所以消息发往broker之前需要做相应的序列化操作,这里必须填写序列化器的全类名
提示:可以使用org.apache.kafka.clients.producer.ProducerConfig类来设置参数,防止人为错误
3.发送消息
send方法:
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
send方法有两个重载方法,可以直接使用producer.send(record).get()实现同步发送
或者也可以获取一个RecordMetadata,从该对象里面获取一些元数据信息,如果需要这些信息,可以使用这种方式
异步发送:可以在send()方法里指定一个回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认,示例如下:
//回调函数对异常进行处理
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(null==exception){
System.out.println("no exception!");
}
if(null!=metadata){
System.out.print("offset:"+metadata.offset()+";partition:"+metadata.partition());
}
}
})
4.异常
KafkaProducer中一般有两种异常:可重试异常和不可重试异常
可重试异常可以通过配置retries参数来控制,如果在规定的重试次数内恢复了,就不会抛出异常,默认值为0
5.序列化
自带的有ByteArray、ByteBuffer、Bytes、Double、Integer、Long、String等类型的序列化器,都实现了org.apache.kafka.common.serialization.Serializer接口,有三个方法:
//配置当前类
void configure(Map<String, ?> configs, boolean isKey);
//执行序列化操作
byte[] serialize(String topic, T data);
//关闭当前的序列化器
void close();
StringSerializer类:
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
//确定编码类型
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null)
encodingValue = configs.get("serializer.encoding");
if (encodingValue instanceof String)
encoding = (String) encodingValue;
}
//将String类型转为byte[]类型
@Override
public byte[] serialize(String topic, String data) {
try {
if (data == null)
return null;
else
return data.getBytes(encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
}
}
//空方法
@Override
public void close() {
// nothing to do
}
}
如果自带的序列化器无法满足我们的需求,可以自己实现一个序列化器,如下:
//创建发送消息的对象
public class Person {
private String name;
private String tel;
public Person() {
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getTel() {
return tel;
}
public void setTel(String age) {
this.tel = age;
}
}
public class PersonSerializer implements Serializer<Person> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, Person data) {
if (data == null) {
return null;
}
byte[] name, tel;
try {
if (data.getName() != null) {
name = data.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
if (data.getTel() != null) {
tel = data.getTel().getBytes("UTF-8");
} else {
tel = new byte[0];
}
ByteBuffer buffer = ByteBuffer.allocate(name.length + tel.length);
buffer.put(name);
buffer.put(tel);
return buffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public void close() {
}
}
6.分区器
消息在发往broker的过程中,需要经过拦截器、序列化器、分区器等一系列作用之后,才会被发往broker
Kafka中默认的分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,实现了Partitioner接口,该接口定义了2个方法:
//计算分区号
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
//关闭分区器时回收一些资源
public void close();
DefaultPartitioner类如下:
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//如果key为null,以轮询的方式发往主题内的可用分区
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// 如果key不为null,对key进行hash,根据hash值计算分区号
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
自定义分区器只需要实现Partitioner接口即可
7.拦截器
生产者拦截器可以用来在发送前做一些准备工作,比如过滤某种消息等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作
需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口,该接口中包含3个方法:
//将消息序列化和计算分区之前调用,对消息进行定制化操作
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
//消息被应答之前或消息发送失败时调用,优先于用户的Callback执行,运行在Producer的I/O中,越简单越好,否则影响消息发送速度
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
//关闭拦截器时进行一些清理工作
public void close();
示例:
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
* @author: masheng
* @description: 生产者拦截器示例
* @date: 2020/07/27 20:41
*/
public class ProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String modifyValue = "pre-" + record.value();
return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
record.key(), modifyValue, record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception == null) {
sendSuccess++;
} else {
sendFailure++;
}
}
@Override
public void close() {
double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
System.out.println("发送成功率= " + String.format("%f", successRatio * 100 + "%"));
}
@Override
public void configure(Map<String, ?> configs) {
}
}