zoukankan      html  css  js  c++  java
  • 029 RDD Join相关API,以及程序

    1.数据集  

      A表数据:
        1 a
        2 b
        3 c
      B表数据:
        1 aa1
        1 aa2
        2 bb1
        2 bb2
        2 bb3
        4 dd1

    2.join的分类

      inner join

      left outer join

      right outer join

      full outer join

      left semi join

      

    3.集中join的结果

      A inner join B:
        1 a 1 aa1
        1 a 1 aa2
        2 b 2 bb1
        2 b 2 bb2
        2 b 2 bb3

      A left outer join B:
        1 a 1 aa1
        1 a 1 aa2
        2 b 2 bb1
        2 b 2 bb2
        2 b 2 bb3
        3 c null null

      A right outer join B:
        1 a 1 aa1
        1 a 1 aa2
        2 b 2 bb1
        2 b 2 bb2
        2 b 2 bb3
        null null 4 dd1

      A full outer join B:
        1 a 1 aa1
        1 a 1 aa2
        2 b 2 bb1
        2 b 2 bb2
        2 b 2 bb3
        3 c null null
        null null 4 dd1

      A left semi join B:(。。。。。注意。。。。。。)
        1 a
        2 b

    4.API  

    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
      返回值是RDD,RDD中的类型是一个二元组(a),a第一个元素是KEY类型的值(join的key), a第二个元素又是二元组(b), b的第一个元素是来自调用join函数的RDD的value,
      b的第二个元素是来自参数other这个RDD的value

    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
      对于右边的数据返回的是Option类型是数据,所以如果右表数据不存在,返回的是None;否则是一个Some的具体数据

    def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
      对于左边的数据返回的是Option类型是数据,所以如果左表数据不存在,返回的是None;否则是一个Some的具体数据

    def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
      返回的value类型是Option封装后的数据,如果数据不存在, 返回的是None,存在返回的是Some具体数据

    5.其他方式实现join

      

     

    6.join程序以及非join实现join

      1 package com.ibeifeng.senior.join
      2 
      3 import org.apache.spark.{SparkConf, SparkContext}
      4 
      5 /**
      6   * RDD数据Join相关API讲解
      7   * Created by ibf on 02/09.
      8   */
      9 object RDDJoin {
     10   def main(args: Array[String]): Unit = {
     11     val conf = new SparkConf()
     12       .setMaster("local[*]")
     13       .setAppName("RDD-Join")
     14     val sc = SparkContext.getOrCreate(conf)
     15 
     16     // ==================具体代码======================
     17     // 模拟数据产生
     18     val rdd1 = sc.parallelize(Array(
     19       (1, "张三1"),
     20       (1, "张三2"),
     21       (2, "李四"),
     22       (3, "王五"),
     23       (4, "Tom"),
     24       (5, "Gerry"),
     25       (6, "莉莉")
     26     ), 1)
     27 
     28     val rdd2 = sc.parallelize(Array(
     29       (1, "上海"),
     30       (2, "北京1"),
     31       (2, "北京2"),
     32       (3, "南京"),
     33       (4, "纽约"),
     34       (6, "深圳"),
     35       (7, "香港")
     36     ), 1)
     37 
     38     // 调用RDD API实现内连接
     39     val joinResultRDD = rdd1.join(rdd2).map {
     40       case (id, (name, address)) => {
     41         (id, name, address)
     42       }
     43     }
     44     println("----------------")
     45     joinResultRDD.foreachPartition(iter => {
     46       iter.foreach(println)
     47     })
     48     // 调用RDD API实现左外连接
     49     val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map {
     50       case (id, (name, addressOption)) => {
     51         (id, name, addressOption.getOrElse("NULL"))
     52       }
     53     }
     54     println("----------------")
     55     leftJoinResultRDd.foreachPartition(iter => {
     56       iter.foreach(println)
     57     })
     58     // 左外连接稍微变化一下:需要左表出现,右表不出现的数据(not in)
     59     println("----------------")
     60     rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map {
     61       case (id, (name, _)) => (id, name)
     62     }.foreachPartition(iter => {
     63       iter.foreach(println)
     64     })
     65 
     66     // 右外连接
     67     println("----------------")
     68     rdd1
     69       .rightOuterJoin(rdd2)
     70       .map {
     71         case (id, (nameOption, address)) => {
     72           (id, nameOption.getOrElse("NULL"), address)
     73         }
     74       }
     75       .foreachPartition(iter => iter.foreach(println))
     76 
     77     // 全外连接
     78     println("----------------")
     79     rdd1
     80       .fullOuterJoin(rdd2)
     81       .map {
     82         case (id, (nameOption, addressOption)) => {
     83           (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL"))
     84         }
     85       }
     86       .foreachPartition(iter => iter.foreach(println))
     87 
     88     ///////////////////////////////////////////假设rdd2的数据比较少,将rdd2的数据广播出去///////////////////////////////////////
     89     val leastRDDCollection = rdd2.collect()
     90     val broadcastRDDCollection = sc.broadcast(leastRDDCollection) 93     // Inner Join 95     rdd1
     96       // 过滤rdd1中的数据,只要在rdd1中出现的数据,没有出现的数据过滤掉
     97       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
     98       // 数据合并,由于一条rdd1的数据可能在rdd2中存在多条对应数据,所以使用fla  tMap
     99       .flatMap {
    100       case (id, name) => {
    101         broadcastRDDCollection.value.filter(_._1 == id).map {
    102           case (_, address) => {
    103             (id, name, address)
    104           }
    105         }
    106       }
    107     }
    108       .foreachPartition(iter => iter.foreach(println))
    109 
    110     // 左外连接
    111     println("---------------------")
    112     rdd1
    113       .flatMap {
    114         case (id, name) => {
    115           // 从右表所属的广播变量中获取对应id的集合列表
    116           val list = broadcastRDDCollection.value.filter(_._1 == id)
    117           // 对应id的集合可能为空,也可能数据有多个
    118           if (list.nonEmpty) {
    119             // 存在多个
    120             list.map(tuple => (id, name, tuple._2))
    121           } else {
    122             // id在右表中不存在,填默认值
    123             (id, name, "NULL") :: Nil
    124           }
    125         }
    126       }
    127       .foreachPartition(iter => iter.foreach(println))
    128 
    129     // 右外连接
    130     /**
    131       * rdd2中所有数据出现,由于rdd2中的数据在driver中可以存储,可以认为rdd1和rdd2通过right join之后的数据也可以在driver中保存下
    132       **/
    133     println("---------------------")
    134     // 将rdd1中符合条件的数据过滤出来保存到driver中
    135     val stage1 = rdd1
    136       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
    137       .collect()
    138     // 将driver中两个集合进行right join
    139     val stage2 = leastRDDCollection.flatMap {
    140       case (id, address) => {
    141         val list = stage1.filter(_._1 == id)
    142         if (list.nonEmpty) {
    143           list.map(tuple => (id, tuple._2, address))
    144         } else {
    145           Iterator.single((id, "NULL", address))
    146         }
    147       }
    148     }
    149     stage2.foreach(println)
    150 
    151     // TODO: 全外连接,不写代码,因为代码比较复杂
    152   
    153     //====================================
    154     // 左半连接:只出现左表数据(要求数据必须在右表中也出现过),如果左表的数据在右表中出现多次,最终结果只出现一次
    155     println("+++++++++++++++++")
    156     println("-----------------------")
    157     rdd1
    158       .join(rdd2)
    159       .map {
    160         case (id, (name, _)) => (id, name)
    161       }
    162       .distinct()
    163       .foreachPartition(iter => iter.foreach(println))
    164     println("------------------------")
    165     rdd1
    166       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))
    167       .foreachPartition(iter => iter.foreach(println))
    168 
    169     // 休眠为了看4040页面
    170         Thread.sleep(1000000)
    171   }
    172 }

    6.

  • 相关阅读:
    有向图的邻接表--p137-p138
    有向图的邻接矩阵--p136
    无向带权图的邻接矩阵表示--p135
    什么是视频关键帧?流媒体服务器如何提取视频的关键帧?
    电力系统无人值守变电站如何通过流媒体服务器实现随时随地监控
    流媒体服务器如何通过opencv获取IP摄像头(IP-camera)实时视频流
    如何在脱离流媒体服务器的时候使用ffmpeg 监测.m3u8直播视频流的状态?
    流媒体服务器如何在浏览器播放RTSP格式的视频流?
    AI安防监控如何与越来越进步的智能时代结合?
    SDI摄像机和IPC网络高清摄像机有什么区别?如何选择?
  • 原文地址:https://www.cnblogs.com/juncaoit/p/6528078.html
Copyright © 2011-2022 走看看