zoukankan      html  css  js  c++  java
  • sparkstreaming消费direct

    package two

    /**
    * Created by on 上午2:19.
    */
    import kafka.serializer.StringDecoder

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf

    /**
    * Consumes messages from one or more topics in Kafka and does wordcount.
    * Usage: DirectKafkaWordCount <brokers> <topics>
    * <brokers> is a list of one or more Kafka brokers
    * <topics> is a list of one or more kafka topics to consume from
    *
    * Example:
    * $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port
    * topic1,topic2
    */
    object DirectKafkaWordCount {
    def main(args: Array[String]) {
    if (args.length < 2) {
    System.err.println(s"""
    |Usage: DirectKafkaWordCount <brokers> <topics>
    | <brokers> is a list of one or more Kafka brokers
    | <topics> is a list of one or more kafka topics to consume from
    |
    """.stripMargin)
    System.exit(1)
    }


    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
    }
    }
    // scalastyle:on println

  • 相关阅读:
    如何看待和选择基础设施软件
    target vs currentTarget, clientWidth vs offsetWidth
    undefined
    C# and android and socket
    hdu 4781 Assignment For Princess (2013ACMICPC 成都站 A)
    [置顶] Jquery中DOM操作(详细)
    hdu 4786 Fibonacci Tree (2013ACMICPC 成都站 F)
    android开发教程(八)——环境搭建之java-ndk
    cloudstack4.2+xenserver6.0.2 详细配置攻略
    11道php面试题
  • 原文地址:https://www.cnblogs.com/heguoxiu/p/10149677.html
Copyright © 2011-2022 走看看