import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object FlinkDemo05_KafkaSource {

  val prop = new Properties()
  prop.setProperty("bootstrap.servers", "linux01:9092")
  prop.setProperty("group.id", "flink-grp")
  // Note: FlinkKafkaConsumer ignores the key/value deserializer settings;
  // decoding is handled by the DeserializationSchema passed to the consumer.
  prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  prop.setProperty("auto.offset.reset", "latest")

  def main(args: Array[String]): Unit = {
    // 1. Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2. Obtain the stream from Kafka
    // (the import brings the implicit TypeInformation needed by the Scala API)
    import org.apache.flink.api.scala._
    val topic = "flink-topic"
    val schema = new SimpleStringSchema()
    val dStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](topic, schema, prop))

    // 3. Word count: split on whitespace, drop empties, key by word, sum counts
    // ("\\s+" instead of the invalid escape "\s"; keyBy(_._1) replaces the
    // deprecated positional keyBy(0))
    val result = dStream.flatMap(_.split("\\s+")).filter(_.nonEmpty).map((_, 1)).keyBy(_._1).sum(1)
    result.print()

    // 4. Execute the job
    env.execute("kafka source job")
  }
}
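
// A minimal sketch of the same word count on Flink's newer unified source API,
// for comparison. Assumptions not taken from the original: Flink 1.14+ with the
// `flink-connector-kafka` dependency on the classpath, where KafkaSource
// supersedes the deprecated FlinkKafkaConsumer. Broker address, topic, and
// group id are reused from the demo above.
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

object FlinkDemo05_KafkaSourceNewApi {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    // Builder-style configuration replaces the Properties object;
    // OffsetsInitializer.latest() plays the role of auto.offset.reset=latest.
    val source = KafkaSource.builder[String]()
      .setBootstrapServers("linux01:9092")
      .setTopics("flink-topic")
      .setGroupId("flink-grp")
      .setStartingOffsets(OffsetsInitializer.latest())
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .build()

    // fromSource requires an explicit WatermarkStrategy; none is needed for
    // this processing-time word count.
    val dStream: DataStream[String] =
      env.fromSource(source, WatermarkStrategy.noWatermarks[String](), "kafka-source")

    dStream.flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute("kafka source job (new api)")
  }
}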