zoukankan      html  css  js  c++  java
  • Spark GraphX图处理编程实例

    所构建的图如下(共4个顶点:3-rxin、7-jgonzal、5-franklin、2-istoica;4条有向边:3→7 collab、5→3 advisor、2→5 colleague、5→7 pi):

    Scala程序代码如下:

    import org.apache.spark._
    import org.apache.spark.graphx._
    // To make some of the examples work we will also need RDD
    import org.apache.spark.rdd.RDD
    /** Minimal GraphX walkthrough.
      *
      * Builds a four-vertex property graph of users and their relationships on a
      * local Spark context, prints its vertices and triplets, and then prints
      * three simple statistics: the number of "postdoc" users, the number of
      * edges whose source id is greater than the destination id, and the
      * in-degree of every vertex that has incoming edges.
      */
    object Test {

      /** Program entry point.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        // Initialise a SparkContext with master "local[2]" and the given app name.
        val sc: SparkContext = new SparkContext("local[2]", "Spark Graphx")
        try {
          // Vertex RDD: (vertex id, (user name, position)).
          val users: RDD[(VertexId, (String, String))] =
            sc.parallelize(Array(
              (3L, ("rxin", "student")),
              (7L, ("jgonzal", "postdoc")),
              (5L, ("franklin", "prof")),
              (2L, ("istoica", "prof"))
            ))

          // Edge RDD: each edge carries a string describing the relationship.
          val relationships: RDD[Edge[String]] =
            sc.parallelize(Array(
              Edge(3L, 7L, "collab"),
              Edge(5L, 3L, "advisor"),
              Edge(2L, 5L, "colleague"),
              Edge(5L, 7L, "pi")
            ))

          // Default vertex attribute, used when an edge refers to a vertex id
          // that is absent from the vertex RDD.
          val defaultUser = ("John Doe", "Missing")

          // Build the property graph.
          val graph = Graph(users, relationships, defaultUser)

          // Print every vertex, then every triplet as "src----->dst    attr:edge".
          graph.vertices.collect().foreach(println)
          graph.triplets
            .map(t => s"${t.srcAttr}----->${t.dstAttr}    attr:${t.attr}")
            .collect()
            .foreach(println)

          // Count the users whose position is "postdoc".
          val postdocCount = graph.vertices.filter { case (_, (_, pos)) => pos == "postdoc" }.count()
          println(s"所有用户当中postdoc的数量为:${postdocCount}")

          // Count the edges whose source id is greater than the destination id.
          val descendingEdgeCount = graph.edges.filter(e => e.srcId > e.dstId).count()
          println(s"所有源顶点大于目标顶点 src > dst的边的数量为:${descendingEdgeCount}")

          // Print the in-degree of each vertex.
          val inDegrees: VertexRDD[Int] = graph.inDegrees
          inDegrees.collect().foreach(println)
        } finally {
          // Always release the context, even if one of the jobs above fails.
          sc.stop()
        }
      }
    }

    相关内置的图操作方法有:

    /** Summary of the functionality in the property graph.
      *
      * This is a signature-only overview: member bodies are omitted, so the
      * listing is meant to be read as reference material rather than compiled.
      *
      * @tparam VD the vertex attribute type
      * @tparam ED the edge attribute type
      */
    class Graph[VD, ED] {
      // Information about the Graph ===================================================================
      val numEdges: Long
      val numVertices: Long
      val inDegrees: VertexRDD[Int]
      val outDegrees: VertexRDD[Int]
      val degrees: VertexRDD[Int]
      // Views of the graph as collections =============================================================
      val vertices: VertexRDD[VD]
      val edges: EdgeRDD[ED]
      val triplets: RDD[EdgeTriplet[VD, ED]]
      // Functions for caching graphs ==================================================================
      def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
      def cache(): Graph[VD, ED]
      def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
      // Change the partitioning heuristic  ============================================================
      def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
      // Transform vertex and edge attributes ==========================================================
      def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
      def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
      def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
      def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
      def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
        : Graph[VD, ED2]
      // Modify the graph structure ====================================================================
      def reverse: Graph[VD, ED]
      def subgraph(
          epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
          vpred: (VertexId, VD) => Boolean = ((v, d) => true))
        : Graph[VD, ED]
      def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
      def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
      // Join RDDs with the graph ======================================================================
      def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
      def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
          (mapFunc: (VertexId, VD, Option[U]) => VD2)
        : Graph[VD2, ED]
      // Aggregate information about adjacent triplets =================================================
      def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
      def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
      // The result is keyed by the message type declared on this method (Msg).
      def aggregateMessages[Msg: ClassTag](
          sendMsg: EdgeContext[VD, ED, Msg] => Unit,
          mergeMsg: (Msg, Msg) => Msg,
          tripletFields: TripletFields = TripletFields.All)
        : VertexRDD[Msg]
      // Iterative graph-parallel computation ==========================================================
      def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
          vprog: (VertexId, VD, A) => VD,
          sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)],
          mergeMsg: (A, A) => A)
        : Graph[VD, ED]
      // Basic graph algorithms ========================================================================
      def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
      def connectedComponents(): Graph[VertexId, ED]
      def triangleCount(): Graph[Int, ED]
      def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
    }

    参考链接:

    http://spark.apache.org/docs/latest/graphx-programming-guide.html

  • 相关阅读:
    神通广大的CSS3选择器
    CF1153E Serval and Snake【构造】
    CF1153F Serval and Bonus Problem 【期望】
    李超线段树学习笔记
    Luogu5327【ZJOI2019】语言【树上差分,线段树合并】
    Luogu4191 [CTSC2010]性能优化【多项式,循环卷积】
    Codeforces Round #564 比赛总结
    CF917D Stranger Trees【矩阵树定理,高斯消元】
    【CTS2019】珍珠【生成函数,二项式反演】
    斯特林数学习笔记
  • 原文地址:https://www.cnblogs.com/gaopeng527/p/5048853.html
Copyright © 2011-2022 走看看