  • Spark Transformation in Practice

    RDD operators fall into two categories: Transformations (lazy) and Actions (which trigger job execution).
    An RDD does not hold the data to be computed; it only records how the RDD was derived (which method was called and which function was passed in).

    Characteristics of RDD Transformations:
    1. Lazy
    2. Produce a new RDD
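
    As a minimal sketch of this laziness (assuming a spark-shell session where sc is already defined): calling map only records the lineage; nothing runs until an action such as collect is invoked.

    val nums = sc.parallelize(Array(1, 2, 3, 4, 5), 1)
    val doubled = nums.map(n => n * 2)   // lazy: only the transformation is recorded
    doubled.collect()                    // action: triggers the computation, returns Array(2, 4, 6, 8, 10)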

    package cn.rzlee.spark.core
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object TransformationOperation {
      def main(args: Array[String]): Unit = {
    
        //map()
        //filter()
        //flatMap()
        //groupByKey()
        //reduceByKey()
        //sortByKey()
        join()
      }
    
    
      // Multiply each element in the collection by 2
      def map(): Unit = {
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
        val sc = new SparkContext(conf)

        val numbers = Array(1,2,3,4,5)
        val numberRDD: RDD[Int] = sc.parallelize(numbers, 1)
        val doubledRDD: RDD[Int] = numberRDD.map(num => num * 2)
        doubledRDD.foreach(num => println(num))
      }
      
      // Filter out the even numbers in the collection
      def filter(): Unit ={
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
        val sc = new SparkContext(conf)
    
        val numbers = Array(1,2,3,4,5)
        val numberRDD: RDD[Int] = sc.parallelize(numbers,1)
        val evenNumbersRdd = numberRDD.filter(num=>num%2==0)
        evenNumbersRdd.foreach(num=>println(num))
      }
    
    
      // Split each line into words
      def flatMap(): Unit ={
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
        val sc = new SparkContext(conf)
    
    
        val lineArray = Array("hello you", "just do it", "go go go")
        val lines = sc.parallelize(lineArray, 1)
        val words: RDD[String] = lines.flatMap(line=>line.split(" "))
        words.foreach(word=>println(word))
      }
    
    
      // Group the scores by class
      def groupByKey(): Unit ={
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
        val sc = new SparkContext(conf)
        val scoresList = Array(Tuple2("class1", 50), Tuple2("class1", 95), Tuple2("class2", 60), Tuple2("class2", 88))
        val scores: RDD[(String, Int)] = sc.parallelize(scoresList, 1)
        val groupedScores = scores.groupByKey()
        groupedScores.foreach(classScores => {
          println(classScores._1)
          classScores._2.foreach(singleScore => println(singleScore))
          println("=====================================")
        })
      }
    
      // Sum up the total score of each class
      def reduceByKey(): Unit ={
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
        val sc = new SparkContext(conf)
    
        val scoresList = Array(Tuple2("class1", 50), Tuple2("class1", 95), Tuple2("class2", 60), Tuple2("class2", 88))
        val scores: RDD[(String, Int)] = sc.parallelize(scoresList, 1)
        val totalScores: RDD[(String, Int)] = scores.reduceByKey(_+_)
        totalScores.foreach(totalScore=>println(totalScore._1 +" : " + totalScore._2))
    
      }
    
    
    
    
      // Sort students by score
      def sortByKey(): Unit ={
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
        val sc = new SparkContext(conf)
        val scoreList = Array(Tuple2(90,"leo"), Tuple2(99, "kent"), Tuple2(80,"Jeo"), Tuple2(91,"Ben"), Tuple2(96,"Sam"))
        val scores: RDD[(Int, String)] = sc.parallelize(scoreList, 1)
        val sortedScores = scores.sortByKey(false)
        sortedScores.foreach(student=>println(student._2 +" : " + student._1))
      }
    
      // Print each student's score
      def join(): Unit ={
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
        val sc = new SparkContext(conf)
        
        val studentsList = Array(Tuple2(1,"leo"), Tuple2(2, "Sam"), Tuple2(3, "kevin"))
        val scoresList = Array(Tuple2(1,60), Tuple2(2,70), Tuple2(3,80))
    
        val students: RDD[(Int, String)] = sc.parallelize(studentsList,1)
        val scores: RDD[(Int, Int)] = sc.parallelize(scoresList,1)
        val studentScores: RDD[(Int, (String, Int))] = students.join(scores)
        studentScores.foreach(studentScore=>{
          println("studentid: "+studentScore._1)
          println("studentNmae:"+studentScore._2._1)
          println("studentScore: "+ studentScore._2._2)
          println("###################################################")
        })
      }
    

      // Print each student's score
      // cogroup behaves like a full outer join: every key from either side is kept
      def cogroup(): Unit = {
        val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[1]")
        val sc = new SparkContext(conf)

        val studentsList = Array(Tuple2(1, "leo"), Tuple2(2, "Sam"), Tuple2(3, "kevin"))
        val scoresList = Array(Tuple2(1, 60), Tuple2(2, 70), Tuple2(3, 80))

        val students: RDD[(Int, String)] = sc.parallelize(studentsList, 1)
        val scores: RDD[(Int, Int)] = sc.parallelize(scoresList, 1)

        val studentScores: RDD[(Int, (Iterable[String], Iterable[Int]))] = students.cogroup(scores)
        studentScores.foreach(studentScore => {
          println("studentid: " + studentScore._1)
          println("studentname: " + studentScore._2._1)
          println("studentscore: " + studentScore._2._2)
        })
      }
    }

    #union: computes the union of two RDDs; note the element types must match

    val rdd6 = sc.parallelize(List(5,6,4,7))
    val rdd7 = sc.parallelize(List(1,2,3,4))
    val rdd8 = rdd6.union(rdd7)
    rdd8.distinct.sortBy(x=>x).collect
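
    A quick check of what this produces (results shown as comments; the raw union order depends on partitioning):

    rdd8.collect                           // e.g. Array(5, 6, 4, 7, 1, 2, 3, 4) -- duplicates are kept
    rdd8.distinct.sortBy(x => x).collect   // Array(1, 2, 3, 4, 5, 6, 7)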
    

    #intersection: computes the intersection (elements present in both RDDs)

    val rdd9 = rdd6.intersection(rdd7)
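
    Collecting the intersection of the two lists above should yield their single common element:

    rdd9.collect   // Array(4)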
    

    #join: joins two pair RDDs by key

    val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3)))
    val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7), ("tom", 2)))
    
    
    val rdd3 = rdd1.join(rdd2)
    val rdd3 = rdd1.leftOuterJoin(rdd2)
    val rdd3 = rdd1.rightOuterJoin(rdd2)
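
    The three variants differ in which keys survive and in the value type (the missing side is wrapped in Option). For the rdd1/rdd2 defined above, roughly:

    rdd1.join(rdd2).collect            // only keys in both sides: (tom,(1,8)), (tom,(1,2)), (jerry,(2,9))
    rdd1.leftOuterJoin(rdd2).collect   // all keys of rdd1; values are (Int, Option[Int]), e.g. (kitty,(3,None))
    rdd1.rightOuterJoin(rdd2).collect  // all keys of rdd2; values are (Option[Int], Int), e.g. (shuke,(None,7))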
    

    #cogroup: similar to a full outer join; each key from either side appears once, with its values from each RDD grouped together


    val rdd1 = sc.parallelize(List(("tom", 1), ("tom", 2), ("jerry", 3), ("kitty", 2)))
    val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))

    val rdd3 = rdd1.cogroup(rdd2)

    println(rdd3.collect().toBuffer)
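
    The printed buffer should look roughly like this (the order of keys and of values inside each buffer may vary):

    // (tom,(CompactBuffer(1, 2),CompactBuffer(1)))
    // (jerry,(CompactBuffer(3),CompactBuffer(2)))
    // (kitty,(CompactBuffer(2),CompactBuffer()))
    // (shuke,(CompactBuffer(),CompactBuffer(2)))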

    #cartesian: Cartesian product of two RDDs

    val rdd1 = sc.parallelize(List("tom", "jerry"))
    val rdd2 = sc.parallelize(List("tom", "kitty", "shuke"))
    val rdd3 = rdd1.cartesian(rdd2)
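
    With 2 elements on one side and 3 on the other, the product contains 2 x 3 = 6 pairs:

    rdd3.collect   // Array((tom,tom), (tom,kitty), (tom,shuke), (jerry,tom), (jerry,kitty), (jerry,shuke))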
    