Maven pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the ak02 Kafka client examples. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.jj</groupId>
    <artifactId>ak02</artifactId>
    <version>1.0.0</version>

    <properties>
        <!-- Single place to bump the Kafka client version used below -->
        <kafka.version>1.1.0</kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Kafka producer/consumer client API -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- SLF4J binding so the Kafka client's log output is visible -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Compile for Java 8 (source and bytecode level) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
BasicProducer.java
package com.jj.ak02;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Minimal Kafka producer example: publishes four String messages to the
 * {@code test} topic on the broker at {@code localhost:9092}.
 *
 * <p>The first two messages have no key; the last two share the key
 * {@code "8703147"}.
 */
public class BasicProducer {

    /**
     * Configures a producer, sends the sample messages and closes the producer.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Step 1. Connection settings for the Kafka cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // where the Kafka cluster is
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serializer for msgKey
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // serializer for msgValue

        // Step 3. Name of the topic to publish to
        String topicName = "test";

        // Step 4 input: {msgKey, msgValue} pairs; a null key means "no key"
        String[][] messages = {
            {null, "Hello"},
            {null, "Hello2"},
            {"8703147", "Hello3"},
            {"8703147", "Hello4"},
        };

        int msgCounter = 0;

        // Step 2. Create the producer. try-with-resources guarantees close()
        // (Step 5) on every exit path, including unexpected failures.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            System.out.println("Start sending messages ...");

            // Step 4. Wrap each message in a ProducerRecord and publish it
            // - argument #1: topicName
            // - argument #2: msgKey
            // - argument #3: msgValue
            for (String[] message : messages) {
                final String msgKey = message[0];
                final String msgValue = message[1];

                // send() is asynchronous: delivery failures are reported to the
                // callback, not thrown here, so without it they would be lost.
                producer.send(new ProducerRecord<>(topicName, msgKey, msgValue), (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Failed to send message (" + msgKey + ", " + msgValue + "): " + exception);
                    }
                });
                msgCounter++;
            }

            System.out.println("Send " + msgCounter + " messages to Kafka");
        } catch (Exception e) {
            // Error handling for failures raised synchronously (e.g. bad
            // configuration or serialization problems)
            e.printStackTrace();
        }

        System.out.println("Message sending completed!");
    }
}
BasicConsumer.java
// NOTE(review): this package differs from BasicProducer's (com.jj.ak02) and from
// the pom's groupId (com.jj) - confirm which one is intended before moving the class.
package com.wistron.witlab4.ak02;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.Properties;

/**
 * Minimal Kafka consumer example: subscribes to the {@code test} topic as a
 * member of consumer group {@code my-group} and prints every record it
 * receives until the JVM is asked to shut down.
 */
public class BasicConsumer {

    /**
     * Configures a consumer, subscribes to the topic and polls in a loop.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Step 1. Connection settings for the Kafka cluster
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // where the Kafka cluster is
        props.put("group.id", "my-group"); // <-- this is the ConsumerGroup
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserializer for msgKey
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // deserializer for msgValue
        props.put("auto.offset.reset", "earliest"); // where to start when the group has no committed offset

        // Step 2. Create the Kafka consumer instance
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // Step 3. Name of the topic to subscribe to
        String topicName = "test";

        // Step 4. Subscribe the consumer to the topic
        consumer.subscribe(Arrays.asList(topicName));

        // On a termination signal, interrupt the blocking poll() via wakeup()
        // and wait for the main thread to finish closing the consumer. Without
        // this hook the finally block below is never reached on shutdown.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        // Step 5. Keep pulling incoming messages from Kafka
        try {
            System.out.println("Start listen incoming messages ...");

            while (true) {
                // Ask Kafka for new messages (waits up to 1000 ms)
                ConsumerRecords<String, String> records = consumer.poll(1000);

                // Iterates only when new messages arrived
                for (ConsumerRecord<String, String> record : records) {
                    // ** business logic / message processing goes here **

                    // Extract the record metadata
                    String topic = record.topic();
                    int partition = record.partition();
                    long offset = record.offset();

                    // Extract msgKey and msgValue
                    String msgKey = record.key();
                    String msgValue = record.value();

                    // Print metadata together with msgKey & msgValue
                    System.out.println(topic + "-" + partition + "-" + offset + " : (" + msgKey + ", " + msgValue + ")");
                }
            }
        } catch (WakeupException ignored) {
            // Expected: raised by poll() after the shutdown hook called wakeup()
        } finally {
            // Step 6. Close the consumer connection when the program is stopping
            consumer.close();
            System.out.println("Stop listen incoming messages");
        }
    }
}