zoukankan      html  css  js  c++  java
  • Spark编程指南

    Spark编程指南

     (写在前面,本文是翻译自2015年7月29日的http://spark.apache.org/docs/latest/programming-guide.html,由于水平所限,肯定存在很多翻译不到位地方。本文的翻译是为了加深自己作为初学者对Spark的理解,欢迎大家指出各种理解上的错误。)

    一,概述

    每个Spark应用都有一个driver程序,用来调用你的Main函数和在集群上执行各种并行操作。Spark中最重要的一个抽象概念叫RDD,包含一组分区数据,用来在集群的节点上并发的执行。创建RDD,可以从一个Hadoop支持的文件开始,或者从一个scala数组开始。Spark还可以将RDD持久化在内存中,这样就可以在并行操作中被重复利用进而提升效能。如果集群中发生了节点故障,RDD还可以自动恢复。

    另一个Spark中的抽象概念叫做共享变量,可以运行在并行处理中。一般而言,当Spark在各个节点上并行的运行一个函数的各个任务时,Spark会将函数中用到的变量的拷贝分发到各个节点上。但有时,变量需要在任务之间共享,或者需要在任务和driver之间共享。Spark支持两种类型的共享变量:broadcast变量,它会缓存一个值在各个节点的内存中;accumulators,这种变量只能用来执行加法,比如用来计数或者求和。

    二、使用Spark

    Spark1.4.1是用Scala2.10编译的,如果用Scala编写Spark程序,需要引用一个兼容的Scala版本。再者,需要添加一个Maven依赖,比如:

    groupId = org.apache.spark

    artifactId = spark-core_2.10

    version = 1.4.1

    除此之外,如果需要访问HDFS集群,则需要添加一个与你的HDFS版本相同的hadoop-client的依赖:

    groupId = org.apache.hadoop

    artifactId = hadoop-client

    version = <your-hdfs-version>

    最终,开始编程前,如下的Spark类库的引用是必要的:

    import org.apache.spark.SparkContext

    import org.apache.spark.SparkConf

    (如果使用的Spark的版本低于1.4,则需要显示的添加import org.apache.spark.SparkContext._,这样所有隐式的转换才会生效)

    三、初始化Spark

    S park程序一开始就应该创建一个SparkContext对象,它会告诉Spark如何访问集群。但在创建SparkContext之前,需要创建一个SparkConf对象,它包含关于你的应用的一些信息。

    每个JVM只允许一个SparkContext是活动的,如果要新建一个SparkContext则必须将之前活动的SparkContext调用stop()方法。

    val conf = new SparkConf().setAppName(appName).setMaster(master)

    new SparkContext(conf)

    appName是你的应用的名字,会显示在集群的UI上。masterSpark,Mesos,Yarn cluster URL,如果是在本地运行,则可以写作local。实践中,如果运行在一个集群中,最好不要对master硬编码,而是以spark-submit提交的方式中获取响应的参数。当然,本地测试或者单元测试的时候,可以传递local去执行。

    一)、使用Shell

    Spark shell里,环境已经创建了一个SparkContext,可以通过sc直接访问,不允许自己再创建SparkContext。可以通过提交--master来设置maser参数,可以通过提交--jars所需要的以逗号分隔的Jar包列表来指明classpath,可以通过提交--packages所需要的以逗号分隔的依赖列表来添加依赖,其他的可能存在依赖的repositories,可以通过--repositories来提交。比如,要想bin/spark-shell运行在4core上,可以:

    $ ./bin/spark-shell --master local[4]

    比如,同时要添加code.jarclasspath:

    $ ./bin/spark-shell --master local[4] --jars code.jar

    比如,要添加一个maven依赖:

    $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

    对于全部的可提交参数,可以执行spark-shell --help 进行查看。

    四、弹性分布数据集RDD

    RDDSpark的核心概念,实际上是一个可以并行执行的可容错的数据集。既可以通过driver程序中的数组的并行化来生成RDD,也可以通过引用来自外部的数据集来创建RDD,外部的数据集可以存储在HDF或者HBase上,或者任何提供Hadoop InputFormat接口的数据源。

    并行化的数组

    通过在你的driver程序上的已存的数组上调用SparkContextparallelize方法,可以创建一个并行化的数组。源数组中的元素会被copy去组成一个可以并行执行的分布式数据集。比如,如下是创建一个包含数字15的并行化数组:

    val data = Array(1, 2, 3, 4, 5)

    val distData = sc.parallelize(data)

    创建完毕,分布式数据集就可以并行化的执行了。比如,可以调用distData.reduce((a,b)=>a+b)来将数组中所有数加起来。后续会对分布式数据集上的操作有更多的介绍。

    一个很重要的参数是分区数,用来决定数据集会被切分为几份。Spark会在集群上为每个分区执行一个task。通常情况,集群上的一个CPU可以进行2-4分区的执行。Spark会根据集群的配置自动设置分区的个数,但你也可以通过设置parallelize的第二个参数来手工进行设置,e.g. sc.parallelize(data, 10)

    (某些情况下,会有slices的概念来替换partition,其实他们是一回事。)

    外部数据集

    Spark可以通过多种多样的外部存储源创建分布式数据集,只要他们支持Hadoop,比如HDFSCassandraHBaseS3,甚至你本地文件系统。Spark支持文本文件,Sequence文件,或者其他Hadoop InputFormat支持的文件类型。

    文本文件RDD可以通过SparkContexttextFile方法创建,这个方法通过文件的URI(可以是本机的路径,也可以是HDFS的路径,比如hdfs://, s3n://)来将文件一行行的读进来组成一个数组,例子如下:

    scala> val distFile = sc.textFile("data.txt")

    distFile: RDD[String] = MappedRDD@1d4cee08

    一旦创建,distFile就可以执行数据集的各种操作。比如,可以利用mapreduce将每一行的长度加总,distFile.map(s => s.length).reduce((a, b) => a + b)

    Spark读文件的时候需要注意如下事项:

    1.如果使用了一个本地路径来访问文件,则在所有工作节点的相同路径下都必须存在这个文件。要么将文件传递到所有工作节点,要么使用一个网络驱动的共享文件系统。

    2.所有Spark支持的录入方式,包括textFile,支持目录访问,压缩文件访问,通配符访问。比如,你可以使用("/my/directory"), textFile("/my/directory/*.txt"),textFile("/my/directory/*.gz")

    3.textFile的第二个可选参数用来指定文件的分区数。通常,Spark会为每一个block创建一个分区(HDFS中文件的默认的block64M),但你也可以通过指定一个更大的值来分配一个更高的分区数,但你不能要求Spark分配比block数还小的分区数。

    除了文本文件之外,SparkScalaAPI还支持其他一些数据格式:

    1.SparkContext.wholeTextFiles用来读取包含很多小文本文件的目录,返回值为(filename,content)pairs。相比之下,textFile返回的是每个文件中的每行记录。

    2.对于SequenceFiles而言,可以使用SparkContextsequenceFile[K,V]来读取,其中KV分别是文件中keyvalue的数据类型。这些数据类型必须是HadoopWritable接口的子类,比如IntWritableText。此外,Spark也允许指定内生的数据类型,比如sequenceFile[Int,String]会自动读取IntWritableText数据类型的文件。

    3.对于其他的HadoopInputFormat,可以使用SparkContext.hadoopRDD来读取,指定的参数包括任意的一个JobConf和输入类型,key类型和value类型。设置这些参数的方式与你设置Hadoop作业时一样。对于使用最新的MapReduce APIorg.apache.hadoop.mapreduce)的InputFormat,也可以使用SparkContext.newAPIHadoopRDD来读取。

    4.RDD.saveAsObjectFileSparkContext.objectFile支持将RDD保存为包含序列化Java对象的简单格式。但序列化格式,比如Avro,能提供更好的性能去保存任何一个RDD

    RDD操作

    RDD支持两种类型的操作,transformationactiontransformation会从一个RDD中生成一个新的RDDaction则通过在数据集上进行运算返回一个值到driver程序。比如,map就是一个transformation,通过在数据集的每一个元素上执行函数,返回一个新的包含执行结果的RDD。而reduce作为一个action,则通过制定的函数在RDD上的执行来聚合所有的元素,并将最终的计算所得返回给driver程序(别搞混了,有一个reduceByKey的操作,虽然名字和reduce很像,但却是一个transformation)。

    所有的transformationSpark中都是lazy的,换言之,这些操作并不被立即执行。相反,这些操作只是被记住,只有当一个action准备执行的时候,所有的transformation才会被执行。这种设计有利于Spark运行的更有效,比如,通过map创建的RDD会在后续的reduce中使用,这样reduce计算的值就会被传递给driver,而不是map生成的大得多的数据集。

    通常,每在一个RDD上执行一个action,这个RDD上的所有transformation都会被执行一次。通过调用persist(或者cache),可以将RDD持久化在内存中,这样下次使用这个RDD时,就可以比较快的访问了。这种持久化,也同时支持在硬盘或者分布在不同的节点上。

    Basics

    下面简单的代码简要说明了RDD的一些基本操作

    val lines = sc.textFile("data.txt")

    val lineLengths = lines.map(s => s.length)

    val totalLength = lineLengths.reduce((a, b) => a + b)

    第一行定义了一个基本的RDD,从外部文件读取数据。这个数据集并没有立即被读进内存或者所被执行,lines只是一个指向这个文件的引用。第二行定义了一个lineLengths,它是map操作指向的结果。同样,lineLengths也没有被立即计算。最终,当reduce执行的时候,Spark会将计算过程拆分为tasks,进而在不同的机器上运行,每个机器都会执行map操作和本地的reduce操作,最终reduce操作会将结果返回给driver程序。如果lineLengths会被再次使用,可以在reduce之前调用lineLengths.persist(),这样lineLengths会在第一次被计算出来之后被保持在内存中。

    Passing Functions to Spark

    SparkAPI有很多通过driver程序传递函数到集群上执行的情况。以下两种方式是推荐使用的:

    1.匿名函数语法,通常用在较短小的代码片段。

    2.全局单例对象的静态方法,比如你可以定义object MyFunctions,然后传递MyFunctions.func1,

    object MyFunctions {

      def func1(s: String): String = { ... }

    }

    myRdd.map(MyFunctions.func1)

    同样,也可以传递一个类实例的方法的引用,这会导致包含整个类和方法的对象被传递,比如:

    class MyClass {

      def func1(s: String): String = { ... }

      def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }

    }

    我们创建一个MyClass类,然后调用doStuffdoStuff的内部的map引用了MyClass实例的func1方法,会导致整个对象被传递到集群上去执行。这与如下写法一样,rdd.map(x => this.func1(x))

    同样的,访问外部对象的fields会导致对外部对象的整个引用:

    class MyClass {

      val field = "Hello"

      def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }

    }

    如上写法,类似rdd.map(x => this.field + x),其实this对象整个被引用了。为了避免这个情况,最简单的方法就是通过局部变量来保存field而不是去访问外部的对象:

    def doStuff(rdd: RDD[String]): RDD[String] = {

      val field_ = this.field

      rdd.map(x => field_ + x)

    }

    Understanding closures

    Spark中比较难理解的一点就是运行在集群上的方法和变量的作用域和生命周期。那些需要修改RDD外部的变量的RDD操作往往会造成比较大的误解。下面的例子中foreach()用来增加一个counter,但同样的问题在其他操作中也会发生。

    下面这个写的幼稚的RDD操作,操作结果会有很大的不同,取决于执行时在不在一个JVM里。通常,Local模式会和集群模式来比较:

    var counter = 0

    var rdd = sc.parallelize(data)

    // Wrong: Don't do this!!

    rdd.foreach(x => counter += x)

    println("Counter value: " + counter)

    Local vs. cluster modes

    上述代码的一个显而易见的错误是代码的操作没有预期结果。local模式下,运行在同一个JVM里,上述代码会加总RDD里的值,并将结果存在counter里。这事因为RDD和变量counter都运行在driver节点的同一个内存区域。但是,在cluster模式下,事情会变得更复杂,上述代码不会按预期执行。为了执行作业,SparkRDD的一系列操作划分为task,每个task在某个executor上执行。在执行之前,Spark会计算闭包closure。闭包是指executor在执行某个RDD的操作时可见的变量和方法(比如例子中的foreach())。闭包会被序列化,然后发送到各个executor上。local模式时,因为只有一个executor,所以共享相同的闭包。其他模式下,每个executor都运行在不同的工作节点,每个executor都有自己的闭包复制品。

    这样,闭包中的变量的拷贝被发送到各个executor,当foreach中用到counter的时候,用到的不是drive节点的counterdriver节点当然在内存中依然保存着一个counter,但所有的工作节点看不见他。所有的工作节点只能看到序列化传输过来的拷贝。所以,counter最终的值会是0,因为所有的操作访问的都是工作节点上序列化传输过去的闭包中的counter

    如果要确保操作按照预期执行,建议使用AccumulatorAccumulator提供一种机制,确保在集群中的不同节点中可以安全的更新一个变量。关于Accumulator更详细的讨论会在下面提供。

    通常意义上,闭包(类似循环或者本地化的方法)不应该访问全局状态。Spark没有定义或者保证这种在闭包中改变外部对象的行为的机制。有时候在local模式会成功,但这往往也是偶然的情况,同样的代码在分布式模式下就不会成功。请使用Accumulator来代替全局聚集吧。

    Printing elements of an RDD

    另一个常见的误用就是利用rdd.foreach(println)或者rdd.map(println)来打印RDD中的元素。在单机节点上,这样做会表现很正常。但是,在集群模式下,输出会输出在executorstdout,而不是driverstdout,所以driverstdout上不会有任何显示。为了在driver上打印数据,可以使用collect()先将RDD返回到driver上,然后调用rdd.collect().foreach(println)。这个操作很有可能会导致driver内存不足,因为collect()会将所有的RDD都返回到一个机器。如果你真的需要打印一些数据,一个安全的方法是使用take(),take(100).foreach(println)

    Working with Key-Value Pairs

    大部分的Spark操作都能在任意数据类型的RDD上进行,但是有一些操作是专门为key-valueRDD准备的。最常用的是shuffle操作,再有比如grouping或者aggregating

    scala中,这些操作往往是Tuple2类型的对象原生拥有的。PairRDDFunctions包装了RDDtuples,包含了所有的key-value的操作。

    例如,如下的代码在一个key-value上使用reduceByKey来计算文件中每一行出现的次数:

    val lines = sc.textFile("data.txt")

    val pairs = lines.map(s => (s, 1))

    val counts = pairs.reduceByKey((a, b) => a + b)

    也可以使用counts.sortByKey(),使得按字母序排序,然后调用counts.collect()将对象数组返回到driver程序。

    注意:如果要使用自定义对象作为key-value操作的key,则必须保证自定义的equals()hashcode()同时提供。

    Transformations

    如下列出了Spark支持的transformation

    见附1

    Actions

    如下列出了Spark支持的action

    见附2

    Shuffle operations

    有一些操作会触发shuffle。通过shuffleSpark可以将不同分区的数据聚合在一起。通常,shuffle意味着在executor和机器之间移动数据,这样导致shuffle是一个比较复杂而耗时的操作。

    Background

    reduceByKey为例,我们来理解一下shuffle过程当中到底发生了什么。reduceByKey会生成一个新的RDD,包含key,和key所关联的所有值。关键在于,key所关联的所有值有可能不在同一个分区,甚至不在同一个机器上,但他们必须知道彼此以便计算。

    Spark中,为了在特殊的地方进行特殊的操作,数据往往不会跨分区存在。在计算时,单独的task在单独的分区上执行,所以,为了一个单独的reduceByKey任务去执行而组织数据,Spark需要执行一个all-to-all的操作。从所有分区中发现所以key的所有value,然后将值跨区合并进而计算每个key的最终值,这个过程就叫shuffle

    虽然shuffled后的每个分区的数据集的是确定的,每个分区也是有序的,但是分区中的数据却不是有序的。如果要求shuffle之后的数据有序,需要如下操作:

    1.mapPartitions 可以使用.sorted来对每个分区进行排序。

    2.repartitionAndSortWithinPartitions在重分区的同时进行高效的分区排序。

    3.sortBy可以使得RDD全局有序。

    能够导致shuffle发生的操作还包括repartitioncoalesce,以及grooupByKeyreduceByKey,cogroupjoin

    Performance Impact

    由于shuffle会导致磁盘IO,数据序列化和网络IO,所以是一个比较昂贵的操作。为了给shuffle准备数据,Spark生成了一系列的任务,map任务用来组织数据,reduce任务用来聚合数据。这种命名只是沿用了MapReduce,与Spark本身的mapreduce操作没有关系。

    Spark内部,单个map任务的结果会保存在内存中,如果超出内存限制,则可能在磁盘上。然后,这些数据会基于目标分区被排序,然后写入一个文件。reduce任务就会读取相关的排序blocks

    某些shuffle操作特别消耗堆内存,因为在传递数据之前,数据都被保存在内存中。特别是reduceByKeyaggregateByKey会在map的时候创建对象在堆内存,ByKey操作则会在reduce的时候创建对象在堆内存。当数据无法在内存中放得下时,就会触发磁盘IO和垃圾回收。

    Shuffle也会在磁盘上生成大量的中间文件。在1.3版本的时候,直到相关的RDD不在使用,这些文件才会被删除。这样,当需要血缘重算时,这些shufflefile才不会被重建。如果应用长时间的保留对RDD的引用或者GC本身运行的频率比较低,垃圾回收就会在相当长时间段之后才会被触发。这意味着长时间运行的Spark程序会消耗非常大量的磁盘空间。临时存储目录可以通过Spark Context进行设置,参数是spark.local.dirshuffle可以通过不同的配置参数进行调优。

    RDD Persistence

    Spark中一个很重要的能力就是在不同操作直接持久化数据集。RDD持久化是通过每个节点在内存中保存RDD的每个分区来实现,这样后续在这个RDD上的action就可以重用它。这样后续的action往往会快10倍以上。对于迭代算法和快速交互使用,持久化是非常关键的。

    通过persist()或者cache(),都可以实现RDD的持久化。第一次通过action计算出来之后,RDD就会被保留在节点的内存中。Spark的缓存是容错的,如果某个RDD分区丢失了,可以通过transformation来重建它。

    另外,每个持久化的RDD可以通过不同的存储级别来保存,比如,持久化在磁盘,或者持久化在内存但是以序列化的java对象存在,或者备份在不同的节点,又或者保存在Tachyon中。通过传递给persist()一个storagelevel对象可以实现存储级别控制。cache()是使用StorageLevel.MEMORY_ONLY进行持久化的简写。如下是所有的存储级别:

    见附3

    Spark会自动持久化那些shuffle过程中的中间数据,虽然用户并没有要求这么做。这样做是为了避免某个节点故障而导致所有的输入都需要重算。即使如此,依然推荐用户在最终的RDD上调用persist(),如果打算后续还使用它。

    Which Storage Level to Choose?

    不同的Spark存储级别是为了提供不同的基于内存使用和CPU效率的比较。通过阅读下述,可以选择一个:

    1.如果你的RDD与默认的存储级别很匹配,那就用它好了。这是对CPU最有效的方式,RDD上的操作会尽可能的快。

    2.如果不是,则可以使用Memory_only_ser,然后选择一个快速的序列化库,这样对象的存储会更有效,这样对象也能够较合理的快速存取。

    3.如果计算的逻辑非常要紧,或者需要过滤非常大量的数据,否则不建议写到磁盘上。除此之外,重算分区和从磁盘读取是差不多的时间消耗。

    4.如果想要快速的故障恢复,则使用备份存储级别。所有的存储级别都提供通过重算数据来进行完全的故障容忍,但是备份数据允许你在不等待重算分区的前提下继续进行在RDD上的任务。

    5.在有大量内存或者多个应用的环境中,试验性质的OFF_HEAP有如下好处:

    1)允许多个executor共享在Tachyon中的同一个内存池。

    2)会显著的减少垃圾回收的消耗。

    3)单点executor故障的话,缓存数据也不会丢失。

    Removing Data

    Spark自动管理每一个节点上的缓存使用,根据LRU原则释放数据分区。如果你愿意手工释放一个RDD,可以使用RDD.unpersist()

    Shared Variables

    通常,在远程集群节点运行的Spark操作中引用的函数中的变量都是拷贝。这些变量被拷贝到每一个机器,对于这些拷贝的更新操作不会被同步回driver程序。支持广泛的,不同任务间的read-write变量读写会非常拖拉性能。尽管如此,Spark还是提供了两类首先的类型来满足两种通用场景的使用:broadcast变量和accumulator

    Broadcast Variables

    Broadcast变量允许程序员在每个机器上保留一个只读的变量,免得要传递它到每个task。可以被用来在每个节点上高效的存放一个大数据的拷贝。Spark还会尽量使用高效的广播算法来传递broadcast变量来减少网络消耗。

    Sparkaction在一组stage上执行,这些stageshuffle操作进行划分。Spark自动在每个stage广播task需要的通用数据。这种方式广播的数据会在序列化之后缓存,并在执行任务之前反序列化。这意味着显示的创建broadcast变量只有在不同的stage之间执行任务的时候才有效。

    Broadcast变量通过调用SparkContext.broadcast(v)产生的v来使用。broadcast变量是对v的包装,获取它的值可以通过调用value方法进行。代码如下:

    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

    scala> broadcastVar.value

    res0: Array[Int] = Array(1, 2, 3)

    一旦broadcast变量创建,任何函数都应该使用这个变量而不是v,这样v就只会被传递到各个节点上一次。另外,v在广播之后不应该被修改,进而保证所有节点都得到broadcast变量的相同的值。

    Accumulator

    Accumulator变量只能通过关联操作做加法,所有可以高效的支持并发。它可以被用来实现计数或者求和。Spark内生的支持数字类型的accumulator,程序员可以开发支持新的类型。如果accumulator在创建时被赋予了名字,这个名字将显示在SparkUI上。这将有助于了解Stage运行的进度。

    通过调用SparkContext.accumulator(v),会创建一个accumulator,并赋值为初始值v。集群上的任务就可以对这个变量做加法。但是,任务却不能读取变量的值。只有driver程序能够调用value方法去读取变量的值。

    如下的示例演示了通过accumulator对数据中的元素求和:

    scala> val accum = sc.accumulator(0, "My Accumulator")

    accum: spark.Accumulator[Int] = 0

    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    ...

    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

    scala> accum.value

    res2: Int = 10

    上面的代码利用了内置的Int类型的accumulator,程序员可以通过继承AccumulatorParam来实现新的类型。这个接口有两个方法:zero为新类型提供zero值,addInPlace用来将两个值加起来。如下,假设Vector类代表数学列表,代码如下:

    object VectorAccumulatorParam extends AccumulatorParam[Vector] {

      def zero(initialValue: Vector): Vector = {

        Vector.zeros(initialValue.size)

      }

      def addInPlace(v1: Vector, v2: Vector): Vector = {

        v1 += v2

      }

    }

    // Then, create an Accumulator of this type:

    val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

    Scala中,Spark支持泛式的Accumulable接口,它可以将数据加起来而不管加法元素的类型和最终结果的类型是否一致,然后可以调用SparkContext.accumulableCollection实现对泛式的Scala数组的获取。

    执行action操作时,Spark确保每个任务只对accumulator执行一次更新操作。比如,重启的任务不会再次更新accumulator。但是,对于transformation操作,用户需要明白如果task或者stage被重复执行,accumulator会被多次更新。

    Accumulator并没有改变Spark的延迟执行模型。如果需要在RDD的某个操作中被更新,它的值也仅仅是在RDD的某个action执行时才被更新。这就会导致accumulator在一个延迟的transformation中,比如map中执行,对它的更新是不被保证的。如下的代码片段演示了这个特性:

    val accum = sc.accumulator(0)

    data.map { x => accum += x; f(x) }

    // Here, accum is still 0 because no actions have caused the `map` to be computed.

    附1 Transformations

    Transformation

    Meaning

    map(func)

    Return a new distributed dataset formed by passing each element of the source through a function func.

    filter(func)

    Return a new dataset formed by selecting those elements of the source on which funcreturns true.

    flatMap(func)

    Similar to map, but each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item).

    mapPartitions(func)

    Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

    mapPartitionsWithIndex(func)

    Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

    sample(withReplacement, fraction, seed)

    Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

    union(otherDataset)

    Return a new dataset that contains the union of the elements in the source dataset and the argument.

    intersection(otherDataset)

    Return a new RDD that contains the intersection of elements in the source dataset and the argument.

    distinct([numTasks]))

    Return a new dataset that contains the distinct elements of the source dataset.

    groupByKey([numTasks])

    When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
    Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 
    Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

    reduceByKey(func, [numTasks])

    When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

    When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

    sortByKey([ascending], [numTasks])

    When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

    join(otherDataset, [numTasks])

    When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin,rightOuterJoin, and fullOuterJoin.

    cogroup(otherDataset, [numTasks])

    When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

    cartesian(otherDataset)

    When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

    pipe(command, [envVars])

    Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

    coalesce(numPartitions)

    Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

    repartition(numPartitions)

    Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

    repartitionAndSortWithinPartitions(partitioner)

    Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

    2 Actions

    Action

    Meaning

    reduce(func)

    Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

    collect()

    Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

    count()

    Return the number of elements in the dataset.

    first()

    Return the first element of the dataset (similar to take(1)).

    take(n)

    Return an array with the first n elements of the dataset.

    takeSample(withReplacement,num, [seed])

    Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

    takeOrdered(n, [ordering])

    Return the first n elements of the RDD using either their natural order or a custom comparator.

    saveAsTextFile(path)

    Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

    saveAsSequenceFile(path) 
    (Java and Scala)

    Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

    saveAsObjectFile(path) 
    (Java and Scala)

    Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

    countByKey()

    Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

    foreach(func)

    Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. 
    Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

    3 StorageLevel

    Storage Level

    Meaning

    MEMORY_ONLY

    Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

    MEMORY_AND_DISK

    Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

    MEMORY_ONLY_SER

    Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

    MEMORY_AND_DISK_SER

    Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

    DISK_ONLY

    Store the RDD partitions only on disk.

    MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.

    Same as the levels above, but replicate each partition on two cluster nodes.

    OFF_HEAP (experimental)

    Store RDD in serialized format in Tachyon. Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors to be smaller and to share a pool of memory, making it attractive in environments with large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon, the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts from memory. If you plan to use Tachyon as the off heap store, Spark is compatible with Tachyon out-of-the-box. Please refer to this page for the suggested version pairings.

  • 相关阅读:
    关于sql json数据的处理
    时间函数strtotime的强大
    /usr/bin/install: cannot create regular file `/usr/local/jpeg6/include/jconfig.h'
    linux安装php7.2.7
    关于sql时间方面的处理
    关于centos防火墙的一些问题
    linux 安装ssl 失败原因
    linux安装php7.2.7
    拾取坐标和反查询接口api
    【转】通过点击获取地址等信息、可以传值
  • 原文地址:https://www.cnblogs.com/029zz010buct/p/4685094.html
Copyright © 2011-2022 走看看