  • How RDD lazy execution works

    A simplified version that demonstrates the principle of RDD lazy execution:

    package spark
    
    import scala.collection.Iterator._
    
    /**
      * Created by Kuan on 16/4/27.
      */
    object ABC {
    
      def main(args: Array[String]) {
        val rdd = new HadoopRDD()
        rdd.map(x => x * 10).filter(x => {
          val ok = x > 20
          println(s"当前值是${x},是否满足条件呢[${ok}]")
          ok
        }).doRun
        println("*********************分隔符")
        rdd.filter(x => x > 3).doRun
    
        rdd.filter(x => {
          val ok = x > 3
          println(s"当前值是${x},是否满足条件呢[${ok}]")
          ok
        }).map(x => x * 20).doRun
      }
    }
    
    abstract class RDD[T](parent: RDD[T]) {
      def compute(): Iterator[T]
    
      def map(f: T => T): RDD[T] = {
        new MapRDD[T](this, f)
      }
    
      def filter(p: T => Boolean): RDD[T] = {
        new FilterRDD[T](this, p)
      }
    
      def iterator(): Iterator[T] = {
        //This is a simplified signature; in Spark the real parameters are
        // split: Partition, context: TaskContext
        //which shows that one partition corresponds to one TaskContext, i.e. to one Task
        compute()
      }
    
      def doRun(): Unit = {
        println("doRun方法开始执行")
        val it = iterator()
        while (it.hasNext) {
          println(s"doRun方法打印值${it.next()}")
        }
        println("doRun方法执行结束")
      }
    }
    
    class MapRDD[T](var prev: RDD[T], f: T => T) extends RDD[T](prev) {
    
      override def compute(): Iterator[T] = {
        val it = prev.iterator()
        new Iterator[T] {
          def hasNext: Boolean = {
            it.hasNext
          }
    
          def next(): T = {
    
            val ret = f(it.next())
            println(s"mapRDD next 返回${ret}")
            ret
          }
        }
      }
    
    }
    
    class FilterRDD[T](var prev: RDD[T], p: T => Boolean) extends RDD[T](prev) {
    
      override def compute(): Iterator[T] = {
        val it = prev.iterator()
    
        new Iterator[T] {
          private var hd: T = _
          private var hdDefined: Boolean = false
    
          def hasNext: Boolean = {
            hdDefined || {
              do {
                if (!it.hasNext) return false
                hd = it.next()
              } while (!p(hd))
              hdDefined = true
              true
            }
          }
    
          def next(): T = {
            val ret = if (hasNext) {
              hdDefined = false;
              hd
            } else empty.next()
    
            println(s"FilterRDD next 返回${ret}")
            ret
          }
        }
      }
    
    }
    
    class HadoopRDD() extends RDD[Int](null) {
    
      override def compute(): Iterator[Int] = {
        new Iterator[Int] {
          val it = List(1, 2, 3, 4, 5, 6).toIterator
    
          def hasNext: Boolean = {
            it.hasNext
          }
    
          def next(): Int = {
            val ret = it.next()
            println(s"HadoopRDD next returns ${ret}")
            ret
          }
        }
      }
    }
    

    During execution, each RDD produces an Iterator; the design pattern involved is the decorator pattern.
    Only when hasNext and next are called on the Iterator of the last RDD do the Iterators produced by the parent RDDs execute their corresponding methods, recursively, one after another; the logic we defined in advance, such as map or filter, then runs inside next. The doRun method of the last RDD here is analogous to Spark's runJob method: it is what triggers the actual execution.
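
    The same pull-driven laziness can be observed with Scala's built-in Iterator, which the simplified RDDs above imitate. Below is a minimal standalone sketch (no Spark; the object name LazyIteratorDemo is just for illustration): calling map and filter only wraps the source iterator, and the printlns fire only once the terminal foreach starts pulling elements.

    object LazyIteratorDemo {

      def main(args: Array[String]): Unit = {
        val source = Iterator(1, 2, 3, 4, 5, 6)

        // Building the chain evaluates nothing yet: map/filter merely wrap
        // the source iterator, just like MapRDD/FilterRDD wrap their parent.
        val chained = source
          .map { x => println(s"map sees $x"); x * 10 }
          .filter { x => println(s"filter sees $x"); x > 20 }

        println("chain built, nothing executed yet")

        // The terminal operation pulls elements one by one; each element
        // flows through map and filter before the next one is read.
        chained.foreach(x => println(s"result: $x"))
      }
    }

    Running it first prints "chain built, nothing executed yet", then one "map sees"/"filter sees" pair per source element, interleaved with the surviving results 30, 40, 50 and 60, which is exactly the per-element, pull-driven order that the simplified MapRDD and FilterRDD above produce.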

  • Original article: https://www.cnblogs.com/luckuan/p/5441111.html