zoukankan      html  css  js  c++  java
  • Fink 写数据到kafka (kafka sink)

    POM 文件依赖:

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.10.1</version>
            </dependency>
        </dependencies>

    Kafka sink代码(从 netcat中读取数据)

    package com.kpwong.sink
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
    
    object KafkaSinkTest {
      def main(args: Array[String]): Unit = {
    
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val socketDS: DataStream[String] = env.socketTextStream("hadoop202",9999)
    
        socketDS.addSink(new FlinkKafkaProducer011[String]("hadoop202:9092","two",new SimpleStringSchema()))
        //消费者 读取kafka数据命令
        //bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --from-beginning --topic two
    
        env.execute("Kafka Sink Test")
      }
    }

    运行结果:

    输入数据:

     kafka 接受到的数据命令:

    bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --topic two

  • 相关阅读:
    Android中的进程
    简单解析三种JAVA调用方式-同步,异步,回调
    Android BroadCastReceiver介绍
    Android 消息处理机制-Looper,Handler,MessageQueue
    Android onPause 和onSaveInstanceState
    Android finish后没有执行 onDestory()
    自定义Linearlayout
    python学习笔记一
    笔试题目汇总
    互联网_http协议
  • 原文地址:https://www.cnblogs.com/kpwong/p/14092079.html
Copyright © 2011-2022 走看看