<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the Kafka consumer sample: a single-module project whose only
  dependency is the Kafka Java client.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ht</groupId>
    <artifactId>kafkatest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Pin the source encoding so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Explicit version: avoids Maven's "plugin version is missing" warning
                     and keeps the build reproducible. -->
                <version>3.8.1</version>
                <configuration>
                    <!-- Compile for Java 7 (language level and bytecode). -->
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Kafka Java client (consumer API used by the sample application). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
    </dependencies>
</project>
Java 代码（Application.java）：
1 import org.apache.kafka.clients.CommonClientConfigs; 2 import org.apache.kafka.clients.consumer.ConsumerRecord; 3 import org.apache.kafka.clients.consumer.ConsumerRecords; 4 import org.apache.kafka.clients.consumer.KafkaConsumer; 5 6 import java.util.Collections; 7 import java.util.Properties; 8 9 import static org.apache.kafka.clients.consumer.ConsumerConfig.*; 10 11 /** 12 * @author sunzq 13 * @since 2017/8/29 14 */ 15 public class Application { 16 public static void main(String[] args) { 17 18 Properties props = new Properties(); 19 props.put(BOOTSTRAP_SERVERS_CONFIG, "node1:6667,node2:6667,node3:6667,node4:6667"); 20 props.put(ENABLE_AUTO_COMMIT_CONFIG, "true"); 21 props.put(GROUP_ID_CONFIG, "test08291103"); 22 // props.put(ConsumerConfig.CLIENT_ID_CONFIG, "test0829"); 23 props.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); 24 props.put(AUTO_OFFSET_RESET_CONFIG, "earliest"); 25 props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 26 props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); 27 props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); 28 29 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 30 // topic name: test9 31 consumer.subscribe(Collections.singleton("test9")); 32 while (true) { 33 ConsumerRecords<String, String> records = consumer.poll(100); 34 for (ConsumerRecord<String, String> record : records) 35 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 36 } 37 } 38 }
JVM 启动参数（指定 Kerberos 配置文件和 JAAS 登录配置文件的位置）：
-Djava.security.krb5.conf=c:\app\conf\krb5.conf -Djava.security.auth.login.config=c:\app\conf\kafka_jaas.conf
注意：在 Windows 下，上述路径中的分隔符记得用反斜杠 \（例如 c:\app\conf\krb5.conf）。