zoukankan      html  css  js  c++  java
  • [spark]spark 编程教程

    参考:

    英文:https://spark.apache.org/docs/latest/programming-guide.html

    中文:http://www.cnblogs.com/lujinhong2/p/4651025.html 1.2.1版本的

    (一)快速入门

    老规矩,先看一个简单示例,有个认识。这个示例来自官方example的SparkPi:

    package org.lujinhong.demo.spark
    
    /*
     * 官方的sparkPi示例
     */
    
    import scala.math.random
    
    import org.apache.spark._
    
    /** Computes an approximation to pi */
    object SparkPi {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Spark Pi").setMaster("local")
        val spark = new SparkContext(conf)
        val slices = if (args.length > 0) args(0).toInt else 2
        val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
        val count = spark.parallelize(1 until n, slices).map { i =>
          val x = random * 2 - 1
          val y = random * 2 - 1
          if (x*x + y*y < 1) 1 else 0
        }.reduce(_ + _)
        println("Pi is roughly " + 4.0 * count / n)
        spark.stop()
      }
    
    }
    
     
    注意以上的setMaster(“local”)是自己加上去的,方便直接在本地运行。如果在集群上运行,则通过spark-submit的—master参数指定。
    写好代码后,就可以直接在eclipse中右键—>运行了。

     (二)理论介绍

    1、spark中的所有操作都与RDD相关,包括创建RDD,transformation(将RDD转换为另一个RDD)和action(触发RDD的计算,以及输出等)。

    In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.

     2、RDD是一个不可变的分布式对象集合,每个RDD会被分成多个分区,它们分别在不同的机器上被计算。它可以是任何的python/java/scala对象,包括你自己创建的对象。

    注意RDD是不可变的,因此若需要改变现有RDD的内容,只能通过创建一个新的RDD来实现,这也是transformation的作用。

    RDD是一个集合,因此可以通过一些迭代方法对内容进行处理

     3、RDD操作类型:对RDD的操作可以分为2种类型

    (1)Transformation: 将一个RDD转化为另一个RDD,如map, filter等操作

    (2)Action:返回计算结果给driver,写入存储等操作。

    最明显的区别:transformation返回一个RDD, action返回其它数据类型

    (三)Spark应用的主要4个工作流程如下:

    1、create:     通过读取外部数据源来创建RDD。(虽然说也可以将list/set等转化为RDD,但实际上这对于处理大数据没什么作用,一般只用作demo)

    2、transformation:  将RDD将化为另一个RDD,如filter()等。

    3、cache:  将RDD缓存下来,方便之后再使用,如persitst()等。

    4、action:   执行真正的工作,计算结果并输出,如count(),first()等。

    几个注意点

    1、创建RDD有2种方法:

    (1)从外部数据集中创建,如从文件,socket,kafka, flume等数据源

    (2)将list/set等集合转化为RDD。scala> val lines = sc.parallelize(List("apple","pear"));

    2、执行transformation只定义了操作,spark执行的是懒计算原则,即transformation不会触发真正的计算,而是等到第一个action出现时才开始真正的计算。这对于大数据量时成为重要。如读取一份大文件时,若马上将其读入内存,会占用大量的内存空间,而有可能过很长时间也会开始计算。另一方面,如果只是执行first()类似的计算,这个文件完全没必要全部读入内存,而是只读取到第一行就可以了。

    3、默认情况下,对于每一个action,spark会重新计算它用到的RDD,若一个RDD会被之后的多个action用到,可以将其缓存到内存(当作也可以到磁盘等),如读取一个文件后,先经过filter,过滤出只包括”spark”的行,此时可以将这个RDD保存到内存中,再分别计算它的count(),first()等操作。

    cache() is the same as calling persist() with the default storage level.

    4、action会触发真正的计算。

    看一个示例:

    $ bin/spark-shell

     
    (1)创建RDD
    scala> val fileContent = sc.textFile("file:///home/hadoop/spark/README.md”)
     
    (2)过滤RDD
    scala> val pythonLine = fileContent.filter(line => line.contains("spark”))
     
    (3)计算行数
    scala> pythonLine.count
    最后的输出如下:
    15/07/21 11:20:43 INFO scheduler.DAGScheduler: Job 2 finished: count at <console>:26, took 1.495956 s
    res5: Long = 11
     
    可以看出使用了1.5秒左右。
     
    (4)我们试一下缓存后再计算
    scala> pythonLine.cache()
    scala> pythonLine.count
    15/07/21 11:22:18 INFO scheduler.DAGScheduler: Job 3 finished: count at <console>:26, took 0.123537 s
    res7: Long = 11
    只使用了0.12秒
     
    (5)继续执行其它action
    scala> pythonLine.first()
    。。。。。

     

  • 相关阅读:
    数组的操作方法
    数组遍历的方法以及区别
    组件内的守卫
    路由守卫
    软件工程
    java web (j2ee)学习路线 —— 将青春交给命运
    团队作业(一)- 第五组
    软件工程
    软件工程-第二次作业
    java局部/成员/静态/实例变量
  • 原文地址:https://www.cnblogs.com/lujinhong2/p/4664021.html
Copyright © 2011-2022 走看看