  • Spark RDD element ordering test

    The experiments below show:
    foreach() traverses the elements in a scrambled order
    but:
    collect() returns the results in the original order
    take() returns the results in the original order

    Why? foreach() runs as parallel tasks on the executors, so the per-partition output interleaves nondeterministically; collect() fetches each partition's result and concatenates the arrays in partition-index order, and take() scans the partitions starting from partition 0, so both preserve the original order.

    Another observation:
    take() stops as soon as it has the requested number of elements and computes nothing more (the accumulator trace at the end demonstrates this)

    scala> val rdd = sc.makeRDD((0 to 9), 4)
    scala> rdd.collect
    res27: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
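
    Why collect() preserves the order: internally, collect() is just runJob() across all partitions followed by a concatenation of the per-partition arrays in partition-index order. A minimal sketch of that in the same spark-shell session (sc and rdd as above):

    scala> val perPartition = sc.runJob(rdd, (it: Iterator[Int]) => it.toArray)
    scala> Array.concat(perPartition: _*)   // identical to rdd.collect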
    
    scala> rdd.partitions
    res13: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@691, org.apache.spark.rdd.ParallelCollectionPartition@692, org.apache.spark.rdd.ParallelCollectionPartition@693, org.apache.spark.rdd.ParallelCollectionPartition@694)
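
    To see which elements each of the 4 partitions actually holds, glom() turns every partition into an array. A quick check in the same session:

    scala> rdd.glom().collect()
    // expected: Array(Array(0, 1), Array(2, 3, 4), Array(5, 6), Array(7, 8, 9)),
    // matching the collectPartitions output further down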
    
    scala> rdd.foreach(print(_))
    0178923456
    scala> rdd.foreach(print(_))
    5623401789
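
    The scrambled digits above only reflect which task prints first; within each partition the data is still ordered. Tagging every element with its partition index makes that visible (a sketch in the same session):

    scala> rdd.mapPartitionsWithIndex((i, it) => it.map(x => (i, x))).collect
    // expected: Array((0,0), (0,1), (1,2), (1,3), (1,4), (2,5), (2,6), (3,7), (3,8), (3,9))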
    
    scala> rdd.coalesce(1, false).foreach(print _)
    0123456789
    scala> rdd.coalesce(1, false).partitions
    res28: Array[org.apache.spark.Partition] = Array(CoalescedRDDPartition(0,ParallelCollectionRDD[0] at makeRDD at <console>:21,[I@63a3554,None))
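
    coalesce(1, false) creates a narrow dependency: the single CoalescedRDDPartition chains the parent partitions in index order, which is why iterating it prints 0123456789. Since no shuffle is involved, the lineage should show no ShuffledRDD:

    scala> rdd.coalesce(1, false).toDebugString
    // expected: a CoalescedRDD sitting directly on the ParallelCollectionRDD, single stage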
    
    scala> rdd.foreachPartition((x:Iterator[Int])=>println(x.next))
    2
    0
    5
    7
    
    scala> rdd.mapPartitions((x:Iterator[Int])=>Array(x.next()).iterator).collect
    res4: Array[Int] = Array(0, 2, 5, 7)
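
    The same trick counts the elements per partition, confirming the 2/3/2/3 split behind the first elements 0, 2, 5, 7:

    scala> rdd.mapPartitions(it => Iterator(it.size)).collect
    // expected: Array(2, 3, 2, 3)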
    
    scala> rdd.keyBy((x:Int)=>x/4).collect
    res27: Array[(Int, Int)] = Array((0,0), (0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (1,7), (2,8), (2,9))
    
    scala> rdd.groupBy(_/4).collect
    res7: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(0, 1, 2, 3)), (1,CompactBuffer(4, 5, 6, 7)), (2,CompactBuffer(8, 9)))
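
    groupBy(f) is keyBy(f) plus groupByKey() under the hood, so the following sketch yields the same groups; note that groupBy involves a shuffle, so the key order in the collected array is not guaranteed, even though it happens to come back sorted here:

    scala> rdd.keyBy((x: Int) => x / 4).groupByKey().collect
    // same groups as rdd.groupBy(_ / 4) above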
    
    scala> val jr = rdd.toJavaRDD
    jr: org.apache.spark.api.java.JavaRDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:21
    
    scala> jr.collectPartitions(Array(0,1))
    res20: Array[java.util.List[Int]] = Array([0, 1], [2, 3, 4])
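
    collectPartitions is the Java API's way of fetching only the listed partitions. The Scala-side equivalent is runJob with an explicit partition list (a sketch; the three-argument form below is the Spark 2.x signature, Spark 1.x additionally took an allowLocal flag):

    scala> sc.runJob(rdd, (it: Iterator[Int]) => it.toArray, Seq(0, 1))
    // expected: Array(Array(0, 1), Array(2, 3, 4))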

    To watch which elements actually get computed, define a String accumulator:

    scala> implicit object StringAccumulator extends org.apache.spark.AccumulatorParam[String] {
         |   def addInPlace(r1: String, r2: String) = r1 + "," + r2
         |   def zero(initialValue: String) = ""
         | }
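
    Side note: AccumulatorParam and sc.accumulator are the Spark 1.x API, deprecated since Spark 2.0. A minimal sketch of the same idea with AccumulatorV2 on 2.x+ (the class name and buffer layout are my own illustration, not from the original post):

    import org.apache.spark.util.AccumulatorV2
    import scala.collection.mutable.ArrayBuffer

    // Buffers the strings and joins them on read, which also avoids the
    // leading "," that addInPlace("", x) produces in the 1.x version above
    class StringAccumV2 extends AccumulatorV2[String, String] {
      private val buf = ArrayBuffer.empty[String]
      def isZero: Boolean = buf.isEmpty
      def copy(): StringAccumV2 = { val c = new StringAccumV2; c.buf ++= buf; c }
      def reset(): Unit = buf.clear()
      def add(v: String): Unit = buf += v
      def merge(other: AccumulatorV2[String, String]): Unit = other match {
        case o: StringAccumV2 => buf ++= o.buf
        case _                => buf += other.value
      }
      def value: String = buf.mkString(",")
    }
    // usage: val acc = new StringAccumV2; sc.register(acc, "trace"); rdd.foreach(x => acc.add("saw-" + x))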
    
    scala> val a = sc.accumulator("")
    a: org.apache.spark.Accumulator[String] = 
    
    scala> sc.parallelize(0 to 1000, 99).flatMap((i:Int)=>{a+="f1-"+i; (i*2 to i*2 + 1)}).flatMap((i:Int)=>{a+="f2-"+i; (i*2 to i*2 + 1)}).take(10)
    res2: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    
    scala> a
    res3: org.apache.spark.Accumulator[String] = ,,f1-0,f2-0,f2-1,f1-1,f2-2,f2-3,f1-2,f2-4
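
    The trace shows how little take(10) actually computed: partition 0 of (0 to 1000, 99) holds only 0..9, and each input element fans out to 4 outputs through the two flatMaps, so producing 10 results only needs inputs 0, 1 and part of 2 (hence f1-0, f2-0, f2-1, f1-1, f2-2, f2-3, f1-2, f2-4). The two leading commas are the empty zero("") values joined in by addInPlace. A quick check of partition 0 in the same session:

    scala> sc.parallelize(0 to 1000, 99).glom().first()
    // expected: Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)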
  • Original post: https://www.cnblogs.com/bluejoe/p/5115824.html