zoukankan      html  css  js  c++  java
  • IDEA Spark Streaming 操作(文件源)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * Word count over a Spark Streaming file source.
     *
     * Watches a directory with `textFileStream`, splits every new line on
     * whitespace and prints up to 100 (word, count) pairs per batch. The
     * application runs for at most 100 seconds and then shuts itself down.
     *
     * Usage: DStream_file [directory]
     * When no directory argument is given, DefaultDir is watched.
     */
    object DStream_file {
      /** Directory watched when no command-line argument is supplied. */
      private val DefaultDir = "file:///usr/local2/spark/mycode/streaming"

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[4]")
        // A batch is formed every 15 seconds from files that showed up in the directory.
        val ss = new StreamingContext(sparkConf, Seconds(15))
        val lines = ss.textFileStream(args.headOption.getOrElse(DefaultDir))
        // Split on runs of whitespace and drop empty tokens, so blank lines and
        // repeated spaces are not counted as the "word" "".
        val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.print(100) // print up to 100 pairs per batch
        ss.start()
        try {
          // Returns when the context terminates or after 100 seconds, whichever is first.
          ss.awaitTerminationOrTimeout(100000)
        } finally {
          // awaitTerminationOrTimeout does not stop anything by itself; stop the
          // streaming context and its SparkContext so the program really ends.
          ss.stop(stopSparkContext = true, stopGracefully = true)
        }
      }
    }

    结果(前两个批次为空:这两个时间段内监听目录中没有出现新文件):

    -------------------------------------------
    Time: 1508045550000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1508045565000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1508045580000 ms
    -------------------------------------------
    (88,2)
    (4,1)
    (8,1)
    (ya,1)
    (55,2)
    (me,2)
    (49,1)
    (i,4)
    (9,1)
    (but,1)
    (1,2)
    (dont,1)
    (2,2)
    (79,1)
    (you,4)
    (know,2)
    (3,2)
    (like,2)
    (76,1)

    -------------------------------------------
    Time: 1508045595000 ms
    -------------------------------------------

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Word count over a Spark Streaming file source (10-second batches).
     *
     * Watches a directory with `textFileStream`, splits every new line on
     * whitespace and prints up to 100 (word, count) pairs per batch. The
     * application runs for at most 20 seconds and then shuts itself down.
     *
     * Usage: DStream_file [directory]
     * When no directory argument is given, DefaultDir is watched.
     *
     * Created by soyo on 17-10-15.
     */
    object DStream_file {
      /** Directory watched when no command-line argument is supplied. */
      private val DefaultDir = "file:///usr/local2/spark/mycode/streaming"

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[2]")
        // A batch is formed every 10 seconds from files that showed up in the directory.
        val ss = new StreamingContext(sparkConf, Seconds(10))
        val lines = ss.textFileStream(args.headOption.getOrElse(DefaultDir))
        // Split on runs of whitespace and drop empty tokens, so blank lines and
        // repeated spaces are not counted as the "word" "".
        val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.print(100) // print up to 100 pairs per batch
        ss.start()
        try {
          // Returns when the context terminates or after 20 seconds, whichever is first.
          ss.awaitTerminationOrTimeout(20000)
        } finally {
          // awaitTerminationOrTimeout does not stop anything by itself; stop the
          // streaming context and its SparkContext so the program really ends.
          ss.stop(stopSparkContext = true, stopGracefully = true)
        }
      }
    }
  • 相关阅读:
    html添加遮罩层
    html绝对定位如何居中
    springboot上传文件到本地服务器
    遍历一个文件目录,把所有的子目录都列出来
    Java判断cookie中是否有某个具体的cookie
    Redis 如何打开rdb 文件
    crontab -e 如何保存退出
    linux服务器设备上没有空间
    IDEA的SonarLint插件报错Unable to create symbol table for
    使用Rome读取RSS报错,org.xml.sax.SAXParseException: 前言中不允许有内容。
  • 原文地址:https://www.cnblogs.com/soyo/p/7670710.html
Copyright © 2011-2022 走看看