zoukankan      html  css  js  c++  java
  • Spark快速入门

    Spark快速入门 - Spark 1.6.0


    转载请注明出处:http://www.cnblogs.com/BYRans/

    快速入门(Quick Start)

    本文简单介绍了Spark的使用方式。首先介绍Spark的交互界面的API使用,然后介绍如何使用Java、Scala以及Python编写Spark应用。详细的介绍请阅读Spark Programming Guide

    在按照本文进行操作之前,请确保已安装Spark。本文中的所有操作没有使用HDFS,所以您可以安装任何版本的Hadoop。

    Spark交互式Shell的使用(Interactive Analysis with the Spark Shell)

    基础(Basics)

    Spark的交互式Shell提供了一个简单的方式来学习Spark的API,同时也提供了强大的交互式数据处理能力。Spark Shell支持Scala和Python两种语言。启动支持Scala的Spark Shell方式为

    ./bin/spark-shell
    

    Spark最重要的一个抽象概念是弹性分布式数据集(Resilient Distributed Dataset)简称RDD。RDDs可以通过Hadoop InputFormats(例如HDFS文件)创建,也可以由其它RDDs转换而来。下面的例子是通过加载Spark目录下的README.md文件生成RDD的例子:

    scala> val textFile = sc.textFile("README.md")
    textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
    

    RDDs有两种操作:

    • actions:返回计算值
    • transformations:返回一个新RDDs的引用

    actions示例如下:

    scala> textFile.count() // Number of items in this RDD
    res0: Long = 126
    
    scala> textFile.first() // First item in this RDD
    res1: String = # Apache Spark
    

    如下transformations示例,使用filter操作返回了一个新的RDD,该RDD为文件中数据项的子集,该子集符合过滤条件:

    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
    linesWithSpark: spark.RDD[String] = spark.FilteredRDD@7dd4af09
    

    Spark也支持将actions和transformations一起使用:

    scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
    res3: Long = 15
    

    更多RDD操作(More on RDD Operations)

    RDD的actions和transformations操作可以用于更加复杂的计算。下面是查找README.md文件中单词数最多的行的单词数目:

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
    res4: Long = 15
    

    上面代码中,第一个map操作将一行文本按空格分隔,并计算单词数目,将line映射为一个integer值,并创建了一个新的RDD保存这些integer值。RDD调用reduce计算最大的单词数。示例中map和reduce操作的参数是Scala的函数式编程风格,Spark支持Scala、Java、Python的编程风格,并支持Scala/Java库。例如,使用Scala中的Math.max()函数让程序变得更加简洁易读:

    scala> import java.lang.Math
    import java.lang.Math
    
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res5: Int = 15
    

    随着Hadoop的流行,MapReduce变为一种常见的数据流模式。Spark可以轻松的实现MapReduce,使用Spark编写MapReduce程序更加简单:

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCounts: spark.RDD[(String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
    

    上面示例中,使用flatMap、map和reduceByKey操作来计算每个单词在文件中出现的次数,并生成一个结构为<String,Int>的RDD。可以使用collect操作完成单词统计结果的收集整合:

    scala> wordCounts.collect()
    res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
    

    缓存(Caching)

    Spark支持将数据缓存到集群的分布式内存中。在数据会被重复访问的情况下,将数据缓存到内存能减少数据访问时间,从而提高运行效率。尤其是在数据分布在几十或几百个节点上时,效果更加明显。下面为将数据linesWithSpark缓存到内存的示例:

    scala> linesWithSpark.cache()
    res7: spark.RDD[String] = spark.FilteredRDD@17e51082
    
    scala> linesWithSpark.count()
    res8: Long = 19
    
    scala> linesWithSpark.count()
    res9: Long = 19
    

    独立应用(Self-Contained Applications)

    假设我们想使用Spark API编写独立应用程序。我们可以使用Scala、Java和Python轻松的编写Spark应用。下面示例为一个简单的应用示例:

    • Scala
    /* SimpleApp.scala */
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    
    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
        val logData = sc.textFile(logFile, 2).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
      }
    }
    

    上面程序分别统计了README中包含字符‘a’以及‘b’的行数。与前面Spark shell例子不同的是,我们需要初始化SparkContext。
    我们通过SparkContext创建了一个SparkConf对象,SparkConf对象包含应用的基本信息。
    我们基于Spark API编写应用,所以我们需要编写一个名为“simple.sbt”的sbt配置文件,用于指明Spark为该应用的一个依赖。下面的sbt配置文件示例中,还增加了Spark的一个依赖库“spark-core”:

    name := "Simple Project"
    
    version := "1.0"
    
    scalaVersion := "2.10.5"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
    

    为了让sbt正确执行,我们需要对SimpleApp.scala和simple.sbt根据sbt要求的目录结构布局。如果布局正确,就可以生成该应用的JAR包,使用spark-submit命令即可运行该程序。

    • Java
    /* SimpleApp.java */
    import org.apache.spark.api.java.*;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    
    public class SimpleApp {
      public static void main(String[] args) {
        String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> logData = sc.textFile(logFile).cache();
    
        long numAs = logData.filter(new Function<String, Boolean>() {
          public Boolean call(String s) { return s.contains("a"); }
        }).count();
    
        long numBs = logData.filter(new Function<String, Boolean>() {
          public Boolean call(String s) { return s.contains("b"); }
        }).count();
    
        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
      }
    }
    

    该示例的代码逻辑同上一段Scala示例代码。与Scala示例类似,首先初始化了SparkContext,通过SparkContext创建了JavaSparkContext对象。并创建了RDDs以及执行transformations操作。最后,通过继承了spark.api.java.function.Function的类将函数传给Spark。

    在这里,使用Maven进行编译,Maven的pom.xml如下:

    <project>
      <groupId>edu.berkeley</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.6.0</version>
        </dependency>
      </dependencies>
    </project>
    

    按照Maven的要求架构配置文件位置:

    $ find .
    ./pom.xml
    ./src
    ./src/main
    ./src/main/java
    ./src/main/java/SimpleApp.java
    

    现在,就可以使用Maven打包应用,以及使用命令./bin/spark-submit.执行该应用程序。示例如下:

    # Package a JAR containing your application
    $ mvn package
    ...
    [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
    
    # Use spark-submit to run your application
    $ YOUR_SPARK_HOME/bin/spark-submit 
      --class "SimpleApp" 
      --master local[4] 
      target/simple-project-1.0.jar
    ...
    Lines with a: 46, Lines with b: 23
    

  • 相关阅读:
    阿里面试后的问题总结
    Spring IOC源码实现流程
    Spring Aop源码分析
    SpringCloud的分布式配置及消息总线
    阿里java编码规范考试总结
    压缩文件的压缩时候中文乱码码
    mybatis的时间比较 xml 及不解析<=的写法
    批量插入一张表的数据,并且生成不同的uuid 字符截取 批量更新 去除重复数据
    Redis集群的搭建
    Python 之 基础知识(二)
  • 原文地址:https://www.cnblogs.com/BYRans/p/5199824.html
Copyright © 2011-2022 走看看