zoukankan      html  css  js  c++  java
  • Spark算子使用

    一、spark的算子分类

      转换算子和行动算子

      转换算子:在使用的时候,spark是不会真正执行,直到需要行动算子之后才会执行。在spark中每一个算子在计算之后就会产生一个新的RDD。

    二、在编写spark程序的时候,会遇到可以通过spark算子完成的操作,同时,scala原生语法也可以完成的操作是,两者的区别是什么?

      scala在执行语句的时候是在JVM进程执行,所有的计算全是在JVM中通过相应的调度完成。
      而spark的RDD执行时,是通过分布式计算的方式完成。

    三、转换算子的使用

    map算子:
     object suanziTest {
        def main(args: Array[String]): Unit = {
                val conf = new SparkConf().setAppName("sparkTest").setMaster("local")
                     val sc = new SparkContext(conf)
                      val maprdd: RDD[Int] = sc.makeRDD(1.to(10))
                      //方式一:spark
                      //    val result = maprdd.map(_*10)
                      //    result.foreach(println(_))
                      //    sc.stop()
                      //方式二:scala
                      val ints = maprdd.collect()
                      var result=for( x <- ints) yield (x*10)
                             result.foreach(println(_))
                              }
        }
    filter算子:
    根据条件返回符合条件的数据,并生成一个新的RDD val conf
    = new SparkConf().setAppName("sparkTest").setMaster("local") val sc = new SparkContext(conf) val maprdd: RDD[Int] = sc.makeRDD(1.to(10)) val unit = maprdd.filter(_%2==0) unit.foreach(println(_)) 比如有一个集合,元素值是1到10,将偶数拿出来对集合中的数值进行同一乘以10 maprdd.filter(_%2==0).map(_*10).foreach(println(_))
    flatMap算子:
    将函数体中计算之后的集合对象,打散(压平) val strings
    = Array("hello java","hello scala") val unit: RDD[String] = sc.makeRDD(strings) //将一行的数据转为一个元素 val unit1: RDD[String] = unit.flatMap(_.split(" ")) //将一行的数据转为一个数组 val unit2: RDD[Array[String]] = unit.map(_.split(" "))
    groupByKey算子:
    将相同Key的值放在同一个序列中(集合的一种) val data
    = List(("za",12),("za",45),("dd",13)) val value: RDD[(String, Int)] = sc.makeRDD(data) val value1: RDD[(String, Iterable[Int])] = value.groupByKey() value1.map(x=>{ var sum:Int=0 for(v <- x._2){ sum+=v } //(x._1,sum) x._1+" "+sum }).foreach(println(_)) //foreach(x=>println(x._1+" "+x._2)) sc.stop() }
    reduceByKey算子:
    将相同的key的值,进行计算之后统一返回
    //前面一个下划线表示的是,每一次叠加之后的结果 val data= List(("za",12),("za",45),("dd",13)) val value: RDD[(String, Int)] = sc.makeRDD(data) value.reduceByKey(_+_).foreach(println(_)) 达到的效果和groupByKey一样
    union算子:
    如果有多个Rdd可以将多个Rdd合并成一个,将后面的rdd的元素,追加到原来的元素,并生成一个新的RDD
    var dataA=List(("zs",30),("zs",50),("ls",30)) var dataB=List(("zs",111),("zs",2222),("ls",3333)) //将基础数据转为RDD val rddA: RDD[(String, Int)] = sc.makeRDD(dataA) val rddB: RDD[(String, Int)] = sc.makeRDD(dataB) val value: RDD[(String, Int)] = rddA.union(rddB) value.foreach(println(_))
    join算子:
    也是发生在两个rdd之上的。其原理与sql中的inner join一致 将RDDA中的第一个元素拿出来,和RDDB的第一个元素进行匹配,如果KEY相同的话将会组合成一个新的RDD元素(key,(value1,value2)) 如果RDDA中有一个元素在RDDB中没有一个匹配的话,将会出现什么结果? 没有匹配的话将不会显示出来,等于mysql中的inner join方式
    var dataA=List(("zs",30),("zs",50),("ls",30),("ww",100)) var dataB=List(("zs",111),("zs",2222),("ls",3333),("zl",3000)) //将基础数据转为RDD val rddA: RDD[(String, Int)] = sc.makeRDD(dataA) val rddB: RDD[(String, Int)] = sc.makeRDD(dataB) val value: RDD[(String, (Int, Int))] = rddA.join(rddB) value.foreach(println(_)) 结果:
    (ls,(
    30,3333)) (zs,(30,111)) (zs,(30,2222)) (zs,(50,111)) (zs,(50,2222))
    mapValues算子:
    对一个map类型中的value值进行统一操作
    var dataA=List(("zs",30),("zs",50),("ls",30),("ww",100)) 一、//sc.makeRDD(dataA).map(x=>(x._1,x._2*10)).foreach(println(_)) 二、sc.makeRDD(dataA).mapValues(_*10).foreach(println(_))
    partitionBy算子:
    如果有自定义分区的需求的话,可以采用该方式进行处理 如果只需要改变分区的数量的话,有没有必要做自定义分区? 可以采用repartiton(
    3)算子来进行处理,3为分区数

    如何做自定义分区?
    1、创建自定义分区类
    import org.apache.spark.Partitioner
      class MyPartition(val numPartition:Int) extends Partitioner{
      //定义有多少个分区
      override def numPartitions = {
      numPartition;
      }

      //定义分区的规则
    override def getPartition(key: Any) = {
      val values: String = key.toString
      if(values.startsWith("135")){
        0
      }else{
        1
      }
      }
    }

    2、object suanziTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("sparkTest").setMaster("local")
        val sc = new SparkContext(conf)
        var dataA=List(("zs",30),("zs",50),("ls",30),("ww",100))
        //原来分区
        println( sc.makeRDD(dataA).getNumPartitions)
        //使用自定义分区
        val partiton = new MyPartiton(2)
        println(sc.makeRDD(dataA).partitionBy(partiton).getNumPartitions)
        sc.makeRDD(dataA).partitionBy(partiton).foreach(println(_))
        sc.stop()
      }
    }

    四、行动算子的使用

    对于spark来说,当遇到行动算子的时候,才算是真正开始执行。

    count:
    统计RDD中有多少个元素 println(sc.makeRDD(dataA).count())
    collect:将RDD转为scala中的数组
    val value: RDD[Int] = sc.makeRDD(dataA)
    val ints: Array[Int] = value.collect()
    注:有时候在传递参数的时候,人家要求要一个数组,而自己手里面只有一个RDD的时候,就可以采用这种方式【相当于一种特殊的类型转换】
    reduce:
    与scala中reduce一样,要求的格式不是一个key、value结构 对于用于reduce算子的,只能有值,spark中的reduce没有类型上的要求 val data
    = 1.to(10) val unit = sc.makeRDD(data) println(unit.reduce(_ + _))
    lookup:
    要求的RDD类型必须是一个key、value类型 val strings
    = List("za","ds","fd") val unit = sc.makeRDD(strings) //将元素转换为元组,并从RDD中找到key为za的元素 val ints = unit.map((_,1)).lookup("za") ints.foreach(println(_)) 或者:unit.map((_,1)).foreach(x=>{ if(x._1.equals("za")){ println(x._2) } }) 或者 val va: RDD[(String, Int)] = unit.map((_,1)).filter(_._1.equals("za")) va.map((_._2)).foreach(println(_)) 问题:lookup和直接使用foreach的区别是什么?

    foreach的方式:拿出每一个元素,通过if条件的方式进行比较,适合条件的进行输出,
    而lookup针对于多个分区的时候,会先将需要查询的值(如"za"先进行分区计算--可以定位za具体在哪一个分区中)那么查询该值的时候,就只需要从该分区中拿到值。
    相同之处:如果一个RDD只有一个分区的时候,那么foreach等于lookup的操作,如果多条件查询的话,lookup会需要进行多次的分区操作,而foreach只需要进行一次

    sortBy:排序
    val tuples = sc.makeRDD(Array(("cc",12),("bb",32),("cc",22),("aa",18),("bb",16),("dd",16),("ee",54),("cc",1),("ff",13),("gg",32),("bb",4)))
    // 统计key出现的次数
    val counts = tuples.reduceByKey(_+_)
    // 按照value进行降序排序
    val sorts = counts.sortBy(_._2,false).foreach(println(_))
    val unit = 1.to(10)
    sc.makeRDD(unit).sortBy(x=>x,false).foreach(println(_))
    take算子:
    取出rdd中的前三个元素 sc.makeRDD(dataA).sortBy(x
    =>x,false).take(3).foreach(println(_)) take(3)表示取出RDD的前三个元素 sortBy(x=>x,false)排序
    first算子:
    等于take(1) 拿出rdd的第一个元素 tuples.first()
    saveAsTextFile:
    将结果输出到指定的目录中 val unit
    = 1.to(10) sc.makeRDD(unit).saveAsTextFile("d:/out")
    saveAsSequenceFile(序列化文件):
    将结果输出到指定的目录中,而且文件的类型为SequenceFile 要求为 RDD的元素必须由key-value对组成 sc.makeRDD(unit).map((_,1)).saveAsSequenceFile("D:/OUT") map((_,1))转为key-value形式
  • 相关阅读:
    Django实战—权限管理系统rbac组件实现
    Django模型层的DateTimeField、DateField字段设置时间格式为显示当前年月日时分秒的时间格式及时区
    mysql数据库删除一条数据之后,主键id不连续的问题解决
    python多继承(super().__init__())、*args和**kwargs、
    Django数据库操作中You are trying to add a non-nullable field 'name' to contact without a default错误处理
    配置等模版
    【SSM】(一)SSM整合-增删改查书籍
    【SpringMVC】(八)使用Ajax前后端传数据&不使用Ajax
    【SpringMVC】@RequestMapping注意点
    leetcode (堆->中级) 264,313,347,373,378,767,1642,973,1673,743,787
  • 原文地址:https://www.cnblogs.com/yfb918/p/10669445.html
Copyright © 2011-2022 走看看