  • Spark Pair RDD Basic Operations

    Below is a walkthrough of the Pair RDD API.

    Transformations
    reduceByKey: merge the values that share the same key;
    groupByKey: group the values that share the same key;
    keys: return an RDD containing only the keys;
    values: return an RDD containing only the values;
    sortByKey: return an RDD sorted by key;
    flatMapValues: apply a function that returns an iterator to each value of the Pair RDD, then emit one key-value record with the original key for every element returned;
    mapValues: apply a function to every value of the Pair RDD without touching the keys;
    combineByKey: combine the values that share the same key, allowing a result type different from the value type (see the sketch after this list);
    subtractByKey: calling the RDD being operated on RDD1 and the argument RDD the other RDD, remove from RDD1 every element whose key also appears in the other RDD;
    join: perform an inner join between two RDDs;
    rightOuterJoin: join two RDDs; every key of the other (second) RDD appears in the result, keys of the first RDD that do not occur in the other RDD are dropped, and values that share a key are paired up;
    leftOuterJoin: join two RDDs; every key of the first (source) RDD appears in the result, keys of the other RDD that do not occur in the first RDD are dropped, and values that share a key are paired up;
    cogroup: group the data from both RDDs that share the same key
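
    For example, combineByKey is most often used to compute a per-key average by building
    (sum, count) pairs and then dividing. A minimal sketch, assuming an existing SparkContext
    named sc and import org.apache.spark.rdd.RDD; the variable names are illustrative only:

    val pairs: RDD[(String, Int)] = sc.makeRDD(List(("k01", 3), ("k02", 6), ("k03", 2), ("k01", 26)))
    // createCombiner: the first value seen for a key becomes (value, 1)
    // mergeValue: fold another value of the same key into the running (sum, count)
    // mergeCombiners: merge partial (sum, count) pairs coming from different partitions
    val sumCount: RDD[(String, (Int, Int))] = pairs.combineByKey(
      (v: Int) => (v, 1),
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
    )
    // Divide sum by count to get the per-key average, e.g. (k01,14.5),(k03,2.0),(k02,6.0)
    val avg: RDD[(String, Double)] = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
    println(avg.collect().mkString(","))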
    

      

    Actions
    countByKey: count the number of elements for each key;
    collectAsMap: collect the result as a map (see the note after this list);
    lookup: return all values associated with the given key
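
    One caveat worth spelling out for collectAsMap: when a key occurs more than once in the
    RDD, only one of its values survives in the returned map (visible in the example output
    further below, where k01 keeps a single value). A minimal sketch, assuming an existing
    SparkContext named sc:

    val pairs = sc.makeRDD(List(("k01", 3), ("k01", 26), ("k02", 6)))
    // collectAsMap keeps one value per key; which duplicate wins is not guaranteed
    println(pairs.collectAsMap())   // e.g. Map(k01 -> 26, k02 -> 6)
    // countByKey, by contrast, counts every occurrence of each key
    println(pairs.countByKey())     // Map(k01 -> 2, k02 -> 1)
    // lookup returns all values associated with the key
    println(pairs.lookup("k01"))    // e.g. WrappedArray(3, 26)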
    

      

    Sampling-related operations:
       Transformation: sample: take a sample of the RDD;
       Actions:
        take(num): return num elements from the RDD; they are pulled from the first partition(s), so the selection is arbitrary rather than truly random;
        top(num): return the largest num elements of the RDD according to the implicit ordering; this one is quite handy in practice;
        takeSample: return a fixed-size random sample of elements from the RDD;
        sample: sample the data in the RDD (strictly speaking a transformation, listed again here for completeness);
        takeOrdered: return the smallest num elements of the RDD according to the provided ordering (see the sketch after this list)
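
    takeOrdered does not show up in the code below, so here is a minimal sketch, assuming an
    existing SparkContext named sc. The default ordering for tuples compares the key first and
    the value second:

    val rdd = sc.makeRDD(List(("k01", 3), ("k02", 6), ("k03", 2), ("k01", 26)))
    // takeOrdered(n) returns the n smallest elements under the implicit ordering
    println(rdd.takeOrdered(2).mkString(","))   // (k01,3),(k01,26)
    // top(n) is the mirror image: the n largest elements
    println(rdd.top(2).mkString(","))           // (k03,2),(k02,6)
    // an explicit Ordering changes what "smallest" means, e.g. order by the Int value instead
    println(rdd.takeOrdered(2)(Ordering.by[(String, Int), Int](_._2)).mkString(","))   // (k03,2),(k01,3)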
    
    Building a Pair RDD
    def createPairMap(): Unit = {
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
        val r: RDD[(String, Int)] = rdd.reduceByKey((x, y) => x + y)
        println("=========createPairMap=========")
        println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
        println("=========createPairMap=========")

        /*
         * Test file contents:
         * x01,1,4
         * x02,11,1
         * x01,3,9
         * x01,2,6
         * x02,18,12
         * x03,7,9
         */
        val rddFile: RDD[(String, String)] = sc.textFile("file:///F:/sparkdata01.txt", 1)
          .map { x => (x.split(",")(0), x.split(",")(1) + "," + x.split(",")(2)) }
        val rFile: RDD[String] = rddFile.keys
        println("=========createPairMap File=========")
        println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03
        println("=========createPairMap File=========")
    }
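
    Building on the same test file, a small follow-up sketch (placed inside createPairMap, or
    anywhere the rddFile variable above is rebuilt the same way): convert the value string back
    to a number and sum the first column per key. The expected output is only an assumption
    based on the sample data in the comment:

    val sumFirstCol: RDD[(String, Int)] = rddFile
      .mapValues(v => v.split(",")(0).toInt)   // keep only the first numeric field of the value
      .reduceByKey(_ + _)                      // add those values up for each key
    println(sumFirstCol.collect().mkString(","))   // e.g. (x01,6),(x02,29),(x03,7)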
       
    

    ============ Two example snippets follow; note the data types of the values they return ===========

    Transformations and actions on Pair RDDs
    def pairMapRDD(path: String): Unit = {
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
        val other: RDD[(String, Int)] = sc.parallelize(List(("k01",29)), 1)

        // Transformations
        val rddReduce: RDD[(String, Int)] = rdd.reduceByKey((x, y) => x + y)
        println("====reduceByKey===:" + rddReduce.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
        val rddGroup: RDD[(String, Iterable[Int])] = rdd.groupByKey()
        println("====groupByKey===:" + rddGroup.collect().mkString(","))// (k01,CompactBuffer(3, 26)),(k03,CompactBuffer(2)),(k02,CompactBuffer(6))
        val rddKeys: RDD[String] = rdd.keys
        println("====keys=====:" + rddKeys.collect().mkString(","))// k01,k02,k03,k01
        val rddVals: RDD[Int] = rdd.values
        println("======values===:" + rddVals.collect().mkString(","))// 3,6,2,26
        val rddSortAsc: RDD[(String, Int)] = rdd.sortByKey(true, 1)
        val rddSortDes: RDD[(String, Int)] = rdd.sortByKey(false, 1)
        println("====rddSortAsc=====:" + rddSortAsc.collect().mkString(","))// (k01,3),(k01,26),(k02,6),(k03,2)
        println("======rddSortDes=====:" + rddSortDes.collect().mkString(","))// (k03,2),(k02,6),(k01,3),(k01,26)
        val rddFmVal: RDD[(String, Int)] = rdd.flatMapValues { x => List(x + 10) }
        println("====flatMapValues===:" + rddFmVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)
        val rddMapVal: RDD[(String, Int)] = rdd.mapValues { x => x + 10 }
        println("====mapValues====:" + rddMapVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)
        val rddCombine: RDD[(String, (Int, Int))] = rdd.combineByKey(x => (x,1), (param:(Int,Int),x) => (param._1 + x,param._2 + 1), (p1:(Int,Int),p2:(Int,Int)) => (p1._1 + p2._1,p1._2 + p2._2))
        println("====combineByKey====:" + rddCombine.collect().mkString(","))// (k01,(29,2)),(k03,(2,1)),(k02,(6,1))
        val rddSubtract: RDD[(String, Int)] = rdd.subtractByKey(other)
        println("====subtractByKey====:" + rddSubtract.collect().mkString(","))// (k03,2),(k02,6)
        val rddJoin: RDD[(String, (Int, Int))] = rdd.join(other)
        println("=====rddJoin====:" + rddJoin.collect().mkString(","))// (k01,(3,29)),(k01,(26,29))
        val rddRight: RDD[(String, (Option[Int], Int))] = rdd.rightOuterJoin(other)
        println("====rightOuterJoin=====:" + rddRight.collect().mkString(","))// (k01,(Some(3),29)),(k01,(Some(26),29))
        val rddLeft: RDD[(String, (Int, Option[Int]))] = rdd.leftOuterJoin(other)
        println("=====rddLeft=====:" + rddLeft.collect().mkString(","))// (k01,(3,Some(29))),(k01,(26,Some(29))),(k03,(2,None)),(k02,(6,None))
        val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)
        println("=====cogroup=====:" + rddCogroup.collect().mkString(","))// (k01,(CompactBuffer(3, 26),CompactBuffer(29))),(k03,(CompactBuffer(2),CompactBuffer())),(k02,(CompactBuffer(6),CompactBuffer()))

        // Actions
        val resCountByKey = rdd.countByKey()
        println("=====countByKey=====:" + resCountByKey)// Map(k01 -> 2, k03 -> 1, k02 -> 1)
        val resColMap = rdd.collectAsMap()
        println("=====resColMap=====:" + resColMap)// Map(k02 -> 6, k01 -> 26, k03 -> 2)
        val resLookup = rdd.lookup("k01")
        println("====lookup===:" + resLookup)// WrappedArray(3, 26)
    }
       
      /**
       * A few other, less commonly used RDD operations
       */
      def otherRDDOperate(): Unit = {
        val rdd: RDD[(String, Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))

        println("=====first=====:" + rdd.first())// (k01,3)
        val resTop = rdd.top(2).map(x => x._1 + ";" + x._2)
        println("=====top=====:" + resTop.mkString(","))// k03;2,k02;6
        val resTake = rdd.take(2).map(x => x._1 + ";" + x._2)
        println("=======take====:" + resTake.mkString(","))// k01;3,k02;6
        val resTakeSample = rdd.takeSample(false, 2).map(x => x._1 + ";" + x._2)
        println("=====takeSample====:" + resTakeSample.mkString(","))// k01;26,k03;2
        val resSample1 = rdd.sample(false, 0.25)
        val resSample2 = rdd.sample(false, 0.75)
        val resSample3 = rdd.sample(false, 0.5)
        println("=====sample======:" + resSample1.collect().mkString(","))// (nothing sampled)
        println("=====sample======:" + resSample2.collect().mkString(","))// (k01,3),(k02,6),(k01,26)
        println("=====sample======:" + resSample3.collect().mkString(","))// (k01,3),(k01,26)
      }
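
    Because sample and takeSample are random, the outputs noted above will differ from run to
    run. Both take an optional seed argument when reproducible results are needed; a minimal
    sketch, assuming the same rdd as in otherRDDOperate:

    // An explicit seed makes the sample deterministic across runs
    val seeded = rdd.sample(withReplacement = false, fraction = 0.5, seed = 42L)
    println(seeded.collect().mkString(","))   // the same subset every run (which subset depends on the seed)
    val seededTake = rdd.takeSample(withReplacement = false, num = 2, seed = 42L)
    println(seededTake.mkString(","))         // the same two elements every run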
    

      

  • Original post: https://www.cnblogs.com/testzcy/p/9064023.html