zoukankan      html  css  js  c++  java
  • Spark环境的搭建与运行

    Spark本地安装与配置

    • 下载spark后解压,并cd到解压目录下
    • 运行实例程序测试是否一切正常
    ./bin/run-example org.apache.spark.examples.SparkPi
    
    • 在本地模式下设定要使用的线程数目local[N]
    master=local[2] ./bin/run-example org.apache.spark.examples.SparkPi
    

    Spark集群

    • Spark集群由两类程序构成:一个驱动程序和多个执行程序。
    • 本地模式中,所有的处理都是运行在一个JVM内的。
    • 如果要在一个Spark单机集群上运行示例代码,只要传入猪截点的IP和PROT端口号
    master=spark://IP:PORT ./bin/run-example org.apache.spark.examples.SparkPi
    

    Spark 编程模型

    • SparkContext 和 Spark Shell

    SparkContext类和SparkConf类

    • 创建一个四线程的SparkContext类
    val conf = new SparkConf()
    .setAppName("Text Spark APP")
    .setMaster("local[4]")
    val sc = new SparkContext(conf)
    or
    val sc = new SparkContext("local[4]", "Test Spark App")
    

    Spark shell

    • 进入程序主目录下,运行如下代码启动 Spark shell (Scala语言的shell)
    ./bin/spark shell
    
    • 启动spark shell 之后会自动初始化一个SparkContext对象。
    • 进入主目录下,运行如下代码启动Python shell。
    ./bin/pyspark
    
    • python下的SparkContext对象可以通过Python变量sc来调用。

    弹性分布式数据集

    1.创建RDD

    • 从现有的集合创建RDD
    val collection = List("a", "b", "c", "d", "e")
    val rddFromCollection = sc.parallelize(collection)
    
    • 也可以从本地文件中创建RDD
    val rddFromTextFile = sc.textFile("license")
    

    2.Spark操作

    • Spark编程模式下,所有的操作都被分为转换(transformation)和执行(action)。
    • 最常见的转换操作就是map,该操作对一个RDD的每一条记录都执行某个函数,从而将输入映射称为新的输出。
    • 实例
    • 对于之前创建的一个由若干String构成的RDD对象,通过map将每一个字符串转换为一个整数,返回一个由若干Int组成RDD对象。
    val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
    
    • 执行count返回RDD中的记录数目。
    intsFromStringsRDD.cout
    
    • 如果要计算每行字符串的平均长度,可以先sum计算所有记录的总长,再除以总的记录数目。
    val sumOfRecords = intsFromStringsRDD.sum
    val numRecords = intsFromStringsRDD.count
    val aveLengthOfRecord = sumOfRecords / numRecords
    or
    val aveLengthOfRecordChained =rddFromTextFile.map(line => line.size).sum/rddFromTextFile.count
    
    • Spark中转换操作是延后的。在RDD上调用一个转换操作并不会立即触发相应的计算。相反,这些转换会链接起来,并只有在执行操作时才会被高效的计算。
    • 实例
    • 下面的代码不会触发实际的操作
    val transformedRDD = rddFromTextFile.map(line => line.size).filter(size =>size>10).map(size =>size*2)
    
    • 调用如下执行操作,计算将会被触发
    val computation = transformedRDD.sum
    

    3.RDD缓存策略

    • 将RDD缓存在集群的内存中。
    rddFromTextFile.cache
    
    • 首次缓存会花费一些时间,下一次访问就会很快,数据可以直接从内存中读取,从而减少I/O操作。

    3.广播变量和累加变量

    • 两类特殊变量:广播变量和累加变量
      +广播变量为只读变量,创建广播变量如下
    val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
    
    • 广播变量可以被非驱动程序的节点访问
    sc.parallelize(List("1", "2", "3")).map(x =>broadcastAList.value ++ x).collect
    

    Spark Scala 编程入门

    • 对于Scala程序而言,需要创建两个文件:Scala代码文件以及项目的构建配置文件。
    • 项目将使用SBT(Scala Build Tool)来构建。
    • 实例
    • SBT配置文件如下。各行代码之间的空行是必须的!
    name := "scala-spark-app"
    \空行
    version := "1.0"
    \空行
    scalaVersion := "2.10.4"
    \空行
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"
    
    • 导入所需要的Spark类
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    
    • 初始化所需要的SparkContext对象,并通过textFile函数来访问CSV数据文件
    def main(args: Array[String]){
        val sc = new SparkContext("local[2]", "First Spark App")
        val data = sc.textFile("data/UserPerchaseHistory.csv")
            .map(line => line.split(",")
            .map(purchaseRecord => (perchaseRecord(0), purchaseRecord(1),perchaseRecord(2)))
    
    • RDD中每条记录都是由(user, product, price)构成,对商店计算如下指标:购买总次数,客户总个数,总收入。
    val numPurchases = data.count()
    val uniqueUsers = data.map{ case(user, product, price) => user}.distinct().count()
    val totalRevenue = data.map{ case(user, product, price) => price.toDouble}.sum()
    val productsByPopularity = data
        .map{case(user, product, price) => (product, 1)}
        .reduceByKey(_ + _)
        .collect()
        .sortBy(-_._2)
    val mostPopular = productsByPopularity(0)
    
    • 结果打印
            println(Total purchases:"+ numPurchases)
            ...
        }
    }
    

    Spark Python 编程入门

    from pyspark import SparkContext
    
    sc = SparkContext("local[2]", "First Spark App")
    # spark读取本地文件
    t = sc.textFile("file:///home/users/douzhi/t.txt")
    #spark读取hdfs文件
    t = sc.textFile("hdfs:///path/...")
    print t.first()
  • 相关阅读:
    TP连接数据库报错:SQLSTATE[HY000] [2002] No such file or directory (原)
    linux的查找命令 find whereis locate
    windows下cmd无法使用telnet命令解决方法 (原)
    CDN和智能DNS原理和应用 (原)
    面试:sql语句-1-基础查询
    面试问题:对框架的理解
    Hub,bridge,switch and router的区别
    If you ever have a broken heart
    virt-viewer的简单使用
    各种虚拟化镜像文件格式
  • 原文地址:https://www.cnblogs.com/milkcoffeesugar/p/5734075.html
Copyright © 2011-2022 走看看