zoukankan      html  css  js  c++  java
  • 【译】Spark官方文档——编程指南

    本文翻自官方博客,略有添加:https://github.com/mesos/spark/wiki/Spark-Programming-Guide

    Spark发指南

    从高的面看,其实每一个Spark的用,都是一个Driver类,通运行用户定义的main函,在集群上行各种并发操作和算

    Spark提供的最主要的抽象,是一个性分布式据集(RDD),它是一种特殊集合,可以分布在集群的点上,以函式程操作集合的方式,行各种各样的并发操作。它可以由hdfs上的一个文件建而,或者是Driver程序中,从一个已经存在的集合而。用户可以据集存在存中,它被有效的重用,行并发操作。最后,分布式据集可以自动的从点失中恢复,再次行算。

    Spark的第二个抽象,是并行算中使用的共享变量。默,Spark并发运行一个函,它是以多个的task,在不同的点上运行,它递每一个变量的一个拷贝,到每一个立task使用到的函中,因此些变量并非共享的。然而有候,我需要在任务中能够被共享的变量,或者在任务与动程序之间共享。Spark支持两种类型的共享变量:

    广播变量: 可以在存的所有点中被,用于存变量(只)

    累加器: 只能用做加法的变量,例如和求和

    本指南通一些样例展示些特征。者最好是熟悉Scala,尤其是包的法。留意,Spark可以通Spark-Shell的解器行交互式运行。你可能会需要它。

    接入Spark

    了一个Spark的用,你需要Spark和它的依,加入到CLASSPATH中。最的方法,就是运行sbt/sbt assemblySpark和它的依,打到一个Jar里面core/target/scala_2.9.1/spark-core-assembly-0.0.0.jar,然后它加入到你的CLASSPATH中。或者你可以spark发布到maven的本地存中,使用sbt/sbt publish。它在织org.spark-project下成一个spark-core.

    另外,你会需要入一些Spark的类和式, 下面几行加入到你程序的部

    import spark.SparkContext

    import SparkContext._

    初始化Spark

    Spark程序需要做的第一件事情,就是建一个SparkContext象,它告Spark如何一个集群。个通常是通下面的构造器实的:

    new SparkContext(master, jobName, [sparkHome], [jars])

    Master参是一个字符串,指定了接的Mesos集群,或者用特殊的字符串“local”指明用local模式运行。如下面的描述一般,JobName是你任务的名,在集群上运行的候,会在Mesos的Web UI控界面示。后面的两个参,是用在你的代,部署到mesos集群上运行使用的,后面会提到。

    在Spark的解器中,一个特殊的SparkContext变量已经你建,变量名字叫sc。建你自己的SparkContext是不会生效的。你可以通设置MASTER境变量,master接到需要的上下文。

    MASTER=local; ./spark-shell

    Master的命名

    Master的名字可以是以下3个格式中的一种

    Master Name

    Meaning

    local

    本地化运行Spark,使用一个Worker程(有并行)

     

    local[K]

    本地化运行Spark,使用K个Worker程(根据机器的CPU核设定)

     

    HOST:PORT

    Spark接到指定的Mesos Master,在集群上运行。Host参是Mesos Master的Hostname, 端口是master配置的端口,默5050.

    注意:在早期的Mesos版本(spark的old-mesos分支),你必使用master@HOST:PORT.

    集群部署

    如果你想你的任务运行在一个集群上,你需要指定2个可参:

    • SparkHome:Spark在集群机器上的安装路(必全部一致)
    • Jars:在本地机器上,包含了你任务的代和依的Jars文件列表。 Spark会把它部署到所有的集群点上。 你需要使用自己的系统你的作业,打包成一套jars文件。例如,如果你使用sbt,那么sbt-assembly插件是一个好方法,你的代和依,变成一个一的jar文件。

    如果有一些类是公用的,需要在不同的作业间共享,你可能需要手工拷贝到mesos的点上,在conf/spark-env中,通设置SPARK_CLASSPATH境变量指向它。信息可以参考配置

    分布式据集

    Spark的核心概念,是性分布式据集(RDD),一个有容机制,可以被并行操作的集合。目前有两种类型的RDD:并行集合(Parrallelized Collections),接收一个已经存在的Scala集合,在它上面运行各种并发算; Hadoop据集(Hadoop DataSets),在一个文件的每条上,运行各种函。只要文件系统是Hdfs,或者hadoop支持的任意存系统。两种RDD都可以通相同的方式行操作。

    并行集合

    并行集合是通用SparkContext的parallelize方法,在一个已经存在的Scala集合(只要是seq象就可以)上建而。集合的象会被拷贝建一个分布式据集,可以被并行操作。下面通spark解器的例子,展示如何从一个建一个并发集合

    scala> val data = Array(1, 2, 3, 4, 5)

    data: Array[Int] = Array(1, 2, 3, 4, 5)

    scala> val distData = sc.parallelize(data)

    distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

    一旦被建,分布据集(distData)可以被并行操作。例如,我可以用distData.reduce(_ +_) 的元素相加。我会在后的分布式据集做一步描述。

    建并行集合的一个重要参,是slices的目,它指定了据集切分几份。在集群模式中,Spark会在一份slice上起一个Task。典型的,你可以在集群中的每个cpu上,起2-4个Slice (也就是每个cpu分配2-4个Task)。一般,Spark会根据集群的况,自动设定slices的目。然而,你也可以手动的设置它,通parallelize方法的第二个参(例如:sc.parallelize(data, 10)).

    Hadoop据集

    Spark可以建分布式据集,从任何存在HDFS文件系统或者Hadoop支持的其它文件系统(包括本地文件,Amazon S3, Hypertable, HBase等等)上的文件。 Spark可以支持Text File, SequenceFiles 及其它任何Hadoop输入格式

    文本文件的RDDs可以通SparkContext的textFile方法建,方法接受文件的URI地址(或者机器上的文件本地路,或者一个hdfs://, sdn://,kfs://,其它URI).里是一个用例子:

    scala> val distFile = sc.textFile(“data.txt”)

    distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08

    一旦被建,distFile可以行据集操作。例如,我可以使用如下的map和reduce操作所有行的长度相加:

    distFile.map(_.size).reduce(_ + _ )

    方法也接受可的第二参,控制文件的分片目。默,Spark每一块文件建一个分片(HDFS默的块大小64MB),但是你可以通入一个更大的值指定更多的分片。注意,你不能指定一个比块个更少的片值(和hadoop中,Map不能小于Block一样)

    于SequenceFiles,使用SparkContext的sequenceFile[K, V]方法,K和V是文件中的key和values类型。他必是Hadoop的Writable的子类,例如IntWritable和Text。另外,Spark允你指定几种原生的通用Writable类型,例如:sequencFile[Int, String]会自动取IntWritable和Texts

    最后,于其它类型的Hadoop输入格式,你可以使用SparkContext.hadoopRDD方法,它可以接收任意类型的JobConf和输入格式类,类型和值类型。按照Hadoop作业一样的方法,设置输入源就可以了。

    分布式据集操作

    分布式据集支持两种操作:

    (transformation):根据有的据集建一个新的据集

    动作(actions):在据集上运行算后,返回一个值动程序

    例如,Map是一个,据集的每一个元素,都经一个函行算后,返回一个新的分布式据集作果。而另一方面,Reduce是一个动作,据集的所有元素,用某个函行聚合,然后最果返回动程序,而并行的reduceByKey是返回一个分布式据集

    所有Spark中的都是惰性的,也就是,并不会马上发生算。相反的,它只是住用到基据集上的些(Transformation)。而些(Transformation),只会在有一个动作(Action)发生,要求返回果动用,才真正行算。个设Spark更加有效率的运行。例如,我可以实,通map建一个据集,然后再用reduce,而只返回reduce的果driver,而不是整个大的据集。

    spark提供的一个重要操作是Caching。你cache一个分布式据集,每个点会存据集的所有片,并在存中算,并在其它操作中重用。会使得后的算更加的快速(通常是10倍),存是spark中一个构造迭代算法的工具,也可以在解器中交互使用。

    下面的表格列出目前支持的和动作:

    (Transformations)

    Transformation

    Meaning

    map(func)

     

    返回一个新的分布式据集,由每个原元素经func函后成

    filter(func)

     

    返回一个新的据集,由经func函后返回值true的原元素成

    flatMap(func)

    类似于map,但是每一个输入元素,会被映射0到多个输出元素(因此,func函的返回值是一个Seq,而不是一元素)

    sample(withReplacement,frac,seed)

     

    根据定的随机种子seed,随机抽样出量frac的据

    union(otherDataset)

     

    返回一个新的据集,由原据集和参联合而成

    groupByKey([numTasks])

     

    在一个由(K,V)成的据集上用,返回一个(K,Seq[V])的据集。注意:默情况下,使用8个并行任务行分,你可以入numTask可参,根据据量设置不同目的Task

    (groupByKey和filter合,可以实类似Hadoop中的Reduce功能)

    reduceByKey(func, [numTasks])

    在一个(K,V)的据集上使用,返回一个(K,V)的据集,key相同的值,都被使用指定的reduce函聚合到一起。和groupbykey类似,任务的个是可以通第二个可参配置的。

    join(otherDataset, [numTasks])

    在类型(K,V)和(K,W)类型的据集上用,返回一个(K,(V,W)),每个key中的所有元素都在一起的据集

    groupWith(otherDataset, [numTasks])

    在类型(K,V)和(K,W)类型的据集上用,返回一个据集,成元素(K, Seq[V], Seq[W]) Tuples。个操作在其它框架,CoGroup

    cartesian(otherDataset)

    笛卡尔。但在据集T和U上用,返回一个(T,U)的据集,所有元素交互行笛卡尔。

    sortByKey([ascendingOrder])

    在类型( K, V )的据集上用,返回以K行排序的(K,V)据集。升序或者降序由boolean型的ascendingOrder参决定

    (类似于Hadoop的Map-Reduce中间段的Sort,按Key行排序)

    Actions(动作)

    Action

    Meaning

    reduce(func)

    通函func聚集据集中的所有元素。Func函接受2个参,返回一个值。个函必是联性的,确保可以被正确的并发行

    collect()

    在Driver的程序中,以的形式,返回据集的所有元素。通常会在使用filter或者其它操作后,返回一个足够小的据子集再使用,直接整个RDD集Collect返回,很可能会Driver程序OOM

    count()

    返回据集的元素个

    take(n)

    返回一个,由据集的前n个元素成。注意,个操作目前并非在多个点上,并行行,而是Driver程序所在机器,机算所有的元素

    (Gateway的存力会增大,需要慎使用)

    first()

    返回据集的第一个元素(类似于take(1))

    saveAsTextFile(path)

    据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark会用每个元素的toString方法,并它文件中的一行文本

    saveAsSequenceFile(path)

    据集的元素,以sequencefile的格式,保存到指定的目下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必由key-value成,并都实了Hadoop的Writable接口,或式可以Writable(Spark包括了基本类型的,例如Int,Double,String等等)

    foreach(func)

    在据集的每一个元素上,运行函func。通常用于更新一个累加器变量,或者和外部存系统做交互

    用RDD的cache()方法,可以它在第一次算后,果保持存在存。据集的不同部分,会被存在算它的不同的集群点上,后的据集使用更快。存是有容功能的,如果任一分的RDD据失了,它会被使用原建它的,再算一次(不需要全部重新算,只算失的分)

    Shared Variables

    共享变量

    一般,一个函被递Spark操作(例如map和reduce),通常是在集群点上运行,在函中使用到的所有变量,都做分拷贝,供函操作,而不会互相影响。些变量会被拷贝到每一台机器,而在程机器上,在变量的所有更新,都不会被播回Driver程序。然而,Spark提供两种有限的共享变量,供两种公用的使用模式:广播变量和累加器

    广播变量

    广播变量允程序保留一个只的变量,存在每一台机器上,而非每个任务保存一份拷贝。他可以使用,例如,每个点一个大的输入据集,以一种高效的方式。Spark也会,使用一种高效的广播算法,减少沟通的耗。

    广播变量是从变量V建的,通用SparkContext.broadcast(v)方法。个广播变量是一个v的分装器,它的只可以通用value方法得。如下的解器模块展示了如何用:

    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

    broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

    scala> broadcastVar.value

    res0: Array[Int] = Array(1, 2, 3)

    在广播变量被建后,它能在集群运行的任何函上,被取代v值行用,从而v值不需要被再次递到些点上。另外,象v不能在被广播后修改,是只的,从而保所有点的变量,收到的都是一模一样的。

    累加器

    累加器是只能通合操作“加”起的变量,可以高效的被并行支持。他可以用实器(如同MapReduce中)和求和。Spark原生就支持Int和Double类型的器,程序可以添加新的类型。

    一个器,可以通用SparkContext.accumulator(V)方法建。运行在集群上的任务,可以使用+=加值。然而,它不能取器的值。Driver程序需要取值的候,它可以使用.value方法。

    如下的解器,展示了如何利用累加器,一个里面的所有元素相加

    scala> val accum = sc.accumulator(0)

    accum: spark.Accumulator[Int] = 0

    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

    scala> accum.value

    res2: Int = 10

    更多料

    在Spark的网站上,你可以看到Spark样例程序

    另外,Spark包括了一些例子,在examples/src/main/scala上,有些既有Spark版本,又有本地非并行版本,允你看到如果要程序以集群化的方式跑起的,需要做什么改变。你可以运行它,通类名递spark中的run脚本 — 例如./run spark.examples.SparkPi. 每一个样例程序,都会打印使用帮助,运行任何参。

  • 相关阅读:
    基于Maven的MyBatis Generator逆向工程
    JQuery对象调用reset方法:Uncaught TypeError: $(...).reset is not a function
    死锁编码及定位分析
    线程池的简介及底层原理
    转载:Mysql8.0忘记 root 密码, 如何修改?
    synchronized 和 Lock 有什么区别?
    java 中的阻塞队列及生产者-消费者中的简单应用
    java 中 CountDownLatch、CyclicBarrier 和 Semaphore 的简单使用
    java中的公平锁、非公平锁、可重入锁、递归锁、自旋锁、独占锁和共享锁
    Java 集合类的线程安全问题及解决方法
  • 原文地址:https://www.cnblogs.com/vincent-hv/p/3299176.html
Copyright © 2011-2022 走看看