zoukankan      html  css  js  c++  java
  • 《大数据Spark企业级实战》

    大数据Spark企业级实战 PDF电子书下载 带书签目录 完整版.pdf

    Graph的部分抄的《快刀初试:Spark GraphX在淘宝的实践》

    Akka:Actor 

    Spark 为何如此之快?

    - RDD统一抽象;

    - 基于内存的迭代计算;

    - DAG;

    - 出色的容错机制;

     

     

    https://endymecy.gitbooks.io/spark-graphx-source-analysis/content/vertex-cut.html

    点分割存储

      在第一章分布式图系统中,我们介绍了图存储的两种方式:点分割存储和边分割存储。GraphX借鉴powerGraph,使用的是点分割方式存储图。这种存储方式特点是任何一条边只会出现在一台机器上,每个点有可能分布到不同的机器上。 当点被分割到不同机器上时,是相同的镜像,但是有一个点作为主点,其他的点作为虚点,当点的数据发生变化时,先更新主点的数据,然后将所有更新好的数据发送到虚点所在的所有机器,更新虚点。 这样做的好处是在边的存储上是没有冗余的,而且对于某个点与它的邻居的交互操作,只要满足交换律和结合律,就可以在不同的机器上面执行,网络开销较小。但是这种分割方式会存储多份点数据,更新点时, 会发生网络传输,并且有可能出现同步问题。

      GraphX在进行图分割时,有几种不同的分区(partition)策略,它通过PartitionStrategy专门定义这些策略。在PartitionStrategy中,总共定义了EdgePartition2DEdgePartition1DRandomVertexCut以及 CanonicalRandomVertexCut这四种不同的分区策略。下面分别介绍这几种策略。

    1 RandomVertexCut

    case object RandomVertexCut extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          math.abs((src, dst).hashCode()) % numParts
        }
      }
    

      这个方法比较简单,通过取源顶点和目标顶点id的哈希值来将边分配到不同的分区。这个方法会产生一个随机的边分割,两个顶点之间相同方向的边会分配到同一个分区。

    2 CanonicalRandomVertexCut

    case object CanonicalRandomVertexCut extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          if (src < dst) {
            math.abs((src, dst).hashCode()) % numParts
          } else {
            math.abs((dst, src).hashCode()) % numParts
          }
        }
      }
    

      这种分割方法和前一种方法没有本质的不同。不同的是,哈希值的产生带有确定的方向(即两个顶点中较小id的顶点在前)。两个顶点之间所有的边都会分配到同一个分区,而不管方向如何。

    3 EdgePartition1D

    case object EdgePartition1D extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          val mixingPrime: VertexId = 1125899906842597L
          (math.abs(src * mixingPrime) % numParts).toInt
        }
      }
    

      这种方法仅仅根据源顶点id来将边分配到不同的分区。有相同源顶点的边会分配到同一分区。

    4 EdgePartition2D

    case object EdgePartition2D extends PartitionStrategy {
        override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
          val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
          val mixingPrime: VertexId = 1125899906842597L
          if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
            // Use old method for perfect squared to ensure we get same results
            val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
            val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
            (col * ceilSqrtNumParts + row) % numParts
          } else {
            // Otherwise use new method
            val cols = ceilSqrtNumParts
            val rows = (numParts + cols - 1) / cols
            val lastColRows = numParts - rows * (cols - 1)
            val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
            val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
            col * rows + row
          }
        }
      }
    

      这种分割方法同时使用到了源顶点id和目的顶点id。它使用稀疏边连接矩阵的2维区分来将边分配到不同的分区从而保证顶点的备份数不大于2 * sqrt(numParts)的限制。这里numParts表示分区数。 这个方法的实现分两种情况,即分区数能完全开方和不能完全开方两种情况。当分区数能完全开方时,采用下面的方法:

     val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
     val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
     (col * ceilSqrtNumParts + row) % numParts
    

      当分区数不能完全开方时,采用下面的方法。这个方法的最后一列允许拥有不同的行数。

    val cols = ceilSqrtNumParts
    val rows = (numParts + cols - 1) / cols
    //最后一列允许不同的行数
    val lastColRows = numParts - rows * (cols - 1)
    val col = (math.abs(src * mixingPrime) % numParts / rows).toInt
    val row = (math.abs(dst * mixingPrime) % (if (col < cols - 1) rows else lastColRows)).toInt
    col * rows + row
    

      下面举个例子来说明该方法。假设我们有一个拥有12个顶点的图,要把它切分到9台机器。我们可以用下面的稀疏矩阵来表示:

              __________________________________
         v0   | P0 *     | P1       | P2    *  |
         v1   |  ****    |  *       |          |
         v2   |  ******* |      **  |  ****    |
         v3   |  *****   |  *  *    |       *  |
              ----------------------------------
         v4   | P3 *     | P4 ***   | P5 **  * |
         v5   |  *  *    |  *       |          |
         v6   |       *  |      **  |  ****    |
         v7   |  * * *   |  *  *    |       *  |
              ----------------------------------
         v8   | P6   *   | P7    *  | P8  *   *|
         v9   |     *    |  *    *  |          |
         v10  |       *  |      **  |  *  *    |
         v11  | * <-E    |  ***     |       ** |
              ----------------------------------
    

      上面的例子中*表示分配到处理器上的边。E表示连接顶点v11v1的边,它被分配到了处理器P6上。为了获得边所在的处理器,我们将矩阵切分为sqrt(numParts) * sqrt(numParts)块。 注意,上图中与顶点v11相连接的边只出现在第一列的块(P0,P3,P6)或者最后一行的块(P6,P7,P8)中,这保证了V11的副本数不会超过2 * sqrt(numParts)份,在上例中即副本不能超过6份。

      在上面的例子中,P0里面存在很多边,这会造成工作的不均衡。为了提高均衡,我们首先用顶点id乘以一个大的素数,然后再shuffle顶点的位置。乘以一个大的素数本质上不能解决不平衡的问题,只是减少了不平衡的情况发生。

    5 参考文献

    【1】spark源码

     
       
    https://www.cnblogs.com/shishanyuan/p/4747793.html  

    2.2.2.2 邻边聚合

    mrTriplets(mapReduceTriplets)是GraphX中最核心的一个接口。Pregel也基于它而来,所以对它的优化能很大程度上影响整个GraphX的性能。mrTriplets运算符的简化定义是:

     

    它的计算过程为:map,应用于每一个Triplet上,生成一个或者多个消息,消息以Triplet关联的两个顶点中的任意一个或两个为目标顶点;reduce,应用于每一个Vertex上,将发送给每一个顶点的消息合并起来。

    mrTriplets最后返回的是一个VertexRDD[A],包含每一个顶点聚合之后的消息(类型为A),没有接收到消息的顶点不会包含在返回的VertexRDD中。

    在最近的版本中,GraphX针对它进行了一些优化,对于Pregel以及所有上层算法工具包的性能都有重大影响。主要包括以下几点。

    1. Caching for Iterative mrTriplets & Incremental Updates for Iterative mrTriplets:在很多图分析算法中,不同点的收敛速度变化很大。在迭代后期,只有很少的点会有更新。因此,对于没有更新的点,下一次mrTriplets计算时EdgeRDD无需更新相应点值的本地缓存,大幅降低了通信开销。

    2.Indexing Active Edges:没有更新的顶点在下一轮迭代时不需要向邻居重新发送消息。因此,mrTriplets遍历边时,如果一条边的邻居点值在上一轮迭代时没有更新,则直接跳过,避免了大量无用的计算和通信。

    3.Join Elimination:Triplet是由一条边和其两个邻居点组成的三元组,操作Triplet的map函数常常只需访问其两个邻居点值中的一个。例如,在PageRank计算中,一个点值的更新只与其源顶点的值有关,而与其所指向的目的顶点的值无关。那么在mrTriplets计算中,就不需要VertexRDD和EdgeRDD的3-way join,而只需要2-way join。

    所有这些优化使GraphX的性能逐渐逼近GraphLab。虽然还有一定差距,但一体化的流水线服务和丰富的编程接口,可以弥补性能的微小差距。

    2.2.2.3 进化的Pregel模式

    GraphX中的Pregel接口,并不严格遵循Pregel模式,它是一个参考GAS改进的Pregel模式。定义如下:

     

    这种基于mrTrilets方法的Pregel模式,与标准Pregel的最大区别是,它的第2段参数体接收的是3个函数参数,而不接收messageList。它不会在单个顶点上进行消息遍历,而是将顶点的多个Ghost副本收到的消息聚合后,发送给Master副本,再使用vprog函数来更新点值。消息的接收和发送都被自动并行化处理,无需担心超级节点的问题。

    常见的代码模板如下所示:

    可以看到,GraphX设计这个模式的用意。它综合了Pregel和GAS两者的优点,即接口相对简单,又保证性能,可以应对点分割的图存储模式,胜任符合幂律分布的自然图的大型计算。另外,值得注意的是,官方的Pregel版本是最简单的一个版本。对于复杂的业务场景,根据这个版本扩展一个定制的Pregel是很常见的做法。

     

    3 triplets

      在GraphX中,triplets对应着EdgeTriplet。它是一个三元组视图,这个视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD, ED]]。可以通过下面的Sql表达式表示这个三元视图的含义:

    SELECT src.id, dst.id, src.attr, e.attr, dst.attr
    FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
    ON e.srcId = src.Id AND e.dstId = dst.Id
    

      同样,也可以通过下面图解的形式来表示它的含义:

    3.1

      EdgeTriplet的源代码如下所示:

    class EdgeTriplet[VD, ED] extends Edge[ED] {
      //源顶点属性
      var srcAttr: VD = _ // nullValue[VD]
      //目标顶点属性
      var dstAttr: VD = _ // nullValue[VD]
      protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
        srcId = other.srcId
        dstId = other.dstId
        attr = other.attr
        this
      }
    

      EdgeTriplet类继承自Edge类,我们来看看这个父类:

    @specialized 是为了解决由于泛型擦除而需要对基础数据类型进行装箱/拆箱操作所带来的性能问题

    case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] (
        var srcId: VertexId = 0,
        var dstId: VertexId = 0,
        var attr: ED = null.asInstanceOf[ED])
      extends Serializable
    

      Edge类中包含源顶点id,目标顶点id以及边的属性。所以从源代码中我们可以知道,triplets既包含了边属性也包含了源顶点的id和属性、目标顶点的id和属性

    4 参考文献

    【1】spark源码

     

     

     

     

     

     
  • 相关阅读:
    Uva10305(dfs)
    Uva572
    Uva122
    Uva679
    Uva136
    Uva489
    Uva133
    Uva1339
    Uva1588
    《世纪的哭泣》读后感 读书笔记
  • 原文地址:https://www.cnblogs.com/cx2016/p/12986748.html
Copyright © 2011-2022 走看看