zoukankan      html  css  js  c++  java
  • spark rdd--分区理解

    1.分区的作用

    RDD 使用分区来分布式并行处理数据, ,在分布式集群里,网络通信的代价很大,数据分区,减少网络传输可以极大提升性能。

    2.分区和 Shuffle 的关系

    分区的主要作用是用来实现并行计算, 本质上和 Shuffle 没什么关系, 但是往往在进行数据处理的时候, 例如 reduceByKey, groupByKey 等聚合操作, 需要把 Key 相同的 Value 拉取到一起进行计算, 这个时候因为这些 Key 相同的 Value 可能会坐落于不同的分区, 于是理解分区才能理解 Shuffle 的根本原理。

    Spark 中的 Shuffle 操作的特点:
    只有 Key-Value 型的 RDD 才会有 Shuffle 操作, 例如 RDD[(K, V)], 但是有一个特例, 就是 repartition 算子可以对任何数据类型 Shuffle。
    早期版本 Spark 的 Shuffle 算法是 Hash base shuffle, 后来改为 Sort base shuffle, 更适合大吞吐量的场景。

    3.spark分区操作

    3.1 创建 RDD 时指定分区数

    scala> val rdd1 = sc.parallelize(1 to 100, 6)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
     
    scala> rdd1.partitions.size
    res1: Int = 6
     
    scala> val rdd2 = sc.textFile("hdfs:///dataset/wordcount.txt", 6)
    rdd2: org.apache.spark.rdd.RDD[String] = hdfs:///dataset/wordcount.txt MapPartitionsRDD[3] at textFile at <console>:24
     
    scala> rdd2.partitions.size
    res2: Int = 7

    rdd1 是通过本地集合创建的, 创建的时候通过第二个参数指定了分区数量. rdd2 是通过读取 HDFS 中文件创建的, 同样通过第二个参数指定了分区数, 因为是从 HDFS 中读取文件, 所以最终的分区数是由 Hadoop 的 InputFormat 来指定的, 所以比指定的分区数大了一个.
    3.2 通过 coalesce 算子指定

    coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T]    #numPartitions新生成的 RDD 的分区数,shuffle是否 Shuffle

    如果 shuffle 参数指定为 false, 运行计划中确实没有 ShuffledRDD, 没有 shuffled 这个过程。
    如果 shuffle 参数指定为 true, 运行计划中有一个 ShuffledRDD, 有一个明确的显式的 shuffled 过程。
    如果 shuffle 参数指定为 false 却增加了分区数, 分区数并不会发生改变, 这是因为增加分区是一个宽依赖, 没有 shuffled 过程无法做到, 后续会详细解释宽依赖的概念。

    3.3 通过 repartition 算子指定

    repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

    repartition 算子本质上就是 coalesce(numPartitions, shuffle = true)

    scala> val source = sc.parallelize(1 to 100, 6)
    source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at parallelize at <console>:24
     
    scala> source.partitions.size
    res7: Int = 6
     
    scala> source.repartition(100).partitions.size 
    res8: Int = 100
     
    scala> source.repartition(1).partitions.size 
    res9: Int = 1

    repartition 算子无论是增加还是减少分区都是有效的, 因为本质上 repartition 会通过 shuffle 操作把数据分发给新的 RDD 的不同的分区, 只有 shuffle 操作才可能做到增大分区数, 默认情况下, 分区函数是 RoundRobin, 如果希望改变分区函数, 也就是数据分布的方式, 可以通过自定义分区函数来实现

    4.读取hdfs文件分区数

    Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。

    5.读取kafka分区数

    spark读取kafka数据模式为direct,分区数为topic分区数。

    6.如何设置分区数

    1、分区数越多越好吗?

    分区数太多意味着任务数太多,每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。

    2、分区数太少会有什么影响?

    分区数少则每个分区要处理的数据量就会增大,从而对每个结点的内存要求就会提高;还有分区数不合理,会导致数据倾斜问题。

    3、合理的分区数是多少?如何设置?

    RDD分区的一个分区原则:尽可能是得分区的个数等于集群核心数目。executor * core

    4、Spark默认的分区个数?

    无论是本地模式、Standalone模式、YARN模式或Mesos模式,我们都可以通过spark.default.parallelism来配置其默认分区个数,若没有设置该值,则根据不同的集群环境确定该值。

    本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N
    Apache Mesos:默认的分区数为8
    Standalone或YARN:默认取集群中所有executor使用core总数,和2比较取最大值。对于parallelize来说,没有在方法中的指定分区数,则默认为spark.default.parallelism,对于textFile来说,没有在方法中的指定分区数,则默认为min(defaultParallelism,2),而defaultParallelism对应的就是spark.default.parallelism。如果是从hdfs上面读取文件,其分区数为文件分片数(128MB/片)

     7.分区器

    1.如果是从HDFS里面读取出来的数据,不需要分区器。因为HDFS本来就分好区了。分区数我们是可以控制的,但是没必要有分区器。

    2.非key-value RDD分区,没必要设置分区器,但是非要设置也行。

    al testRDD = sc.textFile("C:\Users\Administrator\IdeaProjects\myspark\src\main\hello.txt")
      .flatMap(line => line.split(","))
      .map(word => (word, 1)).partitionBy(new HashPartitioner(2))

    3.Key-value形式的时候,我们就有必要了。

    HashPartitioner

    val resultRDD = testRDD.reduceByKey(new HashPartitioner(2),(x:Int,y:Int) => x+ y)
    //如果不设置默认也是HashPartitoiner,分区数跟spark.default.parallelism一样
    println(resultRDD.partitioner)
    println("resultRDD"+resultRDD.getNumPartitions)

    RangePartitioner

    val resultRDD = testRDD.reduceByKey((x:Int,y:Int) => x+ y)
    val newresultRDD=resultRDD.partitionBy(new RangePartitioner[String,Int](3,resultRDD))
    println(newresultRDD.partitioner)
    println("newresultRDD"+newresultRDD.getNumPartitions)
    按照范围进行分区的,如果是字符串,那么就按字典顺序的范围划分。如果是数字,就按数据自的范围划分自定义分区

    需要实现2个方法

    class MyPartitoiner(val numParts:Int) extends  Partitioner{
      override def numPartitions: Int = numParts
      override def getPartition(key: Any): Int = {
        val domain = new URL(key.toString).getHost
        val code = (domain.hashCode % numParts)
        if (code < 0) {
          code + numParts
        } else {
          code
        }
      }
    }
    
    object DomainNamePartitioner {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("word count").setMaster("local")
    
        val sc = new SparkContext(conf)
    
        val urlRDD = sc.makeRDD(Seq(("http://baidu.com/test", 2),
          ("http://baidu.com/index", 2), ("http://ali.com", 3), ("http://baidu.com/tmmmm", 4),
          ("http://baidu.com/test", 4)))
        //Array[Array[(String, Int)]]
        // = Array(Array(),
        // Array((http://baidu.com/index,2), (http://baidu.com/tmmmm,4),
        // (http://baidu.com/test,4), (http://baidu.com/test,2), (http://ali.com,3)))
        val hashPartitionedRDD = urlRDD.partitionBy(new HashPartitioner(2))
        hashPartitionedRDD.glom().collect()
    
        //使用spark-shell --jar的方式将这个partitioner所在的jar包引进去,然后测试下面的代码
        // spark-shell --master spark://master:7077 --jars spark-rdd-1.0-SNAPSHOT.jar
        val partitionedRDD = urlRDD.partitionBy(new MyPartitoiner(2))
        val array = partitionedRDD.glom().collect()
    
      }
    }
  • 相关阅读:
    字体
    当前li的同级且不包含当前li
    溢出用省略号显示
    .NET Core中使用Cookie步骤
    .NET Core中使用Session步骤
    asp.net core 读取配置
    Asp.Net Core run on Ubuntu
    .net core中使用GB2312编码
    ubuntu mysql 安装
    samba的安装
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/13260106.html
Copyright © 2011-2022 走看看