zoukankan      html  css  js  c++  java
  • Spark 学习笔记之 Streaming Window

    Streaming Window:

    上图意思:每隔2秒对最近3秒内的数据做一次统计

    slideDuration: 2 秒(滑动间隔,即每隔多久计算一次)

    windowDuration: 3 秒(窗口长度,即每次计算覆盖多长时间的数据)

    例子:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    /**
     * Sliding-window word count over a Kafka topic using Spark Streaming.
     *
     * Every 2 seconds (slide duration) the job counts the words received during
     * the last 3 seconds (window duration) and prints them ordered by count,
     * highest first.
     */
    object WindowStreaming {

      /**
       * Builds the streaming job, starts it and blocks until it terminates.
       *
       * @param args command-line arguments; not used
       */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("KafkaDirect").setMaster("local[1]")
        // 1-second batch interval; the window (3s) and slide (2s) durations used
        // below are both multiples of it.
        val ssc = new StreamingContext(conf, Seconds(1))

        val kafkaMapParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "g1",
          "auto.offset.reset" -> "latest", // earliest|latest
          // Auto-commit is disabled and this example never commits offsets itself.
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val topicsSet = Set("ScalaTopic")
        val kafkaStream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topicsSet, kafkaMapParams)
        )

        // split(" ") yields "" for empty messages and for leading or repeated
        // spaces; those tokens are dropped so they are not counted as a word.
        val words: DStream[String] = kafkaStream
          .flatMap(record => record.value().split(" "))
          .filter(_.nonEmpty)

        // Count each word over the last 3 seconds, recomputed every 2 seconds.
        val windowedCounts: DStream[(String, Int)] = words
          .map((_, 1))
          .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(3), Seconds(2))

        // Swap to (count, word) so each window can be sorted by count, descending.
        val countsDescending: DStream[(Int, String)] =
          windowedCounts.transform(rdd => rdd.map(_.swap).sortByKey(ascending = false))

        countsDescending.print()

        ssc.start()
        ssc.awaitTermination()
      }

    }
    

    运行结果:

  • 相关阅读:
    java栈的最大深度?
    String hashCode 方法为什么选择数字31作为乘子
    LinkedList 源码分析(JDK 1.8)
    ArrayList 源码分析
    LinkedHashMap 源码详细分析(JDK1.8)
    Java并发基础:了解无锁CAS就从源码分析
    IntelliJ IDEA(2018)安装详解
    HashMap 源码详细分析(JDK1.8)
    Java原子类实现原理分析
    谈谈Java中的volatile
  • 原文地址:https://www.cnblogs.com/AK47Sonic/p/8052451.html
Copyright © 2011-2022 走看看