zoukankan      html  css  js  c++  java
  • 在集群中使用文件加载graph

    从hdfs上加载文件并创建graph

    scala> var graphs = GraphLoader.edgeListFile(sc,"/tmp/dataTest/graphTest.txt")
    graphs: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@ab5670d
     可以看到只有一个task,也就是说,他的默认task数量默认就是1,我手动设置一下
    scala> val graphs = GraphLoader.edgeListFile(sc, "/tmp/dataTest/graphTest.txt",numEdgePartitions=4)
    graphs: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@409ea4d1
     这时显示就是4个task
     
    查看前10个vertices和edge(vertices和edge的属性值默认会是1)

     

    我来对vertices的值进行修改
    scala> var verttmp = graphs.mapVertices((id,attr) => attr*2)
    verttmp: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@25d7eb44
    scala> verttmp.vertices.take(10)
    17/02/26 08:17:21 WARN executor.Executor: 1 block locks were not released by TID = 21:
    [rdd_37_0]
    17/02/26 08:17:22 WARN executor.Executor: 1 block locks were not released by TID = 22:
    [rdd_37_1]
    res4: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((200000000072,2), (2000032300072,2), (45200000072,2), (2008680000072,2), (200544530000072,2), (2000042300072,2), (20006876000072,2), (20000757000072,2), (2000423000072,2), (2000034200072,2))
    也可以使用这个方式,这个方式更优化一些
    scala> var verttmp = graphs.mapVertices((_,attr) => attr*3)
    verttmp: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@76828ce4
     
    修改edge的属性值
    scala> var edgetmp=graphs.mapEdges(e => e.attr*2)
    edgetmp: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@42ce3be7
    scala> edgetmp.edges.take(10)
    17/02/26 08:28:23 WARN executor.Executor: 1 block locks were not released by TID = 28:
    [rdd_26_0]
    17/02/26 08:28:23 WARN executor.Executor: 1 block locks were not released by TID = 29:
    [rdd_26_1]
    res6: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(45200000072,2000002320072,2), Edge(200000000072,200000000072,2), Edge(200000000072,200000000072,2), Edge(2000042300072,200000000072,2), Edge(2000423000072,200000000072,2), Edge(2042300000072,2000004340072,2), Edge(2000043420000072,200000000072,2), Edge(200000000072,200000000072,2), Edge(200000000072,200000000072,2), Edge(200000000072,2000034200072,2))
     
    修改triplets的属性值(要求是:将srcAttr修改为以前的2倍,dstAttr修改为以前的3倍)
    scala> var triptmp = graphs.mapTriplets(t => t.srcAttr*2 + t.dstAttr*3)
    triptmp: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@318ec664
    scala> triptmp.triplets.take(10)
    [Stage 25:> (0 + 0) / 3]17/02/26 08:34:01 WARN executor.Executor: 1 block locks were not released by TID = 33:
    [rdd_26_0]
    17/02/26 08:34:01 WARN executor.Executor: 1 block locks were not released by TID = 34:
    [rdd_26_1]
    res7: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((45200000072,1),(2000002320072,1),5), ((200000000072,1),(200000000072,1),5), ((200000000072,1),(200000000072,1),5), ((2000042300072,1),(200000000072,1),5), ((2000423000072,1),(200000000072,1),5), ((2042300000072,1),(2000004340072,1),5), ((2000043420000072,1),(200000000072,1),5), ((200000000072,1),(200000000072,1),5), ((200000000072,1),(200000000072,1),5), ((200000000072,1),(2000034200072,1),5))
     
     
    structural operators的操作有以下几种函数
    class Graph[VD, ED] {
    def reverse: Graph[VD, ED]
    def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
    vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
    def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
    def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
    }
     
    subgraph操作
    def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
    vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
    //改函数返回的graph是满足一个boolean条件的graph
    //vd就是verticesRdd,包含vertexId和attr vpred:(vertexId,(vertexId,attr))
    subgraph大数多应用场景:限制图的顶点和边,消除失效的链接
    scala> var subg = graphs.subgraph(epred = e =>e.srcId>e.dstId)
    subg: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@51483f93
    查看结果
    scala> subg.edges.take(10)
    res12: Array[org.apache.spark.graphx.Edge[Int]] = Array(
    Edge(2000042300072,200000000072,1), 
    Edge(2000423000072,200000000072,1), 
    Edge(2042300000072,2000004340072,1), 
    Edge(2000043420000072,200000000072,1), 
    Edge(2008680000072,200000000072,1), 
    Edge(20006876000072,200000000072,1), 
    Edge(200008670000072,200000000072,1), 
    Edge(20000342300072,200000000072,1), 
    Edge(20000757000072,2000032300072,1), 
    Edge(20453400000072,200000000072,1))
    查看subgraph的vertices和edge
    scala> subg.vertices.count
    res11: Long = 18
    scala> subg.edges.count
    res13: Long = 10
    查看原来的graphs的vertices和edge
    scala> graphs.vertices.count
    res9: Long = 18
    scala> graphs.edges.count
    res10: Long = 21
     
    Degrees 有(indegrees,outdegrees,Degrees)
     
    indegrees:就是srcID到dstId的度数 ,自我理解就是条数
    scala> graphs.inDegrees
    res15: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((200000000072,15), 
    (2000032300072,1), (200544530000072,1), (2000034200072,1), (400000000072,1), (2000002320072,1),
    (2000004340072,1))
    outdegrees:就是dstId到srcId的度数
    scala> graphs.outDegrees.collect
    [Stage 62:>(0 + 0) / 3]17/02/26 11:27:57 WARN executor.Executor: 1
    res18: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((200000000072,10), (45200000072,1),
    (2008680000072,1), (2000042300072,1), (20006876000072,1), (20000757000072,1), (2000423000072,1), 
    (20000342300072,1), (2042300000072,1), (20453400000072,1), (2000043420000072,1), (200008670000072,1))
    degrees:总度数
     
    查出最大的出度,入度,总度数
    创建函数
    scala> def max(a:(VertexId,Int),b:(VertexId,Int))={if(a._2>b._2) a else b }
    max: (a: (org.apache.spark.graphx.VertexId, Int), b: (org.apache.spark.graphx.VertexId, Int))
    (org.apache.spark.graphx.VertexId, Int)
    inDdgrees
    scala> graphs.inDegrees.reduce(max)
    17/02/27 04:46:31 WARN executor.Executor: 1 block locks were not released by TID = 19:
    [rdd_14_0]
    res35: (org.apache.spark.graphx.VertexId, Int) = (1,2)
     
    scala> graphs.outDegrees.reduce(max)
    17/02/27 04:48:47 WARN executor.Executor: 1 block locks were not released by TID = 20:
    [rdd_14_0]
    res36: (org.apache.spark.graphx.VertexId, Int) = (2,1)
     
    scala> graphs.degrees.reduce(max)
    17/02/27 04:49:48 WARN executor.Executor: 1 block locks were not released by TID = 22:
    [rdd_14_0]
    res38: (org.apache.spark.graphx.VertexId, Int) = (1,3)
    joinVertices:将各个顶点改为他的入度 
    outerJoinVertices:将各个顶点改为他的出度
    将graphs中所有的vertexId的属性都设置为0
    scala> var rawG=graphs.mapVertices((id,attr) => 0)
    rawG: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@43d06473
    查看结果
    scala> rawG.vertices.collect
    res47: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,0), (1,0), (3,0), (2,0))
    获取rwaG的inDegrees数据集
    scala> var ind=rawG.inDegrees;
    ind: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[68] at RDD at VertexRDD.scala:57
    查看结果
    scala> ind.collect
    17/02/27 05:05:55 WARN executor.Executor: 1 block locks were not released by TID = 28:
    [rdd_60_0]
    res49: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,2), (3,1))
    使用joinVertices
    scala> var temp=rawG.joinVertices[Int](ind)((_,_,optdeg) => optdeg)
    temp: org.apache.spark.graphx.Graph[Int,String] = org.apache.spark.graphx.impl.GraphImpl@af0e7ce
    查看结果
    scala> temp.vertices.take(10);
    17/02/27 05:12:21 WARN executor.Executor: 2 block locks were not released by TID = 29:
    [rdd_60_0, rdd_77_0]
    res51: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,2), (3,1), (2,0))
    joinVertices从字面上看就是把两个数据集根据vertexId合并,集合的属性用右边的vertices,最后一个属性是0,是因为主的数据集没有vertexId与辅的对应,
     
    outerJoinVertices
     
     
    aggregateMessages
  • 相关阅读:
    自定义中间件
    ASP.NET Core后台任务
    Hosted Services+Quartz实现定时任务调度
    .NET Core 中的路径问题
    js Worker 线程
    postMessage解决跨域跨窗口消息传递
    CentOS搭建KMS服务器
    CentOS安装最新Git
    Linux访问https报错
    EntityFramework Core几个基本命令的使用
  • 原文地址:https://www.cnblogs.com/zhangXingSheng/p/6606979.html
Copyright © 2011-2022 走看看