原地址:http://spark.apache.org/docs/latest/quick-start.html
这篇指导对使用Spark提供了一个快速的介绍。我们首先介绍API,通过spark交互式shell(Python或Scala)。然后如何在JAVA、scala、python上写应用程序。
跟随这篇指导,首先从spark网站上下载一个release。因为我们不使用HDFS,你可以下载任何版本的hadoop。
注意:在spark2.0之前,主要的编程界面是RDD。在spark2.0之后,RDD被Dataset取代。Dataset很像RDD,但是会有更多的优化,RDD的接口依旧智齿,并且你能够在《RDD编程指导》得到一个更完整的引用。然而,我们非常推荐你使用DataSet,会比RDD有更好的表现。看《SQL编程指导》得到更多关于Dateset的指导。
spark Shell 的交互分析:
基础:
spark shell 提供了一个简单的方式去学习API,同样也是一个非常强大的交互式Dataset分析工具。可以应用在scala(运行与JAVA VM,可以使用现有的JAVA库)、python。在spark目录下运行下面代码开启spark
./bin/spark-shell
spark首要概念是分布式items收集。Dataset能够从Hadoop输入格式(HDFS)上创建,或者从其他Dataset转化。让我们从spark源文件目录中的README文件中做一个新的Dataset。
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
你可以直接从Dataset中获取值,通过使用一些action,或转化dataset为一个新的。详细请读《API doc》。
scala> textFile.count() // Number of items in this Dataset res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs scala> textFile.first() // First item in this Dataset res1: String = # Apache Spark
现在让我们把这个Dataset转化为一个新的。我们用filter返回一个新的Dataset(新的Dataset是原文件条目的子集)
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
我们可以连接转化(transformation)和行动(actions)
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15
更多的dataset操作
Dataset的action和transformtion可以用来做更复杂的计算任务。找出单词数最多的那一行:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15
第一个map是得到一个整数,创建一个新的Dataset,reduce用来找最大的单词数。他们的参数是scala函数,你可以使用scala,java。例如,我们可以在任何地方简单地公开调用函数。我们使用Math.max函数让这段胆码更容易理解。
scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15
一个常见的数据流模式叫做MapReduce,因Hadoop而普及。spark可以很简单地继承MapReduce:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
在这里,我们调用flatmap图转化一个行Dataset为一个单词Dataset,然后结合groupByKey和count去计算每个单词在文件中出现的次数,使用pair。收集单词数,使用:
scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
缓冲:
spark同样支持把数据放在集群内存的缓存里,当数据重复被存取的时候这非常有用,比如查询一个很火的dataset,或者跑一些迭代算法(PageRank等)。举一个简单例子,我们把linesWithSpark
放进缓存。
scala> linesWithSpark.cache() res7: linesWithSpark.type = [value: string] scala> linesWithSpark.count() res8: Long = 15 scala> linesWithSpark.count() res9: Long = 15
用spark浏览和缓存100行的文本看起来很愚蠢,有趣的是这些相同的函数被使用在一个非常大的数据集,甚至跨越数十数百个节点。你同样跨越使用 bin/spark-shell 连接集群,正如RDD编程中的描述。
独立程序:
假设我们希望用sparkAPI写一个独立程序,我们将通过一个简单的程序java(Maven)
这个例子将使用Maven去编译一个应用jar,任何类似的build系统也都能工作。
我们将创建一个非常简单的spark应用。SimpleApp.java
/* SimpleApp.java */ import org.apache.spark.sql.SparkSession; public class SimpleApp { public static void main(String[] args) { String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate(); Dataset<String> logData = spark.read.textFile(logFile).cache(); long numAs = logData.filter(s -> s.contains("a")).count(); long numBs = logData.filter(s -> s.contains("b")).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); spark.stop(); } }
这个程序只是计算了包含‘a’的行的数量和包含‘b’的行的数量,在Spark的README中。注意你需要替换YOURS_SPARK_HOME为你本地spark安装的地方。不像前面spark-shell的例子(初始化自己SparkSession),我们初始化一个SparkSession作为程序的一部分。
去build这个程序,我们也需要写一个Maven-pom.xml文件列出spark是一个依赖。注意spark要人工标出scala的版本。
<project> <groupId>edu.berkeley</groupId> <artifactId>simple-project</artifactId> <modelVersion>4.0.0</modelVersion> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0</version> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> </dependencies> </project>
我们通过标准的Maven目录结构来展示这些文件
$ find . ./pom.xml ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java
下一步:
祝贺你运行了第一个你的spark应用
--一个深入的API概述,请开始《RDD编程指导》和《SQL编程指导》,或者看其他部分“编程指导”菜单。
--在集群上跑程序,请看《部署概述》
--最后,spark包含了一些样例在example目录下(scala,java,python,R),你可以这样运行他们
# For Scala and Java, use run-example:
./bin/run-example SparkPi