zoukankan      html  css  js  c++  java
  • Spark 中 RDD 的详细介绍

    RDD ---弹性分布式数据集


    RDD概述

    RDD论文

    中文版 : http://spark.apachecn.org/paper/zh/spark-rdd.html

    RDD产生背景

    为了解决开发人员能在大规模的集群中以一种容错的方式进行内存计算,提出了 RDD 的概念,而当前的很多框架对迭代式算法场景与交互性数据挖掘场景的处理性能非常差, 这个是RDDs 的提出的动机。

    什么是 RDD

    RDD 是 Spark 的计算模型。RDD(Resilient Distributed Dataset)叫做弹性的分布式数据集合,是 Spark 中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。操作 RDD 就像操作本地集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。


    创建RDD

    1 . 集合并行化创建 (通过scala集合创建) scala中的本地集合 -->Spark RDD

    spark-shell --master spark://hadoop01:7077

    scala> val arr = Array(1,2,3,4,5)
    scala> val rdd = sc.parallelize(arr)
    scala> val rdd = sc.makeRDD(arr)
    scala> rdd.collect
    res0: Array[Int] = Array(1, 2, 3, 4, 5)

    通过集合并行化方式创建RDD,适用于本地测试,做实验

    2 外部文件系统 , 比如 HDFS

    读取HDFS文件系统

    val rdd2 = sc.textFile("hdfs://hadoop01:9000/words.txt")

    读取本地文件

    val rdd2 = sc.textFile(“file:///root/words.txt”)

    scala> val rdd2 = sc.textFile("file:root/word.txt")
    scala> rdd2.collect
    res2: Array[String] = Array(hadoop hbase java, hbase java spark, java, hadoop hive hive, hive hbase)

    3 从父RDD转换成新的子RDD

    调用 Transformation 类的方法,生成新的 RDD

    只要调用transformation类的算子,都会生成一个新的RDD。RDD中的数据类型,由传入给算子的函数的返回值类型决定

    注意:action类的算子,不会生成新的 RDD

    scala> rdd.collect
    res3: Array[Int] = Array(1, 2, 3, 4, 5)

    scala> val rdd = sc.parallelize(arr)
    scala> val rdd2 = rdd.map(_*100)
    scala> rdd2.collect
    res4: Array[Int] = Array(100, 200, 300, 400, 500)

    Spark上的所有的方法 , 有一个专有的名词 , 叫做算子


    RDD分区

    说对RDD进行操作 , 实际上是操作的RDD上的每一个分区 , 分区的数量决定了并行的数量 .

    使用 rdd.partitions.size 查看分区数量

    scala> rdd.partitions.size
    res7: Int = 4

    scala> rdd2.partitions.size
    res8: Int = 4

    如果从外部创建RDD,比如从hdfs中读取数据,正常情况下,分区的数量和我们读取的文件的block块数是一致的,但是如果只有一个block块,那么分区数量是2.也就是说最低的分区数量是2

    如果是集合并行化创建得到的RDD,分区的数量,默认的和最大可用的cores数量相等。

    (--total-executor-cores > 可用的 cores? 可用的 cores:--total-executor-cores)

    集合并行化得到的RDD的分区 :

    默认情况下,一个application使用多少个cores,就有多少个分区

    分区的数量 = 运行任务的可用的cores(默认一个cores,能处理一个任务)

    可以指定分区的数量:

    通过集合并行化创建的RDD是可以任意修改分区的数量的

    val rdd = sc.makeRDD(arr,分区的数值)

    scala> val arr = Array(List(1,3),List(4,6))
    scala> val rdd3 = sc.parallelize(arr,3)
    scala> rdd3.partitions.size
    res1: Int = 3

    这种方式,多用于测试

    读取外部文件RDD的分区

    正常情况下,读取HDFS中的文件,默认情况下,读到的文件有几个block块,得到的RDD就有几个分区。

    当读取一个文件,不足一个block块的时候,会是2个分区

    默认情况下,分区的数量  = 读取的文件的block块的数量,但是至少是2个

    scala> val rdd1 = sc.textFile("hdfs://hadoop01:9000/hbase-1.2.6-bin.tar.gz")
    scala> rdd1.partitions.size
    res2: Int = 1

    scala> val rdd2 = sc.textFile("hdfs://hadoop01:9000/hadoop-2.8.3.tar.gz")
    scala> rdd2.partitions.size
    res3: Int = 1

    scala> val rdd3 = sc.textFile("hdfs://hadoop01:9000/ideaIU-2017.2.2.exe")
    scala> rdd3.partitions.size
    res4: Int = 4

    hadoop-2.8.3文件200多M,有俩个块,按说有俩个分区,hbase不到100M,有一个块,按说应该有2个分区,结果这俩个都是一个分区,是不正常的,不知道问题在哪里,希望知道的大佬指点一下

    idea文件500多M有4个块,有四个分区,是正常的

    textFile自身提供了修改分区的API

    sc.textFile(path,分区数量)

    1 这里的分区数量,不能少于读取的数据的block块的数量

    2 当设置的分区的数量大于block的数量的时候,读取数据的API会根据我们的数据进行优化

    scala> val rdd3 = sc.textFile("hdfs://hadoop01:9000/ideaIU-2017.2.2.exe",5)
    scala> rdd3.partitions.size
    res7: Int = 5

    真正的想要改变分区的数量:用算子

    repartition,coalesce,专用于修改分区数量

    读取HDFS上的数据,写入到HDFS中的数据,使用的API都是hadoop的API

    总结:

    默认情况下,分区的数量 = 读取文件的block块的数量

    分区的数量至少是2个

    通过转换类的算子

    默认情况下,分区的数量是不变的。map  flatMap  filter

    groupByKey,reduceByKey 默认是不变的,但是可以通过参数来改变

    repartition(分区数量),coalesce(分区数量),根据指定的分区数量重新分区

    union:分区数量会增加

     scala> val rdd2 = rdd1.flatMap(_.split(" ")).map((_,1))
     rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:26
      
    scala> val rdd3 = rdd2.reduceByKey(_+_)
     rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:28
      
     scala> rdd2.partitions.size
     res7: Int = 3
      
     scala> rdd3.partitions.size
     res8: Int = 3
      
     scala> val rdd3 = rdd2.reduceByKey(_+_,6)
     rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:28
      
     scala> rdd3.partitions.size
    res9: Int = 6

    总结:

    集合并行化:

    val arr = Array[Int](1,4,5,6) –》  sc.makeRDD(arr)     RDD[Int]

    默认情况下, 分区数量 =  application使用的 cores

    sc.makeRDD(data,分区数量)

    读取HDFS数据:

    默认情况下, 分区数量 =  读取的数据的block块的数量

    至少是2个

    通过转换类的算子获取的RDD :

    默认情况下,分区的数量是不变的。

    简单来说,rdd分区数量就决定了任务的并行的数量。

  • 相关阅读:
    C#double类型转换string类型
    数据分析测试
    第三周进度
    质量属性战术——可用性战术
    开学第二周进度报告
    开学第一周进度报告
    质量属性的六个常见属性场景
    架构漫谈有感03
    架构漫谈有感02
    读架构漫谈有感--软件架构师如何工作
  • 原文地址:https://www.cnblogs.com/cnndevelop/p/14252704.html
Copyright © 2011-2022 走看看