  • Spark GraphX graph computation

    1. Friend recommendation with GraphX

    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    // Find common friends via connected components
    object CommendFriend {
    
      def main(args: Array[String]): Unit = {
        // Create the Spark context
        val conf: SparkConf = new SparkConf().setAppName("CommendFriend").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
        // Vertex collection: (vertexId, (name, age))
        val uv: RDD[(VertexId,(String,Int))] = sc.parallelize(Seq(
          (133, ("毕东旭", 58)),
          (1, ("贺咪咪", 18)),
          (2, ("范闯", 19)),
          (9, ("贾璐燕", 24)),
          (6, ("马彪", 23)),
    
          (138, ("刘国建", 40)),
          (16, ("李亚茹", 18)),
          (21, ("任伟", 25)),
          (44, ("张冲霄", 22)),
    
          (158, ("郭佳瑞", 22)),
          (5, ("申志宇", 22)),
          (7, ("卫国强", 22))
        ))
        // Edge collection: Edge(srcId, dstId, attr)
        val ue: RDD[Edge[Int]] = sc.parallelize(Seq(
          Edge(1, 133,0),
          Edge(2, 133,0),
          Edge(9, 133,0),
          Edge(6, 133,0),
    
          Edge(6, 138,0),
          Edge(16, 138,0),
          Edge(44, 138,0),
          Edge(21, 138,0),
    
          Edge(5, 158,0),
          Edge(7, 158,0)
        ))
        // Build the graph
        val graph: Graph[(String, Int), Int] = Graph(uv,ue)
        // Run the connected-components algorithm; each vertex gets labeled with the smallest vertex id in its component
        graph
          .connectedComponents()
          .vertices
          .join(uv)
          .map{
            case (uid,(minid,(name,age)))=>(minid,(uid,name,age))
          }.groupByKey()
          .foreach(println(_))
        // Shut down
        sc.stop()
      }
    }
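
    The connected-components call labels every vertex with the smallest vertex id in its component, so the printout groups the users into two friend circles (here keyed by ids 1 and 5). To go from circles to actual recommendations, one possible follow-up sketch, assuming the same `graph` value built above, suggests everyone in a user's component who is not already a direct neighbor:

    import org.apache.spark.graphx.EdgeDirection

    // Direct neighbors of every vertex, counting both edge directions
    val neighbors: RDD[(VertexId, Set[VertexId])] = graph
      .collectNeighborIds(EdgeDirection.Either)
      .map { case (vid, ids) => (vid, ids.toSet) }

    // For every user: component members minus the user and their direct friends
    graph.connectedComponents().vertices          // (vid, smallest id in component)
      .map { case (vid, comp) => (comp, vid) }
      .groupByKey()
      .flatMap { case (_, members) =>
        members.map(vid => (vid, members.toSet - vid))
      }
      .join(neighbors)
      .map { case (vid, (candidates, direct)) => (vid, candidates -- direct) }
      .foreach(println(_))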

    2. User tag data merging demo

    Test data

    陌上花开 旧事酒浓 多情汉子 APP爱奇艺:10 BS龙德广场:8

    多情汉子 满心闯 K韩剧:20

    满心闯 喜欢不是爱 不是唯一 APP爱奇艺:10

    装逼卖萌无所不能 K欧莱雅面膜:5

    Computed result

    (-397860375,(List(喜欢不是爱, 不是唯一, 多情汉子, 多情汉子, 满心闯, 满心闯, 旧事酒浓, 陌上花开),List((APP爱奇艺,20), (K韩剧,20), (BS龙德广场,8))))

    (553023549,(List(装逼卖萌无所不能),List((K欧莱雅面膜,5))))
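
    When two records in the same component carry the same tag, their weights are summed: the two APP爱奇艺:10 entries above become (APP爱奇艺,20). A minimal local sketch of that merge step on plain Scala collections:

    // Merge two tag lists by summing the weights of identical tag names
    val left  = List(("APP爱奇艺", 10), ("BS龙德广场", 8))
    val right = List(("APP爱奇艺", 10), ("K韩剧", 20))
    val merged: List[(String, Int)] =
      (left ++ right).groupBy(_._1).mapValues(_.map(_._2).sum).toList
    // contains (APP爱奇艺,20), (K韩剧,20), (BS龙德广场,8), in no particular order

    The full demo program: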

    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    
    object UserRelationDemo {
    
      def main(args: Array[String]): Unit = {
        // Create the Spark context
        val conf: SparkConf = new SparkConf().setAppName("UserRelationDemo").setMaster("local[*]")
        val sc: SparkContext = new SparkContext(conf)
    
        // Read the input data (note the escaped backslashes in the Windows path)
        val rdd: RDD[String] = sc.textFile("F:\\dmp\\graph")
    
        // Vertex collection: only the line's first nickname carries the tags; the others get an empty list
        val uv: RDD[(VertexId, (String, List[(String, Int)]))] = rdd.flatMap(line => {
          val arr: Array[String] = line.split(" ")
          val tags: List[(String, Int)] = arr.filter(_.contains(":")).map(tagstr => {
            val arr: Array[String] = tagstr.split(":")
            (arr(0), arr(1).toInt)
          }).toList
          val filterd: Array[String] = arr.filter(!_.contains(":"))
          filterd.map(nickname => {
           if(nickname.equals(filterd(0))) {
             (nickname.hashCode.toLong, (nickname, tags))
           }else{
             (nickname.hashCode.toLong, (nickname, List.empty))
           }
          })
        })
        // Edge collection: connect the line's first nickname to every nickname on the line
        val ue: RDD[Edge[Int]] = rdd.flatMap(line => {
          val arr: Array[String] = line.split(" ")
          val filterd: Array[String] = arr.filter(!_.contains(":"))
          filterd.map(nickname => Edge(filterd(0).hashCode.toLong, nickname.hashCode.toLong, 0))
        })
        // Build the graph
        val graph: Graph[(String, List[(String, Int)]), Int] = Graph(uv,ue)
    
        // Find related users with the connected-components algorithm
        graph
          .connectedComponents()
          .vertices
          .join(uv)
          .map{
            case (uid,(minid,(nickname,list))) => (minid,(List(uid),List(nickname),list))
          }
          .reduceByKey{
            case (t1,t2) =>
              (
                (t1._1 ++ t2._1).distinct,
                (t1._2 ++ t2._2).distinct,
                // merge the tag lists: sum the weights of identical tag names
                (t1._3 ++ t2._3).groupBy(_._1).mapValues(_.map(_._2).sum).toList
              )
          }
          .foreach(println(_))
    
        // Shut down
        sc.stop()
      }
    }
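
    One caveat: `nickname.hashCode` serves as the vertex id, so two different nicknames that happen to share a hash code would be merged incorrectly. A collision-safe alternative (a sketch, not the original author's approach) assigns each distinct nickname its own id up front:

    // Hypothetical collision-safe id assignment via zipWithUniqueId
    val idByName: Map[String, Long] = rdd
      .flatMap(_.split(" ").filter(!_.contains(":")))
      .distinct()
      .zipWithUniqueId()
      .collect()
      .toMap
    val idByNameBC = sc.broadcast(idByName)
    // then use idByNameBC.value(nickname) wherever nickname.hashCode.toLong appeared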

    3. User tag data merging

    package cn.bw.mock.tags

    import cn.bw.mock.utils.TagsUtil
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.graphx.{Edge, Graph, VertexId, VertexRDD}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}
    import scala.collection.mutable.ListBuffer
    /**
      * Created by zcw on 2018/10/16
      */
    object TagsContextV2 {
      def main(args: Array[String]): Unit = {
        // 1. Validate the arguments
        if(args.length != 4){
          println(
            """
              |cn.bw.mock.tags.TagsContext
              |Wrong number of arguments!!!
              |Required:
              |LogInputPath
              |AppDicPath
              |StopWordsDicPath
              |ResultOutputPath
            """.stripMargin)
          sys.exit()
        }
        // 2. Accept the arguments
        val Array(logInputPath,appDicPath,stopWordsDicPath,resultOutputPath) = args
        // 3. Create the SparkSession
        val conf: SparkConf = new SparkConf()
          .setAppName(s"${this.getClass.getSimpleName}")
          .setMaster("local")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        val spark: SparkSession = SparkSession
          .builder()
          .config(conf)
          .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        // 4. Read the app dictionary
        val appDic: Map[String, String] = sc.textFile(appDicPath).map(line => {
          val fields: Array[String] = line.split(":")
          (fields(0), fields(1))
        }).collect().toMap
        // 5. Broadcast the app dictionary
        val appdicBC: Broadcast[Map[String, String]] = sc.broadcast(appDic)
        // 6. Read the stop words
        val stopwordsDic: Map[String, Int] = sc.textFile(stopWordsDicPath).map((_,1)).collect().toMap
        // 7. Broadcast the stop-words dictionary
        val stopwordsBC: Broadcast[Map[String, Int]] = sc.broadcast(stopwordsDic)
        import spark.implicits._
        val baseRDD: RDD[Row] = spark.read.parquet(logInputPath).where(TagsUtil.hasSomeUserIdCondition).rdd
        // Vertices: (hash of the first user id, (all user ids, tags))
        val uv: RDD[(VertexId, (ListBuffer[String], List[(String, Int)]))] = baseRDD.map(
          row => {
            // Ad tags
            val adsMap: Map[String, Int] = Tags4Ads.makeTags(row)
            // App tags
            val appMap: Map[String, Int] = Tags4App.makeTags(row, appdicBC.value)
            // Region tags
            val areaMap: Map[String, Int] = Tags4Area.makeTags(row)
            // Device tags
            val deviceMap: Map[String, Int] = Tags4Device.makeTags(row)
            // Keyword tags
            val keywordsMap: Map[String, Int] = Tags4KeyWords.makeTags(row, stopwordsBC.value)
            // Get all user ids
            val allUserIDs: ListBuffer[String] = TagsUtil.getAllUserId(row)
            // The user's combined tags
            val tags = (adsMap ++ appMap ++ areaMap ++ deviceMap ++ keywordsMap).toList
            (allUserIDs(0).hashCode.toLong, (allUserIDs, tags))
          }
        )
        // Edges: connect the first user id to every id of the same user
        val ue: RDD[Edge[Int]] = baseRDD.flatMap(row => {
          // Get all user ids
          val allUserIDs: ListBuffer[String] = TagsUtil.getAllUserId(row)
          allUserIDs.map(uid => Edge(allUserIDs(0).hashCode.toLong, uid.hashCode.toLong, 0))
        })
        // Build the graph
        val graph = Graph(uv,ue)
        // Connected components
        val vertices: VertexRDD[VertexId] = graph.connectedComponents().vertices
        //join
        vertices.join(uv).map{
          case(uid,(commid,(uids,tags))) => (commid,(uids,tags))
        }.reduceByKey{
          case (t1,t2) => ((t1._1 ++ t2._1).distinct, (t1._2 ++ t2._2).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)).toList)
        }.saveAsTextFile(resultOutputPath)
        // Close the SparkSession
        spark.close()
      }
    }
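
    One small optional tweak: since Kryo serialization is enabled above, the classes that get shuffled can also be registered with Kryo, right after the SparkConf is created, to shrink the serialized payload. The class list below is illustrative, not from the original code:

    conf.registerKryoClasses(Array(classOf[scala.collection.mutable.ListBuffer[String]]))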

    4. Final user tags and decay coefficient
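
    Final user tags are typically produced day over day: yesterday's tag weights are decayed by a coefficient before today's newly computed tags are merged in, so stale interests gradually fade. A minimal sketch of that update; the coefficient 0.92 is an assumed value, not from the original post:

    // Hypothetical decay merge: shrink historical weights, then add today's tags on top
    def mergeWithDecay(history: List[(String, Double)],
                       today: List[(String, Double)],
                       coefficient: Double = 0.92): List[(String, Double)] = {
      val decayed = history.map { case (tag, weight) => (tag, weight * coefficient) }
      (decayed ++ today).groupBy(_._1).mapValues(_.map(_._2).sum).toList
    }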
