  • RDD partitioning

    A partition is an attribute of an RDD; each partition is exposed as an iterator.

    The partitioner decides how the data is split across partitions.

    An RDD is divided into many partitions distributed over the nodes of the cluster, and the number of partitions determines the granularity of parallel computation on that RDD. Users can query and set the number of partitions; by default it equals the number of CPU cores allocated to the application.

    In Spark, RDD computation is carried out per partition, and the compute functions are composed over the partition iterators, so the result of each intermediate step does not have to be materialized.
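
    For example, the partition count can be queried and adjusted as follows (a minimal sketch, assuming an active SparkContext named sc; the variable names are only illustrative):

    val rdd = sc.parallelize(1 to 100)   // uses the application's default parallelism
    println(sc.defaultParallelism)       // default parallelism of this application
    println(rdd.getNumPartitions)        // partition count of this RDD
    val rdd8 = rdd.repartition(8)        // reshuffle into 8 partitions
    val rdd2 = rdd8.coalesce(2)          // shrink to 2 partitions, normally without a shuffle
    println(rdd2.partitions.length)      // same information as getNumPartitions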

    scala> val numrdd=sc.makeRDD(1 to 10,3)
    numrdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

    scala> import org.apache.spark.TaskContext
    import org.apache.spark.TaskContext

    scala> numrdd.foreach(x=>{println(TaskContext.get.partitionId+"|"+x)})
    2|7
    2|8
    2|9
    2|10
    0|1
    0|2
    0|3
    1|4
    1|5
    1|6
    scala> numrdd.foreach(x=>{println(TaskContext.getPartitionId+"|"+x)})
    1|4
    1|5
    1|6
    0|1
    0|2
    0|3
    2|7
    2|8
    2|9
    2|10

     -----------------------------------------------------------------------

    scala> val parRDD=sc.makeRDD(Array((100,"dog"),(100,"cat"),(200,"pear"),(100,"tiger"),(200,"apple"),(100,"lion"),(200,"banana"),(100,"elephent"),(300,"paper"),(300,"pen"),(200,"pig"),(300,"ballpen")))
    parRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[40] at makeRDD at <console>:25

    scala> parRDD.partitions.length
    res41: Int = 4

    scala> parRDD.foreach(x=>{println(x+"|"+TaskContext.get.partitionId)})
    (100,elephent)|3
    (200,pear)|1
    (300,paper)|3
    (200,apple)|2
    (100,lion)|2
    (200,banana)|2
    (100,tiger)|1
    (100,dog)|0
    (100,cat)|0
    (300,pen)|4
    (200,pig)|4
    (300,ballpen)|4


    TaskContext also exposes the stage ID, the task attempt ID and the task metrics of the running task:

    scala> parRDD.foreach(x=>{println(x+"|"+TaskContext.get.stageId)})
    (200,apple)|45
    (100,lion)|45
    (200,banana)|45
    (100,dog)|45
    (100,cat)|45
    (200,pear)|45
    (100,elephent)|45
    (300,paper)|45
    (100,tiger)|45
    (300,pen)|45
    (200,pig)|45
    (300,ballpen)|45

    scala> parRDD.foreach(x=>{println(x+"|"+TaskContext.get.taskAttemptId)})
    (200,apple)|190
    (100,lion)|190
    (200,banana)|190
    (100,elephent)|191
    (300,paper)|191
    (200,pear)|189
    (100,tiger)|189
    (100,dog)|188
    (100,cat)|188
    (300,pen)|192
    (200,pig)|192
    (300,ballpen)|192

    scala> parRDD.foreach(x=>{println(x+"|"+TaskContext.get.taskMetrics)})
    (100,dog)|org.apache.spark.executor.TaskMetrics@339a1fc
    (100,elephent)|org.apache.spark.executor.TaskMetrics@2c0eca15
    (200,pear)|org.apache.spark.executor.TaskMetrics@3850cb6d
    (200,apple)|org.apache.spark.executor.TaskMetrics@38090055
    (100,tiger)|org.apache.spark.executor.TaskMetrics@3850cb6d
    (100,cat)|org.apache.spark.executor.TaskMetrics@339a1fc
    (300,paper)|org.apache.spark.executor.TaskMetrics@2c0eca15
    (100,lion)|org.apache.spark.executor.TaskMetrics@38090055
    (200,banana)|org.apache.spark.executor.TaskMetrics@38090055
    (300,pen)|org.apache.spark.executor.TaskMetrics@125f9f17
    (200,pig)|org.apache.spark.executor.TaskMetrics@125f9f17
    (300,ballpen)|org.apache.spark.executor.TaskMetrics@125f9f17

    // Inspect the data in each partition

    scala> def partitionValueWthID(id:Int,iter:Iterator[(Int,String)])=({var result=scala.collection.mutable.Map[Int,List[(Int,String)]](); while(iter.hasNext){var partid=id;if(result.contains(partid)){var elems=result(partid);elems::=iter.next;result(partid)=elems; } else result(partid)=List[(Int,String)]{iter.next}};result.toIterator})

    partitionValueWthID: (id: Int, iter: Iterator[(Int, String)])Iterator[(Int, List[(Int, String)])]

    The same function, written out for readability:

    def partitionValueWthID(id:Int,iter:Iterator[(Int,String)])=
    (
      {
        var result=scala.collection.mutable.Map[Int,List[(Int,String)]]();
        while(iter.hasNext){
          var partid=id;
          if(result.contains(partid))  // a key for this partition ID already exists, so prepend to its value
          {
            var elems=result(partid);
            elems::=iter.next;
            result(partid)=elems;
          }
          else  // the key does not exist yet, so assign the first element directly
            result(partid)=List[(Int,String)]{iter.next}
        };
        result.toIterator
      }
    )

    An equivalent variant that first reads iter.next into a local variable:

    scala> def partitionValueWthID(id:Int,iter:Iterator[(Int,String)])=({var result=scala.collection.mutable.Map[Int,List[(Int,String)]](); while(iter.hasNext){var partid=id;var elem=iter.next;if(result.contains(partid)){var elems=result(partid);elems::=elem;result(partid)=elems; } else result(partid)=List[(Int,String)]{elem}};result.toIterator})
    partitionValueWthID: (id: Int, iter: Iterator[(Int, String)])Iterator[(Int, List[(Int, String)])]

    scala> parRDD.mapPartitionsWithIndex(partitionValueWthID).collect
    res45: Array[(Int, List[(Int, String)])] = Array((0,List((100,cat), (100,dog))), (1,List((100,tiger), (200,pear))), (2,List((200,banana), (100,lion), (200,apple))), (3,List((300,paper), (100,elephent))), (4,List((300,ballpen), (200,pig), (300,pen))))
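
    For reference, because mapPartitionsWithIndex hands the function one iterator per partition, each partition only ever yields a single (id, elements) pair, so the same grouping can be written much more compactly. A minimal sketch (partitionValueWithID2 is just an illustrative name; it keeps the elements in their original order instead of reversing them):

    def partitionValueWithID2(id: Int, iter: Iterator[(Int, String)]): Iterator[(Int, List[(Int, String)])] =
      Iterator((id, iter.toList))   // one pair per partition

    parRDD.mapPartitionsWithIndex(partitionValueWithID2).collect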

    Another way to see the data in each partition:


    scala> import org.apache.spark.TaskContext
    import org.apache.spark.TaskContext

    scala> parRDD.map(x=>(TaskContext.getPartitionId,x)).groupByKey().collect
    res44: Array[(Int, Iterable[(Int, String)])] = Array((0,CompactBuffer((100,dog), (100,cat))), (1,CompactBuffer((200,pear), (100,tiger))), (2,CompactBuffer((200,apple), (100,lion), (200,banana))), (3,CompactBuffer((100,elephent), (300,paper))), (4,CompactBuffer((300,pen), (200,pig), (300,ballpen))))
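
    RDD.glom, which collects each partition into an array, can also be used for this kind of inspection (a minimal sketch; the partition index is attached with zipWithIndex on the driver after collect):

    parRDD.glom().collect().zipWithIndex.foreach { case (elems, idx) =>
      println(s"partition $idx: ${elems.mkString(", ")}")
    }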

    -----------------------

    Custom partitioners

    scala> val parRDD=sc.makeRDD(Array((100,"dog"),(100,"cat"),(200,"pear"),(100,"tiger"),(200,"apple"),(100,"lion"),(200,"banana"),(100,"elephent"),(300,"paper"),(300,"pen"),(200,"pig"),(300,"ballpen")))
    parRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[70] at makeRDD at <console>:27

    scala> def partitionValueWthID(id:Int,iter:Iterator[(Int,String)])=({var result=scala.collection.mutable.Map[Int,List[(Int,String)]](); while(iter.hasNext){var partid=id;var elem=iter.next;if(result.contains(partid)){var elems=result(partid);elems::=elem;result(partid)=elems; } else result(partid)=List[(Int,String)]{elem}};result.toIterator})
    partitionValueWthID: (id: Int, iter: Iterator[(Int, String)])Iterator[(Int, List[(Int, String)])]


    scala> class MyPartitioner extends org.apache.spark.Partitioner{
         |   override def numPartitions: Int = 2
         |   override def getPartition(key: Any): Int = {
         |     val k = key.toString.toInt
         |     if(k > 100){
         |       return 1
         |     }else{
         |       return 0
         |     }
         |   }
         | }
    defined class MyPartitioner

    scala> parRDD.partitionBy(new MyPartitioner).mapPartitionsWithIndex(partitionValueWthID).collect
    res25: Array[(Int, List[(Int, String)])] = Array((0,List((100,elephent), (100,lion), (100,tiger), (100,cat), (100,dog))), (1,List((300,ballpen), (200,pig), (300,pen), (300,paper), (200,banana), (200,apple), (200,pear))))
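
    When writing a custom Partitioner it is usually worth also overriding equals and hashCode, so that Spark can recognize two RDDs as co-partitioned and avoid an unnecessary shuffle (for example in joins). A sketch of the same partitioner extended this way (MyPartitioner2 is only an illustrative name):

    class MyPartitioner2 extends org.apache.spark.Partitioner {
      override def numPartitions: Int = 2
      override def getPartition(key: Any): Int =
        if (key.toString.toInt > 100) 1 else 0
      // any two MyPartitioner2 instances partition identically, so treat them as equal
      override def equals(other: Any): Boolean = other.isInstanceOf[MyPartitioner2]
      override def hashCode: Int = numPartitions
    }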

     ------------------------------------------------------

    scala> val arr=parRDD.keys.distinct.collect
    arr: Array[Int] = Array(100, 300, 200)

    scala> class MyPartitioner1(parts:Array[Int]) extends org.apache.spark.Partitioner{
         |   override def numPartitions: Int = parts.length+1
         |   val rules=new scala.collection.mutable.HashMap[Int,Int]()
         |   var i=1
         |   for(x<-parts)
         |    {
         |    rules+=(x->i)
         |    i+=1
         |   }
         |   override def getPartition(key: Any): Int = {
         |     val k = key.toString.toInt
         |     rules.getOrElse(k,0)
         |   }
         | }
    defined class MyPartitioner1

    The same class, annotated:

    class MyPartitioner1(parts:Array[Int]) extends org.apache.spark.Partitioner{
      override def numPartitions: Int = parts.length+1 // number of partitions

    // define the partitioning rules: map each known key to its own partition number
      val rules=new scala.collection.mutable.HashMap[Int,Int]() 
      var i=1
      for(x<-parts)
       {
       rules+=(x->i)
       i+=1
      }

    // decide which partition a record goes to based on its key
      override def getPartition(key: Any): Int = {
        val k = key.toString.toInt
        rules.getOrElse(k,0)
      }
    }

    scala> parRDD.partitionBy(new MyPartitioner1(arr)).mapPartitionsWithIndex(partitionValueWthID).collect
    res55: Array[(Int, List[(Int, String)])] = Array((1,List((100,elephent), (100,lion), (100,tiger), (100,cat), (100,dog))), (2,List((300,ballpen), (300,pen), (300,paper))), (3,List((200,pig), (200,banana), (200,apple), (200,pear))))
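
    For comparison, the same rules map can be built without mutable state (an illustrative sketch; MyPartitioner1b is a hypothetical name):

    class MyPartitioner1b(parts: Array[Int]) extends org.apache.spark.Partitioner {
      // each known key gets its own partition starting from 1; partition 0 catches unknown keys
      private val rules: Map[Int, Int] = parts.zipWithIndex.map { case (k, i) => k -> (i + 1) }.toMap
      override def numPartitions: Int = parts.length + 1
      override def getPartition(key: Any): Int = rules.getOrElse(key.toString.toInt, 0)
    }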

    -----------------------------------------

    The difference between repartition and partitionBy

    Both repartition and partitionBy reshuffle the data, and both use a HashPartitioner by default. The difference is that partitionBy is only available on pair RDDs, and when both are applied to a pair RDD, partitionBy behaves closer to what one would expect: it partitions by the actual keys, whereas repartition effectively uses a randomly generated number as the key.

    scala> val parRDD=sc.makeRDD(Array((100,"dog"),(100,"cat"),(200,"pear"),(100,"tiger"),(200,"apple"),(101,"lion"),(201,"banana"),(101,"elephent"),(300,"paper"),(300,"pen"),(200,"pig"),(300,"ballpen")))
    parRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[8] at makeRDD at <console>:25

    scala> def partitionValueWthID(id:Int,iter:Iterator[(Int,String)])=({var result=scala.collection.mutable.Map[Int,List[(Int,String)]](); while(iter.hasNext){var partid=id;var elem=iter.next;if(result.contains(partid)){var elems=result(partid);elems::=elem;result(partid)=elems; } else result(partid)=List[(Int,String)]{elem}};result.toIterator})
    partitionValueWthID: (id: Int, iter: Iterator[(Int, String)])Iterator[(Int, List[(Int, String)])]

    scala> parRDD.repartition(4).mapPartitionsWithIndex(partitionValueWthID).collect
    res3: Array[(Int, List[(Int, String)])] = Array((0,List((200,pig), (101,elephent), (200,apple), (100,cat))), (1,List((300,ballpen), (300,paper), (101,lion), (200,pear))), (3,List((300,pen), (201,banana), (100,tiger), (100,dog))))

    scala> import org.apache.spark.HashPartitioner
    import org.apache.spark.HashPartitioner

    scala> parRDD.partitionBy(new HashPartitioner(4)).mapPartitionsWithIndex(partitionValueWthID).collect
    res7: Array[(Int, List[(Int, String)])] = Array((0,List((300,ballpen), (200,pig), (300,pen), (300,paper), (200,apple), (100,tiger), (200,pear), (100,cat), (100,dog))), (1,List((101,elephent), (201,banana), (101,lion))))

    In the Spark 2.2 source (RDD.scala), the random starting position used by repartition is defined as:
      var position = (new Random(index)).nextInt(numPartitions)
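
    In other words, repartition (coalesce with shuffle = true) tags every element with a synthetic key derived from that random starting position before shuffling, roughly as in the following paraphrased sketch (not the verbatim source):

    import scala.util.Random

    def distributePartition[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
      var position = new Random(index).nextInt(numPartitions)  // random start per input partition
      items.map { t =>
        position += 1                    // round-robin from the random start
        (position % numPartitions, t)    // the synthetic key decides the target partition
      }
    }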

    ----------------------
    The RDD partitioner (Partitioner)
    Partitioning is critical for shuffle-type operations: it determines the dependency between the parent RDD and the child RDD of the operation, i.e. whether the dependency is wide or narrow.
    Spark ships with two partitioners by default, the hash partitioner (HashPartitioner) and the range partitioner (RangePartitioner). A Partitioner only exists for (K,V)-typed RDDs; for non-(K,V) RDDs the partitioner is None.
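
    HashPartitioner assigns a key to a partition by taking a non-negative modulus of the key's hashCode, roughly as in this paraphrased sketch (not the verbatim source):

    def hashGetPartition(key: Any, numPartitions: Int): Int = key match {
      case null => 0                                // null keys always go to partition 0
      case _ =>
        val mod = key.hashCode % numPartitions
        if (mod < 0) mod + numPartitions else mod   // keep the result non-negative
    }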

    scala> val parRDD=sc.makeRDD(Array((100,"dog"),(100,"cat"),(200,"pear"),(100,"tiger"),(200,"apple"),(100,"lion"),(200,"banana"),(100,"elephent"),(300,"paper"),(300,"pen"),(200,"pig"),(300,"ballpen")))
    parRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[63] at makeRDD at <console>:25

    scala> parRDD.partitioner
    res18: Option[org.apache.spark.Partitioner] = None

    scala> val groupRDD=parRDD.groupByKey()
    groupRDD: org.apache.spark.rdd.RDD[(Int, Iterable[String])] = ShuffledRDD[62] at groupByKey at <console>:27

    scala> groupRDD.partitioner
    res24: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@4)

    scala> val lenRDD=groupRDD.mapValues(x=>{val arr=x.toArray;arr.length})
    lenRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[70] at mapValues at <console>:29

    scala> lenRDD.partitioner
    res34: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@5)

    scala> lenRDD.collect
    res35: Array[(Int, Int)] = Array((100,5), (300,3), (200,4))
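
    Note that mapValues preserves the parent's partitioner because it cannot change the keys, whereas map may rewrite keys and therefore drops the partitioner. A quick check in the same session (a sketch):

    groupRDD.mapValues(_.size).partitioner                    // Some(HashPartitioner)
    groupRDD.map { case (k, v) => (k, v.size) }.partitioner   // None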

  • Original source: https://www.cnblogs.com/playforever/p/9466606.html