zoukankan      html  css  js  c++  java
  • spark Graphx 之 应用: 谁是网络红人?

    一、数据:user和各自粉丝的关系

    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User42,197134784))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User37,14559570),(User64,24742040))
    ((User31,63644892),(User10,123004655))
    ((User10,123004655),(User50,17613979))
    ((User37,14559570),(User11,14269220))
    ((User78,3365291),(User30,93905958))
    ((User14,199097645),(User60,16547411))
    ((User3,14874480),(User42,197134784))
    ((User40,813286),(User9,15434432))
    ((User10,123004655),(User34,10211502))
    ((User90,34870269),(User53,25566593))
    ((User12,24741956),(User60,16547411))
    ((User12,24741956),(User5,18927441))
    ((User37,14559570),(User39,22831657))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User28,7465732),(User65,90569268))
    ((User89,74286565),(User9,15434432))
    ((User26,16112634),(User71,20065583))
    ((User80,520835497),(User94,15913))
    ((User75,15919138),(User94,15913))
    ((User1,849131),(User86,101760242))
    ((User36,59804598),(User79,30582362))
    ((User42,197134784),(User4,34236703))
    ((User48,63433165),(User25,20397258))
    ((User72,18859819),(User5,18927441))
    ((User47,86566510),(User24,183967095))
    ((User9,15434432),(User32,15670515))
    ((User85,38521400),(User74,811377))
    ((User28,7465732),(User78,3365291))
    ((User36,59804598),(User68,29758446))
    ((User41,15959795),(User36,59804598))
    ((User88,90961424),(User64,24742040))
    ((User13,16409781),(User87,24741685))
    ((User93,174347580),(User92,460486951))
    ((User93,174347580),(User77,54959836))
    ((User93,174347580),(User77,54959836))

    二、

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.SparkSession
    
    import scala.collection.mutable.ListBuffer
    
    object Celebrity {
    
      /**
       * 思路 =>
       * 拿顶点:读文件,压平,distinct
       * 拿边:ID,ID,1 => 计数权重
       * 拿到第一indegrees的id,通过vertices.filter拿到顶点属性
       * @param args
       */
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate()
        val sc = spark.sparkContext
        val rdd = sc.textFile("src/main/resources/Celebrity/twitter_graph_data.txt")
        // 拿顶点
        val vertices = rdd
          // ((User47,86566510),(User83,15647839))
          .flatMap(line => {
            val reg = "\(([a-zA-Z]+[0-9]{1,2}),([0-9]+)\)".r
            var arr: ListBuffer[(Long, String)] = ListBuffer[(Long, String)]()
            for (patternMatch <- reg.findAllMatchIn(line)) {
              arr.append((patternMatch.group(2).toLong, patternMatch.group(1)))
            }
            arr
          }).distinct()
        //.foreach(println)
        // 拿边:((User47,86566510),(User83,15647839)) => (User47,User83,1)
        val edge = rdd.map(line=>{
          // 拿出来的是一个String,需要分割或者正则
          val reg = "\([a-zA-Z]+[0-9]{1,2},([0-9]+)\),\([a-zA-Z]+[0-9]{1,2},([0-9]+)\)".r
          var arr: ListBuffer[(String, String)] = ListBuffer[(String, String)]()
          for (patternMatch <- reg.findAllMatchIn(line)) {
            arr.append((patternMatch.group(1),patternMatch.group(2)))
          }
          val tuple = arr.toList.head
          ((tuple._1.toLong,tuple._2.toLong),1)
          // 或者可以维持ListBuffer,然后map=>x._1
        }
        )
          .reduceByKey(_+_)
          .map(x=>Edge(x._1._1.toLong,x._1._2.toLong,x._2))
         // .foreach(println)
        // 网络红人:inDegrees大
        val graph = Graph(vertices, edge)
        // inDegrees => VertexRDD[VertexId,Int(有多少个入度)]
        val id = graph.inDegrees
          // local[*] 开启不同分区
          // sort by 部分排序
          // 现在全部合成一个 sortBy就行了
        //  .repartition(1).sortBy( - _._2).take(1)(0)._1
        // .sortBy(_._2,false,1).take(1)(0)._1
        //  .sortBy( - _._2).collect.take(1)(0)._1
        // 先局部有序,再分区效率高
        //  .map(x=>(x._2,x._1)).sortByKey(false,1).take(1)(0)._1
    
        vertices.filter(f=>f._1==id)
          //.foreach(println)
    
        spark.stop()
    
      }
    
    }

    涉及sortBy的分区排序方法:见 https://www.cnblogs.com/sabertobih/p/13792372.html

  • 相关阅读:
    换个角度看Salesforce之基础配置学习笔记(二)
    换个角度看Salesforce之基础配置学习笔记(一)
    C# LINQ学习笔记
    Oracle使用总结
    UML图及Visio 2010使用总结
    常见的DOS命令
    ansible笔记
    jsoncpp1.9.4源码解析
    fabric链码
    fabric数据结构
  • 原文地址:https://www.cnblogs.com/sabertobih/p/13792365.html
Copyright © 2011-2022 走看看