  • SparkCore | Operators

    RDD

    An RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. In code it is an abstract class that represents an elastic, immutable, partitionable collection whose elements can be computed in parallel. As the source comment puts it: "A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, partitioned collection of elements that can be operated on in parallel." Its immutability is comparable to String: a String cannot be changed either, yet it offers many methods (such as splitting) that produce new values.

    1. RDD properties

    Each property corresponds to a method: getPartitions: Array[Partition], compute, getDependencies, partitioner, and getPreferredLocations (each partition corresponds to one Task, and the preferred locations record where each Task should be sent).

    * Internally, each RDD is characterized by five main properties:
    * 1) - A list of partitions;  a set of partitions, the basic building blocks of the dataset; every RDD has partitions;
    * 2) - A function for computing each split;  a function that computes each partition;
    * 3) - A list of dependencies on other RDDs;  the dependencies between RDDs; not every RDD has dependencies;
    * 4) - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned);  a Partitioner, i.e. the RDD's partitioning function; only key-value RDDs have a partitioner;
    * 5) - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
    *    an HDFS file);  a list of preferred locations for each Partition. Each Task is sent to the Executor closest to its data; if that node's Executor cannot run it (memory, CPU, etc.),
    Spark degrades gracefully: it tries another Executor on the same node, then an Executor on another machine in the same rack (data now crosses nodes), and if the whole rack is unavailable it falls back to another rack in the same data center.
    Moving computation is cheaper than moving data.

      Partitions are computed in parallel, one task per partition; a sketch of the five-property contract follows.
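
    The five properties map onto members of the RDD abstract class. A minimal sketch for illustration only (not an actual Spark class, just the shape of the contract; real custom RDDs extend org.apache.spark.rdd.RDD):

    import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

    // Sketch only: the five properties as abstract members, mirroring the methods named above.
    abstract class FivePropertySketch[T] {
      def getPartitions: Array[Partition]                               // 1) a list of partitions
      def compute(split: Partition, context: TaskContext): Iterator[T]  // 2) a function for computing each split
      def getDependencies: Seq[Dependency[_]]                           // 3) dependencies on other RDDs
      val partitioner: Option[Partitioner] = None                       // 4) optional Partitioner (key-value RDDs only)
      def getPreferredLocations(split: Partition): Seq[String] = Nil    // 5) optional preferred locations per split
    }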

    2. RDD characteristics

    An RDD is a read-only, partitioned dataset. The only way to modify an RDD is through a transformation, which derives a new RDD that carries all the information needed to recompute it from its parents. RDDs therefore depend on one another, and execution is deferred along this lineage until an action runs. If the lineage becomes very long, it can be cut by checkpointing the RDD to durable storage.

    1) Elastic

    Storage elasticity: automatic switching between memory and disk (data can live in memory or on disk).

    Fault-tolerance elasticity: lost data can be recovered automatically (an RDD records how its data was computed, so a lost partition can be rebuilt from its parent).

    Computation elasticity: failed computations are retried (if an Executor dies, the Driver can move the work to another Executor).

    Partitioning elasticity: data can be repartitioned as needed (the total amount of data stays the same, but the number of partitions can change).

     2) Partitioned

    Logically an RDD is partitioned; each partition's data exists only abstractly and is materialized at computation time through the compute function. If the RDD was built from a file system, compute reads the data from the specified files; if it was derived from another RDD, compute applies the transformation logic to the parent RDD's data.

    3) Read-only

      An RDD is read-only; to change the data in an RDD you must create a new RDD from the existing one.

    Transforming one RDD into another is done through a rich set of operators, so you are no longer limited to writing only map and reduce as in MapReduce.

    RDD operators fall into two classes: transformations, which derive new RDDs and build the lineage between them (they are lazy and only run when an action is encountered), and actions, which trigger the computation and either return a result or save the RDD to a file system.

     4) Dependencies

    RDDs are transformed by operators, and each new RDD carries the information needed to derive it from its parents; this lineage between RDDs is called a dependency. Dependencies come in two kinds. A narrow dependency means the partitions correspond one-to-one when viewed from the upstream RDD: a given upstream partition is used by at most one downstream partition.

    A wide dependency means each downstream partition can depend on every partition of the upstream (parent) RDD; it is a many-to-many relationship.

    A narrow dependency means each parent RDD partition is used by at most one child RDD partition; the usual analogy is an only child.

    A wide dependency means several child RDD partitions depend on the same parent RDD partition, which triggers a shuffle; the analogy is a family with many children. A sketch for inspecting the two kinds of dependencies follows.
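
    A small sketch, assuming a live SparkContext `sc` (for example in spark-shell), that inspects the two kinds of dependencies; the printed class names are what current Spark versions report:

    val pairs = sc.makeRDD(List(("a", 1), ("b", 2)))
    val mapped  = pairs.mapValues(_ + 1)     // narrow: each parent partition feeds exactly one child partition
    val reduced = pairs.reduceByKey(_ + _)   // wide: needs a shuffle across partitions

    println(mapped.dependencies)    // e.g. List(org.apache.spark.OneToOneDependency@...)
    println(reduced.dependencies)   // e.g. List(org.apache.spark.ShuffleDependency@...)
    println(reduced.toDebugString)  // the lineage shows a ShuffledRDD, i.e. a stage boundary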

     5) Caching

    Data can be cached in memory or on disk; caching does not remove the lineage. Whether it was cached in memory or on disk, the cache is deleted once the application finishes.

    If the same RDD is used several times in an application, it can be cached. Only the first computation builds the partitions from the lineage; every later use reads directly from the cache instead of recomputing along the lineage, which speeds up reuse. For example, if RDD-1 goes through a series of transformations to produce RDD-n that is saved to HDFS, and an intermediate result of RDD-1 is cached in memory, then the later transformation from RDD-1 to RDD-m no longer has to recompute RDD-0 and the earlier steps.

                            ----->R5  
    R1--->R2--->R3----->R4   (cache the shared RDD, R4, in memory so it is computed only once; R5 and R6 then read it from memory. Caching is off by default and must be enabled by calling a method.)
                            ----->R6

    An RDD can cache its computed result through the persist or cache method; by default persist() stores the data as deserialized objects in the JVM heap (storage level MEMORY_ONLY).

    Calling either method does not cache anything immediately; the RDD is cached in the memory of the compute nodes only when a later action is triggered, and it is then available for reuse.

     * Persist this RDD with the default storage level (`MEMORY_ONLY`).
        def cache(): this.type = persist()
     * Persist this RDD with the default storage level (`MEMORY_ONLY`).
        def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

    Looking at the source shows that cache simply calls persist, and the default storage level keeps a single copy in memory only. Spark offers many other storage levels, all defined in object StorageLevel.

    StorageLevel.scala, the storage-level source; appending "_2" to a level name persists two replicas of the data. A usage sketch follows the listing.

    object StorageLevel {
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
    ...
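
    A small usage sketch (assuming an existing SparkContext `sc`): choosing an explicit storage level instead of the MEMORY_ONLY default used by cache():

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.makeRDD(1 to 100)
    rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized in memory, spills to disk when memory is short
    rdd.count()      // the first action materializes and caches the data
    rdd.count()      // later actions read from the cache
    rdd.unpersist()  // release the cached blocks when they are no longer needed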

     

    A cache may be lost, or blocks held in memory may be evicted when memory runs short. The cache fault-tolerance mechanism guarantees the computation still completes correctly: lost data is recomputed through the RDD's chain of transformations, and because partitions are independent of one another, only the lost partitions need to be recomputed rather than all of them.

    For example:

    scala> val rdd = sc.makeRDD(Array("kris"))
    scala> val nocache = rdd.map(_.toString + System.currentTimeMillis)
    scala> nocache.collect
    res0: Array[String] = Array(kris1554979614968)                               
    
    scala> nocache.collect
    res1: Array[String] = Array(kris1554979627951)
    
    scala> nocache.collect
    res2: Array[String] = Array(kris1554979629257)
    
    scala> val cache = rdd.map(_.toString + System.currentTimeMillis).cache  // map each element to carry the current timestamp, then cache it
    cache: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26
    
    scala> cache.collect // repeated collects return the same cached result
    res3: Array[String] = Array(kris1554979702053)
    
    scala> cache.collect
    res4: Array[String] = Array(kris1554979702053)

    Whether an RDD is cached to disk or to memory, the entire cache directory is deleted when the job finishes, so the cache cannot be reused by other applications.

     6) RDD CheckPoint mechanism

    A checkpoint writes to HDFS, where the files remain permanently, and it cuts the lineage.

    A checkpoint (essentially writing the RDD to disk) assists lineage-based fault tolerance. When the lineage gets too long, recovery becomes too expensive, so it is better to checkpoint at an intermediate stage: if a node later fails and partitions are lost, the lineage only has to be replayed from the checkpointed RDD, which reduces the cost. Checkpointing implements this by writing the data to the HDFS file system.

    Setting a checkpoint for the current RDD creates a binary file stored in the checkpoint directory configured with SparkContext.setCheckpointDir(). During checkpointing, all of the RDD's references to its parent RDDs are removed. Like caching, checkpointing is not executed immediately: rdd.checkpoint() must be called and an Action must run before it takes effect.

    RDD lineage naturally provides fault tolerance: when a partition's data fails or is lost, it can be rebuilt through the lineage. But for long-running iterative applications the lineage keeps growing, and an error late in the iteration would require replaying a very long lineage, which hurts performance. For this reason RDDs support checkpoint, which saves the data to durable storage and cuts the earlier lineage: a checkpointed RDD no longer needs to know its parents because it can read its data from the checkpoint.

      If the dependency chain is very long, the upstream result can be stored away and read back directly instead of being recomputed from the start. checkpoint cuts the dependency: downstream RDDs no longer depend on the upstream ones and read directly from the checkpoint, whereas a cache does not cut the dependency. Also, if a new application (jar) is started, it can read a checkpoint (e.g. one written to HDFS) directly, while a cache is only visible to the application that created it, so other jobs must recompute from the lineage.

    sc.setCheckpointDir("./checkPoint"); checkpointing launches a separate job that computes the RDD again, so without caching the RDD is computed twice.

    CheckPoint is usually combined with caching, so the RDD is computed only once.

      import org.apache.spark.rdd.RDD
      import org.apache.spark.{SparkConf, SparkContext}

      def main(args: Array[String]): Unit = {
        // initialize the SparkContext
        val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("ERROR")
    
        sc.setCheckpointDir("./checkPoint")
        val rdd: RDD[Int] = sc.makeRDD(List(1)).map(x => {
          println("computed once")
          x
        })
        rdd.cache() // combining checkpoint with the cache means the RDD is computed only once
    //    rdd.persist(StorageLevel.DISK_ONLY)
        rdd.checkpoint()
        rdd.collect()
        rdd.collect()
    
      }

      Both CheckPoint and the cache can write to disk. The fundamental difference is that CheckPoint cuts the lineage, while a cache, whether in memory or on disk, is released when the job finishes: the Driver and Executors go away and the cached directory and files are deleted. Checkpoint files stay on disk.

     Differences between the cache and checkpoint (see the lineage sketch below)
       cache, persist: do not cut the lineage; a storage level can be specified
       checkpoint: cuts the lineage; durable storage
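
    A small sketch (assuming SparkContext `sc` and that sc.setCheckpointDir has already been called) showing that a cache keeps the lineage while a checkpoint truncates it:

    val base = sc.makeRDD(1 to 10).map(_ * 2)
    base.cache()                  // keep the data so the checkpoint job can reuse it
    base.checkpoint()             // mark for checkpointing; written when the next action runs
    println(base.toDebugString)   // before the action: full lineage back to the ParallelCollectionRDD
    base.count()                  // triggers the job and the checkpoint write
    println(base.toDebugString)   // afterwards the lineage starts from a ReliableCheckpointRDD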

    3. RDD programming

     Programming model

    In Spark an RDD is represented as an object, and transformations are method calls on that object. After defining an RDD through a series of transformations, an action can be called to trigger the computation; an action either returns a result to the application (count, collect, ...) or saves data to a storage system (saveAsTextFile, ...). Spark computes RDDs only when an action is encountered (lazy evaluation), which lets the runtime pipeline multiple transformations together.

      To use Spark, a developer writes a Driver program that is submitted to the cluster to schedule the Workers. The Driver defines one or more RDDs and calls actions on them; the Workers execute the per-partition computation tasks. A minimal driver sketch follows.
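
    A minimal sketch of this model (the local master and file paths are placeholders): the transformations only build the lineage, and the actions at the end trigger the actual jobs.

    import org.apache.spark.{SparkConf, SparkContext}

    object DriverSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("DriverSketch").setMaster("local[*]")
        val sc = new SparkContext(conf)

        val counts = sc.textFile("./wc.txt")      // lazy: nothing runs yet
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)

        counts.collect().foreach(println)         // action: returns results to the driver
        counts.saveAsTextFile("./wc_out")         // action: saves to a storage system

        sc.stop()
      }
    }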

     3.1 Creating RDDs

    RDDs can be created in three ways: from a collection (in memory); from external storage (HDFS, the local disk, MySQL, ...); or from another RDD through transformations.

    1) Creating from a collection in memory:
    /** Distribute a local Scala collection to form an RDD.
    * This method is identical to `parallelize`.
    */
    def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)
    }
    def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {  // this overload lets you specify preferred locations for each element's task; rarely used
    def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
    }
    scala> val x = sc.makeRDD(List(1,2,3,4))
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    scala> x.collect
    res0: Array[Int] = Array(1, 2, 3, 4)
    
    scala> val y = sc.parallelize(1 to 5).collect
    y: Array[Int] = Array(1, 2, 3, 4, 5)
    
    scala> z.collect
    res1: Array[String] = Array(Hello World, Hello java, Hello spark, "")

    Check the number of partitions:

    scala> x.getNumPartitions
    res2: Int = 8
    
    scala> x.partitions.size
    res4: Int = 8

    3.2 Default partitioning rules:

    ① Default partitioning when creating from a collection:
      look up makeRDD in SparkContext.

    In local mode the default number of partitions equals the number of cores; in cluster mode it is math.max(totalCoreCount.get(), 2).

    numSlices: Int = defaultParallelism): RDD[T] = withScope 
    taskScheduler.defaultParallelism
    def defaultParallelism(): Int   // trait method; Ctrl+H to see its implementations
     override def defaultParallelism(): Int = backend.defaultParallelism()
    def defaultParallelism(): Int   // trait method
     override def defaultParallelism(): Int =
        scheduler.conf.getInt("spark.default.parallelism", totalCores)  // Alt+Left to go back; totalCores is 8 here
    
      def makeRDD[T: ClassTag](
          seq: Seq[T], 
          numSlices: Int = defaultParallelism): RDD[T] = withScope { ## defaultParallelism is 8
        parallelize(seq, numSlices)  # 8: when creating an RDD from a collection in local mode, the default partition count is the total core count
      }    
        
    CoarseGrainedSchedulerBackend   // yarn or standalone mode
      override def defaultParallelism(): Int = {
        conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) # the larger of the total core count and 2
      }    

    ② Default partitioning when reading from a file system:

    scala> val z = sc.textFile("./wc.txt")
    z: org.apache.spark.rdd.RDD[String] = ./wc.txt MapPartitionsRDD[6] at textFile at <console>:24
    
    scala> z.getNumPartitions
    res6: Int = 2
      def textFile(
          path: String,
          minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
      }
    
        def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
      def defaultParallelism: Int = {
        assertNotStopped()
        taskScheduler.defaultParallelism
      }    
      def defaultParallelism(): Int  --> look at the trait's implementation:   override def defaultParallelism(): Int = backend.defaultParallelism()
      def defaultParallelism(): Int  --> look at the trait's implementation:
        override def defaultParallelism(): Int =
        scheduler.conf.getInt("spark.default.parallelism", totalCores)  ## total core count, 8
    Back in: def defaultMinPartitions: Int = math.min(defaultParallelism, 2)  ## defaultParallelism is 8
        in textFile: minPartitions: Int = defaultMinPartitions): RDD[String] = withScope  // so this value is 2
    hadoopFile uses this value:
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)    
     def hadoopFile[K, V](
          path: String,
          inputFormatClass: Class[_ <: InputFormat[K, V]],
          keyClass: Class[K],
          valueClass: Class[V],
          minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
        assertNotStopped()
        
        new HadoopRDD(
          this,
          confBroadcast,
          Some(setInputPathsFunc),
          inputFormatClass,
          keyClass,
          valueClass,
          minPartitions).setName(path)
    class HadoopRDD[K, V](
        sc: SparkContext,
        broadcastedConf: Broadcast[SerializableConfiguration],
        initLocalJobConfFuncOpt: Option[JobConf => Unit],
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int)      
      
    override def getPartitions: Array[Partition] = {
        val inputSplits = inputFormat.getSplits(jobConf, minPartitions)}
        
        getSplits --> InputFormat --> find getSplits in FileInputFormat:
        18  long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);  // totalSize: wc.txt is 36 bytes in total; numSplits is the 2 passed in, so goalSize = 18
        1   long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize); --> private long minSplitSize = 1L;
          long blockSize = file.getBlockSize(); # block size: 128M on HDFS, 32M on a local (Windows) file system
          long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);  // computeSplitSize(18, 1, blockSize) --> 18
            return Math.max(minSize, Math.min(goalSize, blockSize));  // 18
            The split loop uses a 1.1 factor:
            (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize
            36 / 18 = 2.0 > 1.1, so a split of 18 bytes is cut; the remaining 18 / 18 = 1.0 < 1.1, so it is not cut again -- giving 2 splits in total.

    3.3 RDD transformations

    Transformations fall broadly into Value types and Key-Value types.

     Value types

      1.map(func)

    Purpose: returns a new RDD formed by applying func to every input element.

    /**
    * Return a new RDD by applying a function to all elements of this RDD.
    */
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }
    map:
    scala> val x = sc.makeRDD(1 to 4)  #sc.parallelize(1 to 4)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at makeRDD at <console>:24
    
    scala> x.map(x=>x.toString).collect
    res9: Array[String] = Array(1, 2, 3, 4)
    
    scala> x.map(x=>(x,1)).collect  ## .map(_ * 2).collect() would multiply every element by 2
    res10: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1))
    scala> x.map((_,1)).collect
    res12: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1))

    2. mapPartitions(func)

    mapPartitions: like map, but runs independently on each partition (split) of the RDD, so when running on an RDD of type T, func must have type Iterator[T] => Iterator[U].
    With N elements and M partitions, the map function is called N times while the mapPartitions function is called M times; one call processes a whole partition at once.
    /**
    * Return a new RDD by applying a function to each partition of this RDD.
    * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
    * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
    def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
    }
    
    scala> val x = sc.makeRDD(1 to 8)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at makeRDD at <console>:24
    
    scala> x.mapPartitions(x=>x.map((_,1))).collect  ## x.mapPartitions((x=>x.map(_*2))).collect
    res13: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1))
    
    scala> x.map((_, 1)).collect
    res14: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1))

    map() vs. mapPartitions()  (① the supplied function is invoked a different number of times; ② efficiency differs: map can release each record's memory once it is processed, while mapPartitions holds a whole partition)
      1. map(): processes one record at a time (the record's memory can be released as soon as it has been processed).
      2. mapPartitions(): processes one partition at a time (batch-style); the original partition's data can only be released after the whole partition has been processed, which may cause OOM.
       3. Guidance: when memory is plentiful, prefer mapPartitions() for efficiency; a sketch of the per-partition pattern follows.
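
    A sketch of why mapPartitions can pay off (expensiveSetup is a hypothetical stand-in for opening a connection or loading a resource; `sc` is an existing SparkContext):

    def expensiveSetup(): Int = { println("setup"); 0 }   // placeholder for a per-use setup cost

    val data = sc.makeRDD(1 to 8, 2)

    // map: the setup would run once per element (8 times here)
    val perElement = data.map { x => val s = expensiveSetup(); x + s }

    // mapPartitions: the setup runs once per partition (2 times here)
    val perPartition = data.mapPartitions { iter =>
      val s = expensiveSetup()
      iter.map(_ + s)
    }

    perPartition.collect()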


     3.  mapPartitionsWithIndex(func)

    Purpose: like mapPartitions, but func also receives an integer parameter giving the partition index, so on an RDD of type T, func must have type (Int, Iterator[T]) => Iterator[U].

    /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
    * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
    * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
    */
    def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U], ## the Int is the partition index (starting from 0); the partition's data is transformed into type U
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
    }
    scala> val x = sc.makeRDD(List("kris", "alex", "heihei"))
    x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at makeRDD at <console>:24
    
    scala> x.mapPartitionsWithIndex((index,par)=>par.map((index,_))).collect
    res15: Array[(Int, String)] = Array((2,kris), (5,alex), (7,heihei))

     4. flatMap(func): can map one-to-one or one-to-many

      Purpose: like map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).

    /**
    * Return a new RDD by first applying a function to all elements of this
    * RDD, and then flattening the results.
    */
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { ## the function must return something traversable
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
    }

    scala> val x = sc.makeRDD(1 to 4)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[18] at makeRDD at <console>:24
    
    scala> x.map(x=>x).collect
    res16: Array[Int] = Array(1, 2, 3, 4)
    
    scala> x.flatMap(x=>x).collect
    <console>:27: error: type mismatch;
     found   : Int
     required: TraversableOnce[?] ## a traversable value is required
           x.flatMap(x=>x).collect
                        ^
    scala> x.flatMap(x=>x.toString).collect 
    res18: Array[Char] = Array(1, 2, 3, 4)
    
    
    scala> val x = sc.makeRDD(List(Array(1,2,3,4), Array(5,6,7)))
    x: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[22] at makeRDD at <console>:24
    
    scala> x.flatMap(x=>x).collect
    res20: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7)
    5. glom: not used much in practice

      Purpose: turns each partition into an array, producing a new RDD of type RDD[Array[T]].

      /**
       * Return an RDD created by coalescing all elements within each partition into an array.
       */
      def glom(): RDD[Array[T]] = withScope {
        new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
      }
    scala> val x = sc.makeRDD(1 to 10)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at makeRDD at <console>:24
    
    scala> x.getNumPartitions
    res21: Int = 8
    
    scala> x.glom.collect
    res22: Array[Array[Int]] = Array(Array(1), Array(2), Array(3), Array(4, 5), Array(6), Array(7), Array(8), Array(9, 10))  
      

    6. groupBy(func)

    Purpose: grouping; groups by the return value of the supplied function. Values with the same key are placed into one iterator.

     def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
        groupBy[K](f, defaultPartitioner(this))
      }
    scala> val x = sc.makeRDD(1 to 5)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at makeRDD at <console>:24
    
    scala> x.groupBy(_%2).collect
    res23: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3, 5)))
    scala> x.groupBy(x=>x>1).collect
    res24: Array[(Boolean, Iterable[Int])] = Array((false,CompactBuffer(1)), (true,CompactBuffer(2, 3, 4, 5)))
      CompactBuffer
    * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers. private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable { }  It is private to the spark package.

     7. filter(func) 

    Purpose: filtering; returns a new RDD consisting of the input elements for which func returns true.

      
      /**
       * Return a new RDD containing only the elements that satisfy a predicate.
       */
      def filter(f: T => Boolean): RDD[T] = withScope {
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[T, T](
          this,
          (context, pid, iter) => iter.filter(cleanF),
          preservesPartitioning = true)
      }
    
    scala> val x = sc.makeRDD(List("Hello Kris", "baidu", "jd"))
    x: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at makeRDD at <console>:24
    
    scala> x.filter(x=>x.contains("Kris")).collect
    res28: Array[String] = Array(Hello Kris)
    scala> val x = sc.makeRDD(1 to 4)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[39] at makeRDD at <console>:24
    
    scala> x.filter(x => x>1).collect   ## x.filter(x=>x%2).collect would not compile: the return value is not Boolean
    res29: Array[Int] = Array(2, 3, 4)

      8. sample(withReplacement, fraction, seed)
      Sampling with and without replacement

    Purpose: randomly samples a fraction of the data using the given random seed. withReplacement indicates whether sampled elements are put back: true means sampling with replacement, false without; seed is the random number generator seed. Without replacement, fraction is the probability that each element is chosen, in [0, 1]; with replacement, it is the expected number of times each element is chosen, >= 0.

    The source:
    /**
       * Return a sampled subset of this RDD.
       *
       * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
       * @param fraction expected size of the sample as a fraction of this RDD's size
       *  without replacement: probability that each element is chosen; fraction must be [0, 1]
       *  with replacement: expected number of times each element is chosen; fraction must be greater
       *  than or equal to 0
       * @param seed seed for the random number generator
       *
       * @note This is NOT guaranteed to provide exactly the fraction of the count
       * of the given [[RDD]].
       */
     def sample(
          withReplacement: Boolean,
          fraction: Double,
          seed: Long = Utils.random.nextLong): RDD[T] = {  # with a fixed seed, the generated random numbers are fixed as well
        require(fraction >= 0,
          s"Fraction must be nonnegative, but got ${fraction}")
    
        withScope {
          require(fraction >= 0.0, "Negative fraction value: " + fraction)
          if (withReplacement) {
            new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
          } else {
            new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
          }
        }
      }  
    The number of sampled elements only approximates the requested fraction; it is not guaranteed to be exact.
    false — sampling without replacement:
    scala> val x = sc.makeRDD(1 to 10)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at makeRDD at <console>:24
    scala> x.sample(false,1,100).collect
    res33: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> x.sample(false,0,100).collect
    res34: Array[Int] = Array()
    
    scala> x.sample(false,0.5,100).collect
    res35: Array[Int] = Array(1, 8, 9)
    
    scala> x.sample(false,0.5,10).collect
    res36: Array[Int] = Array(2, 4, 5, 7, 8, 9)
    
    scala> x.sample(false,0.5,10).collect  ## with the same seed, the generator returns the same result
    res37: Array[Int] = Array(2, 4, 5, 7, 8, 9)
    
    scala> x.sample(false,0.5,10).collect
    res38: Array[Int] = Array(2, 4, 5, 7, 8, 9)
    
    scala> x.sample(false,0.5,9).collect
    res39: Array[Int] = Array(2, 5, 6, 7)
    
    scala> x.sample(false,0.5,8).collect
    res40: Array[Int] = Array(4)  
      Without replacement: the probability that each element is chosen, in [0, 1]
      With replacement: the expected number of times each element is chosen, >= 0

    true — sampling with replacement:
    scala> x.sample(true,0,100).collect
    res41: Array[Int] = Array()
    
    scala> x.sample(true,0,100).collect
    res42: Array[Int] = Array()
    
    scala> x.sample(true,1,10).collect
    res43: Array[Int] = Array(2, 3, 4, 4, 4, 5, 8, 8, 9)
    
    scala> x.sample(true,2,10).collect
    res44: Array[Int] = Array(2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 5, 7, 8, 8, 9, 9, 10)
    
    scala> x.sample(true,3,10).collect
    res45: Array[Int] = Array(1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 5, 5, 5, 5, 6, 7, 8, 8, 9, 9, 10)
    
    scala> x.sample(false,0.5,System.currentTimeMillis).collect
    res46: Array[Int] = Array(1, 2, 4, 7, 8, 10)
    
    scala> x.sample(false,0.5,System.currentTimeMillis).collect
    res47: Array[Int] = Array(2, 3, 4, 6, 7)
    
    scala> x.sample(false,0.5,System.currentTimeMillis).collect
    res48: Array[Int] = Array(4, 6, 7, 9, 10)

      9. distinct([numTasks])

     Purpose: returns a new RDD with duplicates removed from the source RDD.

    scala> val x = sc.makeRDD(List(1,2,1,1,2,1))
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[59] at makeRDD at <console>:24
    
    scala> x.distinct().collect
    res49: Array[Int] = Array(1, 2)  

      10.  coalesce(numPartitions)  

     Purpose: reduces the number of partitions; useful for improving the efficiency of a small dataset that resulted from filtering a large one.

       def coalesce(numPartitions: Int, shuffle: Boolean = false,  # shuffle here means redistributing (reshuffling) the data
                   partitionCoalescer: Option[PartitionCoalescer] = Option.empty) 

    Without a shuffle the partition count cannot be increased, only decreased;
      merging into fewer partitions needs no reshuffle, whereas splitting into more partitions does.

    scala> val x = sc.makeRDD(1 to 4)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at makeRDD at <console>:24
    
    scala> x.getNumPartitions
    res50: Int = 8
    
    scala> x.glom.collect
    res51: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
    
    scala> x.coalesce(3, false).collect
    res52: Array[Int] = Array(1, 2, 3, 4)
    
    scala> x.coalesce(3, false).glom.collect
    res53: Array[Array[Int]] = Array(Array(1), Array(2), Array(3, 4))
    
    scala> x.coalesce(3, false).getNumPartitions
    res54: Int = 3
    
    scala> x.coalesce(5,false).getNumPartitions
    res55: Int = 5
    
    scala> x.coalesce(9,false).getNumPartitions
    res56: Int = 8
    
    scala> x.coalesce(9,true).getNumPartitions
    res57: Int = 9    

      11. repartition(numPartitions)
      Changes the number of partitions and always causes a shuffle.

      Purpose: reshuffles all data randomly across the network according to the new partition count.

       def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
        coalesce(numPartitions, shuffle = true)
      } 
    scala> x.repartition(10).getNumPartitions
    res58: Int = 10  

    Purpose, differences and relationship between coalesce and repartition (① both can change the RDD's partition count; ② repartition is a special case of coalesce that always shuffles)

      1. coalesce repartitions and lets you choose whether to shuffle, via the parameter shuffle: Boolean = false/true.
      2. repartition actually calls coalesce with shuffle = true.

    12. sortBy(func, [ascending], [numTasks])  
    Purpose: first applies func to the data, then sorts by comparing the results; ascending by default.
       /**
       * Return this RDD sorted by the given key function.
       */
      def sortBy[K](
          f: (T) => K,
          ascending: Boolean = true,
          numPartitions: Int = this.partitions.length)
          (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
        this.keyBy[K](f)
            .sortByKey(ascending, numPartitions)
            .values
      }
    scala> val x = sc.makeRDD(List(5,4,3,2))
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[79] at makeRDD at <console>:24
    
    scala> x.sortBy(x=>x).collect
    res60: Array[Int] = Array(2, 3, 4, 5)
    
    scala> x.sortBy(x=>x%2).collect
    res61: Array[Int] = Array(4, 2, 5, 3)

     13. pipe(command, [envVars])
     Purpose: a pipe; runs a shell script for each partition and returns the output as an RDD.
        Note: the script must be placed where the Worker nodes can access it.

      /**
       * Return an RDD created by piping elements to a forked external process.
       */
      def pipe(command: String): RDD[String] = withScope {
        // Similar to Runtime.exec(), if we are given a single string, split it into words
        // using a standard StringTokenizer (i.e. by spaces)
        pipe(PipedRDD.tokenize(command))
      }
    [kris@hadoop101 spark-local]$ vim pipe.sh
    #!/bin/bash
    echo "AA"
    while read LINE; do
            echo ">>>"${LINE}
    done
     ## while reads whole lines by default; for splits on whitespace by default
    scala> val x = sc.makeRDD(1 to 4, 1)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[90] at makeRDD at <console>:24
    
    scala> x.collect
    res63: Array[Int] = Array(1, 2, 3, 4)
    
    scala> x.pipe("./pipe.sh").collect
    res65: Array[String] = Array(AA, >>>1, >>>2, >>>3, >>>4)
    
    scala> val x = sc.makeRDD(1 to 4, 3)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[92] at makeRDD at <console>:24
    
    scala> x.pipe("./pipe.sh").collect
    res66: Array[String] = Array(AA, >>>1, AA, >>>2, AA, >>>3, >>>4) ## AA is printed once at the start of each partition

     scala> x.pipe("./pipe.sh").getNumPartitions
     res45: Int = 8

     scala> x.pipe("./pipe.sh").collect
     res46: Array[String] = Array(AA, AA, >>>1, AA, AA, >>>2, AA, AA, >>>3, AA, AA, >>>4)

     

    Two-RDD (double Value) operations

      1. union(otherDataset): union
         Purpose: returns a new RDD that is the union of the source RDD and the argument RDD.

        union involves no shuffle.

      /**
       * Return the union of this RDD and another one. Any identical elements will appear multiple
       * times (use `.distinct()` to eliminate them).
       */
      def union(other: RDD[T]): RDD[T] = withScope {
        sc.union(this, other)
      }
    scala> val x = sc.makeRDD(1 to 4)
    x: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[94] at makeRDD at <console>:24
    
    scala> val y = sc.makeRDD(3 to 8)
    y: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[95] at makeRDD at <console>:24
    
    scala> x.union(y).collect
    res67: Array[Int] = Array(1, 2, 3, 4, 3, 4, 5, 6, 7, 8)
    
      In SQL, UNION deduplicates while UNION ALL simply concatenates the results; Spark's union keeps duplicates, like UNION ALL.
      

      2. subtract(otherDataset): difference

      Purpose: computes the set difference, removing from the source RDD the elements it shares with the other RDD; the remaining elements are kept.

    With x = (1 to 4) and y = (3 to 8):

    scala> x.subtract(y).collect
    res68: Array[Int] = Array(1, 2)
    scala> y.subtract(x).collect
    res69: Array[Int] = Array(8, 5, 6, 7)

      3. intersection(otherDataset): intersection

      Purpose: returns a new RDD that is the intersection of the source RDD and the argument RDD.

    scala> x.intersection(y).collect
    res70: Array[Int] = Array(3, 4)
    
    scala> y.intersection(x).collect
    res71: Array[Int] = Array(3, 4)

      4. cartesian(otherDataset): Cartesian product
      Purpose: Cartesian product (avoid where possible).

    scala> x.cartesian(y).collect
    res72: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (1,6), (1,7), (1,8), (2,3), (2,4), (2,5), (2,6), (2,7), (2,8), (3,3), (3,4), (3,5), (3,6), (3,7), (3,8), (4,3), (4,4), (4,5), (4,6), (4,7), (4,8))

     5. zip(otherDataset)
    Purpose: combines two RDDs into an RDD of key/value pairs. By default the two RDDs must have the same number of partitions and the same number of elements in each partition (a point to watch), otherwise an exception is thrown.

    scala> y.collect
    res75: Array[Int] = Array(3, 4, 5, 6)
    
    scala> x.collect
    res76: Array[Int] = Array(1, 2, 3, 4)
    
    scala> x.getNumPartitions
    res77: Int = 8
    
    scala> y.getNumPartitions
    res78: Int = 8
    
    scala> x.zip(y).collect
    res79: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,6))
    
    scala> y.zip(x).collect
    res80: Array[(Int, Int)] = Array((3,1), (4,2), (5,3), (6,4))
    
    If the partition counts differ: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(8, 3)
    If the element counts differ: Can only zip RDDs with same number of elements in each partition

    Key-Value types: only key-value (pair) RDDs have the concept of a partitioner.

     1. partitionBy example
    A pair RDD does not get a default HashPartitioner the moment it is created; most shuffle operators, however, use the Hash partitioner by default.

    scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1))
    x: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[130] at map at <console>:24
    
    scala> x.partitioner
    res91: Option[org.apache.spark.Partitioner] = None 
    
    scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) ## only now does a default partitioner appear
    x: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[135] at reduceByKey at <console>:24
    
    scala> x.partitioner
    res92: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

    The built-in partitioners are HashPartitioner and RangePartitioner:
       abstract class Partitioner extends Serializable
      class HashPartitioner(partitions: Int) extends Partitioner
     Purpose: repartitions a pair RDD with the given partitioner. If the RDD's existing partitioner is the same as the requested one, nothing is repartitioned; otherwise a ShuffledRDD is created, i.e. a shuffle happens.

    Source:
     /**
       * Return a copy of the RDD partitioned using the specified partitioner.
       */
      def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
        if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
          throw new SparkException("HashPartitioner cannot partition array keys.")
        }
        if (self.partitioner == Some(partitioner)) {
          self
        } else {
          new ShuffledRDD[K, V, V](self, partitioner)
        }
      }

     Only key-value RDDs have a partitioner.

    scala> x.partitioner
    res92: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
    
    scala> 
    scala> x.getNumPartitions
    res93: Int = 2
    scala> val y = x.partitionBy(new org.apache.spark.HashPartitioner(4))  ## to change an RDD's partition count or partitioner, call partitionBy directly; it repartitions the data
    y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[136] at partitionBy at <console>:26
    
    scala> y.getNumPartitions
    res96: Int = 4

    If the RDD's existing partitioner is the same as the requested one, nothing is repartitioned; otherwise a ShuffledRDD is created, i.e. a shuffle happens.
       Passing in the RDD's own partitioner therefore produces no shuffle.

    scala> y.partitionBy(y.partitioner)
    <console>:29: error: type mismatch;
     found   : Option[org.apache.spark.Partitioner]
     required: org.apache.spark.Partitioner
           y.partitionBy(y.partitioner)
                           ^
    
    scala> y.partitionBy(y.partitioner.get)
    res98: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[136] at partitionBy at <console>:26
    
    scala> res98.partitioner
    res99: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@4)
    
    scala> res98.getNumPartitions
    res100: Int = 4
    
    scala> y.partitionBy(y.partitioner.get)
    res101: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[136] at partitionBy at <console>:26
    
    scala> y.partitionBy(y.partitioner.get).collect
    res102: Array[(String, Int)] = Array(("",1), (spark,1), (Hello,3), (World,1), (java,1))
    
    scala> y.partitionBy(y.partitioner.get).count
    res103: Long = 5

    scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey
    reduceByKey   reduceByKeyLocally
    
    scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    x: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[141] at reduceByKey at <console>:24
    
    scala> x.getNumPartitions
    res105: Int = 2
    
    scala> x.partitionBy(new org.apache.spark.HashPartitioner(2)).collect
    res106: Array[(String, Int)] = Array((Hello,3), ("",1), (World,1), (java,1), (spark,1))

    Only when the partitioner passed in is the current RDD's own partitioner is no shuffle produced.
    scala> y.partitionBy(new org.apache.spark.HashPartitioner(4)).count
    res104: Long = 5
    (The chain has grown long here, so the run is served from cached results.)

    2. reduceByKey(func, [numTasks]) 

     Called on an RDD of (K, V) pairs; returns an RDD of (K, V) pairs in which the values of each key are aggregated with the given reduce function. The number of reduce tasks can be set with the optional second parameter.

    scala> val rdd = sc.parallelize(List(("female", 1), ("male", 6), ("female", 5), ("male", 2)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[143] at parallelize at <console>:24
    
    scala> rdd.reduceByKey
    reduceByKey   reduceByKeyLocally
    
    scala> rdd.reduceByKey(_+_).collect
    res108: Array[(String, Int)] = Array((female,6), (male,8))
    
    scala> rdd.reduceByKey(_*_).collect
    res110: Array[(String, Int)] = Array((female,5), (male,12))
    
    scala> rdd.reduceByKey(_ max _).collect  # computed within each partition first, then combined across partitions
    res111: Array[(String, Int)] = Array((female,5), (male,6))
    3. groupByKey
    Purpose: groupByKey also operates per key, but only produces one sequence of the values for each key.
    scala> val rdd = sc.parallelize(List(("female", 1), ("male", 6), ("female", 5), ("male", 2), ("male", 3)))
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[150] at parallelize at <console>:24
    
    scala> rdd.groupByKey().collect
    res114: Array[(String, Iterable[Int])] = Array((female,CompactBuffer(1, 5)), (male,CompactBuffer(6, 2, 3)))

    groupByKey vs. reduceByKey   (reduceByKey pre-aggregates, so it is more efficient; see the sketch after this list)

      1. reduceByKey: aggregates by key and runs a combine (pre-aggregation) step before the shuffle, returning RDD[(K, V)]; aggregation happens inside each partition first.
      2. groupByKey: groups by key and shuffles directly; every record is sent through the shuffle to the downstream stage as-is.
      3. Guidance: reduceByKey performs better than groupByKey and is recommended, but check that it does not change the business logic.
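
    A small sketch (assuming SparkContext `sc`) of the same word count written both ways; the results match, but reduceByKey moves less data through the shuffle:

    val words = sc.makeRDD(List("a", "b", "a", "c", "b", "a")).map((_, 1))

    // reduceByKey: combines within each partition first (map-side pre-aggregation), then shuffles
    val viaReduce = words.reduceByKey(_ + _)

    // groupByKey: shuffles every (word, 1) record, then sums on the downstream side
    val viaGroup = words.groupByKey().mapValues(_.sum)

    viaReduce.collect()   // Array((a,3), (b,2), (c,1)) in some order
    viaGroup.collect()    // same result, more shuffle traffic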

     4. aggregateByKey
     Parameters: (zeroValue: U, [partitioner: Partitioner])(seqOp: (U, V) => U, combOp: (U, U) => U)
    1. Purpose: in a (K, V) RDD, groups and merges the values by key. Within each partition, every value is folded into the zero value with the seq function, producing a per-partition accumulator for each key; the per-partition results for the same key are then merged with the combine function (the first two results are merged, that result is merged with the next, and so on), and each key is emitted with its final result as a new (K, V) pair.
    2. Parameter description:
    (1) zeroValue: an initial value for each distinct key within each partition;
    (2) seqOp: the function that folds the values into the accumulator within one partition; ## the values are folded into the initial value
    (3) combOp: the function that merges the per-partition results. # aggregation across partitions
    3. Task: create a pair RDD, take the maximum value of each key within each partition, then add those maxima together.

    Source:

     def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
     
        // Serialize the zero value to a byte array so that we can get a new clone of it on each key
        val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
        val zeroArray = new Array[Byte](zeroBuffer.limit)
        zeroBuffer.get(zeroArray)
    
        lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
        val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray))
    
        // We will clean the combiner closure later in `combineByKey`
        val cleanedSeqOp = self.context.clean(seqOp)
        combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
          cleanedSeqOp, combOp, partitioner)
      }
    scala> val rdd = sc.parallelize(List(("a",3), ("a",2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
    scala> rdd.getNumPartitions
    res27: Int = 2
    
    scala> rdd.collect
    res5: Array[(String, Int)] = Array((a,3), (a,2), (c,4), (b,3), (c,6), (c,8))
    scala> rdd.glom.collect
    res28: Array[Array[(String, Int)]] = Array(Array((a,3), (a,2), (c,4)), Array((b,3), (c,6), (c,8)))
    
    scala> rdd.aggregateByKey(0)(_ max _, _+_).collect
    res0: Array[(String, Int)] = Array((b,3), (a,3), (c,12))    
      
    Computing an average:
    scala> rdd.aggregateByKey((0,0))((init,v)=>(init._1+v,init._2+1),(x,y)=>(x._1+y._1,x._2+y._2)).collect
    res2: Array[(String, (Int, Int))] = Array((b,(3,1)), (a,(5,2)), (c,(18,3)))
      For each key this yields the sum of its values and the number of occurrences: (sum, count).
    scala> val result = rdd.aggregateByKey((0,0))((init,v)=>(init._1+v,init._2+1),(x,y)=>(x._1+y._1,x._2+y._2)) 
    // (0,0) represents (sum, count); init is the accumulator for a key and v is one of that key's values: the sums add and the counts add; init is a tuple, so its sum is _1 and its count is _2
    // each distinct key in each partition starts from the initial tuple (0,0)
    // merging between partitions also takes two parameters: (x,y) are the per-partition results, each a (sum, count) tuple for one key
    result: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[3] at aggregateByKey at <console>:26
    
    scala> result.collect
    res31: Array[(String, (Int, Int))] = Array((b,(3,1)), (a,(5,2)), (c,(18,3)))
    
    scala> result.mapValues(x=>x._1/x._2).collect
    res3: Array[(String, Int)] = Array((b,3), (a,2), (c,6))
    
    scala> result.mapValues(x=>x._1.toDouble/x._2).collect
    res4: Array[(String, Double)] = Array((b,3.0), (a,2.5), (c,6.0))

    6. foldByKey example

    Parameters: (zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

    1. Purpose: a simplified form of aggregateByKey in which seqOp and combOp are the same function.
    scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:24
    scala> rdd.collect
    res39: Array[(Int, Int)] = Array((1,3), (1,2), (1,4), (2,3), (3,6), (3,8))
    scala> rdd.glom.collect
    res41: Array[Array[(Int, Int)]] = Array(Array((1,3), (1,2)), Array((1,4), (2,3)), Array((3,6), (3,8)))
    Sum the values of identical keys:
    scala> rdd.foldByKey(0)(_+_).collect
    res42: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))
    Computing an average:
    scala> rdd.map(x => (x._1, (x._2, 1))).foldByKey((0, 0)) ((x, y) => (x._1+y._1, x._2+y._2)).mapValues(x => x._1.toDouble/x._2).collect
    res44: Array[(Int, Double)] = Array((3,7.0), (1,3.0), (2,3.0))

    7. combineByKey[C] example

    Parameters: (createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C)

    1. Purpose: for each key K, merges its values V into a combined result C.
    2. Parameter description:

    (1) createCombiner: combineByKey() walks through every element of a partition, so each element's key is either seen for the first time or has been seen before. For a new key, combineByKey() calls createCombiner() to create the initial value of that key's accumulator.

    (2) mergeValue: if the key has already been seen in the current partition, mergeValue() merges the key's current accumulator with the new value.

    (3) mergeCombiners: because partitions are processed independently, one key can end up with several accumulators. When two or more partitions hold an accumulator for the same key, the user-supplied mergeCombiners() merges them.

    1. Task: create a pair RDD and compute the mean value for each key (first compute each key's count and sum of values, then divide).
    scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
    input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[52] at parallelize at <console>:26
    scala> val result = input.combineByKey((_,1), (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1), (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)).collect
    result: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))
    scala> val result = input.combineByKey((_,1), (acc:(Int, Int), v) => (acc._1 + v, acc._2 + 1), (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2)).mapValues(x => x._1.toDouble / x._2).collect
    result: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))

    8. sortByKey([ascending], [numTasks]) example

    1. Purpose: called on a (K, V) RDD where K implements the Ordered trait; returns a (K, V) RDD sorted by key.

    2. Task: create a pair RDD and sort it by key in ascending and descending order.

    scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[50] at parallelize at <console>:24
    scala> rdd.collect
    res58: Array[(Int, String)] = Array((3,aa), (6,cc), (2,bb), (1,dd))
    scala> rdd.sortByKey().collect
    res57: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
    scala> rdd.sortByKey(false).collect
    res59: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

    9. mapValues example

    1. Operates only on the V of each (K, V) pair.

    2. Task: create a pair RDD and append a marker string to every value.

    scala> val rdd = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[57] at parallelize at <console>:24
    
    scala> rdd.mapValues(_+"|").collect
    res60: Array[(Int, String)] = Array((1,a|), (1,d|), (2,b|), (3,c|))

    10. join(otherDataset, [numTasks]) example

    1. Purpose: called on RDDs of type (K, V) and (K, W); returns an RDD of (K, (V, W)) that pairs up all elements sharing a key.

    2. Task: create two pair RDDs and join the records with matching keys into tuples.

    scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"),(4,"d")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24
    
    scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6),(5,5)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[68] at parallelize at <console>:24
    
    scala> rdd.collect
    res68: Array[(Int, String)] = Array((1,a), (2,b), (3,c), (4,d))
    
    scala> rdd1.collect
    res69: Array[(Int, Int)] = Array((1,4), (2,5), (3,6), (5,5))
    
    scala> rdd.leftOuterJoin(rdd1).collect
    res70: Array[(Int, (String, Option[Int]))] = Array((1,(a,Some(4))), (2,(b,Some(5))), (3,(c,Some(6))), (4,(d,None)))
    
    scala> rdd.join(rdd1).collect
    res71: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
    
    scala> rdd.rightOuterJoin(rdd1).collect
    res72: Array[(Int, (Option[String], Int))] = Array((1,(Some(a),4)), (2,(Some(b),5)), (3,(Some(c),6)), (5,(None,5)))

    11. cogroup(otherDataset, [numTasks]) example

    1. Purpose: called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (Iterable<V>, Iterable<W>)).

    2. Task: create two pair RDDs and gather the records with the same key into iterators.

    scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[78] at parallelize at <console>:24
    scala> val rdd1 = sc.parallelize(Array((1,4), (2,5), (3,6)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[80] at parallelize at <console>:24
    scala> rdd.collect
    res74: Array[(Int, String)] = Array((1,a), (2,b), (3,c))
    
    scala> rdd1.collect
    res75: Array[(Int, Int)] = Array((1,4), (2,5), (3,6))
    scala> rdd.cogroup(rdd1).collect
    res73: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((1,(CompactBuffer(a),CompactBuffer(4))), (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer(6))))

     Computing averages: the four by-key aggregation operators compared

    ① def aggregateByKey[U: ClassTag]
        (zeroValue: U)                               // an initial value for each distinct key within each partition
        (seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope
        // seqOp folds the values into the initial value within each partition; combOp merges the per-partition results
        combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)

    ② def foldByKey      // the simplified aggregateByKey where seqOp and combOp are the same
        (zeroValue: V)                               // the initial value must have the same type as the RDD's V
        (func: (V, V) => V): RDD[(K, V)] = self.withScope
        combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)

    ③ def combineByKey[C](
        createCombiner: V => C,           // an initial function for each distinct key inside each partition
        mergeValue: (C, V) => C,          // merges values of the same key within a partition; the result type matches createCombiner's
        mergeCombiners: (C, C) => C,      // merges the values of the same key across partitions
        partitioner: Partitioner,         // the partitioner
        mapSideCombine: Boolean = true,   // whether to pre-aggregate on the map side
        serializer: Serializer = null): RDD[(K, C)] = self.withScope {
          combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine, serializer)(null)
        }

    ④ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope
        combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)

    aggregateByKey, reduceByKey and foldByKey all go through:
    def combineByKeyWithClassTag[C](
        createCombiner: V => C,           // an initial function per key per partition, producing the initial accumulator
        mergeValue: (C, V) => C,          // merges further values of the same key within a partition; the first argument is the accumulator
        mergeCombiners: (C, C) => C,      // merges accumulators of the same key across partitions
        partitioner: Partitioner,         // the partitioner
        mapSideCombine: Boolean = true,   // whether to pre-aggregate on the map side
        serializer: Serializer = null)
     scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
    
    scala> rdd.glom.collect
    res16: Array[Array[(Int, Int)]] = Array(Array((1,3), (1,2)), Array((1,4), (2,3)), Array((3,6), (3,8)))
    ①aggregateByKey 
    scala> rdd.aggregateByKey((0,0)) ((init,v) => (init._1+v, init._2+1), (x,y) => (x._1+y._1, x._2+y._2)).collect
    res15: Array[(Int, (Int, Int))] = Array((3,(14,2)), (1,(9,3)), (2,(3,1)))
    ② foldByKey (the initial value must have the same type as the RDD's value)
    scala> rdd.map(x => (x._1, (x._2, 1))).foldByKey((0,0)) ((x,y) => (x._1+y._1, x._2+y._2)).collect
    res30: Array[(Int, (Int, Int))] = Array((3,(14,2)), (1,(9,3)), (2,(3,1)))
    ③combineByKey
    scala> rdd.combineByKey(x => (x,1), (acc:(Int,Int),newValue) => (acc._1+newValue, acc._2+1),(x:(Int, Int), y:(Int, Int))=>(x._1+y._1, x._2+y._2)).collect
    res29: Array[(Int, (Int, Int))] = Array((3,(14,2)), (1,(9,3)), (2,(3,1)))
    ④reduceByKey
    scala> rdd.map(x=>(x._1, (x._2, 1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).collect
    res35: Array[(Int, (Int, Int))] = Array((3,(14,2)), (1,(9,3)), (2,(3,1)))
    scala> rdd.map(x=>(x._1, (x._2, 1))).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x => x._1.toDouble/x._2).collect
    res6: Array[(Int, Double)] = Array((3,7.0), (1,3.0), (2,3.0))

    Actions

    Action operators
    Every action operator ends up calling sc.runJob.

    1. reduce(func) example

    def reduce(f: (T, T) => T): T

    1. Purpose: aggregates all elements of the RDD with func, first within each partition and then across partitions.

    2. Task: create an RDD and aggregate all of its elements into one result.

    scala> val rdd = sc.makeRDD(1 to 10, 2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[83] at makeRDD at <console>:24
    scala> rdd.collect
    res78: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> rdd.reduce(_+_)
    res77: Int = 55
    scala> rdd.map((_,1)).collect
    res79: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1), (6,1), (7,1), (8,1), (9,1), (10,1))
    
    scala> rdd.map((_,1)).reduce((x,y) => (x._1+y._1, x._2+y._2))
    res82: (Int, Int) = (55,10)
    scala> rdd.reduce(_ max _)
    res84: Int = 10
    scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24
    scala> rdd2.reduce((x,y) => (x._1 + y._1, x._2+y._2))
    res86: (String, Int) = (caad,12)

    2. collect() example; do not use collect casually, use it sparingly

    1. Purpose: returns all elements of the dataset to the driver program as an array.

    2. Task: create an RDD and collect its contents to the Driver to print them.

    scala> val rdd = sc.parallelize(1 to 10)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[87] at parallelize at <console>:24
    
    scala> rdd.collect
    res87: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

    3. count() example

    1. Purpose: returns the number of elements in the RDD.

    2. Task: create an RDD and count its records.

    scala> rdd.count
    res88: Long = 10

    4. first() example (implemented on top of take)

    1. Purpose: returns the first element of the RDD.

    2. Task: create an RDD and return its first element.

    scala> rdd.first
    res89: Int = 1

    5. take(n) example; like collect, take pulls data into the driver's memory, so use it with care

    1. Purpose: returns an array of the first n elements of the RDD.

    2. Task: create an RDD and take its first few elements.

    scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at parallelize at <console>:24
    
    scala> rdd.take(3)
    res91: Array[Int] = Array(2, 5, 4)

    6. takeOrdered(n) example

    1. Purpose: returns an array of the first n elements of the RDD after sorting.

    2. Task: create an RDD and take its smallest elements in order.

    scala> rdd.takeOrdered(3)
    res92: Array[Int] = Array(2, 3, 4)
    
    scala> rdd.takeOrdered(6)
    res93: Array[Int] = Array(2, 3, 4, 5, 6, 8)

    7. aggregate example

    1. Parameters: (zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)

    2. Purpose: aggregate folds the elements of each partition together with the initial value using seqOp, then combines the per-partition results with the initial value using combOp. The result type does not have to match the element type of the RDD.

    3. Task: create an RDD and add all of its elements together.

    aggregate(initial value)(fold the elements within a partition)(merge the per-partition results across partitions)
    aggregate(0)(_+_, _+_) computes the sum.

    scala> var rdd1 = sc.makeRDD(1 to 10,2)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[92] at makeRDD at <console>:24
    scala> rdd1.glom.collect
    res99: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))
    scala> rdd1.aggregate(0)(_ max _, _+_)
    res97: Int = 15
    scala> rdd1.aggregate(0)(_+_, _+_)
    res101: Int = 55

       Differences and similarities between aggregateByKey and aggregate: ① they take the same number of parameters; ② what is aggregated within and across partitions differs:
         aggregateByKey aggregates the values of each key, while aggregate aggregates the RDD's elements.

    8. fold(num)(func) example

    1. Purpose: a fold; the simplified form of aggregate in which seqOp and combOp are the same function.

    2. Task: create an RDD and add all of its elements together.

    scala> rdd1.fold(0)(_+_)
    res102: Int = 55
    
    scala> rdd1.fold(0)(_ max _)
    res103: Int = 10   

    In plain Scala collections the equivalent is x.reduce(_+_).

    9. saveAsTextFile(path)

    Purpose: saves the elements of the dataset as a text file to HDFS or another supported file system; Spark calls toString on each element to turn it into a line of text.

    scala> rdd.saveAsTextFile("./textFile.txt")

    10. saveAsSequenceFile(path) 

    Purpose: saves the elements of the dataset in Hadoop SequenceFile format to the given directory on HDFS or another Hadoop-supported file system.

    11. saveAsObjectFile(path) 

    Purpose: serializes the elements of the RDD as objects and stores them in a file.

    12. countByKey() example

    1. Purpose: for an RDD of (K, V) pairs, returns a (K, Int) map giving the number of elements for each key.

    2. Task: create a pair RDD and count the elements per key.

    scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
    rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[96] at parallelize at <console>:24
    scala> rdd.glom.collect
    res107: Array[Array[(Int, Int)]] = Array(Array((1,3), (1,2)), Array((1,4), (2,3)), Array((3,6), (3,8)))
    scala> rdd.countByKey
    res106: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

    13. foreach(func) example

    1. Purpose: runs the function func on every element of the dataset.

    2. Task: create an RDD and print every element.

    scala> var rdd = sc.makeRDD(1 to 5,2)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[100] at makeRDD at <console>:24
    scala> rdd.foreach(println(_))
    1
    2
    3
    4
    5
    def foreachPartition(f: Iterator[T] => Unit): Unit = withScope
    foreachPartition lets you control the number of connections (for example, one database connection or connection pool per partition) instead of creating one per record as plain foreach would; the relationship to foreach is like that of mapPartitions to map. A JDBC-style sketch follows the snippet below.
    rdd.foreachPartition(x => {
          x.foreach(println(_))
        })
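
    A sketch of the pattern described above (the JDBC URL, credentials and table are hypothetical placeholders, and a MySQL driver must be on the classpath): one connection per partition rather than one per record.

    import java.sql.DriverManager

    rdd.foreachPartition { iter =>
      // opened once for the whole partition
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "pwd")
      val stmt = conn.prepareStatement("INSERT INTO t(value) VALUES (?)")
      iter.foreach { x =>
        stmt.setInt(1, x)
        stmt.executeUpdate()
      }
      stmt.close()
      conn.close()
    }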





  • Original article: https://www.cnblogs.com/shengyang17/p/10658549.html