zoukankan      html  css  js  c++  java
  • Spark(RDD)

    RDD

    1.所谓的RDD,其实就是一个数据结构,类似于链表中的Node

    2.RDD中有适合并行计算的分区操作

    3.RDD中封装了最小的计算单元,目的是更适合重复使用

    4.Spark的计算主要就是通过组合RDD的操作,完成业务需求

    1.从集合(内存)中创建RDD

    从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD

    val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
            conf.set("spark.default.parallelism", "4")
            val sc = new SparkContext(conf)
          val rdd: RDD[Int] = sc.makeRdd(List(1,2,3,4),2)
          val rdd: RDD[Int] = sc.textFile("input",2)
          sc.stop()
    
    

    2.RDD转换算子

    RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型

    value类型

    1)map

    将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
    val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
    val dataRDD1: RDD[Int] = dataRDD.map(
        num => {
            num * 2
        }
    )
    val dataRDD2: RDD[String] = dataRDD1.map(
        num => {
            "" + num
        }
    )
    
    

    2)mapPartitions

    将待处理的数据以分区为单位发送到计算节点进行处理,map 是一个数据一个数据的处理,mapPartitions是一个分区为处理单位
    // TODO 算子 - 转换 -
            val rdd = sc.makeRDD(List(1,2,3,4), 2)
            // 获取每个数据分区的最大值
            // 【1,2】【3,4】
            val rdd1 = rdd.mapPartitions(
                list => {
                    val max = list.max
                    List(max).iterator
                }
            )
            rdd1.collect.foreach(println)
    

    3)mapPartitionsWithIndex

    将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
     val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
    
        val rdd2: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex {
          case (index, datas) => {
    
            datas.map((_,index))
          }
        }
    

    4)flatMap

    将处理的数据进行扁平化后再进行映射处理,
    val rdd = sc.makeRDD(
                List(
                    "Hello Scala", "Hello Spark"
                )
            )
            val rdd1 = sc.makeRDD(
                List(
                    List(1,2), List(3,4)
                )
            )
    
            // 整体 => 个体
            //val rdd2 = rdd.flatMap(_.split(" "))
            val rdd2 = rdd.flatMap(
                str => { // 整体(1)
                    // 容器(个体(N))
                    str.split(" ")
                }
            )
    
            val rdd3 = rdd1.flatMap(
                list => {
                    list
                }
            )
    
    val rdd = sc.makeRDD(
                List(List(1,2),3,List(4,5))
            )
    
            val rdd1 = rdd.flatMap {
                case list : List[_] => list
                case other => List(other)
            }
    
            rdd1.collect.foreach(println)
    
    

    5)glom

    将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变 
    相当于聚合,将相同分区的数据放到一个数组中
    val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6), 2)
    
            val rdd1: RDD[Array[Int]] = rdd.glom()
    
            rdd1.collect().foreach(a => println(a.mkString(",")))
    
     val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
        val rdd2: RDD[Array[Int]] = rdd.glom()
         val rdd3: RDD[Int] = rdd2.map(_.max)
        println(rdd3.collect().sum)
    

    6)groupBy 可以实现wordcount

    默认情况下,数据处理后,所在分区不会发生改变
    Spark要求,一个组的数据必须在一个分区中
    一个分区的数据被打乱重新和其他分区的数据组合在一起,这个操作称之为shuffle
    shuffle操作不允许在内存中等待,必须落盘
    从服务器日志数据apache.log中获取每个时间段访问量
    

    val rdd: RDD[String] = sc.textFile("data/apache.log")
       //(10,1),(11,1),(10,1)
        val rdd2: RDD[(String, Int)] = rdd.map(line => {
          val strings: Array[String] = line.split(" ")
          val str: String = strings(3)
          val strings1: Array[String] = str.split(":")
          (strings1(1), 1)
        })
        val rdd3: RDD[(String, Iterable[(String, Int)])] = rdd2.groupBy(_._1)
        //(10,list((10,1),(10,1)))
        val rdd4: RDD[(String, Int)] = rdd3.mapValues(_.size)
        rdd4.collect().foreach(println)
    

    7)filter

    将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

    当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

     val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))
    
            // filter算子可以按照指定的规则对每一条数据进行筛选过滤
            // 数据处理结果为true,表示数据保留,如果为false,数据就丢弃
            val rdd1 = rdd.filter(
                num => num % 2 == 1    //等于1的留下  其他的舍弃
            )
    
            rdd1.collect.foreach(println)
    

    8)sample

    // 抽取数据,采样数据
            // 第一个参数表示抽取数据的方式:true. 抽取放回,false. 抽取不放回
            // 第二个参数和第一个参数有关系
            //     如果抽取不放回的场合:参数表示每条数据被抽取的几率
            //     如果抽取放回的场合:参数表示每条数据希望被重复抽取的次数
            // 第三个参数是【随机数】种子
            //     随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数
            //     3 = xxxxx(10)
            //     7 = xxxxx(3)
            //val rdd1: RDD[Int] = rdd.sample(false, 0.5)
            //val rdd1: RDD[Int] = rdd.sample(true, 2)
            val rdd1: RDD[Int] = rdd.sample(false, 0.5, 2)
            rdd1.collect.foreach(println)
    

    9) coalesce

     // TODO 算子 - 转换 - 缩减分区
            val rdd : RDD[Int] = sc.makeRDD(
                List(1,2,3,4,5,6), 3
            )
    
            // 缩减 (合并), 默认情况下,缩减分区不会shuffle
            //val rdd1: RDD[Int] = rdd.coalesce(2)
            // 这种方式在某些情况下,无法解决数据倾斜问题,所以还可以在缩减分区的同时,进行数据的shuffle操作
            val rdd2: RDD[Int] = rdd.coalesce(2, true)   //true代表数据的shuffle操作  将数据打乱和其他分区的数据组合在一起
    
            rdd.saveAsTextFile("output")
            rdd2.saveAsTextFile("output1")
    

    10) distinct

    底层有shuffle

     // TODO 算子 - 转换
            val rdd : RDD[Int] = sc.makeRDD(
                List(1,1,1)
            )
    
            // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
            // 【1,1,1】
            // 【(1, null),(1, null),(1, null)】
            // 【null, null, null】
            // 【null, null】
            // 【(1, null)】
            // 【1】
            val rdd1: RDD[Int] = rdd.distinct()
            rdd1.collect.foreach(println)
    
            //List(1,1,1,1,1).distinct
    

    11) repartition

    该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,
    repartition操作都可以完成,因为无论如何都会经shuffle过程。
     // TODO 算子 - 转换 - 缩减分区
            val rdd : RDD[Int] = sc.makeRDD(
                List(1,2,3,4,5,6), 2
            )
            // 扩大分区 - repartition
            // 在不shuffle的情况下,coalesce算子扩大分区是没有意义的。
            //val rdd1: RDD[Int] = rdd.coalesce(3, true)
    
            val rdd1: RDD[Int] = rdd.repartition(3)  
    

    12) sortBy

    排序 默认升序,第二个参数改为flase即为降序
    该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,
    默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程
        val rdd: RDD[Int] = sc.makeRDD(List(1,3,6,2,1,7,5),2)
       val rdd1: RDD[Int] = rdd.sortBy(num=>num,false,3)    //参数1:排序规则 参数2:默认升序  flase为降序,参数3:3为分区数,默认分区不变
        rdd1.collect().foreach(println)
        sc.stop()
    

    双Value类型

    1)intersection(交集)union(并集)subtract(差集)zip(拉链)

    intersection:对源RDD和参数RDD求交集后返回一个新的RDD 如果数据类型不一致的话报错

    union:对源RDD和参数RDD求并集后返回一个新的RDD,保留重复元素

    subtract:以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集

    zip:将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。每个分区元素个数相同,分区数相同

    // TODO 算子 - 转换 - 排序
        val rdd : RDD[Int] = sc.makeRDD(
          List(1,2,3,4),2
        )
    
        val rdd1 : RDD[Int] = sc.makeRDD(
          List(3,4,5,6),2
        )
        val rdd2 : RDD[String] = sc.makeRDD(
          List("3","4","5", "6"),2
        )
        // 交集
        println(rdd.intersection(rdd1).collect().mkString(","))
        // 并集
        println(rdd.union(rdd1).collect().mkString(","))
        // 差集
        println(rdd.subtract(rdd1).collect().mkString(","))
    
        // 拉链
        // 英文翻译:
        // Can only zip RDDs with same number of elements in each partition  每个分区元素个数相同
        // Can't zip RDDs with unequal numbers of partitions: List(2, 3) 分区个数相同
        println(rdd.zip(rdd1).collect().mkString(","))
        println(rdd.zip(rdd2).collect().mkString(","))
    打印结果
    4,3
    1,2,3,4,3,4,5,6
    2,1
    (1,3),(2,4),(3,5),(4,6)
    (1,3),(2,4),(3,5),(4,6)
    

    Key-Value类型

    1)partitionBy

    将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

    val rdd : RDD[Int] = sc.makeRDD(
          List(1,2,3,4),2
        )
        val rdd1: RDD[(Int, Int)] = rdd.map((_,1))
        val rdd2: RDD[(Int, Int)] = rdd1.partitionBy(new HashPartitioner(2))
      rdd2.collect().foreach(println)
    

    2)reduceByKey (可以实现wordcount)

    可以将数据按照相同的Key对Value进行聚合 对key分组然后进行对value聚合 相当于groupByKey+mapValues

     // TODO 算子 - 转换 - 排序
        val rdd  = sc.makeRDD(
         List(("a",1),("b",1),("a",2))
        )
        val rdd1: RDD[(String, Int)] = rdd.reduceByKey(_+_)
        rdd1.collect().foreach(println)
        sc.stop()
    

    3)groupByKey 有shuffle流程

    // TODO groupByKey & groupBy
            // 1. groupBy不需要考虑数据类型,groupByKey必须保证数据kv类型
            // 2. groupBy按照指定的规则进行分组,groupByKey必须根据key对value分组
            // 3. 返回结果类型
            //    groupByKey => (String, Iterable[Int])
            //    groupBy    => (String, Iterable[(String, Int)])
    val rdd : RDD[(String, Int)] = sc.makeRDD(
                List(
                    ("a", 1),
                    ("a", 1),
                    ("a", 1)
                )
            )
    val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    
            val rdd2: RDD[(String, Int)] = rdd1.mapValues(_.size)
    
            rdd2.collect.foreach(println)
    

    reduceBykey 和groupByKey的区别
    从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,
    这样会减少落盘的数据量,从而减少了读取数据量的时间。而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。
    从功能的角度:reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合,所以在分组聚合的场合下,
    推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey
    
    

    4)aggregateByKey

    aggregateByKey算子存在函数柯里化
    第一个参数列表中有一个参数
    参数为零值,表示计算初始值 zero, z, 用于数据进行分区内计算
    第二个参数列表中有两个参数
    第一个参数表示 分区内计算规则
    第二个参数表示 分区间计算规则
    
     val rdd: RDD[(String, Int)] = sc.makeRDD(List(
          ("a", 1), ("a", 2), ("c", 3),
          ("b", 4), ("c", 5), ("c", 6)
        ), 2)
        val rdd1: RDD[(String, Int)] = rdd.aggregateByKey(0)(
          (x, y) => math.max(x, y),
          (x, y) => x + y
        )
        rdd1.collect().foreach(println)
    (b,4)
    (a,2)
    (c,9)
    

    5)foldByKey

    如果aggregateByKey算子的分区内计算逻辑和分区间计算逻辑相同,那么可以使用foldByKey算子简化

    // TODO aggregateByKey也可以实现WordCount ( 4 / 10 )
            val rdd2 = rdd.aggregateByKey(0)(_+_, _+_)
    
            // TODO foldByKey也可以实现WordCount ( 5 / 10 )
            // TODO 如果aggregateByKey算子的分区内计算逻辑和分区间计算逻辑相同,那么可以使用foldByKey算子简化
            val rdd3 = rdd.foldByKey(0)(_+_)
    

    6)combineByKey

     // combineByKey算子有三个参数
            // 第一个参数表示 当第一个数据不符合我们的规则时,用于进行转换的操作
            // 第二个参数表示 分区内计算规则
            // 第三个参数表示 分区间计算规则
    
    val rdd = sc.makeRDD(
                List(
                    ("a", 1), ("a", 2), ("b", 3),
                    ("b", 4), ("b", 5), ("a", 6)
                ),
                2
            )
    val rdd2 = rdd.combineByKey(
                num => (num, 1),
                (x : (Int, Int), y) => {
                    (x._1 + y, x._2 + 1)
                },
                ( x : (Int, Int), y:(Int, Int) ) => {
                    (x._1 + y._1, x._2 + y._2)
                }
            )
            rdd2.collect.foreach(println)
    

    7)sortByKey

    返回一个按照key进行排序

        val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
        val value: RDD[(String, Int)] = dataRDD1.sortByKey()
        value.collect().foreach(println)
    

    8)join

    spark中join操作主要针对于两个数据集中相同的key的数据连接
    join操作可能会产生笛卡尔乘积,可能会出现shuffle,性能比较差
    所以如果能使用其他方式实现同样的功能,不推荐使用join
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
        val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
        rdd.join(rdd1).collect().foreach(println)
    

    行动算子

    1)aggregate

    // aggregate & aggregateByKey的区别?
            // 1. 数据格式
            // 2. aggregateByKey是一个转换算子,所以执行后会产生新的RDD
            //    aggregate是一个行动算子,所以执行后会得到结果
            // 3. aggregateByKey执行计算时,初始值只会参与分区内计算
            //    aggregate执行计算时,初始值会参与分区内计算,也会参与分区间的计算
            //  【1,4】,【3,2】
            //  【5,1,4】,【5,3,2】
            // 【10】【10】
            // 【5, 10, 10】
    val rdd = sc.makeRDD(List(1,4,3,2),2)
            val i: Int = rdd.aggregate(5)(_ + _, _ + _)
            val j: Int = rdd.fold(5)(_ + _)
            val k: Int = rdd.reduce(_ + _)
    25
    

    2)collect

    在驱动程序(Driver)中,以数组Array的形式返回数据集的所有元素
    将数据从Executor端采集回到Driver端
    // collect会将数据全部拉取到Driver端的内存中,形成数据集合,可能会导致内存溢出
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
    // 收集数据到Driver
    rdd.collect().foreach(println)
    
    

    3)foreach

    collect是按照分区号进行采集,先采集0号分区,依次采集

            // TODO 算子 - 行动
            val rdd = sc.makeRDD(
                List(1,4,3,2),2
            )
    
            // collect是按照分区号码进行采集
            rdd.collect.foreach(println)
            println("****************************")
            rdd.foreach(println)
    1
    4
    3
    2
    ****************************
    1
    3
    4
    2
    
  • 相关阅读:
    0902-用GAN生成动漫头像
    0901-生成对抗网络GAN的原理简介
    AES加密
    排序问题
    js中0.1+0.2!=0.3的问题
    关于JavaScript中Number整数最大长度的一个疑问
    IEEE 754标准
    关于浏览器接口Preview中的数值和postman中获取到的不一致问题
    .Net Core 配置之long类型 前端精度丢失和时间格式设置
    .netcore GRPC根据协议生成代码,以及去掉非空判断
  • 原文地址:https://www.cnblogs.com/xiao-bu/p/14843007.html
Copyright © 2011-2022 走看看