zoukankan      html  css  js  c++  java
  • RDD操作

    写了一个月spark,用了无数RDD操作,总结一下~

    RDD Transform

    • map(func) : 数据集中的每个元素经过用户自定义的函数转换形成一个新的RDD
      val rdd = sc.parallelize(1 to 5)
      val result = rdd.map(x => x to 3)

       输出:

      Range(1, 2, 3)
      Range(2, 3)
      Range(3)
      Range()
      Range()
    • flatmap(func) : 与map类似,但每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出
      val rdd = sc.parallelize(1 to 5)
      val result = rdd.flatMap(x => x to 3)

      输出:

      1
      2
      3
      2
      3
      3
    • union(otherDataset) : 将两个RDD中的数据集进行合并,最终返回两个RDD的并集,若RDD中存在相同的元素也不会去重
      val rdd1 = sc.parallelize(1 to 3)
      val rdd2 = sc.parallelize(3 to 5)
      val result = rdd1.union(rdd2)

      输出:

      1
      2
      3
      3
      4
      5
    • intersection(otherDataSet) : 返回两个RDD的交集
      val rdd1 = sc.parallelize(1 to 3)
      val rdd2 = sc.parallelize(3 to 5)
      val result = rdd1.intersection(rdd2)

      输出:

      3                      
    • cogroup(otherDataset, numPartitions) : 对两个RDD(如:(K,V)和(K,W))相同Key的元素先分别做聚合,最后返回(K,Iterator<V>,Iterator<W>)形式的RDD, numPartitions设置分区数,提高作业并行度
      val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("A", 2), ("B", 3), ("C", 4)))
      val rdd2 = sc.parallelize(List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2")))
      val result = rdd1.cogroup(rdd2)

      输出:

      (A,(CompactBuffer(1, 2),CompactBuffer(A1, A2)))
      (B,(CompactBuffer(2, 3),CompactBuffer(B1, B2)))
      (C,(CompactBuffer(4),CompactBuffer()))

    • join(otherDataset, [numTasks]) : 对两个RDD先进行cogroup操作形成新的RDD,再对每个Key下的元素进行笛卡尔积,numPartitions设置分区数,提高作业并行度
      val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("A", 2), ("B", 3), ("C", 4)))
      val rdd2 = sc.parallelize(List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2")))
      val result = rdd1.join(rdd2)

      输出:

      // 注意:没有C,和空集的笛卡儿积为空集
      (A,(1,A1))
      (A,(1,A2))
      (A,(2,A1))
      (A,(2,A2))
      (B,(2,B1))
      (B,(2,B2))
      (B,(3,B1))
      (B,(3,B2))
    • distinct([numTasks]) : 对RDD中的元素进行去重
      val rdd1 = sc.parallelize(1 to 3)
      val rdd2 = sc.parallelize(3 to 5)
      val result = rdd1.union(rdd2).distinct()

      输出:

      1
      2
      3
      4
      5
    • reduceByKey(func, [numTasks]) : 按Key进行分组,使用给定的func函数聚合value值, numPartitions设置分区数,提高作业并行度
      val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("A", 2), ("B", 3), ("C", 4)))
      val result = rdd1.reduceByKey(_+_)

      输出:

      (A,3)
      (B,5)
      (C,4)
    • groupByKey(numPartitions) : 按Key进行分组,返回[K,Iterable[V]],numPartitions设置分区数,提高作业并行度
      val rdd1 = sc.parallelize(List(("A", 1), ("B", 2), ("A", 2), ("B", 3), ("C", 4)))
      val result = rdd1.groupByKey()

      输出:

      (A,CompactBuffer(1, 2))
      (B,CompactBuffer(2, 3))
      (C,CompactBuffer(4))
    • sortByKey(accending, numPartitions) : 返回以Key排序的(K,V)键值对组成的RDD,accending为true时表示升序,为false时表示降序,numPartitions设置分区数,提高作业并行度 (很明显得功能,不举例了
    • combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
      createCombiner : 在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型值转换C类型值(V => C)
      mergeValue : 合并值函数,再次遇到相同的Key时,将createCombiner道理的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C
      mergeCombiners : 合并组合器函数,将C类型值两两合并成一个C类型值
      partitioner : 使用已有的或自定义的分区函数,默认是HashPartitioner
      mapSideCombine : 是否在map端进行Combine操作,默认为true
      val people = List(("male", "Mobin"), ("male", "Kpop"), ("female", "Lucy"), ("male", "Lufei"), ("female", "Amy"))
      val rdd = sc.parallelize(people)
      val result = rdd.combineByKey(
        (x: String) => (List(x), 1),
        (peo: (List[String], Int), x : String) => (x :: peo._1, peo._2 + 1),
        (sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1 ::: sex2._1, sex1._2 + sex2._2)
      )

      输出:

      (male,(List(Mobin, Kpop, Lufei),3))
      (female,(List(Lucy, Amy),2))

      过程解析:

      Partition1:
      K="male"  -->  ("male","Mobin")  --> createCombiner("Mobin") =>  peo1 = (  List("Mobin") , 1 )
      K="male"  -->  ("male","Kpop")  --> mergeValue(peo1,"Kpop") =>  peo2 = (  "Kpop"  ::  peo1_1 , 1 + 1 )    //Key相同调用mergeValue函数对值进行合并
      K="female"  -->  ("female","Lucy")  --> createCombiner("Lucy") =>  peo3 = (  List("Lucy") , 1 )
       
      Partition2:
      K="male"  -->  ("male","Lufei")  --> createCombiner("Lufei") =>  peo4 = (  List("Lufei") , 1 )
      K="female"  -->  ("female","Amy")  --> createCombiner("Amy") =>  peo5 = (  List("Amy") , 1 )
       
      Merger Partition:
      K="male" --> mergeCombiners(peo2,peo4) => (List(Lufei,Kpop,Mobin))
      K="female" --> mergeCombiners(peo3,peo5) => (List(Amy,Lucy))

    RDD Action

    Action操作都很简单,就不举例了

    • reduce(func) : 通过函数func先聚集各分区的数据集,再聚集分区之间的数据,func接收两个参数,返回一个新值,新值再做为参数继续传递给函数func,直到最后一个元素
    • collect() : 以数据的形式返回数据集中的所有元素给Driver程序,为防止Driver程序内存溢出,一般要控制返回的数据集大小
    • count() : 返回数据集元素个数
    • first() : 返回数据集的第一个元素
    • saveAsTextFile(path) : 将最终的结果数据保存到指定的目录中
  • 相关阅读:
    sql分页存储过程疑惑:Row_Number与临时表哪个好?
    SQL之剪切
    在sqlserver2005中安装sql server 2000的示例数据库northwind
    安装SQL2005示例数据库
    Firefox浏览器中,Flex的FileReference上传文件,引发IOError
    Java获取方法的调用者
    ABAP screen常见语法
    XPO永久删除记录方法
    XPO的UpCasting
    在ASP.NET项目中使用XPO的最佳准则
  • 原文地址:https://www.cnblogs.com/yZiii/p/9681358.html
Copyright © 2011-2022 走看看