zoukankan      html  css  js  c++  java
  • GraphX 图数据建模和存储

    背景

    简单分析一下GraphX是怎么为图数据建模和存储的。

    入口

    能够看GraphLoader的函数。

    def edgeListFile(
          sc: SparkContext,
          path: String,
          canonicalOrientation: Boolean = false,
          numEdgePartitions: Int = -1,
          edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
          vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
        : Graph[Int, Int]
    1. path能够是本地路径(文件或目录),也能够是hdfs路径,本质上是使用sc.textFile来生成HadoopRDD的,numEdgePartitions是分区数。
    2. Graph的存储是分EdgeRDD和VertexRDD两块,能够分别设置StorageLevel。默认是内存。
    3. 这个函数接受边文件。即’1 2’, ‘4 1’这种点到点的数据对组成的文件。

      把这份文件按分区数和存储level转化成一个能够操作的图。

    流程

    1. sc.textFile读文件。生成原始的RDD
    2. 每一个分区(的计算节点)把每条记录放进PrimitiveVector里,这个结构是spark里为primitive数据优化的存储结构。

    3. PrimitiveVector里的数据一条条取出。转化成EdgePartition,即EdgeRDD的分区实现。这个过程中生成了面向列存的结构:src点的array,dst点的array。edge的属性array,以及两个正反向map(用于相应点的local id和global id)。

    4. EdgeRDD 做一次count触发这次边建模任务,真正persist起来。

    5. EdgePartition去生成一个RoutingTablePartition,里面是vertexId到partitionId的相应关系。借助RoutingTablePartition生成VertexRDD

    6. EdgeRDDVertexRDD生成Graph。前者维护了边的属性、边两头顶点的属性、两头顶点各自的global vertexID、两头顶点各自的local Id(在一个edge分区里的array index)、用于寻址array的正反向map。

      后者维护了点存在于哪个边的分区上的Map。

    下面是代码,比較清晰地展现了内部存储结构。

    private[graphx]
    class EdgePartition[
        @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
        localSrcIds: Array[Int],
        localDstIds: Array[Int],
        data: Array[ED],
        index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
        global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
        local2global: Array[VertexId],
        vertexAttrs: Array[VD],
        activeSet: Option[VertexSet])
      extends Serializable {
    /**
     * Stores the locations of edge-partition join sites for each vertex attribute in a particular
     * vertex partition. This provides routing information for shipping vertex attributes to edge
     * partitions.
     */
    private[graphx]
    class RoutingTablePartition(
        private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) extends Serializable {

    细节

    分区摆放

    EdgeRDD的分区怎么切分的呢?由于数据是依据HadoopRDD从文件中依据offset扫出来的。能够理解为对边数据的切分是没有不论什么处理的。由于文件也没有特殊排列过,所以切分成多少个分区应该就是随机的。

    VertexRDD的分区怎么切分的呢?EdgeRDD生成的vertexIdToPartitionId这份RDD数据是RDD[VertexId, Int]型,它依据hash分区规则,分成和EdgeRDD分区数一样大。

    所以VertexRDD的分区数和Edge一样。分区规则是Long取hash。

    所以我能够想象的计算过程是:

    对点操作的时候,首先对vertexId(是个Long)进行hash,找到相应分区的位置,在这个分区上,假设是内存存储的VertexRDD,那非常快能够查到它的边所在的几个Edge分区的所在位置,然后把计算分到这几个Edge所在的分区上去计算。
    第一步依据点hash后找边分区位置的过程就相似一次建好索引的查询。

    配官方图方面理解:
    这里写图片描写叙述

    高效数据结构

    对原生类型的存储和读写有比較好的数据结构支持,典型的是EdgePartition里使用的map:

    /**
     * A fast hash map implementation for primitive, non-null keys. This hash map supports
     * insertions and updates, but not deletions. This map is about an order of magnitude
     * faster than java.util.HashMap, while using much less space overhead.
     *
     * Under the hood, it uses our OpenHashSet implementation.
     */
    private[graphx]
    class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
                                  @specialized(Long, Int, Double) V: ClassTag](

    以及之前提到的vector

    /**
     * An append-only, non-threadsafe, array-backed vector that is optimized for primitive types.
     */
    private[spark]
    class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) {
      private var _numElements = 0
      private var _array: Array[V] = _

    全文完 :)

  • 相关阅读:
    dir 函数
    模块的 __name__
    from..import 语句
    pass
    可变参数
    python 中的一点新知识
    Numpy中的一点小知识
    使用ipython %matplotlib inline
    numpy.random.rand
    Python:numpy中shape和reshape的用法
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/7105779.html
Copyright © 2011-2022 走看看