zoukankan      html  css  js  c++  java
  • spark Graphx 之 应用: 谁是网络红人?

    一、数据:user和各自粉丝的关系

    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User42,197134784))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User37,14559570),(User64,24742040))
    ((User31,63644892),(User10,123004655))
    ((User10,123004655),(User50,17613979))
    ((User37,14559570),(User11,14269220))
    ((User78,3365291),(User30,93905958))
    ((User14,199097645),(User60,16547411))
    ((User3,14874480),(User42,197134784))
    ((User40,813286),(User9,15434432))
    ((User10,123004655),(User34,10211502))
    ((User90,34870269),(User53,25566593))
    ((User12,24741956),(User60,16547411))
    ((User12,24741956),(User5,18927441))
    ((User37,14559570),(User39,22831657))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User28,7465732),(User65,90569268))
    ((User89,74286565),(User9,15434432))
    ((User26,16112634),(User71,20065583))
    ((User80,520835497),(User94,15913))
    ((User75,15919138),(User94,15913))
    ((User1,849131),(User86,101760242))
    ((User36,59804598),(User79,30582362))
    ((User42,197134784),(User4,34236703))
    ((User48,63433165),(User25,20397258))
    ((User72,18859819),(User5,18927441))
    ((User47,86566510),(User24,183967095))
    ((User9,15434432),(User32,15670515))
    ((User85,38521400),(User74,811377))
    ((User28,7465732),(User78,3365291))
    ((User36,59804598),(User68,29758446))
    ((User41,15959795),(User36,59804598))
    ((User88,90961424),(User64,24742040))
    ((User13,16409781),(User87,24741685))
    ((User93,174347580),(User92,460486951))
    ((User93,174347580),(User77,54959836))
    ((User93,174347580),(User77,54959836))

    二、代码实现:用 GraphX 的 inDegrees 找出入度最大的用户

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.SparkSession
    
    import scala.collection.mutable.ListBuffer
    
    /**
     * Finds the "celebrity" of a Twitter relationship graph: the vertex with the
     * largest in-degree.
     *
     * Each input line is expected to look like
     * `((User47,86566510),(User83,15647839))`, i.e. two `(name,id)` pairs.
     *
     * Approach:
     *  - vertices: extract every `(name,id)` pair from every line and de-duplicate;
     *  - edges: map each line to `(firstId, secondId)`, count duplicates and keep
     *    the count as the edge attribute;
     *  - result: take the vertex id with the highest in-degree and look up its name
     *    in the vertex set.
     */
    object Celebrity {

      /** Matches one `(UserNN,123456)` pair; group 1 is the user name, group 2 the numeric id. */
      private val VertexPattern = """\(([a-zA-Z]+[0-9]{1,2}),([0-9]+)\)""".r

      /** Matches a whole `(UserA,idA),(UserB,idB)` relation; groups 1 and 2 are the two ids. */
      private val EdgePattern =
        """\([a-zA-Z]+[0-9]{1,2},([0-9]+)\),\([a-zA-Z]+[0-9]{1,2},([0-9]+)\)""".r

      /** Extracts all `(id, name)` vertices found in one input line (empty if none match). */
      private def parseVertices(line: String): List[(Long, String)] =
        VertexPattern
          .findAllMatchIn(line)
          .map(m => (m.group(2).toLong, m.group(1)))
          .toList

      /**
       * Extracts the first relation in a line as `((srcId, dstId), 1)`.
       * Returns `None` for lines that do not match, so malformed or blank lines
       * are skipped instead of crashing the job.
       */
      private def parseEdge(line: String): Option[((Long, Long), Int)] =
        EdgePattern
          .findFirstMatchIn(line)
          .map(m => ((m.group(1).toLong, m.group(2).toLong), 1))

      /**
       * Builds the graph from the bundled data file and prints the vertex (or
       * vertices sharing that id) with the highest in-degree.
       *
       * @param args unused
       */
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate()
        try {
          val sc = spark.sparkContext
          val lines = sc.textFile("src/main/resources/Celebrity/twitter_graph_data.txt")

          // Vertices: every distinct (id, name) pair appearing anywhere in the file.
          val vertices = lines.flatMap(line => parseVertices(line)).distinct()

          // Edges: identical (src, dst) pairs are merged; the number of occurrences
          // becomes the edge attribute.
          val edge = lines
            .flatMap(line => parseEdge(line).toList)
            .reduceByKey(_ + _)
            .map { case ((srcId, dstId), count) => Edge(srcId, dstId, count) }

          val graph = Graph(vertices, edge)

          // inDegrees yields (vertexId, inDegree) pairs. `top` does a global
          // selection across all partitions, so no repartition/sort is needed,
          // and it returns an empty array for an empty graph.
          val topByInDegree = graph.inDegrees
            .top(1)(Ordering.by[(Long, Int), Int](_._2))
            .headOption

          topByInDegree match {
            case Some((celebrityId, inDegree)) =>
              // Resolve the id back to its vertex attribute (the user name).
              vertices
                .filter(_._1 == celebrityId)
                .collect()
                .foreach { case (id, name) =>
                  println(s"($id,$name) inDegree=$inDegree")
                }
            case None =>
              println("No edges found; cannot determine a celebrity.")
          }
        } finally {
          // Always release the Spark session, even if the job fails.
          spark.stop()
        }
      }

    }

    涉及sortBy的分区排序方法:见 https://www.cnblogs.com/sabertobih/p/13792372.html

  • 相关阅读:
    request-log-analyzer日志分析
    ubuntu下git输出的颜色变化
    vundle安装 给vim插上翅膀
    安装ruby
    【HDU1944】S-Nim-博弈论:SG函数
    【HDU1944】S-Nim-博弈论:SG函数
    我对SG函数的理解
    我对SG函数的理解
    【POJ2154】Color-Polya定理+欧拉函数
    【POJ2154】Color-Polya定理+欧拉函数
  • 原文地址:https://www.cnblogs.com/sabertobih/p/13792365.html
Copyright © 2011-2022 走看看