kafka 在 0.10 版本引入和拦截器机制。一共有两种:Kafka Producer端的拦截器和Kafka Consumer端的拦截器。本文讲述的是Kafka Producer端的拦截器,它主要用来在消息发出之前对消息进行拦截或者修改,也可以用于Producer的Callback回调之前进行相应的预处理。
实现ProducerInterceptor 接口就可以在producer回调。ProducerInterceptor 里面主要有4个方法。
1、ProducerRecord<k, v> onSend(ProducerRecord<k, v> record):Producer在将消息序列化和分配分区之前会调用拦截器的这个方法来对消息进行相应的操作。
2、void onAcknowledgement(RecordMetadata metadata, Exception exception):在消息被应答(Acknowledgement)之前或者消息发送失败时调用,优先于用户设定的Callback之前执行。这个方法运行在Producer的IO线程中,所以这个方法里实现的代码逻辑越简单越好,否则会影响消息的发送速率。
3、void close():关闭当前的拦截器,此方法主要用于执行一些资源的清理工作
4、configure(Map<string, ?> configs):用来初始化此类的方法,这个是ProducerInterceptor接口的父接口Configurable中的方法。
一般情况下只需要关注并实现onSend或者onAcknowledgement方法即可。下面我们来举个案例,通过onSend方法来过滤消息体为空的消息以及通过onAcknowledgement方法来计算发送消息的成功率