zoukankan      html  css  js  c++  java
  • SparkStreaming DStream转换

    1、无状态转换操作

    (1)无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转换DStream中的每一个RDD。

    部分无状态转化操作:

    (2)尽管这些函数韩起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上。

    例如:reduceByKey()会化简每个事件区间中的数据,但不会化简不同区间之间的数据。

    (3)在wordcount中,我们只会统计几秒内接收到的数据的单词个数,而不会累加

    (4)无状态转化操作也能在多个DStream间整合数据,不过也是在各个事件区间内。如,键值对DStream拥有和RDD一样的与连接相关的转化操作,也就是cogroup()、join()、leftOuterJoin() 等。我们可以在DStream上使用这些操作,这样就对每个批次分别执行了对应的RDD操作。

    (5)我们还可以像在常规的 Spark 中一样使用 DStream的union() 操作将它和另一个DStream 的内容合并起来,也可以使用StreamingContext.union()来合并多个流。

    transform操作

    transform允许DStream上执行任意的RDD-to-RDD函数

    可以用来执行一些 RDD 操作, 即使这些操作并没有在 SparkStreaming 中暴露出来.

    该函数每一批次调度一次其实也就是对DStream中的RDD应用转换。

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Author z
      * Date 2019/4/28 6:51 PM
      */
    object TransformDemo {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
           
            val sctx = new StreamingContext(conf, Seconds(3))
            val dstream: ReceiverInputDStream[String] = sctx.socketTextStream("hadoop201", 10000)
        
            val resultDStream = dstream.transform(rdd => {
                rdd.flatMap(_.split("\W")).map((_, 1)).reduceByKey(_ + _)
            })
            resultDStream.print
            sctx.start
            
            sctx.awaitTermination()
        }
    }
    

    2、有状态转换

    (1)updateStateByKey

    updateStateByKey操作允许在使用新信息不断更新状态的同时能够保留他的状态

    • 定义状态. 状态可以是任意数据类型
    • 定义状态更新函数. 指定一个函数, 这个函数负责使用以前的状态和新值来更新状态.

    在每个阶段Spark 都会在所有已经存在的key上使用状态更新函数,而不管是否有新的数据在

    def updateStateByKey[S: ClassTag](
                     updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]
    

    wordcount案例

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object StreamingWordCount2 {
        def main(args: Array[String]): Unit = {
            // 设置将来访问 hdfs 的使用的用户名, 否则会出现全选不够
            System.setProperty("HADOOP_USER_NAME", "duoduo")
            val conf = new SparkConf().setAppName("StreamingWordCount2").setMaster("local[*]")
            // 1. 创建SparkStreaming的入口对象: StreamingContext  参数2: 表示事件间隔
            val ssc = new StreamingContext(conf, Seconds(5))
            // 2. 创建一个DStream
            val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop202", 9999)
            // 3. 一个个的单词
            val words: DStream[String]=lines.flatMap(_.split("""s+"""))
            // 4. 单词形成元组
            val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
            
            
            // 开始
            /*
            1. 定义状态: 每个单词的个数就是我们需要更新的状态
            2. 状态更新函数. 每个key(word)上使用一次更新新函数
                参数1: 在当前阶段 一个新的key对应的value组成的序列  在我们这个案例中是: 1,1,1,1...
                参数2: 上一个阶段 这个key对应的value
             */
            def updateFunction(newValue: Seq[Int], runningCount: Option[Int]): Option[Int] = {
                // 新的总数和状态进行求和操作
    newValue.foldLeft(0)(_+_)
                val newCount: Int = (0 /: newValue) (_ + _) + runningCount.getOrElse(0)
                Some(newCount)
            }
            // 设置检查点: 使用updateStateByKey必须设置检查点
            ssc.sparkContext.setCheckpointDir("ck")
            val stateDS: DStream[(String, Int)] = wordAndOne.updateStateByKey[Int](updateFunction _)
            //结束
            
            //6. 显示
            stateDS.print
            //7. 启动流失任务开始计算
            ssc.start()
            //8. 等待计算结束才推出主程序
            ssc.awaitTermination()
            ssc.stop(false)
        }
    }
    

    (2)window操作

    Spark Streaming 提供了窗口计算,允许执行转换操作作用在一个窗口内的数据

    默认情况下,计算只对一个时间段内的RDD进行,有了窗口之后,可以把计算应用到一个指定的窗口内的所有RDD上

    一个窗口可以包含多个时间段,基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果

    观察上图, 窗口在 DStream 上每滑动一次, 落在窗口内的那些 RDD会结合在一起, 然后在上面操作产生新的 RDD, 组成了 window DStream.

    在上面图的情况下, 操作会至少应用在 3 个数据单元上, 每次滑动 2 个时间单位. 所以, 窗口操作需要 2 个参数:

    1. 窗口长度 – 窗口的持久时间(执行一次持续多少个时间单位)(图中是 3)
    2. 滑动步长 – 窗口操作被执行的间隔(每多少个时间单位执行一次).(图中是 2 )

    注意: 这两个参数必须是源 DStream 的 interval 的倍数

    1)reduceByKeyAndWindow(reduceFunc:(V, V) => V, windowDuration: Duration)

    val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
    /*
    参数1: reduce 计算规则
    参数2: 窗口长度
    参数3: 窗口滑动步长. 每隔这么长时间计算一次.
     */
    val count: DStream[(String, Int)] =
    wordAndOne.reduceByKeyAndWindow((x: Int, y: Int) => x + y,Seconds(15), Seconds(10))
    

    2)reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration)

    比没有invReduceFunc高效.会利用旧值来进行计算.

    invReduceFunc: (V, V) => V 窗口移动了, 上一个窗口和新的窗口会有重叠部分, 重叠部分的值可以不用重复计算了. 第一个参数就是新的值, 第二个参数是旧的值.

    ssc.sparkContext.setCheckpointDir("hdfs://hadoop201:9000/checkpoint")
    val count: DStream[(String, Int)] =
        wordAndOne
        .reduceByKeyAndWindow((x: Int, y: Int) => x + y,
        (x: Int, y: Int) => x - y,
        Seconds(15),
        Seconds(10))
    

    3)window(windowLength, slideInterval)

    基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream

    4)countByWindow(windowLength, slideInterval)

    返回一个滑动窗口计数流中的元素的个数

    5)countByValueAndWindow(windowLength, slideInterval, [numTasks])

    对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的的对象的v是其在滑动窗口中频率。如上,可配置reduce任务数量。

  • 相关阅读:
    [LC] 270. Closest Binary Search Tree Value
    [LC] 452. Minimum Number of Arrows to Burst Balloons
    [LC] 494. Target Sum
    [LC] 350. Intersection of Two Arrays II
    [LC] 349. Intersection of Two Arrays
    [LC] 322. Coin Change
    scala--函数和闭包
    scala-- 内建控制结构
    scala--函数式对象
    scala --操作符和运算
  • 原文地址:https://www.cnblogs.com/hyunbar/p/12069043.html
Copyright © 2011-2022 走看看