和性能相关的问题,可以通过上一篇总结的参数来琢磨配置,到这里,业务可以正常运作,正常发送不是问题
但是业务如果要严谨,需要关注异常情况怎么处理,这里着重总结发送失败的处理方式,另外需要看是用原生的
kafka-client还是springboot集成的springboot-kafka方式实现
kafka-clients方式
下面是KafkaProducer.class及其关键代码(KafkaProducer是线程安全的)
Producer producer = new KafkaProducer(kafkaProperties);
ProducerRecord<String,String> record = new ProducerRecord<>("topic1","key1","value111");
//这是最终的send方法
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)
第一种,不管不顾,疯狂发送,就是send,这样写就等着被打回
producer.send(record);
第二种,同步方式,调用的是Future.get()来阻塞调用send()的线程,在catch中处理异常,这里存在两种情况
1、一切OK,就是发送失败了,比如连接突然断开,比如leader挂了,刚好选举,这时候retries属性就用上了
2、消息本身不对,比如超过设置的大小,会直接抛出异常try {
producer.send(record).get(); } catch (Exception e) { //处理你的异常,这里无法拿到发送的record消息,需要设计 }
第三种,异步方式,异常通过回调函数处理,而不是发送时出现异常立即就处理,骨架代码如下:
producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { //do something } });
查看Callback接口,可以看到不管是RecordMetadata还是Exception,都是不包含发送的record的
public interface Callback { void onCompletion(RecordMetadata var1, Exception var2); } public final class RecordMetadata { public static final int UNKNOWN_PARTITION = -1; private final long offset; private final long timestamp; private final int serializedKeySize; private final int serializedValueSize; private final TopicPartition topicPartition; private volatile Long checksum; ...... }
所以需要实现我们自己的org.apache.kafka.clients.producer.Callback接口,想办法传进去record,常规的设计思路,当然是增加一个属性,有参构造函数传进去
class MyCallback implements Callback { private Object msg; public MyCallback(Object msg) { this.msg = msg; } @Override public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception!=null){ do something with msg} } } producer.send(record, new MyCallback(record));
springboot-kafka方式
先说下几个点,springboot-kafka包是包含kafka-client包的,用法差异比价大
首先初始化一个KafkaTemplate
@Component public class KafkaConfig { @Value("${kafka.brokers}") private String brokers; @Autowired public KafkaProducerListener producerListener; /** * producer,方法名就是注入时候的属性变量名 * 使用时请注入属性:KafkaTemplate kafkaTemplate */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { Map<String, Object> configs = new HashMap<>(16); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); //重试次数 configs.put(ProducerConfig.RETRIES_CONFIG, 3); //批次发送的大小,单位是byte configs.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); //请求超时时间默认给30s,每次retry都是一个完整的30s configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000); //这样可以保证最大程度的消息发送不丢失 configs.put(ProducerConfig.ACKS_CONFIG, "all"); //延迟发送的时间,目的是尽量使batch满了之后才发送,默认0 configs.put(ProducerConfig.LINGER_MS_CONFIG, 50); //生产者用来缓存等待发送到服务器的消息的内存总字节数,这里是默认值 configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); KafkaTemplate template = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs)); template.setProducerListener(producerListener); return template; } }
然后是send方法,一般带回调的是这样
public void sendAndCallback(String msg) { ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onSuccess(SendResult<String, Object> result) { } @Override public void onFailure(Throwable ex) { } }); }
可以看到onSuccess时,可以拿到SendResult,但是onFailure时,只能拿到Throwable ex,也没法拿到msg,这是SendResult的属性
public class SendResult<K, V> { private final ProducerRecord<K, V> producerRecord; private final RecordMetadata recordMetadata; }
跟踪下send方法,在KafkaTemplate这个类里面最后找到如下位置:
可以看到producerListener的onError方法是可以处理发送失败的日志的,所以需要在定义KafkaTemplate时就定义自己的producerListener,然后set进去
定义自己的ProducerListener:
@Component @Slf4j public class KafkaProducerListener implements ProducerListener<String, Object> { @Override public void onSuccess(String topic, Integer partition, String key, Object value, RecordMetadata recordMetadata) { log.info("message send success:[]",value); } @Override public void onError(String topic, Integer partition, String key, Object value, Exception exception) { //消息发送到失败队列,或存库,或重试 } }
在上面初始化KafkaTemplate的时候,
template.setProducerListener(producerListener);
如此即可处理发送失败的消息,保证发送过程中消息尽量不丢失
最后说一下,KafkaProducer类是线程安全的,并且producer端是不支持batch发送一个List,然后多条msg到topic的