zoukankan      html  css  js  c++  java
  • spark-------------RDD 转换算子-----value类型(二)

    引言

    接上一篇博客

    正文

    sample

    • 函数签名:def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]

    • 函数说明:根据指定的规则从数据集中抽取数据

    案例:随机抽取数字

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_Sample {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
    
          val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    
        /**
         * 第一个参数表示:抽取后是否放回
         * 第二个参数表示:如果不放回,则表示每个数抽取的概率       伯努利
         *               如果返回,则表示每个数被抽取的可能次数   泊松算法
         * 第三个参数表示:随机数种子,一旦随机数种子确定,抽取的数也就确定了
         */
        println(rdd.sample(true, 0.4, 1).collect().mkString(","))
          // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行截图:

    distinct

    • 函数签名:

      • def distinct()(implicit ord: Ordering[T] = null): RDD[T]
      • def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
    • 函数说明:将数据集中重复的数据去重

    和scala里的list的distinct区别。spark看代码解释

    • scala底层调用了HashSet
    • spark:map(x => (x, null)).reduceByKey((x, ) => x, numPartitions).map(._1)

    案例:去重

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_Distinct {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
    
          val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
    
        /**
         * 数据去重
         * 底层原理:
         *  map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
         *  1、 1, 2, 3, 4, 1, 2, 3, 4
         *  2、(1,null),(2,null),(3,null),(4,null),(1,null),(2,null),(3,null),(4,null)
         *  3、(1,null),(2,null),(3,null),(4,null)
         *  4、1, 2, 3, 4
         */
        rdd.distinct().collect().foreach(println)
    
          // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行截图:

    coalesce

    • 函数签名:def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

    • 函数说明:根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

    案例:减少分区个数。代码有注释解释

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_Coalesce {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
    
          val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4,5,6),numSlices = 3)
    
          /**
           * 减少分区个数
           * 第一个参数表示:分区的个数
           * 第二个参数表示:是否均衡分配
           *              false:不均衡,那么之前的分区数据不会分开  (1,2) (3,4) (5,6)三个分区变成两个分区--->(1,2) (3,4,5,6)
           *              true:调用shuffle打乱数据重新分配    (1,2) (3,4) (5,6)三个分区变成两个分区,数据会打乱,重新改分配 ---->(1,4,5) (2,3,6)
           */
          val coalesceRDD: RDD[Int] = rdd.coalesce(2)
          coalesceRDD.saveAsTextFile("output")
          // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行截图:

    repartition

    • 函数签名:def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

    • 函数说明:该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition操作都可以完成,因为无论如何都会经 shuffle 过程

    案例:

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_Repartition {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
    
          val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4,5,6),numSlices = 2)
    
        /**
         * coalesce(分区个数,true),可以实现扩大分区个数。必须采用shuffle,否则没有意义。
         * 因为coalesce方法中同一个分区的数据不会分开,只有采用shuffle将数据打乱,才可以实现增加分区个数
         */
    
          /**
           * 扩大分区个数
           * 第一个参数表示:分区的个数
           * 底层采用的是:coalesce(分区个数,是否采用shuffle)
    
           */
          val repartitionRDD: RDD[Int] = rdd.repartition(3)
          repartitionRDD.saveAsTextFile("output")
          // TODO 关闭环境
          sc.stop()
      }
    }
    

    运行截图:

    sortBy

    • 函数签名:def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

    • 函数说明:该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

    案例:

    package com.xiao.spark.core.rdd.operator.tranform
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Spark01_RDD_Operator_SortBy {
      def main(args: Array[String]): Unit = {
          // TODO 准备环境
          val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
          val sc = new SparkContext(sparkConf)
          // Int类型排序
          //val rdd: RDD[Int] = sc.makeRDD(List(1, 4,3,6,7,2,5))
          // 元组进行排序
          val rdd: RDD[(String, Int)] = sc.makeRDD(List(("1", 20), ("4", 15), ("24", 2)))
    
          /**
           * 对数据进行排序,第一个参数为排序数据,第二个参数是否为升序,默认是升序
           */
          //按key值排序
          //rdd.sortBy(t=>t._1).collect().foreach(println)
          //按value值排序
          rdd.sortBy(t=>t._2,false).collect().foreach(println)
    
        // TODO 关闭环境
          sc.stop()
      }
    }
    
    

    运行截图:

  • 相关阅读:
    ABP之模块分析
    AutoMapper之ABP项目中的使用介绍
    Castle Windsor常用介绍以及其在ABP项目的应用介绍
    EasyUI实战经验总结,给有需要的人
    无法发送具有此谓词类型的内容正文
    ADO.NET 新特性之SqlBulkCopy
    SVN无法Cleanup
    Mac使用操作
    Mac下的Mysql无法登陆的问题
    mac 终端 常用命令
  • 原文地址:https://www.cnblogs.com/yangxiao-/p/14334078.html
Copyright © 2011-2022 走看看