zoukankan      html  css  js  c++  java
  • Spark 学习笔记 (一): 初探Spark 程序设计RDD

    Spark 学习笔记 (一): 初探Spark 程序设计之RDD

    本文主要介绍Spark基本数据结构RDD的原理和使用,以及搭建了基于Docker的Spark集群开发测试环境,最后给出了几个实际程序例子,算是Saprk入了门:)

    一、 RDD

    RDD是Spark中最核心的概念

    1.初识RDD --- Resilient Distributed Datasets 弹性分布式数据集

    • 数据集:RDD是数据集合的抽象,分布在集群中的只读对象的集合

      • 一个RDD由多个Partition构成,也就是一个RDD被分区成Partiton存在不同节点上
      • 一个Partition可以存储在此磁盘或内存中
      • 通过并行transform操作进行一个RDD到另一个RDD的转换
    • 分布式:

      • Partition 是分布式存储的
      • 数据的计算是多个节点协同计算得到的
    • 弹性:RDD 可以在不改变内部存储数据记录的前提下,去调整并行计算计算单元的划分结构,弹性这一特性,也是为并行计算服务的

    • 容错性:分布式的一般问题是需要具有容错性,那么RDD本身是具有容错性的,

    RDD 内部的数据集合在逻辑上和物理上被划分成多个小子集合Partition,这样的每一个子集合我们将其称为分区,分区的个数会决定并行计算的粒度,而每一个分区数值的计算都是在一个单独的任务Task中进行,因此并行任务的个数,也是由 RDD分区的个数决定的

    Partition -> Task

    2. Spark运行模式

    Spark

    先放个图,看下Spark整体程序是怎么执行的

    • 整个集群分为 Master 节点和 Worker 节点,相当于 Hadoop 的 Master 和 Slave 节点

    • Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点

    • Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 executors

    • Driver 官方解释是 “The process running the main() function of the application and creating the SparkContext”。 Application 就是用户自己写的 Spark 程序(driver program)

    • 每个 Worker 上存在一个或者多个 ExecutorBackend 进程。每个进程包含一个 Executor 对象,该对象持有一个线程池,每个线程可以执行一个 task。

    3.Spark程序设计

    • Scala基础

      • 用函数式编程的方式可以很方便处理集合:
      var list = List(1, 2, 3)
      list.foreach(println)
      

    Spark 的RDD,封装了各种类似于Scala集合的算子map、filter、reduce等,且都是分布式执行的

    • Spark程序设计基本流程

        1. 创建SparkContext对象:定义了Spark执行环境和配置参数;注意每个Spark程序有且仅有一个SparkContext
        1. 创建RDD:从Scala集合或者在Hadoops数据集上创建
        • (1) 从Scala集合映射成RDD:sc.parallelize()创建,第二个参数是Partition数目
        val slices = 10   //Partition数目,即并行的task数目启动10个map task进行处理
        val n = 100000 * slices 
        val count = sc.parallelize(1 to n, slices).map { i => 
            val x = random * 2 - 1 
            val y = random * 2 - 1 
            (x*x + y*y < 1) 1 else 0
        }.reduce(_ + _)
        
        • (2) 将本地文件/HDFS文件映射成RDD:

          • 文本文件:
          sc.textFile("tile.txt")   //将本地文本文件加载成RDD
          sc.textFile(“hdfs://nn:9000/path/file”)   //hdfs文件或目录
          
          • sequenceFile文件:
          sc.sequenceFile(“file.txt”)  //将本地二进制文件加载成RDD
          
          • 使用任意自定义的Hadoop InputFormat
          sc.hadoopFile(path, inputFmt, keyClass, valClass)
          
          • 读取HDFS创建RDD:
          inputRdd = sc.textFile(“hdfs:///data/input”)
          inputRdd = sc.textFile(“hdfs://namenode:8020/data/input”)
          

          HDFS的datanode的Block和Spark数据的partiton是一一映射的,也和task一一映射,也就是下图所示的就会启动5个task
          Spark2

          • 读取HBase创建RDD:
          import org.apache.spark._ 
          import org.apache.hadoop.hbase.mapreduce.TableInputFormat
          
          //创建SparkContext 
          val sparkConf = new SparkConf()
          val sc = new SparkContext(conf )
          
          // 设置hbase configuration val hbaseConf = HBaseConfiguration.create() hbaseConf.addResource(new Path(“hbase-site.xml")) hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
          
          //创建hbase RDD 
          val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
          
          //获取总行数 
          al count = hBaseRDD.count()
          
        1. 在RDD上进行转换transformation和action
        • Transformation:将一个RDD通过一种规则,映射成另一种RDD;
          Action: 返回结果或者保存结果,只有action才会触发程序的执行,注意Spark中遇到action的时候计算才会去分布式执行

          在 Spark 中,所有的转换(transformations)都是惰性(lazy)的,它们不会马上计算它们的结果。相反的,它们仅仅记录转换操作是应用到哪些基础数据集(例如一个文件)上的。转换仅仅在这个时候计算:当动作(action) 需要一个结果返回给驱动程序的时候。这个设计能够让 Spark 运行得更加高效。例如,我们可以实现:通过 map 创建一个新数据集在 reduce 中使用,并且仅仅返回 reduce 的结果给 driver,而不是整个大的映射过的数据集。

          • 常见的操作集合:
            ac
        • Transformation函数例子:

        //创建RDD 
        val listRdd = sc.parallelize(List(1, 2, 3), 3)
        
        // 将RDD传入函数,生成新的RDD 
        val squares = listRdd.map(x => x*x) // {1, 4, 9}
        
        // 对RDD中元素进行过滤,生成新的RDD 
        val even = squares.filter(_ % 2 == 0) // {4}
        
        // 将一个元素映射成多个,生成新的RDD 
        nums.flatMap(x => 1 to x) // => {1, 1, 2, 1, 2, 3}
        
        • Action函数例子:
        //创建新的RDD 
        val nums = sc.parallelize(List(1, 2, 3), 2) 
        
        // 将RDD保存为本地集合(返回到driver端) 
        nums.collect() // => Array(1, 2, 3) 
        
        // 返回前K个元素 
        nums.take(2) // => Array(1, 2) 
        
        // 计算元素总数 
        nums.count() // => 3 
        
        // 合并集合元素 
        nums.reduce(_ + _) // => 6 
        
        // 将RDD写到HDFS中 
        nums.saveAsTextFile(“hdfs://nn:8020/output”) 
        nums.saveAsSequenceFile(“hdfs://nn:8020/output”)
        
        • Key/Value类型的RDD的操作
        val pets = sc.parallelize( List((“cat”, 1), (“dog”, 1), (“cat”, 2))) 
        pets.reduceByKey(_ + _) // => {(cat, 3), (dog, 1)} 
        pets.groupByKey() // => {(cat, Seq(1, 2)), (dog, Seq(1)} 
        pets.sortByKey() // => {(cat, 1), (cat, 2), (dog, 1)}
        

        reduceByKey自动在map端进行本地combine

        • 控制ReduceTasks数目:有一个参数执行并发度
        words.reduceByKey(_ + _, 5)
        

        用户也可以通过修改spark.default.parallelism设置默认并行度
        默认并行度为最初的RDD partition数目

      留一个思考问题:那么这些操作都是怎么分布式执行的呢?

        1. 返回结果:保存到HFDS或者Hive或者HBase
        • 将结果保存的HBase:

    4. 其他RDD操作

    • Sample()从数据集采样

    • union() 合并多个RDD

    • cartesian 求笛卡尔积

    • 共享变量:Accumulators和Broadcast Variables

      一般来说上述的操作都是对数据在远端worker node上拷贝的数据进行操作,对数据的效果并不会回传

      • Accumulator(累加器,计数器)

        • 类似于MapReduce中的counter,将数据从一个节点发送到其他各个节点上去;

        • 通常用于监控,调试,记录符合某类特征的数据数目等

        import SparkContext._ 
        
        val total_counter = sc.accumulator(0L, "total_counter") 
        val counter0 = sc.accumulator(0L, "counter0") 
        val counter1 = sc.accumulator(0L, "counter1")
        
        val count = sc.parallelize(1 to n, slices).map { i => 
            total_counter += 1 
            val x = random * 2 - 1 
            val y = random * 2 – 1 
            if (x*x + y*y < 1) { 
                counter1 += 1 
                } else { 
                    counter0 += 1 
                    } 
            if (x*x + y*y < 1) 1 else 0 
        }.reduce(_ + _)
        
        
      • 广播变量

        • 广播机制 : 高效分发大对象,比如字典(map),集合(set)等,每个executor一份, 而不是每个task一份;
          引用自Spark doc里的介绍:

        Spark 的 action(动作)操作是通过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是通过分布式的 “shuffle” 操作进行拆分的。Spark 会自动广播出每个 stage(阶段)内任务所需要的公共数据。这种情况下广播的数据使用序列化的形式进行缓存,并在每个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,使用广播变量会有比较好的效果。

        • 包括HttpBroadcast和TorrentBroadcast两种

        • HttpBroadcast与TorrentBroadcast

          • 广播
            b
        • 总结:如果一个变量非常大,每一个task计算逻辑都要用到这个变量,则应该将其广播出去,更高效

    5.Cache基本概念与使用

    • 允许将RDD缓存到内存中或磁盘上,以便于重用,如果想多次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法.

      Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操作间都可以访问这些持久化的数据。当持久化一个 RDD 时,每个节点的其它分区都可以使用 RDD 在内存中进行计算,在该数据上的其他 action 操作将直接使用内存中的数据。这样会让以后的 action 操作计算速度加快(通常运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。

    • Spark提供了多种缓存级别,以便于用户根据实际需求进行调整

      • 存储
    • 如何选择存储级别?

      • trade-off: Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。

    6.基于Docker的Spark开发测试环境搭建:

    原项目可以直接在本机跑,默认是Spark的单机模式;

    • (1)首先下载镜像并启动:
    > docker pull sequenceiq/spark:1.5.1
    > sudo docker run -it sequenceiq/spark:1.5.1 bash
    

    遇到问题:在Docker中启动master的时候,ip是Docker 的地址,我的宿主机访问不到,原因是启动Docker 的时候没有端口映射,所以重新run一遍镜像:

    docker run -p 127.0.0.1:8081:8080  -it sequenceiq/spark:1.5.1 bash
    

    将宿主机的8081端口映射到Docker的8080端口

    • (2)
    cd /usr/local/spark 
    cp conf/spark-env.sh.template conf/spark-env.sh 
    vi conf/spark-env.sh
    

    加入两行代码:

    
    
    • (3) 启动Master和Slave
    ./sbin/start-master.sh 
    ./sbin/start-slave.sh 172.17.0.109:7077
    

    然后宿主机浏览器访问http://localhost:8081就可以访问到Spark UI界面惹!

    7.几个实际例子:

      1. 分布式估算Pi
    /**
      * 并行估算pi
      * Area1 = x * x , Area2 = Pi * (x / 2) * (x / 2)
      * Area1 / Area2 = 4 / pi
      * 4 / pi = x / y  => pi = 4 * y / x
      */
    
    object SparkPi {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[1]");
        val sc = new SparkContext(conf);
        val slices = if (args.length > 0) args(0).toInt else 2;
        val areaSqure = 100000 * slices;
    
        //并行估算areaCircle的值:也就是撒areaSqure这么多个点,求落在圆内的多少个点,就近似等于圆的面积
        val areaCircle = sc.parallelize(1 to areaSqure, slices).map{i =>
          val x = new Random().nextInt() * 2 - 1
          val y = new Random().nextInt() * 2 - 1
          if (x * x + y * y < 1) 1 else 0
        }.reduce(_ + _)
    
        println("Pi is roughly " + 4.0 * areaCircle / areaSqure)
      }
    }
    
    
      1. log query

        任务:如何统计每个用户在每台机器(ip)上查询(query)的次数和返回结果累积大小(byte)?

    /**
      * 日志查询任务:统计每个用户在每台机器(ip)上查询(query)的次数和返回结果累积大小(byte)
      * 分析:key: 每个用户在每台机器上的query ,value:次数和结果累积大小(byte)
      */
    
    object LogQuery {
    
      val apacheLogRegex =
        """^([d.]+) (S+) (S+) [([wd:/]+s[+-­‐]d{4})] "(.+?)" (d{3}) ([d-­‐]+) "([^"]+)" "([^"]+)".*""".r
    
      def extractKey(line : String): (String, String, String) = {
        apacheLogRegex.findFirstIn(line) match {
          case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
            if (user != ""-­‐"") (ip, user, query)
            else (null, null, null)
          case _ => (null, null, null)
        }
      }
    
      def extractStats(line: String): Stats = {
        apacheLogRegex.findFirstIn(line) match {
          case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) =>
            new Stats(1, bytes.toInt)
          case _ => new Stats(1, 0)
        }
      }
    
      class Stats(val count: Int, val numBytes: Int) extends Serializable {
        def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes)
        override def toString = "bytes=%s	n=%s".format(numBytes, count)
      }
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("LogQuery").setMaster("local[1]")
        val sc = new SparkContext(conf)
        val dataset = sc.textFile(args(0))
    
        dataset.map(line => (extractKey(line), extractStats(line)))
          .reduceByKey((a, b) => a.merge(b))
            .collect().foreach {
          case (user, query) => println("%s	%s".format(user, query))
        }
      }
    }
    
    
      1. 逻辑回归
        找出一条最优的线,将所有点分成两部分
    
    
  • 相关阅读:
    linux weblogic的sh文件
    linux 安装weblogic(转载)
    linux 安装jdk
    linux 用户和用户组
    测试开发工程师必备软硬能力&高级测试开发工程师需要具备什么能力?
    postman强大的团队协作功能
    requests(一): 发送一个json格式的post请求
    python操作Excel模块openpyxl
    appium环境安装app自动化
    夜神模拟器怎么连接adb
  • 原文地址:https://www.cnblogs.com/shawshawwan/p/10279673.html
Copyright © 2011-2022 走看看