zoukankan      html  css  js  c++  java
  • Spark 自定义分区及区内二次排序 demo (Spark: custom partitioner plus secondary sort within each partition)

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.Partitioner
    import org.apache.spark.HashPartitioner
    
    object Demo {
      /** Demo: custom partitioning followed by a secondary sort inside each partition.
        *
        * Each input line is expected to look like `"<key> <int>"` (single-space separated);
        * a line with fewer than two fields or a non-integer second field makes the job fail.
        * Records are routed with [[MySparkPartition]] (2 partitions), then every partition is
        * sorted locally by key ascending and, for equal keys, by value descending, and the
        * result is written with `saveAsTextFile`.
        *
        * @param args optional overrides: `args(0)` = input file, `args(1)` = output directory.
        *             When absent, the original hard-coded Windows paths are used.
        */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("app")
        val sc = new SparkContext(conf)

        // Backslashes in Windows paths must be escaped: in a plain Scala string "\t" is a TAB
        // and "\s" / "\o" are not valid escapes, so the unescaped literals were broken.
        val inputPath = args.lift(0).getOrElse("F:\\test\\test\\ssort.txt")
        val outputPath = args.lift(1).getOrElse("F:\\test\\test\\output")

        // Key ascending, value descending. Reversing the Int ordering (instead of sorting on
        // -value) stays correct for Int.MinValue, whose negation overflows back to itself.
        val keyAscValueDesc: Ordering[(String, Int)] =
          Ordering.Tuple2(Ordering.String, Ordering.Int.reverse)

        try {
          val data = sc.textFile(inputPath)

          // Partition first, then sort inside each partition.
          data
            .map { line =>
              val fields = line.split(" ")
              (fields(0), fields(1).toInt)
            }
            .partitionBy(new MySparkPartition(2))
            .mapPartitions(
              { records =>
                // This is the Scala collection sort on one materialised partition,
                // NOT Spark's RDD.sortBy -- do not confuse the two.
                records.toList.sorted(keyAscValueDesc).iterator
              },
              preservesPartitioning = true // keys are unchanged, so the partitioner still applies
            )
            .saveAsTextFile(outputPath)

          // Alternative approaches, kept for reference:

          //data.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).top(3)(Ordering.by(_._2)).foreach(println)

          /*data.map{x=>
            (new SecondarySortKey(x.split(" ")(0), x.split(" ")(1).toInt))
          }.sortBy(x=>x, true).map{x=>(x.first, x.second)}.foreach(println)*/

          /*data.sortBy({x=>
            (new SecondarySortKey(x.split(" ")(0), x.split(" ")(1).toInt))
          }, true).foreach(println)*/

          /*val data1 = data.sortBy({x=>
            (new SecondarySortKey(x.split(" ")(0),x.split(" ")(1).toInt))
          },true).map{x=>
            val arr = x.split(" ")
            (arr(0), arr(1))
          }.partitionBy(new MySparkPartition(2)).saveAsTextFile("F:\\test\\test\\output")*/

          /*val l1 = List[(String,Int)](("a",1),("b",2),("d",4),("c",3),("a",2))
          //l1.sortBy(x=>(x._1,x._2))(Ordering.Tuple2(Ordering.String,Ordering.Int.reverse))
          l1.sortBy{case(x,y) =>
            (x, -y)
          }
          .foreach(println)*/
        } finally {
          // Release the local Spark context even if the job fails.
          sc.stop()
        }
      }
    }
    
    /** Custom partitioner: records whose key equals `"aa"` go to partition 1, all others to 0.
      *
      * @param numsPartitions total number of partitions; must be positive. Only partitions 0
      *                       and 1 are ever targeted, so with more than 2 the rest stay empty.
      *                       With exactly 1 partition every key maps to partition 0.
      * @throws IllegalArgumentException if `numsPartitions` is not positive
      */
    class MySparkPartition(numsPartitions: Int) extends Partitioner {
      require(numsPartitions > 0, s"numsPartitions must be positive, got $numsPartitions")

      override def numPartitions: Int = numsPartitions

      /** Returns the partition index for `key`, always within `[0, numPartitions)`. */
      override def getPartition(key: Any): Int =
        // Spark requires an index below numPartitions; `1 % numPartitions` keeps "aa" in
        // partition 1 normally but maps it to 0 for a single-partition instance.
        if (key == "aa") 1 % numPartitions else 0

      // Spark compares partitioners with `equals` (e.g. partitionBy skips the shuffle when the
      // RDD already has an equal partitioner), so same configuration => equal partitioners.
      override def equals(other: Any): Boolean = other match {
        case p: MySparkPartition => p.numPartitions == numPartitions
        case _                   => false
      }

      override def hashCode: Int = numPartitions
    }
    
    /** Composite sort key: orders by `first` ascending, then by `second` descending.
      *
      * `equals`/`hashCode` are defined to be consistent with `compare` (two keys are equal
      * exactly when `compare` returns 0), as the `Ordered` contract requires.
      *
      * @param first  primary key, compared with `String.compareTo` (ascending)
      * @param second secondary key, compared in reverse numeric order (descending)
      */
    class SecondarySortKey(val first: String, val second: Int)
        extends Ordered[SecondarySortKey] with Serializable {

      def compare(other: SecondarySortKey): Int = {
        val byFirst = this.first.compareTo(other.first)
        // Arguments swapped on purpose: larger `second` sorts first.
        if (byFirst != 0) byFirst else Integer.compare(other.second, this.second)
      }

      override def equals(that: Any): Boolean = that match {
        case o: SecondarySortKey => first == o.first && second == o.second
        case _                   => false
      }

      override def hashCode: Int = (first, second).##
    }
    

      

  • 相关阅读:
    Jquery 添加插件
    后台添加前台标签
    jQuery.validate 中文API
    jquery validate 详解二
    jquery validate 详解一
    System.Collections里的一些接口
    C#中 Reference Equals, == , Equals的区别
    关于iOS原生条形码扫描,你需要注意的两三事
    layoutSubviews何时调用的问题(转)
    layoutSubviews总结
  • 原文地址:https://www.cnblogs.com/shuzhiwei/p/11322463.html
Copyright © 2011-2022 走看看