zoukankan      html  css  js  c++  java
  • sparkstreaming消费direct

    package two

    /**
    * Created at 2:19 AM.
    */
    import kafka.serializer.StringDecoder

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf

    /**
     * Consumes messages from one or more Kafka topics through a direct stream and prints a
     * word count for every batch.
     *
     * Usage: `DirectKafkaWordCount <brokers> <topics>`
     *  - `<brokers>` is a comma-separated list of one or more Kafka brokers
     *  - `<topics>` is a comma-separated list of one or more Kafka topics to consume from
     *
     * Example:
     * {{{
     * $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
     *     topic1,topic2
     * }}}
     */
    object DirectKafkaWordCount {

      /** Prints the usage text to stderr and terminates the JVM with exit status 1. */
      private def exitWithUsage(): Nothing = {
        // NOTE: `stripMargin` only strips lines that contain the `|` margin character, so the
        // indentation in front of the closing delimiter is part of the emitted text. It is kept
        // at its original width on purpose so the message stays unchanged.
        System.err.println("""
          |Usage: DirectKafkaWordCount <brokers> <topics>
          | <brokers> is a list of one or more Kafka brokers
          | <topics> is a list of one or more kafka topics to consume from
          |
    """.stripMargin)
        sys.exit(1)
      }

      /**
       * Entry point.
       *
       * @param args `args(0)` is the broker list, `args(1)` the comma-separated topic list;
       *             any further arguments are ignored
       */
      def main(args: Array[String]): Unit = {
        // Match on "at least two" arguments. A plain `val Array(brokers, topics) = args`
        // would throw a MatchError as soon as a third argument is supplied.
        val (brokers, topics) = args match {
          case Array(brokerList, topicList, _*) => (brokerList, topicList)
          case _ => exitWithUsage()
        }

        // Tolerate "topic1, topic2" and stray commas: blank entries are not valid topic names.
        val topicsSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
        if (topicsSet.isEmpty) exitWithUsage()

        // Create context with 2 second batch interval.
        // `setIfMissing` keeps "local[2]" as the default for runs from an IDE, while letting a
        // master configured externally (e.g. `spark-submit --master ...`) take effect instead
        // of being silently overridden.
        val sparkConf = new SparkConf()
          .setAppName("DirectKafkaWordCount")
          .setIfMissing("spark.master", "local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))

        // Create direct kafka stream with brokers and topics
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)

        // Get the lines (the value of each key/value record), split them into words,
        // count the words and print
        val lines = messages.map(_._2)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(word => (word, 1L)).reduceByKey(_ + _)
        wordCounts.print()

        // Start the computation and block until the streaming context is stopped
        ssc.start()
        ssc.awaitTermination()
      }
    }
    // scalastyle:on println

  • 相关阅读:
    杨中科 向HtmlAgilityPack道歉:解析HTML还是你好用
    感觉这个JQuery不错,查询方便
    数据库异步操作
    Command 设计模式
    osg 细节裁剪 SAMLL_FEATURE_CULLING
    errno错误代码
    清空std::stringstream
    eclipse android javabuilder +CDTbuilder
    mfc c++ system调用 控制台窗口
    Androidndkr8e wordlist 第二个参数不是数值参数
  • 原文地址:https://www.cnblogs.com/heguoxiu/p/10149677.html
Copyright © 2011-2022 走看看