zoukankan      html  css  js  c++  java
  • 大三寒假学习进度(十三)

    今天主要学习了Spark环境的搭建以及一些RDD算子的学习

    Spark环境搭建

    比起hadoop的环境搭建,要搭建起一个Spark的学习环境不要太简单。我们使用Idea创建一个Maven项目,导入scala的支持,然后导入如下依赖:

     
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.12</artifactId>
                <version>3.0.1</version>
            </dependency>
        </dependencies>
    

    编写一

    /**
     * @Description:
     * @author: LiuGe
     * @date: 2021/1/25
     */
    object Spark03_WordCount {
    
      def main(args: Array[String]): Unit = {
    
        val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
        val sc = new SparkContext(sparkConf)
    
        val lines: RDD[String] = sc.textFile("datas")
        val words: RDD[String] = lines.flatMap(_.split(" "))
        val wordToOne = words.map {
          word => (word, 1)
        }
        // Spark框架提供了更多的功能,可以把分组和聚合用一个方法实现
        // reduceByKey:相同的key的数据可以对value进行reduce聚合
        val wordToCount = wordToOne.reduceByKey(_ + _)
        // 5.将转换结果采集到控制台打印出来
        val array = wordToCount.collect()
        array.foreach(println)
        // 关闭连接
        sc.stop()
      }
    
    }
    

    如果控制台能打印出结果,说明我们的本地测试环境就搭建完成了。但其实真正的spark运行环境不是这样的,我这里就不记录了。

    常见RDD算子学习

    什么是RDD算子

    首先要明确的概念是RDD:

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

    简单说,就是Spark中最小的单位。而RDD算子指的其实就是RDD的方法(函数)

    map && flatmap && groupby && filter

    基本和scala中的功能一致,不再细说

    mapPartitions

    首先先来看代码:

    /**
     * @Description:
     * @author: LiuGe
     * @date: 2021/1/25
     */
    object Spark02_RDD_Operator_Transform {
    
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(sparkConf)
        // 算法 — mapPartitions
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
        // 可以以分区为单位进行数据转换操作
        // 但会将整个分区的数组加载到内存中进行引用
        // 处理完的数据是不会释放的,存在对象的引用
        // 内存较小,数据量较大的场合下容易出现内存溢出
        val mapRDD: RDD[Int] = rdd.mapPartitions(iter => {
          println(">>>>>>>>>>")
          iter.map(_ * 2)
        })
        mapRDD.collect()
        sc.stop()
      }
    
    }
    

    可以看到,mapPartitions相比较于map,其实就是把对每个元素的处理变成了对整个分区的处理。这样的效率会更高,但由于它处理完的数据不会自动释放,在内存不够,数据量大的时候会出现内存溢出,要注意map和mapPartitions的选择。

    mapPartitionsWithIndex

     
    /**
     * @Description:
     * @author: LiuGe
     * @date: 2021/1/25
     */
    object Spark03_RDD_Operator_Transform {
    
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(sparkConf)
        // 算法 — mapPartitions
        val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
        // [1,2], [3,4]
        val mpiRdd: RDD[Int] = rdd.mapPartitionsWithIndex((index, iter) => {
          if (index == 1) iter else Nil.iterator
        })
        mpiRdd.collect().foreach(println)
        sc.stop()
      }
    
    }
    
    

    类似的,mapPartitionsWithIndex就是加上了每个分区的编号,让我们可以实现一些特殊的需求。

    glom

     
    /**
     * @Description:
     * @author: LiuGe
     * @date: 2021/1/25
     */
    object Spark05_RDD_Operator_Transform {
    
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(sparkConf)
        // 算法 — glom
        val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4),2)
        // List => Int
        // Int => Array
        val glomRDD: RDD[Array[Int]] = rdd.glom()
        glomRDD.collect().foreach(data => println(data.mkString(",")))
        sc.stop()
      }
    
    }
    

    glom,指的是将同一个分组的数据再转换成Array。

    sample

     
    /**
     * @Description:
     * @author: LiuGe
     * @date: 2021/1/25
     */
    object Spark08_RDD_Operator_Transform {
    
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(sparkConf)
        // 算法 — sample
        val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))
        // sample算子需要传递三个参数
        // 1.第一个参数表示,抽取数据后是否将数据返回
        // 2.第二个参数表示 如果是抽取不放回的场合(基准值的概念),抽取放回的场合(条件概率)
        // 3.第三个参数 随机算法的种子 如果不传递时,会使用当前系统时间
    //    println(rdd.sample(withReplacement = false, 0.4).collect().mkString(","))
        println(rdd.sample(withReplacement = true, 2).collect().mkString(","))
    
        sc.stop()
      }
    
    }
    

    sample,指采样方法,可以随机抽取一些数据,主要用来应对数据倾斜的情况。

    distinct

     
    /**
     * @Description:
     * @author: LiuGe
     * @date: 2021/1/25
     */
    object Spark09_RDD_Operator_Transform {
    
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        val sc = new SparkContext(sparkConf)
        // 算法 — distinct
        val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))
    
        val rdd1: RDD[Int] = rdd.distinct()
        rdd1.collect().foreach(println)
        sc.stop()
      }
    
    }
    

    去重,非常常规的功能。

    总结

    总的来说,在花费了大量时间学习scala后,发现Spark学习起来十分简单,因为和scala的语法相似度十分高,基本就相当于在写scala的程序。也是没有白学那么久的scala

  • 相关阅读:
    .net 流读取
    c#小Tip:数字格式化显示
    VS.NET优化编译速度
    Application.Run()和Form.Show()的区别
    如何利用系统函数操作文件夹及文件
    设计优秀的用户界面
    我妈过来了
    帮你免于失业的十大软件技术(转抄)
    正试图在 OS 加载程序锁内执行托管代码
    NASA World Wind
  • 原文地址:https://www.cnblogs.com/hang-hang/p/14871726.html
Copyright © 2011-2022 走看看