  • RDD Learning

    Background:

     An RDD (Resilient Distributed Dataset) is the most fundamental data abstraction in Spark. It represents a distributed collection and supports distributed operations.

    The Five Properties (see the code sketch after this list)

    • Partition list: A list of partitions

    • Compute function: A function for computing each split

    • Dependencies: A list of dependencies on other RDDs

    • Partitioner: Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

    • Preferred locations: Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
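
    These five properties correspond to members of the abstract RDD class in the Spark source. The sketch below is a simplified stand-in (not the real org.apache.spark.rdd.RDD); the member names follow the Spark 2.x/3.x source:

    import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}
    
    //Simplified sketch of the five properties as they appear on org.apache.spark.rdd.RDD
    abstract class FivePropertiesSketch[T] {
      protected def getPartitions: Array[Partition]                             //1. a list of partitions
      def compute(split: Partition, context: TaskContext): Iterator[T]          //2. a function for computing each split
      protected def getDependencies: Seq[Dependency[_]]                         //3. dependencies on other RDDs
      val partitioner: Option[Partitioner] = None                               //4. optional Partitioner for key-value RDDs
      protected def getPreferredLocations(split: Partition): Seq[String] = Nil  //5. optional preferred locations per split
    }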

    The five properties of the RDDs in WordCount: the partitions come from the input file's blocks/splits, the compute function is the logic passed to flatMap/map/reduceByKey, each RDD depends on the RDD it was derived from, reduceByKey introduces a HashPartitioner, and the preferred locations are the hosts that hold the input blocks.

    Creating RDDs

    The data in an RDD can come from two places: a local collection or an external data source.

     Code:

    package cn.itcast.core
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates how to create RDDs
     */
    object RDDDemo01_Create {
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create RDDs
        val rdd1: RDD[Int] = sc.parallelize(1 to 10) //12
        val rdd2: RDD[Int] = sc.parallelize(1 to 10,3) //3
    
        val rdd3: RDD[Int] = sc.makeRDD(1 to 10)//delegates to parallelize //12
        val rdd4: RDD[Int] = sc.makeRDD(1 to 10,4) //4
    
        //RDD[lines of text]
        val rdd5: RDD[String] = sc.textFile("data/input/words.txt")//2
        val rdd6: RDD[String] = sc.textFile("data/input/words.txt",3)//3
        //RDD[lines of text]
        val rdd7: RDD[String] = sc.textFile("data/input/ratings10")//10
        val rdd8: RDD[String] = sc.textFile("data/input/ratings10",3)//10
        //RDD[(file name, file content), (file name, file content), ...]
        val rdd9: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10")//2
        val rdd10: RDD[(String, String)] = sc.wholeTextFiles("data/input/ratings10",3)//3
    
        println(rdd1.getNumPartitions)//12 //implemented as partitions.length; with local[*] the default equals the number of logical processors
        println(rdd2.partitions.length)//3
        println(rdd3.getNumPartitions)//12
        println(rdd4.getNumPartitions)//4
        println(rdd5.getNumPartitions)//2
        println(rdd6.getNumPartitions)//3
        println(rdd7.getNumPartitions)//10
        println(rdd8.getNumPartitions)//10
        println(rdd9.getNumPartitions)//2
        println(rdd10.getNumPartitions)//3
    
        //TODO 2.transformation
        //TODO 3. sink / output
      }
    }

     

    RDD Operations

    Categories

    Basic operators / operations / methods / APIs

    map

    flatMap

    filter

    foreach

    saveAsTextFile

    package cn.itcast.core
    
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates basic RDD operations
     */
    object RDDDemo02_Basic {
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create the RDD
        val lines: RDD[String] = sc.textFile("data/input/words.txt") //2
    
        //TODO 2.transformation
        val result: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
    
        //TODO 3. sink / output / action
        result.foreach(println)
        result.saveAsTextFile("data/output/result4")
      }
    }

    Partition Operations

     Code:

    package cn.itcast.core
    
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates RDD partition-level operations
     */
    object RDDDemo03_PartitionOperation {
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create the RDD
        val lines: RDD[String] = sc.textFile("data/input/words.txt")
    
        //TODO 2.transformation
        val result: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
          .flatMap(_.split(" "))
          //.map((_, 1)) //note: map operates on every single record in a partition
          /*.map(word=>{
            //open a connection -- executed once per record
            (word,1)
            //close the connection
          })*/
          // f: Iterator[T] => Iterator[U]
          .mapPartitions(iter=>{//note: mapPartitions operates once per partition
            //open a connection -- executed once per partition
            iter.map((_, 1))//note: this still runs on every record of the partition
            //close the connection
          })
          .reduceByKey(_ + _)
    
        //TODO 3. sink / output / action
        //Applies a function f to all elements of this RDD.
        /*result.foreach(i=>{
          //open a connection -- executed once per record
          println(i)
          //close the connection
        })*/
        //Applies a function f to each partition of this RDD.
        result.foreachPartition(iter=>{
          //open a connection -- executed once per partition
          iter.foreach(println)
          //close the connection
        })
    
    
        //result.saveAsTextFile("data/output/result4")
      }
    }

    Repartition Operations

     

     Code:

    package cn.itcast.core
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates repartitioning an RDD
     */
    object RDDDemo04_RePartition{
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create the RDD
        val rdd1: RDD[Int] = sc.parallelize(1 to 10) //8
        //repartition can increase or decrease the number of partitions; note: the original RDD is unchanged
        val rdd2: RDD[Int] = rdd1.repartition(9)//delegates to coalesce(numPartitions, shuffle=true)
        val rdd3: RDD[Int] = rdd1.repartition(7)
        println(rdd1.getNumPartitions)//8
        println(rdd2.getNumPartitions)//9
        println(rdd3.getNumPartitions)//7
    
        //coalesce can only decrease the number of partitions by default, unless shuffle is set to true; note: the original RDD is unchanged
        val rdd4: RDD[Int] = rdd1.coalesce(9)//equivalent to coalesce(numPartitions, shuffle=false)
        val rdd5: RDD[Int] = rdd1.coalesce(7)
        val rdd6: RDD[Int] = rdd1.coalesce(9,true)
        println(rdd4.getNumPartitions)//8
        println(rdd5.getNumPartitions)//7
        println(rdd6.getNumPartitions)//9
    
        //TODO 2.transformation
        //TODO 3. sink / output
      }
    }

    Aggregation Operations

     Code:

    package cn.itcast.core
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates RDD aggregation without a key
     */
    object RDDDemo05_Aggregate_NoKey{
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create the RDD
        val rdd1: RDD[Int] = sc.parallelize(1 to 10) //8
        //TODO 2.transformation
    
        //TODO 3. sink / output / action
        //Requirement: compute the sum of the elements in rdd1
        println(rdd1.sum())      //returns a Double
        println(rdd1.reduce(_ + _))
        println(rdd1.fold(0)(_ + _))
        //aggregate(initial value)(within-partition aggregation, cross-partition aggregation)
        println(rdd1.aggregate(0)(_ + _, _ + _))
        //55.0
        //55
        //55
        //55
      }
    }

     

    package cn.itcast.core
    
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates RDD aggregation with a key
     */
    object RDDDemo06_Aggregate_Key {
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create the RDD
        //RDD[lines of text]
        val lines: RDD[String] = sc.textFile("data/input/words.txt")
    
        //TODO 2.transformation
        //RDD[(word, 1)]
        val wordAndOneRDD: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
          .flatMap(_.split(" "))
          .map((_, 1))
        //group + aggregate
        //groupBy/groupByKey + sum/reduce
        //wordAndOneRDD.groupBy(_._1)
        val grouped: RDD[(String, Iterable[Int])] = wordAndOneRDD.groupByKey()
        //grouped.mapValues(_.reduce(_+_))
        val result: RDD[(String, Int)] = grouped.mapValues(_.sum)
        //reduceByKey
        val result2: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_+_)
        //foldByKey
        val result3: RDD[(String, Int)] = wordAndOneRDD.foldByKey(0)(_+_)
        //aggregateByKey(initial value)(within-partition aggregation, cross-partition aggregation)
        val result4: RDD[(String, Int)] = wordAndOneRDD.aggregateByKey(0)(_ + _, _ + _)
    
        //TODO 3. sink / output
        result.foreach(println)
        result2.foreach(println)
        result3.foreach(println)
        result4.foreach(println)
      }
    }

    Interview question: what is the difference between reduceByKey and groupByKey?
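
    The notes leave this question open, so here is a short answer: reduceByKey combines the values for each key inside every partition (a map-side combine) before the shuffle, so only one partial result per key per partition crosses the network; groupByKey shuffles every (key, value) pair and only aggregates afterwards, which means more network traffic and more memory pressure on skewed keys. A minimal sketch (the object name and sample data are assumptions for illustration):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    //Hypothetical demo object, not part of the original notes
    object ReduceByKeyVsGroupByKey {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("spark").setMaster("local[*]"))
        sc.setLogLevel("WARN")
    
        val wordAndOneRDD: RDD[(String, Int)] = sc.parallelize(Seq("hello you", "hello me", "hello her her"))
          .flatMap(_.split(" "))
          .map((_, 1))
    
        //reduceByKey: pre-aggregates inside each partition before the shuffle (less data shuffled)
        val byReduce: RDD[(String, Int)] = wordAndOneRDD.reduceByKey(_ + _)
        //groupByKey: ships every (word, 1) pair across the network, then sums on the reduce side
        val byGroup: RDD[(String, Int)] = wordAndOneRDD.groupByKey().mapValues(_.sum)
    
        byReduce.foreach(println)
        byGroup.foreach(println)
      }
    }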

     

    Join Operations

    package cn.itcast.core
    
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates RDD joins
     */
    object RDDDemo07_Join {
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create RDDs
        //Employee collection: RDD[(department id, employee name)]
        val empRDD: RDD[(Int, String)] = sc.parallelize(
          Seq((1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"))
        )
        //Department collection: RDD[(department id, department name)]
        val deptRDD: RDD[(Int, String)] = sc.parallelize(
          Seq((1001, "销售部"), (1002, "技术部"), (1004, "客服部"))
        )
    
        //TODO 2.transformation
        //Requirement: find the department name for each employee
        val result1: RDD[(Int, (String, String))] = empRDD.join(deptRDD)
        val result2: RDD[(Int, (String, Option[String]))] = empRDD.leftOuterJoin(deptRDD)
        val result3: RDD[(Int, (Option[String], String))] = empRDD.rightOuterJoin(deptRDD)
    
    
        //TODO 3. sink / output
        result1.foreach(println)
        result2.foreach(println)
        result3.foreach(println)
        //(1002,(lisi,技术部))
        //(1001,(zhangsan,销售部))
    
        //(1002,(lisi,Some(技术部)))
        //(1001,(zhangsan,Some(销售部)))
        //(1003,(wangwu,None))
    
        //(1004,(None,客服部))
        //(1001,(Some(zhangsan),销售部))
        //(1002,(Some(lisi),技术部))
    
      }
    }

    Sort Operations

    https://www.runoob.com/w3cnote/ten-sorting-algorithm.html

    sortBy

    sortByKey

    top

    package cn.itcast.core
    
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates sorting an RDD
     */
    object RDDDemo08_Sort {
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create the RDD
        //RDD[lines of text]
        val lines: RDD[String] = sc.textFile("data/input/words.txt")
    
        //TODO 2.transformation
        //RDD[(word, count)]
        val result: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
    
        //Requirement: sort the WordCount result and take the top 3
        val sortResult1: Array[(String, Int)] = result
          .sortBy(_._2, false) //sort by count in descending order
          .take(3)//take the first 3
    
        //result.map(t=>(t._2,t._1))
        val sortResult2: Array[(Int, String)] = result.map(_.swap)    //swap exchanges the two elements of the tuple
          .sortByKey(false)//sort by count in descending order
          .take(3)//take the first 3
    
        val sortResult3: Array[(String, Int)] = result.top(3)(Ordering.by(_._2)) //top is descending by default
    
        //TODO 3. sink / output
        result.foreach(println)
        //(hello,4)
        //(you,2)
        //(me,1)
        //(her,3)
        sortResult1.foreach(println)
        //(hello,4)
        //(her,3)
        //(you,2)
        sortResult2.foreach(println)
        //(4,hello)
        //(3,her)
        //(2,you)
        sortResult3.foreach(println)
        //(hello,4)
        //(her,3)
        //(you,2)
    
      }
    }

    RDD Caching / Persistence

    API

     

     Source code

     Storage levels
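
    The storage-level listing is not reproduced here; the snippet below is a quick, hedged reference to the most commonly used levels from org.apache.spark.storage.StorageLevel (the object name is assumed for illustration):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}
    
    //Hypothetical demo object, not part of the original notes
    object StorageLevelSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("spark").setMaster("local[*]"))
        val rdd: RDD[Int] = sc.parallelize(1 to 10)
    
        rdd.cache()                                   //shorthand for persist(StorageLevel.MEMORY_ONLY)
        rdd.unpersist()
        rdd.persist(StorageLevel.MEMORY_ONLY)         //deserialized objects in memory (the default)
        rdd.unpersist()
        rdd.persist(StorageLevel.MEMORY_ONLY_SER)     //serialized in memory: less space, more CPU
        rdd.unpersist()
        rdd.persist(StorageLevel.MEMORY_AND_DISK)     //spill partitions that do not fit in memory to disk
        rdd.unpersist()
        rdd.persist(StorageLevel.DISK_ONLY)           //disk only
        rdd.unpersist()
        rdd.persist(StorageLevel.MEMORY_AND_DISK_2)   //the _2 suffix replicates each partition on two nodes
      }
    }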

    Code demo

    package cn.itcast.core
    
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates RDD caching / persistence
     */
    object RDDDemo09_Cache_Persist{
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create the RDD
        //RDD[lines of text]
        val lines: RDD[String] = sc.textFile("data/input/words.txt")
    
        //TODO 2.transformation
        //RDD[(word, count)]
        val result: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
    
        //TODO ===== Note: the result RDD will be used frequently later and is relatively expensive to compute, so cache it to make later accesses faster
        //result.cache()//shorthand for persist()
        //result.persist()//shorthand for persist(StorageLevel.MEMORY_ONLY)
        result.persist(StorageLevel.MEMORY_AND_DISK)//explicitly choose memory + disk
    
    
        //Requirement: sort the WordCount result and take the top 3
        val sortResult1: Array[(String, Int)] = result
          .sortBy(_._2, false) //sort by count in descending order
          .take(3)//take the first 3
    
        //result.map(t=>(t._2,t._1))
        val sortResult2: Array[(Int, String)] = result.map(_.swap)
          .sortByKey(false)//sort by count in descending order
          .take(3)//take the first 3
    
        val sortResult3: Array[(String, Int)] = result.top(3)(Ordering.by(_._2)) //top is descending by default
    
        result.unpersist()
    
        //TODO 3. sink / output
        result.foreach(println)
        //(hello,4)
        //(you,2)
        //(me,1)
        //(her,3)
        sortResult1.foreach(println)
        //(hello,4)
        //(her,3)
        //(you,2)
        sortResult2.foreach(println)
        //(4,hello)
        //(3,her)
        //(2,you)
        sortResult3.foreach(println)
        //(hello,4)
        //(her,3)
        //(you,2)
    
      }
    }

    RDD Checkpointing

    API

     

    Code implementation
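
    The checkpoint demo code is not included above, so the following is a minimal sketch of the API (the object name RDDDemo10_Checkpoint and the local ./ckp directory are assumptions; in production the checkpoint directory would be an HDFS path):

    package cn.itcast.core
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    //Hypothetical demo object, not part of the original notes
    object RDDDemo10_Checkpoint {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //Set the checkpoint directory before checkpointing; in production this would be an HDFS directory
        sc.setCheckpointDir("./ckp")
    
        val lines: RDD[String] = sc.textFile("data/input/words.txt")
        val result: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    
        //Mark the RDD for checkpointing; the data is written out when the next action runs
        result.checkpoint()
        result.count() //this action triggers both the computation and the checkpoint write
    
        println(result.isCheckpointed)    //true
        println(result.getCheckpointFile) //Some(.../ckp/...)
        println(result.toDebugString)     //the lineage is truncated at the checkpointed RDD
      }
    }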

    Note: differences between caching/persistence and checkpointing

    1. Storage location

    Caching/persistence: stored in memory by default; in practice usually set to memory + (local) disk.

    Checkpoint: usually stored on HDFS.

    2. Purpose

    Caching/persistence: makes later uses of the data faster.

    Checkpoint: keeps the data safe, and also improves efficiency to some degree.

    3. Dependencies (lineage)

    Caching/persistence: keeps the dependencies between RDDs.

    Checkpoint: discards the dependencies between RDDs (the lineage is truncated).

    4. How are they used in practice?

    For an RDD that is expensive to compute and will be reused often, cache/persist it first, then checkpoint it.

    sc.setCheckpointDir("./ckp")//in practice this would be an HDFS directory
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.checkpoint()
    //...operate on the rdd repeatedly...
    rdd.unpersist()//release the cache

    Shared Variables

     

    package cn.itcast.core
    
    import java.lang
    
    import org.apache.commons.lang3.StringUtils
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.rdd.RDD
    import org.apache.spark.util.LongAccumulator
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates RDD shared variables (broadcast variables and accumulators)
     */
    object RDDDemo11_ShareVariable{
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //Requirement:
        // Using the WordCount program as an example, the input data (words2.txt, shown below) contains non-word symbols;
        // do the WordCount and, at the same time, count the number of special characters.
        //Create a counter/accumulator
        val mycounter: LongAccumulator = sc.longAccumulator("mycounter")
        //Define the set of special characters
        val ruleList: List[String] = List(",", ".", "!", "#", "$", "%")
        //Broadcast the collection to the memory of each node as a broadcast variable
        val broadcast: Broadcast[List[String]] = sc.broadcast(ruleList)
    
        //TODO 1. source / load data / create the RDD
        val lines: RDD[String] = sc.textFile("data/input/words2.txt")
    
        //TODO 2.transformation
        val wordcountResult: RDD[(String, Int)] = lines.filter(StringUtils.isNoneBlank(_))
          .flatMap(_.split("\\s+")) //note: the regex backslash must be escaped in a Scala string literal
          .filter(ch => {
            //read the broadcast value
            val list: List[String] = broadcast.value
            if (list.contains(ch)) { //it is a special character
              mycounter.add(1)
              false
            } else { //it is a word
              true
            }
          }).map((_, 1))
          .reduceByKey(_ + _)
    
        //TODO 3. sink / output
        wordcountResult.foreach(println)
        val chResult: lang.Long = mycounter.value
        println("特殊字符的数量:"+chResult)
    
      }
    }

    words2.txt

    hadoop spark # hadoop spark spark
    mapreduce ! spark spark hive !

    Supported Data Sources - JDBC

    package cn.itcast.core
    
    import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}
    
    import org.apache.spark.rdd.{JdbcRDD, RDD}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Author itcast
     * Desc Demonstrates RDDs with an external data source (JDBC/MySQL)
     */
    object RDDDemo13_DataSource2{
      def main(args: Array[String]): Unit = {
        //TODO 0. env / create the environment
        val conf: SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        //TODO 1. source / load data / create the RDD
        //RDD[(name, age)]
        val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("jack", 18), ("tom", 19), ("rose", 20)))
    
        //TODO 2.transformation
        //TODO 3. sink / output
        //Requirement: write the data to MySQL, then read it back from MySQL
        /*
    CREATE TABLE `t_student` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
         */
    
        //Write to MySQL
        //dataRDD.foreach()
        dataRDD.foreachPartition(iter=>{
          //open a connection -- one per partition
          //load the driver (optional with modern JDBC drivers)
          //Class.forName("com.mysql.jdbc.Driver")
          val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","123456")
          val sql:String = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (NULL, ?, ?);"
          val ps: PreparedStatement = conn.prepareStatement(sql)
          iter.foreach(t=>{//t is one record
            val name: String = t._1
            val age: Int = t._2
            ps.setString(1,name)
            ps.setInt(2,age)
            ps.addBatch()
            //ps.executeUpdate()
          })
          ps.executeBatch()
          //close the statement and the connection (statement first, then connection)
          if (ps != null) ps.close()
          if (conn != null) conn.close()
        })
    
        //Read from MySQL
        /*
        JdbcRDD constructor parameters:
        sc: SparkContext,
        getConnection: () => Connection, //function that returns a connection
        sql: String,//the SQL statement to run; it must contain two '?' placeholders
        lowerBound: Long,//lower bound, bound to the first '?'
        upperBound: Long,//upper bound, bound to the second '?'
        numPartitions: Int,//number of partitions
        mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _) //function that maps each row of the result set
         */
        val  getConnection = () => DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","123456")
        val sql:String = "select id,name,age from t_student where id >= ? and id <= ?"
        val mapRow: ResultSet => (Int, String, Int) = (r:ResultSet) =>{
          val id: Int = r.getInt("id")
          val name: String = r.getString("name")
          val age: Int = r.getInt("age")
          (id,name,age)
        }
        val studentTupleRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD[(Int,String,Int)](
          sc,
          getConnection,
          sql,
          0,
          3,
          1,
          mapRow
        )
        studentTupleRDD.foreach(println)
      }
    }