zoukankan      html  css  js  c++  java
  • [Spark]-RDD详解之变量&操作

    RDD的操作

      1.1 概述

          RDD整体包含两大类操作

          transformation 从现有中创建一个新的数据集

          action 在对数据集做一定程度的计算后将结果返回

        对于所有的transformation,都是Lazy的,也就是说它不会立即执行,只是单纯的记住怎么样从原来的数据集进行转换的逻辑而已,它仅在某一个计算需要的情况下,才会被真正执行.

        因为transformation 的Lazy性,RDD支持在每次计算时都进行重新计算,当然你可以将这个RDD保存下来 (persist  or cache方法)避免每次重计算

        可以通过设置不同的存储级别,将数据保存到硬盘,内存,或者选择同步多个副本到多个节点中.

        1.2 集群环境下的变量与操作

        集群环境,所有操作最终会交给executors去执行.而变量,会以数据副本的形式交给executors.很多时候,这与我们非集群环境下的开发思维有非常大的不同.

        1.2.1 集群下的闭包

            RDD是支持闭包操作的.但务必注意的是Spark不保证对闭包之外的对象引用进行的变化.

            原因是闭包的会被序列化发生给每一个executor,对于闭包的之外的对象引用会拷贝一个副本给executor.这时多个executor执行至少是跨JVM的

            这时对这个副本对象的变更没有任何意义,因为每个JVM(executor)的副本都是独立的.

        1.2.2 集群下的print

          集群环境下,print不会在driver端有任何输出.

          原因也是一样,print最终是在每个executor执行,其输出也是在每个executor的stdout上,在driver端,是不会有这些输出的.

          如果想在driver输出,一个比较简单的办法是调用collect()将结果发送到driver端在进行print,但这样可能会造成driver内存爆掉(所有executor的数据涌入).

            比较推荐的做法是rdd.take(100).foreach(println)

         1.2.3 共享变量

          因为集群下,变量只会以副本方式交给executor,而不会将变量的值变化传回driver,所以一个可读写共享的变量是非常有用的.

          Spark提供了两种共享变量 broadcast(广播变量) 和 accumulators(累加器)

          1.2.3.1 广播变量(broadcast)

            广播变量允许将一个只读变量的副本发送到每个机器上(executor机器),而不是对每一个任务发送一个副本.这样在同一机器上的多个任务,就可以反复使用这个变量了.      

            注意:

              广播变量只会对每个节点分发一次,所以一般来说,广播变量不应该再被修改了.以保证每个广播变量的副本的值都是一致的

              如果广播变量被修改,则需要将广播变量重新分发

            另:

              举个例子:Spark的action操作本身是通过一系列的stage来完成的,这些Stage是通过分布式的shuffle操作来进行切分的.而在每个Stage之间,Spark自动使用广播变量.

              这里用法说明,只有数据本身会在多个Stage的多个任务中反复使用,或者说缓存这个数据是非常重要且非常必要的情况下,使用广播变量才有意义.

            广播变量的使用如下:      

              // SparkContext.broadcast(v)进行创建,返回的是广播变量的封装器,以.value读取广播变量的的值
              val broadcastVar = sc.broadcast(Array(1, 2, 3))
              val v = broadcastVar.value

          1.2.3.2 累加器(accumulators) 

            累加器变量仅支持累加操作,因为可以在并行计算执行一些特殊的计算(比计数或者求和).并且累加器的变化是可以在UI的Task界面上看见的(注,不支持Python)

            累加器操作,依然遵循RDD的Lazy原则:

              累加器更新操作是在Action中,并且在每个任务中只会执行一次(如果任务失败重启了,累加器更新不会执行)

              而在transformation中,累加器依然不会立即执行更新,如果transformation被重新执行了,则累加器操作会重复执行

            对于累加器变量,Spark原生支持数值类型.一个使用例子如下        

              val accum = sc.longAccumulator("My Accumulator")
              sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
              println(accum.value)

             也可以创建继承AccumulatorV2的类型,来实现自定义类型的累加器,例子如下:          

              //两个泛型参数->累加的元素类型和结果类型可以不同的
              class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
    
                private val myVector: MyVector = MyVector.createZeroVector
    
                def reset(): Unit = {
                  myVector.reset()
                }
    
                def add(v: MyVector): Unit = {
                  myVector.add(v)
                }
                ...
              }
    
              // 创建一个自定义的累加器类型:
              val myVectorAcc = new VectorAccumulatorV2
              //将这个触发器注册到SparkContext:
              sc.register(myVectorAcc, "MyVectorAcc1")

      1.3 RDD的一些基本操作

        1.3.1 Transformations 依赖关系

          RDD是由父RDD经过转换函数形成一个个子RDD(子RDD依赖父RDD).针对不同的转换函数,以父RDD分区与子RDD分区的关系为标准,Spark将这些依赖关系分为两类.

          窄依赖

            窄依赖是指转换后,父RDD的每个分区只会被某一个子RDD分区使用.(一对一或者多对一的关系).

            所以窄依赖一般出现在map,filter等子分区沿用父分区,不会发生重分区的时候.

            宽依赖

            宽依赖是指转换后,父RDD的某个或某些分区会被几个子RDD分区使用.(某个分区数据部分在这个RDD,部分在那个RDD,一对多关系)

            宽依赖一般出现在groupByKey等子分区一定会发生重分区的时候

          两种依赖关系的对比

            一般来说,窄依赖比宽依赖对执行优化更加有利

            i).窄依赖允许集群节点上以流水线的形式直接计算所有分区

               宽依赖则需要先计算好父分区的分区信息,然后再以一个shuffle完成重分区,

              ii).某个子分区异常需要重计算时,会对这个子分区所依赖的所有分区进行计算.(这是宽窄依赖都必须的),但是针对分区数据而言

              窄依赖,一个或多个父分区完全对应一个子分区.对这些父分区的重计算,利用率是100%

              宽依赖,父分区的数据不完全对应一个子分区(一对多关系,父分区的某些部分是其它分区的),但此时依然需要重计算父分区全部数据,造成计算浪费(因为白计算其它分区的数据)

        1.3.2 Transformations 操作

          map

            对RDD中的元素执行一个指定函数,将执行结果作为新元素产生一个新的RDD.

              与其它map系函数区别,map新元素的完全是Map函数的执行结果返回,所以新RDD的数量与老RDD是一一对应的.        

            val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)
            rdd.map(rec=>rec.split(" ")>).collect().map(println(_))
            //返回结果是rec.split(" ")结果(一维数组)=>[["aa","bb"],["cc","dd"]]

          flatMap 

            与map相同,但结果会扁平化.即如果结果是迭代器容器的,会将元素从容器中取出再返回       

            val rdd = sc.parallelize(Seq("aa bb","cc"),2)
             rdd.flatMap(rec=>rec.split(" ")).collect().map(println(_));
             //返回结果["aa","bb","cc"]
    
            //flatMap如以下这种方式使用是不行,flatMap返回结果必须是TraversableOnce[U](可迭代一次的类型)
            //rdd.flatMap(rec=>(rec,1)).collect().map(println(_));     

          mapPartitions

            与map相同,不过是以分区为单位,所以语法要求必须为 f: Iterator[T] => Iterator[U],注意返回结果不是以分区为单位,而是所有分区执行函数的结果的合并      

            val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)
            rdd.mapPartitions(part=>part.map(rec=>rec.split(" "))).collect().map(println(_))
            //结果是 [["aa","bb"],["cc","dd"],["ee","ff"]]

          mapPartitionsWithIndex

            与mapPartitions类似,不过它带有分区的index以供使用.所以语法要求为f: (Int, Iterator[T]) => Iterator[U]        

            val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)
            rdd.mapPartitionsWithIndex((partIdx,part)=>part.map(rec=>(partIdx,rec))).collect().map(println(_))
            //返回结果 (0,aa bb),(1,cc dd),(1,ee ff)

          sample

           抽样函数.可以从数据集中按一定比例抽取部分数据,抽取之后可以选择是否返回

           语法要求 withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong         

          val rdd = sc.parallelize(1 to 50)
           rdd.sample(false,0.2,System.currentTimeMillis).map(rec=>(rec,1)).collect().map(print(_)+" ")
           //返回结果 (8,1)(21,1)(26,1)(27,1)(34,1)(43,1)(46,1)(49,1)

          union      

          将两个数据集合并(包含数据重复)                 

          val rdd = sc.parallelize(1 to 10)
          val rdd2 = sc.parallelize(11 to 20)
          rdd.union(rdd2).map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))
          //返回结果 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20

         intersection

          将两个数据集合并,取交集作为结果返回

          val rdd = sc.parallelize(1 to 10)
          val rdd2 = sc.parallelize(5 to 15)
          rdd.intersection(rdd2).map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))
         //返回结果 6 7 9 8 10 5

           distinct      

          对当前结果集去重返回            

            val rdd = sc.parallelize(1 to 10)
            val rdd2 = sc.parallelize(5 to 15)
            rdd.union(rdd2).distinct().map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))
            //返回结果 4 14 6 8 12 10 2 13 15 11 1 3 7 9 5

        groupByKey

          将一个键值对类型的结果集按照key进行分组(如果是为分组聚合,groupByKey相比reduceByKey效率更低,因为少一个map-shuffer的combine)  

           val rdd = sc.parallelize(Seq("aa bb","cc dd","bb cc"),2)
           rdd.flatMap(rec=>rec.split(" ")).map(rec=>(rec,1)).groupByKey().collect().map(rec=>print(s" ${rec._1} ${rec._2.sum} |"));
           //返回结果 aa 1 | dd 1 | bb 2 | cc 2 |

        reduceByKey

          将一个键值对类型数据集,使用指定的函数分组聚合为另一个键值对类型数据集,(相比groupByKey性能更高,因为可以在map-shuffer进行combine减少数据量)    

          val rdd = sc.parallelize(Seq("aa bb","cc dd","bb cc"),2)
          rdd.flatMap(rec=>rec.split(" ")).map(rec=>(rec,1)).reduceByKey((value1,value2)=>value1+value2).collect().map(rec=>print(s"${rec} "))
          //返回结果 (aa,1) (dd,1) (bb,2) (cc,2)

         aggregate

           给出一个默认基准值,先使用seqOp遍历分区内元素传入基准值进行聚合,再对分区间结果使用combOp聚合为最后结果.

            注意aggregate返回的结果直接是聚合结果(不是RDD),并且要与原RDD的类型一致     

          val rdd = sc.parallelize(1 to 10);
          /**
            * zeroValue:预定义一个初始值 (0,0)
            * seqOp: (U, T) => U  分区内元素聚合,遍历元素传入基准值执行函数.(类似Map-Shuffle)
            *   U:当前基准值,T:当前元素
            *   执行的逻辑是 (基准值(默认初始值), 元素No.1) 执行seqOp ,结果再作为基准值,执行(基准值(上步结果),元素No.2),以此类推
            * combOp: (U, U) => U 分区间聚合,将各分区执行seqOp函数的结果再使用combOp聚合 (类似Reduce-Shuffle)
            */
          val aggregateResult = rdd.aggregate((0,0))(
            seqOp=(sv,tv)=>(sv._1+tv,sv._2+1),
            combOp=(v1,v2)=>(v1._1+v2._1,v2._2+v2._2)
          )
          println(aggregateResult)
          //输出结果 (55,10) (1-10的总和,1-10的个数) <=非RDD结果,并且类型必须是Int

          aggregateByKey

             与aggregate类似,但针对的是key分组,aggregateBykey是以key组为单位,对分组内元素遍历使用seqOp,再使用combOp聚合分组内       

            val rdd = sc.parallelize(Seq("a b c", "b c d"));
            rdd.flatMap(rec => rec.split(" ")).map(rec => (rec, 1))
               .aggregateByKey(0)(
                  seqOp = (sv, tv) => (sv + tv),
                  combOp = (v1, v2) => (v1 + v2)
                )
               .collect().map(rec => print(s"${rec} |"))
            //输出结果 (d,1) |(b,2) |(a,1) |(c,2) |

          sortByKey

            将一个键值对RDD按key排序转换为另一个RDD

          join

            将两个键值对RDD((K, V),(K, W)),按Key合并为一个RDD(K, (V, W)) .(Spark同时还提供 leftOuterJoin,rightOuterJoin,fullOuterJoin)        

           val rdd = sc.parallelize(Seq("a b")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));
           val rdd2 = sc.parallelize(Seq("b c")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));
    
           rdd.join(rdd2).collect().map(rec => print(s"${rec} |"))
              //两个RDD交集 (b,(b,b))
              rdd.leftOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))
              //leftOuterJoin左边全数据,右边Opt (b,(b,Some(b))) |(a,(a,None)) |
              rdd.rightOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))
              //rightOuterJoin右边全数据,左边Opt (b,(Some(b),b)) |(c,(None,c)) |
              rdd.fullOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))
              //笛卡尔乘积,Opt (b,(Some(b),Some(b))) |(a,(Some(a),None)) |(c,(None,Some(c))) |

          cogroup

            将多个键值对RDD按Key合并在一起.合并为全数据(没有丢失)

            与fullOuterJoin区别在与多个RDD情况下,cogroup按key合并为一个,fullOuterJoin为多个的笛卡尔积

            注意,如果某个数据集少某一个key,合并时是在这个数据集的位置上占CompactBuffer()的位置,而不是直接跳过        

            val rdd = sc.parallelize(Seq("a b")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));
            val rdd2 = sc.parallelize(Seq("b c")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));
            rdd.cogroup(rdd2).collect().map(rec => print(s"${rec} |"))
            //(b,(CompactBuffer(b),CompactBuffer(b))) |(a,(CompactBuffer(a),CompactBuffer())) |(c,(CompactBuffer(),CompactBuffer(c))) |

          cartesian

            返回两个RDD的笛卡尔积结果

          pipe

            使用Shell的语法操作RDD

          coalesce

            重新调整RDD的分区后形成一个新的RDD.语法要求:numPartitions: Int, shuffle: Boolean = false.

              numPartitions表示要重新调整的分区数量,shuffle表示重新调整分区时是否允许发生shuffle过程.

            如果子分区数往下减少,则子分区数设置一定会成功.但要注意,在这种情况下会造成任务的并行度降低(分区数,任务数降了),任务内存更容易爆出(单个任务的数据增大了)

            如果子分区数往上增加,则子分区数设置必须要设置shuffle=true,才会成功,否则子分区依然等于父分区

            谨记:如果没有shuffle的参与,RDD只能减少分区(窄依赖),不能增加分区

          repartition      

            只是coalesce的shuffle等于true的快捷方式. coalesce(numPartitions, shuffle = true)

                          repartitionAndSortWithinPartitions

        1.3.2 Action 操作

          reduce

            RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素组成两个元素,再被传给输入函数,直到最后只有一个值为止

            与reduceByKey的区别是  reduceByKey是一个转换操作,返回的是RDD, reduce是一个action操作,返回的是数据结果                     

            val rdd = sc.parallelize(1 to 100,2);
            val value = rdd.reduce((v1,v2)=>v1+v2)
            println(value)
            //输出结果 5050

          collect

            将一个RDD的所有元素以数组的形式发回driver端.注意这个RDD必须是足够小的数据集,否则很容易将driver端的内存撑爆

          count

            返回一个RDD的元素的个数

          first

            返回一个RDD的第一个元素

          take(n)

            返回一个RDD的前N个元素

          takeSample(withReplacementnum, [seed])

            返回一个RDD的百分比抽样(withReplacement标识元素是否放回RDD以供多次使用)

               takeOrdered(n[ordering])

            返回一个RDD按照设定的排序规则后的前N个元素

          countByKey

            只支持键值对类型,返回一个RDD的按照Key分组后的每组计数

          saveAsTextFile(path)

            将一个RDD的全部元素写入一个文本方式的本地文件,HDFS或其它任何Hadoop支持的存储系统中.(每行等于每个元素调用toString()的结果)

          saveAsSequenceFile(path)

            将一个RDD的全部元素写入一个二进制方式的本地文件,HDFS或其它任何Hadoop支持的存储系统中.

            在Scala中,它还可以用于隐式转换为可写类型的类型(Spark包含对基本类型的转换,如Int、Double、String等)。

          saveAsObjectFile(path)

            将一个RDD的全部元素使用Java序列化以简单的格式编写数据集的元素(可以使用SparkContext.objectFile()加载这些元素)。

          foreach(func)

            在数据集的每个元素上运行函数func。这通常用于处理副作用,如更新累加器或与外部存储系统交互

            注意:不可以修改foreach()之外的累加器之外的变量,见前面集群下的变量与闭包一节

      1.4 Shuffle过程

        Spark的某些操作,会引起一个Shuffle过程.Shuffle是指不同节点上的不同分区数据整合重新分区分组的机制.

        所以Shuffle是一个代价很高的操作,因为它会导致executor和不同的机器节点之间进行数据复制.

        1.4.1 Shuffle简述

          以reduceByKey为例,将原始数据中key相同的记录聚合为一个组.这里挑战是原始数据很可能是存在不同分区不同机器的(参考MapReduce执行过程)

          Spark-Shuffle与MapReduce-Shuffle的区别

            MapReduce-Shuffle结果是分区有序,分区内再按Key排序

            Spark-Shuffle结果是分区有序,但分区内Key无序.

              要对Spark-Shuffle的分区内再排序,有以下方法:

               mapPartitions 在已有的每个分区上再使用.sort排序

               repartitionAndSortWithinPartitions  重建分区,并排序

               sortBy提前对RDD本身做一个全范围排序

        1.4.2 RDD中引起Shuffle的操作

           repartition类操作 例如:repartitioncoalesce

           _ByKey操作(除了counting相关操作)例如:groupByKeyreduceByKey

           join 例如:cogroupjoin

          1.4.3 Shuffle的性能影响

          Shuffle本身是同时高耗内存,高耗磁盘IO,高耗网络IO的昂贵操作.

            Spark会启动一系列的MapReduce(Hadoop MapReduce),产生大量的数据缓冲区与归并排序,大量的pill文件与归并Merge等等

  • 相关阅读:
    专利质检助手
    商务代表 销售 区别
    javax.servlet.jsp.tagext.TagAttributeInfo.<init> tomcat-embed-core-8.0.35
    poli-java开源BI软件
    QFLOW ECM软件 政府机构 自动化工作流程 文件管理
    Devops Tools
    吴军博士的新书《见识》
    轻流 CEO 薄智元 BPM (SaaS aPaaS) 低(无)代码平台 乐高积木
    北京 知识产权 交易中心
    Java高并发秒杀API系列
  • 原文地址:https://www.cnblogs.com/NightPxy/p/9245707.html
Copyright © 2011-2022 走看看