  • Summary of common Spark techniques

    Parsing a URL

    scala> import java.net.URL
    import java.net.URL

    scala> val urlstr="http://www.baidu.com:8899/getUsername?userid=110&sysId=552"

    urlstr: String = http://www.baidu.com:8899/getUsername?userid=110&sysId=552

    scala> val aa=new URL(urlstr)

    aa: java.net.URL = http://www.baidu.com:8899/getUsername?userid=110&sysId=552


    scala> aa.getHost
    res262: String = www.baidu.com

    scala> aa.getPort
    res263: Int = 8899


    scala> aa.getPath
    res264: String = /getUsername
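    The same URL object also exposes the query string through getQuery. As a minimal sketch, the helper below splits it into a Map. The name queryParams is made up for this example and is not part of java.net.URL, and the code assumes plain key=value pairs without URL-encoded characters.

```scala
import java.net.URL

// Hypothetical helper (not in the original post): turn "a=1&b=2" into Map(a -> 1, b -> 2).
def queryParams(url: URL): Map[String, String] =
  Option(url.getQuery).toList            // getQuery returns null when the URL has no query part
    .flatMap(_.split("&").toList)        // "userid=110&sysId=552" -> List("userid=110", "sysId=552")
    .map(_.split("=", 2))                // split each pair at the first "=" only
    .collect { case Array(k, v) => k -> v }
    .toMap

val parsed = new URL("http://www.baidu.com:8899/getUsername?userid=110&sysId=552")
val params = queryParams(parsed)
```

    For the URL used above, params holds userid -> 110 and sysId -> 552; a URL without a query string gives an empty Map.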

    ---------------------

    [root@host tmpdata]# hdfs dfs -cat /spark/log/web.log   (two space-separated columns: the visitor ID and the visited URL)
    10001 http://www.baidu.com/car
    a10001 http://www.baidu.com/book
    a10001 http://www.baidu.com/book
    a10001 http://www.baidu.com/book
    a10001 http://www.baidu.com/music
    a10001 http://www.baidu.com/music
    a10001 http://www.baidu.com/movie
    a10001 http://www.baidu.com/movie
    a10001 http://www.baidu.com/movie
    a10001 http://www.baidu.com/movie
    a10001 http://www.baidu.com/movie
    a10001 http://www.baidu.com/yule
    a10001 http://www.baidu.com/yule
    a10002 http://www.baidu.com/car
    a10002 http://www.baidu.com/yule
    a10002 http://www.baidu.com/yule
    a10002 http://www.baidu.com/book
    a10002 http://www.baidu.com/car
    a10002 http://www.baidu.com/music
    a10002 http://www.baidu.com/car
    a10002 http://www.baidu.com/car
    a10002 http://www.baidu.com/car
    a10002 http://www.baidu.com/movie
    a10002 http://www.baidu.com/movie
    a10002 http://www.baidu.com/yule
    a10002 http://www.baidu.com/yule

    scala> import java.net.URL
    import java.net.URL
    scala> val weblogrdd=sc.textFile("hdfs://localhost:9000/spark/log/web.log")
    weblogrdd: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/spark/log/web.log MapPartitionsRDD[301] at textFile at <console>:53

    scala> weblogrdd.count
    res282: Long = 26

    scala> val useridrdd=weblogrdd.map(x=>{val aa=x.split(" ");(aa(0),1)})

    useridrdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[310] at map at <console>:59

    scala> useridrdd.reduceByKey(_+_).collect  // number of visits per visitor

    res291: Array[(String, Int)] = Array((10001,1), (a10002,13), (a10001,12))

    // The following computes the number of visits per visitor per category

    scala> weblogrdd.map(_.split(" ")).map(x=>{val url=new URL(x(1));val path=url.getPath.substring(1);(x(0),path)}).map(x=>(x,1)).reduceByKey(_+_).collect
    res313: Array[((String, String), Int)] = Array(((a10001,movie),5), ((a10002,movie),2), ((a10002,car),5), ((a10002,music),1), ((a10001,yule),2), ((a10001,book),3), ((a10002,yule),4), ((10001,car),1), ((a10001,music),2), ((a10002,book),1))

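    Statement breakdown: map(_.split(" ")) splits each line into fields on the space character.

    map(x=>{val url=new URL(x(1));val path=url.getPath.substring(1);(x(0),path)}) extracts the path of the URL and produces a new RDD of (visitor ID, category).

    map(x=>(x,1)) produces a new key-value RDD keyed by (visitor ID, category); reduceByKey(_+_) then adds up the 1s for each key.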
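    The steps above can be traced on a local collection. This is only a sketch: a plain List stands in for the RDD, groupBy plus sum stands in for reduceByKey, and the sample lines are made up in the same format as web.log.

```scala
import java.net.URL

// A few made-up lines in the "visitorId url" format of web.log.
val lines = List(
  "a10001 http://www.baidu.com/book",
  "a10001 http://www.baidu.com/book",
  "a10001 http://www.baidu.com/movie",
  "a10002 http://www.baidu.com/car"
)

val counts: Map[(String, String), Int] =
  lines
    .map(_.split(" "))                                      // Array(visitorId, url)
    .map(f => (f(0), new URL(f(1)).getPath.substring(1)))   // (visitorId, category)
    .map(key => (key, 1))                                   // ((visitorId, category), 1)
    .groupBy(_._1)                                          // local stand-in for reduceByKey:
    .map { case (key, ones) => key -> ones.map(_._2).sum }  // sum the 1s of every key
```

    Here counts maps (a10001,book) to 2, (a10001,movie) to 1 and (a10002,car) to 1.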

    scala> val redrdd=weblogrdd.map(_.split(" ")).map(x=>{val url=new URL(x(1));val path=url.getPath.substring(1);(x(0),path)}).map(x=>(x,1)).reduceByKey(_+_)

    redrdd: org.apache.spark.rdd.RDD[((String, String), Int)] = ShuffledRDD[411] at reduceByKey at <console>:56

    scala> redrdd.sortBy(_._2,false).collect

    res335: Array[((String, String), Int)] = Array(((a10001,movie),5), ((a10002,car),5), ((a10002,yule),4), ((a10001,book),3), ((a10002,movie),2), ((a10001,yule),2), ((a10001,music),2), ((a10002,music),1), ((10001,car),1), ((a10002,book),1))


    scala> val grouprdd=redrdd.groupBy(_._1._2)
    grouprdd: org.apache.spark.rdd.RDD[(String, Iterable[((String, String), Int)])] = ShuffledRDD[431] at groupBy at <console>:58

    scala> grouprdd.collect
    res345: Array[(String, Iterable[((String, String), Int)])] = Array((car,CompactBuffer(((a10002,car),5), ((10001,car),1))), (movie,CompactBuffer(((a10001,movie),5), ((a10002,movie),2))), (book,CompactBuffer(((a10001,book),3), ((a10002,book),1))), (music,CompactBuffer(((a10002,music),1), ((a10001,music),2))), (yule,CompactBuffer(((a10001,yule),2), ((a10002,yule),4))))

    scala> grouprdd.mapValues(_.toList.sortBy(_._2).reverse.take(1)).collect
    res351: Array[(String, List[((String, String), Int)])] = Array((car,List(((a10002,car),5))), (movie,List(((a10001,movie),5))), (book,List(((a10001,book),3))), (music,List(((a10001,music),2))), (yule,List(((a10002,yule),4))))

    -----------------------

    Operations which can cause a shuffle include repartition operations like repartition and coalesce, 'ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

    Datasets and DataFrames

    A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar.

    A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

    Throughout this document, we will often refer to Scala/Java Datasets of Rows as DataFrames.

    ----------------------

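    As a distributed computing engine, Spark can replace MapReduce, but for now it cannot replace Hadoop as a whole. Spark ships with Scala built in.

    Scala also has flatMap, but there it works on local collections; Spark's flatMap works on distributed RDDs, so the objects being operated on are different.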

    resilient distributed dataset (RDD)

    There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

    All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")

    --------------------------

    scala> val r1=sc.parallelize(List(List("ad d ge","tt g a"),List("j h k e","t eqe gg")))
    r1: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[475] at parallelize at <console>:54

    scala> r1.flatMap(_.flatMap(_.split(" "))).collect
    res433: Array[String] = Array(ad, d, ge, tt, g, a, j, h, k, e, t, eqe, gg)

    Option[A] (a sealed abstract class) has two possible forms:
        1. Some[A]: holds a value of type A
        2. None: holds no value
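    A short plain-Scala illustration of the two cases, using Map#get (which returns an Option). The names ages and describe are made up for this example.

```scala
val ages = Map("wang" -> 10, "li" -> 800)

// Map#get returns Some(value) when the key exists and None otherwise.
val found: Option[Int]   = ages.get("wang")
val missing: Option[Int] = ages.get("zhang")

// Pattern matching handles both cases explicitly.
def describe(o: Option[Int]): String = o match {
  case Some(v) => s"value $v"
  case None    => "no value"
}

// getOrElse supplies a default when there is no value.
val withDefault: Int = missing.getOrElse(0)
```

    describe(found) gives "value 10", describe(missing) gives "no value", and withDefault is 0.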
     

    Only key-value RDDs, that is RDD[(K, V)], support the join operation
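    join matches elements of two datasets by key, which is why both sides have to be key-value pairs. The sketch below shows what an inner join computes, using plain Seq values instead of RDDs. innerJoin is a hypothetical helper written for this note; Spark's own join runs distributed and does not promise any particular output order.

```scala
// Pair every (k, v) on the left with every (k, w) on the right that has the same key.
def innerJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
  for {
    (k, v)  <- left
    (k2, w) <- right
    if k == k2
  } yield (k, (v, w))

val scores = Seq(("wang", 10), ("zhang", 5))
val cities = Seq(("wang", "beijing"), ("li", "shanghai"))
val joined = innerJoin(scores, cities)
```

    Only "wang" occurs on both sides, so joined contains the single element (wang,(10,beijing)), the same (K, (V, W)) shape that join produces on RDDs.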

    scala> rdd100 union rdd101

    res448: org.apache.spark.rdd.RDD[String] = UnionRDD[501] at union at <console>:61

    scala> val gg=rdd100 union rdd101

    gg: org.apache.spark.rdd.RDD[String] = UnionRDD[502] at union at <console>:60

    scala> gg.collect

    res449: Array[String] = Array(ad, d, ge, tt, g, a, j, h, k, e, t, eqe, gg, ddag, 5, agage, gg)

    scala> rdd100.union(rdd101).collect

    res450: Array[String] = Array(ad, d, ge, tt, g, a, j, h, k, e, t, eqe, gg, ddag, 5, agage, gg)

    ------------------------------------

    scala> val rdd110=sc.makeRDD(Array(("wang",10),("zhang",5),("wang",8),("li",800),("wang",10)))
    rdd110: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[510] at makeRDD at <console>:54

    scala> rdd110.reduceByKey(_+_).collect
    res455: Array[(String, Int)] = Array((zhang,5), (li,800), (wang,28))


    scala> rdd110.groupByKey().map({case(x,y)=>(x,y.sum)}).collect
    res460: Array[(String, Int)] = Array((zhang,5), (li,800), (wang,28))

    scala> rdd110.groupByKey().map(x=>(x._1,x._2.sum)).collect
    res484: Array[(String, Int)] = Array((zhang,5), (li,800), (wang,28))

    scala> rdd110.groupByKey().mapValues(_.sum).collect
    res466: Array[(String, Int)] = Array((zhang,5), (li,800), (wang,28))

      def mapValues[U](f: Int => U): org.apache.spark.rdd.RDD[(String, U)]

    Applies a function to the value of every key-value pair; the keys do not change.

    scala> rdd110.groupByKey().collect

    res474: Array[(String, Iterable[Int])] = Array((zhang,CompactBuffer(5)), (li,CompactBuffer(800)), (wang,CompactBuffer(10, 8, 10)))

    scala> rdd110.groupByKey().mapValues(_.sum).collect

    res475: Array[(String, Int)] = Array((zhang,5), (li,800), (wang,28))


    scala> rdd110.collect
    res473: Array[(String, Int)] = Array((wang,10), (zhang,5), (wang,8), (li,800), (wang,10))

    scala> rdd110.mapValues(_+10).collect
    res472: Array[(String, Int)] = Array((wang,20), (zhang,15), (wang,18), (li,810), (wang,20))

    -------------------------------------------------

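    foreach and foreachPartition both work on the iterator of each partition.

    The difference is that foreach itself runs iterator.foreach inside each partition, and the function you pass in is only applied to one element at a time inside that loop,

    whereas foreachPartition hands the iterator of each partition to the function you pass in, and the function processes the iterator itself (which can help avoid running out of memory).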

    ---------------------------------

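    The two methods differ only in the word "Key": groupBy() groups by a user-defined function, while groupByKey() groups by the key. In other words, the data that groupByKey() is applied to must already be key-value pairs, and elements with the same key end up in the same group.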
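    The distinction can be sketched on a local List. Plain Scala collections have no groupByKey, so the second expression only imitates it with groupBy on the first tuple element; the data is made up for this example.

```scala
val pairs = List(("wang", 10), ("zhang", 5), ("wang", 8))

// groupBy: the caller supplies the grouping function (here: is the value at least 8?).
val byCustomRule: Map[Boolean, List[(String, Int)]] =
  pairs.groupBy(_._2 >= 8)

// groupByKey-like: the grouping key is always the first element of the pair,
// and only the values are collected for each key.
val byKey: Map[String, List[Int]] =
  pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
```

    byCustomRule(true) holds both wang pairs and byCustomRule(false) holds the zhang pair, while byKey is Map(wang -> List(10, 8), zhang -> List(5)).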

    ------------------------------------

    mapPartitions

     def mapPartitions[U](f: Iterator[Int] => Iterator[U],preservesPartitioning: Boolean)(implicit evidence$6: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]

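    Similar to map: map operates on each element of the RDD, whereas mapPartitions (like foreachPartition) operates on the iterator of each partition. If the processing has to create expensive extra objects over and over (for example, when writing RDD data to a database over JDBC, map would create a connection for every element while mapPartitions creates one connection per partition), mapPartitions is much more efficient than map.

    Spark SQL and DataFrames apply this mapPartitions-style optimization by default.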
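    The connection-per-element versus connection-per-partition point can be illustrated without a database. In this sketch ConnectionFactory is a hypothetical stand-in for an expensive resource such as a JDBC connection (it only counts how often open() is called), and three local lists simulate the partitions of makeRDD(1 to 10,3).

```scala
// Hypothetical stand-in for an expensive resource: counts how many "connections" were opened.
final class ConnectionFactory {
  var opened = 0
  def open(): Int => String = {
    opened += 1
    (x: Int) => s"wrote $x"
  }
}

// Three simulated partitions holding the numbers 1 to 10.
val partitions = List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))

// map style: a new connection for every single element.
val perElementFactory = new ConnectionFactory
val perElement = partitions.map(_.map(x => perElementFactory.open()(x)))

// mapPartitions style: one connection per partition, reused for all of its elements.
val perPartitionFactory = new ConnectionFactory
val perPartition = partitions.map { part =>
  val write = perPartitionFactory.open()
  part.map(write)
}
```

    Both variants produce the same output, but perElementFactory.opened ends up at 10 while perPartitionFactory.opened is only 3.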

    scala> val rdd1=sc.makeRDD(1 to 10,3)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at <console>:24

    scala> rdd1.collect
    res2: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


    scala> def func2(iter:Iterator[Int]):Iterator[String]= {iter.toList.map(x=>" val:"+x).iterator}
    func2: (iter: Iterator[Int])Iterator[String]

    scala> rdd1.mapPartitions(func2).collect
    res22: Array[String] = Array(" val:1", " val:2", " val:3", " val:4", " val:5", " val:6", " val:7", " val:8", " val:9", " val:10")

     

    def mapPartitionsWithIndex[U](f: (Int, Iterator[Int]) => Iterator[U],preservesPartitioning: Boolean)(implicit evidence$9: scala.reflect.ClassTag[U]): org.apache.spark.rdd.RDD[U]

    Works like mapPartitions, but the function takes two parameters, the first of which is the index of the partition.
    scala> def func1(index:Int,iter:Iterator[Int]):Iterator[String]={iter.toList.map(x=>"partid:"+index+" val:"+x).iterator}
    func1: (index: Int, iter: Iterator[Int])Iterator[String]

    scala> rdd1.mapPartitionsWithIndex(func1).collect
    res4: Array[String] = Array(partid:0 val:1, partid:0 val:2, partid:0 val:3, partid:1 val:4, partid:1 val:5, partid:1 val:6, partid:2 val:7, partid:2 val:8, partid:2 val:9, partid:2 val:10)

     

    ----------------------------------------------------------

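    The result of aggregate depends on the number of partitions: the zero value is applied once inside every partition and once more when the partition results are merged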

    scala> val rdd1=sc.makeRDD(1 to 10,3)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

    scala> rdd1.aggregate(0)(_+_,_+_)

    res3: Int = 55

    scala> rdd1.aggregate(5)(_+_,_+_)

    res4: Int = 75

    scala> val rdd1=sc.makeRDD(1 to 10,5)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24

    scala> rdd1.aggregate(5)(_+_,_+_)

    res5: Int = 85
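    The results 75 and 85 can be reproduced with a small local model of aggregate. This is a sketch, not Spark's implementation: simulateAggregate is a made-up name, and the partitions are written out by hand to match the slices shown by mapPartitionsWithIndex in the transcripts.

```scala
// Local model of RDD.aggregate: zero seeds the fold inside every partition
// and also seeds the final merge of the partition results.
def simulateAggregate[T, U](partitions: Seq[Seq[T]], zero: U)(
    seqOp: (U, T) => U, combOp: (U, U) => U): U =
  partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

// The slices produced by makeRDD(1 to 10, 3) and makeRDD(1 to 10, 5).
val three = Seq(Seq(1, 2, 3), Seq(4, 5, 6), Seq(7, 8, 9, 10))
val five  = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6), Seq(7, 8), Seq(9, 10))

val sum3Zero0 = simulateAggregate(three, 0)(_ + _, _ + _)   // 55
val sum3Zero5 = simulateAggregate(three, 5)(_ + _, _ + _)   // 55 + 5 * (3 + 1) = 75
val sum5Zero5 = simulateAggregate(five, 5)(_ + _, _ + _)    // 55 + 5 * (5 + 1) = 85
```

    With n partitions the zero value is counted n + 1 times, which is why a non-neutral zero value makes the result depend on the partitioning.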

     --------------------------------------------

    scala> val rdd1=sc.makeRDD(1 to 10,5)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at <console>:24

    scala> def func1(index:Int,iter:Iterator[Int]):Iterator[String]={iter.toList.map(x=>"[partid:"+index+" val:"+x+"]").iterator}
    func1: (index: Int, iter: Iterator[Int])Iterator[String]

    scala> rdd1.mapPartitionsWithIndex(func1).collect
    res17: Array[String] = Array([partid:0 val:1], [partid:0 val:2], [partid:1 val:3], [partid:1 val:4], [partid:2 val:5], [partid:2 val:6], [partid:3 val:7], [partid:3 val:8], [partid:4 val:9], [partid:4 val:10])

    scala> rdd1.aggregate(0)(math.max(_,_),_+_) // sum of the maxima of the partitions: 2+4+6+8+10
    res12: Int = 30

    scala> rdd1.aggregate(5)(math.max(_,_),_+_) // the max of each partition and 5 is (5,5,6,8,10); merging adds these up plus the zero value 5 once more: 5+5+6+8+10+5=39
    res13: Int = 39

     

    ---------------------

    scala> val rdd2=sc.makeRDD(List("a","b","c","d","e","f"),2)

    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at makeRDD at <console>:24

    scala> rdd2.aggregate("")(_+_,_+_)

    res18: String = abcdef

    scala> rdd2.aggregate("")(_+_,_+_)

    res19: String = defabc // one task per partition; results are merged in the order the tasks finish, so the order can differ between runs

    scala> rdd2.aggregate("|")(_+_,_+_)

    res21: String = ||abc|def

    scala> rdd2.aggregate("|")(_+_,_+_)

    res22: String = ||def|abc

    scala> rdd2.aggregate("|")(_+_,_+_)

    res23: String = ||abc|def

    scala> rdd2.aggregate("|")(_+_,_+_)

    res24: String = ||abc|def

     -----------------------------------------------------

    The five properties of an RDD

     * Internally, each RDD is characterized by five main properties:
     *
     *  - A list of partitions
     *  - A function for computing each split
     *  - A list of dependencies on other RDDs
     *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
     *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
     *    an HDFS file)
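    To make the five properties concrete, here is a deliberately simplified model in plain Scala. MiniRDD, SlicedRDD and MappedRDD are hypothetical names invented for this sketch; in Spark's RDD class the corresponding members are getPartitions, compute, getDependencies, partitioner and getPreferredLocations, with richer types than the ones assumed here.

```scala
// Simplified, hypothetical model of the five properties (not Spark's actual API).
trait MiniRDD[T] {
  def partitions: Seq[Int]                                   // 1. list of partitions (just indices here)
  def compute(partition: Int): Iterator[T]                   // 2. function for computing each split
  def dependencies: Seq[MiniRDD[_]]                          // 3. dependencies on other RDDs
  def partitioner: Option[Any => Int] = None                 // 4. optional partitioner (key => partition)
  def preferredLocations(partition: Int): Seq[String] = Nil  // 5. optional preferred hosts
}

// A source dataset backed by in-memory slices; it has no parents.
final class SlicedRDD(slices: Seq[Seq[Int]]) extends MiniRDD[Int] {
  def partitions: Seq[Int] = slices.indices
  def compute(partition: Int): Iterator[Int] = slices(partition).iterator
  def dependencies: Seq[MiniRDD[_]] = Nil
}

// A mapped dataset: same partitions as its parent, computed from the parent on demand.
final class MappedRDD[A, B](parent: MiniRDD[A], f: A => B) extends MiniRDD[B] {
  def partitions: Seq[Int] = parent.partitions
  def compute(partition: Int): Iterator[B] = parent.compute(partition).map(f)
  def dependencies: Seq[MiniRDD[_]] = Seq(parent)
}

val source    = new SlicedRDD(Seq(Seq(1, 2, 3), Seq(4, 5, 6)))
val doubled   = new MappedRDD[Int, Int](source, _ * 2)
val collected = doubled.partitions.flatMap(p => doubled.compute(p).toList)
```

    collected contains 2, 4, 6, 8, 10, 12, and doubled.dependencies points back to source, which is the lineage information the third property refers to.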

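    Moving computation vs. moving data in big data

    Moving computation means sending the computing task to the node where the data lives and processing it there.

    Moving data means shipping the data to the node that runs the task, which costs a lot of network bandwidth, causes traffic spikes, and slows processing down.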

    -----------------------

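    RDD operations are divided into transformations and actions

    transformation

    There are two ways to create an RDD: from a storage system, or by parallelizing a collection that lives in the driver. A transformation then turns one RDD into another (RDD -> RDD).

    action

    Once an action has run there is no RDD any more, only a result, for example:

    collect, saveAsTextFile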

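    ------------------------------------------

    Partition 1 holds "12" and "23": the empty initial string has length 0, so the first comparison gives "0"; "0" has length 1, so comparing it with "23" gives "1". Partition 2 holds "345" and "": the first comparison again gives "0", and comparing "0" (length 1) with the empty string (length 0) gives "0". Concatenating the two partition results yields "10" or "01", depending on which task finishes first.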

     scala> val rdd3=sc.parallelize(List("12","23","345",""),2)
    rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at <console>:24

    scala> rdd3.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)

    res63: String = 01

    scala> rdd3.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)

    res64: String = 01

    scala> rdd3.aggregate("")((x,y)=>math.min(x.length,y.length).toString,_+_)

    res65: String = 10  
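    The intermediate states of this example can be traced locally with scanLeft, which keeps every accumulator value. The sketch assumes the partition layout ("12","23") and ("345",""), which is how parallelize splits a four-element list into two slices.

```scala
// The seqOp from the transcript: the smaller of the two lengths, converted to a string.
val seqOp = (x: String, y: String) => math.min(x.length, y.length).toString

// scanLeft lists every intermediate accumulator, starting from the zero value "".
val partition1 = List("12", "23").scanLeft("")(seqOp)   // List("", "0", "1")
val partition2 = List("345", "").scanLeft("")(seqOp)    // List("", "0", "0")

// combOp is string concatenation; the order depends on which task finishes first.
val oneOrder   = "" + partition1.last + partition2.last   // "10"
val otherOrder = "" + partition2.last + partition1.last   // "01"
```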

    ---------------------------------------------

    scala> val rdd4=sc.makeRDD(List(("cat",5),("dog",5),("cat",10)),2)
    rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[16] at makeRDD at <console>:24

    scala> def func2(index:Int,iter:Iterator[(String,Int)]):Iterator[String]={iter.toList.map(x=>"[partid:"+index+" val:"+x+"]").iterator}

    func2: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

    scala> rdd4.mapPartitionsWithIndex(func2).collect

    res74: Array[String] = Array([partid:0 val:(cat,5)], [partid:1 val:(dog,5)], [partid:1 val:(cat,10)])


    scala> rdd4.aggregateByKey(0)(_+_,_+_).collect
    res76: Array[(String, Int)] = Array((dog,5), (cat,15)) 

    ------------------------------------

    scala> val rdd4=sc.makeRDD(List(("cat",5),("dog",8),("cat",10),("cat",6),("dog",7)),2)

    rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[73] at makeRDD at <console>:24

    scala> rdd4.mapPartitionsWithIndex(func2).collect

    res125: Array[String] = Array([partid:0 val:(cat,5)], [partid:0 val:(dog,8)], [partid:1 val:(cat,10)], [partid:1 val:(cat,6)], [partid:1 val:(dog,7)])

    scala> rdd4.aggregateByKey(0)(math.max(_,_),_+_).collect

    res126: Array[(String, Int)] = Array((dog,15), (cat,15)) // partition 1 yields the per-key maxima (cat,5),(dog,8); partition 2 yields (cat,10),(dog,7); the partition results are then summed per key

    ------------------------

    scala> val rdd4=sc.makeRDD(List(("cat",5),("dog",8),("cat",10),("cat",6),("dog",7),("aa",3)),2)

    rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[77] at makeRDD at <console>:24

    scala> rdd4.mapPartitionsWithIndex(func2).collect

    res128: Array[String] = Array([partid:0 val:(cat,5)], [partid:0 val:(dog,8)], [partid:0 val:(cat,10)], [partid:1 val:(cat,6)], [partid:1 val:(dog,7)], [partid:1 val:(aa,3)])

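    scala> rdd4.aggregateByKey(100)(math.max(_,_),_+_).collect // two partitions; taking the max of each key's values and the initial value 100 yields (cat,100),(dog,100) in one partition and (cat,100),(dog,100),(aa,100) in the other, which are then summed per key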

    res129: Array[(String, Int)] = Array((aa,100), (dog,200), (cat,200))
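    A local model makes the role of the initial value in aggregateByKey easier to see: it seeds the fold of every key inside every partition, but is not added again when partitions are merged. simulateAggregateByKey is a made-up helper, and the partitions are copied by hand from the mapPartitionsWithIndex output above.

```scala
// Local model of aggregateByKey (a sketch, not Spark's implementation).
def simulateAggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
    seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
  // Step 1: inside each partition, fold the values of every key, starting from zero.
  val perPartition: Seq[(K, U)] = partitions.flatMap { part =>
    part.groupBy(_._1).toSeq.map { case (k, kvs) =>
      (k, kvs.map(_._2).foldLeft(zero)(seqOp))
    }
  }
  // Step 2: merge the per-partition results of each key with combOp (zero is not used here).
  perPartition.groupBy(_._1).map { case (k, kus) => k -> kus.map(_._2).reduce(combOp) }
}

// The two partitions shown in the transcript.
val parts = Seq(
  Seq(("cat", 5), ("dog", 8), ("cat", 10)),
  Seq(("cat", 6), ("dog", 7), ("aa", 3))
)

val withZero100 = simulateAggregateByKey(parts, 100)(math.max(_, _), _ + _)
val withZero0   = simulateAggregateByKey(parts, 0)(math.max(_, _), _ + _)
```

    withZero100 is Map(cat -> 200, dog -> 200, aa -> 100), matching res129; with a zero value of 0 the same data gives cat -> 16, dog -> 15, aa -> 3.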

    --------------------------------------------------

      def repartition(numPartitions: Int)(implicit ord: Ordering[String]): org.apache.spark.rdd.RDD[String]

    repartition calls coalesce with shuffle set to true

      def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
        coalesce(numPartitions, shuffle = true)
      }

    scala> val rdd3=sc.parallelize(List("12","23","345",""),2)

    rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[106] at parallelize at <console>:24

    scala> rdd3.partitions.size

    res241: Int = 2

    scala> rdd5.repartition(5).partitions.size

    res242: Int = 5

    scala> rdd5.repartition(3).partitions.size

    res243: Int = 3

    def coalesce(numPartitions: Int,shuffle: Boolean,partitionCoalescer: Option[org.apache.spark.rdd.PartitionCoalescer])(implicit ord: Ordering[String]): org.apache.spark.rdd.RDD[String]

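    coalesce defaults to shuffle = false; without a shuffle it can only reduce the number of partitions, so a request for more partitions leaves the count unchanged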

    scala> rdd5.coalesce(4,false).partitions.size

    res249: Int = 2

    scala> rdd5.coalesce(4,true).partitions.size

    res250: Int = 4

    scala> rdd3.coalesce(8).partitions.size

    res18: Int = 5

    scala> rdd3.coalesce(8,true).partitions.size

    res19: Int = 8
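    The partition counts in the transcripts follow a simple rule, written out below as a toy model. The two functions are hypothetical helpers that only compute the expected count; they are not how Spark implements coalesce, and the model assumes the default coalescer reaches the requested count whenever it is allowed to.

```scala
// Toy model of the resulting partition count:
// with a shuffle any target is reachable, without one the count can only go down.
def partitionsAfterCoalesce(current: Int, requested: Int, shuffle: Boolean): Int =
  if (shuffle) requested else math.min(current, requested)

// repartition is coalesce with shuffle = true.
def partitionsAfterRepartition(current: Int, requested: Int): Int =
  partitionsAfterCoalesce(current, requested, shuffle = true)

val growWithoutShuffle = partitionsAfterCoalesce(2, 4, shuffle = false)   // stays at 2
val growWithShuffle    = partitionsAfterCoalesce(2, 4, shuffle = true)    // becomes 4
val viaRepartition     = partitionsAfterRepartition(2, 5)                 // becomes 5
```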

    --------------------------------------------

    scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("b",3),("b",1)))

    rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[40] at makeRDD at <console>:24

    scala> rdd00.countByValue()

    res22: scala.collection.Map[(String, Int),Long] = Map((a,3) -> 1, (b,1) -> 2, (b,3) -> 1, (a,1) -> 1)

    scala> rdd00.countByKey()

    res23: scala.collection.Map[String,Long] = Map(a -> 2, b -> 3)

    --------------------------------

      def filterByRange(lower: String,upper: String): org.apache.spark.rdd.RDD[(String, Int)]

    scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("ba",3),("b",1),("g",10)))

    rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[48] at makeRDD at <console>:24

    scala> rdd00.filterByRange("b","f").collect

    res26: Array[(String, Int)] = Array((b,1), (ba,3), (b,1))

    -----------------------------

    scala> val rdd00=sc.makeRDD(List(("a","20 50"),("b","10 20"),("a","3")))

    rdd00: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[56] at makeRDD at <console>:24

    scala> rdd00.flatMapValues(_.split(" ")).collect // flatMap over the value of each key-value pair

    res34: Array[(String, String)] = Array((a,20), (a,50), (b,10), (b,20), (a,3))

    scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("ba",3),("b",1),("g",10)))

    rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[61] at makeRDD at <console>:24

    scala> rdd00.foldByKey(0)(_+_).collect

    res37: Array[(String, Int)] = Array((a,4), (b,2), (ba,3), (g,10))

    -------------------------------------

    scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("ba",3),("b",1),("g",10)))

    rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[69] at makeRDD at <console>:24

    scala> val aa=rdd00.map(x=>(x._2.toString,x._1))

    aa: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[70] at map at <console>:26

    scala> aa.collect

    res41: Array[(String, String)] = Array((1,a), (1,b), (3,a), (3,ba), (1,b), (10,g))

    scala> aa.foldByKey("")(_+_).collect

    res42: Array[(String, String)] = Array((1,abb), (3,aba), (10,g))

    -------------------------

    fold does the same job as reduce, except that fold requires an initial (zero) value
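    The same relationship holds for local Scala collections, which makes it easy to try out. The sketch below uses a plain List; on an RDD the zero value of fold is additionally applied once per partition (as with aggregate), so it should be neutral for the operation, for example 0 for addition.

```scala
import scala.util.Try

val nums = List(1, 2, 3, 4)

val viaReduce = nums.reduce(_ + _)     // no initial value needed
val viaFold   = nums.fold(0)(_ + _)    // same result, starting from the zero value 0

// On an empty collection fold returns the zero value, while reduce has nothing to start from.
val emptyFold   = List.empty[Int].fold(0)(_ + _)
val emptyReduce = Try(List.empty[Int].reduce(_ + _))
```

    viaReduce and viaFold are both 10, emptyFold is 0, and emptyReduce is a Failure because reduce throws on an empty collection.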

    ------------------------

    scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("ba",3),("b",1),("g",10)))

    rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[73] at makeRDD at <console>:24

    scala> rdd00.collectAsMap // only key-value RDDs can use collectAsMap; when a key occurs more than once, only one of its values is kept

    res45: scala.collection.Map[String,Int] = Map(b -> 1, g -> 10, a -> 3, ba -> 3)

    -------------------------------

       def keyBy[K](f: String => K): org.apache.spark.rdd.RDD[(K, String)] 

    scala> val hhh=sc.makeRDD(List("apple","egg","tomato","bird","elephant"))

    hhh: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[77] at makeRDD at <console>:24

    scala> val hhha=hhh.map(x=>(x,x.length))

    hhha: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[78] at map at <console>:26

    scala> hhha.collect

    res48: Array[(String, Int)] = Array((apple,5), (egg,3), (tomato,6), (bird,4), (elephant,8))

    scala> hhh.keyBy(_.length).collect // use the length as the key
    res49: Array[(Int, String)] = Array((5,apple), (3,egg), (6,tomato), (4,bird), (8,elephant))

    mapValues(func)

    Applies a function to the value of every key-value pair; the keys do not change.

    --------------------------------------------------------

    scala> val rdd00=sc.makeRDD(List(("a",1),("b",1),("a",3),("ba",3),("b",1),("g",10)))

    rdd00: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at makeRDD at <console>:24

    scala> rdd00.mapPartitions(it=>it.map(x=>(x._1,x._2*10))).collect

    res18: Array[(String, Int)] = Array((a,10), (b,10), (a,30), (ba,30), (b,10), (g,100))

    -----------------------------

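    map(function)
    map applies the given function to every element of the RDD to produce a new RDD. Every element of the original RDD corresponds to exactly one element in the new RDD.

    mapPartitions(function)
    The function passed to map() is applied to each element of the RDD, while the function passed to mapPartitions() is applied to each partition.

    mapValues(function)
    The keys of the original RDD stay unchanged and are paired with the new values to form the elements of the new RDD. It therefore only applies to RDDs whose elements are key-value pairs.

    flatMap(function)
    Similar to map, except that map turns each element of the original RDD into exactly one element, whereas flatMap can turn each element into zero or more elements.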
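    The difference between map and flatMap is the same on local Scala collections, so it can be checked without a cluster. The sample rows below are made up; the last expression imitates mapValues on a local list of pairs by changing only the value.

```scala
val rows = List("20 50", "10 20", "3")

// map: exactly one output element (here a List) per input element.
val mapped: List[List[String]] = rows.map(_.split(" ").toList)

// flatMap: every input element may produce several output elements, which are flattened.
val flattened: List[String] = rows.flatMap(_.split(" ").toList)

// mapValues-like: keep the key, transform only the value.
val pairs  = List(("a", 1), ("b", 2))
val scaled = pairs.map { case (k, v) => (k, v * 10) }
```

    mapped keeps three nested lists, flattened is List(20, 50, 10, 20, 3) as strings, and scaled is List((a,10), (b,20)).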

    scala> rdd00.mapPartitions(it=>it.map(x=>(x._1,0 to x._2))).collect
    res20: Array[(String, scala.collection.immutable.Range.Inclusive)] = Array((a,Range(0, 1)), (b,Range(0, 1)), (a,Range(0, 1, 2, 3)), (ba,Range(0, 1, 2, 3)), (b,Range(0, 1)), (g,Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))

    -----------------------------------------

       def foreachPartition(f: Iterator[(String, Int)] => Unit): Unit

    scala> rdd00.foreachPartition(it=>it.foreach(x=>println(x)))
    (ba,3)
    (b,1)
    (g,10)
    (a,1)
    (b,1)
    (a,3)

    def foreach(f: ((String, Int)) => Unit): Unit

    scala> rdd00.foreach(x=>println(x))
    (ba,3)
    (b,1)
    (g,10)
    (a,1)
    (b,1)
    (a,3)

    ---------------------------------

    scala> rdd00.reduceByKey(_+_).collect

    res28: Array[(String, Int)] = Array((b,2), (a,4), (ba,3), (g,10))

    scala> rdd00.aggregateByKey(0)(_+_,_+_).collect

    res29: Array[(String, Int)] = Array((b,2), (a,4), (ba,3), (g,10))

    scala> rdd00.foldByKey(0)(_+_).collect

    res31: Array[(String, Int)] = Array((b,2), (a,4), (ba,3), (g,10))

  • Original post: https://www.cnblogs.com/playforever/p/9244684.html