zoukankan      html  css  js  c++  java
  • Spark RDD教程

    这个教程将会帮助你理解和使用Apache Spark RDD。所有的在这个教程中使用的RDD例子将会提供在github上,供大家快速的浏览。

    什么是RDD(Rssilient Distributed Dataset)?

    RDD是Spark的基础数据结构,是Spark和Spark内核的主要数据抽象。RDD是容错的、不可变的对象分布式集合,这意味一旦创建了RDD,就不能更改它。RDD中的每个数据集都被划分为逻辑分区,这些逻辑分区可以在集群的不同节点上计算。

    换句话说,RDD是类似于Scala中的集合的对象集合,不同之处在于RDD是分散在多个物理服务器(也称为集群中的节点)上的多个JVM上计算的,而Scala集合则位于单个JVM上。

    另外,RDD提供对数据进行分区和分配的数据抽象,这些数据只在在多个节点上并行运行计算,而大多数时候,在RDD上进行转换时,我们不必担心默认情况下Spark提供的并行性。

    本Apache Spark RDD教程使用Scala示例描述了RDD上可用的基本C座,例如map,filter和persist等。此外,本教程还介绍了pair RDD函数,该函数可在键值对的RDD上运行,例如groupByKey和join等。

    RDD的优势

    • In-Memory Processing
    • Immutability
    • Fault Tolerance
    • Lazy Evolution
    • Partitioning
    • Parallelize

    限制

    Spark RDDs不太适合对状态存储(如web应用程序的存储系统)进行更新的应用程序。对于这些应用程序,使用执行传统更新日志记录和数据检查点(如数据库)的系统更有效。RDD的目标是为批处理分析提供一个有效的编程模型,而不考虑这些异步应用程序。

    RDD的创建

    RDD主要以两种不同的方式创建,首先是并行化现有集合,其次是引用外部存储系统(HDFS,S3等)中的数据集。

    在查看实例之前,首先让我们使用SparkSession类中定义的builder模式方法初始化SparkSession。在初始化时,我们需要提供如下所示的主名称和应用程序名称。

    val spark:SparkSession = SparkSession.builder()
            .master("local[1]")
            .appName("SparkByExamples.com")
            .getOrCreate()
    

    使用sparkContext.parallelize()

    sparkContext.parallelize用于并行化驱动程序中的现有集合。这是创建RDD的基本方法,主要在POC或原型制作时使用,它要求在创建RDD之前将所有数据都存在于驱动程序中,因此它并不是最常用于生产应用程序的。

    val dataSeq = Seq(("Java", 1000), ("Python", 2000), ("Scala", 3000))
    val rdd = spark.sparkContext.parallelize(dataSeq)
    

    对于生产应用程序,我们主要通过使用外部存储系统(如HDFS、S3、HBase e.t.c)来创建RDD。

    使用sparkContext.textFile()

    使用testFile()方法,我们能把一个txt文件读到RDD中。

    val rdd2 = spark.sparkContext.textFile("/path/textFile.txt")
    

    使用sparkContext.wholeTextFiles()

    wholeTextFiles()方法返回一个PairRDD,键是文件路径,值是内容

    val rdd3 = spark.SparkContext.wholeTextFiles("/path/textFile.txt")
    

    除了使用text文件,还可以使用csv文件,json和其他格式的文件。

    使用sparkContext.emptyRDD

    使用sparkContext的emptyRDD()方法,创建一个没有数据的RDD。这个方法创建一个空的RDD,并且没有分区。

    val rdd = spark.sparkContext.emptyRDD
    val rddString = spark.sparkContext.emptyRDD[String]
    

    创建带分区的空的RDD

    有时我们可能需要按分区将空的RDD写入文件,在这种情况下,您应该使用分区创建空的RDD。

    val rdd2 = spark.sparkContext.parallelize(Seq.empty[String])
    

    RDD并行和重新分区

    当我们使用parallelize()或textFile()或SparkContext的wholeTextFile()方法来初始化RDD时,它会根据资源可用性自动将数据分割为分区。

    getNumPartitions- 返回数据的分区数。在RDD上应用的任何转换都是并行执行的。Spark将为集群的每个分区运行一个任务。

    println("initial partition count:" + rdd.getNumPartitions)
    // Outputs: initial partition count:2
    

    手动设置并行度- 我们可以手动设置一个我们需要的分区数量,将分区数作为第二参数传递给这些函数sparkContext.parallelize(dataSeq, 10)

    使用重新 分区和合并进行重新分配:有时候我们可能需要重新划分RDD,Spark提供了两种重新划分的方法;首先使用repartition()方法从所有节点shuffle数据,也称为完全混洗。第二种coalesce()方法,该方法shuffle最少节点的数据,举个例子,如果你有数据分布在4个分区,现在你使用coalesce(2),仅仅只从两个节点移动数据。

    这两个函数都会重新分配分区。repartition()方法的代价非常的巨大,它将会混洗集群上所有节点的数据。

    val reparRdd = rdd.repartiton(4)
    println("re-partition count:" + reparRdd.getNumPartitions)
    // Outputs: "re-partition count:4"
    

    Note:repartition() or coalesce()方法都返回一个新的RDD

    RDD操作

    RDD转换:转换时惰性操作,这些操作不会更新RDD,而是返回另一个RDD

    RDD操作:除法计算并返回RDD值得操作。

    RDD转换例子

    Spark RDD上的转换操作返回另一个RDD,并且转换操作是惰性的,这意味着他们不会立即执行,直到你调用一个RDD action时才会执行。RDD上的一些转换操作,如flatMap, map, reductByKey, filter, sortByKey,这些转换操作都会返回一个新的RDD,而不是更新已有的RDD。

    在这个Spark RDD转换教程中,我将使用单词计数示例老解释转换。下图演示了我们将要使用的不同的RDD转换。

    image

    首先,从一个text文件创建一个RDD。

    val rdd:RDD[String] = spark.spark.Context.textFile("src/main/scala/test.txt")
    

    flatMap:flatMap转换将RDD展平,并返回新的RDD。在下面的示例中,首先它在RDD中空格分隔记录,最后将其展平。结果RDD在每个记录上都包含一个单词。

    val rdd2 = rdd.flatMap(f => f.split(" "))
    

    map:映射转换用于任何复杂的操作,比如添加一个列,更新一个列e.t.c。映射转换的输出总是与输入有相同数量的记录。

    在我们的单词计数示例中,我们将为每个单词添加一个值为1的新列,RDD的结果为PairRDDFunctions,其中包含键值对,String类型的单词为Key,Int类型的1位为value。为了更好的理解,我们为rdd3变量定义了类型。

    val rdd3:RDD[(String:Int)] = rdd2.map(m => (m, 1))
    

    filter:filter转换操作是用来在RDD中过滤记录的。在我们的例子中,过滤所有以'a'开头的单词。

    val rdd4 = rdd3.filter(a => a._1.startsWith("a"))
    

    reductByKey:reduceByKey用指定的函数来合并相同key对应的value值。在我们的示例中,它通过对值应用sum函数来减少单词字符串。我们的RDD的结果包含唯一的单词和他们的计数。

    val rdd5 = rdd4.reductByKey(_ + _)
    

    sortByKey:sortByKey转换是对RDD的key列进行排序。在我们的示例中,首先我们使用映射转换RDD[(String,Int)] to RDD[(Int,String)],并应用sortBykey,它在理想情况下对整数值进行排序。最后,使用println语句的foreach返回RDD中的所有单词及其作为键-值对的计数。

    val rdd6 = rdd5.map(a => (a._2, a._1)).sortByKey()
    // Print rd6 result to console
    rdd6.foreach(println)
    

    RDD Actions with example

    RDD Action操作从RDD返回原始值。换句话说,任何返回非RDD[T]的RDD函数都被视为一个动作。

    count:返回RDD中的记录数

    //Action - count
    println("Count : " + rdd6.count())
    

    first:返回第一条记录

    // Action - first
    val firstRec = rdd6.first()
    println("First Record : " + firstRec._1 + "," + firstRec._2)
    

    max:返回最大的记录

    val datMax = rdd6.max()
    println("Max Record : " + datMax._1 + "," + datMax._2)
    

    reduct:将记录减少为单个,我们可以使用它来计数或求和

    val totalWordCount = rdd6.reduce((a, b) => (a._1 + b._1, a._2))
    println("dataReduce Record : " + totalWordCount._1)
    

    take:返回指定数目的记录

    val data3 = rdd6.take(3)
    data3.foreach(f => {
        println("data3 Key:" + f._1 + ", Value:" + f._2)
    })
    

    collect:以数据形式返回RDD中的所有数据。当你在处理带有成千上万亿数据的巨大的RDD时,请小心使用此操作,因为你可能会耗尽驱动程序上的内存。

    val data = rdd6.collect()
    data.foreach(f => {
        println("Key:" + f._1 + ", Value:" + f._2)
    })
    

    saveAsTextFile:使用saveAsTextFile操作,可以把RDD写入到text文件。

    rdd6.saveAsTextFile("/tmp/wordCount")
    
  • 相关阅读:
    安装lnmp 时如何修改数据库数据存储地址及默认访问地址
    ubuntu 设置root用户密码并实现root用户登录
    解决ubuntu 远程连接问题
    linux 搭建FTP服务器
    PHP 根据ip获取对应的实际地址
    如何发布自己的composer包
    使用composer安装composer包报Your requirements could not be resolved to an installable set of packages
    laravel 框架配置404等异常页面
    使用Xshell登录linux服务器报WARNING! The remote SSH server rejected X11 forwarding request
    IoTSharp 已支持国产松果时序数据库PinusDB
  • 原文地址:https://www.cnblogs.com/xiagnming/p/12492018.html
Copyright © 2011-2022 走看看