zoukankan      html  css  js  c++  java
  • SparkStreaming算子操作,Output操作

    一、SparkStreaming算子操作

    1.1 foreachRDD

    output operation算子,必须对抽取出来的RDD执行action类算子,代码才能执行。

    1.2 transform

    transformation类算子
    可以通过transform算子,对Dstream做RDD到RDD的任意操作。

    1.3 updateStateByKey

    1. transformation算子

    updateStateByKey作用:

    • 为SparkStreaming中每一个Key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
    • 通过更新函数对该key的状态不断更新,对于每个新的batch而言,SparkStreaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。
    1. 使用到updateStateByKey要开启checkpoint机制和功能。
    2. 多久会将内存中的数据写入到磁盘一份?

    如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。

    1.4 操作窗口

    在这里插入图片描述
    假设每隔5s 1个batch,上图中窗口长度为15s,窗口滑动间隔10s。

    1. 窗口长度和滑动间隔必须是batchInterval的整数倍。如果不是整数倍会检测报错。
    2. 优化后的window操作要保存状态所以要设置checkpoint路径,没有优化的window操作可以不设置checkpoint路径。

    二、Driver HA(Standalone或者Mesos)

    因为SparkStreaming是7*24小时运行,Driver只是一个简单的进程,有可能挂掉,所以实现Driver的HA就有必要(如果使用的Client模式就无法实现Driver HA ,这里针对的是cluster模式)。Yarn平台cluster模式提交任务,AM(AplicationMaster)相当于Driver,如果挂掉会自动启动AM。这里所说的DriverHA针对的是Spark standalone和Mesos资源调度的情况下。实现Driver的高可用有两个步骤:

    第一:提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver。
    第二:代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)

    Driver中元数据包括:

    1. 创建应用程序的配置信息。
    2. DStream的操作逻辑。
    3. job中没有完成的批次数据,也就是job的执行进度。

    三、Output操作

    Output Meaning
    print 打印每个batch中的前10个元素,主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job
    saveAsTextFile(prefix,[suffix]) 将每个batch的数据保存到文件中,每个batch的文件命名格式为:prefix-TIME_IN_MSI[.suffix]
    saveAsObjectFile 同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中
    saveAsHadoopFile 同上,将数据保存到Hadoop文件中
    foreachRDD 最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件,数据库,缓存等。通常在其中,是针对RDD执行action操作的,比如foreach

    算子操作实例

    1 pom.xml

    <properties>
            <spark.version>2.3.0</spark.version>
            <encoding>UTF-8</encoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <!--kafka_2.12-2.2.0-->
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.4</version>
            </dependency>
        </dependencies>
    

    2 StreamingTest

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Durations, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object StreamingTest {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local[2]")
        conf.setAppName("StreamingTest")
    
        val sc = new SparkContext()
        //new Streaming有两种方式,若使用第一种方式,则上方不需要再初始化SparkContext
        //在JYM中已经创建了SparkContext
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        ssc.sparkContext.setLogLevel("Error")
        //val ssc = new StreamingContext(sc,Durations.seconds(5))
        //可通过ssc.sparkContext获取到SparkContext的值
    
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hostname", 9000)
        val words: DStream[String] = lines.flatMap(one => {      one.split(" ")    })
        val pairsWords: DStream[(String, Int)] = words.map(one => {      (one, 1)    })
        val result: DStream[(String, Int)] = pairsWords.reduceByKey(_ + _)
    
        //result.print()
    
        result.foreachRDD(pairRDD => {
          val newRDD: RDD[(String, Int)] = pairRDD.filter(one => {
            println("filter===============")
            true
          })
          val resultRDD: RDD[(String, Int)] = newRDD.map(one => {
            println("map**************" + one)
            one
          })
          resultRDD.count()
        })
    
        /*result.foreachRDD(wordCount => {
          println("******producer in Driver********")
          val sortRDD: RDD[(String, Int)] = wordCount.sortByKey(false)
          val result: RDD[(String, Int)] = sortRDD.filter(tp => {
            println("***********producer in Executor**********)
            true
          })
          result.foreach(println)
        })*/
    
        ssc.start()
        ssc.awaitTermination()
        //ssc.stop(true)会清空SparkContext对象
        //ssc.stop(false)则不会清空对象
        ssc.stop()
      }
    }
    

    3 UpdateStateByKey

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Durations, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object UpdateStateByKey {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local[2]")
        conf.setAppName("UpdateStateByKey")
    
        val sc = new SparkContext()
        //new Streaming有两种方式,若使用第一种方式,则上方不需要再初始化SparkContext
        //在JYM中已经创建了SparkContext
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        ssc.sparkContext.setLogLevel("Error")
        //val ssc = new StreamingContext(sc,Durations.seconds(5))
        //可通过ssc.sparkContext获取到SparkContext的值
    
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hostname", 9000)
        val words: DStream[String] = lines.flatMap(one => {
          one.split(" ")
        })
        val pairsWords: DStream[(String, Int)] = words.map(one => {
          (one, 1)
        })
    
        /**
          * 根据key更状态,需要设置checkpoint来保存状态
          * 默认key的状态在内存中有一份,在checkpoint目录中有一份
          *
          * 多久会将内存中的数据(每一个key多对应的状态)写入到磁盘一份呢?
          * 如果batchInterval小于10s,那么10s会将内存中的数据写入到磁盘一份
          * 如果batchInterval大于10s,那么就以batchInterval为准
          *
          * 目的:为了防止频繁的HDFS
          * 设置checkpoint两种方式都可以
          */
        ssc.checkpoint("D:/spark")
        //ssc.sparkContext.setCheckpointDir("D:/spark")
    
        /**
          * currentValues:当前批次某个key对应所有的value组成的一个集合
          * preValue:以往批次当前Key,对应的总状态值
          */
        val result: DStream[(String, Int)] = pairsWords.updateStateByKey((currentValues: Seq[Int], preValue: Option[Int]) => {
          var totalValues = 0
          if (!preValue.isEmpty) {
            totalValues += preValue.get
          }
          for (value <- currentValues) {
            totalValues += value
          }
          Option(totalValues)
        })
    
        ssc.start()
        ssc.awaitTermination()
        //ssc.stop(true)会清空SparkContext对象
        //ssc.stop(false)则不会清空对象
        ssc.stop()
      }
    }
    

    4 WindowOperator

    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Durations, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WindowOperator {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local[2]")
        conf.setAppName("UpdateStateByKey")
    
        val sc = new SparkContext()
        //new Streaming有两种方式,若使用第一种方式,则上方不需要再初始化SparkContext
        //在JYM中已经创建了SparkContext
        val ssc = new StreamingContext(conf, Durations.seconds(5))
        ssc.sparkContext.setLogLevel("Error")
        //val ssc = new StreamingContext(sc,Durations.seconds(5))
        //可通过ssc.sparkContext获取到SparkContext的值
    
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hostname", 9000)
        val words: DStream[String] = lines.flatMap(one => {
          one.split(" ")
        })
        val pairsWords: DStream[(String, Int)] = words.map(one => {
          (one, 1)
        })
    
        /**
          * 窗口操作普通机制
          *
          * 滑动间隔和窗口长度必须是batchInterval整数倍
          */
        /*val windowResult: DStream[(String, Int)] = pairsWords.reduceByKeyAndWindow((v1: Int, v2: Int) => {
          v1 + v2
        }, Durations.seconds(15), Durations.seconds(5))*/
    
        val windowResult = pairsWords.reduceByKeyAndWindow((v1: Int, v2: Int) => {
          v1 + v2
        }, (v1: Int, v2: Int) => {
          v1 - v2
        }, Durations.seconds(15), Durations.seconds(5))
    
        windowResult.print()
    
        ssc.start()
        ssc.awaitTermination()
        //ssc.stop(true)会清空SparkContext对象
        //ssc.stop(false)则不会清空对象
        ssc.stop()
      }
    }
    
  • 相关阅读:
    史上最全的 Python 3 类型转换指南
    Python 3 进阶 —— print 打印和输出
    Go 标准库 —— sync.Mutex 互斥锁
    Python 判断文件/目录是否存在
    测试用例设计——如何提高测试覆盖率
    多语言的测试经验分享
    robotium教材(一):robotium环境搭建
    Android应用开发中出现appcompat-v7错误
    Android CPU使用率:top和dump cpuinfo的不同
    Kernel Time和User Time分别指什么
  • 原文地址:https://www.cnblogs.com/aixing/p/13327404.html
Copyright © 2011-2022 走看看