zoukankan      html  css  js  c++  java
  • 大数据学习之Spark Streaming进阶 52

    1、StreamingContext对象详解

    初始化StreamingContext

     方式一:从SparkConf对象中创建

     从一个现有的SparkContext实例中创建

     

     程序中的几点说明:

     appName参数是应用程序在集群UI上显示的名称。

    masterSparkMesosYARN集群的URL,或者一个特殊的“local [*]”字符串来让程序以本地模式运行。

     

    当在集群上运行程序时,不需要在程序中硬编码master参数,而是使用spark-submit提交应用程序并将masterURL以脚本参数的形式传入。但是,对于本地测试和单元测试,您可以通过“local[*]”来运行Spark Streaming程序(请确保本地系统中的cpu核心数够用)。

     

     StreamingContext会内在的创建一个SparkContext的实例(所有Spark功能的起始点),你可以通过ssc.sparkContext访问到这个实例。

     批处理的时间窗口长度必须根据应用程序的延迟要求和可用的集群资源进行设置。

     

    请务必记住以下几点:

     一旦一个StreamingContextt开始运作,就不能设置或添加新的流计算。

     一旦一个上下文被停止,它将无法重新启动。

     同一时刻,一个JVM中只能有一个StreamingContext处于活动状态。

     StreamingContext上的stop()方法也会停止SparkContext。 要仅停止StreamingContext(保持SparkContext活跃),请将stop() 方法的可选参数stopSparkContext设置为false

     只要前一个StreamingContext在下一个StreamingContext被创建之前停止(不停止SparkContext),SparkContext就可以被重用来创建多个StreamingContext

     

     

    2、离散流(DStreams):Discretized Streams

     

    l DiscretizedStreamDStream Spark Streaming对流式数据的基本抽象。它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经处理的数据流。在内部,DStream由一系列连续的RDD表示,如下图:

     

     

    举例分析:在之前的NetworkWordCount的例子中,我们将一行行文本组成的流转换为单词流,具体做法为:将flatMap操作应用于名为linesDStream中的每个RDD上,以生成words DStreamRDD。如下图所示:

    但是DStream和RDD也有区别,下面画图说明:

    3、DStream中的转换操作(transformation)

    最后两个transformation算子需要重点介绍一下:

     

     transform(func)

     通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD

    举例:在NetworkWordCount中,也可以使用transform来生成元组对

     

     

    updateStateByKey(func)

     操作允许不断用新信息更新它的同时保持任意状态。

    定义状态-状态可以是任何的数据类型

    定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态

    注意:需要设置检查点

    重写NetworkWordCount程序,累计每个单词出现的频率(注意:累计)

    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object MyTotalNetworkWordCount {
      def main(args: Array[String]): Unit = {
        //创建一个Context对象: StreamingContext  (SparkContext, SQLContext)
        //指定批处理的时间间隔
        val conf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf,Seconds(5))
    
        //设置检查点
        ssc.checkpoint("hdfs://192.168.157.11:9000/spark/checkpoint")
    
        //创建一个DStream,处理数据
        val lines  = ssc.socketTextStream("192.168.157.81",7788,StorageLevel.MEMORY_AND_DISK_SER)
    
        //执行wordcount
        val words = lines.flatMap(_.split(" "))
    
        //定义函数用于累计每个单词的总频率
        val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
          //通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和
          val currentCount = currValues.sum
          // 已累加的值
          val previousCount = prevValueState.getOrElse(0)
          // 返回累加后的结果,是一个Option[Int]类型
          Some(currentCount + previousCount)
        }
    
        val pairs = words.map(word => (word, 1))
    
        val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
        totalWordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

      

     输出结果:

    注意:如果在IDEA中,不想输出log4j的日志信息,可以将log4j.properties文件(放在src的目录下)的第一行改为:

    log4j.rootCategory=ERROR, console

     

    4、窗口操作

    Spark Streaming还提供了窗口计算功能,允许您在数据的滑动窗口上应用转换操作。下图说明了滑动窗口的工作方式:

    如图所示,每当窗口滑过originalDStream时,落在窗口内的源RDD被组合并被执行操作以产生windowed DStreamRDD。在上面的例子中,操作应用于最近3个时间单位的数据,并以2个时间单位滑动。这表明任何窗口操作都需要指定两个参数。

    窗口长度(windowlength- 窗口的时间长度(上图的示例中为:3)。

    滑动间隔(slidinginterval- 两次相邻的窗口操作的间隔(即每次滑动的时间长度)(上图示例中为:2)。

    这两个参数必须是源DStream的批间隔的倍数(上图示例中为:1)。

     

    我们以一个例子来说明窗口操作。 假设您希望对之前的单词计数的示例进行扩展,每10秒钟对过去30秒的数据进行wordcount。为此,我们必须在最近30秒的pairs DStream数据中对(word, 1)键值对应用reduceByKey操作。这是通过使用reduceByKeyAndWindow操作完成的。

    需要注意:滑动的距离必须是采样时间的整数倍。

     

    一些常见的窗口操作如下表所示。所有这些操作都用到了上述两个参数 - windowLengthslideInterval

     window(windowLength, slideInterval)

    基于源DStream产生的窗口化的批数据计算一个新的DStream

     

     countByWindow(windowLength, slideInterval)

    返回流中元素的一个滑动窗口数

     

     reduceByWindow(func, windowLength, slideInterval)

     返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数必须是相关联的以使计算能够正确的并行计算。

     

     

     reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])

     应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks参数设置不同的任务数

     

     reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

     上述reduceByKeyAndWindow() 的更高效的版本,其中使用前一窗口的reduce计算结果递增地计算每个窗口的reduce值。这是通过对进入滑动窗口的新数据进行reduce操作,以及“逆减(inverse reducing)”离开窗口的旧数据来完成的。一个例子是当窗口滑动时对键对应的值进行“一加一减”操作。但是,它仅适用于“可逆减函数(invertible reduce functions)”,即具有相应“反减”功能的减函数(作为参数invFunc)。 像reduceByKeyAndWindow一样,通过可选参数可以配置reduce任务的数量。 请注意,使用此操作必须启用检查点。

     

     countByValueAndWindow(windowLength, slideInterval, [numTasks])

    l应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。

     

     

    5、输入DStreams和接收器

     

    输入DStreams表示从数据源获取输入数据流的DStreams。在NetworkWordCount例子中,lines表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。

     

    输入DStreams表示从数据源获取的原始数据流。Spark Streaming拥有两类数据源:

     

    基本源(Basic sources):这些源在StreamingContext API中直接可用。例如文件系统、套接字连接、Akkaactor

     

    高级源(Advanced sources):这些源包括Kafka,Flume,Kinesis,Twitter等等。

     

     

     

    下面通过具体的案例,详细说明:

     

     

     

    • 文件流:通过监控文件系统的变化,若有新文件添加,则将它读入并作为数据流

     

    需要注意的是:

     

    ① 这些文件具有相同的格式

     

    ② 这些文件通过原子移动或重命名文件的方式在dataDirectory创建

     

    ③ 如果在文件中追加内容,这些追加的新数据也不会被读取。

    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object FileStreaming {
      def main(args: Array[String]) {
        val conf = new SparkConf().setMaster("local[2]").setAppName("FileStreaming")
        val ssc = new StreamingContext(conf,Seconds(2))
    
        //从本地目录中读取数据:如果有新文件产生,就会读取进来
        val lines = ssc.textFileStream("d:\dowload\spark123")
    
        //打印结果
        lines.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

      

    注意:要演示成功,需要在原文件中编辑,然后拷贝一份。

    6、DStreams的输出操作

    输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。目前,定义了下面几种输出操作:

     

    8、缓存/持久化

    RDD类似,DStreams还允许开发人员将流数据保留在内存中。也就是说,在DStream上调用persist() 方法会自动将该DStream的每个RDD保留在内存中。如果DStream中的数据将被多次计算(例如,相同数据上执行多个操作),这个操作就会很有用。对于基于窗口的操作,如reduceByWindowreduceByKeyAndWindow以及基于状态的操作,如updateStateByKey,数据会默认进行持久化。 因此,基于窗口的操作生成的DStream会自动保存在内存中,而不需要开发人员调用persist()

     

    对于通过网络接收数据(例如KafkaFlumesockets等)的输入流,默认持久化级别被设置为将数据复制到两个节点进行容错。

     

    请注意,与RDD不同,DStreams的默认持久化级别将数据序列化保存在内存中。

     

    9、检查点支持

    流数据处理程序通常都是全天候运行,因此必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为了实现这一特性,Spark Streaming需要checkpoint足够的信息到容错存储系统,以便可以从故障中恢复。

     

    ① 一般会对两种类型的数据使用检查点:

    1) 元数据检查点(Metadatacheckpointing - 将定义流计算的信息保存到容错存储中(如HDFS)。这用于从运行streaming程序的driver程序的节点的故障中恢复。元数据包括以下几种:

    配置(Configuration- 用于创建streaming应用程序的配置信息。

    l DStream操作(DStream operations- 定义streaming应用程序的DStream操作集合。

    不完整的batchIncomplete batches- jobs还在队列中但尚未完成的batch

     

    2) 数据检查点(Datacheckpointing- 将生成的RDD保存到可靠的存储层。对于一些需要将多个批次之间的数据进行组合的stateful变换操作,设置数据检查点是必需的。在这些转换操作中,当前生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间而不断增加,由此也会导致基于血统机制的恢复时间无限增加。为了避免这种情况,stateful转换的中间RDD将定期设置检查点并保存到到可靠的存储层(例如HDFS)以切断依赖关系链。

     

    总而言之,元数据检查点主要用于从driver程序故障中恢复,而数据或RDD检查点在任何使用stateful转换时是必须要有的。

     

    ② 何时启用检查点:

    对于具有以下任一要求的应用程序,必须启用检查点:

    1) 使用状态转:如果在应用程序中使用updateStateByKeyreduceByKeyAndWindow(具有逆函数),则必须提供检查点目录以允许定期保存RDD检查点。

    2) 从运行应用程序的driver程序的故障中恢复:元数据检查点用于使用进度信息进行恢复。

     

    ③ 如何配置检查点:

    可以通过在一些可容错、高可靠的文件系统(例如,HDFSS3等)中设置保存检查点信息的目录来启用检查点。这是通过使用streamingContext.checkpoint(checkpointDirectory)完成的。设置检查点后,您就可以使用上述的有状态转换操作。此外,如果要使应用程序从驱动程序故障中恢复,您应该重写streaming应用程序以使程序具有以下行为:

    1) 当程序第一次启动时,它将创建一个新的StreamingContext,设置好所有流数据源,然后调用start()方法。

    2) 当程序在失败后重新启动时,它将从checkpoint目录中的检查点数据重新创建一个StreamingContext

    使用StreamingContext.getOrCreate可以简化此行为

     

    ④ 改写之前的WordCount程序,使得每次计算的结果和状态都保存到检查点目录下

     

    通过查看HDFS中的信息,可以看到相关的检查点信息,如下:

     

    package demo
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object MyCheckpointNetworkWordCount {
      def main(args: Array[String]): Unit = {
        //在主程序中,创建一个Streaming Context对象
        //1、读取一个检查点的目录
        //2、如果该目录下已经存有之前的检查点信息,从已有的信息上创建这个Streaming Context对象
        //3、如果该目录下没有信息,创建一个新的Streaming Context
        val context = StreamingContext.getOrCreate("hdfs://192.168.157.111:9000/spark_checkpoint",createStreamingContext)
    
        //启动任务
        context.start()
        context.awaitTermination()
      }
    
      //创建一个StreamingContext对象,并且设置检查点目录,执行WordCount程序(记录之前的状态信息)
      def createStreamingContext():StreamingContext = {
        val conf = new SparkConf().setAppName("MyCheckpointNetworkWordCount").setMaster("local[2]")
        //创建这个StreamingContext对象
        val ssc = new StreamingContext(conf,Seconds(3))
    
        //设置检查点目录
        ssc.checkpoint("hdfs://192.168.157.111:9000/spark_checkpoint")
    
        //创建一个DStream,执行WordCount
        val lines = ssc.socketTextStream("192.168.157.81",7788,StorageLevel.MEMORY_AND_DISK_SER)
    
        //分词操作
        val words = lines.flatMap(_.split(" "))
        //每个单词记一次数
        val wordPair = words.map(x=> (x,1))
    
        //执行单词计数
        //定义一个新的函数:把当前的值跟之前的结果进行一个累加
        val addFunc = (currValues:Seq[Int],preValueState:Option[Int]) => {
          //当前当前批次的值
          val currentCount = currValues.sum
    
          //得到已经累加的值。如果是第一次求和,之前没有数值,从0开始计数
          val preValueCount = preValueState.getOrElse(0)
    
          //进行累加,然后累加后结果,是Option[Int]
          Some(currentCount + preValueCount)
        }
    
        //要把新的单词个数跟之前的结果进行叠加(累计)
        val totalCount = wordPair.updateStateByKey[Int](addFunc)
    
        //输出结果
        totalCount.print()
    
        //返回这个对象
        ssc
      }
    }
    

     

      

    通过查看HDFS中的信息,可以看到相关的检查点信息,如下:

     

     

  • 相关阅读:
    c#读取XML
    Javascript 自动计算生日
    Thread.currentThread()与setDaeMon(boolean c)方法
    StringBuffer类的delete()方法和deleteCharAt()方法
    getStackTrace()方法使用
    JDBC详解
    eclipse与idea路径不匹配
    Idea导入maven项目不自动识别pom.xml
    IDEA找不到或无法加载主类
    MySQL:主从复制与处从复制同步延迟
  • 原文地址:https://www.cnblogs.com/hidamowang/p/11147832.html
Copyright © 2011-2022 走看看