  • Flink Kafka source

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    object FlinkDemo05_KafkaSource {

        // Kafka consumer configuration
        val prop = new Properties()
        prop.setProperty("bootstrap.servers", "linux01:9092")
        prop.setProperty("group.id", "flink-grp")
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        prop.setProperty("auto.offset.reset", "latest")

        def main(args: Array[String]): Unit = {
            // 1. Create the execution environment
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            // 2. Get the stream from Kafka
            import org.apache.flink.api.scala._ // implicit TypeInformation for the Scala API
            val topic = "flink-topic"
            val schema = new SimpleStringSchema()
            val dStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](topic, schema, prop))
            // 3. Compute a word count (note the regex must be escaped as "\\s", not "\s")
            val result = dStream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)
            result.print()

            // 4. Execute the job
            env.execute("kafka source job")
        }
    }
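
  A note on offsets: when checkpointing is enabled, FlinkKafkaConsumer commits its offsets back to Kafka as part of each completed checkpoint; otherwise offset handling falls back to the Kafka client settings in the consumer properties. Below is a minimal sketch of how the source above could be configured with checkpointing and an explicit start position. It reuses the prop, topic, and schema values from the listing; the 5000 ms interval is an arbitrary choice for illustration, not something from the original post.

    // Sketch: enable checkpointing so the consumer commits offsets on checkpoints.
    // Assumes prop, topic, and schema from the listing above.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000) // checkpoint (and commit Kafka offsets) every 5 s

    val consumer = new FlinkKafkaConsumer[String](topic, schema, prop)
    consumer.setStartFromGroupOffsets() // default: resume from the committed group offsets
    // consumer.setStartFromEarliest()  // alternatively, read the topic from the beginning

    val dStream: DataStream[String] = env.addSource(consumer)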
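
  For symmetry, the word-count results could also be written back to Kafka instead of printed. The sketch below uses the older FlinkKafkaProducer constructor that takes a topic, a SerializationSchema, and producer Properties, and assumes the result stream (and the Scala API import) from the listing above; the output topic name "flink-result" and the comma-separated formatting are illustrative assumptions, not part of the original post.

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

    // Sketch: write the (word, count) tuples back to Kafka as "word,count" strings.
    // "flink-result" is a hypothetical output topic; the broker matches the source config.
    val producerProp = new Properties()
    producerProp.setProperty("bootstrap.servers", "linux01:9092")

    val producer = new FlinkKafkaProducer[String]("flink-result", new SimpleStringSchema(), producerProp)

    result
      .map(t => s"${t._1},${t._2}") // format each (word, count) tuple as a plain string
      .addSink(producer)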
  • Original post: https://www.cnblogs.com/xiefeichn/p/13174975.html