zoukankan      html  css  js  c++  java
  • 【spark系列3】spark开发简单指南

    分布式数据集创建之textFile


            文本文件的RDDs能够通过SparkContext的textFile方法创建,该方法接受文件的URI地址(或者机器上的文件本地路径,或者一个hdfs://, sdn://,kfs://,其他URI).这里是一个调用样例:

    scala> val distFile = sc.textFile(“data.txt”)

    distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08



    分布式数据集操作之转换和动作


    分布式数据集支持两种操作:
    1. 转换(transformations):依据现有的数据集创建一个新的数据集
    2. 动作(actions):在数据集上执行计算后,返回一个值给驱动程序



    数据集操作之map和reduce


           一旦被创建,distFile能够进行数据集操作。比如,我们能够使用例如以下的map和reduce操作将全部行数的长度相加:

    distFile.map(_.size).reduce(_ + _ )

            方法也接受可选的第二參数,来控制文件的分片数目。默认来说,Spark为每一块文件创建一个分片(HDFS默认的块大小为64MB),可是你能够通过传入一个更大的值来指定很多其它的分片。注意,你不能指定一个比块个数更少的片值(和hadoop中,Map数不能小于Block数一样)

    1. Map是一个转换,将数据集的每个元素,都经过一个函数进行计算后,返回一个新的分布式数据集作为结果。
    2. Reduce是一个动作,将数据集的全部元素,用某个函数进行聚合,然后将终于结果返回驱动程序,而并行的reduceByKey还是返回一个分布式数据集




    转换是惰性的


            全部Spark中的转换都是惰性的,也就是说,并不会立即发生计算。相反的,它仅仅是记住应用到基础数据集上的这些转换(Transformation)。

           而这些转换(Transformation),仅仅会在有一个动作(Action)发生,要求返回结果给驱动应用时,才真正进行计算。这个设计让Spark更加有效率的执行。比如,我们能够实现,通过map创建一个数据集,然后再用reduce,而仅仅返回reduce的结果给driver,而不是整个大的数据集。



    重要转换操作之caching(缓存)


            spark提供的一个重要转换操作是Caching。当你cache一个分布式数据集时,每一个节点会存储该数据集的全部片,并在内存中计算,并在其他操作中重用。这将会使得兴许的计算更加的高速(一般是10倍),缓存是spark中一个构造迭代算法的关键工具,也能够在解释器中交互使用。

            调用RDD的cache()方法,能够让它在第一次计算后,将结果保持存储在内存。数据集的不同部分,将会被存储在计算它的不同的集群节点上,让兴许的数据集使用更快。缓存是有容错功能的,假设任一分区的RDD数据丢失了,它会被使用原来创建它的转换,再计算一次(不须要所有又一次计算,仅仅计算丢失的分区)。



    眼下支持的转换(transformation


    Transformation

    Meaning

    map(func)

     

    返回一个新的分布式数据集,由每一个原元素经过func函数转换后组成

    filter(func)

     

    返回一个新的数据集,由经过func函数后返回值为true的原元素组成

    flatMap(func)

    类似于map,可是每个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)

    sample(withReplacementfracseed)

     

    依据给定的随机种子seed,随机抽样出数量为frac的数据

    union(otherDataset)

     

    返回一个新的数据集,由原数据集和參数联合而成

    groupByKey([numTasks])

     

    在一个由(K,V)对组成的数据集上调用,返回一个(KSeq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你能够传入numTask可选參数,依据数据量设置不同数目的Task

    groupByKeyfilter结合,能够实现类似Hadoop中的Reduce功能)

    reduceByKey(func, [numTasks])

    在一个(KV)对的数据集上使用,返回一个(KV)对的数据集,key同样的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是能够通过第二个可选參数来配置的。

    join(otherDataset, [numTasks])

    在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每一个key中的全部元素都在一起的数据集

    groupWith(otherDataset, [numTasks])

    在类型为(K,V)(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其他框架,称为CoGroup

    cartesian(otherDataset)

    笛卡尔积。但在数据集TU上调用时,返回一个(TU)对的数据集,全部元素交互进行笛卡尔积。

    sortByKey([ascendingOrder])

    在类型为( K, V )的数据集上调用,返回以K为键进行排序的(KV)对数据集。升序或者降序由boolean型的ascendingOrder參数决定

    (类似于HadoopMap-Reduce中间阶段的Sort,按Key进行排序)




    眼下支持的动作(actions


    Action

    Meaning

    reduce(func)

    通过函数func聚集数据集中的全部元素。Func函数接受2个參数,返回一个值。这个函数必须是关联性的,确保能够被正确的并发运行

    collect()

    Driver的程序中,以数组的形式,返回数据集的全部元素。这一般会在使用filter或者其他操作后,返回一个足够小的数据子集再使用,直接将整个RDDCollect返回,非常可能会让Driver程序OOM

    count()

    返回数据集的元素个数

    take(n)

    返回一个数组,由数据集的前n个元素组成。注意,这个操作眼下并不是在多个节点上,并行运行,而是Driver程序所在机器,单机计算全部的元素

    (Gateway的内存压力会增大,须要慎重使用)

    first()

    返回数据集的第一个元素(类似于take1))

    saveAsTextFile(path)

    将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者不论什么其他hadoop支持的文件系统。Spark将会调用每一个元素的toString方法,并将它转换为文件里的一行文本

    saveAsSequenceFile(path)

    将数据集的元素,以sequencefile的格式,保存到指定的文件夹下,本地系统,hdfs或者不论什么其他hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了HadoopWritable接口,或隐式能够转换为WritableSpark包含了基本类型的转换,比如IntDoubleString等等)

    foreach(func)

    在数据集的每个元素上,执行函数func。这通经常使用于更新一个累加器变量,或者和外部存储系统做交互





    两种共享变量之广播变量和累加器


            一般来说,当一个函数被传递给Spark操作(比如map和reduce),一般是在集群结点上执行,在函数中使用到的全部变量,都做分别拷贝,供函数操作,而不会互相影响。这些变量会被复制到每一台机器,而在远程机器上,在对变量的全部更新,都不会被传播回Driver程序。然而,Spark提供两种有限的共享变量,供两种公用的使用模式:广播变量和累加器。



    广播变量


            广播变量同意程序猿保留一个仅仅读的变量,缓存在每一台机器上,而非每一个任务保存一份拷贝。他们能够使用,比如,给每一个结点一个大的输入数据集,以一种高效的方式。Spark也会尝试,使用一种高效的广播算法,来降低沟通的损耗。

            广播变量是从变量V创建的,通过调用SparkContext.broadcast(v)方法。这个广播变量是一个v的分装器,它的仅仅能够通过调用value方法获得。例如以下的解释器模块展示了怎样应用:
    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

    broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

    scala> broadcastVar.value

    res0: Array[Int] = Array(1, 2, 3)

            在广播变量被创建后,它能在集群执行的不论什么函数上,被代替v值进行调用,从而v值不须要被再次传递到这些结点上。另外,对象v不能在被广播后改动,是仅仅读的,从而保证全部结点的变量,收到的都是一模一样的。



    累加器


            累加器是仅仅能通过组合操作“加”起来的变量,能够高效的被并行支持。他们能够用来实现计数器(如同MapReduce中)和求和。Spark原生就支持Int和Double类型的计数器,程序猿能够加入新的类型。

            一个计数器,能够通过调用SparkContext.accumulator(V)方法来创建。执行在集群上的任务,能够使用+=来加值。然而,它们不能读取计数器的值。当Driver程序须要读取值的时候,它能够使用.value方法。

            例如以下的解释器,展示了怎样利用累加器,将一个数组里面的全部元素相加

    scala> val accum = sc.accumulator(0)

    accum: spark.Accumulator[Int] = 0

    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)



    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

    scala> accum.value

    res2: Int = 10



    spark的例子程序


            在Spark的站点上,你能够看到Spark例子程序
            另外,Spark包含了一些例子,在examples/src/main/scala上,有些既有Spark版本号,又有本地非并行版本号,同意你看到假设要让程序以集群化的方式跑起来的话,须要做什么改变。你能够执行它们,通过将类名传递给spark中的run脚本 — 比如./run spark.examples.SparkPi. 每个例子程序,都会打印使用帮助,当执行时没不论什么參数时。



    參考资料

    1.spark随谈——开发指南(译)http://www.linuxidc.com/Linux/2013-08/88595p2.htm


    /*

    注:

    本文全部内容来自參考资料1。

    转载请注明来源:http://blog.csdn.net/ksearch/article/details/24145757

    */


  • 相关阅读:
    无线鼠标换电池了
    Jython Interactive Servlet Console YOU WILL NEVER KNOW IT EXECLLENT!!! GOOD
    Accessing Jython from Java Without Using jythonc
    jython podcast cool isnt't it?
    Python里pycurl使用记录
    Creating an Interactive JRuby Console for the Eclipse Environment
    微软为AJAX和jQuery类库提供CDN服务
    Download A File Using Cygwin and cURL
    What is JMRI?这个是做什么用的,我真没看懂但看着又很强大
    用curl 发送指定的大cookie的http/https request
  • 原文地址:https://www.cnblogs.com/mfrbuaa/p/3758382.html
Copyright © 2011-2022 走看看