zoukankan      html  css  js  c++  java
  • IDEA Spark Streaming 操作(文件源)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * Word count over a directory of text files using Spark Streaming (file source).
     *
     * Every batch interval (15 s) the monitored directory is checked for newly added files.
     * The words in those files are counted, and the first 100 (word, count) pairs of each
     * batch are printed. The job waits for at most 100 seconds and then stops the streaming
     * context together with its SparkContext.
     *
     * Usage: DStream_file [directory]
     *   directory  optional directory URI to monitor; defaults to
     *              file:///usr/local2/spark/mycode/streaming
     */
    object DStream_file {
      /** Directory monitored when no command-line argument is supplied. */
      private val DefaultDir = "file:///usr/local2/spark/mycode/streaming"

      def main(args: Array[String]): Unit = {
        val dir = args.headOption.getOrElse(DefaultDir)
        val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[4]")
        // Batch interval of 15 s: the directory is scanned for new files every 15 seconds.
        val ss = new StreamingContext(sparkConf, Seconds(15))
        val lines = ss.textFileStream(dir)
        // Split on runs of whitespace and drop empty tokens, so that consecutive or leading
        // spaces are not counted as the empty word "".
        val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.print(100) // print up to 100 (word, count) pairs per batch
        ss.start()
        try {
          // Block for at most 100 s (100000 ms); returns earlier only if the context terminates.
          ss.awaitTerminationOrTimeout(100000)
        } finally {
          // awaitTerminationOrTimeout only waits; it does not stop anything. Stop the streaming
          // context and the underlying SparkContext explicitly (letting in-flight batches finish)
          // instead of relying on JVM exit to tear them down.
          ss.stop(stopSparkContext = true, stopGracefully = true)
        }
      }
    }

    结果（上面第一个程序的输出，批处理间隔为 15 秒）:

    -------------------------------------------
    Time: 1508045550000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1508045565000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1508045580000 ms
    -------------------------------------------
    (88,2)
    (4,1)
    (8,1)
    (ya,1)
    (55,2)
    (me,2)
    (49,1)
    (i,4)
    (9,1)
    (but,1)
    (1,2)
    (dont,1)
    (2,2)
    (79,1)
    (you,4)
    (know,2)
    (3,2)
    (like,2)
    (76,1)

    -------------------------------------------
    Time: 1508045595000 ms
    -------------------------------------------

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Word count over a directory of text files using Spark Streaming (file source).
     *
     * Variant with a 10 s batch interval: every 10 seconds the monitored directory is checked
     * for newly added files. The words in those files are counted, and the first 100
     * (word, count) pairs of each batch are printed. The job waits for at most 20 seconds and
     * then stops the streaming context together with its SparkContext.
     *
     * Usage: DStream_file [directory]
     *   directory  optional directory URI to monitor; defaults to
     *              file:///usr/local2/spark/mycode/streaming
     *
     * Created by soyo on 17-10-15.
     */
    object DStream_file {
      /** Directory monitored when no command-line argument is supplied. */
      private val DefaultDir = "file:///usr/local2/spark/mycode/streaming"

      def main(args: Array[String]): Unit = {
        val dir = args.headOption.getOrElse(DefaultDir)
        val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[2]")
        // Batch interval of 10 s: the directory is scanned for new files every 10 seconds.
        val ss = new StreamingContext(sparkConf, Seconds(10))
        val lines = ss.textFileStream(dir)
        // Split on runs of whitespace and drop empty tokens, so that consecutive or leading
        // spaces are not counted as the empty word "".
        val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
        val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
        wordCounts.print(100) // print up to 100 (word, count) pairs per batch
        ss.start()
        try {
          // Block for at most 20 s (20000 ms); returns earlier only if the context terminates.
          ss.awaitTerminationOrTimeout(20000)
        } finally {
          // awaitTerminationOrTimeout only waits; it does not stop anything. Stop the streaming
          // context and the underlying SparkContext explicitly (letting in-flight batches finish)
          // instead of relying on JVM exit to tear them down.
          ss.stop(stopSparkContext = true, stopGracefully = true)
        }
      }
    }
  • 相关阅读:
    ExtJs 4 的filefield上传后 返回值success接受不正常
    winform treeview 通过节点名称添加子节点
    Live Writer Test
    mysqlcluster笔记
    ExtJS中form提交之后获取返回的json值
    MYSQL大小写(由于数据由windows迁移到Linux导致)
    ORACLE分科目统计每科前三名的学生的语句
    一列数据横排显示
    Oracle11g使用exp导出空表
    myeclipse2015卸载、安装、破解全过程-----myeclipse2015
  • 原文地址:https://www.cnblogs.com/soyo/p/7670710.html
Copyright © 2011-2022 走看看