  • Understanding and using the Pregel API in Spark GraphX

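    A graph is an inherently recursive data structure, and the Pregel API of Spark GraphX can be used to run iterative batch computations over graph data.
    I had never fully understood the Pregel computation model, so I took some time to write down how I understand the API and how to use it.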

    1. The Pregel computation model

    The official definition of the Pregel interface:

      /**
       * Execute a Pregel-like iterative vertex-parallel abstraction.  The
       * user-defined vertex-program `vprog` is executed in parallel on
       * each vertex receiving any inbound messages and computing a new
       * value for the vertex.  The `sendMsg` function is then invoked on
       * all out-edges and is used to compute an optional message to the
       * destination vertex. The `mergeMsg` function is a commutative
       * associative function used to combine messages destined to the
       * same vertex.
       *
       * On the first iteration all vertices receive the `initialMsg` and
       * on subsequent iterations if a vertex does not receive a message
       * then the vertex-program is not invoked.
       *
       * This function iterates until there are no remaining messages, or
       * for `maxIterations` iterations.
       *
       * @param A the Pregel message type
       *
       * @param initialMsg the message each vertex will receive at the on
       * the first iteration
       *
       * @param maxIterations the maximum number of iterations to run for
       *
       * @param activeDirection the direction of edges incident to a vertex that received a message in
       * the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only
       * out-edges of vertices that received a message in the previous round will run.
       *
       * @param vprog the user-defined vertex program which runs on each
       * vertex and receives the inbound message and computes a new vertex
       * value.  On the first iteration the vertex program is invoked on
       * all vertices and is passed the default message.  On subsequent
       * iterations the vertex program is only invoked on those vertices
       * that receive messages.
       *
       * @param sendMsg a user supplied function that is applied to out
       * edges of vertices that received messages in the current
       * iteration
       *
       * @param mergeMsg a user supplied function that takes two incoming
       * messages of type A and merges them into a single message of type
       * A.  ''This function must be commutative and associative and
       * ideally the size of A should not increase.''
       *
       * @return the resulting graph at the end of the computation
       *
       */
      def pregel[A: ClassTag](
          initialMsg: A,
          maxIterations: Int = Int.MaxValue,
          activeDirection: EdgeDirection = EdgeDirection.Either)(
          vprog: (VertexId, VD, A) => VD,
          sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
          mergeMsg: (A, A) => A)
        : Graph[VD, ED] = {
        Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
      }
    

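    Based on my own experiments, I read the method's comment as follows:

    It executes a Pregel-like iterative, vertex-parallel abstraction.
    In one iteration, each vertex receives either the initial message or the messages sent during the previous iteration;
    first, mergeMsg combines all messages addressed to the same vertex into a single message;
    next, the vertex program vprog computes the new vertex attribute value;
    then sendMsg is called to send the next iteration's messages to the vertices at the other end of the out-edges;
    the iteration stops when no messages remain or when maxIterations is reached.

    In the first iteration every vertex receives initialMsg. In later iterations, vprog is not invoked on a vertex that received no message.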
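To make the loop described above concrete, here is a minimal sketch of the same superstep cycle written with plain Scala collections, without Spark. `PregelSketch`, `SimpleEdge`, and `run` are hypothetical names invented for this illustration, and the three-argument `sendMsg` (edge, source attribute, destination attribute) stands in for GraphX's `EdgeTriplet`. The real GraphX implementation works on distributed RDDs and differs in details, so treat this only as a model of the control flow.

```scala
object PregelSketch {
  // Hypothetical edge type for this sketch; GraphX uses Edge/EdgeTriplet instead.
  final case class SimpleEdge[ED](src: Long, dst: Long, attr: ED)

  def run[VD, ED, A](
      vertices: Map[Long, VD],
      edges: Seq[SimpleEdge[ED]],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue)(
      vprog: (Long, VD, A) => VD,
      sendMsg: (SimpleEdge[ED], VD, VD) => Iterator[(Long, A)],
      mergeMsg: (A, A) => A): Map[Long, VD] = {

    // First round: every vertex runs vprog on the initial message.
    var state: Map[Long, VD] =
      vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    // After the first round all vertices count as having received a message.
    var active: Set[Long] = state.keySet
    var i = 0

    while (active.nonEmpty && i < maxIterations) {
      // Run sendMsg on edges with at least one active endpoint
      // (modelled on EdgeDirection.Either), then merge per destination.
      val inbox: Map[Long, A] = edges
        .filter(e => active(e.src) || active(e.dst))
        .flatMap(e => sendMsg(e, state(e.src), state(e.dst)))
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }

      // Only vertices that received a message run vprog.
      state = state ++ inbox.map { case (id, msg) => id -> vprog(id, state(id), msg) }
      active = inbox.keySet
      i += 1
    }
    state
  }

  def main(args: Array[String]): Unit = {
    val inf = Double.PositiveInfinity
    // Shortest distances from vertex 1 on the chain 1 -> 2 -> 3.
    val result = run[Double, Double, Double](
      Map(1L -> 0.0, 2L -> inf, 3L -> inf),
      Seq(SimpleEdge(1L, 2L, 1.0), SimpleEdge(2L, 3L, 2.0)),
      inf)(
      (_, attr, msg) => math.min(attr, msg),
      (e, srcAttr, dstAttr) =>
        if (srcAttr + e.attr < dstAttr) Iterator((e.dst, srcAttr + e.attr))
        else Iterator.empty,
      (a, b) => math.min(a, b))
    println(result.toSeq.sortBy(_._1).mkString(", ")) // (1,0.0), (2,1.0), (3,3.0)
  }
}
```

The loop ends on its own once a round produces no messages, which is why `maxIterations` can safely default to `Int.MaxValue` for computations that converge.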

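    VD: the data type of the vertex attributes.
    ED: the data type of the edge attributes.
    VertexId: the type of a vertex ID.
    A: the type of the Pregel messages.
    graph: the input graph of the computation.
    initialMsg: the initial message every vertex receives in the first iteration.
    maxIterations: the maximum number of iterations.
    vprog
    vprog is the user-defined vertex program that runs on each vertex. It receives the inbound message
    and computes the new attribute value of the vertex.
    In the first iteration vprog is invoked on every vertex with initialMsg; in later iterations it is invoked only on vertices that received a message.

          vprog: (VertexId, VD, A) => VD
          Input: the vertex ID, the current attribute value of that vertex, and the message received in this iteration
          Output: the new vertex attribute value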
                   
    

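    sendMsg
    A user-supplied function applied to the edges of vertices that received a message in the current iteration. Which edges qualify is controlled by
    activeDirection; with EdgeDirection.Out these are the edges whose source vertex received a message. sendMsg produces the messages for the next
    iteration, usually sent along the edge direction, to the edge's destination vertex.

    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)]
    Input: the EdgeTriplet of an edge incident to a vertex that received a message in the current iteration.
    Output: the messages for the next iteration.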
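As a small illustration of the sendMsg contract, the sketch below shows a shortest-path style function that returns either one message or none. `SendMsgSketch` and `Triplet` are hypothetical names; `Triplet` is only a stand-in that borrows the field names of GraphX's `EdgeTriplet` (`srcId`, `srcAttr`, `dstId`, `dstAttr`, `attr`) so the code runs without Spark.

```scala
object SendMsgSketch {
  // Hypothetical stand-in for EdgeTriplet[Double, Double], reusing its field names.
  final case class Triplet(
      srcId: Long, srcAttr: Double,
      dstId: Long, dstAttr: Double,
      attr: Double)

  // Emit a message to the destination only when the path through the
  // source vertex is shorter than the distance the destination already has.
  def sendMsg(t: Triplet): Iterator[(Long, Double)] =
    if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr))
    else Iterator.empty

  def main(args: Array[String]): Unit = {
    val improves = Triplet(1L, 0.0, 2L, Double.PositiveInfinity, 1.0)
    val noGain = Triplet(1L, 0.0, 2L, 0.5, 1.0)
    println(sendMsg(improves).toList) // List((2,1.0))
    println(sendMsg(noGain).toList)   // List()
  }
}
```

Returning `Iterator.empty` is what lets the computation terminate: once no edge yields a message, there is nothing left for the next iteration.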
    

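    mergeMsg
    A user-supplied function that merges messages with the same destination: when a vertex receives two or more messages of type A, mergeMsg combines them into a single message of type A. The function must be commutative and associative. Ideally, the size of a message of type A should not grow.

    mergeMsg: (A, A) => A
    
    Input: two messages of type A received by the same vertex in the current iteration.
    Output: a single message of type A.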
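Why commutativity and associativity matter can be checked with a few lines of plain Scala: the order in which messages arrive at a vertex is not fixed, so the merged result must not depend on it. In the sketch below, `MergeMsgSketch` and `distinctResults` are hypothetical helpers, and the subtraction function is included only as a counterexample.

```scala
object MergeMsgSketch {
  // A valid mergeMsg: min is commutative and associative.
  val mergeMsg: (Double, Double) => Double = (a, b) => math.min(a, b)

  // An invalid mergeMsg, for contrast: subtraction is neither.
  val badMerge: (Double, Double) => Double = (a, b) => a - b

  // Merge the messages in every possible arrival order and collect the distinct outcomes.
  def distinctResults(msgs: List[Double], f: (Double, Double) => Double): Set[Double] =
    msgs.permutations.map(_.reduceLeft(f)).toSet

  def main(args: Array[String]): Unit = {
    val inbox = List(3.0, 1.0, 2.0)
    println(distinctResults(inbox, mergeMsg).size) // 1: always the same result
    println(distinctResults(inbox, badMerge).size) // 3: the result depends on the order
  }
}
```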
    

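    The example below uses Pregel to compute single-source shortest paths. Finding the shortest path between vertices is a very common graph algorithm; "single-source shortest path" means that, given a start vertex StartV (sourceId in the code),
    we compute the shortest distance from that vertex to every other vertex in the graph. I simplified the official example so that the Pregel computation model is easier to follow.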

    package graphxTest
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    
    /**
      * Created by Mtime on 2018/1/25.
      */
    object GraphxPregelTest {
      val spark = SparkSession
        .builder
        .appName(s"${this.getClass.getSimpleName}").master("local[2]")
        .getOrCreate()
      val sc = spark.sparkContext
    
      /**
        * Compute the shortest paths
        **/
      def shortestPath(): Unit = {
        // Build the graph
        val graph: Graph[Long, Double] = genGraph()
        // Print the triplets of the graph
        graph.triplets.foreach(t => {
          println(s"t.srcId=${t.srcId} t.dstId=${t.dstId}  t.srcAttr=${t.srcAttr} t.dstAttr=${t.dstAttr}")
        })
    
        val sourceId: VertexId = 1 // compute the shortest paths from vertex 1 to every vertex in the graph
        // Initialize the graph such that all vertices except the root have distance infinity.
        val initialGraph = graph.mapVertices((id, att) =>
            if (id == sourceId) 0.0 else Double.PositiveInfinity)
    
        println("------------------------------")
        // Print the triplets of the initialized graph
        initialGraph.triplets.foreach(t => {
          println(s"t.srcId=${t.srcId} t.dstId=${t.dstId}  t.srcAttr=${t.srcAttr} t.dstAttr=${t.dstAttr}")
        })
    
        val sssp:Graph[Double,Double] = initialGraph.pregel(Double.PositiveInfinity)(
          (vid, vidAttr, message) => math.min(vidAttr, message), // Vertex Program
          triplet => {
            // Send Message
            if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
              Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
            } else {
              Iterator.empty
            }
          },
          (message_a, message_b) => math.min(message_a, message_b) // Merge Message
        )
        println("------------------------------")
        // Print the result
        println(sssp.vertices.collect.mkString("\n"))
      }
    
      /**
        * Build the example graph
        *
        * @return the graph, with every vertex attribute set to 0
        */
      private def genGraph(): Graph[Long, Double] = {
        val vertices: RDD[(VertexId, Long)] =
          sc.parallelize(Array(
            (1L, 0L),
            (2L, 0L),
            (3L, 0L),
            (4L, 0L),
            (5L, 0L),
            (6L, 0L))
          )
        // Create an RDD for edges
        val edges: RDD[Edge[Double]] =
          sc.parallelize(Array(
            Edge(1L, 2L, 1.0),
            Edge(1L, 4L, 1.0),
            Edge(1L, 5L, 1.0),
            Edge(2L, 3L, 1.0),
            Edge(4L, 3L, 1.0),
            Edge(5L, 4L, 1.0),
            Edge(3L, 6L, 1.0)
          )
          )
        val graph: Graph[Long, Double] = Graph(vertices, edges, 0L)
        graph
      }
    
      def main(args: Array[String]): Unit = {
        shortestPath()
      }
    }
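To see what each Pregel round does on this particular graph, the following plain-Scala sketch replays the same relaxation step by step and records the merged messages of every round. It does not use Spark; `SsspTrace` and `run` are hypothetical names, and for simplicity the sketch evaluates the send condition on all edges in every round, which gives the same messages here because an edge whose endpoints did not change cannot produce an improvement.

```scala
object SsspTrace {
  // The same seven edges as in genGraph: (source, destination, weight).
  val edges: Seq[(Long, Long, Double)] = Seq(
    (1L, 2L, 1.0), (1L, 4L, 1.0), (1L, 5L, 1.0),
    (2L, 3L, 1.0), (4L, 3L, 1.0), (5L, 4L, 1.0), (3L, 6L, 1.0))

  // Returns the final distances and the merged inbox of each round.
  def run(sourceId: Long): (Map[Long, Double], List[Map[Long, Double]]) = {
    val ids = edges.flatMap { case (s, d, _) => Seq(s, d) }.distinct
    var dist: Map[Long, Double] =
      ids.map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity)).toMap
    var rounds = List.empty[Map[Long, Double]]
    var done = false

    while (!done) {
      // sendMsg: propose a shorter distance; mergeMsg: keep the minimum per vertex.
      val inbox = edges
        .collect { case (s, d, w) if dist(s) + w < dist(d) => d -> (dist(s) + w) }
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).min }

      if (inbox.isEmpty) done = true
      else {
        // vprog: keep the smaller of the current value and the message.
        dist = dist ++ inbox.map { case (id, m) => id -> math.min(dist(id), m) }
        rounds = rounds :+ inbox
      }
    }
    (dist, rounds)
  }

  def main(args: Array[String]): Unit = {
    val (dist, rounds) = run(1L)
    rounds.zipWithIndex.foreach { case (inbox, i) =>
      println(s"round ${i + 1}: ${inbox.toSeq.sortBy(_._1).mkString(", ")}")
    }
    println(dist.toSeq.sortBy(_._1).mkString(", "))
  }
}
```

With vertex 1 as the source, round 1 delivers distance 1.0 to vertices 2, 4 and 5, round 2 delivers 2.0 to vertex 3 (the two messages from vertices 2 and 4 are merged into one), and round 3 delivers 3.0 to vertex 6. After that no edge offers an improvement, so the computation stops.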
    
    
  • Original article: https://www.cnblogs.com/honeybee/p/8422338.html