  • Spark: Best practice for retrieving big data from RDD to local machine

    I've got a big RDD (1 GB) in a YARN cluster. On the local machine that uses this cluster I have only 512 MB. I'd like to iterate over the values of the RDD on my local machine. I can't use collect(), because it would create an array locally that is larger than my heap. I need some iterative way. There is the method iterator(), but it requires additional information that I can't provide.

    UPD: the toLocalIterator method has been committed

        
    toLocalIterator is not ideal if you want to iterate locally over a partition at a time – Landon Kuhn Oct 29 '14 at 2:25
    2  
    @LandonKuhn why not? – Tom Yubing Dong Aug 4 '15 at 23:02

    5 Answers

    Update: the RDD.toLocalIterator method, which appeared after the original answer was written, is a more efficient way to do the job. It uses runJob to evaluate only a single partition on each step.
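
    The idea behind toLocalIterator can be sketched in plain Python, with lists standing in for partitions and a helper standing in for running a job on one partition; this is an illustration of the pattern, not Spark's actual implementation:

```python
def compute_partition(partitions, idx):
    # In real Spark this would be a job that evaluates just one partition.
    return list(partitions[idx])

def to_local_iterator(partitions):
    # Pull partitions one at a time, so only a single partition's worth
    # of data is materialized locally at any moment.
    for idx in range(len(partitions)):
        for record in compute_partition(partitions, idx):
            yield record

partitions = [[0, 1, 2], [3, 4], [5, 6, 7, 8]]
result = list(to_local_iterator(partitions))  # [0, 1, 2, 3, 4, 5, 6, 7, 8]
```

    The peak local memory is bounded by the largest partition rather than by the whole dataset.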

    TL;DR The original answer below still gives a rough idea of how it works:

    First of all, get the array of partition indexes:

    val parts = rdd.partitions

    Then create smaller rdds filtering out everything but a single partition. Collect the data from smaller rdds and iterate over values of a single partition:

    for (p <- parts) {
        val idx = p.index
        val partRdd = rdd.mapPartitionsWithIndex((index, it) => if (index == idx) it else Iterator(), true)
        //The second argument is true to avoid rdd reshuffling
        val data = partRdd.collect //data contains all values from a single partition 
                                   //in the form of array
        //Now you can do with the data whatever you want: iterate, save to a file, etc.
    }

    I didn't try this code, but it should work. Please write a comment if it doesn't compile. Of course, it will work only if the partitions are small enough. If they aren't, you can always increase the number of partitions with rdd.coalesce(numParts, true).
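
    The loop above can be simulated in plain Python, with lists standing in for partitions; keep_only_partition is a hypothetical helper that mimics the mapPartitionsWithIndex filter:

```python
def keep_only_partition(partitions, idx):
    # Yield records only for the requested partition index,
    # discarding every other partition's data.
    for i, part in enumerate(partitions):
        if i == idx:
            for record in part:
                yield record

partitions = [[10, 11], [12, 13, 14], [15]]
collected = []
for idx in range(len(partitions)):
    # One "collect" per partition, as in the Scala loop above.
    data = list(keep_only_partition(partitions, idx))
    collected.extend(data)
```

    Each pass touches only one partition's records, which is why the driver never needs to hold the full dataset.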

        
    does this code cause each partition to be computed serially when it loops through and calls mapPartitionsWithIndex? What's the best way to remedy this? – foboi1122 Nov 18 '15 at 0:42
        
    @foboi1122 Please see updated answer – Wildfire Nov 18 '15 at 8:36 
        
    @Wildfire Will this approach resolve the issue? If not, how can it be resolved, with this or some other approach? – ChikuMiku 2 days ago 

    Wildfire's answer seems semantically correct, but I'm sure you should be able to be vastly more efficient by using the API of Spark. If you want to process each partition in turn, I don't see why you can't use map/filter/reduce/reduceByKey/mapPartitions operations. The only time you'd want to have everything in one place in one array is when you're going to perform a non-monoidal operation - but that doesn't seem to be what you want. You should be able to do something like:

    rdd.mapPartitions(recordsIterator => your code that processes a single chunk)
    

    Or this

    rdd.foreachPartition(partition => {
      val records = partition.toArray
      // Your code to process this partition's records
    })
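
    The per-partition idea can be sketched without a cluster: lists stand in for partitions, and a hypothetical map_partitions helper applies a function to each partition's iterator so that only small summaries, not raw records, come back to one place:

```python
def map_partitions(partitions, func):
    # Apply func to each partition's iterator; keep only its results.
    return [list(func(iter(part))) for part in partitions]

def summarize(records):
    # Example per-partition computation: sum and count of the records.
    total, count = 0, 0
    for r in records:
        total += r
        count += 1
    yield (total, count)

partitions = [[1, 2, 3], [4, 5], [6]]
summaries = map_partitions(partitions, summarize)
# summaries: [[(6, 3)], [(9, 2)], [(6, 1)]]
```

    Because (sum, count) pairs combine associatively, this is exactly the kind of monoidal computation that distributes well.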
    
        
    Don't these operators execute on the cluster? – epahomov Apr 3 '14 at 7:05
    1  
    Yes they will, but why are you avoiding that? If you can process each chunk in turn, you should be able to write the code in such a way that it can distribute - like using aggregate. – samthebest Apr 3 '14 at 15:54
        
    Isn't the iterator passed to foreachPartition the data iterator for a single partition - and not an iterator over all partitions? – javadba May 20 at 8:23

    Here is the same approach as suggested by @Wildfire but written in pyspark.

    The nice thing about this approach is that it lets the user access records in the RDD in order. I'm using this code to feed data from an RDD into the STDIN of a machine learning tool's process.

    rdd = sc.parallelize(range(100), 10)
    def make_part_filter(index):
        def part_filter(split_index, iterator):
            if split_index == index:
                for el in iterator:
                    yield el
        return part_filter
    
    for part_id in range(rdd.getNumPartitions()):
        part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
        data_from_part_rdd = part_rdd.collect()
        print("partition id: %s elements: %s" % (part_id, data_from_part_rdd))
    

    Produces output:

    partition id: 0 elements: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    partition id: 1 elements: [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    partition id: 2 elements: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
    partition id: 3 elements: [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
    partition id: 4 elements: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
    partition id: 5 elements: [50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
    partition id: 6 elements: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
    partition id: 7 elements: [70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
    partition id: 8 elements: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89]
    partition id: 9 elements: [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
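
    The "feed records into a tool's STDIN" step mentioned above can be sketched with the standard subprocess module, using a plain list as the collected partition and the Unix sort command as a stand-in for the actual machine learning tool:

```python
import subprocess

# One collected partition's worth of records (stand-in data).
records = ["banana", "apple", "cherry"]

# Pipe the records, one per line, into the external tool's STDIN
# and capture whatever it writes to STDOUT.
proc = subprocess.run(
    ["sort"],                         # stand-in for any line-oriented tool
    input="\n".join(records) + "\n",
    capture_output=True,
    text=True,
)
sorted_lines = proc.stdout.splitlines()
```

    In a real pipeline you would loop over partitions and write each collected chunk to the process as it arrives, rather than building one giant input string.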
    

    Map/filter/reduce using Spark and download the results later? I think the usual Hadoop approach will work.

    The API says that there are map, filter and saveAsTextFile operations: https://spark.incubator.apache.org/docs/0.8.1/scala-programming-guide.html#transformations

        
    Bad option. I don't want to do serialization/deserialization. So I want to retrieve this data from Spark directly – epahomov Feb 11 '14 at 10:37
        
    How do you intend to get 1 GB without serde (i.e. storing on the disk)? On a node with 512 MB? – scrapcodes Feb 12 '14 at 9:13
    1  
    By iterating over the RDD. You should be able to get each partition in sequence and send each data item in sequence to the master, which can then pull them off the network and work on them. – interfect Feb 12 '14 at 18:07

    For Spark 1.3.1, the format is as follows:

    val parts = rdd.partitions
    for (p <- parts) {
        val idx = p.index
        val partRdd = rdd.mapPartitionsWithIndex {
            case (index: Int, value: Iterator[(String, String, Float)]) =>
                if (index == idx) value else Iterator()
        }
        val dataPartitioned = partRdd.collect
        //Apply further processing on data
    }
    

     

  • Original post: https://www.cnblogs.com/seaspring/p/5631085.html