zoukankan      html  css  js  c++  java
  • Action算子

    import org.apache.spark.rdd.RDD
    
    /*
    action算子(行动算子) --> 是触发转换算子计算
    一个action算子触发,就会产生job
    action算子的返回值基本上就不是RDD,所以在action算子后面在触发计算,就需要区分计算的数据了
    
     */
    object ActionFunctionDemo{
      def main(args: Array[String]): Unit = {
           val sc = CreateSparkContext.createSparkContxt("ActionFunctionDemo","local[*]")
           //1.计算求和 reduce(多用于没有key的情况下进行求和)
           val rdd1 = sc.parallelize(List(2,1,3,6,5),2)
           val rdd1_1:Int = rdd1.reduce(_+_)
           println(rdd1_1)
    
          //2.将RDD中数据转换Scala中数组存储[不可变数组]
          println(rdd1.collect.toBuffer)
    
        //3.返回RDD的元素个数
         println(rdd1.count())
    
        //4. top 取出对应数量值,默认降序,若输入0,返回一个空数组[数组是一个不可变数据]
        println(rdd1.top(3).toBuffer)
    
        //5.take  顺序取出对应数量值 返回也是一个数组[是一个不可变数组]
          println( rdd1.take(3).toBuffer)
    
        //6.takeOrdered 顺序取出对应数量的值, 默认是升序 返回的是一个数组[并不可变数组]
        println(rdd1.takeOrdered(3).toBuffer)
    
        //7.first  取出第一个值 等价于  take(1)  取值等价,不等价返回值
          println(rdd1.first())
    
         //8.对输出数据进行处理,将数据写成文件
          //rdd1.saveAsTextFile("文件存储路径[本地/hdfs]")
    
        //9 countBykey --> 统计key的个数并生成一个Map k是key的名字 v 是key的个数
          val rdd2 = sc.parallelize(List(("key1",2),("key2",2)))
          val col: collection.Map[String, Long] = rdd2.countByKey()
         //10.遍历算子 跟scala中没有区别
          rdd1.foreach(x => println(x))
    
    
        /*其他算子*/
      // 统计value的个数(但是会将集合中的一个元素看做是一个value并统计其个数)--> Action算子
        val tupleToLong: collection.Map[(String, Int), Long] = rdd2.countByValue()
        
        //filterByRange  对RDD中元素进行过滤,但是指定范围内的数据(包括开始位置和结束位置)  -> 转换算子
        val rdd3 = sc.parallelize(List(("e",3),("c",5),("d",4),("c",2),("a",1)))
        val rdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e")
        
        //flatMapValue对应是kv对中的value值进行扁平化处理    --> 转换算子
        val rdd3_2 = sc.parallelize(List(("a","1 2"),("b","3 4")))
        //这个算子对应的是value值
          val rdd3_3: RDD[(String, String)] = rdd3_2.flatMapValues(_.split(" "))
        
        //必须记住 foreachPartition   --> Action
        // 循环处理分区内数据, 一般是用于数据持久化,即存入数据库中
        val rdd4 = sc.parallelize(List(1,2,3,4,5,6),2)
        rdd4.foreachPartition(x => println(x.reduce(_+_)))  
    
        //keyBy 以传入的函数作为key,RDD中元组作为value ,返回一个新的元组  --> 转换算子
        val rdd5 = sc.parallelize(List("dog","cat"),3)
        val rdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length)
    
        //必须记住  collectAsMap  --> 将RDD中对偶元组(二元组) 转换为一个Map --> Action算子
        val stringToInt: collection.Map[String, Int] = rdd2.collectAsMap()
        
        //转换算子 --> keys所有key值  values所有value值  --> RDD中对偶元组 
        
    
    
    
    
    
    
      }
    }
    

      

  • 相关阅读:
    Elasticsearch-PHP 索引操作2
    Elasticsearch-PHP 索引操作
    Linux系统编程1_C标准函数库和系统调用
    Git命令_git commit
    Git命令_git log
    Linux27_配置samba
    计算机网络12_整理
    理解操作系统8——字符设备与块设备
    网站开发基础知识3_会话和cookie
    网站开发基础知识2_前后端分离
  • 原文地址:https://www.cnblogs.com/yumengfei/p/12030611.html
Copyright © 2011-2022 走看看