zoukankan      html  css  js  c++  java
  • Spark RDD

    Spark RDD的简介

    什么是RDD?

    RDD是整个Spark的基石,是一个弹性分布式的数据集,为用户屏蔽了底层复杂的计算和映射操作。

    RDD的特点:

    • RDD 是不可变的,如果对一个RDD进行转换操作会生成一个新的RDD。
    • RDD 是分区的,RDD 里面的具体数据是分布在多台机器上的 Excutor里面的。
    • RDD 是弹性的。

    RDD 的弹性特征:

    • 弹性:Spark会根据用户的配置或者当前Spark的应用运行情况去自动将RDD的数据缓存到内存或者磁盘。它是一个对用户不可见的封装的功能。
    • 容错:当你的RDD数据被删除或者丢失的时候,可以通过血统或者检查点机制恢复数据,这个对用户来说也是透明的。
    • 计算:RDD的计算是分层的,有 应用->job->Stage->TaskSet->Task ,每一层都有相对应的计算的保障与重复机制。保障你的计算不会由于一些突然的因素而发生终止。
    • 分片:你可以根据业务需求或者一些算子来重新调整RDD的数据分布情况。

    如何创建RDD

    从外部文件创建

    从外部文件创建RDD可以使用textFile()方法。

    textFile()方法源码:

      /**
       * Read a text file from HDFS, a local file system (available on all nodes), or any
       * Hadoop-supported file system URI, and return it as an RDD of Strings.
       * @param path path to the text file on a supported file system
       * @param minPartitions suggested minimum number of partitions for the resulting RDD
       * @return RDD of lines of the text file
       */
      def textFile(
          path: String,
          minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
      }
    

    textFile(path: String,minPartitions: Int = defaultMinPartitions)有两个参数:

    • 第一个参数是文件的路径

    • 第二个参数是构建RDD后的分区数量,可以跟你传入的参数将数据划分成多个分区。

    如果你不指定分区的数量,首先会获取你的Task数量即defaultParallelism,然后再和2比较。如果你的Task数量大于或等于2,那么默认会将你的数据集分成两个区。如果你的Task数量是1,那么默认会将你的数据分成一个区。

    defaultMinPartitions方法,比较你的Task数量和2的大小:

    def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
    

    defaultParallelism方法,获取你的Task数量:

      /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
      def defaultParallelism: Int = {
        assertNotStopped()
        taskScheduler.defaultParallelism
      }
    

    示例:

    从本地外部文件创建RDD:

    scala> sc.textFile("file:///usr/hdp/3.1.0.0-78/spark2/README.md")
    res0: org.apache.spark.rdd.RDD[String] = file:///usr/hdp/3.1.0.0-78/spark2/README.md MapPartitionsRDD[1] at textFile at <console>:25
    

    从hdfs文件系统中读取文件创建RDD:

    scala> sc.textFile("hdfs:///README.md")
    res2: org.apache.spark.rdd.RDD[String] = hdfs:///README.md MapPartitionsRDD[3] at textFile at <console>:25
    

    读取本地文件时,需要在文件路径前面加上file://。从hdfs上读取文件的时候,需要在文件路径上加上hdfs://。如果你不在文件路径前面加上文件系统协议,会默认到hdfs文件系统中读取文件。

    从scala变量转换成RDD

    从scala变量转换成RDD有两个方法,一个是parallelize(),另一个是makeRDD()

    parallelize()

    parallelize() 源码:

    def parallelize[T: ClassTag](
          seq: Seq[T],
          numSlices: Int = defaultParallelism): RDD[T] = withScope {
        assertNotStopped()
        new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
      }
    

    parallelize[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism)同样有两个参数:

    • 第一个参数是scala的变量。这个变量是可以是Seq下面的所有子类。

    • 第二个参数是指定创建后RDD的分区数量,如果不指定也是默认在Task数量和2中选一个最小的值作为分区数量。

    不指定分区示例:

    //从Seq创建RDD
    scala> sc.parallelize(Seq(1 to 5))
    res3: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[4] at parallelize at <console>:25
    
    scala> res3.collect
    res4: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))
    
    //从Array创建RDD
    scala> sc.parallelize(Array(1 to 5))
    res5: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[5] at parallelize at <console>:25
    
    scala> res5.collect
    res6: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))
    
    //从List创建RDD
    scala> sc.parallelize(List(1 to 5))
    res7: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at parallelize at <console>:25
    
    scala> res7.collect
    res8: Array[scala.collection.immutable.Range.Inclusive] = Array(Range(1, 2, 3, 4, 5))
    
    

    指定分区示例:

    //我的Task是1,所以默认的分区数量是1        
    scala> sc.parallelize(Seq(1 to 5))
    res9: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[7] at parallelize at <console>:25
                                                                                                           
    scala> res9.partitions.size
    res10: Int = 1
    
    //指定数据存放到10个分区
    scala> sc.parallelize(Seq(1 to 5),10)
    res11: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[8] at parallelize at <console>:25
    
    scala> res11.partitions.size
    res12: Int = 10
    
    

    makeRDD()

    makeRDD()有两个重载的方法:

    第一个重载方法源码:

      /** Distribute a local Scala collection to form an RDD.
       *
       * This method is identical to `parallelize`.
       * @param seq Scala collection to distribute
       * @param numSlices number of partitions to divide the collection into
       * @return RDD representing distributed collection
       */
      def makeRDD[T: ClassTag](
          seq: Seq[T],
          numSlices: Int = defaultParallelism): RDD[T] = withScope {
        parallelize(seq, numSlices)
      }
    

    第一个重载方法 makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism)

    实际和parallelize()方法参数一样,在方法体里面实际上也是调用parallelize(),这里就不展开讲了。

    第二个重载方法源码:

      /**
       * Distribute a local Scala collection to form an RDD, with one or more
       * location preferences (hostnames of Spark nodes) for each object.
       * Create a new partition for each collection item.
       * @param seq list of tuples of data and location preferences (hostnames of Spark nodes)
       * @return RDD representing data partitioned according to location preferences
       */
      def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
        assertNotStopped()
        val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
        new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)
      }
    

    第二个重载方法makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])])

    可以指定RDD的数据存放在哪个分区上,传入的参数是一个Seq。

    Seq里面的每个一元素都是一个元组,元组的第一个元素就是分区号,第二个元素是每个分区里面的数据。

    示例:

    //创建3个分区的数据
    scala> val mrdd=sc.makeRDD(List((1,List("a","b","c")),(2,List("b","c","d")),(3,List("c","d","e"))))
    mrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24
    
    //查看分区的数量
    scala> mrdd.partitions.size
    res1: Int = 3
    
    //获取第一个分区的数据
    scala> mrdd.preferredLocations(mrdd.partitions(0))
    res2: Seq[String] = List(a, b, c)
    
    

    从其他RDD转换

    RDD的转换操作到时候会单独写一篇,敬请期待~

  • 相关阅读:
    信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言——1079:计算分数加减表达式的值
    信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言——1078:求分数序列和
    信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言——1078:求分数序列和
    信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言——1078:求分数序列和
    信息学奥赛一本通(C++)在线评测系统——基础(一)C++语言——1077:统计满足条件的4位数
    晕牛【拓扑排序】【BFS】
    过路费【Floyd】
    电视游戏问题【DP】【背包】
    音乐节拍【模拟】
    密码【高精】
  • 原文地址:https://www.cnblogs.com/Jaryer/p/13569986.html
Copyright © 2011-2022 走看看