zoukankan      html  css  js  c++  java
  • sparkstreaming消费receive

    package two

    /**
    * Created by zhoucw at 2:11 AM.
    */
    import java.util.HashMap

    import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf

    /**
     * Consumes messages from one or more Kafka topics through
     * `KafkaUtils.createStream` and prints a sliding-window word count.
     *
     * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
     *   - <zkQuorum>   comma-separated list of ZooKeeper servers that make up the quorum
     *   - <group>      name of the Kafka consumer group
     *   - <topics>     comma-separated list of Kafka topics to consume from
     *   - <numThreads> number of consumer threads per topic (positive integer)
     *
     * Arguments after the fourth are ignored.
     *
     * Example:
     *   `$ bin/spark-submit --class two.ReceiveKafkaWordCount <application-jar>
     *      zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
     */
    object ReceiveKafkaWordCount {

      /** Usage line printed whenever the command-line arguments are invalid. */
      private val Usage = "Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>"

      /** Prints `message` to stderr and terminates the JVM with exit status 1. */
      private def exitWithError(message: String): Nothing = {
        System.err.println(message)
        sys.exit(1)
      }

      /**
       * Entry point: validates the arguments, builds the streaming job and blocks
       * until the streaming context terminates.
       *
       * @param args `<zkQuorum> <group> <topics> <numThreads>`; at least four
       *             entries are required, any further entries are ignored
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 4) {
          exitWithError(Usage)
        }

        // Bind only the first four arguments: destructuring `args` directly threw
        // a scala.MatchError whenever extra arguments were supplied.
        val Array(zkQuorum, group, topics, numThreadsArg) = args.take(4)

        // Report a malformed thread count as a usage error instead of letting a
        // NumberFormatException escape from `toInt`.
        val numThreads = scala.util.Try(numThreadsArg.toInt).toOption.filter(_ > 0).getOrElse {
          exitWithError(s"<numThreads> must be a positive integer, got '$numThreadsArg'\n$Usage")
        }

        // local[4] stays the default master, but a master configured externally
        // (e.g. through spark-submit) is no longer overridden.
        val sparkConf = new SparkConf()
          .setAppName("KafkaWordCount")
          .setIfMissing("spark.master", "local[4]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        // The inverse-function variant of reduceByKeyAndWindow used below needs
        // checkpointing to be enabled.
        ssc.checkpoint("checkpoint")

        // Every requested topic is consumed with the same number of threads.
        val topicMap = topics.split(",").map(topic => topic -> numThreads).toMap
        // createStream yields (key, message) pairs; only the message is counted.
        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
        val words = lines.flatMap(_.split(" "))
        // 10-minute window sliding every 2 seconds; `_ - _` subtracts the counts
        // of batches that slide out of the window.
        val wordCounts = words
          .map(word => (word, 1L))
          .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
        wordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }


    // scalastyle:on println

  • 相关阅读:
    hdu 4115 石头剪子布(2-sat问题)
    AFNetWorking POST Multi-Part Request 上传图片
    左右c++与java中国的垃圾问题的分析与解决
    ACM核武器
    MAX2323E
    cocos2d-x 发动机分析:程序如何开始和结束?
    STL 源代码分析 算法 stl_heap.h
    Android 4.4(KitKat)表格管理子系统
    Swift
    Swift
  • 原文地址:https://www.cnblogs.com/heguoxiu/p/10149665.html
Copyright © 2011-2022 走看看