zoukankan      html  css  js  c++  java
  • spark学习进度11(RDD分区和我shuffle以及缓存)

    一、RDD的分区和Shuffle

    目标
    1. RDD 的分区操作

    2. Shuffle 的原理

    分区的作用

    RDD 使用分区来分布式并行处理数据, 并且要做到尽量少的在不同的 Executor 之间使用网络交换数据, 所以当使用 RDD 读取数据的时候, 会尽量的在物理上靠近数据源, 比如说在读取 Cassandra 或者 HDFS 中数据的时候, 会尽量的保持 RDD 的分区和数据源的分区数, 分区模式等一一对应

    分区和 Shuffle 的关系

    分区的主要作用是用来实现并行计算, 本质上和 Shuffle 没什么关系, 但是往往在进行数据处理的时候, 例如`reduceByKey`, `groupByKey`等聚合操作, 需要把 Key 相同的 Value 拉取到一起进行计算, 这个时候因为这些 Key 相同的 Value 可能会坐落于不同的分区, 于是理解分区才能理解 Shuffle 的根本原理

    Spark 中的 Shuffle 操作的特点
    • 只有 Key-Value 型的 RDD 才会有 Shuffle 操作, 例如 RDD[(K, V)], 但是有一个特例, 就是 repartition 算子可以对任何数据类型 Shuffle

    • 早期版本 Spark 的 Shuffle 算法是 Hash base shuffle, 后来改为 Sort base shuffle, 更适合大吞吐量的场景

      1、查看RDD分区

     

     

      2、指定RDD分区

        -----在本地集合创建的时候指定分区数

     

        -----通过读取文件创建的时候指定分区数

            

         -----如何进行重分区coalese和repatitions

      

     

           -----通过其他算子指定分区数(很多算子都可以指定分区数例如partitioner分区函数)

    很多算子都可以指定分区数:

      1、一般情况下设计shuffle操作的算子都运行指定分区数

      2、一般这些算子,可以在最后一个参数的位置传入新的分区数

      3、如果没有重新指定分区数,默认从父RDD中继承分区数

    partitioner分区函数“:

        -----shuffle过程

    23377ac4a368fc94b6f8f3117af67154
    10b536c17409ec37fa1f1b308b2b521e

    reduceByKey 这个算子本质上就是先按照 Key 分组, 后对每一组数据进行 reduce, 所面临的挑战就是 Key 相同的所有数据可能分布在不同的 Partition 分区中, 甚至可能在不同的节点中, 但是它们必须被共同计算.

    为了让来自相同 Key 的所有数据都在 reduceByKey 的同一个 reduce 中处理, 需要执行一个 all-to-all 的操作, 需要在不同的节点(不同的分区)之间拷贝数据, 必须跨分区聚集相同 Key 的所有数据, 这个过程叫做 Shuffle.

            --------------------shuffle的原理---------

    Spark 的 Shuffle 发展大致有两个阶段: Hash base shuffle 和 Sort base shuffle

    Hash base shuffle
    2daf43cc1750fffab62ae5e16fab54c2

    大致的原理是分桶, 假设 Reducer 的个数为 R, 那么每个 Mapper 有 R 个桶, 按照 Key 的 Hash 将数据映射到不同的桶中, Reduce 找到每一个 Mapper 中对应自己的桶拉取数据.

    假设 Mapper 的个数为 M, 整个集群的文件数量是 M * R, 如果有 1,000 个 Mapper 和 Reducer, 则会生成 1,000,000 个文件, 这个量非常大了.

    过多的文件会导致文件系统打开过多的文件描述符, 占用系统资源. 所以这种方式并不适合大规模数据的处理, 只适合中等规模和小规模的数据处理, 在 Spark 1.2 版本中废弃了这种方式.

    Sort base shuffle
    94f038994f8553dd32370ae78878d038

    对于 Sort base shuffle 来说, 每个 Map 侧的分区只有一个输出文件, Reduce 侧的 Task 来拉取, 大致流程如下

    1. Map 侧将数据全部放入一个叫做 AppendOnlyMap 的组件中, 同时可以在这个特殊的数据结构中做聚合操作

    2. 然后通过一个类似于 MergeSort 的排序算法 TimSort 对 AppendOnlyMap 底层的 Array 排序

      • 先按照 Partition ID 排序, 后按照 Key 的 HashCode 排序

    3. 最终每个 Map Task 生成一个 输出文件, Reduce Task 来拉取自己对应的数据

    从上面可以得到结论, Sort base shuffle 确实可以大幅度减少所产生的中间文件, 从而能够更好的应对大吞吐量的场景, 在 Spark 1.2 以后, 已经默认采用这种方式.

    但是需要大家知道的是, Spark 的 Shuffle 算法并不只是这一种, 即使是在最新版本, 也有三种 Shuffle 算法, 这三种算法对每个 Map 都只产生一个临时文件, 但是产生文件的方式不同, 一种是类似 Hash 的方式, 一种是刚才所说的 Sort, 一种是对 Sort 的一种优化(使用 Unsafe API 直接申请堆外内存)

    二、RDD的缓存

      @Test
      def prepare(): Unit = {
        // 1. 创建 SC
        val conf = new SparkConf().setAppName("cache_prepare").setMaster("local[6]")
        val sc = new SparkContext(conf)
    
        // 2. 读取文件
        val source = sc.textFile("dataset/access_log_sample.txt")
    
        // 3. 取出IP, 赋予初始频率
        val countRDD = source.map( item => (item.split(" ")(0), 1) )
    
        // 4. 数据清洗
        val cleanRDD = countRDD.filter( item => StringUtils.isNotEmpty(item._1) )
    
        // 5. 统计IP出现的次数(聚合)
        val aggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg )
    
        // 6. 统计出现次数最少的IP(得出结论)
        val lessIp = aggRDD.sortBy(item => item._2, ascending = true).first()
    
        // 7. 统计出现次数最多的IP(得出结论)
        val moreIp = aggRDD.sortBy(item => item._2, ascending = false).first()
    
        println((lessIp, moreIp))
      }

    概要
    1. 缓存的意义

    2. 缓存相关的 API

    3. 缓存级别以及最佳实践

    4.1. 缓存的意义

    使用缓存的原因 - 多次使用 RDD

    需求: 在日志文件中找到访问次数最少的 IP 和访问次数最多的 IP

    val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
    val sc = new SparkContext(conf)
    
    val interimRDD = sc.textFile("dataset/access_log_sample.txt")
      .map(item => (item.split(" ")(0), 1))
      .filter(item => StringUtils.isNotBlank(item._1))
      .reduceByKey((curr, agg) => curr + agg) 
    
    val resultLess = interimRDD.sortBy(item => item._2, ascending = true).first()
    val resultMore = interimRDD.sortBy(item => item._2, ascending = false).first()
    
    println(s"出现次数最少的 IP : $resultLess, 出现次数最多的 IP : $resultMore")
    
    sc.stop()
      这是一个 Shuffle 操作, Shuffle 操作会在集群内进行数据拷贝

    在上述代码中, 多次使用到了 interimRDD, 导致文件读取两次, 计算两次, 有没有什么办法增进上述代码的性能?

    使用缓存的原因 - 容错
    20190511163654

    当在计算 RDD3 的时候如果出错了, 会怎么进行容错?

    会再次计算 RDD1 和 RDD2 的整个链条, 假设 RDD1 和 RDD2 是通过比较昂贵的操作得来的, 有没有什么办法减少这种开销?

      上述两个问题的解决方案其实都是 缓存, 除此之外, 使用缓存的理由还有很多, 但是总结一句, 就是缓存能够帮助开发者在进行一些昂贵操作后, 将其结果保存下来, 以便下次使用无需再次执行, 缓存能够显著的 提升性能.

      所以, 缓存适合在一个 RDD 需要重复多次利用, 并且还不是特别大的情况下使用, 例如迭代计算等场景.

    @Test
      def cache(): Unit = {
        // 1. 创建 SC
        val conf = new SparkConf().setAppName("cache_prepare").setMaster("local[6]")
        val sc = new SparkContext(conf)
    
        // 2. 读取文件
        val source = sc.textFile("dataset/access_log_sample.txt")
    
        // 3. 取出IP, 赋予初始频率
        val countRDD = source.map( item => (item.split(" ")(0), 1) )
    
        // 4. 数据清洗
        val cleanRDD = countRDD.filter( item => StringUtils.isNotEmpty(item._1) )
    
        // 5. 统计IP出现的次数(聚合)
        var aggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg )
    
        aggRDD=aggRDD.cache()
        //action之前进行缓存
    
        // 6. 统计出现次数最少的IP(得出结论)
        val lessIp = aggRDD.sortBy(item => item._2, ascending = true).first()
    
        // 7. 统计出现次数最多的IP(得出结论)
        val moreIp = aggRDD.sortBy(item => item._2, ascending = false).first()
    
        println((lessIp, moreIp))
      }
      

     

    @Test
      def persist(): Unit = {
        // 1. 创建 SC
        val conf = new SparkConf().setAppName("cache_prepare").setMaster("local[6]")
        val sc = new SparkContext(conf)
    
        // 2. 读取文件
        val source = sc.textFile("dataset/access_log_sample.txt")
    
        // 3. 取出IP, 赋予初始频率
        val countRDD = source.map( item => (item.split(" ")(0), 1) )
    
        // 4. 数据清洗
        val cleanRDD = countRDD.filter( item => StringUtils.isNotEmpty(item._1) )
    
        // 5. 统计IP出现的次数(聚合)
        var aggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg )
    
        aggRDD=aggRDD.persist()
        //action之前进行缓存
    
        // 6. 统计出现次数最少的IP(得出结论)
        val lessIp = aggRDD.sortBy(item => item._2, ascending = true).first()
    
        // 7. 统计出现次数最多的IP(得出结论)
        val moreIp = aggRDD.sortBy(item => item._2, ascending = false).first()
    
        println((lessIp, moreIp))
      }

    @Test
      def persist(): Unit = {
        // 1. 创建 SC
        val conf = new SparkConf().setAppName("cache_prepare").setMaster("local[6]")
        val sc = new SparkContext(conf)
    
        // 2. 读取文件
        val source = sc.textFile("dataset/access_log_sample.txt")
    
        // 3. 取出IP, 赋予初始频率
        val countRDD = source.map( item => (item.split(" ")(0), 1) )
    
        // 4. 数据清洗
        val cleanRDD = countRDD.filter( item => StringUtils.isNotEmpty(item._1) )
    
        // 5. 统计IP出现的次数(聚合)
        var aggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg )
    
        aggRDD=aggRDD.persist(StorageLevel.MEMORY_ONLY)
        //action之前进行缓存
    
        // 6. 统计出现次数最少的IP(得出结论)
        val lessIp = aggRDD.sortBy(item => item._2, ascending = true).first()
    
        // 7. 统计出现次数最多的IP(得出结论)
        val moreIp = aggRDD.sortBy(item => item._2, ascending = false).first()
    
        println((lessIp, moreIp))
      }

    缓存级别

    其实如何缓存是一个技术活, 有很多细节需要思考, 如下

    • 是否使用磁盘缓存?

    • 是否使用内存缓存?

    • 是否使用堆外内存?

    • 缓存前是否先序列化?

    • 是否需要有副本?

    如果要回答这些信息的话, 可以先查看一下 RDD 的缓存级别对象

    val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
    val sc = new SparkContext(conf)
    
    val interimRDD = sc.textFile("dataset/access_log_sample.txt")
      .map(item => (item.split(" ")(0), 1))
      .filter(item => StringUtils.isNotBlank(item._1))
      .reduceByKey((curr, agg) => curr + agg)
      .persist()
    
    println(interimRDD.getStorageLevel)
    
    sc.stop()

    打印出来的对象是 StorageLevel, 其中有如下几个构造参数

    20190511170124

    根据这几个参数的不同, StorageLevel 有如下几个枚举对象

    20190511170338
    缓存级别userDisk 是否使用磁盘useMemory 是否使用内存useOffHeap 是否使用堆外内存deserialized 是否以反序列化形式存储replication 副本数

    NONE

    false

    false

    false

    false

    1

    DISK_ONLY

    true

    false

    false

    false

    1

    DISK_ONLY_2

    true

    false

    false

    false

    2

    MEMORY_ONLY

    false

    true

    false

    true

    1

    MEMORY_ONLY_2

    false

    true

    false

    true

    2

    MEMORY_ONLY_SER

    false

    true

    false

    false

    1

    MEMORY_ONLY_SER_2

    false

    true

    false

    false

    2

    MEMORY_AND_DISK

    true

    true

    false

    true

    1

    MEMORY_AND_DISK

    true

    true

    false

    true

    2

    MEMORY_AND_DISK_SER

    true

    true

    false

    false

    1

    MEMORY_AND_DISK_SER_2

    true

    true

    false

    false

    2

    OFF_HEAP

    true

    true

    true

    false

    1

    如何选择分区级别

    Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。建议按下面的过程进行存储级别的选择:

    如果您的 RDD 适合于默认存储级别(MEMORY_ONLY),leave them that way。这是 CPU 效率最高的选项,允许 RDD 上的操作尽可能快地运行.

    如果不是,试着使用 MEMORY_ONLY_SER 和 selecting a fast serialization library 以使对象更加节省空间,但仍然能够快速访问。(Java和Scala)

    不要溢出到磁盘,除非计算您的数据集的函数是昂贵的,或者它们过滤大量的数据。否则,重新计算分区可能与从磁盘读取分区一样快.

    如果需要快速故障恢复,请使用复制的存储级别(例如,如果使用 Spark 来服务 来自网络应用程序的请求)。All 存储级别通过重新计算丢失的数据来提供完整的容错能力,但复制的数据可让您继续在 RDD 上运行任务,而无需等待重新计算一个丢失的分区.

  • 相关阅读:
    【python】python读写文件,都不乱码
    【python】python读取文件报错UnicodeDecodeError: 'gbk' codec can't decode byte 0xac in position 2: illegal multibyte sequence
    【python】python编码方式,chardet编码识别库
    【IntelliJ IDEA】idea设置UTF-8的位置
    【IntelliJ IDEA】idea或者JetBrains公司所有编辑器,设置其软件的字体样式
    【java】判断某段字符串的编码方式,并按照新的编码方式输出
    editplus多行合并成一行
    java解决手机上传竖拍照片旋转90180270度问题
    利用exif.js解决手机上传竖拍照片旋转90180270度问题
    win10下搭建storm环境
  • 原文地址:https://www.cnblogs.com/dazhi151/p/14258061.html
Copyright © 2011-2022 走看看