zoukankan      html  css  js  c++  java
  • sparkstreaming消费receive

    package two

    /**
    * Created by zhoucw at 2:11 AM.
    */
    import java.util.HashMap

    import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf

    /**
     * Consumes messages from one or more Kafka topics through
     * `KafkaUtils.createStream` and prints a sliding-window word count.
     *
     * Usage: ReceiveKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
     *   - <zkQuorum>   comma-separated list of one or more ZooKeeper servers that make up the quorum
     *   - <group>      name of the Kafka consumer group
     *   - <topics>     comma-separated list of one or more Kafka topics to consume from
     *   - <numThreads> number of consumer threads to use per topic (positive integer)
     *
     * Example arguments:
     *   `zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
     */
    object ReceiveKafkaWordCount {

      private val Usage =
        "Usage: ReceiveKafkaWordCount <zkQuorum> <group> <topics> <numThreads>"

      /** Prints `message` and the usage line to stderr, then exits with status 1. */
      private def fail(message: String): Nothing = {
        System.err.println(message)
        System.err.println(Usage)
        sys.exit(1)
      }

      /**
       * Program entry point.
       *
       * @param args `<zkQuorum> <group> <topics> <numThreads>`; arguments beyond the
       *             fourth are ignored
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 4) {
          System.err.println(Usage)
          sys.exit(1)
        }

        // Destructure only the first four arguments: matching `args` directly would
        // throw a MatchError whenever more than four arguments are supplied, even
        // though the length check above lets that case through.
        val Array(zkQuorum, group, topics, numThreads) = args.take(4)

        // Validate the thread count up front instead of failing later with a bare
        // NumberFormatException.
        val threadsPerTopic = scala.util.Try(numThreads.trim.toInt).toOption
          .filter(_ > 0)
          .getOrElse(fail(s"<numThreads> must be a positive integer, got: '$numThreads'"))

        // Drop blank entries so input such as "a,,b" or a trailing comma does not
        // produce an empty topic name.
        val topicMap = topics.split(",")
          .map(_.trim)
          .filter(_.nonEmpty)
          .map(topic => (topic, threadsPerTopic))
          .toMap
        if (topicMap.isEmpty) {
          fail(s"<topics> must name at least one topic, got: '$topics'")
        }

        // NOTE(review): the master is hard-coded; a master set directly on SparkConf
        // takes precedence over one passed on the command line — confirm this is
        // intended outside local experiments.
        val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // A checkpoint directory is needed by the windowed reduce with an inverse
        // function used below.
        ssc.checkpoint("checkpoint")

        // Each record is a (key, message) pair; only the message body is kept.
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val words = lines.flatMap(_.split(" "))

        // Counts over a 10-minute window, sliding every 2 seconds, with 2 partitions.
        // `_ - _` is the inverse of `_ + _`, allowing the window to be updated
        // incrementally as old batches leave it.
        val wordCounts = words.map(x => (x, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }


    // scalastyle:o

  • 相关阅读:
    Delphi公用函数单元
    Delphi XE5 for Android (十一)
    Delphi XE5 for Android (十)
    Delphi XE5 for Android (九)
    Delphi XE5 for Android (八)
    Delphi XE5 for Android (七)
    Delphi XE5 for Android (五)
    Delphi XE5 for Android (四)
    Delphi XE5 for Android (三)
    Delphi XE5 for Android (二)
  • 原文地址:https://www.cnblogs.com/heguoxiu/p/10149665.html
Copyright © 2011-2022 走看看