既然翻到这里,默认就认为你已经基本掌握了Kafka的基础知识,本小节主要给出一个使用Kafka Consumer API的示例。我们都知道Kafka的Consumer API有旧版(0.8及更早版本使用的Scala consumer)和新版(0.9版开始引入的Java版KafkaConsumer),这里讲的是新版,官网的KafkaConsumer文档有更详细的介绍,可自行前往~
1 环境配置
- 操作系统: Ubuntu 16.04
- kafka_2.11-0.10.2.2
- JDK: 1.8.0_181
- IntelliJ IDEA Maven
- VNC
2 操作过程
pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Minimal Maven build for the Kafka consumer examples: the Kafka Java client (new consumer API)
  plus logging and compression libraries, all pinned to the 0.10.2.2 line of Kafka.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kafka.test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>maven-kafka</name>
    <url>http://maven.apache.org</url>

    <properties>
        <!-- The sources contain non-ASCII (Chinese) comments; do not rely on the platform default encoding. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Compile for the JDK 1.8 toolchain instead of the compiler plugin's (older) default level. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Java client library that contains KafkaConsumer (the new consumer API). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.2</version>
        </dependency>
        <!--
          Kafka core (Scala 2.11 build). The consumer examples do not reference it directly.
          NOTE(review): consider dropping it to keep the client classpath small.
        -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.2</version>
        </dependency>
        <!-- SLF4J logging facade. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.21</version>
        </dependency>
        <!-- Snappy codec, used when record batches are snappy-compressed. -->
        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.2.6</version>
        </dependency>
    </dependencies>
</project>
Kafka consumer消费数据(未启用SASL认证)。新建的消费者组默认auto.offset.reset=latest,因此这段代码只能消费它启动之后生产者推送的新数据(若该消费者组之前已提交过offset,则从已提交的offset继续消费):
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class KafkaConsumerTest {
public static void main(String[] args) throws Exception {
// Kafka consumer configuration settings
String topicName = "XXXX";
Properties props = new Properties();
props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
props.put("group.id", "XXXX");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("offsets.storage", "kafka");
// 要发送自定义对象,需要指定对象的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@SuppressWarnings("resource")
final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(100);
for (ConsumerRecord<String, Object> record : records) {
System.out.println(record.value());
}
}
}
}
Kafka consumer消费数据(启用SASL认证)。每次分配到分区时都会seek到该分区最早的offset(相当于控制台消费者的--from-beginning),因此可以消费历史数据:
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class KafkaConsumerTest {
public static void main(String[] args) throws Exception {
String topicName = "XXXX";
Properties props = new Properties();
System.setProperty("java.security.auth.login.config", "/opt/kafka/kafka1/kafka_2.11-0.10.2.2/config/kafka_client_jaas.conf"); // 环境变量添加,需要输入配置文件的路径
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("bootstrap.servers", "make.kafka.com:9092,make.kafka.com:9093,make.kafka.com:9094");
props.put("group.id", "XXXX");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("offsets.storage", "kafka");
props.put("max.poll.records",1000);
// 要发送自定义对象,需要指定对象的反序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@SuppressWarnings("resource")
final KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
Map<TopicPartition,Long> beginningOffset = consumer.beginningOffsets(collection);
//--from-beginning
for(Map.Entry<TopicPartition,Long> entry : beginningOffset.entrySet()){
//seekToBeginning()
consumer.seekToBeginning(collection);
}
}
});
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(20000);
for (ConsumerRecord<String, Object> record : records) {
System.out.println(record.value());
}
}
}
}
另外,选择Kafka作为消息中间件,主要是为了把数据持久化到本地或HDFS,以便后续分析挖掘出重要信息。持久化到HDFS可以使用Spark Streaming;这里只给出把控制台输出重定向保存到本地磁盘的做法。
import java.io.*;
public class IO2File {
    /**
     * Redirects {@code System.out} into a file, so that everything printed afterwards is written to
     * disk instead of the console. For example, this captures the record values printed by the
     * consumer examples.
     *
     * @param args optional; {@code args[0]} is the output file path (default {@code out.json} in the
     *     working directory)
     * @throws IOException if the file cannot be created or opened for writing
     */
    public static void main(String[] args) throws IOException {
        String path = (args != null && args.length > 0) ? args[0] : "out.json";
        // FileOutputStream creates (or truncates) the file itself, so no createNewFile() is needed.
        // Autoflush: every println is flushed to the file.
        // Explicit UTF-8: non-ASCII text is encoded as JSON requires (RFC 8259), not in the
        // platform default charset.
        PrintStream printStream = new PrintStream(new FileOutputStream(path), true, "UTF-8");
        // Deliberately not closed: it becomes the process-wide System.out for the rest of the run.
        System.setOut(printStream);
        // Combined with the consumer code above: Kafka messages printed to the "console" now go to disk.
        System.out.println("xxxxxxx out.json");
    }
}