zoukankan      html  css  js  c++  java
  • Fink 写数据到kafka (kafka sink)

    POM 文件依赖:

        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.10.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
                <version>1.10.1</version>
            </dependency>
        </dependencies>

    Kafka sink代码(从 netcat中读取数据)

    package com.kpwong.sink
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
    
    object KafkaSinkTest {
      def main(args: Array[String]): Unit = {
    
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val socketDS: DataStream[String] = env.socketTextStream("hadoop202",9999)
    
        socketDS.addSink(new FlinkKafkaProducer011[String]("hadoop202:9092","two",new SimpleStringSchema()))
        //消费者 读取kafka数据命令
        //bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --from-beginning --topic two
    
        env.execute("Kafka Sink Test")
      }
    }

    运行结果:

    输入数据:

     kafka 接受到的数据命令:

    bin/kafka-console-consumer.sh --zookeeper hadoop202:2181 --topic two

  • 相关阅读:
    Gym
    数学公式头文件
    除法取模(比赛常用)
    ACM-ICPC 2017 Asia Urumqi A. Coins【期望dp】
    P1494 小Z的袜子 【普通莫队】
    Codeforces Round #642 (Div. 3) E—K-periodic Garland dp
    luogu P4568 [JLOI2011]飞行路线 最短路Dijkstra+dp
    luogu P2015 二叉苹果树 树形dp
    luogu P1462 通往奥格瑞玛的道路 二分+spfa
    luogu P1879 [USACO06NOV]Corn Fields G 状态压缩dp
  • 原文地址:https://www.cnblogs.com/kpwong/p/14092079.html
Copyright © 2011-2022 走看看