1.说明
DStream的API不够满足使用的时候,可以使用这两个函数,将dstream转换为rdd,然后进行操作
2.transform
transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可
3.程序
1 package com.window.it 2 import org.apache.spark.{SparkConf, SparkContext} 3 import org.apache.spark.storage.StorageLevel 4 import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext} 5 import org.apache.spark.streaming.dstream.DStream 6 import org.apache.spark.streaming.kafka.KafkaUtils 7 object TransformDemo { 8 def main(args: Array[String]): Unit = { 9 val conf = new SparkConf() 10 .setAppName("StreamingWindowOfKafka") 11 .setMaster("local[*]") 12 val sc = SparkContext.getOrCreate(conf) 13 val ssc = new StreamingContext(sc, Seconds(5)) 14 // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir 15 // 路径对应的文件夹不能存在 16 ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/4525712") 17 18 val kafkaParams = Map( 19 "group.id" -> "streaming-kafka-78912151", 20 "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka", 21 "auto.offset.reset" -> "smallest" 22 ) 23 val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1 24 val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( 25 ssc, // 给定SparkStreaming上下文 26 kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接 27 topics, // 给定读取对应topic的名称以及读取数据的线程数量 28 StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别 29 ).map(_._2) 30 31 val resultWordCount = dstream 32 .filter(line => line.nonEmpty) 33 .flatMap(line => line.split(" ").map((_, 1))) 34 .reduceByKeyAndWindow( 35 (a: Int, b: Int) => a + b, 36 Seconds(15), // 窗口大小 37 Seconds(10) // 滑动大小 38 ) 39 resultWordCount.print() // 这个也是打印数据 40 41 /** 42 * transform:将DStream的操作转换为RDD的操作,调用该api最终只需要返回一个新的RDD即可 43 */ 44 dstream.transform(rdd => { 45 // 对rdd进行预处理 46 val processedRDD = rdd 47 .filter(line => line.nonEmpty) 48 .flatMap(line => line.split(" ").map((_, 1))) 49 .reduceByKey(_ + _) 50 // 数据抽样,获取两个节点 51 val seeder = processedRDD.takeSample(true, 2) 52 // 对rdd进行处理操作, 将抽样数据和rdd中的数据进行比较,如果rdd中的word的出现次数大于等于抽样数据中的任何一个word的次数,次数*3;否则次数*2 53 val brocast = rdd.sparkContext.broadcast(seeder) 54 val resultRDD = processedRDD.mapPartitions(iter => { 55 val seederValue = brocast.value 56 iter.map { 57 case (word, count) => { 58 val vc = seederValue 59 .filter(tuple => { 60 count >= tuple._2 61 }).size 62 if (vc == 0) { 63 (word, 2, count * 2) 64 } else { 65 (word, 3, count * 3) 66 } 67 } 68 } 69 }) 70 resultRDD 71 }).print() 72 73 // 启动开始处理 74 ssc.start() 75 ssc.awaitTermination() // 等等结束,监控一个线程的中断操作 76 } 77 }
4.foreachRDD
作用和transform类型,将DStream的操作转换为RDD进行操作,区别:该api没有返回值
5.程序
1 package com.window.it 2 3 import org.apache.spark.storage.StorageLevel 4 import org.apache.spark.streaming.kafka.KafkaUtils 5 import org.apache.spark.streaming.{Seconds, StreamingContext} 6 import org.apache.spark.{SparkConf, SparkContext} 7 8 object TransformDemo { 9 def main(args: Array[String]): Unit = { 10 val conf = new SparkConf() 11 .setAppName("StreamingWindowOfKafka") 12 .setMaster("local[*]") 13 val sc = SparkContext.getOrCreate(conf) 14 val ssc = new StreamingContext(sc, Seconds(5)) 15 // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir 16 // 路径对应的文件夹不能存在 17 ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/4525712") 18 19 val kafkaParams = Map( 20 "group.id" -> "streaming-kafka-78912151", 21 "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka", 22 "auto.offset.reset" -> "smallest" 23 ) 24 val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于1 25 val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder]( 26 ssc, // 给定SparkStreaming上下文 27 kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接 28 topics, // 给定读取对应topic的名称以及读取数据的线程数量 29 StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别 30 ).map(_._2) 31 32 val resultWordCount = dstream 33 .filter(line => line.nonEmpty) 34 .flatMap(line => line.split(" ").map((_, 1))) 35 .reduceByKeyAndWindow( 36 (a: Int, b: Int) => a + b, 37 Seconds(15), // 窗口大小 38 Seconds(10) // 滑动大小 39 ) 40 resultWordCount.print() // 这个也是打印数据 41 42 dstream.foreachRDD(rdd => { 43 // TODO: 这里就可以做数据输出的代码编写 44 // TODO: 这里不要为空 45 rdd.foreachPartition(iter => { 46 // TODO: 这里在实际环境中不要为空,为空可能会出现一些问题:内存泄露的问题 47 println(iter.take(1)) 48 }) 49 }) 50 51 // 启动开始处理 52 ssc.start() 53 ssc.awaitTermination() // 等等结束,监控一个线程的中断操作 54 } 55 }
6.注意点
一个批次,DStream内部就只对应一个RDD,transform和foreachRDD API使用的过程中,不要考虑多个RDD的问题