5.DStream 输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据
库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没
有被执行输出操作,那么这些 DStream 就都不会被求值。如果 StreamingContext 中没有设定输出
操作,整个 context 就都不会启动。
输出操作如下:
(1)print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这
用于开发和调试。在 Python API 中,同样的操作叫 print()。
(2)saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存
储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”.
(3)saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为
SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python 中
目前不可用。
(4)saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存
储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API Python 中目前不可用。
(5)foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个
RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将 RDD 存
入文件或者通过网络将其写入数据库。注意:函数 func 在运行流应用的驱动中被执行,同时其中
一般函数 RDD 操作从而强制其对于流 RDD 的运算。
通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和 transform()
有些类似,都可以让我们访问任意 RDD。在 foreachRDD()中,可以重用我们在 Spark 中实现的所
有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。
注意:
(1)连接不能写在 driver 层面;
(2)如果写在 foreach 则每个 RDD 都创建,得不偿失;
(3)增加 foreachPartition,在分区创建。
第 6 章 DStream 编程进阶
6.1 累加器和广播变量
累加器(Accumulators)和广播变量(Broadcast variables)不能从 Spark Streaming 的检查点中恢复。
如果你启用检查并也使用了累加器和广播变量,那么你必须创建累加器和广播变量的延迟单实例
从而在驱动因失效重启后他们可以被重新实例化。如下例述:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } }
object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } }
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter.add(count) false } else { true } }.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts })
6.2 DataFrame ans SQL Operations
你可以很容易地在流数据上使用 DataFrames 和 SQL。你必须使用 SparkContext 来创建
StreamingContext 要用的 SQLContext。此外,这一过程可以在驱动失效后重启。我们通过创建一
个实例化的 SQLContext 单实例来实现这个工作。如下例所示。我们对前例 word count 进行修改
从而使用 DataFrames 和 SQL 来产生 word counts。每个 RDD 被转换为 DataFrame,以临时表格配
置并用 SQL 进行查询。
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word")
// Create a temporary view wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
你也可以从不同的线程在定义于流数据的表上运行 SQL 查询(也就是说,异步运行
StreamingContext)。仅确定你设置 StreamingContext 记住了足够数量的流数据以使得查询操作可
以运行。否则,StreamingContext 不会意识到任何异步的 SQL 查询操作,那么其就会在查询完成
之后删除旧的数据。例如,如果你要查询最后一批次,但是你的查询会运行 5 分钟,那么你需要
调用 streamingContext.remember(Minutes(5))(in Scala, 或者其他语言的等价操作)。
6.3 Caching / Persistence
和 RDDs 类似,DStreams 同样允许开发者将流数据保存在内存中。也就是说,在 DStream 上
使用 persist()方法将会自动把 DStreams 中的每个 RDD 保存在内存中。当 DStream 中的数据要被
多次计算时,这个非常有用(如在同样数据上的多次操作)。对于像 reduceByWindow 和
reduceByKeyAndWindow 以及基于状态的(updateStateByKey)这种操作,保存是隐含默认的。因此,
即使开发者没有调用 persist(),由基于窗操作产生的 DStreams 会自动保存在内存中。