zoukankan      html  css  js  c++  java
  • Spark-Streaming DirectKafka count 案例

    Spark-Streaming DirectKafka count 统计与普通的 Kafka 统计类似,只不过这里使用的是 Direct 方式。Direct 方式使用的是 Kafka 的低级 API,不同之处主要在 createDirectStream 这里。

    统计代码如下

    package com.hw.streaming
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    
    /**
     * Spark Streaming job that reads Kafka topics through `KafkaUtils.createDirectStream`
     * and, for every batch, prints how often each value of the second comma-separated
     * field occurs in the received messages.
     *
     * Usage: DirectKafkaWordCount <brokers> <topics>
     *   - brokers: comma-separated list of Kafka brokers (passed as `metadata.broker.list`)
     *   - topics:  comma-separated list of Kafka topics to consume from
     */
    object DirectKafkaWordCount {

      /**
       * Entry point.
       *
       * @param args `args(0)` is the broker list, `args(1)` the topic list; any further
       *             arguments are ignored. With fewer than two arguments the usage text
       *             is printed to stderr and the JVM exits with status 1.
       */
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println(s"""
                                |Usage: DirectKafkaWordCount <brokers> <topics>
                                |  <brokers> is a list of one or more Kafka brokers
                                |  <topics> is a list of one or more kafka topics to consume from
                                |
            """.stripMargin)
          System.exit(1)
        }

        // `_*` keeps this pattern consistent with the `< 2` guard above: extra trailing
        // arguments are ignored instead of raising a MatchError.
        val Array(brokers, topics, _*) = args

        // Create context with a 60 second batch interval
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(60))

        // Create direct kafka stream with brokers and topics
        val topicsSet = topics.split(",").toSet
        // "smallest" behaves like the console consumer's --from-beginning option
        val kafkaParams = Map[String, String](
          "metadata.broker.list" -> brokers,
          "auto.offset.reset" -> "smallest"
        )
        // Build the input DStream of (key, value) pairs
        val messages = KafkaUtils
          .createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topicsSet)

        // Keep only the message value of each (key, value) pair
        val lines = messages.map(_._2)
        // Take the second comma-separated field of every line. Using `map` (not `flatMap`)
        // keeps the field as one String; flatMap over a String would emit its individual
        // characters. Lines with fewer than two fields are skipped so that one malformed
        // record cannot fail the whole batch.
        val words = lines
          .map(_.split(","))
          .filter(_.length > 1)
          .map(fields => fields(1))
        val wordCounts = words.map(word => (word, 1L)).reduceByKey(_ + _)
        wordCounts.print()

        // Start the computation and block until it is stopped
        ssc.start()
        ssc.awaitTermination()

      }

    }
    

    启动相关的 flume,kafka,参见:

    https://www.cnblogs.com/hanwen1014/p/11260456.html

  • 相关阅读:
    Java
    Java
    Java
    Java
    Java
    Java
    Java
    Java
    JSON
    正则表达式
  • 原文地址:https://www.cnblogs.com/hanwen1014/p/11260477.html
Copyright © 2011-2022 走看看