package Spark_GraphX

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object 属性图 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SimpleGraphX").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // Define the vertices: (VertexId, (name, occupation))
    val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
      (3L, ("soyo", "student")),
      (7L, ("soyo2", "postdoc")),
      (5L, ("xiaozhou", "professor")),
      (2L, ("xiaocui", "professor"))))

    // Define the edges: Edge(srcId, dstId, relationship)
    val relationships: RDD[Edge[String]] = sc.parallelize(Array(
      Edge(3L, 7L, "collab"),
      Edge(5L, 3L, "advisor"),
      Edge(2L, 5L, "colleague"),
      Edge(5L, 7L, "parent")))

    // Default vertex attribute, used when an edge refers to a vertex that is not in users
    val defaultUser = ("Jone", "Dance")
    val graph = Graph(users, relationships, defaultUser)

    println("*****************")
    println("Find the vertices whose occupation is student")
    graph.vertices
      .filter { case (id, (name, occupation)) => occupation == "student" }
      .collect
      .foreach { case (id, (name, occupation)) => println(s"$name is $occupation") }

    println("--------------------------")
    println("Find the edges whose attribute is advisor")
    graph.edges
      .filter(x => x.attr == "advisor")
      .collect()
      .foreach(x => println(s"${x.srcId} to ${x.dstId} attribute: ${x.attr}"))

    println("--------------------------")
    println("Find the maximum out-degree, in-degree and degree of the graph")
    println("Max out-degree: " + graph.outDegrees.reduce(max))
    println("Max in-degree: " + graph.inDegrees.reduce(max))
    println("Max degree: " + graph.degrees.reduce(max))

    // Scala can call Java code directly
    // System.out.print("hello world")

    // Property operations
    println("------------------------")
    println("Append the string Spark to the occupation attribute of every vertex")
    // Note: the new vertex attribute is (id, (name, occupation + "Spark")), so the vertex id
    // is duplicated into the attribute; this is why the output below repeats the id.
    graph.mapVertices { case (id, (name, occupation)) => (id, (name, occupation + "Spark")) }
      .vertices.collect
      .foreach(x => println(s"${x._2._1} is ${x._2._2} : ${x._1} : ${x._2}"))

    println("------------------------")
    println("Set every Edge attribute to: source vertex attribute + edge attribute + destination vertex attribute:")
    graph.mapTriplets(x => x.srcAttr._2 + "+" + x.attr + "+" + x.dstAttr._2).edges.collect().foreach(println)
    // This shows that property operations leave the graph structure unchanged.
    graph.mapTriplets(x => x.srcId + x.dstId).edges.collect().foreach(println)

    // Structural operations: triplets (edges together with their endpoint attributes)
    /*
      reverse returns a new graph with the direction of every edge reversed. It does not modify
      vertex or edge attributes and does not change the number of edges.
      subgraph takes vertex and edge predicates and returns a new graph containing only the
      vertices and edges that satisfy them. It is typically used to restrict the graph to the
      vertices and edges of interest, or to remove broken links.
    */
    println("------Structural operations---------")
    graph.triplets.map(x => x.srcAttr._1 + " is the " + x.attr + " of " + x.dstAttr._1).foreach(println)

    println("-------Remove vertices whose occupation is postdoc and build a subgraph----------")
    val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "postdoc")
    validGraph.vertices.foreach(println)
    validGraph.triplets.map(x => x.srcAttr._1 + " is the " + x.attr + " of " + x.dstAttr._1).foreach(println)

    println("----------Build a subgraph of vertices whose occupation is professor and print its vertices--------")
    val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 == "professor")
    subGraph.vertices.collect().foreach(x => println(s"${x._2._1} is ${x._2._2}"))
  }

  // VertexId: the vertex id, Int: its degree
  def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
    if (a._2 > b._2) a else b
  }
}
Output:
*****************
Find the vertices whose occupation is student
soyo is student
--------------------------
Find the edges whose attribute is advisor
5 to 3 attribute: advisor
--------------------------
Find the maximum out-degree, in-degree and degree of the graph
Max out-degree: (5,2)
Max in-degree: (7,2)
Max degree: (5,3)
------------------------
Append the string Spark to the occupation attribute of every vertex
5 is (xiaozhou,professorSpark) : 5 : (5,(xiaozhou,professorSpark))
2 is (xiaocui,professorSpark) : 2 : (2,(xiaocui,professorSpark))
3 is (soyo,studentSpark) : 3 : (3,(soyo,studentSpark))
7 is (soyo2,postdocSpark) : 7 : (7,(soyo2,postdocSpark))
------------------------
Set every Edge attribute to: source vertex attribute + edge attribute + destination vertex attribute:
Edge(3,7,student+collab+postdoc)
Edge(5,3,professor+advisor+student)
Edge(2,5,professor+colleague+professor)
Edge(5,7,professor+parent+postdoc)
Edge(3,7,10)
Edge(5,3,8)
Edge(2,5,7)
Edge(5,7,12)
------Structural operations---------
xiaozhou is the parent of soyo2
soyo is the collab of soyo2
xiaozhou is the advisor of soyo
xiaocui is the colleague of xiaozhou
-------Remove vertices whose occupation is postdoc and build a subgraph----------
(5,(xiaozhou,professor))
(2,(xiaocui,professor))
(3,(soyo,student))
xiaozhou is the advisor of soyo
xiaocui is the colleague of xiaozhou
----------Build a subgraph of vertices whose occupation is professor and print its vertices--------
xiaozhou is professor
xiaocui is professor
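A side note on the mapVertices call in the listing: because the lambda returns (id, (name, occupation + "Spark")), the vertex attribute type changes from (String, String) to (VertexId, (String, String)), which is why each output line repeats the vertex id. If the goal is only to extend the occupation string, a more conventional form would look like the following sketch (not part of the program above; graph is the value built in main):

    // Keep the attribute type (String, String): only the occupation string changes.
    graph.mapVertices { case (_, (name, occupation)) => (name, occupation + "Spark") }
      .vertices.collect()
      .foreach { case (id, (name, occupation)) => println(s"$id: $name is $occupation") }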
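The long comment in the listing describes the reverse operator but the program never runs it. Below is a minimal sketch of how it could be exercised on the same graph value built in main; the println label and the arrow formatting are illustrative, not from the original program:

    println("------reverse---------")
    // reverse flips the direction of every edge; vertex and edge attributes and the number of
    // edges are unchanged. For example, the original 5L -> 3L "advisor" edge now runs 3L -> 5L.
    graph.reverse.triplets
      .map(x => x.srcAttr._1 + " -> " + x.dstAttr._1 + " (" + x.attr + ")")
      .foreach(println)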