  • Common Spark RDD Operators

    Spark RDD operators

    Key RDD properties: partitioned / read-only / dependencies (lineage) / cache / checkpoint

    Transformation

    map(func)

    Returns a new RDD formed by passing each element of the source RDD through the function func.

    scala> var source  = sc.parallelize(1 to 10)
    source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
    
    scala> source.collect()
    res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    
    scala> val mapadd = source.map(_ * 2)
    mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
    
    scala> mapadd.collect()
    res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
    

    filter(func)

    Returns a new RDD formed by selecting those elements of the source on which func returns true.

    scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
    sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
    scala> val filter = sourceFilter.filter(_.contains("xiao"))
    filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
    scala> sourceFilter.collect()
    res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
    scala> filter.collect()
    res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
    

    flatMap(func)

    Similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).

    scala> val sourceFlat = sc.parallelize(1 to 5)
    sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
    
    scala> sourceFlat.collect()
    res11: Array[Int] = Array(1, 2, 3, 4, 5)
    
    scala> val flatMap = sourceFlat.flatMap(1 to _)
    flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26
    
    scala> flatMap.collect()
    res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
    

    mapPartitions(func)

    Similar to map, but runs independently on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U]. With N elements and M partitions, the map function is called N times, while mapPartitions is called only M times, each call processing one whole partition.
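
    A minimal spark-shell sketch of the difference (assuming 2 partitions; expected results shown as comments):

    scala> val rdd = sc.parallelize(1 to 4, 2)
    scala> rdd.mapPartitions(iter => Iterator(iter.sum)).collect()
    // expected: Array(3, 7) -- one sum per partition, so func runs twice instead of four times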

    mapPartitionsWithIndex(func)

    Similar to mapPartitions, but func takes an extra integer parameter representing the partition index, so when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
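
    A minimal sketch tagging each element with its partition index (assuming 2 partitions):

    scala> val rdd = sc.parallelize(1 to 4, 2)
    scala> rdd.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x))).collect()
    // expected: Array((0,1), (0,2), (1,3), (1,4))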

    sample(withReplacement,fraction,seed)

    Randomly samples the data using the given seed. fraction is the expected fraction of the data to sample; withReplacement controls whether sampling is done with replacement (true) or without (false); seed specifies the random number generator seed (see the sketch below).
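
    A minimal sketch (results vary with the seed; the sample size is only approximately fraction * N):

    scala> val rdd = sc.parallelize(1 to 10)
    scala> rdd.sample(true, 0.5, 3).collect()   // with replacement, may contain duplicates
    scala> rdd.sample(false, 0.5, 3).collect()  // without replacement, no duplicates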

    takeSample(withReplacement, num, [seed])

    The difference from sample is that takeSample is an action: it returns the final sampled collection as an array rather than an RDD.
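
    A minimal sketch; note the result is a local Array, not an RDD:

    scala> sc.parallelize(1 to 10).takeSample(false, 3, 3)
    // returns an Array[Int] of exactly 3 elements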

    union(otherDataset)

    Returns a new RDD that is the union of the source RDD and the argument RDD.

    scala> val rdd1 = sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(5 to 10)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:24
    
    scala> val rdd3 = rdd1.union(rdd2)
    rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[25] at union at <console>:28
    
    scala> rdd3.collect()
    res18: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)
    

    intersection(otherDataset)

    Returns a new RDD that is the intersection of the source RDD and the argument RDD.

    scala> val rdd1 = sc.parallelize(1 to 7)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24
    
    scala> val rdd2 = sc.parallelize(5 to 10)
    rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24
    
    scala> val rdd3 = rdd1.intersection(rdd2)
    rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at intersection at <console>:28
    
    scala> rdd3.collect()
    res19: Array[Int] = Array(5, 6, 7)
    

    distinct([numTasks])

    Returns a new RDD with the duplicate elements of the source removed. The optional numTasks parameter controls how many tasks (and hence result partitions) are used for the de-duplication; by default it follows the parent RDD's partitioning.

    scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
    distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
    
    scala> val unionRDD = distinctRdd.distinct()
    unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26
    
    scala> unionRDD.collect()
    res20: Array[Int] = Array(1, 9, 5, 6, 2)
    
    scala> val unionRDD = distinctRdd.distinct(2)
    unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26
    
    scala> unionRDD.collect()
    res21: Array[Int] = Array(6, 2, 1, 9, 5)
    

    partitionBy

    Repartitions a (K,V) RDD using the given partitioner. If the RDD's existing partitioner is the same as the requested one, no repartitioning is done; otherwise a ShuffledRDD is produced.
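
    A minimal sketch using a HashPartitioner:

    scala> val pairs = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c"),(4,"d")), 4)
    scala> val repartitioned = pairs.partitionBy(new org.apache.spark.HashPartitioner(2))
    scala> repartitioned.partitions.size
    // expected: 2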

    reduceByKey(func, [numTasks])

    Called on a (K,V) RDD; returns a (K,V) RDD in which the values for each key are aggregated with the given reduce function. The number of reduce tasks can be set via the optional second argument.
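
    A minimal word-count style sketch:

    scala> val pairs = sc.parallelize(List(("a",1),("b",1),("a",1)))
    scala> pairs.reduceByKey(_ + _).collect()
    // expected: Array((a,2), (b,1)) -- order may vary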

    groupByKey

    groupByKey also operates on each key, but it simply collects the values for each key into a single sequence without combining them.
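
    A minimal sketch; the values for each key are grouped but not aggregated:

    scala> val pairs = sc.parallelize(List(("a",1),("b",1),("a",2)))
    scala> pairs.groupByKey().collect()
    // expected: Array((a,CompactBuffer(1, 2)), (b,CompactBuffer(1))) -- order may vary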

    combineByKey[C](createCombiner, mergeValue, mergeCombiners)

    Merges the values of each key into a combined type C: createCombiner turns the first value seen for a key into a combiner, mergeValue folds further values into it within a partition, and mergeCombiners merges combiners across partitions.
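
    A minimal sketch computing a per-key average with the three functions:

    scala> val scores = sc.parallelize(List(("a",88),("b",95),("a",91)))
    scala> val sumCount = scores.combineByKey(
         |   v => (v, 1),                                                  // createCombiner: first value of a key
         |   (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),             // mergeValue: within a partition
         |   (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)) // mergeCombiners: across partitions
    scala> sumCount.mapValues { case (sum, cnt) => sum.toDouble / cnt }.collect()
    // expected: Array((a,89.5), (b,95.0)) -- order may vary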

    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

    In a (K,V) RDD, the values are grouped and aggregated by key. Within each partition, every value for a key is folded into the zero value using the seqOp function; the per-partition results for each key are then merged across partitions with the combOp function, and each key is output with its final aggregated value as a new (K,V) pair.

    seqOp iteratively folds the values into the initial value within each partition; combOp merges the per-partition results.
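
    A minimal sketch (assuming 2 partitions): take the maximum per key within each partition (seqOp), then sum the per-partition maxima (combOp):

    scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)), 2)
    scala> rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect()
    // expected: Array((b,3), (a,3), (c,12)) -- order may vary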

    foldByKey(zeroValue)(func)

    A simplified form of aggregateByKey in which seqOp and combOp are the same function.
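
    A minimal sketch:

    scala> sc.parallelize(List(("a",1),("a",2),("b",3))).foldByKey(0)(_ + _).collect()
    // expected: Array((a,3), (b,3)) -- order may vary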

    sortByKey([ascending],[numTasks])

    Called on a (K,V) RDD where K implements Ordered (or an implicit Ordering[K] is in scope); returns a (K,V) RDD sorted by key.

    scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24
    
    scala> rdd.sortByKey(true).collect()
    res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
    
    scala> rdd.sortByKey(false).collect()
    res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
    

    sortBy(func,[ascending],[numTasks])

    Similar to sortByKey but more flexible: func is applied to each element first, and the elements are sorted by comparing the results.

    scala> val rdd = sc.parallelize(List(1,2,3,4))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
    
    scala> rdd.sortBy(x => x).collect()
    res11: Array[Int] = Array(1, 2, 3, 4)
    
    scala> rdd.sortBy(x => x%3).collect()
    res12: Array[Int] = Array(3, 4, 1, 2)
    

    join(otherDataset,[numTasks])

    Called on RDDs of type (K,V) and (K,W); returns an RDD of (K,(V,W)) pairs containing all pairs of elements for each matching key.

    scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24
    
    scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(3,6)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
    
    scala> rdd.join(rdd1).collect()
    res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
    

    cogroup(otherDataset,[numTasks])

    Called on RDDs of type (K,V) and (K,W); returns an RDD of type (K, (Iterable[V], Iterable[W])).
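
    A minimal sketch; keys present in only one RDD get an empty Iterable on the other side:

    scala> val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    scala> val rdd2 = sc.parallelize(Array((1,4),(2,5),(4,6)))
    scala> rdd1.cogroup(rdd2).collect()
    // expected (order may vary): Array((1,(CompactBuffer(a),CompactBuffer(4))),
    //   (2,(CompactBuffer(b),CompactBuffer(5))), (3,(CompactBuffer(c),CompactBuffer())),
    //   (4,(CompactBuffer(),CompactBuffer(6))))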

    cartesian(otherDataset)

    Cartesian product: returns all pairs (a, b) where a comes from the source RDD and b from the other RDD.
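
    A minimal sketch; the result has size(rdd1) * size(rdd2) pairs:

    scala> val rdd1 = sc.parallelize(1 to 3)
    scala> val rdd2 = sc.parallelize(List("a","b"))
    scala> rdd1.cartesian(rdd2).collect()
    // expected: 6 pairs, e.g. Array((1,a), (1,b), (2,a), (2,b), (3,a), (3,b))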

    pipe(command,[envVars])

    For each partition, runs an external script (e.g. a Perl or shell script) and returns its output as an RDD of strings.

    pipe.sh:
    #!/bin/sh
    echo "AA"
    while read LINE; do
       echo ">>>"${LINE}
    done
    Note: the shell script must be accessible on every node in the cluster.
    
    scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[50] at parallelize at <console>:24
    
    scala> rdd.pipe("/home/bigdata/pipe.sh").collect()
    res18: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
    
    scala> val rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
    rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[52] at parallelize at <console>:24
    
    scala> rdd.pipe("/home/bigdata/pipe.sh").collect()
    res19: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)
    
    

    coalesce(numPartitions)

    Reduces the number of partitions; useful for running a small dataset more efficiently after filtering a large one.

    scala> val rdd = sc.parallelize(1 to 16,4)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24
    
    scala> rdd.partitions.size
    res20: Int = 4
    
    scala> val coalesceRDD = rdd.coalesce(3)
    coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26
    
    scala> coalesceRDD.partitions.size
    res21: Int = 3
    

    repartition(numPartitions)

    Reshuffles the data randomly across the network into the given number of partitions.
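
    A minimal sketch; unlike coalesce, repartition always shuffles and can also increase the partition count:

    scala> val rdd = sc.parallelize(1 to 16, 2)
    scala> rdd.repartition(4).partitions.size
    // expected: 4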

    repartitionAndSortWithinPartitions(partitioner)

    A variant of repartition: it repartitions the RDD according to the given partitioner and sorts the records by key within each resulting partition. This is more efficient than calling repartition and then sorting inside each partition.
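
    A minimal sketch on a pair RDD; each output partition is sorted by key (the exact partition assignment depends on the partitioner):

    scala> val pairs = sc.parallelize(List((3,"c"),(1,"a"),(4,"d"),(2,"b")))
    scala> pairs.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(2)).glom().collect()
    // expected: two partitions, each sorted by key, e.g. Array(Array((2,b), (4,d)), Array((1,a), (3,c)))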

    glom

    Coalesces the elements of each partition into an array, producing a new RDD of type RDD[Array[T]].

    scala> val rdd = sc.parallelize(1 to 16,4)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
    
    scala> rdd.glom().collect()
    res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
    

    mapValues

    For a (K,V) RDD, applies a function to the values only, leaving the keys unchanged.

    scala> val rdd3 = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
    rdd3: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[67] at parallelize at <console>:24
    
    scala> rdd3.mapValues(_+"|||").collect()
    res26: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))
    

    subtract

    Computes the set difference: removes from the source RDD the elements that also appear in the other RDD, keeping the rest.

    scala> val rdd = sc.parallelize(3 to 8)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[70] at parallelize at <console>:24
    
    scala> val rdd1 = sc.parallelize(1 to 5)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[71] at parallelize at <console>:24
    
    scala> rdd.subtract(rdd1).collect()
    res27: Array[Int] = Array(8, 6, 7)
    

    Action

    reduce(func)

    Aggregates all elements of the RDD using func; the function must be commutative and associative so it can be computed correctly in parallel.
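
    A minimal sketch:

    scala> sc.parallelize(1 to 10).reduce(_ + _)
    // expected: 55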

    collect

    Returns all elements of the dataset as an array at the driver program.

    count()

    Returns the number of elements in the RDD.

    first

    Returns the first element of the RDD (similar to take(1)).

    take(n)

    Returns an array with the first n elements of the dataset.

    takeSample(withReplacement, num, [seed])

    Returns an array of num elements sampled randomly from the dataset, either with or without replacement (withReplacement); seed optionally specifies the random number generator seed.

    takeOrdered(n)

    Returns the first n elements of the RDD in sorted order.

    aggregate(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

    aggregate folds the elements of each partition into the zero value using seqOp, then merges the per-partition results (together with the zero value) using combOp. The type of the final result does not need to match the element type of the RDD.
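
    A minimal sketch computing a (sum, count) pair, whose type differs from the Int elements (assuming 2 partitions):

    scala> val rdd = sc.parallelize(1 to 4, 2)
    scala> rdd.aggregate((0, 0))(
         |   (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: fold each element into the partition accumulator
         |   (a, b) => (a._1 + b._1, a._2 + b._2))   // combOp: merge the per-partition accumulators
    // expected: (10,4)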

    fold(zeroValue)(func)

    A fold operation: a simplified form of aggregate in which seqOp and combOp are the same function.
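
    A minimal sketch; zeroValue should be a neutral element, since it is applied once per partition and once more when merging:

    scala> sc.parallelize(1 to 4, 2).fold(0)(_ + _)
    // expected: 10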

    saveAsTextFile(path)

    Saves the elements of the dataset as text files to HDFS or another supported file system. Spark calls toString on each element to convert it into a line of text in the file.

    saveAsSequenceFile(path)

    Saves the elements of the dataset in Hadoop SequenceFile format under the given directory, which can be on HDFS or any other Hadoop-supported file system.

    saveAsObjectFile(path)

    Serializes the elements of the RDD into objects and stores them in files.

    countByKey()

    For a (K,V) RDD, returns a (K, Long) map giving the number of elements for each key.
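
    A minimal sketch; the result is a local Map on the driver:

    scala> sc.parallelize(List(("a",1),("b",2),("a",3))).countByKey()
    // expected: Map(a -> 2, b -> 1)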

    foreach(func)

    Runs the function func on each element of the dataset, typically for side effects such as updating an accumulator or writing to external storage.

    Statistics on numeric RDDs

    Method            Meaning
    count()           number of elements in the RDD
    mean()            mean of the elements
    sum()             sum of the elements
    max()             maximum element
    min()             minimum element
    variance()        variance of the elements
    sampleVariance()  sample variance
    stdev()           standard deviation
    sampleStdev()     sample standard deviation
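
    A minimal sketch on a numeric RDD (results shown as comments):

    scala> val nums = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))
    scala> nums.sum()        // 10.0
    scala> nums.mean()       // 2.5
    scala> nums.variance()   // 1.25
    scala> nums.stdev()      // about 1.118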

    Main ways of reading and saving data

    Text file input and output

    scala> sc.textFile("./README.md")
    res6: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[7] at textFile at <console>:25
    
    scala> val readme = sc.textFile("./README.md")
    readme: org.apache.spark.rdd.RDD[String] = ./README.md MapPartitionsRDD[9] at textFile at <console>:24
    
    scala> readme.collect()
    res7: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster...
    scala> readme.saveAsTextFile("hdfs://node01:8020/test")
    

    JSON file input and output

    scala> import org.json4s._  
    import org.json4s._
    
    scala> import org.json4s.jackson.JsonMethods._  
    import org.json4s.jackson.JsonMethods._
    
    scala> import org.json4s.jackson.Serialization  
    import org.json4s.jackson.Serialization
    
    scala> var result = sc.textFile("examples/src/main/resources/people.json")
    result: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.json MapPartitionsRDD[7] at textFile at <console>:47
    
    scala> implicit val formats = Serialization.formats(ShortTypeHints(List())) 
    formats: org.json4s.Formats{val dateFormat: org.json4s.DateFormat; val typeHints: org.json4s.TypeHints} = org.json4s.Serialization$$anon$1@61f2c1da
    
    scala>  result.collect()
    res3: Array[String] = Array({"name":"Michael"}, {"name":"Andy", "age":30}, {"name":"Justin", "age":19})
    

    CSV file input and output
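
    Spark core has no dedicated CSV reader for RDDs, so a minimal sketch is to read the file as text and split each line (the path and column layout here are hypothetical; Spark SQL's DataFrame CSV source is the usual choice for real CSV data):

    scala> val lines = sc.textFile("hdfs://node01:8020/people.csv")   // hypothetical file with "name,age" lines
    scala> val fields = lines.map(_.split(","))
    scala> fields.map(arr => (arr(0), arr(1).trim.toInt)).collect()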

    SequenceFile input and output

    scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
    data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24
    
    scala>  data.saveAsSequenceFile("hdfs://node01:8020/sequdata")
    scala> val sdata = sc.sequenceFile[Int,String]("hdfs://node01:8020/sequdata/*")
    sdata: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[19] at sequenceFile at <console>:24
    
    scala> sdata.collect()
    res14: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))
    

    Object file input and output

    scala> val data=sc.parallelize(List((2,"aa"),(3,"bb"),(4,"cc"),(5,"dd"),(6,"ee")))
    data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[20] at parallelize at <console>:24
    
    scala> data.saveAsObjectFile("hdfs://node01:8020/objfile")
    scala> import org.apache.spark.rdd.RDD
    import org.apache.spark.rdd.RDD
    
    scala> val objrdd =sc.objectFile[(Int,String)]("hdfs://node01:8020/objfile/p*")
    objrdd: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[28] at objectFile at <console>:25
    
    scala> objrdd.collect()
    res20: Array[(Int, String)] = Array((2,aa), (3,bb), (4,cc), (5,dd), (6,ee))
    

    Hadoop input and output formats

    scala> import org.apache.hadoop.io._
    import org.apache.hadoop.io._
    scala> val data = sc.parallelize(Array((30,"hadoop"), (71,"hive"), (11,"cat")))
    data: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[47] at parallelize at <console>:35
    
    scala> data.saveAsNewAPIHadoopFile("hdfs://node01:8020/output4/",classOf[LongWritable] ,classOf[Text] ,classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[LongWritable, Text]])
    

    File system input and output

    Spark supports reading from and writing to many file systems, such as the local file system, Amazon S3, and HDFS.

    Database input and output

    Reading from MySQL

    import org.apache.spark.{SparkConf, SparkContext}

    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
      val sc = new SparkContext(sparkConf)

      val rdd = new org.apache.spark.rdd.JdbcRDD(
        sc,
        () => {
          // register the JDBC driver and open one connection per partition
          Class.forName("com.mysql.jdbc.Driver").newInstance()
          java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/rdd", "root", "hive")
        },
        "select * from rddtable where id >= ? and id <= ?;",
        1,   // lower bound bound to the first placeholder
        10,  // upper bound bound to the second placeholder
        1,   // number of partitions
        r => (r.getInt(1), r.getString(2)))  // map each ResultSet row to a tuple

      println(rdd.count())
      rdd.foreach(println(_))
      sc.stop()
    }
    

    Writing to MySQL

    import org.apache.spark.{SparkConf, SparkContext}

    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("JdbcApp")
      val sc = new SparkContext(sparkConf)
      val data = sc.parallelize(List("Female", "Male", "Female"))

      // open one connection per partition rather than per element
      data.foreachPartition(insertData)
    }

    def insertData(iterator: Iterator[String]): Unit = {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      val conn = java.sql.DriverManager.getConnection("jdbc:mysql://localhost:3306/rdd", "root", "admin")
      iterator.foreach { name =>
        val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
        ps.setString(1, name)
        ps.executeUpdate()
        ps.close()
      }
      conn.close()
    }
    

    Reading from HBase

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes

    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
      val sc = new SparkContext(sparkConf)
    
      val conf = HBaseConfiguration.create()
      // name of the HBase table to read
      conf.set(TableInputFormat.INPUT_TABLE, "fruit")
    
      val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
    
      val count = hBaseRDD.count()
      println("hBaseRDD RDD Count:"+ count)
      hBaseRDD.cache()
      hBaseRDD.foreach {
        case (_, result) =>
          val key = Bytes.toString(result.getRow)
          val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
          val color = Bytes.toString(result.getValue("info".getBytes, "color".getBytes))
          println("Row key:" + key + " Name:" + name + " Color:" + color)
      }
      sc.stop()
    }
    

    Writing to HBase

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes

    def main(args: Array[String]) {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
      val sc = new SparkContext(sparkConf)
    
      val conf = HBaseConfiguration.create()
      val jobConf = new JobConf(conf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
    
      val fruitTable = TableName.valueOf("fruit_spark")
      val tableDescr = new HTableDescriptor(fruitTable)
      tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
    
      val admin = new HBaseAdmin(conf)
      if (admin.tableExists(fruitTable)) {
        admin.disableTable(fruitTable)
        admin.deleteTable(fruitTable)
      }
      admin.createTable(tableDescr)
    
      def convert(triple: (Int, String, Int)) = {
        val put = new Put(Bytes.toBytes(triple._1))
        put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
        put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3))
        (new ImmutableBytesWritable, put)
      }
      val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
      val localData = initialRDD.map(convert)
    
      localData.saveAsHadoopDataset(jobConf)
    }
    