zoukankan      html  css  js  c++  java
  • Spark 学习笔记之 union/intersection/subtract

    union/intersection/subtract:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    
    object TransformationsDemo {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder().appName("TransformationsDemo").master("local[1]").getOrCreate()
        val sc = sparkSession.sparkContext
        testUnion(sc)
        testIntersection(sc)
        testSubtract(sc)
    
      }
    
      private def testSubtract(sc: SparkContext) = {
        val rdd1 = sc.parallelize(1 to 3, 1)
        val rdd2 = sc.parallelize(3 to 5, 1)
        //返回在当前RDD中出现,并且不在另一个RDD中出现的元素,不去重。
        rdd1.subtract(rdd2).collect().foreach(println)
        println(s"partitions: ${rdd1.subtract(rdd2, 1).partitions.size}")
        println(s"partitions: ${rdd1.subtract(rdd2, 2).partitions.size}")
    
        val rdd3 = sc.parallelize(List(List(1, 2, 3), List(4, 5, 6)), 1)
        val rdd4 = sc.parallelize(List(List(4, 5, 6), List(7, 8, 9)), 1)
        rdd3.subtract(rdd4).collect().foreach(println)
      }
    
      private def testIntersection(sc: SparkContext) = {
        val rdd1 = sc.parallelize(1 to 2, 1)
        val rdd2 = sc.parallelize(3 to 5, 1)
        //返回两个RDD的交集,并且去重。
        rdd1.intersection(rdd2).collect().foreach(println)
        println(s"partitions: ${rdd1.intersection(rdd2, 1).partitions.size}")
        println(s"partitions: ${rdd1.intersection(rdd2, 2).partitions.size}")
    
        val rdd3 = sc.parallelize(List(List(1, 2, 3), List(4, 5, 6)), 1)
        val rdd4 = sc.parallelize(List(List(4, 5, 6), List(7, 8, 9)), 1)
        rdd3.intersection(rdd4).collect().foreach(println)
      }
    
      private def testUnion(sc: SparkContext) = {
        val rdd1 = sc.parallelize(1 to 3, 1)
        val rdd2 = sc.parallelize(3 to 5, 1)
        //将两个RDD进行合并,不去重。
        rdd1.union(rdd2).collect().foreach(println)
    
        val rdd3 = sc.parallelize(List(List(1, 2, 3), List(4, 5, 6)), 1)
        val rdd4 = sc.parallelize(List(List(4, 5, 6), List(7, 8, 9)), 1)
        rdd3.union(rdd4).collect().foreach(println)
      }
    
    }
    

    运行结果:

  • 相关阅读:
    Mac旧机「焕」新机过程记录
    Swift3.0-字符串和字符
    Swift3.0-基本运算符
    【规范建议】服务端接口返回字段类型与iOS端的解析
    【已解决】iOS11使用MJRefresh上拉加载结束tableView闪动、跳动的问题
    標準メッセージクラス
    BAPI:会計管理(FI&CO)
    BAPI:販売管理(SD)
    BAPI:生産管理(PP)
    BAPI:購買管理(MM)
  • 原文地址:https://www.cnblogs.com/AK47Sonic/p/7776380.html
Copyright © 2011-2022 走看看