1. Data: the relationship between each user and their followers
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User83,15647839))
((User47,86566510),(User42,197134784))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User89,74286565),(User49,19315174))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User16,22679419),(User69,45705189))
((User37,14559570),(User64,24742040))
((User31,63644892),(User10,123004655))
((User10,123004655),(User50,17613979))
((User37,14559570),(User11,14269220))
((User78,3365291),(User30,93905958))
((User14,199097645),(User60,16547411))
((User3,14874480),(User42,197134784))
((User40,813286),(User9,15434432))
((User10,123004655),(User34,10211502))
((User90,34870269),(User53,25566593))
((User12,24741956),(User60,16547411))
((User12,24741956),(User5,18927441))
((User37,14559570),(User39,22831657))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User89,74286565),(User32,15670515))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User80,520835497),(User94,15913))
((User28,7465732),(User65,90569268))
((User89,74286565),(User9,15434432))
((User26,16112634),(User71,20065583))
((User80,520835497),(User94,15913))
((User75,15919138),(User94,15913))
((User1,849131),(User86,101760242))
((User36,59804598),(User79,30582362))
((User42,197134784),(User4,34236703))
((User48,63433165),(User25,20397258))
((User72,18859819),(User5,18927441))
((User47,86566510),(User24,183967095))
((User9,15434432),(User32,15670515))
((User85,38521400),(User74,811377))
((User28,7465732),(User78,3365291))
((User36,59804598),(User68,29758446))
((User41,15959795),(User36,59804598))
((User88,90961424),(User64,24742040))
((User13,16409781),(User87,24741685))
((User93,174347580),(User92,460486951))
((User93,174347580),(User77,54959836))
((User93,174347580),(User77,54959836))
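Each data line pairs two (name, id) tuples; the same relationship may appear on multiple lines, and the program below folds those repetitions into an edge weight via reduceByKey. A minimal sketch of parsing a single line, using the same regex as the program (the object name ParseLineDemo is illustrative):

import scala.util.matching.Regex

object ParseLineDemo {
  def main(args: Array[String]): Unit = {
    val line = "((User47,86566510),(User83,15647839))"
    val reg: Regex = """\(([a-zA-Z]+[0-9]{1,2}),([0-9]+)\)""".r
    // Each match captures (name, id); one line yields two matches
    val pairs = reg.findAllMatchIn(line)
      .map(m => (m.group(1), m.group(2).toLong))
      .toList
    println(pairs) // List((User47,86566510), (User83,15647839))
  }
}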
2. Code
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

object Celebrity {
  /**
   * Approach =>
   * Vertices: read the file, flatten, distinct
   * Edges: (ID, ID, 1) => count repetitions as the weight
   * Take the id with the top inDegrees, then fetch its vertex attribute via vertices.filter
   * @param args
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("app").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.textFile("src/main/resources/Celebrity/twitter_graph_data.txt")

    // Vertices
    val vertices = rdd
      // ((User47,86566510),(User83,15647839))
      .flatMap(line => {
        val reg = """\(([a-zA-Z]+[0-9]{1,2}),([0-9]+)\)""".r
        val arr: ListBuffer[(Long, String)] = ListBuffer[(Long, String)]()
        for (patternMatch <- reg.findAllMatchIn(line)) {
          arr.append((patternMatch.group(2).toLong, patternMatch.group(1)))
        }
        arr
      }).distinct()
    //.foreach(println)

    // Edges: ((User47,86566510),(User83,15647839)) => ((86566510,15647839),1)
    val edge = rdd.map(line => {
      // Each line arrives as a String: split it or use a regex
      val reg = """\([a-zA-Z]+[0-9]{1,2},([0-9]+)\),\([a-zA-Z]+[0-9]{1,2},([0-9]+)\)""".r
      val arr: ListBuffer[(String, String)] = ListBuffer[(String, String)]()
      for (patternMatch <- reg.findAllMatchIn(line)) {
        arr.append((patternMatch.group(1), patternMatch.group(2)))
      }
      val tuple = arr.toList.head
      ((tuple._1.toLong, tuple._2.toLong), 1)
      // alternatively, keep the ListBuffer and map => x._1
    })
      .reduceByKey(_ + _)
      .map(x => Edge(x._1._1, x._1._2, x._2))
    // .foreach(println)

    // The celebrity is the vertex with the highest in-degree
    val graph = Graph(vertices, edge)
    // inDegrees => VertexRDD[(VertexId, Int)] (Int = number of in-edges)
    val id = graph.inDegrees
      // local[*] runs several partitions; sortBy is a total sort, so taking
      // the first element after a descending sort yields the global maximum
      .sortBy(x => -x._2).take(1)(0)._1
      // equivalent variants:
      // .repartition(1).sortBy(x => -x._2).take(1)(0)._1
      // .sortBy(_._2, ascending = false, numPartitions = 1).take(1)(0)._1
      // sorting locally first, then merging partitions, is more efficient:
      // .map(x => (x._2, x._1)).sortByKey(ascending = false, numPartitions = 1).take(1)(0)._1
    vertices.filter(f => f._1 == id)
      .foreach(println)
    spark.stop()
  }
}
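Since only the single largest in-degree is needed, a full sortBy can be skipped entirely. A sketch of two cheaper alternatives, assuming graph is the Graph[String, Int] built above (the wrapper names TopInDegree/topByInDegree are illustrative):

import org.apache.spark.graphx.{Graph, VertexId}

object TopInDegree {
  // max() is a single reduce over partitions (no shuffle needed);
  // takeOrdered keeps only the top element per partition before merging.
  def topByInDegree(graph: Graph[String, Int]): VertexId =
    graph.inDegrees
      .max()(Ordering.by[(VertexId, Int), Int](_._2))
      ._1
  // or:
  // graph.inDegrees.takeOrdered(1)(Ordering.by[(VertexId, Int), Int](x => -x._2)).head._1
}

Both follow the spirit of the "sorting locally first, then merging" comment above: each partition contributes only its own best candidate to the driver.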
For the partition-level sorting behaviour of sortBy, see https://www.cnblogs.com/sabertobih/p/13792372.html
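For reference, a minimal runnable sketch of the sortBy partition behaviour the linked post discusses (the object name and sample data are illustrative, not from the post):

import org.apache.spark.sql.SparkSession

object SortByDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sortByDemo").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    val data = sc.parallelize(Seq(("a", 3), ("b", 9), ("c", 1), ("d", 7)), numSlices = 4)

    // sortBy performs a total (range-partitioned) sort; with numPartitions = 1
    // everything lands in a single, fully ordered partition, so take(1)
    // returns the global maximum when sorting in descending order.
    val top = data.sortBy(_._2, ascending = false, numPartitions = 1).take(1)(0)
    println(top) // (b,9)

    spark.stop()
  }
}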