zoukankan      html  css  js  c++  java
  • mapPartitions

    mapPartitions操作与map类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果映射过程需要频繁创建额外的对象,使用mapPartitions操作要比map操作高效许多。比如将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每个元素创建一个connection,开销很大;如果使用mapPartitions,那么只需要为每个分区建立一个connection。

    Scala中for表达式的yield会收集每次迭代产生的值,并返回一个与被遍历集合类型相对应的新集合(例如遍历Iterator得到Iterator,遍历List得到List),而不一定是数组。

    for {子句} yield {变量或表达式}

    scala> val numrdd=sc.makeRDD(1 to 10,3)

    numrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[51] at makeRDD at <console>:25

    // sumn: per-partition function for mapPartitions; yields every element of the partition multiplied by 2.
    // The for/yield over an Iterator already produces an Iterator, which is then returned via toIterator.
    scala> def sumn(iter:Iterator[Int])={val aa=for(i<-iter) yield i*2;aa.toIterator}

    sumn: (iter: Iterator[Int])Iterator[Int]

    scala> numrdd.mapPartitions(sumn).collect

    res49: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

    -----------------------------------------------

    分区中的数值求和


    scala> val numRDD=sc.makeRDD(1 to 10,3)
    numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[210] at makeRDD at <console>:25

    // For each partition: add up its elements in i, then emit that single sum as a one-element iterator.
    scala> numRDD.mapPartitions(x=>{val result=List(); var i=0;while(x.hasNext){i+=x.next()};result.::(i).toIterator}).collect
    res136: Array[Int] = Array(6, 15, 34)

    scala> numRDD.mapPartitions(x=>{

    // x iterates over the elements of one partition; one value (the sum) is produced per partition

    val result=List();  // empty list that the sum is prepended to below

    var i=0;  // running sum of this partition's elements

    while(x.hasNext)

    {

    i+=x.next()

    };

    result.::(i).toIterator  // prepend the sum to the empty list and return a one-element iterator

    }

    ).collect
    res136: Array[Int] = Array(6, 15, 34)

    -------------------------------------------------------------

    scala> val numRDD=sc.makeRDD(1 to 10,3)

    numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24

    // partionsum: sums the Ints of one partition and returns the total as a one-element iterator.
    scala> def partionsum(iter:Iterator[Int])={var result=List[Int]();var i:Int= 0;while(iter.hasNext){var n:Int=iter.next; i += n;} ;result.::(i).toIterator}
    partionsum: (iter: Iterator[Int])Iterator[Int]

    scala> def partionsum(iter:Iterator[Int])={

    // Sums the Ints of one partition (iter) and returns the total as a one-element iterator.

    var result=List[Int]();  // empty list; never reassigned, the sum is prepended to it below

    var i:Int= 0;  // running sum

    while(iter.hasNext){

    var n:Int=iter.next;  // current element

    i += n;

    } ;

    result.::(i).toIterator  // List(i).toIterator

    }
    partionsum: (iter: Iterator[Int])Iterator[Int]

    scala> numRDD.mapPartitions(partionsum).collect

    res7: Array[Int] = Array(6, 15, 34)

     --------------------------------------

    分区内的数值进行求和,并展示分区号

    scala> val numRDD=sc.makeRDD(1 to 10,3)

    numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24

    // For each partition: sum its elements and emit one string "<partition index>|<sum>".
    scala> numRDD.mapPartitionsWithIndex((x,iter)=>{val result=List(); var i=0;while(iter.hasNext){i+=iter.next()};result.::(x+"|"+i).toIterator}).collect
    res138: Array[String] = Array(0|6, 1|15, 2|34)

    scala> numRDD.mapPartitionsWithIndex((x,iter)=>{

    // x is the partition index, iter iterates over that partition's elements

    val result=List();  // empty list that the result string is prepended to below

    var i=0;  // running sum of this partition's elements

    while(iter.hasNext){

    i+=iter.next()

    };

    result.::(x+"|"+i).toIterator  // one string per partition: "<index>|<sum>"

    }).collect

    res138: Array[String] = Array(0|6, 1|15, 2|34)

    ------------------------------

    scala> val numRDD=sc.makeRDD(1 to 10,3)

    numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24

    scala> def partionwithindexsum(x:Int,iter:Iterator[Int])={var result=List[Int]();var i:Int= 0;while(iter.hasNext){var n:Int=iter.next; i += n;} ;result.::(x+"|"+i).toIterator}
    partionwithindexsum: (x: Int, iter: Iterator[Int])Iterator[Any]

    scala> def partionwithindexsum(x:Int,iter:Iterator[Int])={

    // Sums the Ints of one partition (iter) and returns one element "<x>|<sum>", where x is the partition index.

    // NOTE(review): result is declared as List[Int] but a String is prepended to it, so the element type
    // widens to Any (the REPL reports Iterator[Any]); List[String]() would presumably keep it Iterator[String].

    var result=List[Int]();

    var i:Int= 0;  // running sum

    while(iter.hasNext){

    var n:Int=iter.next;  // current element

    i += n;

    } ;

    result.::(x+"|"+i).toIterator  // one-element iterator holding "<index>|<sum>"

    }

    partionwithindexsum: (x: Int, iter: Iterator[Int])Iterator[Any]

    scala> numRDD.mapPartitionsWithIndex(partionwithindexsum).collect

    res9: Array[Any] = Array(0|6, 1|15, 2|34)

    ----------------------

    统计每个分区的元素数

    scala> val numRDD=sc.makeRDD(1 to 10,3)

    numRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at <console>:24

    // partionwithindexlength: counts the elements of one partition and returns "<partition index>|<count>" as a one-element iterator.
    scala> def partionwithindexlength(x:Int,iter:Iterator[Int])={var result=List[Int]();var i:Int= iter.toList.length;result.::(x+"|"+i).toIterator}

    partionwithindexlength: (x: Int, iter: Iterator[Int])Iterator[Any]

    scala> def partionwithindexlength(x:Int,iter:Iterator[Int])={

    // Counts the elements of one partition (iter) and returns one element "<x>|<count>", where x is the partition index.

    // NOTE(review): as result is a List[Int] and a String is prepended, the element type widens to Any
    // (the REPL reports Iterator[Any]).

    var result=List[Int]();

    var i:Int= iter.toList.length;  // copies the whole partition into a List just to count its elements

    result.::(x+"|"+i).toIterator  // one-element iterator holding "<index>|<count>"

    }

    partionwithindexlength: (x: Int, iter: Iterator[Int])Iterator[Any]

    scala> numRDD.mapPartitionsWithIndex(partionwithindexlength).collect

    res10: Array[Any] = Array(0|3, 1|3, 2|4)

  • 相关阅读:
    相对路径与绝对路径问题
    javaee自定义servlet的步骤
    Struts1.X与Spring集成——另外一种方案
    菜鸟也能学cocos2dx3.0 浅析刀塔传奇(下)
    JAVA之了解类载入器Classloader
    IOS 编程中引用第三方的方类库的方法及常见问题
    通过eclipse的egit插件提交提示Auth fail
    定时器0的方式1 定时器1的方式1 数码管和led
    MongoDB入门学习(四):MongoDB的索引
    J2EE--JDBC
  • 原文地址:https://www.cnblogs.com/playforever/p/9450531.html
Copyright © 2011-2022 走看看