  • Spark Streaming: Window Functions and Stateful Transformations

    Stream processing has three main kinds of workloads: stateless operations, window operations, and stateful operations.
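
    As a rough orientation, the three look as follows (a minimal sketch assuming a hypothetical DStream[(String, Int)] named `pairs`; the rest of this post focuses on the latter two):

    // 1. Stateless: each batch is processed independently of all others
    val perBatch  = pairs.reduceByKey(_ + _)
    // 2. Window: aggregate over a sliding window spanning several batches
    val perWindow = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Minutes(60), Minutes(10))
    // 3. Stateful: carry state across batches, e.g. with updateStateByKey or mapWithState (shown later)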

    reduceByKeyAndWindow

    import kafka.serializer.StringDecoder
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming._
    import org.apache.spark.{SparkContext, SparkConf}
    
    object ClickStream {
      def main (args: Array[String]){
        // Suppress unnecessary log output in the terminal
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    
        // Create the SparkConf object and set the application name; the name shows up in the monitoring UI while the program runs
        val conf = new SparkConf().setAppName("ClickStream").setMaster("local[*]")   
        val sc = new SparkContext(conf)
    
        // The batch interval set here is the basic unit of job generation in Spark Streaming; window and slide durations must be integer multiples of it
        val ssc = new StreamingContext(sc, Seconds(args(0).toLong))
    
        // Because window functions are used, earlier RDDs have to be reused, so checkpointing is required; note that the reused RDDs are unrelated to one another
        ssc.checkpoint(args(1))
    
        val topics = Set("clickstream")    // Kafka topic to read data from
        val brokers = "yz4203.hadoop.data.sina.com.cn:19092,yz4202.hadoop.data.sina.com.cn:19092,10.39.4.212:19092,10.39.4.201:19092"
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        //val offset = "largest"    // values: smallest, largest; controls whether to read the newest or the oldest data; default is largest
    
        // Since Spark 1.3 we can fetch data from Kafka efficiently as follows
        val kvsTemp = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        val kvs = kvsTemp.map(line => line._2)                 // the first element is the key, which is null; the second element is the actual data, a String
    
        // Clean and transform the incoming data as required
        val data = kvs.map(_.split("\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe")).map((_, 1))
    
        // Window length of 1 hour with a 10-minute slide; this yields the url -> pv mapping for the past hour
        //val pvWindow = data.reduceByKeyAndWindow((v1: Int, v2: Int) => v1+v2, Minutes(60), Minutes(10))
    
        // Window length of 1 hour with a 10-minute slide. This also yields the url -> pv mapping for the past
        // hour, but incrementally ("add new, subtract old"): the first function adds the values entering the
        // window, the second subtracts the values of the batch that just left it. Compared with the previous
        // version of reduceByKeyAndWindow, which recomputes the whole window every time, this incremental
        // version is more efficient and elegant.
        val pvWindow = data.reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(10))

        pvWindow.print()

        ssc.start()               // Start the computation
        ssc.awaitTermination()    // Wait for the computation to terminate
        ssc.stop(true, true)      // Stop gracefully
      }
    }
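
    One caveat with the incremental version: because counts are updated by subtraction rather than recomputed, a URL whose count drops back to 0 stays in the windowed state unless it is explicitly filtered out. The reduceByKeyAndWindow overload also takes a filter function for this purpose (the countByValueAndWindow source below passes one). A minimal sketch of using it, reusing the `data` and `sc` values from the example above:

    // Same window as before, but keep only keys whose count is still non-zero,
    // so the windowed state does not grow without bound.
    val pvWindowFiltered = data.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => v1 + v2,       // add counts entering the window
      (v1: Int, v2: Int) => v1 - v2,       // subtract counts leaving the window
      Minutes(60),                         // window length
      Minutes(10),                         // slide interval
      sc.defaultParallelism,               // number of partitions
      (kv: (String, Int)) => kv._2 != 0    // filter function: drop keys whose count fell to 0
    )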

    countByValueAndWindow

    The source of countByValueAndWindow is shown below:

     /**
       * Return a new DStream in which each RDD contains the count of distinct elements in
       * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
       * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
       * `numPartitions` not specified).
       * @param windowDuration width of the window; must be a multiple of this DStream's
       *                       batching interval
       * @param slideDuration  sliding interval of the window (i.e., the interval after which
       *                       the new DStream will generate RDDs); must be a multiple of this
       *                       DStream's batching interval
       * @param numPartitions  number of partitions of each RDD in the new DStream.
       */
      def countByValueAndWindow(
          windowDuration: Duration,
          slideDuration: Duration,
          numPartitions: Int = ssc.sc.defaultParallelism)
          (implicit ord: Ordering[T] = null)
          : DStream[(T, Long)] = ssc.withScope {
        this.map((_, 1L)).reduceByKeyAndWindow(
          (x: Long, y: Long) => x + y,
          (x: Long, y: Long) => x - y,
          windowDuration,
          slideDuration,
          numPartitions,
          (x: (T, Long)) => x._2 != 0L
        )
      }
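
    As a usage sketch (assuming the same streaming context and the `kvs` stream from the example above), the hourly url -> pv statistic could also be expressed directly with countByValueAndWindow on the cleaned URL stream, i.e. the `data` pipeline without the trailing .map((_, 1)):

    // urls: DStream[String] of cleaned URLs
    val urls = kvs.map(_.split("\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe"))

    // (url, pv) pairs over the past hour, sliding every 10 minutes; the counts are maintained incrementally
    val urlPvWindow = urls.countByValueAndWindow(Minutes(60), Minutes(10))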

    reduceByWindow

    The source of reduceByWindow is shown below:

    /**
       * Return a new DStream in which each RDD has a single element generated by reducing all
       * elements in a sliding window over this DStream. However, the reduction is done incrementally
       * using the old window's reduced value :
       *  1. reduce the new values that entered the window (e.g., adding new counts)
       *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
       *  This is more efficient than reduceByWindow without "inverse reduce" function.
       *  However, it is applicable to only "invertible reduce functions".
       * @param reduceFunc associative and commutative reduce function
       * @param invReduceFunc inverse reduce function; such that for all y, invertible x:
       *                      `invReduceFunc(reduceFunc(x, y), x) = y`
       * @param windowDuration width of the window; must be a multiple of this DStream's
       *                       batching interval
       * @param slideDuration  sliding interval of the window (i.e., the interval after which
       *                       the new DStream will generate RDDs); must be a multiple of this
       *                       DStream's batching interval
       */
      def reduceByWindow(
          reduceFunc: (T, T) => T,
          invReduceFunc: (T, T) => T,
          windowDuration: Duration,
          slideDuration: Duration
        ): DStream[T] = ssc.withScope {
          this.map((1, _))
              .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
              .map(_._2)
      }
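
    A usage sketch, reusing the `data` stream of (url, 1) pairs from the example above: reduceByWindow operates on a plain DStream rather than key-value pairs, so it gives the total PV over the window instead of a per-URL breakdown.

    // Total PV across all URLs over the past hour, sliding every 10 minutes, maintained incrementally
    val totalPv = data.map(_._2).reduceByWindow(_ + _, _ - _, Minutes(60), Minutes(10))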

    countByWindow

    The source of countByWindow is shown below:

     /**
       * Return a new DStream in which each RDD has a single element generated by counting the number
       * of elements in a sliding window over this DStream. Hash partitioning is used to generate
       * the RDDs with Spark's default number of partitions.
       * @param windowDuration width of the window; must be a multiple of this DStream's
       *                       batching interval
       * @param slideDuration  sliding interval of the window (i.e., the interval after which
       *                       the new DStream will generate RDDs); must be a multiple of this
       *                       DStream's batching interval
       */
      def countByWindow(
          windowDuration: Duration,
          slideDuration: Duration): DStream[Long] = ssc.withScope {
        this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
      }
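
    A usage sketch, again on the `data` stream above: countByWindow simply counts elements, so here it gives the number of finance page views seen in the past hour.

    // Number of records (page views) in the past hour, sliding every 10 minutes
    val pvInWindow = data.countByWindow(Minutes(60), Minutes(10))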

    As these sources show, countByValueAndWindow, reduceByWindow, and countByWindow are all built on the "add new, subtract old" (incremental) version of reduceByKeyAndWindow.

    Above, we obtained the url -> pv mapping for each one-hour window. If we also want, for each URL, the ratio of its PV in the previous window to its PV in the current window, this is where updateStateByKey and mapWithState come in.

    Because there is roughly a 10x performance difference between updateStateByKey and mapWithState, only mapWithState is covered here.

    mapWithState

    import kafka.serializer.StringDecoder
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming._
    import org.apache.spark.{SparkContext, SparkConf}
    
    object ClickStream {
      def main (args: Array[String]){
        // Suppress unnecessary log output in the terminal
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    
        // Create the SparkConf object and set the application name; the name shows up in the monitoring UI while the program runs
        val conf = new SparkConf().setAppName("ClickStream").setMaster("local[*]")   
        val sc = new SparkContext(conf)
    
        // The batch interval set here is the basic unit of job generation in Spark Streaming; window and slide durations must be integer multiples of it
        val ssc = new StreamingContext(sc, Seconds(args(0).toLong))
    
        // Because window functions are used, earlier RDDs have to be reused, so checkpointing is required; note that the reused RDDs are unrelated to one another
        ssc.checkpoint(args(1))
    
        val topics = Set("clickstream")    // Kafka topic to read data from
        val brokers = "yz4207.hadoop.data.sina.com.cn:19092,yz4203.hadoop.data.sina.com.cn:19092,yz4202.hadoop.data.sina.com.cn:19092,10.39.4.212:19092,10.39.4.201:19092"
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        //val offset = "largest"    // values: smallest, largest; controls whether to read the newest or the oldest data; default is largest
    
        // Since Spark 1.3 we can fetch data from Kafka efficiently as follows
        val kvsTemp = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        val kvs = kvsTemp.map(line => line._2)                 // the first element is the key, which is null; the second element is the actual data, a String
    
        // Clean and transform the incoming data as required
        val data = kvs.map(_.split("\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe")).map((_, 1))
    
        // Window length of 1 hour with a 10-minute slide; this yields the url -> pv mapping for the past hour
        //val pvWindow = data.reduceByKeyAndWindow((v1: Int, v2: Int) => v1+v2, Minutes(60), Minutes(10))
    
        // Window length of 1 hour with a 10-minute slide. This also yields the url -> pv mapping for the past
        // hour, but incrementally ("add new, subtract old"): the first function adds the values entering the
        // window, the second subtracts the values of the batch that just left it. Compared with the previous
        // version of reduceByKeyAndWindow, which recomputes the whole window every time, this incremental
        // version is more efficient and elegant.
        val pvWindow = data.reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(10))

        // key is K, value is the new value for this batch, and state is the previous value (the state
        // before this batch). Update the state to the new value here.
        val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
          val currentPV = value.getOrElse(0)
          val output = (key, currentPV, state.getOption().getOrElse(0))
          state.update(currentPV)
          output
        }

        // StateSpec is just a wrapper; the actual work is done by the mappingFunc defined above
        val urlPvs = pvWindow.mapWithState(StateSpec.function(mappingFunc))    // url, PV of the current batch, PV of the previous batch

        urlPvs.print()

        ssc.start()               // Start the computation
        ssc.awaitTermination()    // Wait for the computation to terminate
        ssc.stop(true, true)      // Stop gracefully
      }
    }
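
    From urlPvs, the ratio mentioned earlier (this window's PV versus the previous window's PV for the same URL) is one more transformation away. A minimal sketch, skipping URLs that had no PV in the previous window to avoid division by zero (in the full program this would go before ssc.start()):

    // (url, currentPV / previousPV), computed from the (url, currentPV, previousPV) triples above
    val pvRatios = urlPvs
      .filter { case (_, _, prevPv) => prevPv != 0 }
      .map { case (url, curPv, prevPv) => (url, curPv.toDouble / prevPv) }
    pvRatios.print()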