zoukankan      html  css  js  c++  java
  • Spark DStream 输出 编程进阶

    5.DStream 输出

      输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据
    库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没
    有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出
    操作,整个 context 就都不会启动。
     
    输出操作如下:
    (1)print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这
    用于开发和调试。在 Python API 中,同样的操作叫 print()。
    (2)saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存
    储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”.
    (3)saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为
    SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python 中
    目前不可用。
    (4)saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存
    储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API Python 中目前不可用。
    (5)foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个
    RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将 RDD 存
    入文件或者通过网络将其写入数据库。注意:函数 func 在运行流应用的驱动中被执行,同时其中
    一般函数 RDD 操作从而强制其对于流 RDD 的运算。
     
      通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform()
    有些类似,都可以让我们访问任意 RDD。在 foreachRDD()中,可以重用我们在 Spark 中实现的所
    有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。
     
    注意:
    (1)连接不能写在 driver 层面;
    (2)如果写在 foreach 则每个 RDD 都创建,得不偿失;
    (3)增加 foreachPartition,在分区创建。
     
     
     
     
     

    第 6 章 DStream 编程进阶

    6.1 累加器和广播变量

      累加器(Accumulators)和广播变量(Broadcast variables)不能从 Spark Streaming 的检查点中恢复。
    如果你启用检查并也使用了累加器和广播变量,那么你必须创建累加器和广播变量的延迟单实例
    从而在驱动因失效重启后他们可以被重新实例化。如下例述:
    object WordBlacklist {
      @volatile private var instance: Broadcast[Seq[String]] = null
      def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              val wordBlacklist = Seq("a", "b", "c")
              instance = sc.broadcast(wordBlacklist)
            }
          }
        }
        instance
      }
    }

    object DroppedWordsCounter { @
    volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } }

    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time)
    =>   // Get or register the blacklist Broadcast   val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      
    // Get or register the droppedWordsCounter Accumulator   val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      
    // Use blacklist to drop words and use droppedWordsCounter to count them   val counts = rdd.filter { case (word, count) =>     if (blacklist.value.contains(word)) {       droppedWordsCounter.add(count)       false     } else {       true     }   }.collect().mkString("[", ", ", "]")
      val output
    = "Counts at time " + time + " " + counts })
     

    6.2 DataFrame ans SQL Operations

      你可以很容易地在流数据上使用 DataFrames 和 SQL。你必须使用 SparkContext 来创建
    StreamingContext 要用的 SQLContext。此外,这一过程可以在驱动失效后重启。我们通过创建一
    个实例化的 SQLContext 单实例来实现这个工作。如下例所示。我们对前例 word count 进行修改
    从而使用 DataFrames 和 SQL 来产生 word counts。每个 RDD 被转换为 DataFrame,以临时表格配
    置并用 SQL 进行查询。
    val words: DStream[String] = ...
    words.foreachRDD { rdd
    =>
      // Get the singleton instance of SparkSession   val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      
    import spark.implicits._   // Convert RDD[String] to DataFrame   val wordsDataFrame = rdd.toDF("word")
      
    // Create a temporary view   wordsDataFrame.createOrReplaceTempView("words")
      
    // Do word count on DataFrame using SQL and print it   val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")   wordCountsDataFrame.show() }
      你也可以从不同的线程在定义于流数据的表上运行 SQL 查询(也就是说,异步运行
    StreamingContext)。仅确定你设置 StreamingContext 记住了足够数量的流数据以使得查询操作可
    以运行。否则,StreamingContext 不会意识到任何异步的 SQL 查询操作,那么其就会在查询完成
    之后删除旧的数据。例如,如果你要查询最后一批次,但是你的查询会运行 5 分钟,那么你需要
    调用 streamingContext.remember(Minutes(5))(in Scala, 或者其他语言的等价操作)。
     
     
     

    6.3 Caching / Persistence

      和 RDDs 类似,DStreams 同样允许开发者将流数据保存在内存中。也就是说,在 DStream 上
    使用 persist()方法将会自动把 DStreams 中的每个 RDD 保存在内存中。当 DStream 中的数据要被
    多次计算时,这个非常有用(如在同样数据上的多次操作)。对于像 reduceByWindow 和
    reduceByKeyAndWindow 以及基于状态的(updateStateByKey)这种操作,保存是隐含默认的。因此,
    即使开发者没有调用 persist(),由基于窗操作产生的 DStreams 会自动保存在内存中。
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
  • 相关阅读:
    Ethical Hacking
    Ethical Hacking
    Ethical Hacking
    Ethical Hacking
    Ethical Hacking
    Ethical Hacking
    Ethical Hacking
    Can you answer these queries? HDU
    Count the Colors ZOJ
    Balanced Lineup POJ
  • 原文地址:https://www.cnblogs.com/LXL616/p/11161494.html
Copyright © 2011-2022 走看看