案例:
自定义两个 Producer 拦截器:一个在消息内容前添加时间戳,一个统计消息发送成功和失败的个数(当然这两个功能也可以写在同一个拦截器里,这里分成两个拦截器来实现)。
1:在 Pom 文件中添加如下依赖(slf4j 依赖是为了消除运行时 SLF4J 找不到日志绑定的警告,可以不加)。
<dependencies>
    <!-- Kafka Java client: https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
    </dependency>
    <!-- Optional SLF4J binding; only present to silence the logging warning. -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
2:编写TimeInterceptor 拦截器
package com.Interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * Producer interceptor that prefixes each record's value with the current
 * time in milliseconds ({@link System#currentTimeMillis()}).
 */
public class TimeInterceptor implements ProducerInterceptor<String, String> {

    /**
     * Returns a copy of the record whose value is {@code <millis><original value>}.
     *
     * @param producerRecord the record handed to the producer
     * @return a new record with the prefixed value, or the original record
     *         unchanged when its value is {@code null}
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        String originalValue = producerRecord.value();
        if (originalValue == null) {
            // Leave null-valued records alone: concatenation would turn the
            // null into the literal text "null" and change the record's meaning.
            return producerRecord;
        }

        String stampedValue = System.currentTimeMillis() + originalValue;

        // Build a new ProducerRecord, carrying over the timestamp and headers
        // so that nothing the caller set on the record is lost.
        return new ProducerRecord<String, String>(
                producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                stampedValue,
                producerRecord.headers());
    }

    /** No-op: this interceptor does not react to acknowledgements. */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    /** No-op: there are no resources to release. */
    @Override
    public void close() {
    }

    /** No-op: this interceptor takes no configuration. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
3:编写统计发送成功/失败个数的 CounterInterceptor 拦截器
package com.Interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer interceptor that counts acknowledged and failed sends and prints
 * both totals when it is closed.
 */
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    // Atomic counters: the ProducerInterceptor contract allows callbacks to be
    // invoked from more than one thread, and close() reads the totals.
    private final AtomicInteger success = new AtomicInteger();
    private final AtomicInteger errors = new AtomicInteger();

    /**
     * Passes the record through untouched.
     *
     * @param producerRecord the record handed to the producer
     * @return the same record instance
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return producerRecord;
    }

    /**
     * Increments the success counter when no exception is reported, otherwise
     * the error counter.
     *
     * @param recordMetadata metadata of the record; not used to decide the
     *                       outcome because it can be non-null for failed sends
     * @param e              the failure, or {@code null} when the send succeeded
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            success.incrementAndGet();
        } else {
            errors.incrementAndGet();
        }
    }

    /** Prints the success and error totals to standard output. */
    @Override
    public void close() {
        System.out.println("Success : " + success.get());
        System.out.println("Errors : " + errors.get());
    }

    /** No-op: this interceptor takes no configuration. */
    @Override
    public void configure(Map<String, ?> map) {
    }
}
4:添加拦截器到生产者中.
package com.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Demo producer that registers TimeInterceptor and CounterInterceptor and
 * sends ten messages to topic "bbb".
 */
public class InterceptorProducer {

    /**
     * Configures a producer with the two interceptors, sends the messages
     * "Kafkakafka--11" through "Kafkakafka--20", then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Producer configuration
        Properties properties = new Properties();
        // Kafka cluster broker list
        properties.put("bootstrap.servers", "hadoop101:9092");
        properties.put("acks", "all");
        // Number of retries
        properties.put("retries", 1);
        // Batch size in bytes
        properties.put("batch.size", 16384);
        // Linger time in milliseconds
        properties.put("linger.ms", 1);
        // RecordAccumulator buffer size: 33554432 bytes = 32 MiB
        properties.put("buffer.memory", 33554432);
        // Key and value serializer classes
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Register the interceptors, listed in this order
        List<String> interceptors = new ArrayList<String>();
        interceptors.add("com.Interceptor.TimeInterceptor");
        interceptors.add("com.Interceptor.CounterInterceptor");
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // try-with-resources closes the producer even if send() throws, so
        // the producer is never left open on failure.
        try (Producer<String, String> producer = new KafkaProducer<String, String>(properties)) {
            for (int i = 11; i <= 20; i++) {
                producer.send(new ProducerRecord<String, String>("bbb", "Kafkakafka--" + i));
            }
        }
    }
}
5:启动消费者
# Start a console consumer for topic "bbb", connecting via ZooKeeper at hadoop101:2181.
bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic bbb
6:启动生产者,查看运行结果.