/**
*重置kafka消费进度
*参数中需要指定kafka集群中一台broker地址,要重置的topic名称,消费组,以及partition个数
*/
public static void seekLatest(String broker, String topic, String group, int partitionCnt){
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, group);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,100);
configProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
DefaultKafkaConsumerFactory<Long, byte[]> factory = new DefaultKafkaConsumerFactory<>(configProps, new LongDeserializer(), new ByteArrayDeserializer());
org.apache.kafka.clients.consumer.KafkaConsumer<Long, byte[]> consumer = (org.apache.kafka.clients.consumer.KafkaConsumer<Long, byte[]>) factory.createConsumer();
consumer.subscribe(Arrays.asList(topic));
List<TopicPartition> topicPartitionLists = new ArrayList<>();
for(int i = 0 ; i < partitionCnt; i ++){
topicPartitionLists.add(new TopicPartition(topic, i));
}
consumer.poll(1);
Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(topicPartitionLists);
for(TopicPartition tp : topicPartitionLongMap.keySet()){
consumer.seek(tp, topicPartitionLongMap.get(tp));
}
consumer.close();
}