zoukankan      html  css  js  c++  java
  • Windows下Flink读取Kafka

    代码:
    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    //温度传感器读取样例类
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object SourceTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //1.从自定义的集合中读取数据
    import org.apache.flink.api.scala._
    // val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),
    // SensorReading("sensor_6", 1547718201, 15.402984393403084),
    // SensorReading("sensor_7", 1547718202, 6.720945201171228),
    // SensorReading("sensor_10", 1547718205, 38.101067604893444)
    // ))
    // stream1.print("stream1").setParallelism(1)
    // env.execute("source test")
    //
    // //2.从文件读取数据
    // val stream2 = env.readTextFile("sensor.txt")
    // stream2.print("stream2").setParallelism(1)
    // env.execute("source test")

    //3.以 kafka 消息队列的数据作为来源
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    stream3.print("stream3").setParallelism(1)
    env.execute("source test")
    }
    }

    1.启动zookerper

       

       2.启动Kafka

       

       3.启动Producer并发送数据

       

       4.结果

       

    有帮助的欢迎评论打赏哈,谢谢!

  • 相关阅读:
    第四篇:new和delete的基本用法
    第三篇:C++ 中的几种初始化
    第七篇:使用 CUDA 进行计算优化的两种思路
    第六篇:二维数组的传输 (host <-> device)
    poj 2762(强连通+判断链)
    poj 3352(边双连通分量)
    poj 3228(二分+最大流)
    poj 3522(最小生成树应用)
    poj 2349(最小生成树应用)
    poj 1733(带权并查集+离散化)
  • 原文地址:https://www.cnblogs.com/wddqy/p/12156527.html
Copyright © 2011-2022 走看看