Spark Streaming: Window Functions and State Transformation Functions

    Stream processing has three main application scenarios: stateless operations, window operations, and stateful operations.
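    As a rough illustration (a sketch added here, not part of the original post), the three categories map onto the DStream API as in the snippet below; the socket source on localhost:9999, the checkpoint path, and the 10-second batch interval are arbitrary assumptions:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}

    object StreamingCategories {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StreamingCategories").setMaster("local[*]")
        val ssc = new StreamingContext(conf, Seconds(10))
        ssc.checkpoint("/tmp/streaming-categories")   // example path; needed by the window and state operators below

        // Hypothetical source: words arriving over a socket
        val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

        // 1. Stateless: each batch is processed on its own
        val perBatchCounts = pairs.reduceByKey(_ + _)

        // 2. Window: aggregate over the last 60 minutes, sliding every 10 minutes
        val windowedCounts = pairs.reduceByKeyAndWindow(_ + _, Minutes(60), Minutes(10))

        // 3. Stateful: carry a running total across batches
        val spec = StateSpec.function((key: String, value: Option[Int], state: State[Int]) => {
          val total = state.getOption().getOrElse(0) + value.getOrElse(0)
          state.update(total)
          (key, total)
        })
        val runningCounts = pairs.mapWithState(spec)

        perBatchCounts.print()
        windowedCounts.print()
        runningCounts.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }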

    reduceByKeyAndWindow

    import kafka.serializer.StringDecoder
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming._
    import org.apache.spark.{SparkContext, SparkConf}
    
    object ClickStream {
      def main (args: Array[String]){
        // Suppress unnecessary log output in the terminal
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    
        // Create the SparkConf object and set the application name; the name is visible in the monitoring UI while the application runs
        val conf = new SparkConf().setAppName("ClickStream").setMaster("local[*]")   
        val sc = new SparkContext(conf)
    
        // The Batch Interval set here is the basic unit in which Spark Streaming generates jobs; the window and slide durations must be integer multiples of it
        val ssc = new StreamingContext(sc, Seconds(args(0).toLong))
    
        // Because window functions are used, earlier RDDs are reused and checkpointing is required; note that the reused RDDs have no relationship to one another
        ssc.checkpoint(args(1))
    
        val topics = Set("clickstream")    // Kafka topic to consume from
        val brokers = "yz4203.hadoop.data.sina.com.cn:19092,yz4202.hadoop.data.sina.com.cn:19092,10.39.4.212:19092,10.39.4.201:19092"
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        //val offset = "largest"    // values: smallest, largest; controls whether to read the newest or the oldest data; default is largest
    
        // Starting with Spark 1.3, data can be pulled from Kafka efficiently as follows
        val kvsTemp = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        val kvs = kvsTemp.map(line => line._2)                 // The first element is a null key; the second element is the payload we need, as a String
    
        // Clean and transform the incoming data as required
        val data = kvs.map(_.split("\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe")).map((_, 1))
    
        // Window length of 1 hour with a 10-minute slide; this yields the URL-to-PV mapping for the past hour
        //val pvWindow = data.reduceByKeyAndWindow((v1: Int, v2: Int) => v1+v2, Minutes(60), Minutes(10))

        // Window length of 1 hour with a 10-minute slide; this also yields the URL-to-PV mapping for the past hour,
        // but incrementally ("add the new, subtract the old"): the first function adds the batches entering the window,
        // the second subtracts the batches leaving it. Compared with the version above, which recomputes the whole
        // window every time, this incremental approach is more efficient and elegant.
        val pvWindow = data.reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(10))
        pvWindow.print()

        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
        ssc.stop(true, true)    // Stop gracefully
      }
    }
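    One caveat with the incremental form that the post does not call out: once a URL has left the window entirely, its count drops to 0 but the (url, 0) entry stays in the windowed state unless a filter function is supplied. The invertible overload of reduceByKeyAndWindow accepts an optional filterFunc for exactly this, which is how countByValueAndWindow uses it in the source shown below. A hedged sketch based on the pipeline above (numPartitions = 4 is an arbitrary example value):

    // Same as pvWindow, but drop keys whose windowed count has fallen to zero
    val pvWindowFiltered = data.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,   // add counts from batches entering the window
      (a: Int, b: Int) => a - b,   // subtract counts from batches leaving the window
      Minutes(60),
      Minutes(10),
      numPartitions = 4,
      filterFunc = (kv: (String, Int)) => kv._2 != 0
    )
    pvWindowFiltered.print()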

    countByValueAndWindow

    The source of countByValueAndWindow is shown below:

     /**
       * Return a new DStream in which each RDD contains the count of distinct elements in
       * RDDs in a sliding window over this DStream. Hash partitioning is used to generate
       * the RDDs with `numPartitions` partitions (Spark's default number of partitions if
       * `numPartitions` not specified).
       * @param windowDuration width of the window; must be a multiple of this DStream's
       *                       batching interval
       * @param slideDuration  sliding interval of the window (i.e., the interval after which
       *                       the new DStream will generate RDDs); must be a multiple of this
       *                       DStream's batching interval
       * @param numPartitions  number of partitions of each RDD in the new DStream.
       */
      def countByValueAndWindow(
          windowDuration: Duration,
          slideDuration: Duration,
          numPartitions: Int = ssc.sc.defaultParallelism)
          (implicit ord: Ordering[T] = null)
          : DStream[(T, Long)] = ssc.withScope {
        this.map((_, 1L)).reduceByKeyAndWindow(
          (x: Long, y: Long) => x + y,
          (x: Long, y: Long) => x - y,
          windowDuration,
          slideDuration,
          numPartitions,
          (x: (T, Long)) => x._2 != 0L
        )
      }
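    As an illustrative sketch (not in the original post), countByValueAndWindow can produce the same URL/PV mapping directly from the cleaned URL stream, without building (url, 1) pairs by hand. Here `urls` is the pipeline from the listing above with the final .map((_, 1)) step left out:

    // Distinct-URL counts over the last hour, sliding every 10 minutes
    val urls = kvs.map(_.split("\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe"))
    val urlCountsWindow = urls.countByValueAndWindow(Minutes(60), Minutes(10))   // DStream[(String, Long)]
    urlCountsWindow.print()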

    reduceByWindow

    The source of reduceByWindow is shown below:

    /**
       * Return a new DStream in which each RDD has a single element generated by reducing all
       * elements in a sliding window over this DStream. However, the reduction is done incrementally
       * using the old window's reduced value :
       *  1. reduce the new values that entered the window (e.g., adding new counts)
       *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
       *  This is more efficient than reduceByWindow without "inverse reduce" function.
       *  However, it is applicable to only "invertible reduce functions".
       * @param reduceFunc associative and commutative reduce function
       * @param invReduceFunc inverse reduce function; such that for all y, invertible x:
       *                      `invReduceFunc(reduceFunc(x, y), x) = y`
       * @param windowDuration width of the window; must be a multiple of this DStream's
       *                       batching interval
       * @param slideDuration  sliding interval of the window (i.e., the interval after which
       *                       the new DStream will generate RDDs); must be a multiple of this
       *                       DStream's batching interval
       */
      def reduceByWindow(
          reduceFunc: (T, T) => T,
          invReduceFunc: (T, T) => T,
          windowDuration: Duration,
          slideDuration: Duration
        ): DStream[T] = ssc.withScope {
          this.map((1, _))
              .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
              .map(_._2)
      }
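    A hedged usage sketch, reusing the `data` stream from the listing above: reduceByWindow operates on the element type itself, so to get a single total-PV figure per window the keys are stripped first:

    // Total PV across all URLs in the last hour, one value emitted per 10-minute slide
    val totalPv = data.map(_._2).reduceByWindow(_ + _, _ - _, Minutes(60), Minutes(10))
    totalPv.print()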

    countByWindow

    The source of countByWindow is shown below:

     /**
       * Return a new DStream in which each RDD has a single element generated by counting the number
       * of elements in a sliding window over this DStream. Hash partitioning is used to generate
       * the RDDs with Spark's default number of partitions.
       * @param windowDuration width of the window; must be a multiple of this DStream's
       *                       batching interval
       * @param slideDuration  sliding interval of the window (i.e., the interval after which
       *                       the new DStream will generate RDDs); must be a multiple of this
       *                       DStream's batching interval
       */
      def countByWindow(
          windowDuration: Duration,
          slideDuration: Duration): DStream[Long] = ssc.withScope {
        this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
      }
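    A matching sketch for countByWindow, again reusing `data` from above; it simply counts the records in the window, regardless of key:

    // Number of cleaned click records seen in the last hour, sliding every 10 minutes
    val recordsPerHour = data.countByWindow(Minutes(60), Minutes(10))   // DStream[Long]
    recordsPerHour.print()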

    As these sources show, countByValueAndWindow, reduceByWindow, and countByWindow are all ultimately implemented on top of the incremental ("add new, subtract old") version of reduceByKeyAndWindow.

    So far we have the URL-to-PV mapping within each one-hour window. If we also want the ratio of a URL's PV in the previous window to its PV in the current window, updateStateByKey and mapWithState come into play. There is roughly a 10x performance difference between updateStateByKey and mapWithState, in mapWithState's favor.

    Here, only mapWithState is covered.

    mapWithState

    import kafka.serializer.StringDecoder
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming._
    import org.apache.spark.{SparkContext, SparkConf}
    
    object ClickStream {
      def main (args: Array[String]){
        // Suppress unnecessary log output in the terminal
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    
        // Create the SparkConf object and set the application name; the name is visible in the monitoring UI while the application runs
        val conf = new SparkConf().setAppName("ClickStream").setMaster("local[*]")   
        val sc = new SparkContext(conf)
    
        // The Batch Interval set here is the basic unit in which Spark Streaming generates jobs; the window and slide durations must be integer multiples of it
        val ssc = new StreamingContext(sc, Seconds(args(0).toLong))
    
        // Because window functions are used, earlier RDDs are reused and checkpointing is required; note that the reused RDDs have no relationship to one another
        ssc.checkpoint(args(1))
    
        val topics = Set("clickstream")    // Kafka topic to consume from
        val brokers = "yz4207.hadoop.data.sina.com.cn:19092,yz4203.hadoop.data.sina.com.cn:19092,yz4202.hadoop.data.sina.com.cn:19092,10.39.4.212:19092,10.39.4.201:19092"
        val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
        //val offset = "largest"    // values: smallest, largest; controls whether to read the newest or the oldest data; default is largest
    
        // Starting with Spark 1.3, data can be pulled from Kafka efficiently as follows
        val kvsTemp = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        val kvs = kvsTemp.map(line => line._2)                 // The first element is a null key; the second element is the payload we need, as a String
    
        // Clean and transform the incoming data as required
        val data = kvs.map(_.split("\t")).filter(_(53) == "finance").map(_(27)).map(_.split("\\?")(0)).filter(! _.contains("iframe")).map((_, 1))
    
        // Window length of 1 hour with a 10-minute slide; this yields the URL-to-PV mapping for the past hour
        //val pvWindow = data.reduceByKeyAndWindow((v1: Int, v2: Int) => v1+v2, Minutes(60), Minutes(10))
    
        // Window length of 1 hour with a 10-minute slide; this also yields the URL-to-PV mapping for the past hour,
        // but incrementally ("add the new, subtract the old"): the first function adds the batches entering the window,
        // the second subtracts the batches leaving it. Compared with the version above, which recomputes the whole
        // window every time, this incremental approach is more efficient and elegant.
        val pvWindow = data.reduceByKeyAndWindow(_ + _, _ - _, Minutes(60), Minutes(10))

        // key is K, value is the new value, state is the original value (the state before this batch).
        // Here the state is updated to the new value.
        val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
          val currentPV = value.getOrElse(0)
          val output = (key, currentPV, state.getOption().getOrElse(0))
          state.update(currentPV)
          output
        }

        // StateSpec is just a wrapper; the actual work is still done by the mappingFunc defined above
        val urlPvs = pvWindow.mapWithState(StateSpec.function(mappingFunc))

        // url, PV of the current batch, PV of the previous batch
        urlPvs.print()

        ssc.start()             // Start the computation
        ssc.awaitTermination()  // Wait for the computation to terminate
        ssc.stop(true, true)    // Stop gracefully
      }
    }
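    The listing prints (url, current PV, previous PV) triples. To obtain the ratio mentioned earlier, one extra map is enough; a hedged sketch based on the `urlPvs` stream above, guarding against a current PV of 0:

    // Ratio of the previous window's PV to the current window's PV, per URL
    val pvRatios = urlPvs.map { case (url, currentPv, previousPv) =>
      val ratio = if (currentPv == 0) Double.NaN else previousPv.toDouble / currentPv
      (url, ratio)
    }
    pvRatios.print()

    If URLs can disappear for good, note that StateSpec also exposes a timeout(idleDuration) setting so that idle per-key state is eventually dropped; whether that is appropriate here depends on the click stream's key churn.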
Original article: https://www.cnblogs.com/itboys/p/10594426.html