zoukankan      html  css  js  c++  java
  • IDEA Spark Streaming 操作(文件源)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * Word count over a directory-backed Spark Streaming text source.
     *
     * Each batch reads the text files picked up by `textFileStream` from the monitored
     * directory, splits every line on single spaces and prints the per-word counts.
     */
    object DStream_file {
      /**
       * Runs the streaming word count for at most 100 seconds, then shuts Spark down.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[4]")
        // Batch interval of 15 seconds: the streaming directory is checked once per batch.
        val ss = new StreamingContext(sparkConf, Seconds(15))
        try {
          val lines = ss.textFileStream("file:///usr/local2/spark/mycode/streaming")
          // split(" ") produces empty tokens for blank lines and repeated/leading spaces;
          // drop them so the empty string is not counted as a word.
          val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
          val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
          wordCounts.print(100) // print up to 100 elements of every batch
          ss.start()
          // Wait at most 100000 ms (100 s); returns earlier if the context is stopped.
          ss.awaitTerminationOrTimeout(100000)
        } finally {
          // The timeout only ends the wait, it does not stop the job: stop the streaming
          // context (and the underlying SparkContext) explicitly, letting the batch that
          // is currently in flight finish first.
          ss.stop(stopSparkContext = true, stopGracefully = true)
        }
      }
    }

    结果(批次间隔为 15 秒;textFileStream 只处理程序启动后新放入监听目录的文件,因此没有新文件的批次输出为空):

    -------------------------------------------
    Time: 1508045550000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1508045565000 ms
    -------------------------------------------

    -------------------------------------------
    Time: 1508045580000 ms
    -------------------------------------------
    (88,2)
    (4,1)
    (8,1)
    (ya,1)
    (55,2)
    (me,2)
    (49,1)
    (i,4)
    (9,1)
    (but,1)
    (1,2)
    (dont,1)
    (2,2)
    (79,1)
    (you,4)
    (know,2)
    (3,2)
    (like,2)
    (76,1)

    -------------------------------------------
    Time: 1508045595000 ms
    -------------------------------------------

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Streaming word count that watches a local directory for new text files.
     *
     * Created by soyo on 17-10-15.
     */
    object DStream_file {
      /**
       * Starts the streaming job, lets it run for at most 20 seconds and then stops Spark.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[2]")
        // One batch every 10 seconds.
        val ss = new StreamingContext(sparkConf, Seconds(10))
        try {
          val lines = ss.textFileStream("file:///usr/local2/spark/mycode/streaming")
          val words = lines
            .flatMap(_.split(" "))
            // Blank lines and consecutive spaces yield "" tokens; they are not words.
            .filter(_.nonEmpty)
          val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
          wordCounts.print(100) // show up to 100 elements per batch
          ss.start()
          // Wait at most 20000 ms (20 s); returns earlier if the context is stopped.
          ss.awaitTerminationOrTimeout(20000)
        } finally {
          // Reaching the timeout does not stop the job by itself; release the streaming
          // context and the underlying SparkContext explicitly, finishing the current batch.
          ss.stop(stopSparkContext = true, stopGracefully = true)
        }
      }
    }
  • 相关阅读:
    ceph之image(转)
    CEPH集群RBD快照创建、恢复、删除、克隆(转)
    java操作ceph之rbd基本操作
    Kubernetes (1.6) 中的存储类及其动态供给
    Linux 网络编程详解九
    Linux 网络编程详解八
    Linux 网络编程详解七(并发僵尸进程处理)
    Linux 网络编程详解六(多进程服务器僵尸进程解决方案)
    Linux 网络编程详解五(TCP/IP协议粘包解决方案二)
    C语言 memset函数盲点
  • 原文地址:https://www.cnblogs.com/soyo/p/7670710.html
Copyright © 2011-2022 走看看