zoukankan      html  css  js  c++  java
  • Spark RDD

    scala> val rdd1 = sc.parallelize(List(63,45,89,23,144,777,888))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:15
    
    查看该RDD的分区数量
    scala> rdd1.partitions.length
    res0: Int = 1
    
    创建时指定分区数量
    scala> val rdd1 = sc.parallelize(List(63,45,89,23,144,777,888),3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:15
    查看分区数量
    scala> rdd1.partitions.length
    res1: Int = 3
    
    

    map/filter

    scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
    scala> val rdd2 = rdd1.map(x => x*2).collect
    rdd2: Array[Int] = Array(2, 4, 200, 6, 8)
    从小到大
    scala> val rdd3 = rdd1.map(_*2).sortBy(x =>x,true).collect
    rdd3: Array[Int] = Array(2, 4, 6, 8, 200)
    从大到小
    scala> val rdd3 = rdd1.map(_*2).sortBy(x =>x,false).collect
    rdd3: Array[Int] = Array(200, 8, 6, 4, 2)
    
    过滤出大于等于50的元素
    scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
    scala> val rdd2 = rdd1.filter(_>50).collect
    rdd2: Array[Int] = Array(100)
    scala> val rdd2 = rdd1.filter(x => x>50).collect
    rdd2: Array[Int] = Array(100)
    过滤出偶数
    scala> val rdd2 = rdd1.filter(_%2==0).collect
    rdd2: Array[Int] = Array(2, 100, 4)
    
    

    map/flatMap

    scala> val rdd1 = sc.parallelize(Array("w u","j i a","d o n g"))
    按空格分隔
    scala> rdd1.map(_.split(" ")).collect
    res28: Array[Array[String]] = Array(Array(w, u), Array(j, i, a), Array(d, o, n, g))
    分隔并压平
    scala> val rdd2 = rdd1.flatMap(_.split(" "))
    scala> rdd2.collect
    res5: Array[String] = Array(w, u, j, i, a, d, o, n, g)
    
    scala> val rdd1 = sc.parallelize(List(List("w u","j i a","d o n g"),List("j i a n g","r u i")))
    scala> val rdd2 = rdd1.map(_.map(_.split(" "))).collect
    rdd2: Array[List[Array[String]]] = Array(List(Array(w, u), Array(j, i, a), Array(d, o, n, g)), List(Array(j, i, a, n, g), Array(r, u, i)))
    
    scala> val rdd2 = rdd1.map(_.flatMap(_.split(" "))).collect
    rdd2: Array[List[String]] = Array(List(w, u, j, i, a, d, o, n, g), List(j, i, a, n, g, r, u, i))
    
    scala> val rdd2 = rdd1.flatMap(_.flatMap(_.split(" "))).collect
    rdd2: Array[String] = Array(w, u, j, i, a, d, o, n, g, j, i, a, n, g, r, u, i)
    

    union/intersecttion/distinct

    scala> val rdd1 = sc.parallelize(List(1,2,3,4))
    scala> val rdd2 = sc.parallelize(List(5,6,4,3))
    求并集
    scala> val rdd3 = rdd1.union(rdd2)
    rdd3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 4, 3) 
    求交集
    scala> val rdd4 = rdd1.intersection(rdd2).collect
    rdd4: Array[Int] = Array(4, 3)
    去重
    scala> val rdd5 = rdd3.distinct
    rdd5: Array[Int] = Array(1, 2, 3, 4, 5, 6)
    

    sortBy

    rdd1里的每个元素乘以2,然后排序
    scala> val rdd1 = sc.parallelize(List(1,2,100,3,4))
    scala> val rdd2 = rdd1.map(_*2).sortBy(x => x,true)//为什么sortBy里面用下划线不行?
    scala> rdd2.collect
    res21: Array[Int] = Array(2, 4, 6, 8, 200)
    
    注意区别一下两种情况,根据=>右边的情况sort
    scala> rdd1.sortBy(x=>x,true).collect
    res22: Array[Int] = Array(1, 2, 3, 4,100)
    
    rdd1.sortBy(x=>100-x,true).collect
    res23: Array[Int] = Array(100, 4, 3, 2, 1)
    

    groupByKey/groupBy/reduceByKey/sortByKey

    scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[29] at parallelize at <console>:15
    scala> val rdd2 = rdd1.groupByKey()
    rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[30] at groupByKey at <console>:17
    scala> val rdd2 = rdd1.groupByKey() 大专栏  Spark RDD.collect
    rdd2: Array[(String, Iterable[Int])] = Array((class1,CompactBuffer(50, 90)), (class2,CompactBuffer(80, 70)))
    scala> rdd2.foreach(score => {println(score._1);score._2.foreach(singlescore => println(singlescore))})
    class1
    50
    90
    class2
    80
    70
    scala> val rdd5 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",50)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24
    scala> val rdd2=rdd1.groupBy(x=>x._2).collect
    rdd2: Array[(Int, Iterable[(String, Int)])] = Array((80,CompactBuffer((class2,80))), (50,CompactBuffer((class1,50), (class1,50))), (70,CompactBuffer((class2,70))))
    
    scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:15
    scala> val rdd2 = rdd1.reduceByKey(_+_).collect
    rdd2: Array[(String, Int)] = Array((class1,140), (class2,150))
    
    scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3)))
    scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7)))
    scala> val rdd3 = rdd1.union(rdd2)
    key进行聚合
    scala> val rdd4 = rdd3.reduceByKey(_+_)
    scala> rdd4.collect
    res23: Array[(String, Int)] = Array((tom,9), (jerry,11), (shuke,7), (kitty,3))
    value的降序排序
    scala> val rdd5 = rdd4.map(t=>(t._2,t._1)).sortByKey(false).map(t=>(t._2,t._1))
    scala> rdd5.collect
    res24: Array[(String, Int)] = Array((jerry,11), (tom,9), (shuke,7), (kitty,3))
    
    
    
    scala> val rdd1 = sc.parallelize(Array(("class1",50),("class2",80),("class2",70),("class1",90)))
    rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:15
    scala> val rdd2 = rdd1.sortByKey().collect
    rdd2: Array[(String, Int)] = Array((class1,50), (class1,90), (class2,80), (class2,70))
    scala> rdd2.foreach(score => println(score._1+":"+score._2))
    class1:50
    class1:90
    class2:80
    class2:70
    

    join/leftOuterJoin/rightOuterJoin/union

    顺序影响结果
    scala> val rdd1 = sc.parallelize(List(("tom",1),("jerry",2),("kitty",3)))
    scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7)))
    scala> val rdd3 = rdd1.join(rdd2).collect
    rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (jerry,(2,9)))
    scala> rdd2.join(rdd1).collect
    res5: Array[(String, (Int, Int))] = Array((tom,(8,1)), (jerry,(9,2)))
    
    scala> val rdd2 = sc.parallelize(List(("jerry",9),("tom",8),("shuke",7),("tom",2)))
    scala> val rdd3 = rdd1.join(rdd2).collect
    rdd3: Array[(String, (Int, Int))] = Array((tom,(1,8)), (tom,(1,2)), (jerry,(2,9)))
    
    scala> val rdd3 = rdd1.leftOuterJoin(rdd2).collect
    rdd3: Array[(String, (Int, Option[Int]))] = Array((tom,(1,Some(8))), (tom,(1,Some(2))), (jerry,(2,Some(9))), (kitty,(3,None)))
    scala> val rdd3 = rdd1.rightOuterJoin(rdd2).collect
    rdd3: Array[(String, (Option[Int], Int))] = Array((tom,(Some(1),8)), (tom,(Some(1),2)), (jerry,(Some(2),9)), (shuke,(None,7)))
    
    scala> val rdd3 = rdd1.union(rdd2).collect
    rdd3: Array[(String, Int)] = Array((tom,1), (jerry,2), (kitty,3), (jerry,9), (tom,8), (shuke,7), (tom,2))
    
    scala> val rdd3 = rdd1.union(rdd2)
    scala> val rdd4 = rdd3.groupByKey
    scala> rdd4.collect
    res11: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 8, 2)), (jerry,CompactBuffer(2, 9)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)
    求每个单词出现的次数
    scala> val rdd5 = rdd3.groupByKey.map(x=>(x._1,x._2.sum))
    scala> rdd5.collect
    res12: Array[(String, Int)] = Array((tom,11), (jerry,11), (shuke,7), (kitty,3))
    scala> rdd3.groupByKey.mapValues(_.sum).collect
    res14: Array[(String, Int)] = Array((tom,11), (jerry,11), (shuke,7), (kitty,3))
    
  • 相关阅读:
    文件高级应用和函数基础
    字符编码,文件操作
    数据类型分类,深浅拷贝
    容器数据类型内置方法
    数字类型和字符串类型内置方法
    流程控制循环
    python 运算和流程控制
    【MySQL】SQL教程
    【MySQL】数据库字段类型
    【java】HashSet
  • 原文地址:https://www.cnblogs.com/lijianming180/p/12041549.html
Copyright © 2011-2022 走看看