本文翻自官方博客,略有添加:https://github.com/mesos/spark/wiki/Spark-Programming-Guide
Spark发指南
从高的面看,其实每一个Spark的用,都是一个Driver类,通运行用户定义的main函,在集群上行各种并发操作和算
Spark提供的最主要的抽象,是一个性分布式据集(RDD),它是一种特殊集合,可以分布在集群的点上,以函式程操作集合的方式,行各种各样的并发操作。它可以由hdfs上的一个文件建而,或者是Driver程序中,从一个已经存在的集合而。用户可以据集存在存中,它被有效的重用,行并发操作。最后,分布式据集可以自动的从点失中恢复,再次行算。
Spark的第二个抽象,是并行算中使用的共享变量。默,Spark并发运行一个函,它是以多个的task,在不同的点上运行,它递每一个变量的一个拷贝,到每一个立task使用到的函中,因此些变量并非共享的。然而有候,我需要在任务中能够被共享的变量,或者在任务与动程序之间共享。Spark支持两种类型的共享变量:
广播变量: 可以在存的所有点中被,用于存变量(只)
累加器: 只能用做加法的变量,例如和求和
本指南通一些样例展示些特征。者最好是熟悉Scala,尤其是包的法。留意,Spark可以通Spark-Shell的解器行交互式运行。你可能会需要它。
接入Spark
了一个Spark的用,你需要Spark和它的依,加入到CLASSPATH中。最的方法,就是运行sbt/sbt assemblySpark和它的依,打到一个Jar里面core/target/scala_2.9.1/spark-core-assembly-0.0.0.jar,然后它加入到你的CLASSPATH中。或者你可以spark发布到maven的本地存中,使用sbt/sbt publish。它在织org.spark-project下成一个spark-core.
另外,你会需要入一些Spark的类和式, 下面几行加入到你程序的部
import spark.SparkContext
import SparkContext._
初始化Spark
Spark程序需要做的第一件事情,就是建一个SparkContext象,它告Spark如何一个集群。个通常是通下面的构造器实的:
new SparkContext(master, jobName, [sparkHome], [jars])
Master参是一个字符串,指定了接的Mesos集群,或者用特殊的字符串“local”指明用local模式运行。如下面的描述一般,JobName是你任务的名,在集群上运行的候,会在Mesos的Web UI控界面示。后面的两个参,是用在你的代,部署到mesos集群上运行使用的,后面会提到。
在Spark的解器中,一个特殊的SparkContext变量已经你建,变量名字叫sc。建你自己的SparkContext是不会生效的。你可以通设置MASTER境变量,master接到需要的上下文。
MASTER=local; ./spark-shell
Master的命名
Master的名字可以是以下3个格式中的一种
Master Name |
Meaning |
local |
本地化运行Spark,使用一个Worker程(有并行)
|
local[K] |
本地化运行Spark,使用K个Worker程(根据机器的CPU核设定)
|
HOST:PORT |
Spark接到指定的Mesos Master,在集群上运行。Host参是Mesos Master的Hostname, 端口是master配置的端口,默5050. 注意:在早期的Mesos版本(spark的old-mesos分支),你必使用master@HOST:PORT. |
集群部署
如果你想你的任务运行在一个集群上,你需要指定2个可参:
- SparkHome:Spark在集群机器上的安装路(必全部一致)
- Jars:在本地机器上,包含了你任务的代和依的Jars文件列表。 Spark会把它部署到所有的集群点上。 你需要使用自己的系统你的作业,打包成一套jars文件。例如,如果你使用sbt,那么sbt-assembly插件是一个好方法,你的代和依,变成一个一的jar文件。
如果有一些类是公用的,需要在不同的作业间共享,你可能需要手工拷贝到mesos的点上,在conf/spark-env中,通设置SPARK_CLASSPATH境变量指向它。信息可以参考配置
分布式据集
Spark的核心概念,是性分布式据集(RDD),一个有容机制,可以被并行操作的集合。目前有两种类型的RDD:并行集合(Parrallelized Collections),接收一个已经存在的Scala集合,在它上面运行各种并发算; Hadoop据集(Hadoop DataSets),在一个文件的每条上,运行各种函。只要文件系统是Hdfs,或者hadoop支持的任意存系统。两种RDD都可以通相同的方式行操作。
并行集合
并行集合是通用SparkContext的parallelize方法,在一个已经存在的Scala集合(只要是seq象就可以)上建而。集合的象会被拷贝建一个分布式据集,可以被并行操作。下面通spark解器的例子,展示如何从一个建一个并发集合
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
一旦被建,分布据集(distData)可以被并行操作。例如,我可以用distData.reduce(_ +_) 的元素相加。我会在后的分布式据集做一步描述。
建并行集合的一个重要参,是slices的目,它指定了据集切分几份。在集群模式中,Spark会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。一般,Spark会根据集群的况,自动设定slices的目。然而,你也可以手动的设置它,通parallelize方法的第二个参(例如:sc.parallelize(data, 10)).
Hadoop据集
Spark可以建分布式据集,从任何存在HDFS文件系统或者Hadoop支持的其它文件系统(包括本地文件,Amazon S3, Hypertable, HBase等等)上的文件。 Spark可以支持Text File, SequenceFiles 及其它任何Hadoop输入格式
文本文件的RDDs可以通SparkContext的textFile方法建,方法接受文件的URI地址(或者机器上的文件本地路,或者一个hdfs://, sdn://,kfs://,其它URI).里是一个用例子:
scala> val distFile = sc.textFile(“data.txt”)
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
一旦被建,distFile可以行据集操作。例如,我可以使用如下的map和reduce操作所有行的长度相加:
distFile.map(_.size).reduce(_ + _ )
方法也接受可的第二参,控制文件的分片目。默,Spark每一块文件建一个分片(HDFS默的块大小64MB),但是你可以通入一个更大的值指定更多的分片。注意,你不能指定一个比块个更少的片值(和hadoop中,Map不能小于Block一样)
于SequenceFiles,使用SparkContext的sequenceFile[K, V]方法,K和V是文件中的key和values类型。他必是Hadoop的Writable的子类,例如IntWritable和Text。另外,Spark允你指定几种原生的通用Writable类型,例如:sequencFile[Int, String]会自动取IntWritable和Texts
最后,于其它类型的Hadoop输入格式,你可以使用SparkContext.hadoopRDD方法,它可以接收任意类型的JobConf和输入格式类,类型和值类型。按照Hadoop作业一样的方法,设置输入源就可以了。
分布式据集操作
分布式据集支持两种操作:
(transformation):根据有的据集建一个新的据集
动作(actions):在据集上运行算后,返回一个值动程序
例如,Map是一个,据集的每一个元素,都经一个函行算后,返回一个新的分布式据集作果。而另一方面,Reduce是一个动作,据集的所有元素,用某个函行聚合,然后最果返回动程序,而并行的reduceByKey是返回一个分布式据集
所有Spark中的都是惰性的,也就是,并不会马上发生算。相反的,它只是住用到基据集上的些(Transformation)。而些(Transformation),只会在有一个动作(Action)发生,要求返回果动用,才真正行算。个设Spark更加有效率的运行。例如,我可以实,通map建一个据集,然后再用reduce,而只返回reduce的果driver,而不是整个大的据集。
spark提供的一个重要操作是Caching。你cache一个分布式据集,每个点会存据集的所有片,并在存中算,并在其它操作中重用。会使得后的算更加的快速(通常是10倍),存是spark中一个构造迭代算法的工具,也可以在解器中交互使用。
下面的表格列出目前支持的和动作:
(Transformations)
Transformation |
Meaning |
map(func)
|
返回一个新的分布式据集,由每个原元素经func函后成 |
filter(func)
|
返回一个新的据集,由经func函后返回值true的原元素成 |
flatMap(func) |
类似于map,但是每一个输入元素,会被映射0到多个输出元素(因此,func函的返回值是一个Seq,而不是一元素) |
sample(withReplacement,frac,seed)
|
根据定的随机种子seed,随机抽样出量frac的据 |
union(otherDataset)
|
返回一个新的据集,由原据集和参联合而成 |
groupByKey([numTasks])
|
在一个由(K,V)成的据集上用,返回一个(K,Seq[V])的据集。注意:默情况下,使用8个并行任务行分,你可以入numTask可参,根据据量设置不同目的Task (groupByKey和filter合,可以实类似Hadoop中的Reduce功能) |
reduceByKey(func, [numTasks]) |
在一个(K,V)的据集上使用,返回一个(K,V)的据集,key相同的值,都被使用指定的reduce函聚合到一起。和groupbykey类似,任务的个是可以通第二个可参配置的。 |
join(otherDataset, [numTasks]) |
在类型(K,V)和(K,W)类型的据集上用,返回一个(K,(V,W)),每个key中的所有元素都在一起的据集 |
groupWith(otherDataset, [numTasks]) |
在类型(K,V)和(K,W)类型的据集上用,返回一个据集,成元素(K, Seq[V], Seq[W]) Tuples。个操作在其它框架,CoGroup |
cartesian(otherDataset) |
笛卡尔。但在据集T和U上用,返回一个(T,U)的据集,所有元素交互行笛卡尔。 |
sortByKey([ascendingOrder]) |
在类型( K, V )的据集上用,返回以K行排序的(K,V)据集。升序或者降序由boolean型的ascendingOrder参决定 (类似于Hadoop的Map-Reduce中间段的Sort,按Key行排序) |
Actions(动作)
Action |
Meaning |
reduce(func) |
通函func聚集据集中的所有元素。Func函接受2个参,返回一个值。个函必是联性的,确保可以被正确的并发行 |
collect() |
在Driver的程序中,以的形式,返回据集的所有元素。通常会在使用filter或者其它操作后,返回一个足够小的据子集再使用,直接整个RDD集Collect返回,很可能会Driver程序OOM |
count() |
返回据集的元素个 |
take(n) |
返回一个,由据集的前n个元素成。注意,个操作目前并非在多个点上,并行行,而是Driver程序所在机器,机算所有的元素 (Gateway的存力会增大,需要慎使用) |
first() |
返回据集的第一个元素(类似于take(1)) |
saveAsTextFile(path) |
据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark会用每个元素的toString方法,并它文件中的一行文本 |
saveAsSequenceFile(path) |
据集的元素,以sequencefile的格式,保存到指定的目下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必由key-value成,并都实了Hadoop的Writable接口,或式可以Writable(Spark包括了基本类型的,例如Int,Double,String等等) |
foreach(func) |
在据集的每一个元素上,运行函func。通常用于更新一个累加器变量,或者和外部存系统做交互 |
存
用RDD的cache()方法,可以它在第一次算后,果保持存在存。据集的不同部分,会被存在算它的不同的集群点上,后的据集使用更快。存是有容功能的,如果任一分的RDD据失了,它会被使用原建它的,再算一次(不需要全部重新算,只算失的分)
Shared Variables
共享变量
一般,一个函被递Spark操作(例如map和reduce),通常是在集群点上运行,在函中使用到的所有变量,都做分拷贝,供函操作,而不会互相影响。些变量会被拷贝到每一台机器,而在程机器上,在变量的所有更新,都不会被播回Driver程序。然而,Spark提供两种有限的共享变量,供两种公用的使用模式:广播变量和累加器
广播变量
广播变量允程序保留一个只的变量,存在每一台机器上,而非每个任务保存一份拷贝。他可以使用,例如,每个点一个大的输入据集,以一种高效的方式。Spark也会,使用一种高效的广播算法,减少沟通的耗。
广播变量是从变量V建的,通用SparkContext.broadcast(v)方法。个广播变量是一个v的分装器,它的只可以通用value方法得。如下的解器模块展示了如何用:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
在广播变量被建后,它能在集群运行的任何函上,被取代v值行用,从而v值不需要被再次递到些点上。另外,象v不能在被广播后修改,是只的,从而保所有点的变量,收到的都是一模一样的。
累加器
累加器是只能通合操作“加”起的变量,可以高效的被并行支持。他可以用实器(如同MapReduce中)和求和。Spark原生就支持Int和Double类型的器,程序可以添加新的类型。
一个器,可以通用SparkContext.accumulator(V)方法建。运行在集群上的任务,可以使用+=加值。然而,它不能取器的值。Driver程序需要取值的候,它可以使用.value方法。
如下的解器,展示了如何利用累加器,一个里面的所有元素相加
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
…
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10
更多料
在Spark的网站上,你可以看到Spark样例程序
另外,Spark包括了一些例子,在examples/src/main/scala上,有些既有Spark版本,又有本地非并行版本,允你看到如果要程序以集群化的方式跑起的,需要做什么改变。你可以运行它,通类名递spark中的run脚本 — 例如./run spark.examples.SparkPi. 每一个样例程序,都会打印使用帮助,运行任何参。