Requirement
Implement a simple interceptor chain made up of two interceptors. The first interceptor prepends a timestamp to the front of the message value before the message is sent; the second interceptor updates the counts of successfully and unsuccessfully sent messages after each send completes.
First interceptor: prepend a timestamp
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> map) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs before the record is serialized and partitioned:
        // rebuild the record with the current timestamp prepended to the value.
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                System.currentTimeMillis() + "," + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    @Override
    public void close() {
    }
}
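As a quick sanity check, onSend can be called by hand without any broker. The sketch below is only illustrative; the class name TimeInterceptorCheck and the sample value "hello" are made up here.

import org.apache.kafka.clients.producer.ProducerRecord;

public class TimeInterceptorCheck {

    public static void main(String[] args) {
        TimeInterceptor interceptor = new TimeInterceptor();
        // Feed a record through onSend directly; no Kafka cluster is needed.
        ProducerRecord<String, String> out =
                interceptor.onSend(new ProducerRecord<>("first", "hello"));
        // Prints something like "1700000000000,hello".
        System.out.println(out.value());
    }
}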
Second interceptor: count and print the numbers of successful and failed sends
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CounterInterceptor implements ProducerInterceptor<String, String> {

    int success;
    int error;

    @Override
    public void configure(Map<String, ?> map) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // This interceptor does not modify the record.
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // The exception, not the metadata, signals failure: recordMetadata can be
        // non-null even when the send fails, so check the exception instead.
        if (e == null) {
            success++;
        } else {
            error++;
        }
    }

    @Override
    public void close() {
        // Print the totals when the producer is closed.
        System.out.println("success: " + success);
        System.out.println("error: " + error);
    }
}
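Note that onAcknowledgement runs on the producer's background sender thread, while close() is usually triggered from the application thread, so the plain int counters above are not guaranteed to be safely visible across threads. If that matters, a variant using AtomicInteger is a safer choice; the sketch below is one possible shape (the class name ThreadSafeCounterInterceptor is invented here):

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadSafeCounterInterceptor implements ProducerInterceptor<String, String> {

    // AtomicInteger makes the increments and the final read safe across threads.
    private final AtomicInteger success = new AtomicInteger();
    private final AtomicInteger error = new AtomicInteger();

    @Override
    public void configure(Map<String, ?> map) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception e) {
        if (e == null) {
            success.incrementAndGet();
        } else {
            error.incrementAndGet();
        }
    }

    @Override
    public void close() {
        System.out.println("success: " + success.get());
        System.out.println("error: " + error.get());
    }
}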
Producer code
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.Properties;

public class InterceptorProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Configure only these three settings; everything else uses the defaults.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // Interceptor chain: the interceptors run in the order they are listed.
        ArrayList<String> interceptor = new ArrayList<>();
        interceptor.add("com.atguigu.interceptor.TimeInterceptor");
        interceptor.add("com.atguigu.interceptor.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptor);

        // Create the producer.
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // Send data.
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("first", "atguigu" + i);
            kafkaProducer.send(producerRecord);
        }

        // Close the producer. The 10 records above neither fill a 16 KB batch nor
        // wait out linger.ms, so they may still be sitting in the buffer; close()
        // flushes them and also calls close() on the interceptors and the partitioner.
        kafkaProducer.close();
    }
}
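To verify the chain end to end, one option is to run a plain consumer against the same topic and check that every value arrives as "<timestamp>,atguiguN"; if all ten sends succeed, CounterInterceptor should print success: 10 and error: 0 when the producer closes. The consumer below is only a sketch, assuming the same broker hadoop102:9092 and topic first as above; the group id and class name are arbitrary.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class InterceptorVerifyConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "interceptor-test");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("first"));

        // Each value should appear as "<timestamp>,atguiguN" because TimeInterceptor
        // rewrote the value before it was serialized.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}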