zoukankan      html  css  js  c++  java
  • spark算子:partitionBy对数据进行分区

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]

    该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。

    scala> var rdd1 = sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2)
    rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[23] at makeRDD at :21
     
    scala> rdd1.partitions.size
    res20: Int = 2
     
    //查看rdd1中每个分区的元素
    scala> rdd1.mapPartitionsWithIndex{
         |         (partIdx,iter) => {
         |           var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
         |             while(iter.hasNext){
         |               var part_name = "part_" + partIdx;
         |               var elem = iter.next()
         |               if(part_map.contains(part_name)) {
         |                 var elems = part_map(part_name)
         |                 elems ::= elem
         |                 part_map(part_name) = elems
         |               } else {
         |                 part_map(part_name) = List[(Int,String)]{elem}
         |               }
         |             }
         |             part_map.iterator
         |            
         |         }
         |       }.collect
    res22: Array[(String, List[(Int, String)])] = Array((part_0,List((2,B), (1,A))), (part_1,List((4,D), (3,C))))
    //(2,B),(1,A)在part_0中,(4,D),(3,C)在part_1中
     
    //使用partitionBy重分区
    scala> var rdd2 = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))
    rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[25] at partitionBy at :23
     
    scala> rdd2.partitions.size
    res23: Int = 2
     
    //查看rdd2中每个分区的元素
    scala> rdd2.mapPartitionsWithIndex{
         |         (partIdx,iter) => {
         |           var part_map = scala.collection.mutable.Map[String,List[(Int,String)]]()
         |             while(iter.hasNext){
         |               var part_name = "part_" + partIdx;
         |               var elem = iter.next()
         |               if(part_map.contains(part_name)) {
         |                 var elems = part_map(part_name)
         |                 elems ::= elem
         |                 part_map(part_name) = elems
         |               } else {
         |                 part_map(part_name) = List[(Int,String)]{elem}
         |               }
         |             }
         |             part_map.iterator
         |         }
         |       }.collect
    res24: Array[(String, List[(Int, String)])] = Array((part_0,List((4,D), (2,B))), (part_1,List((3,C), (1,A))))
    //(4,D),(2,B)在part_0中,(3,C),(1,A)在part_1中

    参考:http://lxw1234.com/archives/2015/07/356.htm

  • 相关阅读:
    JQuery Ajax 在asp.net中使用总结
    直接拿来用!最火的Android开源项目(一)
    专访邓凡平:Android开发路上的快速学习之道
    C/C++使用心得:enum与int的相互转换
    学习汇编的第一步
    《汇编程序》王爽实验9的解法(显示的问题)
    《汇编程序》王爽实验10.2的解法
    GDB调试精粹及使用实例
    aptget 使用详解
    《汇编程序》王爽实验10.3的解法
  • 原文地址:https://www.cnblogs.com/yy3b2007com/p/7800793.html
Copyright © 2011-2022 走看看