  • Spark Streaming 之三：DStream 解析



    1.1.1 Duration

    Spark Streaming 的时间类型，单位是毫秒；


    1) new Duration(milliseconds)






    1.1.2 slideDuration

    /**
     * Time interval after which the DStream generates an RDD. Abstract: each concrete
     * DStream supplies it. initialize() uses it as the minimum rememberDuration and to
     * derive the default checkpoint interval; slice() aligns requested times to it.
     */
      def slideDuration: Duration


    1.1.3 dependencies

    /**
     * List of parent DStreams on which this DStream depends. Abstract: defined by each
     * concrete DStream. initialize() also initializes these parents.
     */
      def dependencies: List[DStream[_]]


    1.1.4 compute

    /**
     * Generates the RDD for the given batch time. Abstract: implemented by each concrete DStream.
     *
     * @param validTime batch time for which to produce an RDD
     * @return Some(rdd), or None when no RDD is generated for this time
     */
      def compute(validTime: Time): Option[RDD[T]]


    1.1.5 zeroTime

    // Time zero for the DStream. Stays null until initialize(time) sets it; initialize
    // throws SparkException if asked to reset it to a different time. slice() checks
    // that requested times are slideDuration multiples away from it.
      private[streaming] var zeroTime: Time = null


    1.1.6 rememberDuration

    // Duration for which the DStream will remember each RDD it creates. Starts as null;
    // initialize() raises it to at least slideDuration, or to 2 * checkpointDuration when
    // a checkpoint interval is set and slideDuration <= checkpointDuration.
      private[streaming] var rememberDuration: Duration = null


    1.1.7 storageLevel

    // Storage level of the RDDs in the stream. Defaults to StorageLevel.NONE and is changed
    // through persist(level), which throws UnsupportedOperationException once the DStream
    // has been initialized.
      private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE


    1.1.8 parentRememberDuration

    // Duration for which this DStream requires its parent DStreams to remember each RDD
    // they create. This default just returns this stream's own rememberDuration; being a
    // plain (non-final) def, subclasses may override it to ask parents for a longer history.
      private[streaming] def parentRememberDuration = rememberDuration


    1.1.9 persist

    /** Persist the RDDs of this DStream with the given storage level */
      def persist(level: StorageLevel): DStream[T] = {
        if (this.isInitialized) {
          throw new UnsupportedOperationException(
            "Cannot change storage level of a DStream after streaming context has started")
        this.storageLevel = level


    1.1.10 checkpoint

       * Enable periodic checkpointing of RDDs of this DStream
       * @param interval Time interval after which generated RDD will be checkpointed
      def checkpoint(interval: Duration): DStream[T] = {
        if (isInitialized) {
          throw new UnsupportedOperationException(
            "Cannot change checkpoint interval of a DStream after streaming context has started")
        checkpointDuration = interval


    1.1.11 initialize

       * Initialize the DStream by setting the "zero" time, based on which
       * the validity of future times is calculated. This method also recursively initializes
       * its parent DStreams.
      private[streaming] def initialize(time: Time) {
        if (zeroTime != null && zeroTime != time) {
          throw new SparkException(s"ZeroTime is already initialized to $zeroTime"
            + s", cannot initialize it again to $time")
        zeroTime = time
        // Set the checkpoint interval to be slideDuration or 10 seconds, which ever is larger
        if (mustCheckpoint && checkpointDuration == null) {
          checkpointDuration = slideDuration * math.ceil(Seconds(10) / slideDuration).toInt
          logInfo(s"Checkpoint interval automatically set to $checkpointDuration")
        // Set the minimum value of the rememberDuration if not already set
        var minRememberDuration = slideDuration
        if (checkpointDuration != null && minRememberDuration <= checkpointDuration) {
          // times 2 just to be sure that the latest checkpoint is not forgotten (#paranoia)
          minRememberDuration = checkpointDuration * 2
        if (rememberDuration == null || rememberDuration < minRememberDuration) {
          rememberDuration = minRememberDuration
        // Initialize the dependencies

    initialize：初始化 DStream，通过 "zero" time（zeroTime）设置其起始时间，后续时间的有效性以此为基准计算；该方法还会递归初始化其父 DStream。

    1.1.12 getOrCompute

       * Get the RDD corresponding to the given time; either retrieve it from cache
       * or compute-and-cache it.
      private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {


    1.1.13 generateJob

       * Generate a SparkStreaming job for the given time. This is an internal method that
       * should not be called directly. This default implementation creates a job
       * that materializes the corresponding RDD. Subclasses of DStream may override this
       * to generate their own jobs.
      private[streaming] def generateJob(time: Time): Option[Job] = {
        getOrCompute(time) match {
          case Some(rdd) =>
            val jobFunc = () => {
              val emptyFunc = { (iterator: Iterator[T]) => {} }
              context.sparkContext.runJob(rdd, emptyFunc)
            Some(new Job(time, jobFunc))
          case None => None


    1.1.14 clearMetadata


      * Clear metadata that are older than `rememberDuration` of this DStream.

      * This is an internal method that should not be called directly. This default

      * implementation clears the old generated RDDs. Subclasses of DStream may override

      * this to clear their own metadata along with the generated RDDs.


     private[streaming]defclearMetadata(time: Time) {


    1.1.15 updateCheckpointData


      * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of

      * this stream. This is an internal method that should not be called directly. This is

      * a default implementation that saves only the file names of the checkpointed RDDs to

      * checkpointData. Subclasses of DStream (especially those of InputDStream) may override

      * this method to save custom checkpoint data.


     private[streaming]defupdateCheckpointData(currentTime:Time) {


    1.2 DStream基本操作

    1.2.1 map

     /** Return a new DStream by applying a function to all elements of this DStream. */

     defmap[U: ClassTag](mapFunc: T=> U): DStream[U] = {

       newMappedDStream(this, context.sparkContext.clean(mapFunc))



    1.2.2 flatMap


      * Return a new DStream by applying a function to all elements of this DStream,

      * and then flattening the results.


     defflatMap[U:ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {

       newFlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))


    1.2.3 filter

     /** Return a new DStream containing only the elements that satisfy apredicate. */

     def filter(filterFunc: T => Boolean): DStream[T] = newFilteredDStream(this, filterFunc)


    1.2.4 glom


      * Return a new DStream in which each RDD is generated by applying glom() to each RDD of

      * this DStream. Applying glom() to an RDD coalesces all elements within each partition into

      * an array.


     defglom(): DStream[Array[T]] =new GlommedDStream(this)


    1.2.5 repartition


      * Return a new DStream with an increased or decreased level of parallelism. Each RDD in the

      * returned DStream has exactly numPartitions partitions.


     defrepartition(numPartitions: Int):DStream[T] =this.transform(_.repartition(numPartitions))


    1.2.6 mapPartitions


      * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD

      * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition

      * of the RDD.



         mapPartFunc: Iterator[T] => Iterator[U],

         preservePartitioning: Boolean = false

       ): DStream[U] = {

       newMapPartitionedDStream(this, context.sparkContext.clean(mapPartFunc), preservePartitioning)



    1.2.7 reduce


      * Return a new DStream in which each RDD has a single element generated by reducing each RDD

      * of this DStream.


     defreduce(reduceFunc:(T, T) => T): DStream[T] =

       this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)


    1.2.8 count


      * Return a new DStream in which each RDD has a single element generated by counting each RDD

      * of this DStream.


     defcount(): DStream[Long] = {

       this.map(_=> (null,1L))


           .reduceByKey(_ + _)




    1.2.9 countByValue


      * Return a new DStream in which each RDD contains the counts of each distinct value in

      * each RDD of this DStream. Hash partitioning is used to generate

      * the RDDs with `numPartitions` partitions (Spark's default number of partitions if

      * `numPartitions` is not specified).


     defcountByValue(numPartitions:Int = ssc.sc.defaultParallelism)(implicit ord: Ordering[T] = null)

         : DStream[(T, Long)] =

       this.map(x => (x, 1L)).reduceByKey((x: Long, y: Long) => x +y, numPartitions)


    1.2.10 foreachRDD


      * Apply a function to each RDD in this DStream. This is an output operator, so

      * 'this' DStream will be registered as an output stream and therefore materialized.


     defforeachRDD(foreachFunc:(RDD[T], Time) => Unit) {

       // because the DStream is reachable from the outer objecthere, and because

       // DStreams can't be serialized with closures, we can'tproactively check

       // it for serializability and so we pass the optionalfalse to SparkContext.clean

       newForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()



    1.2.11 transform


      * Return a new DStream in which each RDD is generated by applying a function

      * on each RDD of 'this' DStream.


     deftransform[U:ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {

       // because the DStream is reachable from the outer objecthere, and because

       // DStreams can't be serialized with closures, we can'tproactively check

       // it for serializability and so we pass the optionalfalse to SparkContext.clean

       transform((r: RDD[T], t: Time) =>context.sparkContext.clean(transformFunc(r),false))



    1.2.12 transformWith


      * Return a new DStream in which each RDD is generated by applying a function

      * on each RDD of 'this' DStream and 'other' DStream.


     deftransformWith[U: ClassTag,V: ClassTag](

         other: DStream[U], transformFunc:(RDD[T], RDD[U]) => RDD[V]

       ): DStream[V] = {

       // because the DStream is reachable from the outer objecthere, and because

       // DStreams can't be serialized with closures, we can'tproactively check

       // it for serializability and so we pass the optionalfalse to SparkContext.clean

       valcleanedF = ssc.sparkContext.clean(transformFunc, false)

       transformWith(other, (rdd1: RDD[T], rdd2:RDD[U], time: Time) => cleanedF(rdd1, rdd2))



    1.2.13 print


      * Print the first ten elements of each RDD generated in this DStream. This is an output

      * operator, so this DStream will be registered as an output stream and therefore materialized.


     defprint() {

       defforeachFunc = (rdd: RDD[T], time: Time) => {

         valfirst11 = rdd.take(11)

         println ("-------------------------------------------")

         println ("Time: " + time)

         println ("-------------------------------------------")


         if(first11.size > 10) println("...")



       newForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()



    1.2.14 window


      * Return a new DStream in which each RDD contains all the elements seen in a

      * sliding window of time over this DStream. The new DStream generates RDDs with

      * the same interval as this DStream.

      * @param windowDuration width of the window; must be a multiple of this DStream's interval.


     defwindow(windowDuration:Duration): DStream[T] = window(windowDuration,this.slideDuration)



      * Return a new DStream in which each RDD contains all the elements seen in a

      * sliding window of time over this DStream.

      * @param windowDuration width of the window; must be a multiple of this DStream's

      *                       batching interval

      * @param slideDuration  sliding interval of the window (i.e., the interval after which

      *                       the new DStream will generate RDDs); must be a multiple of this

      *                       DStream's batching interval


     def window(windowDuration:Duration, slideDuration: Duration): DStream[T] = {

       newWindowedDStream(this, windowDuration, slideDuration)



    1.2.15 reduceByWindow


      * Return a new DStream in which each RDD has a single element generated by reducing all

      * elements in a sliding window over this DStream.

      * @param reduceFunc associative reduce function

      * @param windowDuration width of the window; must be a multiple of this DStream's

      *                       batching interval

      * @param slideDuration  sliding interval of the window (i.e., the interval after which

      *                       the new DStream will generate RDDs); must be a multiple of this

      *                       DStream's batching interval


     def reduceByWindow(

         reduceFunc: (T, T) => T,

         windowDuration: Duration,

         slideDuration: Duration

       ): DStream[T] = {





      * Return a new DStream in which each RDD has a single element generated by reducing all

      * elements in a sliding window over this DStream. However, the reduction is done incrementally

      * using the old window's reduced value:

      *  1. reduce the new values that entered the window (e.g., adding new counts)

      *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)

      * This is more efficient than reduceByWindow without an "inverse reduce" function.

      * However, it is applicable only to "invertible reduce functions".

      * @param reduceFunc associative reduce function

      * @param invReduceFunc inverse reduce function

      * @param windowDuration width of the window; must be a multiple of this DStream's

      *                       batching interval

      * @param slideDuration  sliding interval of the window (i.e., the interval after which

      *                       the new DStream will generate RDDs); must be a multiple of this

      *                       DStream's batching interval



         reduceFunc:(T, T) => T,

         invReduceFunc: (T, T) => T,

         windowDuration: Duration,

         slideDuration: Duration

       ): DStream[T] = {

         this.map(x=> (1, x))

             .reduceByKeyAndWindow(reduceFunc,invReduceFunc, windowDuration, slideDuration,1)





    1.2.16 countByWindow


      * Return a new DStream in which each RDD has a single element generated by counting the number

      * of elements in a sliding window over this DStream. Hash partitioning is used to generate

      * the RDDs with Spark's default number of partitions.

      * @param windowDuration width of the window; must be a multiple of this DStream's

      *                       batching interval

      * @param slideDuration  sliding interval of the window (i.e., the interval after which

      *                       the new DStream will generate RDDs); must be a multiple of this

      *                       DStream's batching interval


     defcountByWindow(windowDuration:Duration, slideDuration: Duration): DStream[Long] = {

       this.map(_=>1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)


    1.2.17 countByValueAndWindow


      * Return a new DStream in which each RDD contains the count of distinct elements in

      * RDDs in a sliding window over this DStream. Hash partitioning is used to generate

      * the RDDs with `numPartitions` partitions (Spark's default number of partitions if

      * `numPartitions` is not specified).

      * @param windowDuration width of the window; must be a multiple of this DStream's

      *                       batching interval

      * @param slideDuration  sliding interval of the window (i.e., the interval after which

      *                       the new DStream will generate RDDs); must be a multiple of this

      *                       DStream's batching interval

      * @param numPartitions  number of partitions of each RDD in the new DStream.



         windowDuration: Duration,

         slideDuration: Duration,

         numPartitions: Int =ssc.sc.defaultParallelism)

         (implicitord: Ordering[T] = null)

         : DStream[(T, Long)] =


       this.map(x=> (x, 1L)).reduceByKeyAndWindow(

         (x: Long, y: Long) => x + y,

         (x: Long, y: Long) => x - y,




         (x: (T, Long)) => x._2 != 0L




    1.2.18 union


      * Return a new DStream by unifying data of another DStream with this DStream.

      * @param that Another DStream having the same slideDuration as this DStream.


     defunion(that:DStream[T]): DStream[T] =new UnionDStream[T](Array(this, that))



      * Return all the RDDs defined by the Interval object (both end times included)


     def slice(interval:Interval): Seq[RDD[T]] = {

       slice(interval.beginTime, interval.endTime)



    1.2.19 slice


      * Return all the RDDs between 'fromTime' and 'toTime' (both included)


     defslice(fromTime:Time, toTime: Time): Seq[RDD[T]] = {

       if(!isInitialized) {

         thrownew SparkException(this + " has not beeninitialized")


       if(!(fromTime - zeroTime).isMultipleOf(slideDuration)) {

         logWarning("fromTime (" + fromTime + ") is not amultiple of slideDuration ("

           + slideDuration + ")")


       if(!(toTime - zeroTime).isMultipleOf(slideDuration)) {

         logWarning("toTime (" + fromTime + ") is not amultiple of slideDuration ("

           + slideDuration + ")")


       valalignedToTime = toTime.floor(slideDuration)

       valalignedFromTime = fromTime.floor(slideDuration)


       logInfo("Slicing from " + fromTime + " to " + toTime +

         " (aligned to " + alignedFromTime + " and " + alignedToTime + ")")


       alignedFromTime.to(alignedToTime,slideDuration).flatMap(time => {

         if(time >= zeroTime) getOrCompute(time) elseNone



    1.2.20 saveAsObjectFiles


      * Save each RDD in this DStream as a Sequence file of serialized objects.

      * The file name at each batch interval is generated based on `prefix` and

      * `suffix`: "prefix-TIME_IN_MS.suffix".


     defsaveAsObjectFiles(prefix: String, suffix: String = ""){

       valsaveFunc = (rdd: RDD[T], time: Time) => {

         valfile = rddToFileName(prefix, suffix, time)






    1.2.21 saveAsTextFiles


      * Save each RDD in this DStream as a text file, using string representation

      * of elements. The file name at each batch interval is generated based on

      * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".


     defsaveAsTextFiles(prefix:String, suffix: String ="") {

       valsaveFunc = (rdd: RDD[T], time: Time) => {

         valfile = rddToFileName(prefix, suffix, time)







      * Register this stream as an output stream. This ensures that RDDs of this

      * DStream will be generated.


     private[streaming]defregister(): DStream[T] = {










