Spark Streaming-DStream实战案例
作者:尹正杰
版权声明:原创作品,谢绝转载!否则将追究法律责任。
一.DStream创建
1>.文件数据源采集
文件数据流:
能够读取所有HDFS API兼容的文件系统文件,通过fileStream方法进行读取,Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,记住目前不支持嵌套目录。
streamingContext.textFileStream(dataDirectory)注意事项: 1)文件需要有相同的数据格式; 2)文件进入 dataDirectory的方式需要通过移动或者重命名来实现; 3)一旦文件移动进目录,则不能再修改,即便修改了也不会读取新数据;
温馨提示:
虽然Spark支持文件的采集,但是生产环境中不建议大家使用,毕竟需要咱们自己写scala代码,我建议大家使用比较成熟的采集工具,比如filebeat,logstash,flume等工具。
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object FileDataSource { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(实时数据分析环境对象) * * 自定义采集周期: * 以指定的时间为周期采集实时数据。我这里指定采集周期是5秒.生产环境中我们可以将这个值改小,比如每秒采集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.监控文件夹创建DStream(即从指定文件夹中采集数据) */ val socketLineDStream:ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101.yinzhengjie.org.cn", 8888) val fileDStream:DStream[String] = ssc.textFileStream("E:\yinzhengjie\bigdata\input\test") /** * 4>.将采集的数据进行扁平化操作(即将每一行数据做切分,形成一个个单词) */ val wordDStreams:DStream[String] = fileDStream.flatMap(_.split(" ")) /** * 5>.将数据进行结构的转换方便统计分析(即将单词映射成元组(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 6>.将相同的单词次数做统计 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 7>.将结果打印出来 */ wordToCountDStream.print() /** * 8>.启动(SparkStreamingContext)采集器 */ ssc.start() /** * 9>.Driver等待采集器的执行(即禁止main线程主动退出) */ ssc.awaitTermination() /** * 温馨提示: * 咱们的程序是实时处理数据的,因此生产环境中不能停止采集程序,因此不建议使用哟~ */ // ssc.stop() } }
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object HdfsDataSource { def main(args: Array[String]): Unit = { //1.初始化Spark配置信息 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount") //2.初始化SparkStreamingContext val ssc:StreamingContext = new StreamingContext(sparkConf, Seconds(5)) //3.监控HDFS文件夹创建DStream val dirStream:DStream[String] = ssc.textFileStream("hdfs://hadoop101.yinzhengjie.org.cn:9000/spark/fileStream") //4.将每一行数据做切分,形成一个个单词 val wordStreams:DStream[String] = dirStream.flatMap(_.split(" ")) //5.将单词映射成元组(word,1) val wordAndOneStreams:DStream[(String,Int)] = wordStreams.map((_, 1)) //6.将相同的单词次数做统计 val wordAndCountStreams:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_ + _) //7.打印 wordAndCountStreams.print() //8.启动SparkStreamingContext ssc.start() ssc.awaitTermination() } }
2>.RDD队列
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object RDDStream { def main(args: Array[String]) { //1.初始化Spark配置信息 val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream") //2.初始化SparkStreamingContext val ssc = new StreamingContext(conf, Seconds(5)) //3.创建RDD队列(后期在队列中不断的放RDD) val rddQueue = new mutable.Queue[RDD[Int]]() /** * 4.创建QueueInputDStream * * 温馨提示: * 测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。 */ val inputStream = ssc.queueStream(rddQueue,oneAtATime = false) //5.处理队列中的RDD数据 val mappedStream = inputStream.map((_,1)) val reducedStream = mappedStream.reduceByKey(_ + _) //6.打印结果 reducedStream.print() //7.启动任务 ssc.start() //8.循环创建并向RDD队列中放入RDD for (i <- 1 to 5) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) } ssc.awaitTermination() } }
3>.自定义数据源
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.yinzhengjie.bigdata.spark.streaming import java.io.{BufferedReader, InputStreamReader} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.receiver.Receiver import java.net.Socket import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object CustomReceiver { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(实时数据分析环境对象) * * 自定义采集周期: * 以指定的时间为周期采集实时数据。我这里指定采集周期是5秒.生产环境中我们可以将这个值改小,比如每秒采集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.通过咱们自定义的接收器来采集数据源 */ val receiverDStream:ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop101.yinzhengjie.org.cn",8888)) /** * 4>.将采集的数据进行扁平化操作(即将每一行数据做切分,形成一个个单词) */ val wordDStreams:DStream[String] = receiverDStream.flatMap(_.split(" ")) /** * 5>.将数据进行结构的转换方便统计分析(即将单词映射成元组(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 6>.将相同的单词次数做统计 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 7>.将结果打印出来 */ wordToCountDStream.print() /** * 8>.启动(SparkStreamingContext)采集器 */ ssc.start() /** * 9>.Driver等待采集器的执行(即禁止main线程主动退出) */ ssc.awaitTermination() /** * 温馨提示: * 咱们的程序是实时处理数据的,因此生产环境中不能停止采集程序,因此不建议使用哟~ */ // ssc.stop() } } /** * 声明采集器 */ class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){ var socket:Socket = null override def onStart(): Unit = { new Thread(new Runnable { override def run(): Unit = { receive() } }).start() } override def onStop(): Unit = { if(socket != null){ socket.close() socket = null } } def receive(): Unit = { socket = new Socket(host,port) val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,"UTF-8")) var line:String = null while ((line = reader.readLine()) != null) { //判断程序何时结束 if ("END".equals(line)){ return }else{ //将采集的数据存储到采集器的内部进行转换 this.store(line) } } } }
4>.kafka数据源
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.5.0</version> </dependency>
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
[root@kafka201.yinzhengjie.com ~]# kafka-topics.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --list __consumer_offsets filebeat-ubuntu-syslog nginx-172-200-5-103 syslog-172-200-5-103 yinzhengjie-kafka [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# kafka-topics.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --create --topic yinzhengjie2020 --partitions 3 --replication-factor 2 Created topic yinzhengjie2020. [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]# kafka-topics.sh --bootstrap-server kafka203.yinzhengjie.com:9092 --list __consumer_offsets filebeat-ubuntu-syslog nginx-172-200-5-103 syslog-172-200-5-103 yinzhengjie-kafka yinzhengjie2020 [root@kafka201.yinzhengjie.com ~]# [root@kafka201.yinzhengjie.com ~]#
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils object KafkaSource { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(实时数据分析环境对象) * * 自定义采集周期: * 以指定的时间为周期采集实时数据。我这里指定采集周期是5秒.生产环境中我们可以将这个值改小,比如每秒采集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.使用spark官方提供的kafka工具类来采集数据 */ val kafkaDStream:ReceiverInputDStream[(String,String)] = KafkaUtils.createStream( //指定SparkStreamingContext对象 ssc, //指定zookeeper去集群,注意哈,我在部署kafka是在zookeeper指定了chroot路径了的,因此这里要写全哟~ "kafka201.yinzhengjie.com:2181/yinzhengjie-kafka", //指定消费者组id "yinzhengjie2020", //指定topic及分区数信息 Map("yinzhengjie2020" -> 3) ) /** * 4>.将采集的数据进行扁平化操作(即将每一行数据做切分,形成一个个单词) * * 温馨提示: * kafka传输的消息格式是(k,v),只不过平时我们不传递K的值默认为null,因此我们在做切割时应该只取value哟~ */ val wordDStreams:DStream[String] = kafkaDStream.flatMap(t => t._2.split(" ")) /** * 5>.将数据进行结构的转换方便统计分析(即将单词映射成元组(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 6>.将相同的单词次数做统计 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 7>.将结果打印出来 */ wordToCountDStream.print() /** * 8>.启动(SparkStreamingContext)采集器 */ ssc.start() /** * 9>.Driver等待采集器的执行(即禁止main线程主动退出) */ ssc.awaitTermination() /** * 温馨提示: * 咱们的程序是实时处理数据的,因此生产环境中不能停止采集程序,因此不建议使用哟~ */ // ssc.stop() } }
二.DStream转换
DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
1>.无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。
温馨提示:
针对键值对的DStream转化操作(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。例如,reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
举个例子,在之前的wordcount程序中,我们只会统计5秒内接收到的数据的单词个数,而不会累加。
无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间区间内。例如,键值对DStream拥有和RDD一样的与连接相关的转化操作,也就是cogroup()、join()、leftOuterJoin()等。
我们可以在DStream上使用这些操作,这样就对每个批次分别执行了对应的RDD操作。我们还可以像在常规的Spark 中一样使用 DStream的union() 操作将它和另一个DStream 的内容合并起来,也可以使用StreamingContext.union()来合并多个流。
2>.有状态转化操作
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils object UpdateStateByKeyDemo { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(实时数据分析环境对象) * * 自定义采集周期: * 以指定的时间为周期采集实时数据。我这里指定采集周期是5秒.生产环境中我们可以将这个值改小,比如每秒采集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.保存数据的状态,需要设定检查点的路径 */ ssc.sparkContext.setCheckpointDir("E:\yinzhengjie\bigdata\spark\checkpoint") /** * 4>.使用spark官方提供的kafka工具类来采集数据 */ val kafkaDStream:ReceiverInputDStream[(String,String)] = KafkaUtils.createStream( //指定SparkStreamingContext对象 ssc, //指定zookeeper去集群,注意哈,我在部署kafka是在zookeeper指定了chroot路径了的,因此这里要写全哟~ "kafka201.yinzhengjie.com:2181/yinzhengjie-kafka", //指定消费者组id "yinzhengjie2020", //指定topic及分区数信息 Map("yinzhengjie2020" -> 3) ) /** * 5>.将采集的数据进行扁平化操作(即将每一行数据做切分,形成一个个单词) * * 温馨提示: * kafka传输的消息格式是(k,v),只不过平时我们不传递K的值默认为null,因此我们在做切割时应该只取value哟~ */ val wordDStreams:DStream[String] = kafkaDStream.flatMap(t => t._2.split(" ")) /** * 6>.将数据进行结构的转换方便统计分析(即将单词映射成元组(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 7>.使用updateStateByKey来更新状态,统计从运行开始以来单词总的次数 * * UpdateStateByKey原语用于记录历史记录: * 有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。 * 针对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream, * 并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态)对。 * updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。 * updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步: * 1>.定义状态,状态可以是一个任意的数据类型。 * 2>.定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。 * 使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。 */ val stateDStream:DStream[(String,Int)] = wordAndOneStreams.updateStateByKey{ case (seq,buffer) => { val sum = buffer.getOrElse(0) + seq.sum Option(sum) } } /** * 8.将结果打印出来 */ stateDStream.print() /** * 9>.启动(SparkStreamingContext)采集器 */ ssc.start() /** * 10>.Driver等待采集器的执行(即禁止main线程主动退出) */ ssc.awaitTermination() /** * 温馨提示: * 咱们的程序是实时处理数据的,因此生产环境中不能停止采集程序,因此不建议使用哟~ */ // ssc.stop() } }
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.yinzhengjie.bigdata.spark.streaming object ScalaWindow { def main(args: Array[String]): Unit = { val list = List(10,20,30,40,50,60) //Scala默认支持滑动窗口函数 val ints:Iterator[List[Int]] = list.sliding(3) //每次滑动3个元素 for (list <- ints){ println(list.mkString(",")) } println("*" * 20 + " 我是分割线 " + "*" * 20 ) //Scala默认支持滑动窗口函数 val ints2:Iterator[List[Int]] = list.sliding(3,3) //每次滑动3个元素并指定步长 for (list <- ints2){ println(list.mkString(",")) } } }
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object SparkWindowOperations { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(实时数据分析环境对象) * * 自定义采集周期: * 以指定的时间为周期采集实时数据。我这里指定采集周期是5秒.生产环境中我们可以将这个值改小,比如每秒采集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(3)) /** * 3>.使用spark官方提供的kafka工具类来采集数据 */ val kafkaDStream:ReceiverInputDStream[(String,String)] = KafkaUtils.createStream( //指定SparkStreamingContext对象 ssc, //指定zookeeper去集群,注意哈,我在部署kafka是在zookeeper指定了chroot路径了的,因此这里要写全哟~ "kafka201.yinzhengjie.com:2181/yinzhengjie-kafka", //指定消费者组id "yinzhengjie2020", //指定topic及分区数信息 Map("yinzhengjie2020" -> 3) ) /** * 4>.设置spark的滑动窗口大小 * Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。 * 基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。 * 所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。 * * 关于Window的操作有如下原语: * (1)window(windowLength, slideInterval): * 基于对源DStream窗化的批次进行计算返回一个新的Dstream * (2)countByWindow(windowLength, slideInterval): * 返回一个滑动窗口计数流中的元素。 * (3)reduceByWindow(func, windowLength, slideInterval): * 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。 * (4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): * 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。 * Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(spark.default.parallelism)来做grouping。 * 你可以通过设置可选参数numTasks来设置不同数量的tasks。 * (5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): * 这个函数是上述函数的更高效版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。 * 通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。 * 通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。 * 如前述函数,reduce任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可用。 * (6)countByValueAndWindow(windowLength,slideInterval, [numTasks]): * 对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的值是其在滑动窗口中频率。如上,可配置reduce任务数量。 * reduceByWindow() 和 reduceByKeyAndWindow() 让我们可以对每个窗口更高效地进行归约操作: * 它们接收一个归约函数,在整个窗口上执行,比如 +。 * 除此以外,它们还有一种特殊形式,通过只考虑新进入窗口的数据和离开窗口的数据,让 Spark 增量计算归约结果。 * 这种特殊形式需要提供归约函数的一个逆函数,比 如 + 对应的逆函数为 -。对于较大的窗口,提供逆函数可以大大提高执行效率。 * * 温馨提示: * (1)窗口大小应该是采集周期的整数倍 * (2)窗口滑动步长也应该是采集周期的整数倍 */ val windowDStream:DStream[(String,String)] = kafkaDStream.window(Seconds(9),Seconds(3)) /** * 5>.将采集的数据进行扁平化操作(即将每一行数据做切分,形成一个个单词) * * 温馨提示: * kafka传输的消息格式是(k,v),只不过平时我们不传递K的值默认为null,因此我们在做切割时应该只取value哟~ */ val wordDStreams:DStream[String] = windowDStream.flatMap(t => t._2.split(" ")) /** * 6>.将数据进行结构的转换方便统计分析(即将单词映射成元组(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 7>.将相同的单词次数做统计 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 8>.将结果打印出来 */ wordToCountDStream.print() /** * 9>.启动(SparkStreamingContext)采集器 */ ssc.start() /** * 10>.Driver等待采集器的执行(即禁止main线程主动退出) */ ssc.awaitTermination() /** * 温馨提示: * 咱们的程序是实时处理数据的,因此生产环境中不能停止采集程序,因此不建议使用哟~ */ // ssc.stop() } }
3>.其它重要操作
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
Transform原语允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。 比如下面的例子,在进行单词统计的时候,想要过滤掉spam的信息。 val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
连接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以连接Stream-Stream,windows-stream to windows-stream、stream-dataset Stream-Stream Joins val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2) Stream-dataset joins val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
三.DStream输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。
与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。
输出操作如下: print():
在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。 saveAsTextFiles(prefix, [suffix]):
以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”. saveAsObjectFiles(prefix, [suffix]):
以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。 saveAsHadoopFiles(prefix, [suffix]):
将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。 Python API Python中目前不可用。 foreachRDD(func):
这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。
注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。 通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。 比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。 注意: (1)连接不能写在driver层面; (2)如果写在foreach则每个RDD都创建,得不偿失; (3)增加foreachPartition,在分区创建。