zoukankan      html  css  js  c++  java
  • Spark 学习笔记之 distinct/groupByKey/reduceByKey

    distinct/groupByKey/reduceByKey:

    distinct:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object TransformationsDemo {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().appName("TransformationsDemo").master("local[1]").getOrCreate()
        val sc = sparkSession.sparkContext
        testDistinct(sc)
      }
    
      private def testDistinct(sc: SparkContext) = {
        val rdd = sc.makeRDD(Seq("aa", "bb", "cc", "aa", "cc"), 1)
        //对RDD中的元素进行去重操作
        rdd.distinct(1).collect().foreach(println)
      }
    }
    

    运行结果:

    groupByKey:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object TransformationsDemo {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().appName("TransformationsDemo").master("local[1]").getOrCreate()
        val sc = sparkSession.sparkContext
        testGroupByKey(sc)
    
      }
    
      private def testGroupByKey(sc: SparkContext) = {
        val rdd: RDD[(String, Int)] = sc.makeRDD(Seq(("aa", 1), ("bb", 1), ("cc", 1), ("aa", 1), ("cc", 1)), 1)
        //pair RDD,即RDD的每一行是(key, value),key相同进行聚合
        rdd.groupByKey().map(v => (v._1, v._2.sum)).collect().foreach(println)
      }
    }
    

    运行结果:

    reduceByKey:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object TransformationsDemo {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().appName("TransformationsDemo").master("local[1]").getOrCreate()
        val sc = sparkSession.sparkContext
        testReduceByKey(sc)
    
      }
    
      private def testReduceByKey(sc: SparkContext) = {
        val rdd: RDD[(String, Int)] = sc.makeRDD(Seq(("aa", 1), ("bb", 1), ("cc", 1), ("aa", 1), ("cc", 1)), 1)
        //pair RDD,即RDD的每一行是(key, value),key相同进行聚合
        rdd.reduceByKey(_+_).collect().foreach(println)
      }
    }
    

    运行结果:

    groupByKey与 reduceByKey区别:

    reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。groupByKey也是对每个key进行操作,但只生成一个sequence。因为groupByKey不能自定义函数,我们需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动。在网络上传输这些数据非常没有必要。避免使用 GroupByKey。

  • 相关阅读:
    进程-线程-消息队列
    用Ogre实现《天龙八部》场景中水面(TerrainLiquid)详解
    TCP协议三次握手过程分析【图解,简单清晰】
    excel批量删除sql语句
    批量删除指定盘指定格式文件
    Linux命令速查手册(第2版)学习
    List、Map、Set 三个接口,存取元素时,各有什么特点
    HashMap 什么时候进行扩容呢
    两个对象值相同 (x.equals(y) == true),但却可有不同的 hash code,这句话对不对?
    ArrayList,Vector, LinkedList 的存储性能和特性
  • 原文地址:https://www.cnblogs.com/AK47Sonic/p/7784199.html
Copyright © 2011-2022 走看看