zoukankan      html  css  js  c++  java
  • Spark算子(一)Transformat

    常用的一些简单算子:

    map(func)

    返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

    flatMap(func)

    类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

    partitionBy

     对RDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 

    否则会生成ShuffleRDD. 

    mapPartitions(func)

    类似于map,但独立地在RDD的每一个分片上运行,因此在类型为TRDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]

    假设有N个元素,有M个分区,那么map的函数的将被调用N,mapPartitions被调用M,一个函数一次处理所有分区

    mapPartitionsWithIndex(func)

    类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为TRDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]

    举个栗子:

     1 scala> val rdd = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")))
     2 rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24
     3 
     4 scala> :paste
     5 // Entering paste mode (ctrl-D to finish)
     6 
     7 def partitionsFun(iter : Iterator[(String,String)]) : Iterator[String] = {
     8   var woman = List[String]()
     9   while (iter.hasNext){
    10     val next = iter.next()
    11     next match {
    12        case (_,"female") => woman = next._1 :: woman
    13        case _ =>
    14     }
    15   }
    16   woman.iterator
    17 }
    18 
    19 // Exiting paste mode, now interpreting.
    20 
    21 partitionsFun: (iter: Iterator[(String, String)])Iterator[String]
    22 
    23 scala> val result = rdd.mapPartitions(partitionsFun)
    24 result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at mapPartitions at <console>:28
    25 
    26 scala> result.collect()
    27 res13: Array[String] = Array(kpop, lucy)
    自定义分区

    sample(withReplacement, fraction, seed)

    以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样seed用于指定随机数生成器种子。例子RDD中随机且有放回的抽出50%的数据,随机种子值为3(即可能以1 2 3的其中一个起始值)

    takeSample

    Sample的区别是:takeSample返回的是最终的结果集合。

     

    union(otherDataset)

     

    对源RDD和参数RDD求并集后返回一个新的RDD

     

    intersection(otherDataset)

     对源RDD和参数RDD求交集后返回一个新的RDD

    distinct([numTasks]))

    对源RDD进行去重后返回一个新的RDD. 默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。

     reduceByKey(func, [numTasks])

    在一个(K,V)RDD上调用返回一个(K,V)RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置

     

    groupByKey

     

    groupByKey也是对每个key进行操作,但只生成一个sequence

    group.collect()

    res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))

     

    combineByKey[C](  

    createCombiner: V => C,  

    mergeValue: (C, V) => C,  

    mergeCombiners: (C, C) => C) 

     

    对相同K,把V合并成一个集合.

    createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就 和之前的某个元素的键相同。如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建 
    那个键对应的累加器的初始值

    mergeValue: 如果这是一个在处理当前分区之前已经遇到的键, 它会使用 mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并

    mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。

       
  • 相关阅读:
    Error creating bean with name 'entityManagerFactory' defined in class path resource
    com.sun.org.apache.regexp.internal不存在
    [Kubernetes] Best Practices/Design Patterns for Kubernetes in Production
    [Tools] Kill the process running on port 8080
    [AWS] Lab: Docker and CodeCommit Part 1
    [AWS] Lab: Docker with CodeCommit & CodeBuild Part2
    [AWS] Lab: Launching an EKS Cluster
    [AWS] Lab: Configure and Work with CodeCommit from the CLI
    [DevOps] Environment Variables with Travis
    [DevOps] CI/CD Benefits
  • 原文地址:https://www.cnblogs.com/Vowzhou/p/10842853.html
Copyright © 2011-2022 走看看