zoukankan      html  css  js  c++  java
  • spark Graphx 之 应用: 谁是网络红人?

    一、数据:user和各自粉丝的关系

    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User42,197134784))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User37,14559570),(User64,24742040))
    ((User31,63644892),(User10,123004655))
    ((User10,123004655),(User50,17613979))
    ((User37,14559570),(User11,14269220))
    ((User78,3365291),(User30,93905958))
    ((User14,199097645),(User60,16547411))
    ((User3,14874480),(User42,197134784))
    ((User40,813286),(User9,15434432))
    ((User10,123004655),(User34,10211502))
    ((User90,34870269),(User53,25566593))
    ((User12,24741956),(User60,16547411))
    ((User12,24741956),(User5,18927441))
    ((User37,14559570),(User39,22831657))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User28,7465732),(User65,90569268))
    ((User89,74286565),(User9,15434432))
    ((User26,16112634),(User71,20065583))
    ((User80,520835497),(User94,15913))
    ((User75,15919138),(User94,15913))
    ((User1,849131),(User86,101760242))
    ((User36,59804598),(User79,30582362))
    ((User42,197134784),(User4,34236703))
    ((User48,63433165),(User25,20397258))
    ((User72,18859819),(User5,18927441))
    ((User47,86566510),(User24,183967095))
    ((User9,15434432),(User32,15670515))
    ((User85,38521400),(User74,811377))
    ((User28,7465732),(User78,3365291))
    ((User36,59804598),(User68,29758446))
    ((User41,15959795),(User36,59804598))
    ((User88,90961424),(User64,24742040))
    ((User13,16409781),(User87,24741685))
    ((User93,174347580),(User92,460486951))
    ((User93,174347580),(User77,54959836))
    ((User93,174347580),(User77,54959836))

    二、

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.SparkSession
    
    import scala.collection.mutable.ListBuffer
    
    object Celebrity {
    
      /**
       * 思路 =>
       * 拿顶点:读文件,压平,distinct
       * 拿边:ID,ID,1 => 计数权重
       * 拿到第一indegrees的id,通过vertices.filter拿到顶点属性
       * @param args
       */
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate()
        val sc = spark.sparkContext
        val rdd = sc.textFile("src/main/resources/Celebrity/twitter_graph_data.txt")
        // 拿顶点
        val vertices = rdd
          // ((User47,86566510),(User83,15647839))
          .flatMap(line => {
            val reg = "\(([a-zA-Z]+[0-9]{1,2}),([0-9]+)\)".r
            var arr: ListBuffer[(Long, String)] = ListBuffer[(Long, String)]()
            for (patternMatch <- reg.findAllMatchIn(line)) {
              arr.append((patternMatch.group(2).toLong, patternMatch.group(1)))
            }
            arr
          }).distinct()
        //.foreach(println)
        // 拿边:((User47,86566510),(User83,15647839)) => (User47,User83,1)
        val edge = rdd.map(line=>{
          // 拿出来的是一个String,需要分割或者正则
          val reg = "\([a-zA-Z]+[0-9]{1,2},([0-9]+)\),\([a-zA-Z]+[0-9]{1,2},([0-9]+)\)".r
          var arr: ListBuffer[(String, String)] = ListBuffer[(String, String)]()
          for (patternMatch <- reg.findAllMatchIn(line)) {
            arr.append((patternMatch.group(1),patternMatch.group(2)))
          }
          val tuple = arr.toList.head
          ((tuple._1.toLong,tuple._2.toLong),1)
          // 或者可以维持ListBuffer,然后map=>x._1
        }
        )
          .reduceByKey(_+_)
          .map(x=>Edge(x._1._1.toLong,x._1._2.toLong,x._2))
         // .foreach(println)
        // 网络红人:inDegrees大
        val graph = Graph(vertices, edge)
        // inDegrees => VertexRDD[VertexId,Int(有多少个入度)]
        val id = graph.inDegrees
          // local[*] 开启不同分区
          // sort by 部分排序
          // 现在全部合成一个 sortBy就行了
        //  .repartition(1).sortBy( - _._2).take(1)(0)._1
        // .sortBy(_._2,false,1).take(1)(0)._1
        //  .sortBy( - _._2).collect.take(1)(0)._1
        // 先局部有序,再分区效率高
        //  .map(x=>(x._2,x._1)).sortByKey(false,1).take(1)(0)._1
    
        vertices.filter(f=>f._1==id)
          //.foreach(println)
    
        spark.stop()
    
      }
    
    }

    涉及sortBy的分区排序方法:见 https://www.cnblogs.com/sabertobih/p/13792372.html

  • 相关阅读:
    Timer定时任务
    spring boot配置多数据源
    消费者模块调用提供者集群报错
    修改windHost文件
    spring常用注解+Aop
    添加ClustrMaps
    无题
    2020年3月21日 ICPC训练联盟周赛,Benelux Algorithm Programming Contest 2019
    2020年3月14日 ICPC训练联盟周赛,Preliminaries for Benelux Algorithm Programming Contest 2019
    2020.4.12 个人rating赛 解题+补题报告
  • 原文地址:https://www.cnblogs.com/sabertobih/p/13792365.html
Copyright © 2011-2022 走看看