zoukankan      html  css  js  c++  java
  • 简单算子演示

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    /**
      * 简单算子演示
      */
    object FunctionDemo1 {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("CreateRDD").setMaster("local")
        val sc = new SparkContext(conf)
    
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,3,7,8,9))
    
        //1.map:对RDD中每一个元素进行遍历并加以计算,返回一个全新RDD
        val rdd2: RDD[Int] = rdd.map(_ * 2)
        println(rdd2.collect().toBuffer)
    
        //2.filter:对RDD中每一个元素执行Boolean类型表达式,结果为ture 值反值存储到新的RDD中
        val rdd3: RDD[Int] = rdd2.filter(_ > 10)
        println(rdd3.collect().toBuffer)
    
        //3.flatMap:对RDD中存在集合进行压平处理,将集合内部的数据取出存储到一个全新的RDD中
        val rdd4 = sc.parallelize(Array("a b c","b c d"))
        val rdd5: RDD[String] = rdd4.flatMap(_.split(" "))
        println(rdd5.collect().toBuffer)
    
        //4.sample:随机抽样
        //抽样只能在一个范围内返回 ,但是范围会有一定的波动
        //参数说明
        /*
          withReplacement: Boolean, 表示抽取出数据是否返回原有样例中  true这个值会被放回抽样中 false 不会放回
          fraction: Double,  抽样比例 即 抽取30%  写入值就是 0.3(本身Double就是一个不精确数据)
          seed: Long = Utils.random.nextLong 种子, 随机获取数据的方式 ,默认不传
         */
        val rdd5_1 = sc.parallelize(1 to 10)
        val sample: RDD[Int] = rdd5_1.sample(false,0.3)
        println(sample.collect().toBuffer)
    
        //5.union:并集
        val rdd6 = sc.parallelize(List(5,6,7,8))
        val rdd7 = sc.parallelize(List(1,2,5,6))
        val rdd8 = rdd6  union rdd7
        println( rdd8.collect.toBuffer)
    
        //6.intersection:求交集
        val rdd9: RDD[Int] = rdd6 intersection rdd7
        println( rdd9.collect.toBuffer)
    
        //7.distinct:去重复
        println(rdd8.distinct.collect.toBuffer)
    
        //8.join:相同key才会被合并,没有相同的key将被舍弃掉
        val rdd10_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
        val rdd10_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
        val rdd10: RDD[(String, (Int, Int))] = rdd10_1 join rdd10_2
        println(rdd10.collect().toBuffer)
    
        //9.LeftOuterJoin/rightOuterJoin:左连接/右连接
        //无论是左连接还是右连接,除了基本值外 ,剩余值的数据类型是Option类型
        val rdd10_4 = rdd10_1 leftOuterJoin rdd10_2
        val rdd10_5 = rdd10_1 rightOuterJoin rdd10_2
        println(rdd10_4.collect.toList)
        println(rdd10_5.collect.toList)
    
        //10.cartesian:笛卡尔积
        val rdd11_1 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
        val rdd11_2 = sc.parallelize(List(("jerry",2),("tom",2),("dog",10)))
        val rdd11_3 = rdd11_1 cartesian rdd11_2
        println(rdd11_3.collect.toList)
    
        //11.分组
        val rdd11_4= sc.parallelize(List(("tom",1),("jerry",3),("kitty",2),("tom",2)))
        //11.1 根据传入的参数进行分组
        val rdd11_5: RDD[(String, Iterable[(String, Int)])] = rdd11_4.groupBy(_._1)
        //11.2 根据key进行分区(对KV形式是使用) -->除了指定分组之后分区的数量之外, 还可以使用自定义分区器
        val rdd11_6: RDD[(String, Iterable[Int])] = rdd11_4.groupByKey()
        //11.3 cogroup根据key进行分组(分组必须是一个对偶元组)
        val rdd11_7: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd11_1 cogroup rdd11_2
        println(rdd11_7.collect.toBuffer)
    
        /*
      ps:当前方法和groupByKey都可以对数据进行分组,但是,groupByKey会将相同key的值(value)存储在一起(一个集合)
      cogroup  参数是另外一个要合并分组的RDD(必须是对偶元组),根据相同key进行额分组,但是value不会存在一个集合中
     */
    
    
      }
    
    }
    

      

  • 相关阅读:
    .netCore读取配置文件
    初识.netCore以及如何vs2019创建项目和发布
    深度解析.NetFrameWork/CLR/C# 以及C#6/C#7新语法
    Asp.Net六大内置对象
    MVC的View本质和扩展
    Asp.net管道模型之(HttpModules 和 HttpHandler)
    Serf:Gossip Protocol
    Consul:ANTI-ENTROPY
    Consul:网络坐标
    Consul:Gossip协议
  • 原文地址:https://www.cnblogs.com/yumengfei/p/12030433.html
Copyright © 2011-2022 走看看