zoukankan      html  css  js  c++  java
  • spark Graphx 之 应用: 谁是网络红人?

    一、数据:user和各自粉丝的关系

    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User83,15647839))
    ((User47,86566510),(User42,197134784))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User89,74286565),(User49,19315174))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User16,22679419),(User69,45705189))
    ((User37,14559570),(User64,24742040))
    ((User31,63644892),(User10,123004655))
    ((User10,123004655),(User50,17613979))
    ((User37,14559570),(User11,14269220))
    ((User78,3365291),(User30,93905958))
    ((User14,199097645),(User60,16547411))
    ((User3,14874480),(User42,197134784))
    ((User40,813286),(User9,15434432))
    ((User10,123004655),(User34,10211502))
    ((User90,34870269),(User53,25566593))
    ((User12,24741956),(User60,16547411))
    ((User12,24741956),(User5,18927441))
    ((User37,14559570),(User39,22831657))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User89,74286565),(User32,15670515))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User80,520835497),(User94,15913))
    ((User28,7465732),(User65,90569268))
    ((User89,74286565),(User9,15434432))
    ((User26,16112634),(User71,20065583))
    ((User80,520835497),(User94,15913))
    ((User75,15919138),(User94,15913))
    ((User1,849131),(User86,101760242))
    ((User36,59804598),(User79,30582362))
    ((User42,197134784),(User4,34236703))
    ((User48,63433165),(User25,20397258))
    ((User72,18859819),(User5,18927441))
    ((User47,86566510),(User24,183967095))
    ((User9,15434432),(User32,15670515))
    ((User85,38521400),(User74,811377))
    ((User28,7465732),(User78,3365291))
    ((User36,59804598),(User68,29758446))
    ((User41,15959795),(User36,59804598))
    ((User88,90961424),(User64,24742040))
    ((User13,16409781),(User87,24741685))
    ((User93,174347580),(User92,460486951))
    ((User93,174347580),(User77,54959836))
    ((User93,174347580),(User77,54959836))

    二、

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.SparkSession
    
    import scala.collection.mutable.ListBuffer
    
    object Celebrity {
    
      /**
       * 思路 =>
       * 拿顶点:读文件,压平,distinct
       * 拿边:ID,ID,1 => 计数权重
       * 拿到第一indegrees的id,通过vertices.filter拿到顶点属性
       * @param args
       */
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate()
        val sc = spark.sparkContext
        val rdd = sc.textFile("src/main/resources/Celebrity/twitter_graph_data.txt")
        // 拿顶点
        val vertices = rdd
          // ((User47,86566510),(User83,15647839))
          .flatMap(line => {
            val reg = "\(([a-zA-Z]+[0-9]{1,2}),([0-9]+)\)".r
            var arr: ListBuffer[(Long, String)] = ListBuffer[(Long, String)]()
            for (patternMatch <- reg.findAllMatchIn(line)) {
              arr.append((patternMatch.group(2).toLong, patternMatch.group(1)))
            }
            arr
          }).distinct()
        //.foreach(println)
        // 拿边:((User47,86566510),(User83,15647839)) => (User47,User83,1)
        val edge = rdd.map(line=>{
          // 拿出来的是一个String,需要分割或者正则
          val reg = "\([a-zA-Z]+[0-9]{1,2},([0-9]+)\),\([a-zA-Z]+[0-9]{1,2},([0-9]+)\)".r
          var arr: ListBuffer[(String, String)] = ListBuffer[(String, String)]()
          for (patternMatch <- reg.findAllMatchIn(line)) {
            arr.append((patternMatch.group(1),patternMatch.group(2)))
          }
          val tuple = arr.toList.head
          ((tuple._1.toLong,tuple._2.toLong),1)
          // 或者可以维持ListBuffer,然后map=>x._1
        }
        )
          .reduceByKey(_+_)
          .map(x=>Edge(x._1._1.toLong,x._1._2.toLong,x._2))
         // .foreach(println)
        // 网络红人:inDegrees大
        val graph = Graph(vertices, edge)
        // inDegrees => VertexRDD[VertexId,Int(有多少个入度)]
        val id = graph.inDegrees
          // local[*] 开启不同分区
          // sort by 部分排序
          // 现在全部合成一个 sortBy就行了
        //  .repartition(1).sortBy( - _._2).take(1)(0)._1
        // .sortBy(_._2,false,1).take(1)(0)._1
        //  .sortBy( - _._2).collect.take(1)(0)._1
        // 先局部有序,再分区效率高
        //  .map(x=>(x._2,x._1)).sortByKey(false,1).take(1)(0)._1
    
        vertices.filter(f=>f._1==id)
          //.foreach(println)
    
        spark.stop()
    
      }
    
    }

    涉及sortBy的分区排序方法:见 https://www.cnblogs.com/sabertobih/p/13792372.html

  • 相关阅读:
    Word批量转PDF(内容转图片,防复制文字)
    Word批量转PDF或者图片(实现方式二)
    Word批量生成软件(实现方式三)
    合同批量生成软件/工具(实现方式三)
    Word批量打印软件/工具
    Word文件批量查找替换字符串
    Word批量生成软件(实现方式二)
    Word批量生成软件
    合同批量生成软件/工具(实现方式二)
    MySQL处理大量数据的效率问题
  • 原文地址:https://www.cnblogs.com/sabertobih/p/13792365.html
Copyright © 2011-2022 走看看