zoukankan      html  css  js  c++  java
  • 进阶算子

    进阶算子:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    
    /**
      * 进阶算子
      */
    object FunctionDemo2 {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("CreateRDD").setMaster("local")
        val sc = new SparkContext(conf)
    
        //1.遍历RDD获取RDD中每一个分区中元素的值并进行计算,然后返回一个新的RDD
        val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),3)
        /*
         ps: RDD = 1,2,3,4,5,6  --> 分了3个区  --> (1,2)分区1  (3,4)分区2  (5,6)分区3
    
         f: Iterator[T] => Iterator[U], 第一个参数是一个迭代器对象,这个迭代器对象获取的是对应分区中值
         preservesPartitioning: Boolean = false  第二参数是否保留父RDD的Partition分区信息,会使用默认值(这个作用会关系宽窄依赖)
         如果RDD的数据量不大,而且存在分区,建议使用mapPartition算子代替map算子,可以加快对数据的处速度
         如果RDD中数量过于庞大例如10亿条,不建议使用mapPartition,因为出现oom
         ps: 一条日志数据大小 1KB~2KB左右 --> 可以估算数据量
         */
        //第一个下划线是迭代器对象 第二个下划线是迭代器对象中存储的值
    
        val rdd2: RDD[Int] = rdd1.mapPartitions(_.map(_*10))
        println(rdd2.collect.toList)
    
        //对RDD中每个分区中数据进行遍历操作(可以打印 或 可以计算)
        //写一个迭代方法或函数 执行迭代操作
        /*
        f: (Int, Iterator[T]) => Iterator[U], 第一采参数是操作当前分区数据的函数,元组汇总第一个参数是分区序号 第二个参数是分区数据封装到的迭代器对象
          preservesPartitioning: Boolean = false 第二参数是否保留父RDD的Partition分区信息,会使用默认值(这个作用会关系宽窄依赖)
         */
        val Iter =(index:Int,iter:Iterator[(Int)])=>{
          iter.map(x => "[partID:"+index+" value:"+x+"]")
        }
        val rdd3:RDD[String] = rdd1.mapPartitionsWithIndex(Iter)
        println(rdd3.collect.toList)
    
        //1.排序
        //1.1 sortByKey:
        //根据key值进行排序,但是key必须实现Ordered接口,必须是一个对偶元组
        //这个算子有第二参数,第二个参数是boolean值决定了 是升序true  还是降序 false
        //默认是true
        val rdd4  = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
        val sorted: RDD[(Int, String)] = rdd4.sortByKey()
        println(sorted.collect.toList)
    
        //1.2 sortBy:
        //与sortByKey类似,不同点在于sortBy可以根据传入的参数决定谁来排序
        //第一个参数就是 根据谁排序
        //第二个参数就是 决定升序还是降序 是升序true 还是降序false 默认是true
        val rdd5  = sc.parallelize(List(23,432,123,5234,5423,5413,23,623,623,6423,623,25))
        val sorted_1: RDD[Int] = rdd5.sortBy(x => x,false)
        println(sorted_1.collect.toList)
    
        //2.重新分区
        /*
        一般在计算阶段即转换阶段轻易不会分区,在触发job,才会对分区进行一些修改(控制输出)
        如果在计算修改分区,绝对不会在发生shuffle的位置上修改的,而且不要频繁修改分区(只要修改分区就会触发shuffle)
         */
        val rdd6 = sc.parallelize(List(1,2,3,4,5,6),3)
        //2.1 partitions:
        //通过partitions这个算子触发RDD中分区存储到一个数组中,然后获取数据长度即当前RDD分区的个数
        println("初始分区值"+rdd6.partitions.length)
    
        //2.2 repartition:更改分区repartition可以更改分区的数量 --> 会发生shuffle
        val rdd6_1 = rdd6.repartition(1)//缩小分区
        println("修改分区个数为:"+rdd6_1.partitions.length)
        val rdd6_2 = rdd6.repartition(6)//扩大分区
        println("修改分区个数为:"+rdd6_2.partitions.length)
    
        //2.3 coalesce:允许发生将分区个数修改为小值,但是不允许将分区个数为大值
        //ps:coalesce 算子默认shuffle是false 不会发生shuffle,所以不能修改为大的分区值
        //一定要使用这个算子修改分区,建议可以开启shuffle但是没有repartition好用
        //这个算子是repartition底层实现 默认开启shuffle
        val rdd7_1 = rdd6.coalesce(1) //缩小分区是可以
        println("修改分区个数为:"+rdd7_1.partitions.length)
        val rdd7_2 = rdd6.coalesce(6) //扩大分区是不可以
        println("修改分区个数为:"+rdd7_2.partitions.length)
    
        //2.4 HashPartitioner
        val rdd8 = sc.parallelize(List(("e",5),("c",3),("d",4),("a",2),("b",1)),2)
        //更改分区,这个算子是需要传入自定分区器的,因为还没有学习自顶分区,所以这里可以使用一个万能分区 HashPartitioner
        val rdd8_1 = rdd8.partitionBy(new  HashPartitioner(4))
        println(rdd8_1.partitions.length)
        //更改分区 repartitionAndSortWithinPartitions 是repartition的一个变种,这个算子相当于集合了分区和排序
        //这个算子只能对 对偶元组使用 --> 对偶元组(二元组),会根据key的值进行排序,
        //如果需要在repartition之后再进行排序,此时就可以使用这个算子代替
        //要去传入的是一个自定义分区器即可
        rdd8.repartitionAndSortWithinPartitions(new HashPartitioner(1)).foreach(println)
    
        //3.求和
        //Spark中是很少使用Scala中求和方法sum reduce fold
        //Scala中的reduce方法在Spark中是action算子
    
        //3.1.根据相同key为一组进行求和(必须要有key)
        val rdd9 = sc.parallelize(Array(("tom",1),("jerry",3),("kitty",2),("jerry",2),("tom",2),("dog",10)))
        //返回值是一个全新的RDD,并且这个RDD中存储的是元组, key是原来的key value值时key所有value值的和
        val rdd9_1: RDD[(String, Int)] = rdd9.reduceByKey(_+_)
        println(rdd9_1.collect.toList)
    
        //3.2
        /*
        aggregateByKey(zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U) 根据相同key 为一组计算value的值
        在kv对的RDD中,按照key将value进行分区合并,合并完成后在进行全局合并
        计算流程:
        1.先做每个分区内数据的计算,zeroValue会参与到每个分区的计算
        2.zeroValue会和seqOP的逻辑进行计算,将聚合的结果会传递到combOp这个计算逻辑中也是根据相同key一组进行全局聚合
    
        zeroValue 是初始值(默认值) seqOP 局部聚合(分区内)  combOp 全局聚合(聚合分区计算的值)
         */
        val rdd10 = sc.parallelize(List(("cat",2),("cat",5),("pig",10),("dog",3),("dog",4),("cat",4)),2)
     

    /**
    * combineByKey 根据key进行求和计算看相同key为一组计算 分区值 ,然后在计算全局的值(分区计算值+分区计算值)
    * createCombiner: V => C,遍历分区中所有元素,因此每个元素的key要么是还没有遇到,要么就是已经遇到
    * 如果这个key是第一次出现,此时会调用createCombiner的函数来创建这各key的对应累加初始值
    * mergeValue: (C, V) => C, 如果这各key在当前分区中遇到多个(多个相同key),它会使用mergeValue这各函数通过之前的累加初始值
    * 和其他相同key的value进行求和计算 [分区求和]
    * mergeCombiners: (C, C) => C 由于每个分区独立的,因此对于同一个key可以有多个累加器(不用分区内),如果有两个获取多个都有key累加器
    * 就使用mergeCombiners函数进行分区结果聚合 [全局聚合]
    */

    val rdd11= sc.parallelize(List(("cat",2),("cat",5),("pig",10), ("dog",3),("dog",4),("cat",4)),2)
    //这个算子不能使用下划线
    val rdd11_1 = rdd11.combineByKey(x=>x,(a:Int,b:Int)=>a+b,(m:Int,n:Int)=>m+n)

      }
    
    }
    

      

  • 相关阅读:
    949. Largest Time for Given Digits
    450. Delete Node in a BST
    983. Minimum Cost For Tickets
    16. 3Sum Closest java solutions
    73. Set Matrix Zeroes java solutions
    347. Top K Frequent Elements java solutions
    215. Kth Largest Element in an Array java solutions
    75. Sort Colors java solutions
    38. Count and Say java solutions
    371. Sum of Two Integers java solutions
  • 原文地址:https://www.cnblogs.com/yumengfei/p/12030579.html
Copyright © 2011-2022 走看看