zoukankan      html  css  js  c++  java
  • 【原创】大叔案例分享(5)id打通

    经常有一些需要做 id 打通的场景,比如用户 id 打通等。

    问题可以抽象为:每条数据都可以解析出一个或多个 kv pair (id_type, id),凡是共享同一个 kv pair 的多条数据都需要 merge 成一条;并且这种关系是可传递的(A 与 B 共享一个 pair、A 与 C 共享另一个 pair,则 A、B、C 最终合并为一条)。

    比如:

    data1: Array(('type1', 'id1'), ('type2', 'id2'))

    data2: Array(('type1', 'id1'), ('type3', 'id3'))

    data3: Array(('type2', 'id2'), ('type4', 'id4'))

    其中data1和data2通过('type1', 'id1')打通,data1和data3通过('type2', 'id2')打通,最终data1、data2、data3打通成一条数据

    data_union: Array(('type1', 'id1'), ('type2', 'id2'), ('type3', 'id3'), ('type4', 'id4'))

    先定义基础类和方法

      /**
       * Abstraction of one business record. Every record exposes its own id.
       * Placeholder implementation: the id is always the empty string.
       */
      class Data {
        def getId: String = ""
      }
    
      /**
       * Merges records that were matched to each other into a single record.
       * Placeholder implementation: returns the first element of `dataArr` unchanged
       * (throws on an empty array, as `Array.head` does).
       * NOTE(review): a real merge presumably combines the id maps and data — confirm.
       */
      def merge(dataArr : Array[(Map[Byte, String], Data)]) : (Map[Byte, String], Data) = {
        val first = dataArr.head
        first
      }
      /**
       * Returns a fresh random identifier (a type-4 UUID in its 36-character string form).
       * The previous placeholder returned "" for every call, which gave all records the same
       * group id and made the UUID-based union step a no-op.
       */
      def generateUUID : String = java.util.UUID.randomUUID().toString

    其中

    1)Data表示数据抽象,每条数据都有一个id;

    2)Map[Byte, String] 表示数据中的 kv pair,即 Map[id_type, id](因此每条数据的每种 id_type 只能对应一个 id);

    3)merge将多条数据打通成一条数据;

    先看最简单的递归实现

      /**
       * Iterative id union: after collapsing records that share a data id, runs one merge
       * round per distinct id_type. Each round groups the records that carry that id_type by
       * its id, merges each group, and keeps the records lacking that id_type as they are.
       *
       * @param rdd records as (Map[id_type, id], data)
       * @return the records after one merge round per id_type
       */
      def unionDataRDD1(rdd : RDD[(Map[Byte, String], Data)]) : RDD[(Map[Byte, String], Data)] = {
        // Collapse records carrying the same data id before any id-based merging.
        val seeded: RDD[(Map[Byte, String], Data)] =
          rdd.keyBy(_._2.getId).groupByKey.map(grouped => merge(grouped._2.toArray)).cache

        // Every id_type present anywhere; each one drives a single merge round.
        val idTypes = seeded.flatMap(_._1.keys).distinct.collect

        idTypes.foldLeft(seeded) { (current, idType) =>
          val carriesType = current.filter(_._1.contains(idType))
          val lacksType = current.filter(!_._1.contains(idType))
          carriesType
            .keyBy(_._1.get(idType).get)
            .groupByKey
            .map(grouped => merge(grouped._2.toArray))
            .union(lacksType)
        }
      }

    性能不太好(每个 id_type 都要做一次 groupByKey shuffle,而且 RDD lineage 随轮数不断变长),再看优化后的非递归实现

      /**
       * Non-iterative id union. Tags every (id_type, id) pair with the group (record) it came
       * from, unions on the driver the groups that share a pair, then merges each union group
       * with a single shuffle.
       *
       * @param rdd records as (Map[id_type, id], data)
       * @return one merged record per connected set of records; records that share no
       *         (id_type, id) pair with another record are passed through without merging
       */
      def unionDataRDD2(rdd : RDD[(Map[Byte, String], Data)]) : RDD[(Map[Byte, String], Data)] = {
        // Union-find over group ids. Returns group -> representative for every group in
        // `groupSets`; groups listed together (directly or transitively) share a representative.
        // Iterative find with path compression keeps this near-linear, whereas rewriting a whole
        // group map on every merge of two existing unions is quadratic in the worst case.
        def resolveUnions(groupSets: Array[Array[String]]): Map[String, String] = {
          val parent = scala.collection.mutable.HashMap[String, String]()
          def find(group: String): String = {
            var root = group
            while (parent.getOrElseUpdate(root, root) != root) root = parent(root)
            var cur = group
            while (cur != root) {
              val next = parent(cur)
              parent(cur) = root
              cur = next
            }
            root
          }
          groupSets.foreach(groups => groups.headOption.foreach { head =>
            groups.foreach { group =>
              val headRoot = find(head)
              val groupRoot = find(group)
              if (groupRoot != headRoot) parent(groupRoot) = headRoot
            }
          })
          parent.keys.toList.map(group => group -> find(group)).toMap
        }

        val result = rdd.keyBy(_._2.getId).groupByKey.map(item => merge(item._2.toArray)).cache

        //((id_type, id), group)
        // Group ids come from zipWithUniqueId over the cached `result` instead of a random UUID:
        // idGroupRDD is traversed twice below (unionMap, then groupMap), and if an evicted cache
        // partition were recomputed, random ids would no longer match between the traversals.
        // (Ids stay stable only while `result` itself stays cached.)
        val idGroupRDD = result.zipWithUniqueId().flatMap { case ((ids, _), uniqueId) =>
          val group = uniqueId.toString
          ids.toArray.map(entry => (entry, group))
        }.cache

        //Map(group -> union_group), only for groups that share a pair with another group
        val unionMap = resolveUnions(
          idGroupRDD.groupByKey.map(_._2.toArray.distinct).filter(_.length > 1).collect)

        //((id_type, id), union_group)
        val groupMap = idGroupRDD
          .filter { case (_, group) => unionMap.contains(group) }
          .map { case (entry, group) => (entry, unionMap(group)) }
          .collect.toMap

        //(Option[union_group], data)
        val groupRDDWithUnion = result.map(item =>
          (item._1.collectFirst { case entry if groupMap.contains(entry) => groupMap(entry) }, item)).cache
        groupRDDWithUnion.filter(_._1.isDefined).groupByKey.map(item => merge(item._2.toArray))
          .union(groupRDDWithUnion.filter(_._1.isEmpty).map(_._2))
      }

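    第二版优化:用 zipWithUniqueId 生成分组 id,通过 aggregateByKey 得到 ((id_type, id), Set[group]) 并一次性 collect 到 driver 端计算 union,再按 union 分组 merge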

      /**
       * Second optimisation of the id union. Every (id_type, id) pair is tagged with the set of
       * groups (records) that carry it, the pairs are collected to the driver, groups sharing a
       * pair are unioned there, and each record is then keyed by its union group and merged.
       *
       * @param rdd records as (Map[id_type, id], data)
       * @return one merged record per connected set of records; records whose id map is empty
       *         are passed through instead of being merged with each other
       */
      def unionDataRDD3(rdd : RDD[(Map[Byte, String], Data)]) : RDD[(Map[Byte, String], Data)] = {
        // Union-find over group ids. Returns group -> representative for every group in
        // `groupSets`; groups listed together (directly or transitively) share a representative.
        // Iterative find with path compression keeps this near-linear, whereas rewriting a whole
        // group map on every merge of two existing unions is quadratic in the worst case.
        def resolveUnions(groupSets: Array[Set[String]]): Map[String, String] = {
          val parent = scala.collection.mutable.HashMap[String, String]()
          def find(group: String): String = {
            var root = group
            while (parent.getOrElseUpdate(root, root) != root) root = parent(root)
            var cur = group
            while (cur != root) {
              val next = parent(cur)
              parent(cur) = root
              cur = next
            }
            root
          }
          groupSets.foreach(groups => groups.headOption.foreach { head =>
            groups.foreach { group =>
              val headRoot = find(head)
              val groupRoot = find(group)
              if (groupRoot != headRoot) parent(groupRoot) = headRoot
            }
          })
          parent.keys.toList.map(group => group -> find(group)).toMap
        }

        val result = rdd.keyBy(_._2.getId).groupByKey.map(item => merge(item._2.toArray)).cache

        //((id_type, id), Set[group])
        val idGroupArray = result.zipWithUniqueId()
          .flatMap { case ((ids, _), uniqueId) => ids.toArray.map(entry => (entry, uniqueId.toString)) }
          .aggregateByKey(Set[String]())((groups, group) => groups + group, (groups1, groups2) => groups1 ++ groups2)
          .collect

        //Map(group -> union_group), for every group seen in idGroupArray
        val unionMap = resolveUnions(idGroupArray.map(_._2))

        //(id_type, Map(id -> union_group)); all groups of one pair share a union, so head suffices
        val groupMap: Map[Byte, Map[String, String]] = idGroupArray.groupBy(_._1._1).map { case (idType, entries) =>
          idType -> entries.map { case ((_, id), groups) => id -> unionMap(groups.head) }.toMap
        }

        //(Option[union_group], data)
        val groupRDDWithUnion = result.map(item => (item._1.collectFirst {
          case (idType, id) if groupMap.get(idType).exists(_.contains(id)) => groupMap(idType)(id)
        }, item)).cache
        // A record with an empty id map has no union group. Grouping all of those under the
        // shared key None would merge unrelated records, so they are passed through as-is.
        groupRDDWithUnion.filter(_._1.isDefined).groupByKey.map(item => merge(item._2.toArray))
          .union(groupRDDWithUnion.filter(_._1.isEmpty).map(_._2))
      }
  • 相关阅读:
    读书笔记,《我还是喜欢东京——带你感受城市细节》
    学习笔记:Maven的ArcheType的学习笔记
    如何从中企动力(新网)转移域名到阿里云(万网)
    Maven自定义Archetype(zz)
    读书笔记,《Java 8实战》第五章,使用流
    读书笔记,《Java 8实战》,第四章,引入流
    读书笔记,《Java 8实战》,第三章,Lambda表达式
    读书笔记,《Java8实战》第一章,为什么要关心 Java8
    读书笔记,《深入理解java虚拟机》,第三章 垃圾收集器与内存分配策略
    行业知识:关于发电量与碳排放和等效植树的换算关系
  • 原文地址:https://www.cnblogs.com/barneywill/p/10987452.html
Copyright © 2011-2022 走看看