zoukankan      html  css  js  c++  java
  • Spark 学习笔记之 Streaming Window

    Streaming Window:

    窗口操作的含义:每隔 2 秒(滑动间隔)统计一次最近 3 秒(窗口长度)内的数据。窗口长度和滑动间隔都必须是批处理间隔(batch interval)的整数倍。

    slideDuration(滑动间隔): 2 秒

    windowDuration(窗口长度): 3 秒

    例子:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    
    /**
     * Sliding-window word count over the Kafka topic "ScalaTopic".
     *
     * The batch interval is 1 second. Every 2 seconds (slide interval) the job counts
     * the words received during the last 3 seconds (window length). It then prints the
     * resulting (count, word) pairs, sorted by count in descending order. Both the window
     * length and the slide interval must be multiples of the batch interval.
     */
    object WindowStreaming {

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("KafkaDirect").setMaster("local[1]")
        val ssc = new StreamingContext(conf, Seconds(1))
        val kafkaMapParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "g1",
          "auto.offset.reset" -> "latest", // earliest|latest
          // NOTE(review): auto-commit is disabled and offsets are never committed manually
          // below, so group "g1" records no progress; confirm this is intended for the demo.
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val topicsSet = Set("ScalaTopic")
        val kafkaStream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topicsSet, kafkaMapParams)
        )

        val windowedCounts: DStream[(Int, String)] = kafkaStream
          // A Kafka record value can be null (e.g. a tombstone); treat it as "no words"
          // instead of failing the batch with a NullPointerException.
          .flatMap(record => Option(record.value()).map(_.split(" ")).getOrElse(Array.empty[String]))
          // Consecutive spaces produce empty tokens, which are not words.
          .filter(_.nonEmpty)
          .map((_, 1))
          // Window length 3s, slide interval 2s.
          .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(3), Seconds(2))
          // Swap to (count, word) so sortByKey orders by count, highest first.
          .transform(rdd => rdd.map { case (word, count) => (count, word) }.sortByKey(ascending = false))

        windowedCounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    运行结果:

  • 相关阅读:
    sql把一个表的某几列的数据存到另一个表里
    java四舍五入保留两位小数4种方法
    SWT 全接触
    详解Java多线程编程中LockSupport
    详解Java多线程编程中LockSupport类的线程阻塞用法
    Java多线程中join方法的理解
    java 线程方法join的简单总结
    彻底理解ThreadLocal
    recyclerView 列表类控件卡顿优化
    Android中RelativeLayout和LinearLayout性能分析
  • 原文地址:https://www.cnblogs.com/AK47Sonic/p/8052451.html
Copyright © 2011-2022 走看看