1. 引入依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.1</version>
</dependency>
2. 生产者
package org.study.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; /** * 生产者 */ public class ProducerSample { public static void main(String[] args) { Map<String, Object> props = new HashMap<>(); //zookeeper的地址 props.put("zk.connect", "127.0.0.1:2181"); //用于建立与 kafka 集群连接的 host/port 组 props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); String topic = "test-topic"; Producer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 1")); producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 2")); producer.send(new ProducerRecord<String, String>(topic,"idea-key2","java-message 3")); producer.close(); } }
3. 消费者
package org.study.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; /** * 消费者 */ public class ConsumerSample { public static void main(String[] args) { String topic = "test-topic";// topic name Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092");//用于建立与 kafka 集群连接的 host/port 组。 props.put("group.id", "testGroup1");// Consumer Group Name props.put("enable.auto.commit", "true");// Consumer 的 offset 是否自动提交 props.put("auto.commit.interval.ms", "1000");// 自动提交 offset 到 zookeeper 的时间间隔,时间是毫秒 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumer<String, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value()); } } }