Kafka 作为一款消息队列,本身的 API 并不多,也不复杂。本文列举生产者和消费者的常用 API,示例代码如下:
public class KafkaTestProduer { public static final Logger logger = LoggerFactory.getLogger(KafkaTestProduer.class); public final static String kafkaServer = "10.37.136.172:9092"; public final static String topic = "mxz.test.202010311544"; public static Properties initConfig() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.CLIENT_ID_CONFIG, "mxz.client"); props.put(ProducerConfig.RETRIES_CONFIG, 3); return props; } public static Properties initConsumerConfig() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "mxz.client"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "202101051648"); props.put("auto.offset.reset", "earliest"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerinterceptorTTL.class.getName()); return props; } // public static void main(String[] args) { // // Properties props = initConfig(); // KafkaProducer<String, String> producer = new KafkaProducer<>(props); // // ProducerRecord<String, String> record = new ProducerRecord<>(topic, "batman"); // ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "superman"); // ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "aquaman"); // ProducerRecord<String, String> record4 = new ProducerRecord<>(topic, "flash"); // ProducerRecord<String, String> record5 = new ProducerRecord<>(topic, "wonderwoman"); // List<ProducerRecord<String, String>> list = new ArrayList<>(); // 
list.add(record); // list.add(record2); // list.add(record3); // list.add(record4); // list.add(record5); // try { // for (ProducerRecord<String, String> recordItem : list) { // producer.send(recordItem, new CallbackImpl()); // } // // }finally { // producer.close(); // } // // } public static class CallbackImpl implements Callback { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { } else { System.out.print(metadata.topic() + "-" + metadata.partition() + "-" + metadata.offset()); } } } public static void main(String[] args) throws InterruptedException { logger.info("kafka consumer main begins {} "); Properties props = initConsumerConfig(); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } }); // ConsumerRecords<String, String> records = consumer.poll(1000); // TopicPartition tp = new TopicPartition(topic, 0); // List<ConsumerRecord<String, String>> list = records.records(tp); // if (list.isEmpty()) { // return; // } // long offset = list.get(0).offset(); Set<TopicPartition> assignment = consumer.assignment(); while (assignment.size() == 0) { consumer.poll(100L); assignment = consumer.assignment(); } for (TopicPartition tpItem : assignment) { consumer.seek(tpItem, 19); } // Set<TopicPartition> assignment = new HashSet<>(); // while (assignment.size() == 0) { // consumer.poll(100L); // assignment = consumer.assignment(); // } // Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); // for (TopicPartition tp : assignment) { // timestampToSearch.put(tp, System.currentTimeMillis() - 1000 * 1000); // } // // Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch); // for (TopicPartition tp : 
assignment) { // OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp); // if (offsetAndTimestamp != null) { // consumer.seek(tp, offsetAndTimestamp.offset()); //// consumer.poll(1000); // } // } // for (TopicPartition tp : assignment) { // consumer.seek(tp,10); // } ConsumerRecords<String, String> records = consumer.poll(1000); // // for (TopicPartition tp : records.partitions()) { // List<ConsumerRecord<String, String>> list = records.records(tp); // long lastConsumedOffset = list.get(list.size() - 1).offset() ; // System.out.println("lastConsumedOffset: " + lastConsumedOffset); // } // for (ConsumerRecord<String, String> record : records) { System.out.println(record.value()); } consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception == null) { logger.info("offset {}", offsets); logger.info("time {} ", System.currentTimeMillis()); System.out.println("TIME IS END"); } else { System.out.println(exception); logger.info("offset {}", offsets); logger.info("time {} ", System.currentTimeMillis()); } } }); Thread.sleep(30000); consumer.close(); } }
下面再补充生产者指定 partition 的写法:在 ProducerRecord 的构造方法中,第二个参数 partition 即为目标分区(传 null 表示不指定分区):
/**
 * Creates a record to be sent to the given topic, optionally pinned to a partition.
 *
 * @param topic     the target topic; must not be {@code null}
 * @param partition the target partition, or {@code null} to leave it unspecified;
 *                  must not be negative
 * @param timestamp the record timestamp, or {@code null}; must not be negative
 * @param key       the record key
 * @param value     the record value
 * @param headers   the headers, wrapped in a new {@code RecordHeaders}
 * @throws IllegalArgumentException if {@code topic} is {@code null}, or if
 *                                  {@code timestamp} or {@code partition} is negative
 */
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
    // Validation order is significant: topic, then timestamp, then partition.
    if (topic == null) {
        throw new IllegalArgumentException("Topic cannot be null.");
    }
    if (timestamp != null && timestamp < 0) {
        String message = String.format(
                "Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp);
        throw new IllegalArgumentException(message);
    }
    if (partition != null && partition < 0) {
        String message = String.format(
                "Invalid partition: %d. Partition number should always be non-negative or null.", partition);
        throw new IllegalArgumentException(message);
    }

    this.topic = topic;
    this.partition = partition;
    this.timestamp = timestamp;
    this.key = key;
    this.value = value;
    this.headers = new RecordHeaders(headers);
}