zoukankan      html  css  js  c++  java
  • Spark04-RDD入门

    RDD概念

    Spark 的核心是建立在统一的抽象弹性分布式数据集(Resiliennt Distributed Datasets,RDD)之上的,这使得 Spark 的各个组件可以无缝地进行集成,能够在同一个应用程序中完成大数据的处理。

    RDD是什么

    RDD,全称 弹性分布式数据集 (Resilient Distributed DataSet)。它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。

    通俗点来讲,可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。

    RDD特点

    RDD 具有容错机制,并且只读不能修改,可以执行确定的转换操作创建新的 RDD。具体来讲,RDD 具有以下几个属性。

    • 只读:不能修改,只能通过转换操作生成新的 RDD。
    • 分布式:可以分布在多台机器上进行并行处理。
    • 弹性:计算过程中内存不够时它会和磁盘进行数据交换。
    • 基于内存:可以全部或部分缓存在内存中,在多次计算间重用。
    • 容错的:

    RDD 实质上是一种更为通用的迭代并行计算框架,用户可以显示控制计算的中间结果,然后将其自由运用于之后的计算。

    RDD 五大属性

    Partition List:分片列表,记录RDD的分片,可以在创建 RDD的时候指定分区数目,也可以通过算子来生成新的RDD从而改变分区数目

    Compute Function:为了实现容错,需要记录 RDD之间转换所执行的计算函数

    RDD Dependencies :RDD之间的依赖关系,要在RDD中记录其上级RDD是谁,从而实现容错和计算

    Partitioner:为了执行 Shuffled操作,必须要有一个函数用来计算数据应该发往哪个分区

    Preferred Location:优先位置,为了实现数据本地性操作,从而移动计算而不是移动存储,需要记录每个 RDD分区最好应该放在什么位置

     RDD代码入门

    主要是一个RDD大入口,sparkContext,和三种创建RDD的方法 以及 三个RDD算子初步使用,map , flatMap , reduceByKey。

    package rdd
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.junit.Test
    
    class Rdd {
      // SparkContext 创建 和 RDD 创建
      @Test
      def sparkContext(): Unit ={
        /*
        * 步骤
        * 1、创建 SparkConf,配置基本信息
        * 2、创建 SparkContext,利用上面的conf配置作为参数创建。
        * */
        val conf = new SparkConf().setMaster("local[6]").setAppName("spark_context")
        val sc = new SparkContext(conf)
    
        // sparkContext 作为 RDD 的大入口,具备许多功能,创建RDD,配置参数等。
    
    
        // 使用完后,需要对SparkContext进行关闭
      }
      // 可以将通用部分拿到方法外,类一加载就会执行,Scala的特点。
      val conf = new SparkConf().setMaster("local[6]").setAppName("spark_context")
      val sc = new SparkContext(conf)
    
      /*
      * 三种创建RDD的方式
      *   - 从本地集合创建
      *   - 从文件创建
      *   - 从 一个 RDD 衍生一个新的 RDD
      * */
      def rddCreate(): Unit ={
        /*
         * 从本地集合创建
         */
        val seq = Seq(1, 2, 3)
        sc.parallelize(seq,2) // 第二个参数可选,分区数
        sc.makeRDD(seq, 2)
    
        /*
         * 从文件创建
         */
        sc.textFile("")  // 路径传入由两种方式 hdfs://bigdata1//...   或   file://...
        // 支持aws(亚马逊) 和 阿里云 :需要指定的 API
        }
    
      /*
       * 从 一个 RDD 衍生一个新的 RDD
       * 通过rdd的三个算子:map,flatMap,reduceByKey
       */
      @Test
      def rddSuanzi(): Unit ={
        /*
        * map:将集合里的每个元素进行操作,得到一个新集合
        * */
        // 1、创建RDD
        val map_rdd1 = sc.parallelize(Seq(1, 2, 3))
        // 2、执行 map 操作
        val map_rdd2 = map_rdd1.map(item => item * 10) // 接收到的参数,方法的返回值
        // 3、获取结果 collect
        val result_map = map_rdd2.collect()
        // 打印得到结果 10 20 30
        result_map.foreach(item => print(item + "	"))
        println()
    
        /*
        * flatMap:将集合每个数据进行操作,返回的是一个个数组,falt将其展开,得到一个新的大集合
        * */
        val flat_rdd1 = sc.parallelize(Seq("hello dongao", "hello zhangsan", "hello wangwu"))
        // split() 是得到一个数组,但flat自动将其展开
        val flat_rdd2 = flat_rdd1.flatMap(item => item.split(" "))
        val result_flat = flat_rdd2.collect()
        result_flat.foreach(item => print(item + "	"))
        println()
    
        /*
        * reduceByKey:先按照key分组,然后将每个组的value聚合起来
        * */
        val reduce_rdd1 = sc.parallelize(Seq("hello dongao", "hello zhangsan", "hello wangwu"))
        val reduce_rdd2 = reduce_rdd1.flatMap(item => item.split(" ")) // 将每个单词展开
          .map(item => (item, 1)) // 将展开的每个单词,作为一个词频为 1的 元组。
          .reduceByKey( (curr, agg) => curr + agg )
        val result_reduce = reduce_rdd2.collect()
        result_reduce.foreach(item => print(item + "	"))
    
        // 停止sparkContext
        sc.stop()
      }
    }

    其中,reduceByKey 的示例如下图:

    reduceByKey 算子作用:

    首先按照 Key 分组, 接下来把整组的 Value 计算出一个聚合值, 这个操作非常类似于 MapReduce 中的 Reduce

    参数

    func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果,并进行更新

    注意点

    • ReduceByKey 只能作用于 Key-Value 型数据, Key-Value 型数据在当前语境中特指 Tuple2

    • ReduceByKey 是一个需要 Shuffled 的操作

    • 和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少

    map, flatMap 和 reduceByKey 算子总结

    1. map 和 flatMap 算子都是转换, 只是 flatMap 在转换过后会再执行展开, 所以 map 是一对一, flatMap 是一对多

    2. reduceByKey 类似 MapReduce 中的 Reduce

  • 相关阅读:
    u-boot编译
    本地套接字
    内核线程
    长度为0数组
    Ubuntu安装KScope
    Python基础-运算符
    如何有效地记录 Java SQL 日志?
    解谜谷歌 DevOps:什么特质可以打造世界级可靠系统?
    如何打造前所未有的服务器端监控体验?
    趣味Python入门(一):初识Python
  • 原文地址:https://www.cnblogs.com/dongao/p/14281446.html
Copyright © 2011-2022 走看看