https://www.bookstack.cn/read/spark-graphx-source-analysis/vertex-edge-triple.md
一、基本操作
1,当顶点和边的属性全都具备,直接构建Graph
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
/**
 * Vertex attribute used by `MyGraphx01`: one user record parsed from a vertex line.
 *
 * @param userid   numeric id of the user (also used as the vertex id)
 * @param username display name of the user
 * @param age      age of the user
 */
case class Users(
  userid: Long,
  username: String,
  age: Int
)
object MyGraphx01 {

  /**
   * Builds a property graph from two comma-separated text files: one holding
   * the vertices and one holding the edges.
   *
   * Vertex lines are parsed as `id,name,age`; edge lines as `srcId,dstId,label`.
   * The graph is constructed and marked for caching, then the session is stopped.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("app").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    // Raw lines of the edge file and the vertex file.
    val edgeLines = sc.textFile("file:///D:/idea/ideaProjects/spark_projects/myspark_graphx/src/main/resources/graphx01/e.txt")
    val vertexLines = sc.textFile("file:///D:/idea/ideaProjects/spark_projects/myspark_graphx/src/main/resources/graphx01/v.txt")

    // Vertices as (id, Users) pairs, e.g. (1L, Users(...)), (2L, Users(...)).
    val vertexRdd = vertexLines.map { line =>
      val fields = line.split(",")
      val id = fields(0).toLong
      id -> Users(fields(0).toLong, fields(1), fields(2).toInt)
    }

    // Edges as Edge(srcId, dstId, label); the label stays a String.
    val edgeRdd = edgeLines.map { line =>
      val fields = line.split(",")
      val src = fields(0).toLong
      val dst = fields(1).toLong
      Edge(src, dst, fields(2))
    }

    // Combine vertices and edges into the graph structure.
    // No action is run on it here; add e.g. graph.triplets.foreach(println) to inspect it.
    val graph = Graph(vertexRdd, edgeRdd).cache()

    spark.stop()
  }
}
2,当顶点和边的属性不需要时(默认为1)
// Alternative way to build a Graph: GraphLoader.edgeListFile.
// Useful when only the edge list is known and no vertex/edge attributes are
// available; every attribute then defaults to 1 (Int).
// Each input line must have the form "1 2": source id, whitespace, target id.
val graph = GraphLoader
  .edgeListFile(sc, "src/main/resources/graphx01/test02.txt")
  .cache()
// Bring the edges, then the vertices, back to the driver and print them.
graph.edges.collect().foreach(println)
graph.vertices.collect().foreach(println)
二、应用
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object MyLove {

  /**
   * Demonstrates basic GraphX queries on a small hand-built graph.
   *
   * Vertices are `(id, (name, age))`; every edge carries an `Int` weight that
   * the queries below treat as the number of phone calls from source to
   * destination.
   *
   * All results are collected to the driver before printing, so the output
   * appears on the driver console regardless of where the executors run.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("app").getOrCreate()
    val sc = spark.sparkContext
    try {
      // All vertices: (id, (name, age)).
      val points = Seq(
        (1L, ("Alice", 28)), (2L, ("bob", 43)), (3L, ("charlie", 64)),
        (4L, ("David", 42)), (5L, ("Ed", 53)), (6L, ("Fran", 50))
      )
      // All edges: relationship direction plus weight.
      val eds = Seq(
        Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8),
        Edge(5L, 6L, 3), Edge(3L, 2L, 4), Edge(3L, 6L, 3)
      )
      val edges = sc.makeRDD(eds)
      val vectices: RDD[(Long, (String, Int))] = sc.makeRDD(points)

      // Build the graph; it is reused by every query below, so cache it.
      val graph = Graph(vectices, edges).cache()

      // Prints the (srcName, dstName) pair of every edge whose weight exceeds `threshold`.
      // Triplets are mapped to plain tuples before collect() so only names are shipped.
      def printPairsAbove(g: Graph[(String, Int), Int], threshold: Int): Unit =
        g.triplets
          .filter(_.attr > threshold)
          .map(t => (t.srcAttr._1, t.dstAttr._1))
          .collect()
          .foreach(println)

      // People older than 30 (the previous `>= 60` contradicted this stated intent).
      graph.vertices
        .filter { case (_, (_, age)) => age > 30 }
        .collect()
        .foreach(println)

      // "True love": pairs with more than 5 calls.
      // triplets => vertex and edge attributes together: ((srcId, srcAttr), (dstId, dstAttr), attr)
      // srcAttr/dstAttr => vertex attributes
      // attr => edge weight
      graph.triplets
        .map(t => s"${t.srcAttr}==>${t.dstAttr}==${t.attr}")
        .collect()
        .foreach(println)
      printPairsAbove(graph, 5)

      // inDegrees => VertexRDD[Int]: (vertexId, number of incoming edges)
      val inDegrees = graph.inDegrees.collect()
      inDegrees.foreach(println)
      println("====outdegrees====")
      val outDegrees = graph.outDegrees.collect()
      outDegrees.foreach(println)

      // Summary: print the degree contents, not the RDD object descriptions.
      println((
        graph.numVertices,
        graph.numEdges,
        inDegrees.mkString("[", ",", "]"),
        outDegrees.mkString("[", ",", "]")
      ))

      // Operator demo: add 2 to every edge weight, then repeat the "> 5" query.
      printPairsAbove(graph.mapEdges(e => e.attr + 2), 5)
    } finally {
      // Always release the session, even if one of the jobs above fails.
      spark.stop()
    }
  }
}
三、图的算子
1)属性算子 —— 只改变顶点或边的属性,不改变图的结构(如 mapVertices、mapEdges、mapTriplets)
2)结构算子 —— 改变图的结构(如 reverse、subgraph、mask、groupEdges)
PS: EdgeTriplet[VD,ED]的属性接口有:srcId、dstId、attr(继承自 Edge[ED]),以及 srcAttr、dstAttr(源顶点和目标顶点的属性)
3)JOIN算子 —— 用外部RDD修改顶点属性:
joinVertices 和 outerJoinVertices
// joinVertices: can only change the VALUE of a vertex attribute, never its type.
//   VD (vertex attribute): ("Alice", 28)
//   U  (joined value):     newVal
//   resulting attribute:   (name + "@" + newVal, age)
// Shape: joinVertices(table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
// Vertices that have a match in the table are rewritten; the rest are left untouched.
// Nothing is printed for this one; append .foreach(println(_)) to inspect the result.
graph
  .joinVertices(newPoints) { case (_, (name, age), extra) => (name + "@" + extra, age) }
  .vertices
// outerJoinVertices: matched vertices are rewritten as above, while unmatched
// vertices receive None and go through the fallback branch ("XXX" here).
graph
  .outerJoinVertices(newPoints) { case (_, (name, age), extraOpt) =>
    (name + "@" + extraOpt.getOrElse("XXX"), age)
  }
  .vertices
  .foreach(println(_))
<= 结果对比 =>