自定义拦截器实现ProducerInterceptor
package com.wangbiao.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
 * Producer interceptor that stamps every outgoing record value with the time it was sent.
 *
 * <p>The rewritten value has the form {@code <epochMillis>|<originalValue>}; downstream
 * consumers are expected to split on the first vertical bar themselves.
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    /** Separator placed between the timestamp prefix and the original value. */
    private static final String SEPARATOR = "|";

    /**
     * Returns a copy of {@code record} whose value is prefixed with the current wall-clock
     * time in milliseconds followed by a vertical bar.
     *
     * <p>Topic, partition, record timestamp, key and headers are carried over unchanged so
     * that only the value differs from what the caller handed to the producer. Records with
     * a {@code null} value are returned as-is: rewriting them would replace the null with
     * the literal text {@code "null"}.
     *
     * @param record the record the application passed to the producer
     * @return the record to hand on to the serializer and partitioner
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        String value = record.value();
        if (value == null) {
            // Leave null-valued records (e.g. tombstones) untouched.
            return record;
        }
        String stampedValue = System.currentTimeMillis() + SEPARATOR + value;
        return new ProducerRecord<>(
                record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                stampedValue,
                record.headers());
    }

    /** No-op: this interceptor does not react to acknowledgements. */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    /** No-op: this interceptor holds no resources. */
    @Override
    public void close() {
    }

    /** No-op: this interceptor reads no configuration. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
package com.wangbiao.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
/**
 * Producer interceptor that counts how many sends succeeded and how many failed, and
 * prints both totals to standard output when the interceptor is closed.
 *
 * <p>Kafka's {@code ProducerInterceptor} contract allows callbacks to be invoked from
 * multiple threads, so counter updates and the final report are synchronized on this
 * instance, and the public counters are {@code volatile} so direct readers see the
 * latest values.
 *
 * @param <K> record key type (not inspected)
 * @param <V> record value type (not inspected)
 */
public class CounterInterceptor<K, V> implements ProducerInterceptor<K, V> {

    /** Number of acknowledgements received without an exception. */
    public volatile int success;

    /** Number of acknowledgements received with an exception. */
    public volatile int error;

    /**
     * Passes the record through unchanged; this interceptor only observes acknowledgements.
     *
     * @param record the record the application passed to the producer
     * @return the same {@code record} instance
     */
    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        return record;
    }

    /**
     * Increments the success or the error counter for one acknowledged record.
     *
     * <p>The outcome is decided by {@code exception} rather than {@code metadata}: the
     * Kafka client may supply non-null metadata even for a failed send, whereas the
     * exception is documented to be null exactly when no error occurred.
     *
     * @param metadata  metadata of the acknowledged record; not used for the decision
     * @param exception the failure cause, or {@code null} if the send succeeded
     */
    @Override
    public synchronized void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            success = nextCount(success);
        } else {
            error = nextCount(error);
        }
    }

    /** Prints the accumulated success and error totals to standard output. */
    @Override
    public synchronized void close() {
        System.out.println("success:" + success);
        System.out.println("error:" + error);
    }

    /** No-op: this interceptor reads no configuration. */
    @Override
    public void configure(Map<String, ?> configs) {
    }

    /**
     * Returns the counter value after one more event, restarting at 1 instead of
     * overflowing once {@link Integer#MAX_VALUE} has been reached.
     */
    private static int nextCount(int current) {
        return current == Integer.MAX_VALUE ? 1 : current + 1;
    }
}