zoukankan      html  css  js  c++  java
  • Spark学习笔记:(一)入门 glance

    参考:  http://spark.apache.org/docs/latest/quick-start.html
    其它资料:   
    http://mojijs.com/2015/04/190845/index.html
    http://taoistwar.gitbooks.io/spark-developer-guide/content/index.html
    http://rdc.taobao.org/?p=2024#转换
    http://blog.csdn.net/jediael_lu/article/details/45333195
    http://www.zhihu.com/question/26568496
     
     
     概述:       a fast and general-purpose cluster computing system

    Apache Spark是一个新兴大数据处理引擎,Scala是其编程语言,也支持python和java。Spark主要特点是提供了一个集群的分布式内存抽象(即RDD,操作本地集合->操作分布式数据集),以支持需要工作集的应用。

     
    Spark使用方式:
    • 通过shell使用Scala,Python,Java的API,使用MLib或SQL等tools。
    • 编程application。
    Spark与Hadoop的比较:
    • Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,对于迭代运算效率更高;
    • 提供的数据集操作类型(Transformations+Actions)有很多种,不像Hadoop只提供了Map和Reduce两种操作。
    • Spark支持故障恢复的方式也不同,提供两种方式,Linage,通过数据的血缘关系,再执行一遍前面的处理,Checkpoint,将数据集存储到持久存储中。
    • Spark的优势不仅体现在性能提升上的,Spark框架为批处理(Spark Core),交互式(Spark SQL),流式(Spark Streaming),机器学习(MLlib),图计算(GraphX)提供一个统一的数据处理平台,这相对于使用Hadoop有很大优势。
     

    1.RDD(Resilient Distributed Dataset,弹性分布数据集),RDD就是一个不可变的带分区的记录集合,RDD也是Spark中的编程模型。RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.

    scala> val textFile = sc.textFile("README.md")
    textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3    //这是输出
    • 表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应不同的RDD实现。
    • RDD必须是可序列化的。静态类型。
    • 可以控制存储级别(内存、磁盘等)来进行重用。当内存不足时,RDD会spill到disk。
    • 可以cache到内存中,每次 对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。这对于迭代运算比 较常见的机器学习算法, 交互式数据挖掘来说,效率提升比较大。
     
    2.Spark提供了RDD上的两类操作,转换(Transformations)和动作(Actions)。RDDs have actions, which return values, and transformations, which return pointers to new RDDs.
    ./bin/spark-shell
    • 转换是用来定义一个新的RDD,包括map, flatMap, filter, union, sample, join, groupByKey, cogroup, ReduceByKey, cros, sortByKey, mapValues等。
    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
    linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
    • 动作是返回一个结果,包括collect, reduce, count, save, lookupKey。 
    scala> textFile.count() // Number of items in this RDD
    res0: Long = 126
    
    scala> textFile.first() // First item in this RDD
    res1: String = # Apache Spark
    • chain together transformations and actions:
    scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
    res3: Long = 15
     
    3.Spark 可以很容易的实现MapReduce,Spark的WordCount的示例如下所示:
    val spark = new SparkContext(master, appName, [sparkHome], [jars])
    val file = spark.textFile("hdfs://...")
    val counts = file.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(_ + _)
    counts.saveAsTextFile("hdfs://...")

    其中的file是根据HDFS上的文件创建的RDD,后面的flatMap,map,reduceByKe都创建出一个新的RDD,一个简短的程序就能够执行很多个转换和动作。

    在Spark中,所有RDD的转换都是是惰性求值的。RDD的转换操作会生成新的RDD,新的RDD的数据依赖于原来的RDD的数据,每个RDD又包含多个分区。那么一段程序实际上就构造了一个由相互依赖的多个RDD组成的有向无环图(DAG)。并通过在RDD上执行动作将这个有向无环图作为一个Job提交给Spark执行。

    5.将RDD写入缓存会大大提高处理效率。

    scala> linesWithSpark.cache()
    res7: spark.RDD[String] = spark.FilteredRDD@17e51082
    
    scala> linesWithSpark.count()
    res8: Long = 15

    6.编程

    /* SimpleApp.scala */
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    
    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
        val logData = sc.textFile(logFile, 2).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
      }
    }

    use the spark-submit script to run program

    # Your directory layout should look like this
    $ find .
    .
    ./simple.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/SimpleApp.scala
    
    # Package a jar containing your application
    $ sbt package
    ...
    [info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
    
    # Use spark-submit to run your application
    $ YOUR_SPARK_HOME/bin/spark-submit 
      --class "SimpleApp" 
      --master local[4] 
      target/scala-2.10/simple-project_2.10-1.0.jar
    ...
    Lines with a: 46, Lines with b: 23

     

     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
     
  • 相关阅读:
    如何学习掌握一门新技术
    Linux多线程编程(不限Linux)
    腾讯后台开发面试题2
    腾讯后台开发面试题
    【转】Linux杀死fork产生的子进程的僵尸进程defunct
    【转】Linux网络编程入门
    【转】揭开Socket编程的面纱
    【转】简单理解socket
    【转】404、500、502等HTTP状态码介绍
    【转】fread函数和fwrite函数
  • 原文地址:https://www.cnblogs.com/aezero/p/4565321.html
Copyright © 2011-2022 走看看