zoukankan      html  css  js  c++  java
  • 理解Spark的RDD

    RDD是个抽象类,定义了诸如map()、reduce()等方法,但实际上继承RDD的派生类一般只要实现两个方法:

    • def getPartitions: Array[Partition]
    • def compute(thePart: Partition, context: TaskContext): NextIterator[T]

    getPartitions()用来告知怎么将input分片;

    compute()用来输出每个Partition的所有行(行是我给出的一种不准确的说法,应该是被函数处理的一个单元);

    以一个hdfs文件HadoopRDD为例:

      override def getPartitions: Array[Partition] = {
        val jobConf = getJobConf()
        // add the credentials here as this can be called before SparkContext initialized
        SparkHadoopUtil.get.addCredentials(jobConf)
        val inputFormat = getInputFormat(jobConf)
        if (inputFormat.isInstanceOf[Configurable]) {
          inputFormat.asInstanceOf[Configurable].setConf(jobConf)
        }
        val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
        val array = new Array[Partition](inputSplits.size)
        for (i <- 0 until inputSplits.size) {
          array(i) = new HadoopPartition(id, i, inputSplits(i))
        }
        array
      }

    它直接将各个split包装成RDD了,再看compute():

      override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
        val iter = new NextIterator[(K, V)] {
    
          val split = theSplit.asInstanceOf[HadoopPartition]
          logInfo("Input split: " + split.inputSplit)
          var reader: RecordReader[K, V] = null
          val jobConf = getJobConf()
          val inputFormat = getInputFormat(jobConf)
          HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
            context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
          reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
    
          // Register an on-task-completion callback to close the input stream.
          context.addTaskCompletionListener{ context => closeIfNeeded() }
          val key: K = reader.createKey()
          val value: V = reader.createValue()
    
          // Set the task input metrics.
          val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
          try {
            /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
             * always at record boundaries, so tasks may need to read into other splits to complete
             * a record. */
            inputMetrics.bytesRead = split.inputSplit.value.getLength()
          } catch {
            case e: java.io.IOException =>
              logWarning("Unable to get input size to set InputMetrics for task", e)
          }
          context.taskMetrics.inputMetrics = Some(inputMetrics)
    
          override def getNext() = {
            try {
              finished = !reader.next(key, value)
            } catch {
              case eof: EOFException =>
                finished = true
            }
            (key, value)
          }
    
          override def close() {
            try {
              reader.close()
            } catch {
              case e: Exception => logWarning("Exception in RecordReader.close()", e)
            }
          }
        }
        new InterruptibleIterator[(K, V)](context, iter)
      }

    它调用reader返回一系列的K,V键值对。

    再来看看数据库的JdbcRDD:

      override def getPartitions: Array[Partition] = {
        // bounds are inclusive, hence the + 1 here and - 1 on end
        val length = 1 + upperBound - lowerBound
        (0 until numPartitions).map(i => {
          val start = lowerBound + ((i * length) / numPartitions).toLong
          val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
          new JdbcPartition(i, start, end)
        }).toArray
      }
    它直接将结果集分成numPartitions份。其中很多参数都来自于构造函数:

    class JdbcRDD[T: ClassTag](
        sc: SparkContext,
        getConnection: () => Connection,
        sql: String,
        lowerBound: Long,
        upperBound: Long,
        numPartitions: Int,
        mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)

    再看看compute()函数:

      override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
        context.addTaskCompletionListener{ context => closeIfNeeded() }
        val part = thePart.asInstanceOf[JdbcPartition]
        val conn = getConnection()
        val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    
        // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
        // rather than pulling entire resultset into memory.
        // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
        if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
          stmt.setFetchSize(Integer.MIN_VALUE)
          logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
        }
    
        stmt.setLong(1, part.lower)
        stmt.setLong(2, part.upper)
        val rs = stmt.executeQuery()
    
        override def getNext: T = {
          if (rs.next()) {
            mapRow(rs)
          } else {
            finished = true
            null.asInstanceOf[T]
          }
        }
    
        override def close() {
          try {
            if (null != rs && ! rs.isClosed()) {
              rs.close()
            }
          } catch {
            case e: Exception => logWarning("Exception closing resultset", e)
          }
          try {
            if (null != stmt && ! stmt.isClosed()) {
              stmt.close()
            }
          } catch {
            case e: Exception => logWarning("Exception closing statement", e)
          }
          try {
            if (null != conn && ! conn.isClosed()) {
              conn.close()
            }
            logInfo("closed connection")
          } catch {
            case e: Exception => logWarning("Exception closing connection", e)
          }
        }
      }

    这段代码就是一段sql分页查询执行情况(顺便吐槽一下,这段代码写得确实比较渣。。。确定sql里面不会在limit前面出现整形变量?有兴趣的同仁们,赶紧操起MyBatis或者Hibernate去投稿吧!)

    以上内容为本人原创,转载请注明博客地址:http://blog.csdn.net/bluejoe2000/article/details/41415087

    以下内容为转载,来自:http://developer.51cto.com/art/201309/410276_1.htm

    ◆ RDD的特点:

    1. 它是在集群节点上的不可变的、已分区的集合对象。
    2. 通过并行转换的方式来创建如(map, filter, join, etc)。
    3. 失败自动重建。
    4. 可以控制存储级别(内存、磁盘等)来进行重用。
    5. 必须是可序列化的。
    6. 是静态类型的。

    ◆ RDD的好处

    1. RDD只能从持久存储或通过Transformations操作产生,相比于分布式共享内存(DSM)可以更高效实现容错,对于丢失部分数据分区只需根据它的lineage就可重新计算出来,而不需要做特定的Checkpoint。
    2. RDD的不变性,可以实现类Hadoop MapReduce的推测式执行。
    3. RDD的数据分区特性,可以通过数据的本地性来提高性能,这与Hadoop MapReduce是一样的。
    4. RDD都是可序列化的,在内存不足时可自动降级为磁盘存储,把RDD存储于磁盘上,这时性能会有大的下降但不会差于现在的MapReduce。

    ◆ RDD的存储与分区

    1. 用户可以选择不同的存储级别存储RDD以便重用。
    2. 当前RDD默认是存储于内存,但当内存不足时,RDD会spill到disk。
    3. RDD在需要进行分区把数据分布于集群中时会根据每条记录Key进行分区(如Hash 分区),以此保证两个数据集在Join时能高效。

    ◆ RDD的内部表示

    在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:

    1. 分区列表(数据块列表)
    2. 计算每个分片的函数(根据父RDD计算出此RDD)
    3. 对父RDD的依赖列表
    4. 对key-value RDD的Partitioner【可选】
    5. 每个数据分片的预定义地址列表(如HDFS上的数据块的地址)【可选】

    ◆ RDD的存储级别

    RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:

    1. val NONE = new StorageLevel(falsefalsefalse)   
    2.     val DISK_ONLY = new StorageLevel(truefalsefalse)   
    3.     val DISK_ONLY_2 = new StorageLevel(truefalsefalse, 2)   
    4.     val MEMORY_ONLY = new StorageLevel(falsetruetrue)   
    5.     val MEMORY_ONLY_2 = new StorageLevel(falsetruetrue, 2)   
    6.     val MEMORY_ONLY_SER = new StorageLevel(falsetruefalse)   
    7.     val MEMORY_ONLY_SER_2 = new StorageLevel(falsetruefalse, 2)   
    8.     val MEMORY_AND_DISK = new StorageLevel(truetruetrue)   
    9.     val MEMORY_AND_DISK_2 = new StorageLevel(truetruetrue, 2)   
    10.     val MEMORY_AND_DISK_SER = new StorageLevel(truetruefalse)   
    11.     val MEMORY_AND_DISK_SER_2 = new StorageLevel(truetruefalse, 2)  

    ◆ RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。

    RDD的生成

    ◆ RDD有两种创建方式:

    1、从Hadoop文件系统(或与Hadoop兼容的其它存储系统)输入(例如HDFS)创建。

    2、从父RDD转换得到新RDD。

    ◆ 下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:

    1. // SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像   
    2.    // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。   
    3.    def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {   
    4.        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],   
    5.        classOf[Text], minSplits) .map(pair => pair._2.toString) }  
    6.    
    7.    // 根据Hadoop配置,及InputFormat等创建HadoopRDD    
    8.    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) 

    ◆ 对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:

    RDD的转换与操作

    ◆ 对于RDD可以有两种计算方式:转换(返回值还是一个RDD)与操作(返回值不是一个RDD)。

    ◆ 转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

    ◆ 操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。


  • 相关阅读:
    【深度学习系列1】 深度学习在腾讯的平台化和应用实践
    js复制button在ie下的解决方式
    兔子--Calling startActivity() from outside of an Activity context requires the FLAG_ACTIVITY_NEW_TASK
    UART串口协议基础1
    高校站点群建设方案简单介绍
    oracle10G之前介质下载地址【珍藏版】
    程序猿打新总结 6月份 新股申购秘籍
    斯坦福IOS开发第五课(第一部分)
    O2O领域添新军,正品网加快布局的战略考量
    如风一样,飞翔------Day37
  • 原文地址:https://www.cnblogs.com/bluejoe/p/5115912.html
Copyright © 2011-2022 走看看