zoukankan      html  css  js  c++  java
  • 编程模型:数据接收层

    QueueStream(主要是做实验用)

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable.Queue
    
    object QueueStream {
    
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("QueueStream")
        val sc = new SparkContext(sparkConf)
        // Create the context
        val ssc = new StreamingContext(sc, Seconds(1))
    
        // 创建一个RDD类型的queue    Int类型的RDD
        val rddQueue = new Queue[RDD[Int]]()
    
        // 创建QueueInputDStream 且接受数据和处理数据
        //  queueStream  接收 rddQueue
        val inputStream = ssc.queueStream(rddQueue)
        
        // 统计与10取模的个数
        val mappedStream = inputStream.map(x => (x % 10, 1))
        val reducedStream = mappedStream.reduceByKey(_ + _)
        reducedStream.print()
    
        ssc.start()
    
        // 将RDD push到queue中,实时处理
        rddQueue += ssc.sparkContext.makeRDD(1 to 1000, 10)
    
        ssc.stop(false)
      }
    }
    

      HdfsFileStream

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * Counts words in new text files created in the given directory
      * 1、监控目录下的文件的格式必须是统一的
      * 2、不支持嵌入文件目录
      * 3、一旦文件移动到这个监控目录下,是不能变的,往文件中追加的数据是不会被读取的
      * spark-shell --master spark://master:7077 --total-executor-cores 4 --executor-cores 2
      * hadoop fs -copyFromLocal test1-process.txt hdfs://master:9999/user/hadoop-twq/spark-course/streaming/filestream
     */
    object HdfsFileStream {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("HdfsFileStream")
        val sc = new SparkContext(sparkConf)
        // Create the context
        val ssc = new StreamingContext(sc, Seconds(2))
    
        val filePath = "hdfs://master:9999/user/hadoop-twq/spark-course/streaming/filestream"
    
        // Create the FileInputDStream on the directory and use the
        // stream to count words in new files created
    
        // filePath 表示监控的文件目录
        // filter(Path => Boolean) 表示符合条件的文件路径
        // isNewFile 表示streaming app启动的时候是否需要处理已经存在的文件
         // fileStream[LongWritable, Text, TextInputFormat]   key是LongWritable    value类型LongWritable    输出的文件类型TextInputFormat
         //  参数:filePath文件目录     
         //     (path: Path) => path.toString.contains("process")   过滤器
         //  false   可见文件
        val linesWithText = ssc.textFileStream(filePath)   //简单写法,没有过滤器
        val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](filePath,
          (path: Path) => path.toString.contains("process"), false).map(_._2.toString)
    
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
    
        ssc.start()
        ssc.awaitTermination()
    
        ssc.stop(false)
      }
    }
    
    
        /*spark.streaming.fileStream.minRememberDuration = 60s 
          HDFS系统的时间需要和跑streaming app的机器的时间同步
          新文件在一定的remember window的时间段内可见,这样的新文件才会处理
       一旦文件可见,那么文件的修改时间不能变,如果向文件追加内容的话,这些内容不会被读取
    */
    

      

  • 相关阅读:
    弄明白python reduce 函数
    Linux 下载百度网盘大文件
    java 从网上下载文件的几种方式
    Windows下Python2与Python3两个版本共存的方法详解
    python 学习笔记
    Glide实现查看图片和保存图片到手机
    Android Animation 知识点速记备忘思维导图
    You must not call setTag() on a view Glide is targeting when use Glide
    前端数据流哲学
    精读《Optional chaining》
  • 原文地址:https://www.cnblogs.com/tesla-turing/p/11488174.html
Copyright © 2011-2022 走看看