zoukankan      html  css  js  c++  java
  • [大数据之Spark]——快速入门

    本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。可以查看编程指南了解更多的内容。

    为了良好的阅读下面的文档,最好是结合实际的练习。首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs。

    Spark Shell 交互

    基本操作

    Spark Shell提供给用户一个简单的学习API的方式 以及 快速分析数据的工具。在shell中,既可以使用scala(运行在java虚拟机,因此可以使用java库)也可以使用python。可以在spark的bin目录下启动spark shell:

    ./bin/spark-shell.sh
    

    spark操作对象是一种分布式的数据集合,叫做Resilient Distributed Dataset(RDD)。RDD可以通过hdfs文件创建,也可以通过RDD转换得来。

    下面就实际操作下,看看效果。我的本地有个文件——test.txt,内容为:

    hello world
    haha nihao
    

    可以通过这个文件创建一个新的RDD

    val textFile = sc.textFile("test.txt")
    textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
    

    在Spark中,基于RDD可以作两种操作——Actions算子操作以及Transformations转换操作。

    我们可以使用一些算子操作体验下:

    scala> textFile.count() //RDD有用的数量
    res1: Long = 2
    
    scala> textFile.first() //RDD第一行
    res3: String = hello world
    

    再执行一些转换操作,比如使用filter转换,返回一个新的RDD集合:

    scala> val lines = textFile.filter(line=>line.contains("hello"))
    lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23
    
    scala> lines.count()
    res4: Long = 1
    
    scala> val lines = textFile.filter(line=>line.contains("haha"))
    lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:23
    
    scala> lines.count()
    res5: Long = 1
    
    scala> lines.first()
    res6: String = haha nihao
    

    更多RDD操作

    RDD算子和转换可以组成很多复杂的计算,比如我们想找出最多一行中单词最多的单词数量:

    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
    res4: Long = 15
    

    这个操作会把一行通过split切分计数,转变为一个整型的值,然后创建成新的RDD。reduce操作用来寻找单词最多的那一行。

    用户可以在任何时候调用方法和库,可以使用Math.max()函数:

    scala> import java.lang.Math
    import java.lang.Math
    
    scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
    res5: Int = 15
    

    一个很常见的数据操作就是map reduce,这个操作在hadoop中很常见。Spark可以轻松的实现Mapreduce任务:

    scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
    wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28
    

    这里使用了flatMap,map以及reduceByKey等转换操作来计算每个单词在文件中的数量。为了在shell中显示,可以使用collect()触发计算:

    scala> wordCounts.collect()
    res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
    

    缓存

    Spark也支持在分布式的环境下基于内存的缓存,这样当数据需要重复使用的时候就很有帮助。比如当需要查找一个很小的hot数据集,或者运行一个类似PageRank的算法。

    举个简单的例子,对linesWithSpark RDD数据集进行缓存,然后再调用count()会触发算子操作进行真正的计算,之后再次调用count()就不会再重复的计算,直接使用上一次计算的结果的RDD了:

    scala> linesWithSpark.cache()
    res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27
    
    scala> linesWithSpark.count()
    res8: Long = 19
    
    scala> linesWithSpark.count()
    res9: Long = 19
    

    看起来缓存一个100行左右的文件很愚蠢,但是如果再非常大的数据集下就非常有用了,尤其是在成百上千的节点中传输RDD计算的结果。你也可以通过bin/spark-shell向集群提交任务,可以参考编程指南

    独立应用

    要使用spark api写一个自己的应用也很简单,可以基于scala、java、python去写一些简单的应用。

    /* SimpleApp.scala */
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    
    object SimpleApp {
      def main(args: Array[String]) {
        val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
        val logData = sc.textFile(logFile, 2).cache()
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
      }
    }
    

    注意应用需要定义main()方法。这个程序仅仅是统计文件中包含字符ab的分别都有多少行。你可以设置YOUR_SPARK_HOME替换自己的文件目录。不像之前在shell中的例子那样,我们需要自己初始化sparkContext。

    通过SparkConf构造方法创建SparkContext。

    应用依赖于spark api,因此需要在程序中配置sbt的配置文件——simple.sbt,它声明了spark的依赖关系。

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.7"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
    

    为了让sbt正确的工作,还需要创建SimpleApp.scala以及simple.sbt。然后就可以执行打包命令,通过spark-submit运行了:

    # Your directory layout should look like this 你的工程目录应该向下面这样
    $ find .
    .
    ./simple.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/SimpleApp.scala
    
    # Package a jar containing your application 运行sbt命令进行打包
    $ sbt package
    ...
    [info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
    
    # Use spark-submit to run your application 通过spark-submit提交任务jar包
    $ YOUR_SPARK_HOME/bin/spark-submit 
      --class "SimpleApp" 
      --master local[4] 
      target/scala-2.11/simple-project_2.11-1.0.jar
    ...
    Lines with a: 46, Lines with b: 23
    

    其他地址

    通过上面的例子,就可以运行起来自己的Spark应用了。

    那么可以参考下面的链接获得更多的内容:

    • 为了更深入的学习,可以阅读Spark编程指南
    • 如果想要运行Spark集群,可以参考部署指南
    • 最后,Spark在examples目录中内置了多种语言版本的例子,如scala,java,python,r等等。你可以通过下面的命令运行:
    # For Scala and Java, use run-example:
    ./bin/run-example SparkPi
    
    # For Python examples, use spark-submit directly:
    ./bin/spark-submit examples/src/main/python/pi.py
    
    # For R examples, use spark-submit directly:
    ./bin/spark-submit examples/src/main/r/dataframe.R
    
  • 相关阅读:
    TextBox 只有下划线
    can't find web control library(web控件库)
    DropDownListSalesAC”有一个无效 SelectedValue,因为它不在项目列表中。
    IDE、SATA、SCSI、SAS、FC、SSD 硬盘类型
    如何打印1px表格
    CSS控制打印 分页
    Virtual Server could not open its emulated Ethernet switch driver. To fix this problem, reenable the Virtual Server Emulated Et
    Xml中SelectSingleNode方法中的xpath用法
    热带水果莫入冰箱?水果存放冰箱大法
    探索Asp.net的Postback机制
  • 原文地址:https://www.cnblogs.com/xing901022/p/5894562.html
Copyright © 2011-2022 走看看