zoukankan      html  css  js  c++  java
  • Spark: Best practice for retrieving big data from RDD to local machine

    've got big RDD(1gb) in yarn cluster. On local machine, which use this cluster I have only 512 mb. I'd like to iterate over values in RDD on my local machine. I can't use collect(), because it would create too big array locally which more then my heap. I need some iterative way. There is method iterator(), but it requires some additional information, I can't provide.

    UDP: commited toLocalIterator method

    shareimprove this question
     
        
    toLocalIterator is not ideal if you want to iterate locally over a partition at a time – Landon Kuhn Oct 29 '14 at 2:25
    2  
    @LandonKuhn why not? – Tom Yubing Dong Aug 4 '15 at 23:02

    5 Answers

    Update: RDD.toLocalIterator method that appeared after the original answer has been written is a more efficient way to do the job. It uses runJob to evaluate only a single partition on each step.

    TL;DR And the original answer might give a rough idea how it works:

    First of all, get the array of partition indexes:

    val parts = rdd.partitions

    Then create smaller rdds filtering out everything but a single partition. Collect the data from smaller rdds and iterate over values of a single partition:

    for (p <- parts) {
        val idx = p.index
        val partRdd = rdd.mapPartitionsWithIndex(a => if (a._1 == idx) a._2 else Iterator(), true)
        //The second argument is true to avoid rdd reshuffling
        val data = partRdd.collect //data contains all values from a single partition 
                                   //in the form of array
        //Now you can do with the data whatever you want: iterate, save to a file, etc.
    }

    I didn't try this code, but it should work. Please write a comment if it won't compile. Of cause, it will work only if the partitions are small enough. If they aren't, you can always increase the number of partitions with rdd.coalesce(numParts, true).

    shareimprove this answer
     
        
    does this code cause each partition to be computed in serial when it loops through and call mapPartitionsWithIndex? What's the best way to remedy this? – foboi1122 Nov 18 '15 at 0:42
        
    @foboi1122 Please see updated answer – Wildfire Nov 18 '15 at 8:36 
        
    @Wildfire Will this approach resolve this. Else how to resolve using any or might be this approach. – ChikuMiku 2 days ago 

    Wildfire answer seems semantically correct, but I'm sure you should be able to be vastly more efficient by using the API of Spark. If you want to process each partition in turn, I don't see why you can't using map/filter/reduce/reduceByKey/mapPartitions operations. The only time you'd want to have everything in one place in one array is when your going to perform a non-monoidal operation - but that doesn't seem to be what you want. You should be able to do something like:

    rdd.mapPartitions(recordsIterator => your code that processes a single chunk)
    

    Or this

    rdd.foreachPartition(partition => {
      partition.toArray
      // Your code
    })
    
    shareimprove this answer
     
        
    Is't these operators execute on cluster? – epahomov Apr 3 '14 at 7:05
    1  
    Yes it will, but why are you avoiding that? If you can process each chunk in turn, you should be able to write the code in such a way so it can distribute - like using aggregate. – samthebest Apr 3 '14 at 15:54
        
    Is not the iterator returned by forEachPartitition the data iterator for a single partition - and not an iterator of all partitions? – javadba May 20 at 8:23

    Here is the same approach as suggested by @Wildlife but written in pyspark.

    The nice thing about this approach - it lets user access records in RDD in order. I'm using this code to feed data from RDD into STDIN of the machine learning tool's process.

    rdd = sc.parallelize(range(100), 10)
    def make_part_filter(index):
        def part_filter(split_index, iterator):
            if split_index == index:
                for el in iterator:
                    yield el
        return part_filter
    
    for part_id in range(rdd.getNumPartitions()):
        part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
        data_from_part_rdd = part_rdd.collect()
        print "partition id: %s elements: %s" % (part_id, data_from_part_rdd)
    

    Produces output:

    partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
    partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
    partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
    partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
    partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
    partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
    partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
    
    shareimprove this answer
     

    Map/filter/reduce using Spark and download the results later? I think usual Hadoop approach will work.

    Api says that there are map - filter - saveAsFile commands:https://spark.incubator.apache.org/docs/0.8.1/scala-programming-guide.html#transformations

    shareimprove this answer
     
        
    Bad option. I don't want to do serialization/deserialization. So I want this data retrieving from spark – epahomov Feb 11 '14 at 10:37
        
    How do you intend to get 1gb without serde(i.e. storing on the disk.) ? on a node with 512mb ? – scrapcodesFeb 12 '14 at 9:13
    1  
    By iterating over the RDD. You should be able to get each partition in sequence to send each data item in sequence to the master, which can then pull them off the network and work on them. – interfect Feb 12 '14 at 18:07

    For Spark 1.3.1 , the format is as follows

    val parts = rdd.partitions
        for (p <- parts) {
            val idx = p.index
            val partRdd = data.mapPartitionsWithIndex { 
               case(index:Int,value:Iterator[(String,String,Float)]) => 
                 if (index == idx) value else Iterator()}
            val dataPartitioned = partRdd.collect 
            //Apply further processing on data                      
        }
    

     

  • 相关阅读:
    KMP算法
    找出第二大的数
    webpack 3 优化
    CocoaPods安装
    自适应水平垂直居中
    找出两个数组中都有,并且重复次数最多的元素
    swift 笔记
    Promise 用es5的基础实现
    $.ajax仿axios封装
    js基础拖拽效果
  • 原文地址:https://www.cnblogs.com/seaspring/p/5631085.html
Copyright © 2011-2022 走看看