  • Spark Study Notes: GraphX

    A brief introduction to GraphX: http://blog.csdn.net/shangwen_/article/details/38645601

    Pregel: http://blog.csdn.net/shangwen_/article/details/38479835

    Bagel: http://ju.outofmemory.cn/entry/712

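    Main GraphX interfaces:

    Basic information (numEdges, numVertices, degrees (in/out))
    Transformation operations (mapVertices, mapEdges, mapTriplets)
    Aggregation operations (mapReduceTriplets, superseded by aggregateMessages since Spark 1.2; collectNeighbors)
    Structural operations (reverse, subgraph, mask, groupEdges)
    Caching operations (cache, unpersistVertices)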
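
To make the interface groups above concrete, here is a minimal sketch on a hand-built graph. The user names and edges are hypothetical toy data, and the snippet assumes it is pasted into spark-shell (or any Scala REPL with Spark and GraphX on the classpath); `SparkContext.getOrCreate` reuses the shell's context when one already exists.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Reuse the running context (e.g. in spark-shell) or start a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-interfaces-sketch"))

// Hypothetical toy data: three users and four "follows" edges.
val users = sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
val follows = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1), Edge(1L, 3L, 1)))
val graph = Graph(users, follows).cache()   // caching operation

// Basic information
val edgeCount = graph.numEdges
val vertexCount = graph.numVertices
val inDeg = graph.inDegrees.collect().toMap   // vertex id -> in-degree

// Transformation: change vertex attributes, keep the structure
val shouting = graph.mapVertices((id, name) => name.toUpperCase)

// Structural operations: flip every edge; keep only edges pointing at vertex 3
val reversed = graph.reverse
val intoCarol = graph.subgraph(epred = t => t.dstId == 3L)
```

None of these calls modifies `graph`; each returns a new graph or RDD, which is why the original can be cached once and reused.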

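    Key points:

    Each graph is made up of three RDDs:

    Name      RDD type                  Attributes
    Vertices  VertexRDD[VD]             vertex ID, vertex attribute
    Edges     EdgeRDD[ED]               source vertex ID, destination vertex ID, edge attribute
    Triplets  RDD[EdgeTriplet[VD, ED]]  source vertex ID, source vertex attribute, edge attribute, destination vertex ID, destination vertex attribute

    Triplets are in effect a join of Vertices and Edges.
    Graph partitioning: vertex-cut versus edge-cut (GraphX uses vertex-cut).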
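
The three views can be inspected side by side. The sketch below is illustrative only: the two-vertex graph is hypothetical data, and the context is obtained as in spark-shell. It also shows `partitionBy`, which changes how edges are assigned to partitions under the vertex-cut scheme.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-views-sketch"))

// Hypothetical data: two vertices joined by one labelled edge.
val graph = Graph(
  sc.parallelize(Seq((1L, "alice"), (2L, "bob"))),
  sc.parallelize(Seq(Edge(1L, 2L, "follows"))))

// Vertices: (id, attribute) pairs
val vertexRows = graph.vertices.collect()
// Edges: Edge(srcId, dstId, attr)
val edgeRows = graph.edges.collect()
// Triplets: each edge together with the attributes of both endpoints
val described = graph.triplets
  .map(t => s"${t.srcAttr} ${t.attr} ${t.dstAttr}")
  .collect()

// Vertex-cut partitioning: edges are distributed, vertices are replicated as needed.
val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)
```

Mapping each triplet to a plain string before `collect()` avoids holding on to `EdgeTriplet` objects, which some Spark versions reuse while iterating.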

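    Applications:

    Community detection based on the largest connected component
    Measuring relationship strength with triangle counting
    Propagating user attributes with random walks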
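
The first two applications map onto built-in algorithms. The following is a rough sketch rather than a production pipeline: the edge list is hypothetical, "largest community" is simply taken to be the biggest connected component, and the random-walk case is not covered.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-applications-sketch"))

// Hypothetical data: a triangle (1, 2, 3) and a separate pair (4, 5).
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(1L, 3L, 1),
  Edge(4L, 5L, 1)))

// Older Spark releases require partitionBy (and srcId < dstId) before triangleCount;
// doing it unconditionally is harmless on newer ones.
val graph = Graph.fromEdges(edges, 0)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Each vertex is labelled with the smallest vertex id in its component.
val componentOf = graph.connectedComponents().vertices.collect().toMap
val (largestId, largestMembers) = componentOf.groupBy(_._2).maxBy(_._2.size)

// Number of triangles passing through each vertex.
val trianglesAt = graph.triangleCount().vertices.collect().toMap
```

Collecting to the driver is only reasonable for toy graphs; on real data the grouping would stay in RDD operations.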

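    Notes:

    GraphX extends the Spark RDD abstraction with the *Resilient Distributed Property Graph*, a directed multigraph with attributes on both vertices and edges. A property graph exposes two views, Table and Graph, each with its own API, over a single physical copy of the data.
    The Table view treats the graph as a combination of a Vertex Property Table and an Edge Property Table; these tables inherit the Spark RDD API (filter, map, and so on).
    The Graph view offers operations such as reverse, subgraph, mapV(E) (mapVertices/mapEdges), joinV(E) (joinVertices), and mrTriplets (mapReduceTriplets).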
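
A small sketch of the difference between the two views, using hypothetical ages as vertex attributes: filtering the vertex table yields a plain RDD, while `subgraph` on the Graph view also drops edges that touch removed vertices.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-two-views-sketch"))

// Hypothetical data: vertex attribute is an age, edges are "knows" relations.
val graph = Graph(
  sc.parallelize(Seq((1L, 28), (2L, 35), (3L, 17))),
  sc.parallelize(Seq(Edge(1L, 2L, "knows"), Edge(2L, 3L, "knows"))))

// Table view: graph.vertices is an RDD, so filter and map apply directly.
val adultIds = graph.vertices
  .filter { case (id, age) => age >= 18 }
  .map { case (id, age) => id }
  .collect()
  .sorted

// Graph view: the result is still a graph; the edge 2 -> 3 disappears with vertex 3.
val adults = graph.subgraph(vpred = (id, age) => age >= 18)

// Graph view: join an external table into the vertex attributes.
val birthdays = sc.parallelize(Seq((1L, 1)))   // vertex 1 gets one year older
val updated = graph.joinVertices(birthdays)((id, age, extra) => age + extra)
```

Both views read the same underlying data, so switching between them does not copy the graph.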

    Functions on Graph (from the official documentation):

    /** Summary of the functionality in the property graph */
    class Graph[VD, ED] {
      // Information about the Graph ===================================================================
      val numEdges: Long
      val numVertices: Long
      val inDegrees: VertexRDD[Int]
      val outDegrees: VertexRDD[Int]
      val degrees: VertexRDD[Int]
      // Views of the graph as collections =============================================================
      val vertices: VertexRDD[VD]
      val edges: EdgeRDD[ED]
      val triplets: RDD[EdgeTriplet[VD, ED]]
      // Functions for caching graphs ==================================================================
      def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
      def cache(): Graph[VD, ED]
      def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
      // Change the partitioning heuristic  ============================================================
      def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
      // Transform vertex and edge attributes ==========================================================
      def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
      def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
      def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
      def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
      def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
        : Graph[VD, ED2]
      // Modify the graph structure ====================================================================
      def reverse: Graph[VD, ED]
      def subgraph(
          epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
          vpred: (VertexId, VD) => Boolean = ((v, d) => true))
        : Graph[VD, ED]
      def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
      def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
      // Join RDDs with the graph ======================================================================
      def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
      def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
          (mapFunc: (VertexId, VD, Option[U]) => VD2)
        : Graph[VD2, ED]
      // Aggregate information about adjacent triplets =================================================
      def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
      def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
      def aggregateMessages[Msg: ClassTag](
          sendMsg: EdgeContext[VD, ED, Msg] => Unit,
          mergeMsg: (Msg, Msg) => Msg,
          tripletFields: TripletFields = TripletFields.All)
        : VertexRDD[Msg]
      // Iterative graph-parallel computation ==========================================================
      def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
          vprog: (VertexId, VD, A) => VD,
          sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
          mergeMsg: (A, A) => A)
        : Graph[VD, ED]
      // Basic graph algorithms ========================================================================
      def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
      def connectedComponents(): Graph[VertexId, ED]
      def triangleCount(): Graph[Int, ED]
      def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
    }
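
Of the functions summarised above, `aggregateMessages` is probably the least self-explanatory, so here is a minimal sketch. It sums the weights of incoming edges per vertex; the weighted edges are hypothetical data, and `TripletFields.EdgeOnly` is passed only as an optimisation hint because the send function reads nothing but the edge attribute.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, TripletFields}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-aggregate-sketch"))

// Hypothetical weighted edges; every vertex gets the default attribute 0.
val graph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(3L, 2L, 2.5), Edge(2L, 3L, 4.0))),
  0)

// sendMsg: each edge sends its weight to its destination vertex.
// mergeMsg: weights arriving at the same vertex are added up.
val incomingWeight = graph.aggregateMessages[Double](
  ctx => ctx.sendToDst(ctx.attr),
  _ + _,
  TripletFields.EdgeOnly)

val weightOf = incomingWeight.collect().toMap
```

Vertices that receive no message (vertex 1 here) do not appear in the result at all, so a later `outerJoinVertices` is the usual way to supply a default.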

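    Parameters of the pregel function:

    VD: the vertex attribute type.
    ED: the edge attribute type.
    A: the Pregel message type.

    graph: the input graph.
    initialMsg: the message every vertex receives in the first iteration.
    maxIterations: the maximum number of iterations to run.
    vprog: the user-defined vertex program, run on each vertex; it receives the incoming message and computes the new vertex value. In the first iteration it is invoked on every vertex with initialMsg; in later iterations it is invoked only on vertices that received a message.
    sendMsg: a user-supplied function applied to the out-edges of vertices that received a message in the current iteration.
    mergeMsg: a user-supplied function that merges two messages of type A into one message of type A. (This function must be commutative and associative, and ideally the size of A should not increase.)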
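
To see how the three functions cooperate on something small and deterministic, the sketch below propagates the maximum vertex value around a four-vertex ring. The graph and its values are hypothetical; `maxIterations` and `activeDirection` are left at the defaults provided by `GraphOps.pregel`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("graphx-pregel-sketch"))

// Hypothetical ring 1 -> 2 -> 3 -> 4 -> 1 with an integer value on each vertex.
val graph = Graph(
  sc.parallelize(Seq((1L, 3), (2L, 6), (3L, 2), (4L, 1))),
  sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(3L, 4L, 0), Edge(4L, 1L, 0))))

val maxEverywhere = graph.pregel(Int.MinValue)(
  // vprog: keep the larger of the current value and the merged incoming message
  (id, value, msg) => math.max(value, msg),
  // sendMsg: tell the destination about a larger value, otherwise stay silent
  triplet =>
    if (triplet.srcAttr > triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
    else Iterator.empty,
  // mergeMsg: commutative and associative, as required
  (a, b) => math.max(a, b))

val finalValues = maxEverywhere.vertices.collect().toMap
```

The computation stops on its own once no edge sends a message, which happens here after the value 6 has travelled once around the ring.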

    Example:

    import org.apache.spark.graphx._
    // Import random graph generation library
    import org.apache.spark.graphx.util.GraphGenerators
    // A graph with edge attributes containing distances
    val graph: Graph[Long, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
    val sourceId: VertexId = 42 // The ultimate source
    // Initialize the graph such that all vertices except the root have distance infinity.
    val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
      triplet => {  // Send Message
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      (a,b) => math.min(a,b) // Merge Message
      )
    println(sssp.vertices.collect.mkString("\n"))

     

  • Original post: https://www.cnblogs.com/gnivor/p/5037439.html