package Spark_GraphX import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object 属性图 { def main(args: Array[String]): Unit = { val conf=new SparkConf().setAppName("SimpleGraphX").setMaster("local[2]") val sc=new SparkContext(conf) //定义顶点 val users:RDD[(VertexId,(String,String))]=sc.parallelize(Array((3L,("soyo","student")),(7L,("soyo2","postdoc")),(5L,("xiaozhou","professor")),(2L,("xiaocui","professor")))) //定义边 val relationships:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"collab"),Edge(5L,3L,"advisor"),Edge(2L,5L,"colleague"),Edge(5L,7L,"parent"))) //定义默认的作者,以防与不存在的作者有边 val defaultUser=("Jone","Dance") val graph=Graph(users,relationships,defaultUser) println("*****************") println("找到图中属性是student的点") graph.vertices.filter{case (id,(name,occupation))=>occupation=="student"}.collect.foreach{case(id,(name,occupation))=>println(s"$name is $occupation")} println("--------------------------") println("找到途中边的属性是advisor的边") graph.edges.filter(x=>x.attr=="advisor").collect().foreach(x=>println(s"${x.srcId} to ${x.dstId} 属性为 ${x.attr}")) println("--------------------------") println("找到图中的最大出度,入度,度数") println("最大的出度:"+graph.outDegrees.reduce(max)) println("最大的入度:"+graph.inDegrees.reduce(max)) println("最大的度数:"+graph.degrees.reduce(max)) //Scala 可直接调用简单Java程序,集合等操作可以相互转换(使用Scala的JavaConversions方法对象) // System.out.print("hello word") } //VertexId:顶点,Int:度数 def max(a:(VertexId,Int),b:(VertexId,Int)):(VertexId,Int)={ if(a._2>b._2)a else b } }
结果:
*****************
找到图中属性是student的点
soyo is student
--------------------------
找到途中边的属性是advisor的边
5 to 3 属性为 advisor
--------------------------
找到图中的最大出度,入度,度数
最大的出度:(5,2)
最大的入度:(7,2)
最大的度数:(5,3)