zoukankan      html  css  js  c++  java
  • RDD 算子补充

    一、RDD算子补充

         1、mapPartitions
             mapPartitions的输入函数作用于每个分区, 也就是把每个分区中的内容作为整体来处理。   (map是把每一行)

             mapPartitions一次处理一个分区的所有数据,而map算子一次处理分区中的一条数据,所以mapPartitions处理数据的速度比map快,如果RDD分区的数据很庞大,用mapPartitions容易造成内存溢出,

             如果RDD分区数据量小,从而提升速度的角度考虑,可以使用mapPartitions算子。

             JAVA实现:

             scala实现:

              
         2、mapPartitionsWithIndex

                      mapPartitionsWithIndex方法与mapPartitions方法功能类似   

                     不同的是mapPartitionsWithIndex还会对原始分区的索引进行 追踪, 这样就能知道分区所对应的元素 。  方法的参数为一个函数, 函数的输入为整型索引和迭代器。

                    JAVA实现:

    public static void mapPartitionsWithIndex(){
        	SparkConf conf=new SparkConf().setAppName("mapPartitionsWithIndex").setMaster("local");
        	JavaSparkContext sc=new JavaSparkContext(conf);
        	//模拟数据并创建初始RDD
        	JavaRDD<String> datas = sc.parallelize(Arrays.asList("张三","李四","李思","李斯"),2);
        	JavaRDD<String> result = datas.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {
    
    			/**
    			 * 
    			 */
    			private static final long serialVersionUID = 1L;
    
    			public Iterator<String> call(Integer index, Iterator<String> datass) throws Exception {
    				// TODO Auto-generated method stub
    				ArrayList<String> arrayList = new ArrayList<String>();
    				while(datass.hasNext()){
    					String  info="第"+index+"分区的数据:"+datass.next();
    					arrayList.add(info);
    				}
    				return arrayList.iterator();
    			}
    		}, false);
        	
        	List<String> collect = result.collect();
        	for(String a:collect){
        		System.out.println(a);
        	}
        	
        	
        	sc.close();
        }  

                    scala实现:

    def mapPartitionsWithIndex: Unit ={
        val conf=new SparkConf().setAppName("cogroup").setMaster("local")
        val sc=new SparkContext(conf)
        val a=sc.parallelize(Array("liu","zhao","wang","li"),2)
        val result=a.mapPartitionsWithIndex((x,y)=>{
          val arraybuffer=ArrayBuffer[String]()
          while(y.hasNext){
            val info=x+" "+y.next()
            arraybuffer+=info
          }
          arraybuffer.iterator
        })
        result.foreach(x=>{
          println(x)
        })
        sc.stop()
      }
    

      

          3、mapValues运算

              可以针对RDD内每一组(key,value)进行运算, 并且产生另外一个RDD
               例如: 将每一组( key,value) 的value进行平方运算
               kvRDD1.mapValues(x => x*x).collect

          4、union
                   union方法( 等价于++) 是将两个RDD取并集, 取并集的过程中不 会把相同元素去掉。  union操作是输入分区与输出分区多对一模式。 

            scala实现:             

    def union: Unit ={
        val conf=new SparkConf().setAppName("union").setMaster("local")
        val sc=new SparkContext(conf)
        val rdd1=sc.parallelize(Array((1,2),(2,3),(3,4)))
        val rdd2=sc.parallelize(Array((1,2),(4,3),(5,4)))
        val result=rdd1.union(rdd2)
        result.foreach(x=>{
          println(x._1+" "+x._2)
        })
        sc.stop()
      }
    结果:

    1 2
    2 3
    3 4

    1 2
    4 3
    5 4

      5、distinct
              distinct方法是将RDD中重复的元素去掉, 只留下唯一的RDD元素。

          6、intersection交集运算
               intersection方法可以获取两个RDD中相同的数据
         

    def intersection: Unit ={
        val conf=new SparkConf().setAppName("intersection").setMaster("local")
        val sc=new SparkContext(conf)
        val rdd1=sc.parallelize(Array((1,2),(2,3),(3,4)))
        val rdd2=sc.parallelize(Array((1,2),(4,3),(5,4)))
        val result=rdd1.intersection(rdd2).foreach(x=>{
          println(x._1+" "+x._2)})
        sc.stop()
      }  

          7、subtract差集运算
              intRDD1.subtract(intRDD2).collect()
              intRDD1List(3,1,2,5,5), 扣除intRDD2 List(5,6)重复的部分5, 所 以结果是(1,2,3) 
             

    //把rdd1 rdd2中的相同项,从rdd1中去除掉
    def subtract: Unit ={ val conf=new SparkConf().setAppName("intersection").setMaster("local") val sc=new SparkContext(conf) val rdd1=sc.parallelize(Array((1,2),(2,3),(3,4))) val rdd2=sc.parallelize(Array((1,2),(4,3),(5,4))) rdd1.subtract(rdd2).foreach(x=>{ println(x._1+" "+x._2)}) sc.stop() }

      8、aggregateByKey
             reduceByKey认为是aggregateByKey的简化版
             aggregateByKey最重要的一点是, 多提供了一个函数, Seq Function

             可以控制如何对每个partition中的数据进行先聚合, 类似于mapreduce中的map-side combine, 然后才是对所有partition中的数据进行全局聚合
             aggregateByKey, 分为三个参数:
                        第一个参数是, 每个key的初始值
                        第二个是个函数, Seq Function, 如何进行shuffle map-side的本地聚合
                        第三个是个函数, Combiner Function, 如何进行shuffle reduce-side的全局聚合

          Java实现:

          scala实现:        

     
    //在调用aggregateByKey算子时,第二、三两个参数时(这两个参数传入的是函数),运用柯里化的方式,不需要给函数传参
    def aggregateByKey: Unit ={
        val conf=new SparkConf().setAppName("aggregateByKey").setMaster("local")
        val sc=new SparkContext(conf)
        val lines=sc.textFile("file:///home/hadoop/product.txt")
        def seq(num1:Int,num2:Int):Int={
          num1+num2
        }
        def comb(num1:Int,num2:Int):Int={
          num1+num2
        }
        lines.map(x=>(x,1)).aggregateByKey(0)(seq,comb).foreach(x=>{
          println(x._1+" "+x._2)
        })
      }
    

      9、cartesian
              cartesian, 中文名笛卡尔乘积
             比如说两个RDD, 分别有10条数据, 用了cartesian算子以后,两个RDD的每一条数据都会和另外一个RDD的每一条数据执行一次join,最终组成了一个笛卡尔乘积 

     def cartesian: Unit ={
        val conf=new SparkConf().setAppName("cartesian").setMaster("local")
        val sc=new SparkContext(conf)
        val rdd1=sc.parallelize(Array((1,2),(2,3),(3,4)))
        val rdd2=sc.parallelize(Array((1,2),(4,3),(5,4)))
        rdd1.cartesian(rdd2).foreach(x=>{
          println(x._1+" "+x._2)
        })
      }

    (1,2) (1,2)
    (1,2) (4,3)
    (1,2) (5,4)
    (2,3) (1,2)
    (2,3) (4,3)
    (2,3) (5,4)
    (3,4) (1,2)
    (3,4) (4,3)
    (3,4) (5,4)

      10、coalesce
            coalesce算子, 功能是将RDDpartition缩减, 将一定量的数据压缩到更少的partition中去。建议的使用场景, 配合filter算子使用
           使用filter算子过滤掉很多数据以后, 比如30%的数据, 出现了很多partition中的数据不均匀的情况

           此时建议使用coalesce算子, 压缩rddpartition数量, 从而让各个partition中的数 据都更加的紧凑 
           缺点:只能减少分区数,不能增加分区数

         11、repartition
              repartition算子, 用于任意将rddpartition增多或者减少 

               与coalesce不同之处在于, coalesce仅仅能将rddpartition变少, 但是 repartition可以将rddpartiton变多
             一个很经典的场景, 使用Spark SQLhive中查询数据时  Spark SQL会根据hive对应的hdfs文件的block数量来决定加载出来的数据rdd 中有多少个partition,这里的partition数量, 是我们根本无法

    设置的

              有时候可能自动设置的partition数量过少, 导致我们后面的算子的运行特别慢    此时就可以在Spark SQL加载hive数据到rdd之后, 立即使用repartition算子, 将rddpartition数量变多 

             

    def repartition: Unit ={
        val conf=new SparkConf().setAppName("cartesian").setMaster("local")
        val sc=new SparkContext(conf)
        val a=sc.parallelize(Array("liu","zhao","wang","li","lll","aa"),3)
        val result=a.coalesce(2)//通过 coalesce减少分区
    // val result=a.repartition(4) 通过repartition增加 或者减少分区数 result.mapPartitionsWithIndex((x,y)=>{ val arr=ArrayBuffer[String]() while(y.hasNext){ val info=x+" "+y.next() arr+=info } arr.iterator }).foreach(x=>println(x)) }

      

    补充:1、map:一次处理分区中的一条数据

                 mapPartitions:一次处理分区中的所有数据

                 mapPartitionsWithIndex:一次处理分区中的所有数据 ,并且返回分区的索引,索引从0开始

                注意:如果RDD分区中的数据体量比较大,用mapPartitions或者mapPartitionsWithIndex进行计算,有可能出现内存溢出(OOM)

                        如果RDD分区数据体量比较小,此时为了提高数据计算的效率,可以使用mapPartitions或mapPartitionsWithIndex进行计算
                     

  • 相关阅读:
    Geoserver发布缓存切片(制定Gridsets)
    Oralce Spatial
    判断ArcSDE是否安装成功
    sqlserver操作geography方法
    ArcGIS Server 基于Token安全验证
    ArcGIS Server配置端口
    贝叶斯推断
    加密算法
    互联网协议认识
    yocto config mk.fs.ext4
  • 原文地址:https://www.cnblogs.com/liuwei6/p/6610814.html
Copyright © 2011-2022 走看看