zoukankan      html  css  js  c++  java
  • Fink 写数据到kafka (kafka sink)

    POM 文件依赖:

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.10.1</version>
            </dependency>
        </dependencies>

    Kafka sink代码(从 netcat中读取数据)

    package com.kpwong.sink
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
    
    object KafkaSinkTest {
      def main(args: Array[String]): Unit = {
    
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val socketDS: DataStream[String] = env.socketTextStream("hadoop202",9999)
    
        socketDS.addSink(new FlinkKafkaProducer011[String]("hadoop202:9092","two",new SimpleStringSchema()))
        //消费者 读取kafka数据命令
        //bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --from-beginning --topic two
    
        env.execute("Kafka Sink Test")
      }
    }

    运行结果:

    输入数据:

     kafka 接受到的数据命令:

    bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --topic two

  • 相关阅读:
    3、Less-计算
    2、Less-混合
    1、Less-初见
    5、反射-动态代理
    4、反射-类的构造器:Constrctor
    3、反射-Field
    2、反射-Method&父类
    1、反射-Class&ClassLoader
    5、URLConnection(3)
    Linux进程状态查询
  • 原文地址:https://www.cnblogs.com/kpwong/p/14092079.html
Copyright © 2011-2022 走看看