Spark Streaming, Part 2: StreamingContext Explained

    1.1 Creating a StreamingContext

    1.1.1 Creating from a SparkContext

    Source:

    def this(sparkContext: SparkContext, batchDuration: Duration) = {
        this(sparkContext, null, batchDuration)
      }

    The first parameter is an existing SparkContext; the second is the batch duration.

    Example:

    val ssc = new StreamingContext(sc, Seconds(5))

    1.1.2 Creating from a SparkConf

    Source:

    def this(conf: SparkConf, batchDuration: Duration) = {
        this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
      }

    The first parameter is a SparkConf object; the second is the batch duration.

    Example:

    val conf = new SparkConf().setAppName("StreamTest")

    val ssc = new StreamingContext(conf, Seconds(5))

    1.1.3 Creating from individual configuration parameters

    Source:

    def this(
          master: String,
          appName: String,
          batchDuration: Duration,
          sparkHome: String = null,
          jars: Seq[String] = Nil,
          environment: Map[String, String] = Map()) = {
        this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
             null, batchDuration)
      }

    The leading parameters are the details used to build the SparkConf internally: master (the Spark master URL), appName (the application name), sparkHome (the SPARK_HOME path), jars (JARs to ship to the cluster), and environment (worker environment variables); batchDuration is the batch interval.

    Example:

    val ssc = new StreamingContext("spark://host:port", "StreamTest", Seconds(5),
      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

     

    1.1.4 Creating from a checkpoint file

    Source:

    def this(path: String, hadoopConf: Configuration) =
        this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)

    The first parameter is the path to the checkpoint file; the second is the Hadoop configuration.
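
    In practice this constructor is normally reached through StreamingContext.getOrCreate, which restores a context from the checkpoint directory if one exists and otherwise builds a fresh one. A minimal sketch, assuming a hypothetical checkpoint directory and application setup:

    // Hypothetical checkpoint directory; replace with a real HDFS path.
    val checkpointDir = "hdfs://namenode:9000/check"

    // Builds a brand-new context when no checkpoint data is found.
    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("StreamTest")
      val newSsc = new StreamingContext(conf, Seconds(5))
      newSsc.checkpoint(checkpointDir)
      newSsc
    }

    // Recover from the checkpoint if present, otherwise call createContext().
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)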

      

    1.2 Creating input DStreams

    1.2.1 fileStream

    val ssc = new StreamingContext(sparkConf, Seconds(10))  
        ssc.fileStream(directory)

    Source:

    /**
       * Create an input stream that monitors a Hadoop-compatible filesystem
       * for new files and reads them using the given key-value types and input format.
       * Files must be written to the monitored directory by "moving" them from another
       * location within the same file system. File names starting with . are ignored.
       * @param directory HDFS directory to monitor for new file
       * @tparam K Key type for reading HDFS file
       * @tparam V Value type for reading HDFS file
       * @tparam F Input format for reading HDFS file
       */
      def fileStream[
        K: ClassTag,
        V: ClassTag,
        F <: NewInputFormat[K, V]: ClassTag
      ] (directory: String): InputDStream[(K, V)] = {
        new FileInputDStream[K, V, F](this, directory)
      }

    Type parameters: K is the key type for reading the HDFS files, V the value type, and F the input format. In the overload that takes extra arguments, directory is the HDFS path to monitor, filter is a predicate applied to the files in that directory, and newFilesOnly controls whether only newly added files are processed.

    By supplying a filter function, fileStream watches and processes only those files in the monitored directory that satisfy the predicate.

    Default filter:

    def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

    This default excludes hidden files (names starting with a dot).

    The newFilesOnly flag controls whether files already present in the monitored directory are processed. By default only newly added files are handled; setting it to false also picks up older files whose modification time falls within the previous remember window.

    Source (in class FileInputDStream):

    private val initialModTimeIgnoreThreshold = if (newFilesOnly) clock.getTimeMillis() else 0L
    private def findNewFiles(currentTime: Long): Array[String] = {
        try {
          lastNewFileFindingTime = clock.getTimeMillis()
    
          // Calculate ignore threshold
          val modTimeIgnoreThreshold = math.max(
            initialModTimeIgnoreThreshold,   // initial threshold based on newFilesOnly setting
            currentTime - durationToRemember.milliseconds  // trailing end of the remember window
          )
    ...

    modTimeIgnoreThreshold is the time-window cutoff for filtering: depending on newFilesOnly it is either the current time (only new files) or the trailing end of the remember window (old files from the previous window are also picked up).

    Example:

    // Define a custom filter function

       def myFilter(path: Path): Boolean = path.getName().contains("data")

    // Create the fileStream with the filter, processing pre-existing files too

    val data1 = ssc.fileStream[LongWritable, Text, TextInputFormat](Spath1, pa => myFilter(pa), false).map(_._2.toString)

     

    1.2.2 textFileStream

    val ssc = new StreamingContext(sparkConf, Seconds(10))  
        ssc.textFileStream(directory)

    Source:

    /**
       * Create an input stream that monitors a Hadoop-compatible filesystem
       * for new files and reads them as text files (using key as LongWritable, value
       * as Text and input format as TextInputFormat). Files must be written to the
       * monitored directory by "moving" them from another location within the same
       * file system. File names starting with . are ignored.
       * @param directory HDFS directory to monitor for new file
       */
      def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
        fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
      }

    Parameter: directory is the directory to monitor.

    textFileStream is simply a specialization of fileStream, fixed to LongWritable keys, Text values, and TextInputFormat.

    Example:

       val StreamFile1 = ssc.textFileStream(Spath1)

    1.2.3 socketTextStream

    val ssc = new StreamingContext(sparkConf, Seconds(10))  
        ssc.socketStream(hostname, port, converter, storageLevel)

    Source:

    /**
       * Creates an input stream from TCP source hostname:port. Data is received using
       * a TCP socket and the receive bytes it interpreted as object using the given
       * converter.
       * @param hostname      Hostname to connect to for receiving data
       * @param port          Port to connect to for receiving data
       * @param converter     Function to convert the byte stream to objects
       * @param storageLevel  Storage level to use for storing the received objects
       * @tparam T            Type of the objects received (after converting bytes to objects)
       */
      def socketStream[T: ClassTag](
          hostname: String,
          port: Int,
          converter: (InputStream) => Iterator[T],
          storageLevel: StorageLevel
        ): ReceiverInputDStream[T] = {
        new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
      }

    Parameters: hostname is the host to connect to, port is the port number, converter turns the raw InputStream into an iterator of objects, and storageLevel is the storage level for the received data (socketTextStream defaults to StorageLevel.MEMORY_AND_DISK_SER_2, i.e. serialized with two replicas).

    Example:

    val lines = ssc.socketTextStream(serverIP, serverPort)
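
    If the payload is not newline-delimited text, socketStream accepts a custom converter from the raw InputStream to an iterator of objects. A minimal sketch, assuming a hypothetical host and port; the converter below just re-implements line splitting for illustration:

    import java.io.{BufferedReader, InputStream, InputStreamReader}
    import java.nio.charset.StandardCharsets
    import org.apache.spark.storage.StorageLevel

    // Illustrative converter: wrap the socket's InputStream and emit one String per line.
    def lineConverter(in: InputStream): Iterator[String] = {
      val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
      Iterator.continually(reader.readLine()).takeWhile(_ != null)
    }

    val objLines = ssc.socketStream[String](
      "localhost", 9999,                       // hypothetical host and port
      lineConverter,
      StorageLevel.MEMORY_AND_DISK_SER_2)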

    1.2.4 rawSocketStream

    val ssc = new StreamingContext(sparkConf, Seconds(10))  
        ssc.rawSocketStream(hostname, port, storageLevel)

    Source:

    /**
       * Create an input stream from network source hostname:port, where data is received
       * as serialized blocks (serialized using the Spark's serializer) that can be directly
       * pushed into the block manager without deserializing them. This is the most efficient
       * way to receive data.
       * @param hostname      Hostname to connect to for receiving data
       * @param port          Port to connect to for receiving data
       * @param storageLevel  Storage level to use for storing the received objects
       *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
       * @tparam T            Type of the objects in the received blocks
       */
      def rawSocketStream[T: ClassTag](
          hostname: String,
          port: Int,
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
        ): ReceiverInputDStream[T] = withNamedScope("raw socket stream") {
        new RawInputDStream[T](this, hostname, port, storageLevel)
      }

    rawSocketStream is used much like socketTextStream, except that the sender must push blocks already serialized with Spark's serializer, which lets them go straight into the block manager without deserialization; see socketTextStream above.
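
    A minimal sketch of wiring one up, assuming a hypothetical host and port and that the sender ships Spark-serialized blocks of strings:

    import org.apache.spark.storage.StorageLevel

    // Receive pre-serialized blocks of strings and keep the default
    // serialized, twice-replicated storage level.
    val rawLines = ssc.rawSocketStream[String](
      "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)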

    1.2.5 networkStream

    Source:

     /**
       * Create an input stream with any arbitrary user implemented receiver.
       * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
       * @param receiver Custom implementation of Receiver
       */
      @deprecated("Use receiverStream", "1.0.0")
      def networkStream[T: ClassTag](
          receiver: Receiver[T]): ReceiverInputDStream[T] = {
        receiverStream(receiver)
      }

    networkStream is deprecated since 1.0.0; use receiverStream instead.

    See: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

    1.2.6 receiverStream

    val ssc = new StreamingContext(sparkConf, Seconds(10))  
        ssc.receiverStream(receiver)

    Source:

    /**
       * Create an input stream with any arbitrary user implemented receiver.
       * Find more details at http://spark.apache.org/docs/latest/streaming-custom-receivers.html
       * @param receiver Custom implementation of Receiver
       */
      def receiverStream[T: ClassTag](receiver: Receiver[T]): ReceiverInputDStream[T] = {
        withNamedScope("receiver stream") {
          new PluggableInputDStream[T](this, receiver)
        }
      }

    See: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
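
    A minimal sketch of a custom receiver in the spirit of the guide linked above; the hostname, port, and receive logic are placeholders, not a production-ready implementation:

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket
    import java.nio.charset.StandardCharsets
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    // Illustrative receiver that reads newline-delimited text from a socket.
    class CustomReceiver(host: String, port: Int)
        extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

      def onStart(): Unit = {
        // Receive on a separate thread so onStart() returns quickly.
        new Thread("Custom Receiver") {
          override def run(): Unit = receive()
        }.start()
      }

      def onStop(): Unit = { /* the receive thread stops when the socket closes */ }

      private def receive(): Unit = {
        try {
          val socket = new Socket(host, port)
          val reader = new BufferedReader(
            new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
          var line = reader.readLine()
          while (!isStopped && line != null) {
            store(line)                        // hand each record to Spark
            line = reader.readLine()
          }
          reader.close()
          socket.close()
          restart("Trying to connect again")
        } catch {
          case t: Throwable => restart("Error receiving data", t)
        }
      }
    }

    // Plug the receiver into the context.
    val customLines = ssc.receiverStream(new CustomReceiver("localhost", 9999))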

    1.2.7 actorStream

    Source:

     /**
       * Create an input stream with any arbitrary user implemented actor receiver.
       * Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
       * @param props Props object defining creation of the actor
       * @param name Name of the actor
       * @param storageLevel RDD storage level (default: StorageLevel.MEMORY_AND_DISK_SER_2)
       *
       * @note An important point to note:
       *       Since Actor may exist outside the spark framework, It is thus user's responsibility
       *       to ensure the type safety, i.e parametrized type of data received and actorStream
       *       should be same.
       */
      def actorStream[T: ClassTag](
          props: Props,
          name: String,
          storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
          supervisorStrategy: SupervisorStrategy = ActorSupervisorStrategy.defaultStrategy
        ): ReceiverInputDStream[T] = {
        receiverStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
      }

    Example:

    val StreamFile1 = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")

    See: http://spark.apache.org/docs/latest/streaming-custom-receivers.html

    1.2.8 queueStream

    val ssc = new StreamingContext(sparkConf, Seconds(10))  
        ssc.queueStream(queue, oneAtATime)

    Source:

    /**
       * Create an input stream from a queue of RDDs. In each batch,
       * it will process either one or all of the RDDs returned by the queue.
       *
       * NOTE: Arbitrary RDDs can be added to `queueStream`, there is no way to recover data of
       * those RDDs, so `queueStream` doesn't support checkpointing.
       *
       * @param queue      Queue of RDDs. Modifications to this data structure must be synchronized.
       * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
       * @tparam T         Type of objects in the RDD
       */
      def queueStream[T: ClassTag](
          queue: Queue[RDD[T]],
          oneAtATime: Boolean = true
        ): InputDStream[T] = {
        queueStream(queue, oneAtATime, sc.makeRDD(Seq[T](), 1))
      }
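
    Parameters: queue is a queue of RDDs (modifications must be synchronized), and oneAtATime controls whether one RDD or all queued RDDs are consumed per batch. queueStream is mainly useful for testing. A minimal sketch with made-up data:

    import scala.collection.mutable.Queue
    import org.apache.spark.rdd.RDD

    // Queue of RDDs to feed into the stream; modifications must be synchronized.
    val rddQueue = new Queue[RDD[Int]]()

    // Consume one RDD from the queue per batch interval.
    val queuedStream = ssc.queueStream(rddQueue, oneAtATime = true)
    queuedStream.count().print()

    // Push an RDD in; in a real test this usually happens after ssc.start().
    rddQueue.synchronized {
      rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
    }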

    1.2.9 union DStream

    val ssc = new StreamingContext(sparkConf, Seconds(10))  
        ssc.union(streams)

    Source:

    /**
       * Create a unified DStream from multiple DStreams of the same type and same slide duration.
       */
      def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = withScope {
        new UnionDStream[T](streams.toArray)
      }

    union merges multiple DStreams of the same element type into a single DStream; the inputs must have the same data type and the same slide (batch) duration.
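
    A minimal sketch merging two text streams that monitor different (hypothetical) directories:

    // Two streams of the same element type, created on the same context
    // and therefore sharing the same batch interval.
    val streamA = ssc.textFileStream("hdfs://namenode:9000/inputA")   // hypothetical path
    val streamB = ssc.textFileStream("hdfs://namenode:9000/inputB")   // hypothetical path

    // Combine them into one DStream.
    val merged = ssc.union(Seq(streamA, streamB))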

    1.2.10 transform DStream

    val ssc = new StreamingContext(sparkConf, Seconds(10))  
        ssc.transform(dstreams, transformFunc)

    Source:

    /**
       * Create a new DStream in which each RDD is generated by applying a function on RDDs of
       * the DStreams.
       */
      def transform[T: ClassTag](
          dstreams: Seq[DStream[_]],
          transformFunc: (Seq[RDD[_]], Time) => RDD[T]
        ): DStream[T] = withScope {
        new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
      }

    transform applies a user-supplied function to the current RDDs of the input DStreams in every batch, producing a new DStream.
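
    A minimal sketch that joins the per-batch RDDs of two keyed streams inside the transform function; the input paths and the keying are assumptions made for illustration:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    // Two keyed streams built from hypothetical text inputs.
    val pairsA = ssc.textFileStream("hdfs://namenode:9000/inputA").map(line => (line, 1))
    val pairsB = ssc.textFileStream("hdfs://namenode:9000/inputB").map(line => (line, 2))

    // Join the two streams' RDDs in each batch interval.
    val joined = ssc.transform[(String, (Int, Int))](
      Seq(pairsA, pairsB),
      (rdds: Seq[RDD[_]], time: Time) => {
        val left  = rdds(0).asInstanceOf[RDD[(String, Int)]]
        val right = rdds(1).asInstanceOf[RDD[(String, Int)]]
        left.join(right)
      })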

    1.3 Checkpointing

    Stateful operations work across multiple batches; they include the window-based operations and updateStateByKey. Because a stateful operation depends on data from earlier batches, its lineage and associated metadata keep accumulating over time. To bound this growth, Spark Streaming supports periodic checkpointing, which persists intermediate results to HDFS. Checkpointing itself costs an HDFS write, so the interval has to be chosen carefully: for very small batches (say one second) checkpointing every batch would noticeably reduce throughput, while an interval that is too long lets lineage and task sizes grow. In general, a checkpoint interval of 5-10 seconds (several batch intervals) is a reasonable choice.

    Example:

       ssc.checkpoint("hdfs://192.168.1.100:9000/check")

       val StreamFile1 = ssc.textFileStream(Spath1)

       StreamFile1.checkpoint(Seconds(30))

    Please credit the original source when reposting:

    http://blog.csdn.net/sunbow0/article/details/42966467
