zoukankan      html  css  js  c++  java
  • Spark 2.2.0 文档中文版 Quick Start 快速开始

    原地址:http://spark.apache.org/docs/latest/quick-start.html

      这篇指导对使用Spark提供了一个快速的介绍。我们首先介绍API,通过spark交互式shell(Python或Scala)。然后如何在JAVA、scala、python上写应用程序。

      跟随这篇指导,首先从spark网站上下载一个release。因为我们不使用HDFS,你可以下载任何版本的hadoop。

      注意:在spark2.0之前,主要的编程界面是RDD。在spark2.0之后,RDD被Dataset取代。Dataset很像RDD,但是会有更多的优化,RDD的接口依旧智齿,并且你能够在《RDD编程指导》得到一个更完整的引用。然而,我们非常推荐你使用DataSet,会比RDD有更好的表现。看《SQL编程指导》得到更多关于Dateset的指导。

      spark Shell 的交互分析:

      基础:

      spark shell 提供了一个简单的方式去学习API,同样也是一个非常强大的交互式Dataset分析工具。可以应用在scala(运行与JAVA VM,可以使用现有的JAVA库)、python。在spark目录下运行下面代码开启spark

    ./bin/spark-shell

      spark首要概念是分布式items收集。Dataset能够从Hadoop输入格式(HDFS)上创建,或者从其他Dataset转化。让我们从spark源文件目录中的README文件中做一个新的Dataset。

    scala> val textFile = spark.read.textFile("README.md")
    textFile: org.apache.spark.sql.Dataset[String] = [value: string]

      你可以直接从Dataset中获取值,通过使用一些action,或转化dataset为一个新的。详细请读《API doc》。

    scala> textFile.count() // Number of items in this Dataset
    res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
    
    scala> textFile.first() // First item in this Dataset
    res1: String = # Apache Spark

      现在让我们把这个Dataset转化为一个新的。我们用filter返回一个新的Dataset(新的Dataset是原文件条目的子集)

    scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
    linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

      我们可以连接转化(transformation)和行动(actions)

    scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
    res3: Long = 15

      更多的dataset操作

      Dataset的action和transformtion可以用来做更复杂的计算任务。找出单词数最多的那一行:

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
    res4: Long = 15

      第一个map是得到一个整数,创建一个新的Dataset,reduce用来找最大的单词数。他们的参数是scala函数,你可以使用scala,java。例如,我们可以在任何地方简单地公开调用函数。我们使用Math.max函数让这段胆码更容易理解。

    scala> import java.lang.Math
    import java.lang.Math
    
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res5: Int = 15

      一个常见的数据流模式叫做MapReduce,因Hadoop而普及。spark可以很简单地继承MapReduce:

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
    wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

      在这里,我们调用flatmap图转化一个行Dataset为一个单词Dataset,然后结合groupByKey和count去计算每个单词在文件中出现的次数,使用pair。收集单词数,使用:

    scala> wordCounts.collect()
    res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

      

      缓冲:

      spark同样支持把数据放在集群内存的缓存里,当数据重复被存取的时候这非常有用,比如查询一个很火的dataset,或者跑一些迭代算法(PageRank等)。举一个简单例子,我们把linesWithSpark 放进缓存。

    scala> linesWithSpark.cache()
    res7: linesWithSpark.type = [value: string]
    
    scala> linesWithSpark.count()
    res8: Long = 15
    
    scala> linesWithSpark.count()
    res9: Long = 15

      用spark浏览和缓存100行的文本看起来很愚蠢,有趣的是这些相同的函数被使用在一个非常大的数据集,甚至跨越数十数百个节点。你同样跨越使用 bin/spark-shell 连接集群,正如RDD编程中的描述。

      独立程序:

      假设我们希望用sparkAPI写一个独立程序,我们将通过一个简单的程序java(Maven)

      这个例子将使用Maven去编译一个应用jar,任何类似的build系统也都能工作。

      我们将创建一个非常简单的spark应用。SimpleApp.java

    /* SimpleApp.java */
    import org.apache.spark.sql.SparkSession;
    
    public class SimpleApp {
      public static void main(String[] args) {
        String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
        SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
        Dataset<String> logData = spark.read.textFile(logFile).cache();
    
        long numAs = logData.filter(s -> s.contains("a")).count();
        long numBs = logData.filter(s -> s.contains("b")).count();
    
        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    
        spark.stop();
      }
    }

       这个程序只是计算了包含‘a’的行的数量和包含‘b’的行的数量,在Spark的README中。注意你需要替换YOURS_SPARK_HOME为你本地spark安装的地方。不像前面spark-shell的例子(初始化自己SparkSession),我们初始化一个SparkSession作为程序的一部分。

      去build这个程序,我们也需要写一个Maven-pom.xml文件列出spark是一个依赖。注意spark要人工标出scala的版本。

    <project>
      <groupId>edu.berkeley</groupId>
      <artifactId>simple-project</artifactId>
      <modelVersion>4.0.0</modelVersion>
      <name>Simple Project</name>
      <packaging>jar</packaging>
      <version>1.0</version>
      <dependencies>
        <dependency> <!-- Spark dependency -->
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.2.0</version>
        </dependency>
      </dependencies>
    </project>

      我们通过标准的Maven目录结构来展示这些文件

    $ find .
    ./pom.xml
    ./src
    ./src/main
    ./src/main/java
    ./src/main/java/SimpleApp.java

      下一步:

      祝贺你运行了第一个你的spark应用

      --一个深入的API概述,请开始《RDD编程指导》和《SQL编程指导》,或者看其他部分“编程指导”菜单。

      --在集群上跑程序,请看《部署概述》

      --最后,spark包含了一些样例在example目录下(scala,java,python,R),你可以这样运行他们

    # For Scala and Java, use run-example:
    ./bin/run-example SparkPi
  • 相关阅读:
    [LeetCode]Add Two Numbers
    [LeetCode]Longest SubString Without Repeating Characters
    [LeetCode]Median of Two Sorted Arrays
    [LeetCode]Two Sum
    动态规划
    [shell编程]一个简单的脚本
    一些linux的问题
    核稀疏表示分类(KSRC)
    conda 按照指定源下载python包
    python 保留两位小数
  • 原文地址:https://www.cnblogs.com/taoshiqian/p/7200344.html
Copyright © 2011-2022 走看看