zoukankan      html  css  js  c++  java
  • GraphX学习笔记——Programming Guide

    学习的资料是官网的Programming Guide

    https://spark.apache.org/docs/latest/graphx-programming-guide.html
    

     首先是GraphX的简介

    GraphX是Spark中专门负责图和图并行计算的组件。

    GraphX通过引入了图形概念来继承了Spark RDD:一个连接节点和边的有向图

    为了支持图计算,GraphX引入了一些算子: subgraphjoinVertices, and aggregateMessages

    和 Pregel API,此外还有一些algorithmsbuilders 来简化图分析任务。

    关于构建 节点Vertex边Edge

    1.如果需要将节点定义成一个类

    package graphx
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}
    
    /**
      * Created by common on 18-1-22.
      */
    
    // 抽象节点
    class VertexProperty()
    // User节点
    case class UserProperty(val name: String) extends VertexProperty
    // Product节点
    case class ProductProperty(val name: String, val price: Double) extends VertexProperty
    
    object GraphxLearning {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("GraphX").setMaster("local")
        val sc = new SparkContext(conf)
    
        // The graph might then have the type:
        var graph: Graph[VertexProperty, String] = null
    
      }
    }
    

    和节点一样,边也可以定义成一个class,同时Graph类需要和定义的节点和边的类型相对应

    class Graph[VD, ED] {    // VD表示节点类型,ED表示边类型
      val vertices: VertexRDD[VD]
      val edges: EdgeRDD[ED]
    }
    

    2.如果节点的类型比较简单,例如只是一个String或者(String,String),就不需要定义成一个类

    package graphx
    
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}
    
    /**
      * Created by common on 18-1-22.
      */
    object GraphxLearning {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("GraphX").setMaster("local")
        val sc = new SparkContext(conf)
    
        // Create an RDD for the vertices
        val users: RDD[(VertexId, (String, String))] =
          sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
            (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
        // Create an RDD for edges
        val relationships: RDD[Edge[String]] =
          sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
            Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
        //Define a default user in case there are relationship with missing user
        val defaultUser = ("John Doe", "Missing")
    
        // 使用多个RDDs建立一个Graph,Graph的类型分别是节点加上边的类型,有两种节点,一种有ID,一种没有
        val srcGraph: Graph[(String, String), String] = Graph(users, relationships, defaultUser)
    
      }
    }
    

     的一些算子

    图信息

     

    numEdges: Long

    计算整个图中边的数目

    numVertices: Long

    计算整个图中顶点的数目

    inDegrees: VertexRDD[Int]

    计算所有点的入度,若顶点无入度,则不会出现在结果中

    outDegrees: VertexRDD[Int]

    计算所有点的出度,和inDegrees相似,若顶点无出度则不会出现在结果中

    degrees: VertexRDD[Int]

    计算所有顶点的出入度之和,孤立的顶点(无边与之相连)不会出现在结果中

    查看图中的集合

     

    vertices: VertexRDD[VD]

     节点VertexRDD

    edges: EdgeRDD[ED]

     边EdgeRDD

    triplets: RDD[EdgeTriplet[VD, ED]]

    三元组RDD
    图存储  
    persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]  
    cache(): Graph[VD, ED]  

    unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

     

    操作partition的算子

     

    partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

     
    操作Vertex和Edge的算子,以生成新的Graph  

    mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]

     

    mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]

     

    mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]

     

    mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

     
    mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]) : Graph[VD, ED2]  
    修改图结构的算子  
     

    reverse: Graph[VD, ED]

     改变有向边的方向

    subgraph( epred: EdgeTriplet[VD,ED] => Boolean = (x => true), vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED]

    子图

    mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]

     

    groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

     graphx中两个节点之间可以存在多条边,可以用于将这多条边合并
    Join算子  

    joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]

    使用顶点的更新数据生成新的顶点数据。将图数据与输入数据做内连接操作,过滤输入数据中不存在的顶点,并对连接结果使用指定的UDF进行计算,若输入数据中未包含图中某些顶点的更新数据,则在新图中使用顶点的旧数据

    outerJoinVertices[U, VD2](other: RDD[(VertexId, U)]) (mapFunc: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED]

     
    聚合算子  

    collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

    收集每个顶点的相邻顶点的ID数据,edgeDirection用来控制收集的方向

    collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

    收集每个顶点的相邻顶点的数据,当图中顶点的出入度较大时,可能会占用很大的存储空间,参数edgeDirection用于控制收集方向

    aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A]

     

    迭代图并行计算的算子

     

    pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)( vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)], mergeMsg: (A, A) => A) : Graph[VD, ED]

     

    基础图算法

     

    pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

     

    connectedComponents(): Graph[VertexId, ED]

     联通,无向联通的节点将会有一个相同的VertexId
    triangleCount(): Graph[Int, ED]  

    stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED] }

     强联通,有向联通的节点将会有一个相同的VertexId

    LabelPropagation

     标签传播算法

    算法终止条件:它要求所有的node都满足,node的label一定是它的邻居label中出现次数最多的(或最多的之一),这意味着,每个node的邻居中,和它处于同一个community的数量一定大于等于处于其它community的数量

    ShortestPaths

     最短路径算法

    SVDPlusPlus

     SVD算法
     
       
       
  • 相关阅读:
    随机获取Mysql数据表的一条或多条记录
    swap 释放
    linux sed
    mongodb url
    mysql doc
    mysql 8.0 主从复制的优化
    innobackupex 远程备份
    MySQL 8.0新特性:彻底解决困扰运维的复制延迟问题
    pycharm 激活码及使用方式
    MySQL运行内存不足时应采取的措施?
  • 原文地址:https://www.cnblogs.com/tonglin0325/p/8360908.html
Copyright © 2011-2022 走看看