zoukankan      html  css  js  c++  java
  • Flink 读取 Kafka数据

    POM

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.10.1</version>
            </dependency>
        </dependencies>

    源码: 

    package com.kpwong.aiptest
    
    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    
    object KafkaTest {
    
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        //从kafka读取数据
        val prob: Properties = new Properties()
        prob.setProperty("bootstrap.servers","hadoop202:9092")
        //bin/kafka-console-producer.sh --broker-list hadoop202:9092 --topic two 发送数据命令
        val kafkaDS: DataStream[String] = env.addSource( new FlinkKafkaConsumer011[String]("two",new SimpleStringSchema(),prob))
    
        kafkaDS.print()
        env.execute()
    
      }
    
    }

    Kafka发送数据:

    //bin/kafka-console-producer.sh --broker-list hadoop202:9092 --topic two 

    运行结果:

  • 相关阅读:
    真正的e时代
    在线手册
    UVA 10616 Divisible Group Sums
    UVA 10721 Bar Codes
    UVA 10205 Stack 'em Up
    UVA 10247 Complete Tree Labeling
    UVA 10081 Tight Words
    UVA 11125 Arrange Some Marbles
    UVA 10128 Queue
    UVA 10912 Simple Minded Hashing
  • 原文地址:https://www.cnblogs.com/kpwong/p/14089801.html
Copyright © 2011-2022 走看看