zoukankan      html  css  js  c++  java
  • Spark(三)【RDD中的自定义排序】

    在RDD中默认的算子sortBy,sortByKey只能真的值类型数据升序或者降序

    现需要对自定义对象进行自定义排序。

    一组Person对象

      /**
       * Person 样例类
       * @param name
       * @param age
       */
      case class Person1(name: String, age: Int) {
        override def toString = {
          "name: " + name + ",age: " + age
        }
    val list = List(Person1("tom", 12), Person1("tom1", 13), Person1("tom2", 13))
    

    sortBy:单Value类型RDD排序

    方法一:类继承Ordered

      //类继承Ordered
      case class Person(name: String, age: Int) extends Ordered[Person] with Serializable {
        //重写toString
        override def toString = {
          "name: " + name + ",age: " + age
        }
    
        //自定义排序
        override def compare(that: Person): Int = {
          //先按照age降序排序
          var result = -this.age.compareTo(that.age)
          //如果age相同,按照name升序排序
          if (result == 0) {
            result = this.name.compareTo(that.name)
          }
          result
        }
      }
    

    使用

        val list = List(Person("tom", 12), Person("tom1", 13), Person("tom2", 13))
        val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.makeRDD(list).sortBy(x=>x).saveAsTextFile("output2")
    

    方法二:实现Ordering

       //类不需要改动
       case class Person1(name: String, age: Int) {
        override def toString = {
          "name: " + name + ",age: " + age
        }
    

    使用

        val list = List(Person1("tom", 12), Person1("tom1", 13), Person1("tom2", 13))
        val rdd = sc.makeRDD(list)
        //自定义排序: age降序,name升序,先按照谁排序,就放在前面                        reverse:反转,降序
        rdd.sortBy(person => (person.age, person.name), numPartitions = 1)(Ordering.Tuple2(Ordering.Int.reverse, Ordering.String)
          , ClassTag(classOf[Tuple2[String, Int]])).saveAsTextFile("output5")
    

    sortByKey:Key-Value类型RDD排序

    只能针对key对k-v数据进行排序

    方法一:类继承Ordered

    同sortBy方法一,样例类继承ORdered

    将单值转为K-V类型,key为Person对象。

    方法二:实现Ordering

    创建一个Person1类型的隐式Ordering[Person1]的比较器

    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * @description: TODO
     * @author: HaoWu
     * @create: 2020年08月04日
     */
    object SortByKeyOrderingTest {
      def main(args: Array[String]): Unit = {
        //创建一个Person1类型的隐式Ordering[Person1]的比较器
        implicit val ord = new Ordering[Person1] {
          //自定义排序:age降序,name升序
          override def compare(x: Person1, y: Person1): Int = {
            //age降序
            var result = -x.age.compareTo(y.age)
            //name升序
            if (result == 0){
              result = x.name.compareTo(y.name)
            }
            result
          }
        }
        val list = List(Person1("tom", 12), Person1("tom1", 13), Person1("tom2", 13))
        val conf = new SparkConf().setAppName("RDDTest").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val rdd = sc.makeRDD(list)
        //转为K-V形式,按照key排序
        rdd.map((_,1)).sortByKey().coalesce(1).saveAsTextFile("output")
      }
    }
    
    /**
     * Person 样例类
     * @param name
     * @param age
     */
    case class Person1(name: String, age: Int) {
      override def toString = {
        "name: " + name + ",age: " + age
      }
    }
    
    
    结果:
    (name: tom1,age: 13,1)
    (name: tom2,age: 13,1)
    (name: tom,age: 12,1)
    
  • 相关阅读:
    南阳oj 814 又见拦截导弹
    南阳 zb的生日和邮票分你一般(01背包思路)
    导弹拦截(最长下降子序列)变形
    控件绝对定位函数
    小玩意
    java获取本机所有可用字体
    按键监听类KeyListener及适配器改良方案
    编译php-memcached扩展
    Memcached安装,启动,连接
    Apache配置虚拟主机
  • 原文地址:https://www.cnblogs.com/wh984763176/p/13432806.html
Copyright © 2011-2022 走看看