zoukankan      html  css  js  c++  java
  • GraphX Pregel API

    link

     Pregel核心部分是三个函数:

    1. 节点处理消息的函数  vprog: (VertexId, VD, A) => VD (节点id,节点属性,消息) => 节点属性。其作用是接受消息,并进行处理,根据处理结果更新节点属性。
    2. 节点发送消息的函数 sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)]   (边元组) => Iterator[(目标节点id,消息)]。其作用是根据所定义的标准,判断是否向邻居节点发送消息,如果满足条件,发送Iterator[(目标节点id,消息)];如果不满足条件,发送Iterator.empty。
    3. 消息合并函数 mergeMsg: (A, A) => A)    (消息,消息) => 消息。起作用是,由于图中每一个节点可能有多个邻居与它连接,所以可能每一个节点会接收到多个节点发送来的消息,该函数就是将接收到的多个消息进行合并处理。 
      def pregel[A: ClassTag](
            initialMsg: A,
            maxIterations: Int = Int.MaxValue,
            activeDirection: EdgeDirection = EdgeDirection.Either)(
            vprog: (VertexId, VD, A) => VD,
            sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
            mergeMsg: (A, A) => A)
          : Graph[VD, ED] = {
          Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
        }

      initialMsg: 初始化消息,这个初始消息会被用来初始化图中的每个节点的属性,在pregel进行调用时,会首先在图上使用mapVertices来根据initialMsg的值更新每个节点的值,至于如何更新,则由vprog参数而定,vprog函数就接收了initialMsg消息做为参数来更新对应节点的值

      maxIterations: 最大迭代次数

      activeDirection: 表示边的活跃方向,什么是活跃方向呢,首先要解释一下活跃消息与活跃顶点的概念,活跃节点是指在某一轮迭代中,pregel会以sendMsg和mergeMsg为参数来调用graph的aggregateMessage方法后收到消息的节点,活跃消息就是这轮迭代中所有被收成功收到的消息。这样一来,有的边的src节点是活跃节点,有的dst节点是活跃节点,而有的边两端节点都是活跃节点。如果activeDirection参数指定为“EdgeDirection.Out”,则在下一轮迭代时,只有接收消息的出边(src—>dst)才会执行sendMsg函数,也就是说,sendMsg回调函数会过滤掉”dst—>src”的edgeTriplet上下文参数

      vprog: 节点变换函数,在初始时,以及每轮迭代后,pregel会根据上一轮使用的msg和这里的vprod函数在图上调用joinVertices方法变化每个收到消息的节点,注意这个函数除初始时外,都是仅在接收到消息的节点上运行,这一点可以从源码中看到,源码中用的是joinVertices(message)(vprog),因此,没有收到消息的节点在join之后就滤掉了

      sendMsg: 消息发送函数,该函数的运行参数是一个代表边的上下文,pregel在调用aggregateMessages时,会将EdgeContext转换成EdgeTriplet对象(ctx.toEdgeTriplet)来使用,用户需要通过Iterator[(VertexId,A)]指定发送哪些消息,发给那些节点,发送的内容是什么,因为在一条边上可以发送多个消息,如sendToDst,如sendToSrc,所以这里是个Iterator,每一个元素是一个tuple,其中的vertexId表示要接收此消息的节点的id,它只能是该边上的srcId或dstId,而A就是要发送的内容,因此如果是需要由src发送一条消息A给dst,则有:Iterator((dstId,A)),如果什么消息也不发送,则可以返回一个空的Iterator:Iterator.empty

      mergeMsg: 邻居节点收到多条消息时的合并逻辑,注意它区别于vprog函数,mergeMsg仅能合并消息内容,但合并后并不会更新到节点中去,而vprog函数可以根据收到的消息(就是mergeMsg产生的结果)更新节点属性

    单源最短路径的例子

    GraphX中的单源点最短路径例子,使用的是类Pregel的方式。

    核心部分是三个函数:

    1.节点处理消息的函数  vprog: (VertexId, VD, A) => VD (节点id,节点属性,消息) => 节点属性

    2.节点发送消息的函数 sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId,A)]   (边元组) => Iterator[(目标节点id,消息)]

    3.消息合并函数 mergeMsg: (A, A) => A)    (消息,消息) => 消息

    package myclass.GraphX
    
    import org.apache.spark.graphx._
    import org.apache.spark.SparkContext
    
    // Import random graph generation library
    
    import org.apache.spark.graphx.util.GraphGenerators
    
    /**
     * Created by jack on 3/4/14.
     */
    object Pregel {
    	def main(args: Array[String]) {
    		val sc = new SparkContext("local", "pregel test", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
    		// A graph with edge attributes containing distances
    		//初始化一个随机图,节点的度符合对数正态分布,边属性初始化为1
    		val graph: Graph[Int, Double] =
    			GraphGenerators.logNormalGraph(sc, numVertices = 10).mapEdges(e => e.attr.toDouble)
    graph.edges.foreach(println)
    		val sourceId: VertexId = 4 // The ultimate source
    
    		// Initialize the graph such that all vertices except the root have distance infinity.
    		//初始化各节点到原点的距离
    		val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
    
    		val sssp = initialGraph.pregel(Double.PositiveInfinity)(
    			// Vertex Program,节点处理消息的函数,dist为原节点属性(Double),newDist为消息类型(Double)
    			(id, dist, newDist) => math.min(dist, newDist),
    
    			// Send Message,发送消息函数,返回结果为(目标节点id,消息(即最短距离))
    			triplet => {
    				if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
    					Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    				} else {
    					Iterator.empty
    				}
    			},
    			//Merge Message,对消息进行合并的操作,类似于Hadoop中的combiner
    			(a, b) => math.min(a, b)
    		)
    
    		println(sssp.vertices.collect.mkString("
    "))
    	}
    }

    1. 最短路径测试代码 


    下面主要是对Spark图计算框架GraphX中的单源点最短路径的源码进行解析。 

        test("shortPaths") {
            // 测试的真实结果,后面用于对比
            val shortestPaths = Set(
            (1, Map(1 -> 0, 4 -> 2)), (2, Map(1 -> 1, 4 -> 2)), (3, Map(1 -> 2, 4 -> 1)),
            (4, Map(1 -> 2, 4 -> 0)), (5, Map(1 -> 1, 4 -> 1)), (6, Map(1 -> 3, 4 -> 1)))
    
            // 构造有向图的边序列
            val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap {
                case e => Seq(e, e.swap)
            }
    
            // 构造有向图
            val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }
            val graph = Graph.fromEdgeTuples(edges, 1)
    
            // 要求最短路径的点集合
            val landmarks = Seq(1, 4).map(_.toLong)
    
            // 计算最短路径
            val results = ShortestPaths.run(graph, landmarks).vertices.collect.map {
            case (v, spMap) => (v, spMap.mapValues(i => i))
            }
    
            // 与真实结果对比
            assert(results.toSet === shortestPaths)
        }

    2. Graphx底层实现代码

        package org.apache.spark.graphx.lib
    
        import org.apache.spark.graphx._
        import scala.reflect.ClassTag
    
        object ShortestPaths {
            // 定义一个Map[VertexId,Int]类型的Map函数,别名为SPMap,函数的属性Key为VertexId类型,
            // 其实也就是scala中的Long类型,它在图中的别名是VertexId,还有Int类型的路径的长度。
            type SPMap = Map[VertexId, Int]
    
            // 初始化图的属性信息
            private def makeMap(x: (VertexId, Int)*) = Map(x: _*)
    
            // 主要用于将自身的属性值(即源顶点属性值)中路径的长度加1(这里说明该最短路径模型只能应用与非带权图,即权值都相等的图),然后和目标定点的属性值比较
            private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
    
            // 比较源顶点属性和发送信息过来顶点的属性取最小值。
            private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
            // 先将两个集合spmap1和spma2的顶点整合要一起,这里用了一个++来处理
            // 再形成一个新的k->v的map
            // 其中v是两个消息中值最小的一个
            (spmap1.keySet ++ spmap2.keySet).map {
                k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
            }.toMap
    
            // 计算给定了起始和终点序列的最短路径
            // ED是边的属性值,计算过程中不会被使用
            // graph是要计算最短路径的图
            // landmarks是要求最短路径顶点id的集合,最短路径会计算每一个landmark
            // 返回的是一个图,每个顶点的属性就是landmark点间的最短路径
            def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
            val spGraph = graph.mapVertices { (vid, attr) =>
            // 如果landmark只有一个点1
            // 将landmarks中的顶点初始化为Map(1-> 0),即自身到自身的距离为0,其余的顶点属性初始化为Map()。
            if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
            }
    
            // 定义一个initMessage它的值为Map()
            // 作用是在Pregel第一次运行的时候,所有图中的顶点都会接收到initMessage。
            val initialMessage = makeMap()
    
            // 用户定义的顶点程序运行在每一个顶点中,负责接收进来的信息,和计算新的顶点值。
            // 在第一次迭代的时候,所有的顶点程序将会被默认的defaultMessage调用,在次轮迭代中,顶点程序只有接收到message才会被调用。
            def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
                addMaps(attr, msg)
            }
    
    
            // 该函数应用于邻居顶点在当前迭代中接收message
            // 一旦收到通知,相对于发送该消息的点,就是目的节点,相对于收到消息的点就是源节点
            // 这个地方从源节点考虑
            def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
            // 对所有目的节点值加1
    
            val newAttr = incrementMap(edge.dstAttr)
            // 求得最短路径,将源节点的值发送给所有所有的源节点,其实这里源节点就是相邻点的意思,换成目的节点应该也是可以的
            if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
            else Iterator.empty
            }
    
            // 调用pregel函数
            // 第一个参数列表包含配置参数初始消息、最大迭代数、发送消息的边的方向(默认是沿边方向出)
            // 第二个参数列表包含用户 自定义的函数用来接收消息(vprog)、计算消息(sendMsg)、合并消息(mergeMsg)
            Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
            }
        }
    

    GraphX最短路径求解中使用了Pregel模型,这是一个非常高效的图计算模型。但目前最短路径有如下限制:

    1. 只能用于非带权图(权值相等);
    2. 利用的算法是迪杰斯特拉求解最短路径。

    相关讨论:[1][2][3][4]

    关注公众号 海量干货等你
  • 相关阅读:
    JavaScript日期处理类库momentjs
    sublime text 2 学习
    node.js代理设置
    使用nodejs将html5 canvas base64编码图片保存为文件
    Sublime Text 2 常用插件介绍
    把silverlight treeview 节点前面的小三角换成自定义的图片
    文件监控
    linux内核中的红黑树代码解析
    static关键字
    红黑树的实现(二)
  • 原文地址:https://www.cnblogs.com/sowhat1412/p/12734176.html
Copyright © 2011-2022 走看看