zoukankan      html  css  js  c++  java
  • DStream 转换操作------有状态转换操作

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    
    object DStream_转换操作 {
      def main(args: Array[String]): Unit = {
        val conf=new SparkConf().setAppName("转换操作").setMaster("local[2]")
        val sc=new StreamingContext(conf,Seconds(4))
        val lines=sc.socketTextStream("localhost",8899)
        sc.checkpoint("file:///usr/local2/spark/mycode/kafa3/checkpoint")
    
        val words=lines.flatMap(x=>x.split(" "))
        val wordsStream=words.map(x=>(x,1))
        3.val stateStream=wordsStream.updateStateByKey[Int](update)
          sc.checkpoint("file:///usr/local2/spark/mycode/kafa2/checkpoint")
        1. //val wordCount=words.map(x=>(x,1)).reduceByKeyAndWindow(_+_,_-_,Seconds(16),Seconds(4),2)//DStream有状态转换操作
        2. val wordCount=words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(16),Seconds(4),2)
         wordCount.print(100)
        stateStream.print()
        sc.start()
        sc.awaitTermination()
      }
       val update=(values:Seq[Int],state:Option[Int])=>{
         val currentCount=values.foldLeft(0)(_+_)
         val previousCount= state.getOrElse(0)
         Some(currentCount+previousCount)
       }
    }

    注意:

    reduceByKeyAndWindow中的Seconds(16)是滑动窗口长度,Seconds(4)是滑动窗口时间间隔(每隔多长时间滑动一次窗口)这两个值必须是 new StreamingContext(conf,Seconds(4)) 中Seconds(4)的倍数(>=1)
    如果第二个4<滑动窗口时间间隔 程序结果的时间线就变成了以滑动窗口时间间隔为准
    1,2,3区别:
    1.会保留历史对象的名字列表
    2.不会保留
    3.在历史值的基础上累加,但(1,2)会随着窗口滑动,所有对象的值会变为0
    4.(1和2适合统计实时时间段内词频)
  • 相关阅读:
    Delphi
    delphi trayIcon控件,如何实现窗口最小化的时候到系统托盘
    delphi2010自带 TTrayIcon 托盘图标控件使用方法
    通过例子来简单了解下TProgressBar的使用。 pas文件程序如下
    ORA-12154,ORA-12560解决过程
    博客备份小工具3
    各大招聘网站信息实时查询浏览
    IE7中使用Jquery动态操作name问题
    js问题杂记
    动态sql
  • 原文地址:https://www.cnblogs.com/soyo/p/7700356.html
Copyright © 2011-2022 走看看