启动kafka和flink
1、进入zookeeper的bin目录下启动zookeeper
./zkServer.sh start
2、进入kafka的bin目录下启动kafka
./kafka-server-start.sh -daemon /opt/module/kafka-0.11/config/server.properties
3、进入flink的bin目录下启动flink
./start-cluster.sh
kafka启动生产者
kafka主题为sensor
./bin/kafka-console-producer.sh --broker-list 192.168.153.202:9092 --topic sensor
添加pom依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.12</artifactId> <version>1.10.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency>
执行
Java代码如下
package com.test.apitest.souceTest; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import java.util.Properties; public class SourceTest02_kafka { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // kafka配置项 Properties properties = new Properties(); properties.setProperty("bootstrap.servers","192.168.153.202:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest"); // 从kafka中读取数据 DataStreamSource<String> sensor = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties)); sensor.print(); //执行任务 env.execute(); } }