zoukankan      html  css  js  c++  java
  • Spark GraphX企业运用

    ========== Spark GraphX 概述 ==========
    1、Spark GraphX是什么?
      (1)Spark GraphX 是 Spark 的一个模块,主要用于进行以图为核心的计算还有分布式图的计算。
      (2)GraphX 他的底层计算也是 RDD 计算,它和 RDD 共用一种存储形态,在展示形态上可以以数据集来表示,也可以图的形式来表示。

    2、Spark GraphX 有哪些抽象?
    (1)顶点。
      顶点的表示用 RDD[(VertexId, VD)] 来表示,(VertexId, VD) 这个元组用来具体表示一个顶点,VertexID 表示顶点的 ID,是 Long 类型的别名,VD 是顶点的属性,是一个类型参数,可以是任何类型。
    (2)边。
      边的表示用 RDD[Edge[ED]] 来表示,Edge 用来具体表示一个边,Edge 里面包含一个 ED 类型参数来设定的属性,ED 类型中包括 一个源顶点的 ID 和一个目标顶点的 ID。
    (3)三元组。
      三元组结构用 RDD[EdgeTriplet[VD, ED]] 来表示,EdgeTriplet[VD, ED] 来表示一个三元组,三元组包含了一个边、边的属性、源顶点 ID、源顶点属性、目标顶点 ID、目标顶点属性。VD 和 ED 是类型参数,VD 表示顶点的属性,ED 表示边的属性。
    (4)图。
      图在 Spark 中用 Graph[VD, ED] 来表示,可以通过顶点和边来构建。

    ========== Spark GraphX 图的构建 ==========
    1、对于 Vertex 顶点的构建:
    (1)对于 RDD[(VertexId, VD)] 这种版本:

    val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")),(5L, ("franklin""prof")), (2L, ("istoica""prof"))))

    (2)对于 VertexRDD[VD] 这种版本:是顶点的构建的优化版本。说白了,就是对上面版本的包装,包装中进行了一些优化!

    val users1: VertexRDD[(String, String)] = VertexRDD[(String, String)](users)

    2、对于 Edge 边的构建:
    (1)对于 RDD[Edge[ED]] 这种版本:

    val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))

    (2)对于 EdgeRDD[ED] 这种版本:是边的构建的优化版本。说白了,就是对上面版本的包装,包装中进行了一些优化!

    val relationships1: EdgeRDD[String] = EdgeRDD.fromEdges(relationships)

    3、对于 Graph 图的构建:
    Graph[VD: ClassTag, ED: ClassTag]
    (1)通过 Graph 类的 apply 方法来构建。

    val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")),(5L, ("franklin""prof")), (2L, ("istoica""prof"))))
    val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))
    val defaultUser = ("John Doe""Missing")
    val graph = Graph(users, relationships) 

    def apply[VD: ClassTag, ED: ClassTag](
        vertices: RDD[(VertexId, VD)],
        edges: RDD[Edge[ED]],
        defaultVertexAttr: VD = null.asInstanceOf[VD],
        edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
        vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

    测试代码:

    scala> val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin""student")), (7L, ("jgonzal""postdoc")),(5L, ("franklin""prof")), (2L, ("istoica""prof"))))
    users: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (String, String))] = ParallelCollectionRDD[0] at parallelize at <console>:26

    scala> val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))
    relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[1] at parallelize at <console>:26

    scala> val defaultUser = ("John Doe""Missing")
    defaultUser: (String, String) = (John Doe,Missing)

    scala> val graph = Graph(users, relationships)
    graph: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@4285b1bd

    scala> graph.vertices.collect.foreach(println _)
    (5,(franklin,prof))                                                             
    (2,(istoica,prof))
    (3,(rxin,student))
    (7,(jgonzal,postdoc))

    scala> graph.edges.collect.foreach(println _)
    Edge(3,7,collab)
    Edge(5,3,advisor)
    Edge(2,5,colleague)
    Edge(5,7,pi)

    (2)通过 Graph 类提供 fromEdges 方法来构建。注意:对于顶点的属性是使用提供的默认属性。

    val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))
    val defaultUser = ("aaa""bbb")
    val graph2 = Graph.fromEdges(relationships, defaultUser)

    def fromEdges[VD: ClassTag, ED: ClassTag](
        edges: RDD[Edge[ED]],
        defaultValue: VD,
        edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
        vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

    测试代码:

    scala> val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L7L"collab"), Edge(5L3L"advisor"), Edge(2L5L"colleague"), Edge(5L7L"pi")))
    relationships: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = ParallelCollectionRDD[15] at parallelize at <console>:26

    scala> val defaultUser = ("aaa""bbb")
    defaultUser: (String, String) = (aaa,bbb)

    scala> val graph2 = Graph.fromEdges(relationships, defaultUser)
    graph2: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@52fb37d7

    scala> graph2.vertices.collect.foreach(println _)
    (5,(aaa,bbb))
    (2,(aaa,bbb))
    (3,(aaa,bbb))
    (7,(aaa,bbb))

    scala> graph2.edges.collect.foreach(println _)
    Edge(3,7,collab)
    Edge(5,3,advisor)
    Edge(2,5,colleague)
    Edge(5,7,pi)

    (3)通过 Graph 类提供的 fromEdgeTuples 方法来构建。注意:对于顶点的属性是使用提供的默认属性,对于边的属性是相同边的数量。

    val relationships: RDD[(VertexId, VertexId)] = sc.parallelize(Array((3L7L), (5L3L), (2L5L), (5L7L)))
    val defaultUser = ("haha""heihei")
    val graph3 = Graph.fromEdgeTuples[(String, String)](relationships, defaultUser)

    def fromEdgeTuples[VD: ClassTag](
        rawEdges: RDD[(VertexId, VertexId)],
        defaultValue: VD,
        uniqueEdges: Option[PartitionStrategy] = None,
        edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
        vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

    测试代码:

    scala> val relationships: RDD[(VertexId, VertexId)] = sc.parallelize(Array((3L7L), (5L3L), (2L5L), (5L7L)))
    relationships: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.VertexId)] = ParallelCollectionRDD[26] at parallelize at <console>:26

    scala> val defaultUser = ("haha""heihei")
    defaultUser: (String, String) = (haha,heihei)

    scala> val graph3 = Graph.fromEdgeTuples[(String, String)](relationships, defaultUser)
    graph3: org.apache.spark.graphx.Graph[(String, String),Int] = org.apache.spark.graphx.impl.GraphImpl@5cb7311b

    scala> graph3.vertices.collect.foreach(println _)
    (5,(haha,heihei))
    (2,(haha,heihei))
    (3,(haha,heihei))
    (7,(haha,heihei))

    scala> graph3.edges.collect.foreach(println _)
    Edge(3,7,1)     第三个元素“1”表示的是相同边的数量
    Edge(5,3,1)
    Edge(2,5,1)
    Edge(5,7,1)

    ========== Spark GraphX 图的基本信息转换 ==========
    1、graph.numEdges 返回当前图的边的数量
    2、graph.numVertices 返回当前图的顶点的数量
    3、graph.inDegrees 返回当前图每个顶点入度的数量,返回类型为 VertexRDD[Int]
    4、graph.outDegrees 返回当前图每个顶点出度的数量,返回的类型为 VertexRDD[Int]
    5、graph.degrees 返回当前图每个顶点入度和出度的和,返回的类型为 VertexRDD[Int]

    ========== Spark GraphX 图的转换操作 ==========
    1、def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2) (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
      对当前图每一个顶点应用提供的 map 函数来修改顶点的属性,返回一个新的图。
    2、def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
      对当前图每一条边应用提供的 map 函数来修改边的属性,返回一个新图。
    3、def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
      对当前图每一个三元组应用提供的 map 函数来修改边的属性,返回一个新图。

    ========== Spark GraphX 图的结构操作 ==========
    1、def reverse: Graph[VD, ED]
      该操作反转一个图,产生一个新图,新图中的每条边的方向和原图每条边的方向相反。
    2、def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true), vpred: (VertexId, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED]
      该操作返回一个当前图的子图,通过传入 epred 函数来过滤边,通过传入 vpred 函数来过滤顶点,返回满足 epred 函数值为 true 的边和满足 vpred 函数值为 true 顶点组成子图。
    3、def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
      mask 函数用于求一张图和 other 这张图的交集,该交集的判别条件指的是:1、对于顶点,只对比顶点的 ID。2、对于边,只对比边的 srcID、dstID,如果 other 和当前图的交集中的边、顶点的属性不一致,那么 mask 产生的图默认采用当前图的属性。
    4、def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
      该操作实现将当前图中的两条相同边(边的 srcID 和 dstID 相同)合并。你需要传入一个 merge 函数,用于合并这两边的属性返回一个新的属性。注意:合并两条边的前提是,两条边在一个分区。

    ========== Spark GraphX 顶点关联操作 ==========
    1、def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
      该操作通过 mapFunc 函数将 table 中提供的数据更新到相同 VertexId 的属性里。
    2、def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
      该操作和 joinVertices 提供了相同的功能,但是,如果 table 中不存在相对应的顶点(也就是不存 VertexId),这个时候 U 默认是 None。

    ========== Spark GraphX 聚合操作 ==========
    1、def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
      该操作返回 EdgeDirection 定义的方向中相邻顶点的 ID 和属性的集合。
    2、def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
      改操作返回 EdgeDirection 定义的方向中相邻顶点的 ID 的集合。
    3、def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit,mergeMsg: (A, A) => A,tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
      该函数用于聚合发送到顶点的信息,A 是发送的信息的类型,sendMsg 是每一条边都会自动触发,到底有没有消息能够发送到顶点,使用 EdgeContext 里面的 sendToSrc和sendToDst 来实现。mergeMsg
    是每一个顶点都会在接受到所有消息之后调用,主要用于所有接收到的消息的聚合。然后整个函数返回消息的顶点集合 VertexRDD[A]。

  • 相关阅读:
    __all__
    python内置函数
    qZnF新存马王至许观制两利各
    PHP中获取当前页面的完整URL
    DedeCms用tag调用全站相关文章
    dedecms如何利用tag来调取相关文章
    SQL Server TempDB
    SQL Server Pivot 隐藏group
    Rebind and Rewind in Execution Plans
    batch 数字进制的问题
  • 原文地址:https://www.cnblogs.com/huanghanyu/p/12989050.html
Copyright © 2011-2022 走看看