zoukankan      html  css  js  c++  java
  • IDEA Spark Streaming 操作(文件源)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
      * Word count over a file-based Spark Streaming source.
      *
      * Watches a local directory for text files, splits every line on single
      * spaces and prints the per-batch count of each token.
      */
    object DStream_file {

      /**
        * Builds the streaming word-count pipeline, runs it for at most 100 seconds
        * and then shuts the streaming context down.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[4]")
        // 15-second batch interval: the streaming directory is checked once per batch.
        val ss = new StreamingContext(sparkConf, Seconds(15))
        try {
          val lines = ss.textFileStream("file:///usr/local2/spark/mycode/streaming")
          val words = lines.flatMap(_.split(" "))
          val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
          wordCounts.print(100) // print up to 100 (word, count) pairs per batch
          ss.start()
          // Wait at most 100 seconds; this call only waits, it does not stop the context.
          ss.awaitTerminationOrTimeout(100000)
        } finally {
          // Stop explicitly instead of relying on the JVM shutdown hook, so the
          // contexts are released even when setup or start() fails.
          ss.stop(stopSparkContext = true, stopGracefully = false)
        }
      }
    }

    运行结果(每个批次间隔输出一次;只有在该间隔内有新文件写入被监听的目录时,才会打印词频,否则该批次输出为空):

    -------------------------------------------
    Time: 1508045550000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1508045565000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1508045580000 ms
    -------------------------------------------
    (88,2)
    (4,1)
    (8,1)
    (ya,1)
    (55,2)
    (me,2)
    (49,1)
    (i,4)
    (9,1)
    (but,1)
    (1,2)
    (dont,1)
    (2,2)
    (79,1)
    (you,4)
    (know,2)
    (3,2)
    (like,2)
    (76,1)

    -------------------------------------------
    Time: 1508045595000 ms
    -------------------------------------------

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
      * Word count over a file-based Spark Streaming source.
      *
      * Watches a local directory for text files, splits every line on single
      * spaces and prints the per-batch count of each token.
      *
      * Created by soyo on 17-10-15.
      */
    object DStream_file {

      /**
        * Builds the streaming word-count pipeline, runs it for at most 20 seconds
        * and then shuts the streaming context down.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[2]")
        // 10-second batch interval.
        val ss = new StreamingContext(sparkConf, Seconds(10))
        try {
          val lines = ss.textFileStream("file:///usr/local2/spark/mycode/streaming")
          val words = lines.flatMap(_.split(" "))
          val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
          wordCounts.print(100) // print up to 100 (word, count) pairs per batch
          ss.start()
          // Wait at most 20 seconds; this call only waits, it does not stop the context.
          ss.awaitTerminationOrTimeout(20000)
        } finally {
          // Stop explicitly instead of relying on the JVM shutdown hook, so the
          // contexts are released even when setup or start() fails.
          ss.stop(stopSparkContext = true, stopGracefully = false)
        }
      }
    }
  • 相关阅读:
    快捷JS PHP
    css userAgent (简易浏览器区分) PHP
    http://fw.qq.com/ipaddress PHP
    JS竖排文字 PHP
    奇怪的body PHP
    使用36进制,无损压缩GUID到26位 PHP
    链接<a>执行JS PHP
    纯JS省市区三级联动 PHP
    Table 样式 PHP
    Exceeded storage allocation. The server response was: 4.3.1 Message size exceeds fixed maximum message size
  • 原文地址:https://www.cnblogs.com/soyo/p/7670710.html
Copyright © 2011-2022 走看看