  • Reading Kafka data with Flink

    POM

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
        </dependencies>

    Source code:

    package com.kpwong.aiptest
    
    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    
    object KafkaTest {
    
      def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
        // Read data from Kafka
        val props: Properties = new Properties()
        props.setProperty("bootstrap.servers", "hadoop202:9092")
        // Command for sending test data:
        // bin/kafka-console-producer.sh --broker-list hadoop202:9092 --topic two
        val kafkaDS: DataStream[String] = env.addSource(new FlinkKafkaConsumer011[String]("two", new SimpleStringSchema(), props))
    
        kafkaDS.print()
        env.execute()
    
      }
    
    }
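The job above sets only `bootstrap.servers`. A minimal sketch of a fuller consumer configuration follows, assuming you also want a consumer group and want to read from the earliest offset when the group has no committed offsets. The object name `KafkaConsumerProps`, the method `build`, and the group name `flink-kafka-test` are hypothetical and do not appear in the original post; the property keys are standard Kafka consumer settings.

```scala
import java.util.Properties

// Hypothetical helper (not part of the original post): gathers the Kafka
// consumer settings in one place so the job's main method stays short.
object KafkaConsumerProps {

  def build(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    // Broker address, same key as in the original job
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Consumer group used for committed offsets (assumed name supplied by the caller)
    props.setProperty("group.id", groupId)
    // Fallback start position when the group has no committed offsets yet
    props.setProperty("auto.offset.reset", "earliest")
    props
  }
}

// Possible use inside KafkaTest.main, replacing the hand-built Properties:
//   val props = KafkaConsumerProps.build("hadoop202:9092", "flink-kafka-test")
//   env.addSource(new FlinkKafkaConsumer011[String]("two", new SimpleStringSchema(), props))
```

Keeping the settings in one function makes it easier to reuse the same configuration for several topics; whether `earliest` is the right fallback depends on whether the job should replay old messages.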

    Sending data to Kafka:

    bin/kafka-console-producer.sh --broker-list hadoop202:9092 --topic two

    Run result: each line sent from the console producer is printed to the console by kafkaDS.print().

  • Original article: https://www.cnblogs.com/kpwong/p/14089801.html