一、原生API
(一)生产者
1、生产者配置
@Data public class DemoProducer { private KafkaProducer<Integer, String> producer; public DemoProducer(){ Properties properties = new Properties(); properties.put("bootstrap.servers","192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); this.producer = new KafkaProducer<Integer, String>(properties); } public Future<RecordMetadata> send(ProducerRecord<Integer, String> record) { return producer.send(record, (Callback)null); } }
2、发送消息
private final String topic = "cities"; public void sendMsg() throws Exception { DemoProducer producer = new DemoProducer(); int partition = 0; int key = 1; String cityName = "beijing"; //指定主题及消息 ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,cityName); //指定主题、key、消息 //ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,key,cityName); //指定主题、partition、key、消息 //ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,partition,key,cityName); Future<RecordMetadata> future = producer.send(record); RecordMetadata recordMetadata = future.get(); log.info("=====================【{}】", recordMetadata.offset()); log.info("=====================【{}】", recordMetadata.partition()); log.info("=====================【{}】", recordMetadata.timestamp()); log.info("=====================【{}】", recordMetadata.topic()); }
3、发送消息测试类
@Test void sendTest() throws Exception{ NativeService nativeService = new NativeService(); nativeService.sendMsg(); }
(二)批量发送消息
1、配置生产者
@Data
public class DemoBatchProducer {
private KafkaProducer<Integer, String> producer;
public DemoBatchProducer(){
Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("batch.size",16384);
properties.put("linger",50);
this.producer = new KafkaProducer<Integer, String>(properties);
}
public Future<RecordMetadata> send(ProducerRecord<Integer, String> record) {
return producer.send(record, (Callback)null);
}
}
2、消息发送
public void sendMsg1() throws Exception {
DemoBatchProducer producer = new DemoBatchProducer();
int partition = 0;
int key = 1;
String cityName = "beijing";
for(int i=0; i<50; i++) {
ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,cityName + i*1000);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
log.info("【{}】=====================【{}】", i+1, recordMetadata.offset());
log.info("【{}】=====================【{}】", i+1, recordMetadata.partition());
log.info("【{}】=====================【{}】", i+1, recordMetadata.timestamp());
log.info("【{}】=====================【{}】", i+1, recordMetadata.topic());
}
}
3、测试类
@Test
void batchSendTest() throws Exception{
NativeService nativeService = new NativeService();
nativeService.sendMsg1();
}
(三)消费者
1、配置消费者
@Data public class DemoConsumer{ private KafkaConsumer<Integer, String> consumer; public DemoConsumer(){ Properties properties = new Properties(); properties.put("bootstrap.servers","192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092"); properties.put("group.id", "mygroup1"); properties.put("enable.auto.commit", "true"); properties.put("max.poll.records", "500"); properties.put("auto.commit.interval,ms","1000"); properties.put("session.timeout.ms","30000"); properties.put("heartbeat.interval.ms","10000"); properties.put("auto.offset.reset","earliest"); properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<Integer, String>(properties); } }
2、具体的调用方法
public void autoDoWork(){ DemoConsumer consumer = new DemoConsumer(); consumer.getConsumer().subscribe(Collections.singletonList(topic)); ConsumerRecords<Integer, String> records = consumer.getConsumer().poll(1000); for (ConsumerRecord<Integer, String> consumerRecord: records) { log.info("============topic={},partition={},key={},value={}===============",consumerRecord.topic(), consumerRecord.partition(), consumerRecord.key(), consumerRecord.value()); } }
3、测试类
@Test void consumerTest() throws Exception{ NativeService nativeService = new NativeService(); nativeService.doWork(); }
(四)手动提交
上面的消费是自动提交offset的方式对broker中的消息进行消费的,但是自动提交可能出现消息重复消费的问题,所以在生产情况下,一般都对消息进行手动提交。
手动提交可以分为同步提交、异步提交和同异步联合提交三种情况。
无论是哪种情况,手动提交都需要修改消费者配置,需要设置自动提交标识为false,同时设置手动提交最大值。
@Data public class DemoConsumer{ private KafkaConsumer<Integer, String> consumer; public DemoConsumer(){ Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup1"); // properties.put("enable.auto.commit", "true"); properties.put("max.poll.records", "500"); properties.put("auto.commit.interval,ms","1000"); properties.put("session.timeout.ms","30000"); properties.put("heartbeat.interval.ms","10000"); properties.put("auto.offset.reset","earliest"); properties.put("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer"); properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //设置是否手动提交及最大提交数 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10); this.consumer = new KafkaConsumer<Integer, String>(properties); } }
1、同步提交
同步提交与自动提交就多了一次调用commitSync函数。
同步提交是消费者向broker提交offset后等待broker成功响应。若没有收到broker的响应,则会重新提交,直到获取到响应。但是在整个过程中,消费者是处于阻塞状态,这样就严重的影响了消费者的吞吐量。
public void syncDoWork(){ DemoConsumer consumer = new DemoConsumer(); consumer.getConsumer().subscribe(Collections.singletonList(topic)); ConsumerRecords<Integer, String> records = consumer.getConsumer().poll(1000); for (ConsumerRecord<Integer, String> consumerRecord: records) { log.info("============topic={},partition={},key={},value={}===============",consumerRecord.topic(), consumerRecord.partition(), consumerRecord.key(), consumerRecord.value()); consumer.getConsumer().commitSync(); } }
2、异步提交
由于同步提交操作会影响消费者的吞吐量,因此就有了异步提交。异步提交就是提交后不再等待broker的响应,直接开始做后续处理,提高了消费者的吞吐量。
异步提交与同步提交就是调用的提交方法改为commitAsync,该方法可以有回调函数,回调函数中可以对异常信息做判断和输出等操作。
public void asyncDoWork(){ DemoConsumer consumer = new DemoConsumer(); consumer.getConsumer().subscribe(Collections.singletonList(topic)); ConsumerRecords<Integer, String> records = consumer.getConsumer().poll(1000); for (ConsumerRecord<Integer, String> consumerRecord: records) { log.info("============topic={},partition={},key={},value={}===============",consumerRecord.topic(), consumerRecord.partition(), consumerRecord.key(), consumerRecord.value()); consumer.getConsumer().commitAsync((offsets, e) -> { if(e != null){ log.info("提交失败,offsets=【{}】,失败原因【{}】", offsets, e); } }); } }
3、同异步手动提交
上面提到得异步提交可能会造成重复消费,因此可以使用同异步手动提交的方式进行提交。
在同异步手动提交的情况下,如果出现提交失败,后续提交会将这次提交失败的offset提交,因此不会影响消费者的消费。
同异步手动提交与异步提交代码的唯一区别就是需要在回调函数中判断如果提交失败,则需要同步方式进行再次提交。
public void syncasyncDoWork(){ DemoConsumer consumer = new DemoConsumer(); consumer.getConsumer().subscribe(Collections.singletonList(topic)); ConsumerRecords<Integer, String> records = consumer.getConsumer().poll(1000); for (ConsumerRecord<Integer, String> consumerRecord: records) { log.info("============topic={},partition={},key={},value={}===============",consumerRecord.topic(), consumerRecord.partition(), consumerRecord.key(), consumerRecord.value()); consumer.getConsumer().commitAsync((offsets, e) -> { if(e != null){ log.info("提交失败,offsets=【{}】,失败原因【{}】", offsets, e); consumer.getConsumer().commitSync(); } }); } }
二、Spring Boot Kafka
使用SpringBoot与原生的API主要调整三点,分别是:producer和consumer等配置项直接放入配置文件、发送消息使用KafkaTemplate、消费者使用KafkaListener注解即可。
1、配置项
kafka: topic: cities spring: kafka: bootstrap-servers: 192.168.206.131:9092,192.168.206.132:9092,192.168.206.133:9092 producer: key-serializer: org.apache.kafka.common.serialization.IntegerSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer batch-size: 16384 consumer: group-id: mygroup1 enable-auto-commit: false key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 10
2、发送消息
@RestController @RequestMapping("/boot") @Slf4j public class BootProducerApi { @Autowired private KafkaTemplate kafkaTemplate; @Value("${kafka.topic}") private String topic; @GetMapping("/send") public void sendMsg() throws Exception { String cityName = "LY"; for(int i=0; i<50; i++) { ProducerRecord<Integer,String> record = new ProducerRecord<>(topic,cityName + i*1000); ListenableFuture<SendResult> future = kafkaTemplate.send(record); RecordMetadata recordMetadata = future.get().getRecordMetadata(); log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.offset()); log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.partition()); log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.timestamp()); log.info("producer=======【{}】=======【{}】", i+1, recordMetadata.topic()); } } }
3、消费者
@Component @Slf4j public class BootConsumer { @KafkaListener(topics = "${kafka.topic}") public void onMsg(String msg){ log.info("consumer============msg=【{}】",msg); } }