zoukankan      html  css  js  c++  java
  • flink的Kafka数据源代码样例

     1 val properties = new Properties()
     2 properties.setProperty("bootstrap.servers", "localhost:9092")
     3 properties.setProperty("group.id", "consumer-group")
     4 properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
     5 properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
     6 properties.setProperty("auto.offset.reset", "latest")
     7 val env = StreamExecutionEnvironment.getExecutionEnvironment
     8 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     9 env.setParallelism(1)
    10 val stream = env
    11   // source为来自Kafka的数据,这里我们实例化一个消费者,topic为hotitems
    12   .addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
  • 相关阅读:
    算法复习:字符串
    【第五天打卡。
    【第四天打卡。
    【第三天打卡。
    第二天打卡。
    【唉
    配环境到崩溃系列
    所谓环境……
    【随便吐槽
    第四天。打卡。【偷懒了两天hhhh
  • 原文地址:https://www.cnblogs.com/sunpengblog/p/11794497.html
Copyright © 2011-2022 走看看