zoukankan      html  css  js  c++  java
  • spark系列(一)----RDD

    一.RDD是什么

      RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。

      在spark的源码里面我们可以看到,rdd是被abstract所修饰的,他是一个抽象类,它代表一个不可变,可分区,里面的元素可并行计算的集合。

      而在spark的工作流程中,RDD的主要作用是对数据进行结构的转换,在对RDD的方法源码中可以看到,方法传参中需要将当前RDD传进去,在最后又会新建一个RDD作为输出。这种数据转换的设计,体现出了装饰者设计模式。

    二.RDD的特点

      抽象、分布式、不可变、可分区并行计算。

      1.RDD是分布式的,RDD本身不存储数据,它是拥有相同特性,或者说拥有相同数据结构的一类数据的逻辑划分,而这些数据分布在集群的各个节点上。

      2.RDD是可分区的,数据分区后,可以发送给不同的executor,RDD的分区主要是用来实现并行计算的。

      3.RDD不可变,在RDD的方法中,最终最是生成一个新的RDD做出输出,而不会直接修改原本的RDD。

      4.RDD里面封装的其实是逻辑,它的责任是告诉程序在运行时,要以什么样的逻辑去处理这一类数据。

      5.RDD中有一个叫做preferred location的列表,里面存储着分区的优先位置,而优先位置的概念是指,在spark分配任务给executor的时候,会优先分配给存有这个任务的数据的那个节点上的executor,这样executor在执行任务的时候,就不用从别的节点上拿取数据了。

     

    三.RDD的宽依赖和窄依赖

      RDD之间是存在依赖关系的,RDD中将依赖分成了两种类型,宽依赖和窄依赖,窄依赖是指父RDD的每个分区都只能被子RDD一个分区使用,相应的,宽依赖就是指RDD的分区被多个子RDD的分区所依赖(如reduceByKey)。

    四.RDD的缓存

      假如在应用程序中,某个RDD被多次重用,就可以把该RDD缓存起来,那样这个RDD里面划分的数据,只会在第一次计算的时候,从上游RDD中计算得到,而其余计算中,会直接使用缓存里面的数据进行计算。

    五.RDD的创建

      rdd可以通过三种方式创建,分区通过集合(从内存中创建),通过外部数据,通过别的RDD。

        

    六.RDD的分区

      RDD的分区代表着RDD中的数据继续逻辑化成成多少块,每个分区的数据可以交由一个executor去执行,以实现数据的并行计算,RDD的分区是可以由用户自己指定的,但是如果用户没有指定的话,在不同情况下,它有着不同的默认值。

      下面我们以makeRDD和textfile为例,看看spark的源码。

      makeRDD:

        这是以集合为基础生成的RDD,我们来看看它的具体代码

    def makeRDD[T: ClassTag](
          seq: Seq[T],
          numSlices: Int = defaultParallelism): RDD[T] = withScope {
        parallelize(seq, numSlices)
    }

        可以看到,这个方法除了要传入一个seq之外,还需要传入一个叫numSlices的参数,就是这个参数决定着并行度,而这个numSlices时从defaultparallelism这个方法那里获取到的。

    def defaultParallelism: Int = {
        assertNotStopped()
        taskScheduler.defaultParallelism
    }

        这里又看到,该值是从taskScheduler.defaultParallelism处获取的,但是继续看下去,会发现这个方法时一个抽象方法。

         因此,我们可以crtl+h看看这个方法具体在哪里实现了

        搜索结果告诉我们,在TaskSchedulerlmpl里面有这个方法的具体实现

    override def defaultParallelism(): Int = backend.defaultParallelism()
    
      // Check for speculatable tasks in all our active jobs.
      def checkSpeculatableTasks() {
        var shouldRevive = false
        synchronized {
          shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
        }
        if (shouldRevive) {
          backend.reviveOffers()
        }
    }

        这里可以看到,这个值又是从backend.defaultParallelism中传过来的,按照这种方式继续查下去,会一直查到一个叫coarseGrainedSchedulerBackend的文件中

    override def defaultParallelism(): Int = {
        conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
    }

        这里就是最终决定这个参数的值是多少的地方了,首先会读取spark.default.parallelism这个配置的值,假如没有配置,则会拿当前计算机最大内核数与2做对比,取较大值。

      textfile:

        textfile是以外部文件为基础生成的RDD,下面是他的代码 

    def textFile(
          path: String,
          minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
    }

        可以看到,这里的并行度是defaultMinPartitions(最小分区个数)这个方法决定的,这个方法没有传参

    def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

        它是拿刚刚defaultParallelism的值跟2做对比,取较小值,所以,defaultParallelism这个方法的值是怎么来的,还是得弄清楚。

                  值得一提的是,这里的defaultMinPartitions是最小分区个数,它的意思是它最少会有两个分区,但是具体有多少各分区并不确定,例如defaultMinPartitions我们输入一个2进去,对一个文件:

        abcde

        进行wrodcount,这里最后其实生成三个文件,也就是说他被分成了三个分区了,原因是,这个文件的大小是5个字节,5/2 = 2 余 1,也就是说,我每个分区分配2个字节,最后一个分区分配1个字节,所以这里最后的分区个数可能跟给定的值一致,也可能大于给定的值。

        然而,这里面又引申出另外一个问题,这三个文件里面,究竟是不是按照ab、cd、e的内容进行划分呢,实际上不是的,因此这里得出一个结论,在计算的时候,分多少个分区,与数据如何分配到分区里面,是两套规则,相互独立的,上述情况,其实最后的数据是abcde,null,null这样分配的,它具体是按照hadoop的分片规则来决定的(hadoop是按照行来切分数据的)。

        

  • 相关阅读:
    MongoDB:数据库管理
    MongoDB:用户管理
    MongoDB:入门
    彻底透析SpringBoot jar可执行原理
    轻松了解Spring中的控制反转和依赖注入(一)
    领域驱动最佳实践--用代码来告诉你来如何进行领域驱动设计
    血的教训--如何正确使用线程池submit和execute方法
    领域驱动设计之实战权限系统微服务
    为什么我们需要领域驱动设计
    【Go入门学习】golang自定义路由控制实现(二)-流式注册接口以及支持RESTFUL
  • 原文地址:https://www.cnblogs.com/QicongLiang/p/13657893.html
Copyright © 2011-2022 走看看