zoukankan      html  css  js  c++  java
  • Spark GraphX初探

    1. Graphx概念

    针对某些领域,如社交网络、语言建模等,graph-parallel系统可以高效地执行复杂的图形算法,比一般的data-parallel系统更快。

    Graphx是将graph-parallel的data-parallel统一到一个系统中。允许用户将数据当成一个图或一个集合RDD,而简化数据移动或复杂操作。

    2. 属性图

    属性图为有向多重图,带有链接到每个顶点和边的用户定义的对象。有向多重图多个并行的边共享相同源和目的地顶点。每个顶点由一个唯一的64位长的标识符(VertexId)作为key,顶点拥有相同的源和目的顶点标识符。

    属性图通过vertex(VD)和edge(ED)类型参数化,分别与每个顶点和边相关联的对象的类型。某些情况下,相同图形中希望顶点拥有不同属性类型,可通过继承实现。

    class VertexProperty()
    case class UserProperty(val name: String) extends VertexProperty
    case class ProductProperty(val name: String, val price: Double) extends VertexProperty
    
    var grapg: Graph[VertexProperty, String] = null
    View Code

    与RDD类似,属性图是不可变、分布式、容错的。图中的值或结构变化需要生成新的图实现。注意:原始图中的大部分可以在新图中重用,以减少固有功能数据结构成本。

    逻辑上,属性图对应一对类型化的集合RDD,包含了每一个顶点和边属性。

    class Graph[VD, ED]{
        val vertices: VertexRDD[VD]
        val edges: EdgeRDD[ED]
    }

    VertexRDD[VD]和EdgeRDD[ED]分别继承于RDD[(VertexID, VD)]和RDD[Edge[ED]]。

    Graph也包含一个三元组视图,三元组视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD, ED]],EdgeTriplet可通过下图理解。

    EdgeTriplet继承于Edge类,并加入srcAttr和dstAttr成员,分别包含源和目标的属性。

    例:

    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    class GraphTest1 {
    
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("GraphTest1"))
    
        // 创建顶点信息
        val users: RDD[(VertexId, (String, String))] = sc.parallelize(
          Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))
        )
    
        // 创建图的Edge类,Edge类具有srcId和dstId分别对应与源和目标点的标识符,次明早attr成员存储边属性
        val relationships: RDD[Edge[String]] = sc.parallelize(
          Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))
        )
    
        // 定义一个默认用户
        val defaultUser = ("John Doe", "Missing")
    
        // 基于Graph对象构造初始化图
        val graph = Graph(users, relationships, defaultUser)
    
        // 统计用户为postdoc的总数
        // graph.vertices返回VertexRDD[(String, String)],继承于RDD[(VertexID, (String, String))]
        graph.vertices.filter{case (id, (name, pos)) => pos == "postdoc"}.count
    
        // 统计src > dst的边总数
        // graph.edges返回Edge[String]对象的EdgeRDD
    //    graph.edges.filter(e => e.srcId > e.dstId).count
        graph.edges.filter{case Edge(srcId, dstId, attr) => srcId > dstId}.count()
    
        // graph.triplets包含的属性有Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),collab))
        val facts = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + triplet.dstAttr._1 )
        facts.collect().foreach(println)
    
        sc.stop()
      }
    
    }
    View Code

    3. 图操作符

    (1) 属性操作

    属性图包含操作如下,每个操作都产生一个新图,包含用户自定义map操作修改后的顶点或边的属性。

    a. mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

    b. mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

    c. mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

    注意:每种情况下图结构均不受影响,如上操作的一个重要特征是允许所得图形重用原有图形的结构索引ndices。

    例:

    import org.apache.spark.graphx.{Edge, Graph, VertexId, GraphOps}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    class GraphTest1 {
    
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("GraphTest1"))
    
        // 创建顶点信息
        val users: RDD[(VertexId, (String, String))] = sc.parallelize(
          Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))
        )
    
        // 创建图的Edge类,Edge类具有srcId和dstId分别对应与源和目标点的标识符,次明早attr成员存储边属性
        val relationships: RDD[Edge[String]] = sc.parallelize(
          Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))
        )
    
        // 定义一个默认用户
        val defaultUser = ("John Doe", "Missing")
    
        // 基于Graph对象构造初始化图
        val graph = Graph(users, relationships, defaultUser)
    
        // 统计用户为postdoc的总数
        // graph.vertices返回VertexRDD[(String, String)],继承于RDD[(VertexID, (String, String))]
        graph.vertices.filter{case (id, (name, pos)) => pos == "postdoc"}.count
    
        // 统计src > dst的边总数
        // graph.edges返回Edge[String]对象的EdgeRDD
    //    graph.edges.filter(e => e.srcId > e.dstId).count
        graph.edges.filter{case Edge(srcId, dstId, attr) => srcId > dstId}.count()
    
        // graph.triplets包含的属性有Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),collab))
        val facts = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + triplet.dstAttr._1 )
        facts.collect().foreach(println)
    
        // 指定新图,顶点属性为出度
        val inputGraph: Graph[Int, String] = graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
    
        // Construct a graph where each edge contains the weight and each vertex is the initial PageRank
        val outputGraph:Graph[Double, Double] = inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((_id, _) => 1.0)
        
        sc.stop()
      }
    
    }
    View Code

    (2) 结构性操作

    图中基本的结构性操作包含:

    a. reverse: Graph[VD, ED]:返回新图,图的边的方向都是反转,可用于计算反转的PageRank

    b. subgraph(epred: EdgeTriplet[VD,ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED]:利用顶点和边的predicates,返回的图仅仅包含满足顶点predicates的顶点,满足边predicates的边以及满足顶点predicates的连接顶点(connect vertices)。应用场景:

    获取感兴趣的 顶点和边组成的图或者清除断开链接后的图。

    例:

    val validGraph = graph.subgraph(vpred = (id, attr) => !attr._2.equals("Missing"))
        validGraph.vertices.collect.foreach(println)
        validGraph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of "+triplet.dstAttr._1).collect
    View Code

    c. mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]:构建子图,包含输入图中的顶点和边。可与subgraph结合,基于另一个相关图的特征去约束一个图。

    例:利用缺失顶点的图运行连通体,返回有效子图

    val ccGraph = graph.connectedComponents() // No longer contains missing field
        val validCCGraph = ccGraph.mask(validGraph) // Restrict the answer to the valid subgraph
    View Code

    d. groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]:合并图中的并行边(如顶点对之间重复的边),降低图的大小

    (3) 连接操作

    用于将外部数据加入到图中。

    a. joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]:将输入RDD和顶点相结合,返回一个新的带有顶点特征的图。

    注意:对于给定顶点,RDD中有超过1个匹配值时,则仅使用其中一个。建议使用如下方法,保证RDD的唯一性。

    val nonUniqueCosts: RDD[(VertexId, Double)]
        val uniqueCosts: VertexRDD[Double] = graph.vertices.aggregateUsingIndex(nonUniqueCosts, (a, b) => a + b)
        val joinedGraph = graph.joinVertices(uniqueCosts)((id, oldCost, extraCost) => oldCost + extraCost)
    View Code

     b. outerJoinVertices(mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]:与joinVertices类似,因为不是所有顶点在RDD中拥有匹配的值,map函数需要一个Option类型

  • 相关阅读:
    ES6 随记(1)-- let 与 const
    this 机制的四种规则
    BEM(一种 CSS 命名规则)
    WebSocket 的后记
    WebSocket 初体验
    “空”的艺术-当数据为空时显示什么
    前端路由以及浏览器回退,hash & history & location
    体验 WebFont,网页上的艺术字
    dedecms安装全过程(集成环境)
    面向对象(五)
  • 原文地址:https://www.cnblogs.com/mengrennwpu/p/10543237.html
Copyright © 2011-2022 走看看