zoukankan      html  css  js  c++  java
  • Spark 学习笔记之 Streaming Window

    Streaming Window:

    上图意思:每隔 2 秒(slideDuration)统计一次最近 3 秒(windowDuration)内的数据

    slideDuration: 2

    windowDuration: 3

    例子:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    /**
      * Sliding-window word count over a Kafka topic.
      *
      * Reads string records from the `ScalaTopic` topic through a Kafka direct
      * stream, splits every record value into words and, every 2 seconds, prints
      * how often each word occurred during the last 3 seconds, ordered by count
      * (highest first).
      */
    object WindowStreaming {

      /**
        * Builds the streaming job, starts it and blocks until it terminates.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // Window and slide durations have to be integer multiples of the batch interval.
        val batchInterval = Seconds(1)
        val windowDuration = Seconds(3) // how far back each computation looks
        val slideDuration = Seconds(2) // how often the computation is triggered

        // NOTE(review): a single local thread is presumably sufficient here because the
        // direct stream does not occupy a receiver thread; confirm before switching to a
        // receiver-based input source.
        val conf = new SparkConf().setAppName("KafkaDirect").setMaster("local[1]")
        val ssc = new StreamingContext(conf, batchInterval)

        val kafkaMapParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "g1",
          "auto.offset.reset" -> "latest", // earliest|latest
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val topicsSet = Set("ScalaTopic")
        val kafkaStream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topicsSet, kafkaMapParams)
        )

        // (count, word) pairs for the current window, sorted by descending count.
        val windowedWordCounts: DStream[(Int, String)] = kafkaStream
          .flatMap(record => record.value().split("\\s+"))
          // Blank records and leading/repeated whitespace would otherwise be counted as the word "".
          .filter(_.nonEmpty)
          .map((_, 1))
          .reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration)
          // Swap to (count, word) so the count becomes the sort key.
          .transform(rdd => rdd.map(_.swap).sortByKey(ascending = false))

        windowedWordCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }

    }
    

    运行结果:

  • 相关阅读:
    Linux阶段总结
    Java基础单词总结
    毕业设计
    tips: ubuntu apt sources.list 设置
    Java编程从0到1系列 目录
    EnvironmentError: mysql_config not found
    pyenv 2.7 环境安装MySQL-python ERROR
    dtd语法规则
    HashMap和HashTable之间的区别
    Vector(同步)和ArrayList(异步)异同
  • 原文地址:https://www.cnblogs.com/AK47Sonic/p/8052451.html
Copyright © 2011-2022 走看看