Spark Basic Sorting and Secondary Sort (Java + Scala)

    1. Basic sorting

    sc.setLogLevel("WARN")
    // Descending word count: swap (word, count) to (count, word), sort by key, then swap back
    sc.textFile("/data/putfile.txt").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _, 1).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1)).collect
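
    The swap-sort-swap pattern is needed because sortByKey only orders by the key. RDD.sortBy sorts on any derived field directly, so the same descending word count can be written more simply; a minimal sketch against the same input path:

    // Sort on the count field directly, no key/value swapping needed
    sc.textFile("/data/putfile.txt")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _, 1)
      .sortBy(_._2, ascending = false)
      .collect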

    2. Secondary sort

    A secondary sort orders records on two dimensions at once: first on a primary field, then on a secondary field to break ties (the same idea extends to any number of fields).
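
    For example, given hypothetical input lines of two integers each, an ascending secondary sort groups rows by the first field and breaks ties with the second:

    input:  2 3, 4 1, 3 2, 4 3, 2 1
    output: 2 1, 2 3, 3 2, 4 1, 4 3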
    Java version

    package com.dt.java.spark;

    import java.io.Serializable;

    import scala.math.Ordered;

    // Custom composite key for the secondary sort; implements Scala's Ordered interface
    public class SecondarySort implements Ordered<SecondarySort>, Serializable {

        private int first;
        private int second;

        public SecondarySort(int first, int second) {
            this.first = first;
            this.second = second;
        }

        public int getFirst() {
            return first;
        }

        public void setFirst(int first) {
            this.first = first;
        }

        public int getSecond() {
            return second;
        }

        public void setSecond(int second) {
            this.second = second;
        }

        @Override
        public int compare(SecondarySort that) {
            // Integer.compare avoids the overflow that plain subtraction can cause
            if (this.first != that.getFirst()) {
                return Integer.compare(this.first, that.getFirst());
            } else {
                return Integer.compare(this.second, that.getSecond());
            }
        }

        @Override
        public boolean $less(SecondarySort that) {
            if (this.first < that.getFirst()) {
                return true;
            } else if (this.first == that.getFirst() && this.second < that.getSecond()) {
                return true;
            }
            return false;
        }

        @Override
        public boolean $greater(SecondarySort that) {
            if (this.first > that.getFirst()) {
                return true;
            } else if (this.first == that.getFirst() && this.second > that.getSecond()) {
                return true;
            }
            return false;
        }

        @Override
        public boolean $less$eq(SecondarySort that) {
            if (this.$less(that)) {
                return true;
            } else if (this.first == that.getFirst() && this.second == that.getSecond()) {
                return true;
            }
            return false;
        }

        @Override
        public boolean $greater$eq(SecondarySort that) {
            if (this.$greater(that)) {
                return true;
            } else if (this.first == that.getFirst() && this.second == that.getSecond()) {
                return true;
            }
            return false;
        }

        @Override
        public int compareTo(SecondarySort that) {
            // same ordering as compare
            return compare(that);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            SecondarySort that = (SecondarySort) o;
            return first == that.first && second == that.second;
        }

        @Override
        public int hashCode() {
            int result = first;
            result = 31 * result + second;
            return result;
        }
    }
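
    The oddly named $less, $greater, $less$eq and $greater$eq methods are what the <, >, <= and >= operators of Scala's Ordered trait compile to on the JVM; with the pre-2.12 Scala versions current when this was written, a Java class implementing the trait has to spell them all out.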
    package com.dt.java.spark;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    // Secondary sort, step by step:
    // Step 1: implement the custom sort key with the Ordered and Serializable interfaces
    // Step 2: load the file to be sorted into an RDD of <key, value> pairs
    // Step 3: use sortByKey to sort on the custom key
    // Step 4: strip the sort key, keeping only the sorted result
    public class SecondarySortApp {

        public static void main(String[] args) {

            SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Backslashes in a Windows path must be escaped in a Java string literal
            JavaRDD<String> lines = sc.textFile("D:\\JavaWorkspaces\\sparkproject\\sparktest.txt");

            JavaPairRDD<SecondarySort, String> pairs = lines.mapToPair(
                    new PairFunction<String, SecondarySort, String>() {
                        @Override
                        public Tuple2<SecondarySort, String> call(String line) throws Exception {
                            String[] splited = line.split(" ");
                            SecondarySort key = new SecondarySort(Integer.valueOf(splited[0]), Integer.valueOf(splited[1]));
                            return new Tuple2<SecondarySort, String>(key, line);
                        }
                    });

            JavaPairRDD<SecondarySort, String> sorted = pairs.sortByKey(); // the secondary sort itself

            // Strip the custom key, keeping only the sorted content
            JavaRDD<String> secondarysorted = sorted.map(
                    new Function<Tuple2<SecondarySort, String>, String>() {
                        @Override
                        public String call(Tuple2<SecondarySort, String> sortedContent) throws Exception {
                            return sortedContent._2();
                        }
                    });

            // Note: foreach runs on the executors; in local mode the output appears in the
            // console, but on a cluster it would land in the executor logs
            secondarysorted.foreach(new VoidFunction<String>() {
                @Override
                public void call(String sorted) throws Exception {
                    System.out.println(sorted);
                }
            });
        }
    }

    Scala version

    package com.dt.scala.spark

    // Custom composite key: Ordered gives sortByKey its ordering,
    // Serializable lets Spark ship the key between nodes
    class SecondarySort(val first: Int, val second: Int) extends Ordered[SecondarySort] with Serializable {
        override def compare(that: SecondarySort): Int = {
            // compare the fields directly rather than subtracting, to avoid Int overflow
            if (this.first != that.first) {
                this.first.compareTo(that.first)
            } else {
                this.second.compareTo(that.second)
            }
        }
    }
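
    As a design note: when the key is just a pair of Ints, Scala's built-in lexicographic Ordering for tuples already does the job, so a custom key class is not strictly needed. A minimal sketch, assuming sc is an existing SparkContext and using hypothetical in-memory data:

    // (Int, Int) keys sort lexicographically: first field, then second
    val sample = sc.parallelize(Seq("2 3", "4 1", "2 1"))
    val bySortKey = sample.map { line =>
        val fields = line.split(" ")
        ((fields(0).toInt, fields(1).toInt), line)
    }
    bySortKey.sortByKey(false).map(_._2).collect.foreach(println)
    // prints: 4 1, 2 3, 2 1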
    package com.dt.scala.spark

    import org.apache.spark.{SparkConf, SparkContext}

    object SecondarySortApp {
        def main(args: Array[String]) {
            // Step 1: create the SparkConf configuration object
            val conf = new SparkConf()          // create the SparkConf object
            conf.setAppName("SecondarySortApp") // set the application name
            conf.setMaster("local")             // run locally

            // Create the SparkContext, the sole entry point of a Spark program
            val sc = new SparkContext(conf)

            // Backslashes in a Windows path must be escaped in a string literal
            val lines = sc.textFile("D:\\JavaWorkspaces\\sparkproject\\sparktest.txt")

            val pairWithSortkey = lines.map(line => (
                new SecondarySort(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line
            ))

            // false = descending order on the composite key
            val sorted = pairWithSortkey.sortByKey(false)

            // Drop the key, keeping only the sorted lines
            val sortedResult = sorted.map(sortedline => sortedline._2)
            sortedResult.collect.foreach(println)
        }
    }
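
    To try the job without the Windows input file, the same pipeline can be fed in-memory data; a self-contained sketch with hypothetical sample lines:

    // Hypothetical sample data in place of sc.textFile(...)
    val testLines = sc.parallelize(Seq("2 3", "4 1", "3 2", "4 3", "2 1"))
    val withKey = testLines.map(line => (
        new SecondarySort(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line
    ))
    withKey.sortByKey(false).map(_._2).collect.foreach(println)
    // prints: 4 3, 4 1, 3 2, 2 3, 2 1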
Original post: https://www.cnblogs.com/itboys/p/6015007.html