  • Introduction to Spark Transformations

    Background

    This article is based on the Spark 1.3 source code.

    How do you create an RDD?

    An RDD can be created from an ordinary collection, or from a file in the local filesystem or HDFS.

    Example: create an RDD from an ordinary collection holding the nine numbers 1 through 9, spread across 3 partitions.

    
    scala> val a = sc.parallelize(1 to 9, 3)
    a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[103] at parallelize at <console>:21
    
    

    Example: create an RDD by reading the file README.md; each line of the file becomes one element of the RDD.

    scala> val file = sc.textFile("README.md")
    file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[3] at textFile at <console>:21
    

    Although there are other ways to create RDDs, this article mainly uses the two methods above to illustrate transformations.

    map

    map applies a given function to every element of an RDD to produce a new RDD. Every element of the original RDD corresponds to exactly one element in the new RDD.

    Definition

    
    def map[U: ClassTag](f: T => U): RDD[U]
    

    Example:

    
    scala> val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:21
    
    scala> val b = a.map(_.length)
    b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at <console>:23
    scala> val c=a.zip(b)
    c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[6] at zip at <console>:25 
    scala> c.collect
    res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))
    

    filter

    filter applies the function f to every element of the original RDD and drops the elements for which f returns false.

    Definition

    
    def filter(f: T => Boolean): RDD[T]
    

    Example

    
    scala> val file=sc.textFile("README.md")
    file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[19] at textFile at <console>:21
    
    
    
    scala> file.filter(line=>line.contains("spark")).count
    res5: Long = 11
     
    
    scala> file.filter(line=>line.contains("spark")).collect
    res6: Array[String] = Array(<http://spark.apache.org/>, guide, on the [project web page](http://spark.apache.org/documentation.html), ["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html)., " ./bin/spark-shell", " ./bin/pyspark", "examples to a cluster. This can be a mesos:// or spark:// URL, ", " MASTER=spark://host:7077 ./bin/run-example SparkPi", Testing first requires [building Spark](#building-spark). Once Spark is built, tests, ["Specifying the Hadoop Version"](http://spark.apache.org/docs/latest/building-with-maven.html#specifying-the-hadoop-version), ["Third Party Hadoop Distributions"](http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html), Please refer to the [Configuration guide](http://spark.apache.org/docs/latest/configurat...
    

    flatMap

    The difference between flatMap and map is that the function passed to map returns exactly one element per input, so the number of elements does not change, whereas the function passed to flatMap returns an iterator over zero or more elements, which is then flattened into the new RDD.

    Definition

    
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
    
    

    Example

    
    scala> val file=sc.textFile("README.md")
    file: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[25] at textFile at <console>:21
    
    
    
    scala> file.flatMap(_.split(" ")).take(5)
    res11: Array[String] = Array(#, Apache, Spark, "", Spark)
    
     
    
    scala> file.map(_.split(" ")).take(5)
    res12: Array[Array[String]] = Array(Array(#, Apache, Spark), Array(""), Array(Spark, is, a, fast, and, general, cluster, computing, system, for, Big, Data., It, provides), Array(high-level, APIs, in, Scala,, Java,, and, Python,, and, an, optimized, engine, that), Array(supports, general, computation, graphs, for, data, analysis., It, also, supports, a))
    
    scala> file.map(_.length).take(5)
    res1: Array[Int] = Array(14, 0, 78, 72, 73)
    

    When counting how many words a file contains, use flatMap; if you tokenize with map instead, each line yields an array rather than individual words. To compute the length of each line, use map.
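    A minimal sketch of this point, assuming the same README.md and a spark-shell session where sc is predefined (splitting on single spaces and dropping empty strings are illustrative choices, not part of the original example):

    // Word count: flatMap flattens every line into individual words before counting.
    val file = sc.textFile("README.md")
    val totalWords = file.flatMap(_.split(" ")).filter(_.nonEmpty).count()

    // Line lengths: map keeps exactly one result per line, so it is the right tool here.
    val lineLengths = file.map(_.length)
    lineLengths.take(5)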

    mapPartitions

    mapPartitions is a variant of map. The input function of map is applied to each element of the RDD, whereas the input function of mapPartitions is applied to each partition; that is, the contents of each partition are processed as a whole.

    Definition

    
    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
    

    Here f is the input function; it processes the contents of one partition. The contents of each partition are passed to f as an Iterator[T], and f returns an Iterator[U]. The final RDD is formed by merging the results that the input function produces for all partitions.

    Example

    
    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    val nums = sc.parallelize(1 to 9, 3)
    nums.mapPartitions(iter => {
      // Pair each element with its successor within the same partition.
      var res = List[(Int, Int)]()
      var pre = iter.next
      while (iter.hasNext) {
        val cur = iter.next
        res ::= (pre, cur)
        pre = cur
      }
      res.iterator
    }).collect()
     
    // Exiting paste mode, now interpreting. 
    nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:21
    res1: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))
    

    In the example above, mapPartitions turns each element of a partition and the element that follows it into a tuple. Because the last element of each partition has no successor, (3,4) and (6,7) do not appear in the result.
    mapPartitions also has some variants, such as mapPartitionsWithIndex, mapPartitionsWithContext, and mapPartitionsWithSplit, but mapPartitionsWithContext and mapPartitionsWithSplit have been deprecated since 1.2. mapPartitionsWithIndex is introduced below.

    mapPartitionsWithIndex

    Definition

    def mapPartitionsWithIndex[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
    

    mapPartitionsWithIndex is similar to mapPartitions, except that its function takes two arguments: in addition to the iterator it also receives the partition index.

    Example

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    val nums = sc.parallelize(1 to 9, 3)
    nums.mapPartitionsWithIndex((index, iter) => {
      // Double every element in partition 0; leave the other partitions unchanged.
      if (index == 0)
        iter.toList.map(_ * 2).iterator
      else
        iter
    }).collect()
    
    // Exiting paste mode, now interpreting.
    
    nums: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21 
    res0: Array[Int] = Array(2, 4, 6, 4, 5, 6, 7, 8, 9)
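
    A common use of the partition index is to check which elements live in which partition. A small sketch in the same spirit (run in spark-shell; the layout shown in the comment matches how parallelize split 1 to 9 into 3 partitions in the examples above):

    // Tag every element with the index of the partition that holds it.
    val nums = sc.parallelize(1 to 9, 3)
    nums.mapPartitionsWithIndex((index, iter) => iter.map(x => (index, x))).collect()
    // Expected shape: Array((0,1), (0,2), (0,3), (1,4), (1,5), (1,6), (2,7), (2,8), (2,9))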
    
    

