zoukankan      html  css  js  c++  java
  • [Spark]-源码解析-RDD之transform

      

        1.Map系转换

        Map转换算是 RDD 的经典转换操作之一了.就以它开头.Map的源码如下  

      def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
      }

      1.1  sc.clean(f)   

          第一眼看见的就是一个 clean 函数.这是做什么的.来看看定义        

          /**
           * Clean a closure to make it ready to be serialized and sent to tasks
           * (removes unreferenced variables in $outer's, updates REPL variables)
           * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
           * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
           * if not.
           */
          private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
            ClosureCleaner.clean(f, checkSerializable)
            f
          }

        说的非常清楚.clean 整理一个闭包,使其可以序列化并发送到任务.这里想明白.为什么会序列化闭包传送到任务.

          因为Map的闭包执行逻辑本身是上传到Driver端的,但是真正的执行是交给executor的.所以才会有序列化这个闭包传输出去.

        序列化一个闭包的逻辑就是 ClosureCleaner.clean里的内容.

          这里代码太多了.序列化一个闭包的思路就是反射.匿名函数也是函数调用.就用反射把这个函数位置找出记下来传输给执行端.执行端根据函数位置反射调用.

      1.2 MapPartitionsRDD     
            /**
             * An RDD that applies the provided function to every partition of the parent RDD.
             */
            private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
                var prev: RDD[T],
                f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
                preservesPartitioning: Boolean = false)
              extends RDD[U](prev) {
    
              override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
    
              override def getPartitions: Array[Partition] = firstParent[T].partitions
    
              override def compute(split: Partition, context: TaskContext): Iterator[U] =
                f(context, split.index, firstParent[T].iterator(split, context))
    
              override def clearDependencies() {
                super.clearDependencies()
                prev = null
              }
            }

        1.2.1 这是一个MapPartitionsRDD.注意这里,它是如何描述MapPartitionsRDD的.

          prev 这是父RDD f 这是Map函数. 就没有了.没有MapPartitionsRDD 本身的RDD数据.

          这再次印证了转换不会产生任何数据.它只是单纯了记录父RDD以及如何转换的过程就完了,不会在转换阶段就产生任何数据集

        1.2.2 preservesPartitioning

          preservesPartitioning 表示是否保持父RDD的分区信息. 
          
           如果为false(默认为false),则会对结果重新分区.也就是Map系默认都会分区
         如果为true,保留分区. 则按照
    firstParent 保留分区   
            /** Returns the first parent RDD */
            protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
              dependencies.head.rdd.asInstanceOf[RDD[U]]
            }       
          可以看出(dependencies是依赖栈,所有head是直属父级) 如果保留RDD分区.是保留的父级RDD的分区.
        1.2.3 override def compute(split: Partition, context: TaskContext)
          来看看 MapPartitionRDD 是如何重写定义计算逻辑的
            firstParent[T].iterator(split, context) 遍历 父RDD的每个分区进行计算
          
            /**
               * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
               * This should ''not'' be called by users directly, but is available for implementors of custom
               * subclasses of RDD.
               */
              final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
                if (storageLevel != StorageLevel.NONE) {
                  getOrCompute(split, context)
                } else {
                  computeOrReadCheckpoint(split, context)
                }
              }

            尝试以缓存的方式读取父级RDD.

              如果设定的存储级别不是不缓存级别.都会尝试从缓存读取

              如果设定的存储级别是不缓存,依然会尝试从CheckPoint 读取

              总之,Spark的原则是 能不重计算读取就不重计算

            对于没有持久化的RDD,compute方法实现了依赖链的递归调用(compute->firstParent.iterator->compute)

              
    compute => firstParent.iterator => getOrCompute(真正计算) => computeOrReadCheckpoint
                    computeOrReadCheckpoint =>(isCheckpointedAndMaterialized) => firstParent.iterator
                                 => this.
    compute
     
    getOrCompute
     


    注意这里的 val cleanF = sc.clean(f).

    sc.clean(f) 函数定义是这样的
  • 相关阅读:
    C++指针(一)
    探讨mvc下linq多表查询使用viewModel的问题
    利用aspnetPager控件加存储过程实现对gridview的分页
    Javascript 笔记
    对于最近学习WCF的总结——唔聊的服务
    今天面试的一些知识性总结
    使用C#反序列化plist文件
    Running x86 apps on WinRT devices
    ItemsSource绑定中遇到的问题
    字节对齐 详解
  • 原文地址:https://www.cnblogs.com/NightPxy/p/9342715.html
Copyright © 2011-2022 走看看