zoukankan      html  css  js  c++  java
  • 【译】Spark官方文档——编程指南

    本文翻自官方博客,略有添加:https://github.com/mesos/spark/wiki/Spark-Programming-Guide

    Spark发指南

    从高的面看,其实每一个Spark的用,都是一个Driver类,通运行用户定义的main函,在集群上行各种并发操作和算

    Spark提供的最主要的抽象,是一个性分布式据集(RDD),它是一种特殊集合,可以分布在集群的点上,以函式程操作集合的方式,行各种各样的并发操作。它可以由hdfs上的一个文件建而,或者是Driver程序中,从一个已经存在的集合而。用户可以据集存在存中,它被有效的重用,行并发操作。最后,分布式据集可以自动的从点失中恢复,再次行算。

    Spark的第二个抽象,是并行算中使用的共享变量。默,Spark并发运行一个函,它是以多个的task,在不同的点上运行,它递每一个变量的一个拷贝,到每一个立task使用到的函中,因此些变量并非共享的。然而有候,我需要在任务中能够被共享的变量,或者在任务与动程序之间共享。Spark支持两种类型的共享变量:

    广播变量: 可以在存的所有点中被,用于存变量(只)

    累加器: 只能用做加法的变量,例如和求和

    本指南通一些样例展示些特征。者最好是熟悉Scala,尤其是包的法。留意,Spark可以通Spark-Shell的解器行交互式运行。你可能会需要它。

    接入Spark

    了一个Spark的用,你需要Spark和它的依,加入到CLASSPATH中。最的方法,就是运行sbt/sbt assemblySpark和它的依,打到一个Jar里面core/target/scala_2.9.1/spark-core-assembly-0.0.0.jar,然后它加入到你的CLASSPATH中。或者你可以spark发布到maven的本地存中,使用sbt/sbt publish。它在织org.spark-project下成一个spark-core.

    另外,你会需要入一些Spark的类和式, 下面几行加入到你程序的部

    import spark.SparkContext

    import SparkContext._

    初始化Spark

    Spark程序需要做的第一件事情,就是建一个SparkContext象,它告Spark如何一个集群。个通常是通下面的构造器实的:

    new SparkContext(master, jobName, [sparkHome], [jars])

    Master参是一个字符串,指定了接的Mesos集群,或者用特殊的字符串“local”指明用local模式运行。如下面的描述一般,JobName是你任务的名,在集群上运行的候,会在Mesos的Web UI控界面示。后面的两个参,是用在你的代,部署到mesos集群上运行使用的,后面会提到。

    在Spark的解器中,一个特殊的SparkContext变量已经你建,变量名字叫sc。建你自己的SparkContext是不会生效的。你可以通设置MASTER境变量,master接到需要的上下文。

    MASTER=local; ./spark-shell

    Master的命名

    Master的名字可以是以下3个格式中的一种

    Master Name

    Meaning

    local

    本地化运行Spark,使用一个Worker程(有并行)

     

    local[K]

    本地化运行Spark,使用K个Worker程(根据机器的CPU核设定)

     

    HOST:PORT

    Spark接到指定的Mesos Master,在集群上运行。Host参是Mesos Master的Hostname, 端口是master配置的端口,默5050.

    注意:在早期的Mesos版本(spark的old-mesos分支),你必使用master@HOST:PORT.

    集群部署

    如果你想你的任务运行在一个集群上,你需要指定2个可参:

    • SparkHome:Spark在集群机器上的安装路(必全部一致)
    • Jars:在本地机器上,包含了你任务的代和依的Jars文件列表。 Spark会把它部署到所有的集群点上。 你需要使用自己的系统你的作业,打包成一套jars文件。例如,如果你使用sbt,那么sbt-assembly插件是一个好方法,你的代和依,变成一个一的jar文件。

    如果有一些类是公用的,需要在不同的作业间共享,你可能需要手工拷贝到mesos的点上,在conf/spark-env中,通设置SPARK_CLASSPATH境变量指向它。信息可以参考配置

    分布式据集

    Spark的核心概念,是性分布式据集(RDD),一个有容机制,可以被并行操作的集合。目前有两种类型的RDD:并行集合(Parrallelized Collections),接收一个已经存在的Scala集合,在它上面运行各种并发算; Hadoop据集(Hadoop DataSets),在一个文件的每条上,运行各种函。只要文件系统是Hdfs,或者hadoop支持的任意存系统。两种RDD都可以通相同的方式行操作。

    并行集合

    并行集合是通用SparkContext的parallelize方法,在一个已经存在的Scala集合(只要是seq象就可以)上建而。集合的象会被拷贝建一个分布式据集,可以被并行操作。下面通spark解器的例子,展示如何从一个建一个并发集合

    scala> val data = Array(1, 2, 3, 4, 5)

    data: Array[Int] = Array(1, 2, 3, 4, 5)

    scala> val distData = sc.parallelize(data)

    distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

    一旦被建,分布据集(distData)可以被并行操作。例如,我可以用distData.reduce(_ +_) 的元素相加。我会在后的分布式据集做一步描述。

    建并行集合的一个重要参,是slices的目,它指定了据集切分几份。在集群模式中,Spark会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。一般,Spark会根据集群的况,自动设定slices的目。然而,你也可以手动的设置它,通parallelize方法的第二个参(例如:sc.parallelize(data, 10)).

    Hadoop据集

    Spark可以建分布式据集,从任何存在HDFS文件系统或者Hadoop支持的其它文件系统(包括本地文件,Amazon S3, Hypertable, HBase等等)上的文件。 Spark可以支持Text File, SequenceFiles 及其它任何Hadoop输入格式

    文本文件的RDDs可以通SparkContext的textFile方法建,方法接受文件的URI地址(或者机器上的文件本地路,或者一个hdfs://, sdn://,kfs://,其它URI).里是一个用例子:

    scala> val distFile = sc.textFile(“data.txt”)

    distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08

    一旦被建,distFile可以行据集操作。例如,我可以使用如下的map和reduce操作所有行的长度相加:

    distFile.map(_.size).reduce(_ + _ )

    方法也接受可的第二参,控制文件的分片目。默,Spark每一块文件建一个分片(HDFS默的块大小64MB),但是你可以通入一个更大的值指定更多的分片。注意,你不能指定一个比块个更少的片值(和hadoop中,Map不能小于Block一样)

    于SequenceFiles,使用SparkContext的sequenceFile[K, V]方法,K和V是文件中的key和values类型。他必是Hadoop的Writable的子类,例如IntWritable和Text。另外,Spark允你指定几种原生的通用Writable类型,例如:sequencFile[Int, String]会自动取IntWritable和Texts

    最后,于其它类型的Hadoop输入格式,你可以使用SparkContext.hadoopRDD方法,它可以接收任意类型的JobConf和输入格式类,类型和值类型。按照Hadoop作业一样的方法,设置输入源就可以了。

    分布式据集操作

    分布式据集支持两种操作:

    (transformation):根据有的据集建一个新的据集

    动作(actions):在据集上运行算后,返回一个值动程序

    例如,Map是一个,据集的每一个元素,都经一个函行算后,返回一个新的分布式据集作果。而另一方面,Reduce是一个动作,据集的所有元素,用某个函行聚合,然后最果返回动程序,而并行的reduceByKey是返回一个分布式据集

    所有Spark中的都是惰性的,也就是,并不会马上发生算。相反的,它只是住用到基据集上的些(Transformation)。而些(Transformation),只会在有一个动作(Action)发生,要求返回果动用,才真正行算。个设Spark更加有效率的运行。例如,我可以实,通map建一个据集,然后再用reduce,而只返回reduce的果driver,而不是整个大的据集。

    spark提供的一个重要操作是Caching。你cache一个分布式据集,每个点会存据集的所有片,并在存中算,并在其它操作中重用。会使得后的算更加的快速(通常是10倍),存是spark中一个构造迭代算法的工具,也可以在解器中交互使用。

    下面的表格列出目前支持的和动作:

    (Transformations)

    Transformation

    Meaning

    map(func)

     

    返回一个新的分布式据集,由每个原元素经func函后成

    filter(func)

     

    返回一个新的据集,由经func函后返回值true的原元素成

    flatMap(func)

    类似于map,但是每一个输入元素,会被映射0到多个输出元素(因此,func函的返回值是一个Seq,而不是一元素)

    sample(withReplacement,frac,seed)

     

    根据定的随机种子seed,随机抽样出量frac的据

    union(otherDataset)

     

    返回一个新的据集,由原据集和参联合而成

    groupByKey([numTasks])

     

    在一个由(K,V)成的据集上用,返回一个(K,Seq[V])的据集。注意:默情况下,使用8个并行任务行分,你可以入numTask可参,根据据量设置不同目的Task

    (groupByKey和filter合,可以实类似Hadoop中的Reduce功能)

    reduceByKey(func, [numTasks])

    在一个(K,V)的据集上使用,返回一个(K,V)的据集,key相同的值,都被使用指定的reduce函聚合到一起。和groupbykey类似,任务的个是可以通第二个可参配置的。

    join(otherDataset, [numTasks])

    在类型(K,V)和(K,W)类型的据集上用,返回一个(K,(V,W)),每个key中的所有元素都在一起的据集

    groupWith(otherDataset, [numTasks])

    在类型(K,V)和(K,W)类型的据集上用,返回一个据集,成元素(K, Seq[V], Seq[W]) Tuples。个操作在其它框架,CoGroup

    cartesian(otherDataset)

    笛卡尔。但在据集T和U上用,返回一个(T,U)的据集,所有元素交互行笛卡尔。

    sortByKey([ascendingOrder])

    在类型( K, V )的据集上用,返回以K行排序的(K,V)据集。升序或者降序由boolean型的ascendingOrder参决定

    (类似于Hadoop的Map-Reduce中间段的Sort,按Key行排序)

    Actions(动作)

    Action

    Meaning

    reduce(func)

    通函func聚集据集中的所有元素。Func函接受2个参,返回一个值。个函必是联性的,确保可以被正确的并发行

    collect()

    在Driver的程序中,以的形式,返回据集的所有元素。通常会在使用filter或者其它操作后,返回一个足够小的据子集再使用,直接整个RDD集Collect返回,很可能会Driver程序OOM

    count()

    返回据集的元素个

    take(n)

    返回一个,由据集的前n个元素成。注意,个操作目前并非在多个点上,并行行,而是Driver程序所在机器,机算所有的元素

    (Gateway的存力会增大,需要慎使用)

    first()

    返回据集的第一个元素(类似于take(1))

    saveAsTextFile(path)

    据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark会用每个元素的toString方法,并它文件中的一行文本

    saveAsSequenceFile(path)

    据集的元素,以sequencefile的格式,保存到指定的目下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必由key-value成,并都实了Hadoop的Writable接口,或式可以Writable(Spark包括了基本类型的,例如Int,Double,String等等)

    foreach(func)

    在据集的每一个元素上,运行函func。通常用于更新一个累加器变量,或者和外部存系统做交互

    用RDD的cache()方法,可以它在第一次算后,果保持存在存。据集的不同部分,会被存在算它的不同的集群点上,后的据集使用更快。存是有容功能的,如果任一分的RDD据失了,它会被使用原建它的,再算一次(不需要全部重新算,只算失的分)

    Shared Variables

    共享变量

    一般,一个函被递Spark操作(例如map和reduce),通常是在集群点上运行,在函中使用到的所有变量,都做分拷贝,供函操作,而不会互相影响。些变量会被拷贝到每一台机器,而在程机器上,在变量的所有更新,都不会被播回Driver程序。然而,Spark提供两种有限的共享变量,供两种公用的使用模式:广播变量和累加器

    广播变量

    广播变量允程序保留一个只的变量,存在每一台机器上,而非每个任务保存一份拷贝。他可以使用,例如,每个点一个大的输入据集,以一种高效的方式。Spark也会,使用一种高效的广播算法,减少沟通的耗。

    广播变量是从变量V建的,通用SparkContext.broadcast(v)方法。个广播变量是一个v的分装器,它的只可以通用value方法得。如下的解器模块展示了如何用:

    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

    broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

    scala> broadcastVar.value

    res0: Array[Int] = Array(1, 2, 3)

    在广播变量被建后,它能在集群运行的任何函上,被取代v值行用,从而v值不需要被再次递到些点上。另外,象v不能在被广播后修改,是只的,从而保所有点的变量,收到的都是一模一样的。

    累加器

    累加器是只能通合操作“加”起的变量,可以高效的被并行支持。他可以用实器(如同MapReduce中)和求和。Spark原生就支持Int和Double类型的器,程序可以添加新的类型。

    一个器,可以通用SparkContext.accumulator(V)方法建。运行在集群上的任务,可以使用+=加值。然而,它不能取器的值。Driver程序需要取值的候,它可以使用.value方法。

    如下的解器,展示了如何利用累加器,一个里面的所有元素相加

    scala> val accum = sc.accumulator(0)

    accum: spark.Accumulator[Int] = 0

    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

    scala> accum.value

    res2: Int = 10

    更多料

    在Spark的网站上,你可以看到Spark样例程序

    另外,Spark包括了一些例子,在examples/src/main/scala上,有些既有Spark版本,又有本地非并行版本,允你看到如果要程序以集群化的方式跑起的,需要做什么改变。你可以运行它,通类名递spark中的run脚本 — 例如./run spark.examples.SparkPi. 每一个样例程序,都会打印使用帮助,运行任何参。

  • 相关阅读:
    基本技能训练之线程
    关于UEditor的使用配置(图片上传配置)
    PAT 乙级练习题1002. 写出这个数 (20)
    codeforces 682C Alyona and the Tree DFS
    codeforces 681D Gifts by the List dfs+构造
    codeforces 678E Another Sith Tournament 概率dp
    codeforces 680E Bear and Square Grid 巧妙暴力
    codeforces 678D Iterated Linear Function 矩阵快速幂
    codeforces 679A Bear and Prime 100 交互
    XTUOJ 1248 TC or CF 搜索
  • 原文地址:https://www.cnblogs.com/vincent-hv/p/3299176.html
Copyright © 2011-2022 走看看