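When a producer sends data, each record passes through the interceptors and then the serializers before it reaches its target partition. In the interceptor stage we can apply further processing to the data being sent.
Using an interceptor correctly takes the following steps:
1. Implement the methods of the ProducerInterceptor interface.
2. Register the interceptor in the producer's prop configuration: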
prop.put("interceptor.classes", "com.xxx.interceptor.xxxInterceptor");
For an interceptor chain, append the other class names, separated by commas:
prop.put("interceptor.classes", "com.xxx.interceptor.xxxInterceptor1,com.xxx.interceptor.xxxInterceptor2");
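A hand-written comma-separated string is easy to mistype, so one option is to build the value from a list. The following is a minimal sketch using only java.util; the helper name InterceptorConfigSketch and the two com.example class names are hypothetical, and only the "interceptor.classes" key comes from the text above.

```java
import java.util.List;
import java.util.Properties;

final class InterceptorConfigSketch {
    // Builds producer properties with the interceptor chain in the given order.
    static Properties withInterceptors(List<String> interceptorClassNames) {
        Properties prop = new Properties();
        // "interceptor.classes" takes a comma-separated list of class names.
        prop.put("interceptor.classes", String.join(",", interceptorClassNames));
        return prop;
    }

    public static void main(String[] args) {
        Properties prop = withInterceptors(List.of(
                "com.example.interceptor.FirstInterceptor",    // hypothetical class name
                "com.example.interceptor.SecondInterceptor")); // hypothetical class name
        System.out.println(prop.getProperty("interceptor.classes"));
    }
}
```

The order of the list is the order in which the interceptors are configured, so it is worth keeping it explicit rather than hidden inside one long string.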
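A producer interceptor implements the following methods of the ProducerInterceptor interface:
@Override
public void configure(Map<String, ?> arg0) {} // receives the producer's configuration (the client settings, not the broker's)
@Override
public void close() {} // called when the producer is closed
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} // callback invoked when the broker acknowledges the record or the send fails; e is null on success
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) { return producerRecord; } // intercepts the record before it is serialized and assigned a partition; return it unchanged or modified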
Producer:
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;        // SLF4J is assumed as the logging API
import org.slf4j.LoggerFactory;

public class ProducerDemo {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);

    public static void main(String[] args) {
        // 1. Load the configuration
        Properties prop = loadProperties();
        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
        // 3. Send the content
        String sendContent = "hello_kafka";
        IntStream.range(0, 10).forEach(i -> {
            try {
                ProducerRecord<String, String> record = new ProducerRecord<>("test1", sendContent + "_" + i);
                Future<RecordMetadata> future = producer.send(record);
                // Block until the send completes
                RecordMetadata recordMetadata = future.get();
                // record.value() is the value before the interceptors modify it
                LOG.info("sent: {}, offset: {}, partition: {}", record.value(), recordMetadata.offset(), recordMetadata.partition());
            } catch (Exception e) {
                LOG.error("send failed", e);
            }
        });
        producer.close(); // also calls close() on each interceptor
    }

    // Producer configuration
    public static Properties loadProperties() {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prop.put("interceptor.classes", "com.zpb.interceptor.ProducerInterceptorDemo,com.zpb.interceptor.ProducerInterceptorDemo2");
        prop.put("acks", "all"); // wait until all in-sync replicas (ISR) have acknowledged the record
        return prop;
    }
}
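Interceptor 1:
package com.zpb.interceptor;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;        // SLF4J is assumed as the logging API
import org.slf4j.LoggerFactory;

public class ProducerInterceptorDemo implements ProducerInterceptor<String, String> {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerInterceptorDemo.class);
    // AtomicLong instead of volatile long: ++ on a volatile field is not atomic
    private final AtomicLong succNum = new AtomicLong();
    private final AtomicLong failNum = new AtomicLong();

    @Override
    public void configure(Map<String, ?> arg0) {
        LOG.info("configure ==> {}", arg0);
    }

    @Override
    public void close() {
        long succ = succNum.get();
        long total = succ + failNum.get();
        // Cast before dividing to avoid integer division; guard against zero records
        double succRatio = total == 0 ? 0.0 : (double) succ / total;
        LOG.info("success rate: {}%", succRatio * 100);
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (null == e) {
            succNum.incrementAndGet();
        } else {
            failNum.incrementAndGet();
        }
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // Despite the variable name, the marker is appended to the end of the value
        String prefixValue = producerRecord.value() + "prefix_1";
        // Copy partition, timestamp, key and headers so they are not lost
        return new ProducerRecord<>(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(),
                producerRecord.key(), prefixValue, producerRecord.headers());
    }
}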
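When computing a success ratio like the one in close(), two Java rules are worth keeping in mind: `a / a + b` parses as `(a / a) + b`, and dividing one long by another truncates toward zero. The sketch below contrasts the unparenthesized form with a parenthesized, floating-point version; the class and method names are illustrative only.

```java
final class SuccessRatioSketch {
    // Unparenthesized form: parsed as (succ / succ) + fail, using long division.
    // It also throws ArithmeticException when succ is 0.
    static double naive(long succ, long fail) {
        return succ / succ + fail;
    }

    // Parenthesized form with a cast, so the division happens in floating point.
    static double ratio(long succ, long fail) {
        long total = succ + fail;
        return total == 0 ? 0.0 : (double) succ / total;
    }

    public static void main(String[] args) {
        System.out.println(naive(8, 2)); // prints 3.0
        System.out.println(ratio(8, 2)); // prints 0.8
    }
}
```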
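Interceptor 2:
package com.zpb.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerInterceptorDemo2 implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Despite the variable name, the marker is appended to the end of the value
        String prefixValue = record.value() + "prefix_2";
        // Copy partition, timestamp, key and headers so they are not lost
        return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), prefixValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Nothing to track in this interceptor
    }

    @Override
    public void close() {
        // No resources to release
    }
}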
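With both interceptors configured, the producer calls onSend on each one in the order listed in interceptor.classes, and each interceptor receives the record returned by the previous one. The sketch below imitates that chaining with plain string functions and no Kafka dependency, so InterceptorChainSketch and applyChain are illustrative names rather than Kafka APIs.

```java
import java.util.List;
import java.util.function.UnaryOperator;

final class InterceptorChainSketch {
    // Applies each step in list order, feeding the output of one into the next.
    static String applyChain(List<UnaryOperator<String>> chain, String value) {
        String current = value;
        for (UnaryOperator<String> step : chain) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = List.of(
                v -> v + "prefix_1",  // stands in for ProducerInterceptorDemo.onSend
                v -> v + "prefix_2"); // stands in for ProducerInterceptorDemo2.onSend
        // prints hello_kafka_0prefix_1prefix_2
        System.out.println(applyChain(chain, "hello_kafka_0"));
    }
}
```

Swapping the two entries in the list swaps the order of the appended markers, which is a quick way to check which interceptor ran first.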