消费者 API:自动提交 offset
/**
 * Minimal Kafka consumer example that relies on automatic offset commits.
 *
 * <p>Subscribes to the topic {@code first} as a member of consumer group {@code gc} and prints
 * every received record to standard output as {@code key-value}.
 */
public class MyConsumer {
    /**
     * Builds the consumer configuration, subscribes to the topic and polls in an endless loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Cluster to connect to
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Enable automatic commit of the consumed offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // Interval between automatic offset commits, in milliseconds
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Deserializer classes for record keys and values
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gc");

        // The loop below never exits normally, so the only way out is an exception.
        // try-with-resources guarantees the consumer is closed on that path instead of leaking.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic
            kafkaConsumer.subscribe(Collections.singletonList("first"));
            while (true) {
                // Fetch records, waiting up to 1000 ms
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
                // Print each record
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key() + "-" + consumerRecord.value());
                }
            }
        }
    }
}
手动提交 offset:同步提交
/**
 * Kafka consumer example that commits offsets manually and synchronously.
 *
 * <p>Subscribes to the topic {@code first} as a member of consumer group {@code gc1}, prints every
 * received record to standard output as {@code key-value}, and calls {@code commitSync()} after
 * each polled batch has been printed.
 */
public class ConsumerOffsetSync {
    /**
     * Builds the consumer configuration, subscribes to the topic and polls in an endless loop.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Cluster to connect to
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // Disable automatic commit of the consumed offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Deserializer classes for record keys and values
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "gc1");
        // Offset reset policy.
        // earliest: consume from the beginning; applies when (1) the group is new, or
        //           (2) the stored offset points at data that no longer exists.
        // latest:   the default; consume only the newest data.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // The loop below never exits normally, so the only way out is an exception
        // (including a failed commit). try-with-resources guarantees the consumer is
        // closed on that path instead of leaking.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic
            kafkaConsumer.subscribe(Collections.singletonList("first"));
            while (true) {
                // Fetch records, waiting up to 1000 ms
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
                // Print each record
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord.key() + "-" + consumerRecord.value());
                }
                // Synchronous commit: the current thread blocks until the offset commit succeeds
                kafkaConsumer.commitSync();
            }
        }
    }
}
手动提交 offset:异步提交(代码片段,kafkaConsumer 的创建同上例)
// Asynchronous commit: the callback receives the offsets and, on failure, the exception
kafkaConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Report both the affected offsets and the cause, so the failure can be diagnosed
        System.err.println("Commit failed for " + offsets + ": " + exception);
    }
});