Load an edge-list file from HDFS and create a graph
scala> var graphs = GraphLoader.edgeListFile(sc, "/tmp/dataTest/graphTest.txt")
graphs: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@ab5670d
You can see there is only one task; in other words, the default number of edge partitions is 1. Let's set it manually:
scala> val graphs = GraphLoader.edgeListFile(sc, "/tmp/dataTest/graphTest.txt", numEdgePartitions = 4)
graphs: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@409ea4d1
Now four tasks are shown.
View the first 10 vertices and edges (both vertex and edge attributes default to 1).
Modify the vertex attribute values:
scala> var verttmp = graphs.mapVertices((id, attr) => attr * 2)
verttmp: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@25d7eb44
scala> verttmp.vertices.take(10)
res4: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((200000000072,2), (2000032300072,2), (45200000072,2), (2008680000072,2), (200544530000072,2), (2000042300072,2), (20006876000072,2), (20000757000072,2), (2000423000072,2), (2000034200072,2))
You can also write it this way; it is slightly cleaner because the unused vertex id is not bound to a name:
scala> var verttmp = graphs.mapVertices((_, attr) => attr * 3)
verttmp: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@76828ce4
Modify the edge attribute values:
scala> var edgetmp = graphs.mapEdges(e => e.attr * 2)
edgetmp: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@42ce3be7
scala> edgetmp.edges.take(10)
res6: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(45200000072,2000002320072,2), Edge(200000000072,200000000072,2), Edge(200000000072,200000000072,2), Edge(2000042300072,200000000072,2), Edge(2000423000072,200000000072,2), Edge(2042300000072,2000004340072,2), Edge(2000043420000072,200000000072,2), Edge(200000000072,200000000072,2), Edge(200000000072,200000000072,2), Edge(200000000072,2000034200072,2))
Modify the triplet attribute: set each edge attribute to srcAttr * 2 + dstAttr * 3. Note that mapTriplets can only change the edge attribute; the vertex attributes themselves stay untouched (they still print as 1 below).
scala> var triptmp = graphs.mapTriplets(t => t.srcAttr * 2 + t.dstAttr * 3)
triptmp: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@318ec664
scala> triptmp.triplets.take(10)
res7: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((45200000072,1),(2000002320072,1),5), ((200000000072,1),(200000000072,1),5), ((200000000072,1),(200000000072,1),5), ((2000042300072,1),(200000000072,1),5), ((2000423000072,1),(200000000072,1),5), ((2042300000072,1),(2000004340072,1),5), ((2000043420000072,1),(200000000072,1),5), ((200000000072,1),(200000000072,1),5), ((200000000072,1),(200000000072,1),5), ((200000000072,1),(2000034200072,1),5))
The structural operators are the following functions:
class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
}
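Of these, `reverse` and `groupEdges` are not demonstrated in the session below; a minimal sketch, assuming the same `sc` SparkContext and a small hypothetical edge set:

```scala
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(1L, 2L, 1), // two parallel edges from 1 to 2
  Edge(2L, 3L, 1)
))
val g = Graph.fromEdges(edges, defaultValue = 0)

// reverse: flips the direction of every edge (srcId and dstId swap);
// all attributes are untouched.
val rev = g.reverse

// groupEdges: merges parallel edges with the supplied function (here, by
// summing attributes). It only merges edges co-located in the same
// partition, so partitionBy is normally called first.
val grouped = g.partitionBy(PartitionStrategy.EdgePartition2D)
               .groupEdges((a, b) => a + b)
```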
The subgraph operation

def subgraph(epred: EdgeTriplet[VD, ED] => Boolean,
             vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
// This function returns the subgraph that satisfies the given boolean predicates:
// vpred receives each vertex as (vertexId, attr); only vertices for which it
// returns true are kept. epred receives the full edge triplet; only edges for
// which it returns true (and whose endpoints survive vpred) are kept.

The most common subgraph use cases: restricting the graph to the vertices and edges of interest, and eliminating broken links.
scala> var subg = graphs.subgraph(epred = e => e.srcId > e.dstId)
subg: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@51483f93
View the result:
scala> subg.edges.take(10)
res12: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(2000042300072,200000000072,1), Edge(2000423000072,200000000072,1), Edge(2042300000072,2000004340072,1), Edge(2000043420000072,200000000072,1), Edge(2008680000072,200000000072,1), Edge(20006876000072,200000000072,1), Edge(200008670000072,200000000072,1), Edge(20000342300072,200000000072,1), Edge(20000757000072,2000032300072,1), Edge(20453400000072,200000000072,1))
Count the subgraph's vertices and edges:
scala> subg.vertices.count
res11: Long = 18
scala> subg.edges.count
res13: Long = 10
Count the original graph's vertices and edges for comparison:
scala> graphs.vertices.count
res9: Long = 18
scala> graphs.edges.count
res10: Long = 21
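The `mask` operator, listed in the class signature above but not demonstrated in this session, restricts one graph to the vertices and edges that also appear in another graph, keeping the attributes of the first. A minimal sketch, reusing the `graphs` and `subg` values from above:

```scala
// mask returns the part of `graphs` that is also present in `subg`:
// the structure is the intersection, the attributes come from `graphs`.
val masked = graphs.mask(subg)

// A common pattern: run an algorithm on the full graph, then project
// the results onto a subgraph of interest with mask.
val cc = graphs.connectedComponents()
val ccOnSubgraph = cc.mask(subg)
```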
Degrees: there are inDegrees, outDegrees, and degrees.
inDegrees: the number of edges pointing into each vertex, i.e. the count of edges whose dstId is that vertex.
scala> graphs.inDegrees.collect
res15: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((200000000072,15), (2000032300072,1), (200544530000072,1), (2000034200072,1), (400000000072,1), (2000002320072,1), (2000004340072,1))
outDegrees: the number of edges leaving each vertex, i.e. the count of edges whose srcId is that vertex.
scala> graphs.outDegrees.collect
res18: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((200000000072,10), (45200000072,1), (2008680000072,1), (2000042300072,1), (20006876000072,1), (20000757000072,1), (2000423000072,1), (20000342300072,1), (2042300000072,1), (20453400000072,1), (2000043420000072,1), (200008670000072,1))
degrees: the total degree, i.e. in-degree plus out-degree.
Find the vertex with the maximum in-degree, out-degree, and total degree.
Define a helper function:
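As a sanity check (a sketch, assuming the same `graphs` as above), `degrees` can be reproduced by combining `inDegrees` and `outDegrees` with a full outer join, treating a missing side as 0:

```scala
// Recompute the total degree per vertex from the two directed degree RDDs.
// fullOuterJoin keeps vertices that appear on either side; a missing side
// means that vertex has no incoming (or no outgoing) edges.
val recomputed = graphs.inDegrees.fullOuterJoin(graphs.outDegrees).mapValues {
  case (in, out) => in.getOrElse(0) + out.getOrElse(0)
}
```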
scala> import org.apache.spark.graphx.VertexId
scala> def max(a: (VertexId, Int), b: (VertexId, Int)) = { if (a._2 > b._2) a else b }
max: (a: (org.apache.spark.graphx.VertexId, Int), b: (org.apache.spark.graphx.VertexId, Int))(org.apache.spark.graphx.VertexId, Int)
Apply it to inDegrees, outDegrees, and degrees:
scala> graphs.inDegrees.reduce(max)
res35: (org.apache.spark.graphx.VertexId, Int) = (1,2)
scala> graphs.outDegrees.reduce(max)
res36: (org.apache.spark.graphx.VertexId, Int) = (2,1)
scala> graphs.degrees.reduce(max)
res38: (org.apache.spark.graphx.VertexId, Int) = (1,3)
joinVertices: joins the vertices with an input RDD and transforms the attributes of matched vertices with the supplied function; vertices with no match keep their original attribute, and the attribute type cannot change.
outerJoinVertices: the more general join; the map function receives an Option (None when there is no match), so every vertex is transformed and the attribute type may change.
Below, joinVertices is used to set each vertex's attribute to its in-degree. First set every vertex attribute in graphs to 0:
scala> var rawG = graphs.mapVertices((id, attr) => 0)
rawG: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@43d06473
View the result:
scala> rawG.vertices.collect
res47: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,0), (1,0), (3,0), (2,0))
Get rawG's inDegrees data set:
scala> var ind = rawG.inDegrees
ind: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[68] at RDD at VertexRDD.scala:57
View the result:
scala> ind.collect
res49: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,2), (3,1))
Use joinVertices:
scala> var temp = rawG.joinVertices[Int](ind)((_, _, optdeg) => optdeg)
temp: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@af0e7ce
View the result:
scala> temp.vertices.take(10)
res51: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,2), (3,1), (2,0))
As the name suggests, joinVertices merges the two data sets by vertexId, taking the attribute from the input RDD for matched vertices. Vertex 2 still has attribute 0 because it has no entry in the inDegrees RDD (its in-degree is 0), and joinVertices leaves unmatched vertices unchanged.
outerJoinVertices
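A minimal sketch of outerJoinVertices, following the same pattern as the joinVertices demo above but setting each vertex attribute to its out-degree instead:

```scala
// outerJoinVertices passes an Option to the map function: None for
// vertices with no matching entry in the input RDD. Here a vertex with
// no outgoing edges explicitly gets out-degree 0.
val outd = rawG.outDegrees
val withOut = rawG.outerJoinVertices(outd) {
  (id, oldAttr, optDeg) => optDeg.getOrElse(0)
}
```

Unlike joinVertices, the result's vertex attribute type may differ from the original, since every vertex is rewritten by the function.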
aggregateMessages
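aggregateMessages is GraphX's bulk message-passing primitive: a sendMsg function runs over every edge triplet and may send a message to the source and/or destination vertex, and a mergeMsg function combines the messages arriving at each vertex. As a sketch (reusing the `graphs` value from this session), each vertex's in-degree can be recomputed with it, equivalent to `graphs.inDegrees`:

```scala
import org.apache.spark.graphx.VertexRDD

// Send 1 to the destination of every edge, then sum the messages per vertex.
val inDeg: VertexRDD[Int] = graphs.aggregateMessages[Int](
  triplet => triplet.sendToDst(1), // sendMsg: runs once per edge triplet
  (a, b) => a + b                  // mergeMsg: combines messages at a vertex
)
```

Only vertices that receive at least one message appear in the result, which is why vertices with in-degree 0 are absent, just as with `inDegrees` above.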