zoukankan      html  css  js  c++  java
  • sparkstreaming消费receive

    package two

    /**
    * Created by zhoucw on 上午2:11.
    */
    import java.util.HashMap

    import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf

    /**
    * Consumes messages from one or more topics in Kafka and does wordcount.
    * Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>
    * <zkQuorum> is a list of one or more zookeeper servers that make quorum
    * <group> is the name of kafka consumer group
    * <topics> is a list of one or more kafka topics to consume from
    * <numThreads> is the number of threads the kafka consumer should use
    *
    * Example:
    * `$ bin/run-example
    * org.apache.spark.examples.streaming.KafkaWordCount zoo01,zoo02,zoo03
    * my-consumer-group topic1,topic2 1`
    */
    object ReceiveKafkaWordCount {
    def main(args: Array[String]) {
    if (args.length < 4) {
    System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
    System.exit(1)
    }


    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
    .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
    }
    }


    // scalastyle:o

  • 相关阅读:
    如何根据工单状态判断订单状态
    jquery移除click事件
    mysql字符串相关函数(并与sql server对比)
    bootstrap-table api
    js调用jquery的function函数
    Bootstrap table设置th,td内容水平、垂直居中
    Error:java: 无法访问javax.servlet.Servlet 找不到javax.servlet.Servlet的类文件
    html常用表单元素,按钮
    点击重置按钮,清空表单所有内容
    bootstrap table设置列宽
  • 原文地址:https://www.cnblogs.com/heguoxiu/p/10149665.html
Copyright © 2011-2022 走看看