zoukankan      html  css  js  c++  java
  • Windows下Flink读取Kafka

    代码:
    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

    //温度传感器读取样例类
    case class SensorReading(id: String, timestamp: Long, temperature: Double)

    object SourceTest {
    def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //1.从自定义的集合中读取数据
    import org.apache.flink.api.scala._
    // val stream1 = env.fromCollection(List(SensorReading("sensor_1", 1547718199, 35.80018327300259),
    // SensorReading("sensor_6", 1547718201, 15.402984393403084),
    // SensorReading("sensor_7", 1547718202, 6.720945201171228),
    // SensorReading("sensor_10", 1547718205, 38.101067604893444)
    // ))
    // stream1.print("stream1").setParallelism(1)
    // env.execute("source test")
    //
    // //2.从文件读取数据
    // val stream2 = env.readTextFile("sensor.txt")
    // stream2.print("stream2").setParallelism(1)
    // env.execute("source test")

    //3.以 kafka 消息队列的数据作为来源
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")

    val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    stream3.print("stream3").setParallelism(1)
    env.execute("source test")
    }
    }

    1.启动zookerper

       

       2.启动Kafka

       

       3.启动Producer并发送数据

       

       4.结果

       

    有帮助的欢迎评论打赏哈,谢谢!

  • 相关阅读:
    Android之TabHost实现Tab切换
    银联支付SDK集成
    iOS 支付 [支付宝、银联、微信]
    MySQL数据库数据类型以及INT(M)的含义
    cherrypy
    使用PyMySQL操作mysql数据库
    面向新手的Web服务器搭建(一)——IIS的搭建
    SQLite3中自增主键相关知识总结,清零的方法、INTEGER PRIMARY KEY AUTOINCREMENT和rowid的使用
    FMDB-FMDatabaseQueue
    SQLite 数据类型
  • 原文地址:https://www.cnblogs.com/wddqy/p/12156527.html