zoukankan      html  css  js  c++  java
  • Spark基础:(三)Spark 键值对操作

    1、pair RDD的简介

    Spark为包含键值对类型的RDD提供了一些专有的操作,这些RDD就被称为pair RDD
    那么如何创建pair RDD呢? 在不同的语言中有着不同的创建方式
    在python和Scala语言中创建的方式都是差不多的。
    在java语言中:
    java用户还需要调用专门的Spark函数mapToPair()来创建pair RDD。例如:

     //映射,word -> (word,1)
            JavaPairRDD<String,Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s,1);
                }
            });

    2、pair RDD常见的转化操作

    (1)以键值对{(1,2),(3,4),(3,6)}为例子

    rdd.reduceByKey((x,y)=>x+y) ===> {(1,2),(3,10)}
    rdd.groupByKey() ===> {(1,[2]),(3,[4,6])}
    rdd.mapValues(x=>x+1) ===> {(1,3),(3,5),(3,7)}
    rdd.flatMapValues(x=>(x to 5)) ===> {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}

    (2)针对两个pair RDD的转化,以键值对rdd={(1,2),(3,4),(3,6)}和orther={(3,9)}

    rdd.substractByKey(orther) ==> {(1,2)} #删除相同的
    rdd.join(orther) ==> {(3,(4,9)),(3,(6,9))}
    rdd.rightOuterJoin(orther) ==> {(3,(Some(4),9)),(3,(Some(6),9))}
    rdd.leftOuterJoin(orther) ==> {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))}
    rdd.cogroupn(orther) ==> {(1,([2],[])),(3,([4,6],[9]))} #将两个RDD中相同的键的数据分组

    (3)聚合操作
    reduceByKey()前面已经讲过,此处不再赘述。

    并行度调优: 每个RDD都有自己固定的数目的分区,分区数决定了RDD上的执行操作的并行度,在执行聚合或者分组操作时,可以要求Spark使用给定的分区数。Spark始终尝试根据集群的大小推断一些有意义默认值。但是,有时候可以根据并行度的调优来获取更好的性能要求。

    Spark还提供了repartition()函数,他会把数据通过网络进行混洗,并创建出新的分区集合,但是对数据进行重新分区是代价比较大的操作。为此,Spark提供了一款优化版的repartition(),叫coalesce()。(我们可以通过rdd.getNumPartitions查看RDD的分区)

    (4)分组操作
    groupByKey()前面已经讲过,此处不再赘述。需要注意的是:返回的是[K,Iterable[V]]类型

    (5)连接操作
    join的操作,前面已经简单介绍

    (6)数据排序
    rdd.sortByKey() #注意要提供自定义的比较函数

    3、pair RDD的行动操作

    以键值对{(1,2),(3,4),(3,6)}为例子

    rdd.countByKey() ===> {(1,1),(3,2)} #统计键出现的次数
    rdd.collectAsMap() ===> Map{(1,2),(3,4),(3,6)} #返回的Map 便于查询
    rdd.lookup(3) ===> [4,6] #返回给定键对应的所有值

    4、数据分区

    自定义分区并且持久化降低网络通信的开销
    例如:Scala实现的例子

    val sc=new SparkContext(…) val
    userData=sc.sequenceFile[UserID,UserInfo](“hdfs://…”)
    .partitionBy(new HashPartitioner(100)) //构造100个分区
    .persist()

    同样的我们还可以通过partitioner方法来获取RDD的分区方式

    希望在知识中书写人生的代码
  • 相关阅读:
    记录学生的日常
    el表达式与jstl的用法
    学习心得——袁康凯
    学习心得——刘文瑞
    学习心得——高婕
    学习心得——张坤鹏
    Mysql中文乱码 小强斋
    面试题>字符串匹配 小强斋
    选秀节目打分 小强斋
    面试题>字符串匹配 小强斋
  • 原文地址:https://www.cnblogs.com/tongxupeng/p/10259549.html
Copyright © 2011-2022 走看看