本文讲述:本地 Flink 1.7.0 (Java SDK) 读取本地 Kafka 数据,不做任何处理直接打印输出到控制台
环境:win10 + WSL
0. 下载 Flink 及 Kafka 并解压
步骤略过
1. 启动 Kafka 并创建 topic
以下命令都在解压后的 Kafka 文件夹内执行
1.1 启动 Kafka
启动 zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties
启动 kafka 服务
./bin/kafka-server-start.sh config/server.properties
1.2 创建 topic
./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
1.3 启动生产者和消费者
启动生产者
./bin/kafka-console-producer.sh --topic topic2 --broker-list localhost:9092
启动消费者
./bin/kafka-console-consumer.sh --topic topic2 --from-beginning --bootstrap-server localhost:9092
此时,在生产者窗口中输入任意字符,即可在消费者窗口中看到相应的输出
2. 启动 flink
进入解压后的 flink 文件夹内
./bin/start-cluster.sh
3. 使用 JAVA 中读取 Kafka 内容
代码如下:
package com.xjr7670;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "g2"); // 第 1 个参数是固定值 group.id,第 2 个参数是自定义的组 ID
DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
String topic = "topic2";
DataStream<String> text = env.addSource(new FlinkKafkaConsumer011<String>(topic, deserializationSchema, properties));
text.print();
env.execute("Flink-Kafka demo");
}
}
POM 依赖配置略过。以上代码可直接执行,执行后,在 Kafka 生产者窗口中输入消息,即可在代码输出窗口中看到同样的消息输出
如果执行时遇到报错:
java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema
需要添加 flink/lib 下的包到项目内