zoukankan      html  css  js  c++  java
  • Spark学习之再谈RDD

    走进RDD

    案例

    • 给定一个网站的访问记录, 俗称 Access log 计算其中出现的独立 IP, 以及其访问的次数
    /**
     * @author noor9
     * @date 2021-01-14-16:09
     */
    class AccessLogAgg {
      @Test
      def ipAgg():Unit = {
        //1. 创建sparkContext
        val conf = new SparkConf().setMaster("local[2]").setAppName("ip_agg")
        val sc = new SparkContext(conf)
        //2. 读取文件
        val sourceRDD = sc.textFile("dataset/access_log_sample.txt")
        //3. 取出ip
        val ipRDD = sourceRDD.map(item => (item.split(" ")(0),1))
        //4. 简单清洗
        //  4.1. 去掉空数据
        //  4.2. 去掉非法数据
        //  4.3. 根据业务再调整
        val cleanRDD = ipRDD.filter(item => StringUtils.isNotEmpty(item._1))
        //5. 聚合
        val ipAggRDD = cleanRDD.reduceByKey( (curr,agg) => curr+agg)
        //6. 排序
        val sortedRDD = ipAggRDD.sortBy(item => item._2,ascending = false)//默认升序
        //7. 取出结果
        sortedRDD.take(10).foreach(item => println(item))
      }
    }
    

    为什么会出现RDD

    ​ 在 RDD 出现之前, MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?

    MapReduce执行任务

    ​ 可以看出多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享

    ​ 然而这种方式明显比较低效

    ​ 所以便引入了RDD,那RDD 如何解决迭代计算非常低效的问题呢?

    RDD执行任务

    ​ 在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是: Job3 = (Job1.map).filter, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中

    ​ 这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现

    // 线性回归
    val points = sc.textFile(...)
    	.map(...)
    	.persist(...)
    val w = randomValue
    for (i <- 1 to 10000) {
        val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
        	.reduce(_ + _)
        w -= gradient
    }
    

    ​ 在这个例子中, 进行了大致 10000 次迭代, 如果在 MapReduce 中实现, 可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果, 熟快熟慢一窥便知!

    RDD特点

    RDD 不仅是数据集, 也是编程模型

    ​ RDD 即是一种数据结构, 同时也提供了上层 API, 同时 RDD 的 API 和 Scala 中对集合运算的 API 非常类似, 同样也都是各种算子02adfc1bcd91e70c1619fc6a67b13f92 RDD 的算子大致分为两类:

    • Transformation 转换操作, 例如 map flatMap filter
    • Action 动作操作, 例如 reduce collect show

    ​ 执行 RDD 的时候, 在执行到转换操作的时候, 并不会立刻执行, 直到遇见了 Action 操作, 才会触发真正的执行, 这个特点叫做 惰性求值

    RDD 可以分区

    分区示意图 RDD 是一个分布式计算框架, 所以, 一定是要能够进行分区计算的, 只有分区了, 才能利用集群的并行计算能力同时, RDD 不需要始终被具体化, 也就是说: RDD 中可以没有数据, 只要有足够的信息知道自己是从谁计算得来的就可以, 这是一种非常高效的容错方式。

    RDD 是只读

    RDD 是只读的, 不允许任何形式的修改. 虽说不能因为 RDD 和 HDFS 是只读的, 就认为分布式存储系统必须设计为只读的. 但是设计为只读的, 会显著降低问题的复杂度, 因为 RDD 需要可以容错, 可以惰性求值, 可以移动计算, 所以很难支持修改.

    • RDD2 中可能没有数据, 只是保留了依赖关系和计算函数, 那修改啥?
    • 如果因为支持修改, 而必须保存数据的话, 怎么容错?如果允许修改, 如何定位要修改的那一行?
    • RDD 的转换是粗粒度的, 也就是说, RDD 并不感知具体每一行在哪.

    RDD 是可以容错的

    RDD 的容错有两种方式

    • 保存 RDD 之间的依赖关系, 以及计算函数, 出现错误重新计算
    • 直接将 RDD 的数据存放在外部存储系统, 出现错误直接读取, Checkpoint

    什么叫做弹性分布式数据集

    • 分布式

      ​ RDD 支持分区, 可以运行在集群中

    • 弹性

      • RDD 支持高效的容错
      • RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中
    • 数据集

      • RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数
      • RDD 也可以缓存起来, 相当于存储具体数据

    RDD五大属性

    • Partition List 分片列表, 记录 RDD 的分片, 可以在创建 RDD 的时候指定分区数目, 也可以通过算子来生成新的 RDD 从而改变分区数目
    • Compute Function 为了实现容错, 需要记录 RDD 之间转换所执行的计算函数
    • RDD Dependencies RDD 之间的依赖关系, 要在 RDD 中记录其上级 RDD 是谁, 从而实现容错和计算
    • Partitioner 为了执行 Shuffled 操作, 必须要有一个函数用来计算数据应该发往哪个分区
    • Preferred Location 优先位置, 为了实现数据本地性操作, 从而移动计算而不是移动存储, 需要记录每个 RDD 分区最好应该放置在什么位置
  • 相关阅读:
    poj 3304 直线与线段相交
    poj 2318 叉积+二分
    AC自动机
    MySQL报错:Packets larger than max_allowed_packet are not allowed 的解决方案
    SCOPE_IDENTITY的作用
    Truncate table、Delete与Drop table的区别
    .Net Attribute特性
    vs2010 调试快捷键
    TFS和VSS的简单对比
    做网站用UTF-8还是GB2312 & 各国语言对应字符集
  • 原文地址:https://www.cnblogs.com/xp-thebest/p/14279860.html
Copyright © 2011-2022 走看看