import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/** Demo of DStream transformations on a socket text stream:
  *  - a windowed word count via `reduceByKeyAndWindow`, and
  *  - a running (all-time) word count via `updateStateByKey`.
  *
  * The markers (1), (2), (3) in the comments below correspond to the numbered
  * notes that accompany this code.
  */
object DStream_转换操作 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("转换操作").setMaster("local[2]")
    // Batch interval is 4 s; window length and slide interval below must be multiples of it.
    val sc = new StreamingContext(conf, Seconds(4))

    // updateStateByKey (and reduceByKeyAndWindow with an inverse function) need checkpointing.
    // The original called checkpoint twice with different directories; the later one (kafa2)
    // was the one in effect, so only that call is kept.
    sc.checkpoint("file:///usr/local2/spark/mycode/kafa2/checkpoint")

    val lines = sc.socketTextStream("localhost", 8899)
    val words = lines.flatMap(line => line.split(" "))
    val wordsStream = words.map(word => (word, 1))

    // (3) Stateful running total: each batch's counts are added to the previous state.
    val stateStream = wordsStream.updateStateByKey[Int](update)

    // (1) Incremental window using an inverse function (_ - _); keys that leave the window
    //     stay in the result with count 0.
    // val wordCount = wordsStream.reduceByKeyAndWindow(_ + _, _ - _, Seconds(16), Seconds(4), 2)

    // (2) Window recomputed from the batches it covers (no inverse function); keys that
    //     leave the window disappear. Window length 16 s, slide interval 4 s, 2 partitions.
    val wordCount =
      wordsStream.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(16), Seconds(4), 2)

    wordCount.print(100)
    stateStream.print()
    sc.start()
    sc.awaitTermination()
  }

  /** State update function for `updateStateByKey`.
    *
    * @param values counts for a key observed in the current batch
    * @param state  the key's accumulated count so far, if any
    * @return the new accumulated count (always defined, so keys are never dropped)
    */
  val update: (Seq[Int], Option[Int]) => Option[Int] =
    (values: Seq[Int], state: Option[Int]) => Some(values.sum + state.getOrElse(0))
}
注意:
reduceByKeyAndWindow 中的 Seconds(16) 是窗口长度（windowDuration），Seconds(4) 是滑动间隔（slideDuration，即每隔多长时间滑动一次窗口）。这两个值都必须是批处理间隔（new StreamingContext(conf, Seconds(4)) 中的 Seconds(4)）的整数倍（倍数 >= 1）。
如果批处理间隔（StreamingContext 中的 Seconds(4)）小于滑动间隔，那么窗口统计结果的输出时间线以滑动间隔为准，即每经过一个滑动间隔才输出一次结果。
1、2、3 的区别（对应代码中标注的三种写法）:
1. （带逆函数 _-_ 的 reduceByKeyAndWindow）会保留已滑出窗口的历史单词：这些键仍然出现在结果中，只是计数变为 0。
2. （不带逆函数的 reduceByKeyAndWindow）不会保留：滑出窗口的单词直接从结果中消失。
3. （updateStateByKey）在历史值的基础上持续累加；而 1、2 只统计当前窗口内的数据。随着窗口滑动，旧数据被移出窗口，计数会逐渐减小直至为 0：1 中会保留计数为 0 的键，2 中这些键直接消失。
4. 1 和 2 适合统计最近一段时间（窗口内）的实时词频；3 适合统计从程序启动以来的累计词频。