zoukankan      html  css  js  c++  java
  • RDDs的基本操作

    RDDs的基本特性

      1.延迟计算

        Spark对RDDs的计算是当他们第一次使用Action操作的时候。这种方式在处理大数据时很有用,可以减少数据的传输。

        Spark内部记录了metadata表来表明transformation操作是否已经被相应,metadata中只记录已经被相应的Transformation操作  

        加载数据也是延迟计算,数据只有在必要的时候才会被加载进去

      2.持久化

        RDD.persist()讲前面产生的RDD进行缓存,当再次使用时不需要进行前面产生该RDD的一系列操作,而直接在缓存中调用这个RDD即可

    RDDs的基本操作

    1.Transformation(转换)

      从之前的RDD构建新的RDD,如map()和filter()操作都属于Transformation

      (1)map()

        map()接收一个函数,把函数应用到RDD的每个元素,返回新的RDD

      val lines=sc.parallelize(Array("hello","Spark","hello","World"),4)
      val lines2=sc.map(line=>(line,1))    #这里会将所有的元素都和1组成新rdd的元素
      lines.collect().foreach(println)
      output:  (hello,1)
            (Spark,1)
            (hello,1)
            (World,1)

      (2)filter()

        filter()接收函数,返回只包含满足filter()函数元素的新RDD

      val lines3=lines.filter(line=>line.contains("hello"))  #这里只留下包含hello的
      lines3.collect().foreach(println)
      output:    (hello,1)
                  (hello,1)

      (3)flatMap()

        flatMap()对每个输入元素,输出多个元素。将RDD中元素压扁后返回一个新的RDD。

        首先我们本地存在一个文件HelloSpark.txt,文件内容如下:

              

        第一步我们将文件内容以每个元素换行方式打印,我们可以看出初始时从文件中读取的文件将内部内容分为”Hello Spark !“和”Are you ok ?“两个元素。

         val text_rdd=sc.textFile("/home/yu/data/HelloSpark.txt")
         text_rdd.collect()/foreach(println)
         out:   Hello Spark !
              Are you ok ?

        接下来我们将文件使用flatmap()函数进行处理后压扁:

        val flatmap_rdd=text_rdd.flatMap(line=>line.split(" "))
        flatmap_rdd.collect().foreach(print)
        flatmap_rdd.collect().forech(println)
        out:   HelloSparkAreyouok?
              Hello
              Spark
              !  
              Are
              you
              ok
              ?
              

        从println的输出我们可以看到split方法将整个rdd中的元素根据“ ”分成了7个元素,从print打印的结果可以看出flatMap()讲切割后文件进行了压扁处理。

      (4)集合运算

        distinct()    去重

        union()     并集

        intersection()  交集 

        substract()   A中有而B中没有的元素

          val rdd1=sc.parallelize(Array("coffe","coffe","tea","monkey","time"))
          val rdd2=sc.parallelize(Array("coffe","house"))
          val distinct_rdd=rdd1.distinct()
          val union_rdd=rdd1.union(rdd2)
          val inter_rdd=rdd1.intersection(rdd2)
          val sub_rdd=rdd1.inter       distinct_rdd.collect().
    foreach(println)       println("<------>")       union_rdd.collect().foreach(println)       println("<------>")       inter_rdd.collect().foreach(println)
          println("<------>")
          sub_rdd.collect().foreach(println)
         out:   tea
                monkey
                coffe
                time
                <------>
                coffe
                coffe
                tea
                monkey
                time
                coffe
                house
                <------>
                coffe
                <------>
                tea
                monkey
                time

     2.Action

      在RDD上计算一个结果(transformation只是在进行转换等操作,并没有计算出实际的结果),把结果返回给driver program或保存在文件系统。

      (1)collect()  

          遍历整个RDD,返回RDD的所有元素。

          注意:返回内容必须要单机内存能够容纳下(因为数据要拷贝给driver测试使用),大数据时我们一般使用saveAsTextFile()

        val rdd=sc.parallelize(Array(1,2,4,4))
        rdd.collect()
        out:Array[Int]=Array(1,2,4,4)

      (2)reduce()  

        接收一个函数,作用在RDD两个类型相同的元素上,返回新的元素。可实现RDD中元素的累加、计数,和其他类型的集聚操作。

        val sum=rdd.reduce((x,y)=>x+y)
        print(sum)
        out:11

      (3)take()

        返回RDD的n个元素(尝试访问最少的partitions),返回结果时无序的,一般有测试使用

      (4)top()

        返回排序(根据RDD中的数据比较器)后的结果,返回几个元素。

      (5)foreach() 

        计算RDD中的每个元素,但不返回到本地,可以配合println()打印良好输出。

            rdd.foreach(println)
        out:   1
             2
             4
             4
  • 相关阅读:
    QR code 乱谈(一)
    用JAVA实现数字水印(可见)
    ctf总结
    Unix/Linux常用命令
    C语言概述
    C语言发发展历史
    为什么要学习C语言
    计算机应用领域
    计算机发展趋势
    如何学习计算机
  • 原文地址:https://www.cnblogs.com/2017Crown/p/7413982.html
Copyright © 2011-2022 走看看