zoukankan      html  css  js  c++  java
  • Spark—RDD编程常用转换算子代码实例

    Spark—RDD编程常用转换算子代码实例

    Spark rdd 常用 Transformation 实例:

    1、def map[U: ClassTag](f: T => U): RDD[U]   将函数应用于RDD的每一元素,并返回一个新的RDD

    package top.ruandb
    import org.apache.spark.{SparkConf, SparkContext}
    object RddTest extends App{
      val sparkConf = new  SparkConf().setAppName("RddTest").setMaster("local[2]")
      val sc = new SparkContext(sparkConf)
      //map
      var source = sc.parallelize(1 to 10)
      source.collect().foreach(e=>print(e+","))//1 2 3 4 5 6 7 8 9 10
      var sourceMap = source.map(_*10)
      sourceMap.collect().foreach(e=>print(e+","))//10 20 30 40 50 60 70 80 90 100
    }

    2、def filter(f: T => Boolean): RDD[T] 通过提供的产生boolean条件的表达式来返回符合结果为True新的RDD

    //filter
      var source = sc.parallelize(1 to 10)
      source.collect().foreach(e=>print(e+" "))//1 2 3 4 5 6 7 8 9 10
      var sourceMap = source.filter(_.<(5))
      sourceMap.collect().foreach(e=>print(e+" "))//1 2 3 4

    3、def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]   将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。

    //flatMap
      var source = sc.parallelize(1 to 5)
      source.collect().foreach(e=>print(e+" "))//1 2 3 4 5 
      var sourceMap = source.flatMap(x=>(1 to x))
      sourceMap.collect().foreach(e=>print(e+" "))//1 1 2 1 2 3 1 2 3 4 1 2 3 4 5

    4、def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]    将函数应用于RDD的每一个分区,每一个分区运行一次,函数需要能够接受Iterator类型,然后返回Iterator。

    package top.ruandb
    import org.apache.spark.{SparkConf, SparkContext}
    object RddTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        //mapPartitions
        var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))
        source.collect().foreach(e => print(e + " "))//(lucy,female) (jack,male) (jams,male)
        var sourceMap = source.mapPartitions(partitionsFun)
        sourceMap.collect().foreach(e => print(e + " ")) //jams jack
      }
      def partitionsFun(iter:Iterator[(String,String)]): Iterator[String] ={
        var males = List[String]()
        while(iter.hasNext){
          val next = iter.next()
          next match {
            case (_,"male") => males = next._1::males
            case _ =>
          }
        }
        return males.iterator
      }
    }

    5、def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]  将函数应用于RDD中的每一个分区,每一个分区运行一次,函数能够接受 一个分区的索引值 和一个代表分区内所有数据的Iterator类型,需要返回Iterator类型。

    package top.ruandb
    import org.apache.spark.{SparkConf, SparkContext}
    object RddTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        //mapPartitionsWithIndex
        var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))
        source.collect().foreach(e => print(e + " "))//(lucy,female) (jack,male) (jams,male)
        var sourceMap = source.mapPartitionsWithIndex(partitionsFunWithIndex)
        sourceMap.collect().foreach(e => print(e + " ")) //[1]jams [1]jack
      }
      def partitionsFunWithIndex(index:Int,iter:Iterator[(String,String)]): Iterator[String] ={
        var males = List[String]()
        while(iter.hasNext){
          val next = iter.next()
          next match {
            case (_,"male") => males="["+index+"]"+next._1 :: males
            case _ =>
          }
        }
        males.iterator
      }
    }

    6、def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 在RDD中移seed为种子返回大致上有fraction比例个数据样本RDD,withReplacement表示是否采用放回式抽样。

    package top.ruandb
    import org.apache.spark.{SparkConf, SparkContext}
    object RddTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        //sample
        var source = sc.parallelize(1 to 10)
        source.collect().foreach(e => print(e + " "))//1 2 3 4 5 6 7 8 9 10
        var sourceMap = source.sample(true,0.4,2)
        sourceMap.collect().foreach(e => print(e + " ")) //1 2 2
      }
    }

    7、def union(other: RDD[T]): RDD[T]  将两个RDD中的元素进行合并,返回一个新的RDD

    //union
    var source = sc.parallelize(1 to 3)
    source.collect().foreach(e => print(e + " "))//1 2 3 
    var rdd = sc.parallelize(6 to 9)
    var sourceMap = source.union(rdd)
    sourceMap.collect().foreach(e => print(e + " "))//1 2 3 6 7 8 9

    8、def intersection(other: RDD[T]): RDD[T]  将两个RDD做交集,返回一个新的RDD

    //intersection
    var source = sc.parallelize(1 to 8)
    source.collect().foreach(e => print(e + " "))//1 2 3 4 5 6 7 8
    var rdd = sc.parallelize(6 to 9)
    var sourceMap = source.intersection(rdd)
    sourceMap.collect().foreach(e => print(e + " "))//6 8 7

    9、def distinct(): RDD[T]  将当前RDD进行去重后,返回一个新的RDD

    //distinct
    var source = sc.parallelize(List(1,1,2,2,3,3,4,4,5,5))
    source.collect().foreach(e => print(e + " "))//1 1 2 2 3 3 4 4 5 5 
    var sourceMap = source.distinct()
    sourceMap.collect().foreach(e => print(e + " "))//4 2 1 3 5

    10、def partitionBy(partitioner: Partitioner): RDD[(K, V)]  根据设置的分区器重新将RDD进行分区,返回新的RDD

    //partitionBy
    var source = sc.parallelize(List((1,"111"),(2,"222"),(3,"333"),(4,"444")),4)
    source.collect().foreach(e => print(e + " "))
    print("分区数:"+source.partitions.size)//(1,111) (2,222) (3,333) (4,444) 分区数:4
    var sourceMap = source.partitionBy(new HashPartitioner(2))
    sourceMap.collect().foreach(e => print(e + " "))
    print("分区数:"+sourceMap.partitions.size)//(2,222) (4,444) (1,111) (3,333) 分区数:2

    11、def reduceByKey(func: (V, V) => V): RDD[(K, V)]   根据Key值将相同Key的元组的值用func进行计算,返回新的RDD

    //reduceByKey
    var source = sc.parallelize(List(("hello",1),("world",1),("hello",1),("world",1)))
    source.collect().foreach(e => print(e + " "))//(hello,1) (world,1) (hello,1) (world,1)
    var sourceMap = source.reduceByKey((x,y)=>x+y)
    sourceMap.collect().foreach(e => print(e + " "))//(hello,2) (world,2)

    12、def groupByKey(): RDD[(K, Iterable[V])]   将相同Key的值进行聚集,输出一个(K, Iterable[V])类型的RDD

    //groupByKey
    var source = sc.parallelize(List(("hello",1),("world",1),("hello",1),("world",1)))
    source.collect().foreach(e => print(e + " "))//(hello,1) (world,1) (hello,1) (world,1)
    var sourceMap = source.groupByKey()
    sourceMap.collect().foreach(e => print(e + " "))//(hello,CompactBuffer(1, 1)) (world,CompactBuffer(1, 1))

    13、def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]   根据key分别使用CreateCombiner和mergeValue进行相同key的数值聚集,通过mergeCombiners将各个分区最终的结果进行聚集。

    package top.ruandb
    import org.apache.spark.{ SparkConf, SparkContext}
    object RddTest {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        //combineByKey 计算平均成绩
        var scores = Array(("lucy", 89), ("jack", 77), ("lucy", 100), ("james", 65), ("jack", 99), 
          ("james", 44))
        var input = sc.parallelize(scores);
        input.collect().foreach(e => print(e + " "))
        //(lucy,89) (jack,77) (lucy,100) (james,65) (jack,99) (james,44)
        var output = input.combineByKey((v) => (v, 1),
            (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
            (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
        output.collect().foreach(e => print(e + " "))//(james,(109,2)) (jack,(176,2)) (lucy,(189,2))
        var result = output.map{case (key,value) => (key,value._1/value._2.toDouble)}
        result.collect().foreach(e => print(e + " "))//(james,54.5) (jack,88.0) (lucy,94.5)
      }
    }

    14、def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,

        combOp: (U, U) => U): RDD[(K, U)]   通过seqOp函数将每一个分区里面的数据和初始值迭代带入函数返回最终值,comOp将每一个分区返回的最终值根据key进行合并操作。

  • 相关阅读:
    土地出让金骤降是“危”还是“机”?
    EasyMonkeyDevice vs MonkeyDevice&amp;HierarchyViewer API Mapping Matrix
    Keepalived+Lvs+Mysql主主复制
    【SDUT 3038】迷之博弈
    js面向对象编程:if中能够使用那些作为推断条件呢?
    crm使用url打开窗口视图
    让你的javascript函数拥有记忆功能,降低全局变量的使用
    crm操作知识库文章实体
    技术揭秘12306改造(一):尖峰日PV值297亿下可每秒出票1032张-CSDN.NET
    大众点评的大数据实践-CSDN.NET
  • 原文地址:https://www.cnblogs.com/jnba/p/10830515.html
Copyright © 2011-2022 走看看