zoukankan      html  css  js  c++  java
  • Spark入门

    http://spark.incubator.apache.org/

    http://spark.incubator.apache.org/documentation.html

    http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html, 非常好的hand-on exercises

    源码分析

    http://jerryshao.me/archive.html

    http://www.cnblogs.com/jerrylead/

     

    下载

    http://spark.apache.org/downloads.html

    下载需要的版本,解压就可以
    当然想自己编译也可以

     

    编译

    当前spark支持Maven编译,

    http://spark.apache.org/docs/latest/building-with-maven.html

    可以简单的这样编译,

    mvn -DskipTests clean package

    Spark如果需要通过Hadoop-Client来访问HDFS, 由于不同版本的Hadoop的client协议不同, 所以编译时需要匹配特定的Hadoop版本进行编译,参考上面的链接
    同样如果需要用Yarn做资源管理,也需要在编译的时候指明

     

    Quick Start

    http://spark.apache.org/docs/latest/quick-start.html

    Spark interactive

    Spark很方便的一点是,支持命令行的方式
    这样可以简单的学习和调试,或者interactively的进行数据分析,很赞
    Scala shell (./spark-shell)
    Python interpreter (./pyspark)

    用--help来查看如果使用 (./spark-shell --help)

    并且shell不但可以local使用,可以连接实际的集群,进行大规模的数据分析,
    ./bin/spark-shell --master spark://IP:PORT

    下面看个简单的例子,统计下README的行数,可以看到在shell中会自动创建SparkContext, sc

    scala> val textFile = sc.textFile("README.md")
    textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
    scala> textFile.count() // Number of items in this RDD
    res0: Long = 126

    看个稍微复杂点的,里面用到java的math包,
    计算文件里面最长的行包含多少单词

    scala> import java.lang.Math
    import java.lang.Math
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res5: Int = 15

     

    Standalone Applications

    可以使用Scala,Java,或Python来写Spark的app
    这里描述下用scala是如何做的,Java和Python参考上面的链接,

    先写出应用,很简单,统计出文档里面,包含a和b的行数各是多少

    /* SimpleApp.scala */
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    
    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
        val logData = sc.textFile(logFile, 2).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
      }
    }

    用sbt编译,创建simple.sbt,注意sbt中每行中间要空行来分割

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.10.4"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

    创建如下的目录结构,

    # Your directory layout should look like this
    $ find .
    .
    ./simple.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/SimpleApp.scala

    编译,

    # Package a jar containing your application
    $ sbt package
    ...
    [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar

    提交任务,并得到结果

    # Use spark-submit to run your application
    $ YOUR_SPARK_HOME/bin/spark-submit 
      --class "SimpleApp" 
      --master local[4] 
      target/scala-2.10/simple-project_2.10-1.0.jar
    ...
    Lines with a: 46, Lines with b: 23

    maven编译Spark应用

    参考,
    http://www.scala-lang.org/old/node/345
    https://blogs.oracle.com/arungupta/entry/scala_and_maven_getting_started

    sbt用不惯,也懒的学,连spark都用maven编译,所以介绍如何用maven来编译scala写的spark应用
    比较简单Maven本身提供scala的archetype,scala-archetype-simple 

    mvn archetype:generate
          -DarchetypeGroupId=org.scala-tools.archetypes
          -DarchetypeArtifactId=scala-archetype-simple 
          -DremoteRepositories=http://scala-tools.org/repo-releases
          -DgroupId=com.xxx
          -DartifactId=sparkapp
          -Dversion=1.0-SNAPSHOT

    然后再pom里面加上依赖,

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>

    默认没有maven-assembly-plugin,加上

    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
         <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
         </descriptorRefs>
         <archive>
            <manifest>
               <mainClass></mainClass>
            </manifest>
         </archive>
      </configuration>
      <executions>
         <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
               <goal>single</goal>
            </goals>
         </execution>
       </executions>
    </plugin>

    别忘了修改pom里面scala的版本,然后就mvn package,就ok了

     

    Running the Examples

    同时Spark也提供了些例子大家可以参考,也可以直接运行,如下

    ./bin/run-example SparkPi

    得到结果,Pi is roughly 3.14302

    也可以看下,SparkPi的实现,很简单,

    package org.apache.spark.examples
    
    import scala.math.random
    
    import org.apache.spark._
    
    /** Computes an approximation to pi */
    object SparkPi {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Spark Pi")
        val spark = new SparkContext(conf)
        val slices = if (args.length > 0) args(0).toInt else 2
        val n = 100000 * slices
        val count = spark.parallelize(1 to n, slices).map { i =>
          val x = random * 2 - 1
          val y = random * 2 - 1
          if (x*x + y*y < 1) 1 else 0
        }.reduce(_ + _)
        println("Pi is roughly " + 4.0 * count / n)
        spark.stop()
      }
    }

     

    Programming Guide

    http://spark.apache.org/docs/latest/quick-start.html

     

    Initializing Spark

    开始用Spark, 首先需要创建SparkContext

    This is done through the following constructor:

    new SparkContext(master, appName, [sparkHome], [jars])
    

    1. master参数, 就是指明spark集群的位置url, 支持如下一些格式

    local, Run Spark locally with one worker thread (i.e. no parallelism at all).

    local[K], Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).

    spark://HOST:PORT, Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.

    mesos://HOST:PORT, Connect to the given Mesos cluster. The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, which is 5050 by default.

    If no master URL is specified, the spark shell defaults to “local”.

    2. appName, 这个很好理解,给个名字

    3. 如果你需要部署到分布式集群上, 那么就需要指定后面两个参数,
    sparkHome, spark在worker上的的安装目录,必须保持一致
    jars, 所有的jar, 包含你的应用和依赖, spark会部署到所有worker上, 并自动加入到classpath

     

    Resilient Distributed Datasets (RDDs)

    首先数据如果要在Spark中被处理,首先需要导入成RDD
    如何生成RDD有两种方式,

    Parallelized collections

    可以将进程中的数据结构转换为RDD,这样挺方便,你可以把数据从任意源中先读出来,然后转成RDD
    但前提是数据不能太大,否则效率会是问题,因为这种case是会把数据copy到各个节点
    对于Scala,在Scala collection上面调用SparkContext的parallelize函数, 作用就是把collection分成多个slices, 并分发到各个分布式节点上, 这样便于并行处理

    scala> val data = Array(1, 2, 3, 4, 5)
    scala> val distData = sc.parallelize(data) //会将data, 分成slices, 并分发到各个节点
    distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

    这里slices, 其实spark会根据集群的CPU情况自动分配, 但是你也可以自己设置

    sc.parallelize(data, 10)

     

    External Datasets

    当然更为常见的是,从外部数据集中读取数据,转换为RDD
    Spark最常用的是从文件读取数据,local,hdfs或s3等
    Spark也支持从数据库,如Cassandra, HBase, 甚至是Mysql读取数据(通过jdbcRDD)


    先看看使用SparkContext的textFile方法来加载文件,
    Text file RDDs can be created using SparkContext’s textFile method. This method takes an URI for the file (either a local path on the machine, or a hdfs://, s3n://, kfs://, etc URI)and reads it as a collection of lines. Here is an example invocation:

    scala> val distFile = sc.textFile("data.txt")
    distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
    

    textFile一样可以指定slices参数, 默认Spark会为每个block file创建一个slice, 你可以选择更多的slices, 但不能比block数少

    需要注意的是,对于外部存储,是各个worker独自去读自己的数据的,所以如果用分布式存储没有问题,但如果用local存储就需要保证在每个节点的改目录上都要有这个文件
    所以如果不用分布式存储,就使用共享存储,不然比较麻烦,需要自己copy到每个节点

    textFile会将文件读出成collection of lines,所以可以for直接遍历每行
    并且还支持目录,压缩文件,或通配符的读取,
    textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

    除了textFile,还支持其他的读取接口,
    SparkContext.wholeTextFiles,用于读一个目录,返回(filename, content) pairs
    SequeceFiles, 使用sequenceFile[K, V]接口, K, V表示类型, 应该是Writable的子类
    SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source.

     

    RDD Operations

    现在已经把原始数据导入成RDD,后面就是如果继续处理
    Spark对于RDD有两种操作,transformations和actions
    transformation具有lazy特性,其实就是调用transform的时候,不会真的执行,只会记录下这个操作而已
    action,会真正的runjob去执行

    下面会列出最常用的的transform和action
    这个链接会给出所有操作的例子,不错
    http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

    Transformations

    map(func)
    Return a new distributed dataset formed by passing each element of the source through a function func.

    filter(func)
    Return a new dataset formed by selecting those elements of the source on which func returns true.

    flatMap(func), map是1->1, flatMap是1->0或多个,所以这个函数返回的是Seq,最终会把所有的Seq合成一个
    Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
    Example,
    sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
    res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

    为何需要下面两个,当map里面有比较耗时的初始化操作,比如链接和断开数据库,肯定不想每个item都做一遍
    所以用mapPartitons就可以一个partition只做一次,输入输出都是iterator
    而mapPartitionsWithSplit只是多传入split index而已,也许对于某些场景需要知道
    mapPartitions(func)
    Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.
    mapPartitionsWithSplit(func)
    Similar to mapPartitions, but also provides func with an integer value representing the index of the split, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.

    pipe(command, [envVars]), 以shell命令处理RDD数据
    Takes the RDD data of each partition and sends it via stdin to a shell-command. The resulting output of the command is captured and returned as a RDD of string values.
    Example,
    val a = sc.parallelize(1 to 9, 3)
    a.pipe("head -n 1").collect
    res2: Array[String] = Array(1, 4, 7)

    sample(withReplacement, fraction, seed), 随机抽样
    Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
    withReplacement,是否放回抽样
    fraction,比例,0.1表示10%
    seed,伪随机,相同的seed得到的随机序列是一样的,所以如果不设seed,同一段代码执行两遍得到的随机序列是一样的
    example,
    val a = sc.parallelize(1 to 10000, 3)
    a.sample(false, 0.1, 0).count
    res24: Long = 960

    集合操作
    union(otherDataset)
    Return a new dataset that contains the union of the elements in the source dataset and the argument.
    intersection(otherDataset)
    Return a new RDD that contains the intersection of elements in the source dataset and the argument.
    distinct([numTasks]))
    Return a new dataset that contains the distinct elements of the source dataset.
    Example,
    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
    c.distinct.collect
    res6: Array[String] = Array(Dog, Gnu, Cat, Rat)

    下面的操作都是基于key-value pairs的,而其他的操作大都是不限制数据类型
    先看几种生成key-value pairs的函数,
    KeyBy(func)
    用于构建key-value pairs,func是用于基于item来构建key,item作为value
    Example,
    val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
    val b = a.keyBy(_.length)
    b.collect
    res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant)

    Zip
    Joins two RDDs by combining the i-th of either partition with each other.
    Examples,
    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
    val b = sc.parallelize(1 to a.count.toInt, 2)
    val c = a.zip(b)
    c.sortByKey(true).collect
    res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3)))

    cartesian(otherDataset), 笛卡尔积
    When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
    Example,
    val x = sc.parallelize(List(1,2,3,4,5))
    val y = sc.parallelize(List(6,7,8,9,10))
    x.cartesian(y).collect
    res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))

     

    再看看基于key-value pairs的操作,
    这类操作要求是key-value pairs,基于PairRDDFunctions
    这类操作往往需要shuffle,如group或aggregate
    并且都有个可选参数numTasks,默认就是parent的partition数目,当然也可以指定

    groupByKey([numTasks])
    When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs.
    如果group的目的是做sum或average,那用reduceByKey or combineByKey会有更好的效率
    Example,
    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
    val b = a.keyBy(_.length)
    b.groupByKey.collect
    res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

    reduceByKey(func, [numTasks])
    When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function.
    经典的例子,统计文本词数,
    val lines = sc.textFile("data.txt")
    val pairs = lines.map(s => (s, 1))
    val counts = pairs.reduceByKey((a, b) => a + b)

    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
    When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations.
    这个function看着比较难懂,
    aggreateByKey(zeroValue: U)(seqOp: (U, T)=> U, combOp: (U, U) =>U)
    写成这样好懂点,
    首先和reduceByKey的不同在于,reduceByKey输入输出都是(K, V)
    而aggreateByKey输出是(K, U),可以不同于输入(K, V)
    这里需要定义3个东西,类似combineByKey
    zeroValue: U,初始值,比如空列表{}
    seqOp: (U, T)=> U, seq操作符,描述如何将T合并入U,比如如何将item合并到列表
    combOp: (U, U) =>U,comb操作符,描述如果合并两个U,比如合并两个列表
    所以aggreateByKey可以看成更高抽象的,更灵活的reduce或group
    Example,
    val z = sc.parallelize(List(1,2,3,4,5,6), 2)
    z.aggreateByKey(0)(math.max(_, _), _ + _)
    res40: Int = 9
    val z = sc.parallelize(List("a","b","c","d","e","f"),2)
    z.aggreateByKey("")(_ + _, _+_)
    res115: String = abcdef

    sortByKey([ascending], [numTasks])
    When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

    join(otherDataset, [numTasks])
    When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
    Outer joins are also supported through leftOuterJoin and rightOuterJoin.
    Example,
    val a = sc.parallelize(List("dog", "salmon","elephant"), 3)
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("dog","rabbit","turkey","wolf"), 3)
    val d = c.keyBy(_.length)
    b.join(d).collect
    res0: Array[(Int, (String, String))] = Array((6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)))

    cogroup(otherDataset, [numTasks])
    When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable<V>, Iterable<W>)tuples. This operation is also called groupWith.
    groupByKey, 可以用于group一个rdd里面的数据,cogroup可以同时group多个rdd的数据
    Example,
    val a = sc.parallelize(List(1, 2, 1, 3), 1)
    val b = a.map((_, "b"))
    val c = a.map((_, "c"))
    val d = a.map((_, "d"))
    b.cogroup(c, d).collect
    res2: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(d, d))), (3,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))

    下面的操作用于改变partition的个数,
    为何要改变partition个数?因为有些操作会大幅的增加或减少RDD中的数据
    比如filter,可以会过滤掉极大部分的数据,所以此时可以用coalesce来减少partition的数目
    repartition,不光可以减少,也可以增加partition的数目,而且一定会用reshuffle来随机的balance各个partition
    coalesce(numPartitions)
    Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

    repartition(numPartitions)
    Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

    Actions

    reduce(func)
    Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
    Example,
    val a = sc.parallelize(1 to 100, 3)
    a.reduce(_ + _)
    res41: Int = 5050

    collect()
    Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
    Example,
    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
    c.collect
    res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

    count()
    Return the number of elements in the dataset.
    Example,
    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
    c.count
    res2: Long = 4

    first()
    Return the first element of the dataset (similar to take(1)).
    Example,
    val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
    c.first
    res1: String = Gnu

    take(n)
    Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
    Example,
    val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
    b.take(2)
    res18: Array[String] = Array(dog, cat)

    takeSample(withReplacement, num, seed)
    Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
    和Sample不同,
    是action所以返回的是array而不是RDD
    第二个参数num会精确指定抽样数,而不是比例
    返回array时,会进行随机排序

    saveAsTextFile(path)
    Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
    Example,
    val a = sc.parallelize(1 to 10000, 3)
    a.saveAsTextFile("mydata_a")
    这个例子会在mydata_a目录下存3个文件,part-00000,part-00001,part-00002

    saveAsSequenceFile(path)
    Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

    countByKey()
    Only available on RDDs of type (K, V). Returns a `Map` of (K, Int) pairs with the count of each key.
    Example,
    val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
    c.countByKey
    res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)

    foreach(func)
    Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
    Example,
    val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
    c.foreach(x => println(x + "s are yummy"))
    lions are yummy
    gnus are yummy
    crocodiles are yummy
    ants are yummy
    whales are yummy
    dolphins are yummy
    spiders are yummy

    Passing Functions to Spark

    使用spark有很多的transform和action作为元操作,很方便
    但总有一些特殊的逻辑,需要用比如map引用到数据上面去,那么如果来定义这些function
    1. 对于很短的逻辑,直接用Anonymous function syntax,这是最方便的
    lineLengths.reduce((a, b) => a + b)

    2. 当然不可能所有的逻辑都那么短,所以可以使用Static methods in a global singleton object
    因为Scala里面没有静态类,所以global singleton object其实就是静态类的概念
    object MyFunctions {
      def func1(s: String): String = { ... }
    }
    myRdd.map(MyFunctions.func1)

    上面两种是推荐的形式,如果你非要用普通类的成员函数,或是函数中用到类成员
    class MyClass {
      def func1(s: String): String = { ... }
      def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
    }
    class MyClass {
      val field = "Hello"
      def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
    }

    那么需要了解,这样整个类对象都需要被send到集群上去执行,比较低效
    对于用到成员变量的case,可以用局部变量替换绕过
    def doStuff(rdd: RDD[String]): RDD[String] = {
      val field_ = this.field
      rdd.map(x => field_ + x)
    }

    RDD Persistence

    在一个stage中,各个transform就是function的调用,中间结果是不会保留的,当然出于方便或避免反复计算,也可以cache某个中间结果
    这就是RDD persisitence的目的
    可以选择不同的storage level, 如果使用cache(), 就是使用默认的MEMORY_ONLY
    当然也可以使用persist()接口, 可以选择不同的level

    MEMORY_ONLY
    Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.

    MEMORY_AND_DISK
    Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.

    MEMORY_ONLY_SER
    Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.

    MEMORY_AND_DISK_SER
    Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.

    DISK_ONLY
    Store the RDD partitions only on disk.

    MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
    Same as the levels above, but replicate each partition on two cluster nodes.

    如何选择不同的storage level?
    尽量选择memory_only
    实在memory不够, 可以考虑使用memory_only_ser, 会节省些空间, 但这样的问题是每次使用都需要做反序列化, 所以要用快的序列化库
    尽量不要使用disk, 因为一般重新计算也比从disk读快, 除非计算复杂度非常高
    一般不需要使用replica, 因为RDD都会通过用重新计算来快速fault recovery, 除非有实时需求, 不忍容忍重新计算的时间

    并且,Spark会以least-recently-used (LRU)的方式清理旧的cache,也可以通过RDD.unpersist(),手工清空某个RDD的cache

    Shared Variables

    一般来说, 对于这种分布式架构是很难提供shared variables的, 但是出于方便的要求, Spark提供两种特殊的shared variables, broadcast variables and accumulators

    Broadcast Variables

    通过接口广播read-only variable到每个节点, 更灵活些(和shipping a copy of it with tasks比较)
    注意这个值是不可以改变的, 否则就会导致各个节点不一致

    Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) //广播数据
    broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
    scala> broadcastVar.value   //通过value取到广播的值
    res0: Array[Int] = Array(1, 2, 3)
    

     

    Accumulators

    用于来实现counter, Spark支持int和double类型的accumulator, 用户可以实现其他类型的
    注意的是在各个节点都可以通过+=来增加accumulator, 但只有在driver上可以read accumulator的值

    The interpreter session below shows an accumulator being used to add up the elements of an array:

    scala> val accum = sc.accumulator(0)
    accum: spark.Accumulator[Int] = 0
    
    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    ...
    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
    
    scala> accum.value
    res2: Int = 10
    
  • 相关阅读:
    刷题-力扣-700. 二叉搜索树中的搜索
    作业要求20191010-8 alpha week 1/2 Scrum立会报告+燃尽图 06
    20191010-7 alpha week 1/2 Scrum立会报告+燃尽图 05
    20191010-6 alpha week 1/2 Scrum立会报告+燃尽图 04
    20191010-5 alpha week 1/2 Scrum立会报告+燃尽图 03
    20191010-4 alpha week 1/2 Scrum立会报告+燃尽图 02
    20191010-3 alpha week 1/2 Scrum立会报告+燃尽图 01
    扛把子组Scrum立会报告+燃尽图 07
    扛把子组20180926-1 选题展示
    "PSP助手”微信小程序宣传视频链接及内容介绍
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3422938.html
Copyright © 2011-2022 走看看