zoukankan      html  css  js  c++  java
  • spark rdd算子和共享变量

    RDD转换算子

    map(function)
    传入的集合元素进行RDD[T]转换 def map(f: T => U): org.apache.spark.rdd.RDD[U]

    scala> sc.parallelize(List(1,2,3,4,5),3).map(item => item*2+" " )
    res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:25
    
    scala> sc.parallelize(List(1,2,3,4,5),3).map(item => item*2+" " ).collect
    res2: Array[String] = Array("2 ", "4 ", "6 ", "8 ", "10 ")

    filter(func)
    将满足条件结果记录def filter(f: T=> Boolean): org.apache.spark.rdd.RDD[T]

    scala> sc.parallelize(List(1,2,3,4,5),3).filter(item=>item==3).collect
    res3:Array[Int]=Array(3)

    flatMap(func)
    将一个元素转换成元素的数组,然后对数组展开。def flatMap[U](f: T=> TraversableOnce[U]): org.apache.spark.rdd.RDD[U]

    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\s+")).collect
    res4: Array[String] = Array(ni, hao, hello, spark)

    mapPartitions(func)
    于map类似,但在RDD的每个分区上单独运行,因此当在类型T的RDD上运行时,func必须是Iterator => Iterator 类型
    def mapPartitions[U](f: Iterator[Int] => Iterator[U],preservesPartitioning: Boolean): org.apache.spark.rdd.RDD[U]

    scala> sc.parallelize(List(1,2,3,4,5),3).mapPartitions(items=> for(i<-items;if(i%2==0)) yield i*2 ).collect()
    res7: Array[Int] = Array(4, 8)

    mapPartitionsWithIndex(func)
    于mapPartitions类似,但也为func提供了表示索引的整数值,因此当在类型T的RDD上运行时,func必须时类型(Int,Iterator <T>)=> Iterator <U>
    def mapPartitionsWithIndex[U](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean): org.apache.spark.rdd.RDD[U]

    scala> sc.parallelize(List(1,2,3,4,5),3).mapPartitionsWithIndex((p,items)=> for(i<-items) yield (p,i)).collect
    res11: Array[(Int, Int)] = Array((0,1), (1,2), (1,3), (2,4), (2,5))

    sample(withReplacement, fraction, seed)
    对数据进行一定比例的采样,使用withReplacement参数控制是否允许重复采样

    scala> sc.parallelize(List(1,2,3,4,5,6,7),3).sample(false,0.7,1L).collect
    res13: Array[Int] = Array(1, 4, 6, 7)

    union(otherDataset)
    返回一个新数据集,其中包含源数据集和参数中元素的并集

    scala> var rdd1=sc.parallelize(Array(("张三",1000),("李四",100),("赵六",300)))
    scala> var rdd2=sc.parallelize(Array(("张三",1000),("王五",100),("温七",300)))
    scala> rdd1.union(rdd2).collect
    res16: Array[(String, Int)] = Array((张三,1000), (李四,100), (赵六,300), (张三,1000), (王五,100), (温七,300))

    intersection(otherDataset)
    返回包含源数据集和参数中元素交集的新RDD

    scala> var rdd1=sc.parallelize(Array(("张三",1000),("李四",100),("赵六",300)))
    scala> var rdd2=sc.parallelize(Array(("张三",1000),("王五",100),("温七",300)))
    scala> rdd1.intersection(rdd2).collect
    res17: Array[(String, Int)] = Array((张三,1000))

    distinct
    去重

    scala> sc.parallelize(List(1,2,3,3,5,7,2),3).distinct.collect
    res19: Array[Int] = Array(3, 1, 7, 5, 2

    groupByKey
    在(K,V)对的数据集上调用时,返回(K,Iterable )对的数据集。 注意:如果要对每个键执行聚合(例如总和或平均值)进行分组,则使用reduceByKey或aggregateByKey将产生更好的性能。 注意:默认情况下,输出中的并行级别取决于父RDD的分区数。您可以传递可选的numPartitions参数来设置不同数量的任务。

    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\s+")).map(word=>(word,1)).groupByKey(3).map(tuple=>(tuple._1,tuple._2.sum)).collect

    reduceByKey(func,[numpartitions])
    当调用(k,v)对的数据集时,返回(k,v)对的数据集,其中使用给定的reduce函数func集合每个键的值,该函数必须时类型(v,v)=>v

    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\s+")).map(word=>(word,1)).reduceByKey((v1,v2)=>v1+v2).collect()
    res33: Array[(String, Int)] = Array((hao,1), (hello,1), (spark,1), (ni,1))
    
    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\s+")).map(word=>(word,1)).reduceByKey(_+_).collect()
    res34: Array[(String, Int)] = Array((hao,1), (hello,1), (spark,1), (ni,1))

    aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
    当调用(k,v)对的数据集时,返回(k,u)对的数据集,其中使用给定的组合函数和中性“零”值聚合每个键的值。允许与输入值类型不同的聚合值类型,同时避免不必要的分配。

    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\s+")).map(word=>(word,1)).aggregateByKey(0L)((z,v)=>z+v,(u1,u2)=>u1+u2).collect
    res35: Array[(String, Long)] = Array((hao,1), (hello,1), (spark,1), (ni,1))

    sortByKey([ascending], [numPartitions])
    当调用K实现Ordered的(K,V)对数据集时,返回按键升序或降序排序的(K,V)对数据集,如布尔升序参数中所指定。

    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\s+")).map(word=>(word,1)).aggregateByKey(0L)((z,v)=>z+v,(u1,u2)=>u1+u2).sortByKey(false).collect()
    res37: Array[(String, Long)] = Array((spark,1), (ni,1), (hello,1), (hao,1))

    sortBy(func,[ascending], [numPartitions])
    对(K,V)数据集调用sortBy时,用户可以通过指定func指定排序规则,T => U 要求U必须实现Ordered接口

    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(line=>line.split("\s+")).map(word=>(word,1)).aggregateByKey(0L)((z,v)=>z+v,(u1,u2)=>u1+u2).sortBy(_._2,true,2).collect
    res42: Array[(String, Long)] = Array((hao,1), (hello,1), (spark,1), (ni,1))

    join
    当调用类型(K,V)和(K,W)的数据集时,返回(K,(V,W))对的数据集以及每个键的所有元素对。通过leftOuterJoin,reghtOuterJoin和fullOuterJoin支持外连接。

    scala> var rdd1=sc.parallelize(Array(("001","张三"),("002","李四"),("003","王五")))
    scala> var rdd2=sc.parallelize(Array(("001",("apple",18.0)),("001",("orange",18.0))))
    scala> rdd1.join(rdd2).collect
    res43: Array[(String, (String, (String, Double)))] = Array((001,(张三,(apple,18.0))), (001,(张三,(orange,18.0))))

    cogroup
    当调用类型(K,V)和(K,W)的数据集时,返回(K,(Iterable,Iterable))元组的数据集。此操作也称为groupWith.

    scala> var rdd1=sc.parallelize(Array(("001","张三"),("002","李四"),("003","王五")))
    scala> var rdd2=sc.parallelize(Array(("001","apple"),("001","orange"),("002","book")))
    scala> rdd1.cogroup(rdd2).collect()
    res46: Array[(String, (Iterable[String], Iterable[String]))] = Array((001,(CompactBuffer(张三),CompactBuffer(apple, orange))), (002,(CompactBuffer(李四),CompactBuffer(book))), (003,(CompactBuffer(王五),CompactBuffer())))

    cartesian
    当调用类型为T和U的数据集时,返回(T,U)对的数据集

    scala> var rdd1=sc.parallelize(List("a","b","c"))
    scala> var rdd2=sc.parallelize(List(1,2,3,4))
    scala> rdd1.cartesian(rdd2).collect()
    res47: Array[(String, Int)] = Array((a,1), (a,2), (a,3), (a,4), (b,1), (b,2), (b,3), (b,4), (c,1), (c,2), (c,3), (c,4))

    coalesce(numPartitions)
    将RDD中的分区数减少为numPartitions。过滤大型数据集后,可以使用概算子减少分区数

    scala> sc.parallelize(List("ni hao","hello spark"),3).coalesce(1).partitions.length
    res50: Int = 1
    
    scala> sc.parallelize(List("ni hao","hello spark"),3).coalesce(1).getNumPartitions
    res51: Int = 1

    repartition
    随机重新调整RDD中的数据以创建更多或更少的分区。

    scala> sc.parallelize(List("a","b","c"),3).mapPartitionsWithIndex((index,values)=>for(i<-values) yield (index,i) ).collect
    res52: Array[(Int, String)] = Array((0,a), (1,b), (2,c))
    
    scala> sc.parallelize(List("a","b","c"),3).repartition(2).mapPartitionsWithIndex((index,values)=>for(i<-values) yield (index,i) ).collect
    res53: Array[(Int, String)] = Array((0,a), (0,c), (1,b))

     

    执行算子

    collect
    用在测试环境下,通常使用collect算子将远程计算的结果拿到Driver端,用于测试

    scala> var rdd1=sc.parallelize(List(1,2,3,4,5),3).collect().foreach(println)

    saveAsTextFile
    将计算结果存储在文件系统中,一般存储在HDFS上

    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(_.split("\s+")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false,3).saveAsTextFile("hdfs:///wordcounts")

    foreach
    迭代遍历所有的RDD中的元素,通常是将foreach传递的数据写到外围系统中,比如说可以将数据写入到Hbase中。

    scala> sc.parallelize(List("ni hao","hello spark"),3).flatMap(.split("s+")).map((,1)).reduceByKey(+).sortBy(.2,false,3).foreach(println)
    (hao,1)
    (hello,1)
    (spark,1)
    (ni,1)

    注意如果使用以上代码写数据到外围系统,会因为不断创建和关闭连接影响写入效率,一般推荐使用foreachPartition

    val lineRDD: RDD[String] = sc.textFile("file:///E:/demo/words/t_word.txt")
    lineRDD.flatMap(line=>line.split(" "))
    .map(word=>(word,1))
    .groupByKey()
    .map(tuple=>(tuple._1,tuple._2.sum))
    .sortBy(tuple=>tuple._2,false,3)
    .foreachPartition(items=>{
    //创建连接
    items.foreach(t=>println("存储到数据库"+t))
    //关闭连接
    })

     

    共享变量

    变量广播
    通常情况下,当一个RDD的很多操作都需要使用driver中定义的变量时,每次操作,driver都要把变量发送给worker节点一次,如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低。使用广播变量可以使程序高效地将一个很大的只读数据发送给多个worker节点,而且对每个worker节点只需要传输一次,每次操作时executor可以直接获取本地保存的数据副本,不需要多次传输。

    val conf = new SparkConf().setAppName("demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    
    val userList = List(
    "001,张三,28,0",
    "002,李四,18,1",
    "003,王五,38,0",
    "004,zhaoliu,38,-1"
    )
    val genderMap = Map("0" -> "", "1" -> "")
    val bcMap = sc.broadcast(genderMap)
    
    sc.parallelize(userList,3)
    .map(info=>{
    val prefix = info.substring(0, info.lastIndexOf(","))
    val gender = info.substring(info.lastIndexOf(",") + 1)
    val genderMapValue = bcMap.value
    val newGender = genderMapValue.getOrElse(gender, "未知")
    prefix + "," + newGender
    }).collect().foreach(println)
    
    sc.stop()

    累加变量

    Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

    scala> var count=sc.longAccumulator("count")
    scala> sc.parallelize(List(1,2,3,4,5,6),3).foreach(item=> count.add(item))
    scala> count.value
    res1: Long = 21
  • 相关阅读:
    python 查看源代码
    团队项目5-冲刺合集
    系统设计(团队作业4)
    《次元唤醒 需求规格说明书v1.0》
    团队选题报告
    来自异次元的一篇博客
    《口算大作战 概念版》功能规格说明书
    我不会优化啊!!!
    Python装饰器实现异步回调
    Python杀死windows进程
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/13265859.html
Copyright © 2011-2022 走看看