  • GraphX Second-Degree Relationships

    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD

    // Vertex attribute type VD: (String, Int)
    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50)),
      (7L, ("Tian", 55))
    )
    // Edge attribute type ED: Int
    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3),
      Edge(7L, 3L, 8),
      Edge(7L, 6L, 3),
      Edge(7L, 6L, 6)
    )
    // Build the RDDs and the graph (sc is the SparkContext provided by spark-shell)
    val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
    val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

    scala> import scala.collection.immutable.HashSet
    import scala.collection.immutable.HashSet

    Get the first-degree neighbors of vertex 3 (the destinations of its outgoing edges)

    scala> val firstNeighbor=graph.aggregateMessages[Int](triplet=>{if(triplet.srcId==3) triplet.sendToDst(1)},(a,b)=>a )
    firstNeighbor: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[21] at RDD at VertexRDD.scala:57

    scala> var fistids=new HashSet[Long]()
    fistids: scala.collection.immutable.HashSet[Long] = Set()

    scala> firstNeighbor.collect.foreach(a=>fistids+=a._1)

    The first-degree neighbors of vertex 3 are:
    scala> fistids
    res1: scala.collection.immutable.HashSet[Long] = Set(6, 2)

    The relevant edges are:

    Edge(3L, 2L, 4),
    Edge(3L, 6L, 3),

    This confirms that the first-degree neighbors are 2 and 6.
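
    The same first-degree set can also be read straight from the edge RDD, without sending messages or mutating a var. A minimal sketch, assuming a graph like the one built above; outNeighborIds is a hypothetical helper name, not part of GraphX:

    ```scala
    import org.apache.spark.graphx._

    // Hypothetical helper: IDs of all vertices reachable from `src` over one outgoing edge.
    def outNeighborIds[VD, ED](graph: Graph[VD, ED], src: VertexId): Set[VertexId] =
      graph.edges
        .filter(e => e.srcId == src) // keep only edges leaving `src`
        .map(e => e.dstId)           // project each edge to its destination ID
        .collect()
        .toSet                       // parallel edges collapse to one entry
    ```

    On the sample graph, outNeighborIds(graph, 3L) should contain the same two IDs, 2 and 6.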

    Loop over the first-degree neighbors of vertex 3 to collect the vertex IDs of its second-degree neighbors

    scala> var secondids=new HashSet[Long]()
    secondids: scala.collection.immutable.HashSet[Long] = Set()

    scala> fistids.foreach(id=>{val secondNeighbors=graph.aggregateMessages[Int](trp=>{if(trp.srcId==id) trp.sendToDst(1)},(a,b)=>a);secondNeighbors.collect.foreach(v=>{secondids+=v._1})})

    scala> secondids
    res7: scala.collection.immutable.HashSet[Long] = Set(1, 4)

    According to the edge list, vertex 6 has no outgoing edges, and the out-neighbors of vertex 2 are 1 and 4.
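
    The loop above launches one aggregateMessages job per first-degree neighbor. A single pass that tests membership in the first-degree set should do the same work in one job. A hedged sketch; secondHopOut is a name made up for this example:

    ```scala
    import org.apache.spark.graphx._

    // Hypothetical helper: destinations of every edge whose source is in `firstIds`.
    def secondHopOut[VD, ED](graph: Graph[VD, ED], firstIds: Set[VertexId]): Set[VertexId] =
      graph
        .aggregateMessages[Int](
          ctx => if (firstIds.contains(ctx.srcId)) ctx.sendToDst(1), // mark the destination
          (a, _) => a,                                               // one marker per vertex is enough
          TripletFields.None                                         // only IDs are read, no attributes
        )
        .map(_._1) // keep the IDs of vertices that received a marker
        .collect()
        .toSet
    ```

    It could be called as secondHopOut(graph, fistids) with the set collected earlier. In a graph with cycles the result may contain the start vertex or first-degree vertices, so subtract them when a strict second-degree set is wanted.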

    Get the attributes of the second-degree neighbors

    scala> graph.vertices.filter(a=>secondids.contains(a._1)).collect
    res8: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((1,(Alice,28)), (4,(David,42)))

    Get the first- and second-degree neighbors together

    scala> graph.vertices.filter(a=>secondids.contains(a._1) || fistids.contains(a._1)).collect
    res10: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((1,(Alice,28)), (2,(Bob,27)), (4,(David,42)), (6,(Fran,50)))

    ---------------------------------------- 

    In- and out-neighbors of vertex 2

    scala> graph.collectNeighbors(EdgeDirection.Either).filter(_._1==2).map(_._2).collect
    res15: Array[Array[(org.apache.spark.graphx.VertexId, (String, Int))]] = Array(Array((1,(Alice,28)), (4,(David,42)), (3,(Charlie,65)), (5,(Ed,55))))

    In-neighbors of vertex 2

    scala> graph.collectNeighbors(EdgeDirection.In).filter(_._1==2).map(_._2).collect
    res16: Array[Array[(org.apache.spark.graphx.VertexId, (String, Int))]] = Array(Array((3,(Charlie,65)), (5,(Ed,55))))

    Out-neighbors of vertex 2

    scala> graph.collectNeighbors(EdgeDirection.Out).filter(_._1==2).map(_._2).collect
    res17: Array[Array[(org.apache.spark.graphx.VertexId, (String, Int))]] = Array(Array((1,(Alice,28)), (4,(David,42))))

     ------------------------------------------------------

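    Another way to get the second-degree out-neighbors. Note that .first returns only the first non-empty neighbor array; that is sufficient here because vertex 2 is the only first-degree neighbor with outgoing edges.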

    scala> val firstNids=graph.collectNeighborIds(EdgeDirection.Out).filter(_._1==3).map(a=>a._2).first
    firstNids: Array[org.apache.spark.graphx.VertexId] = Array(2, 6)

    scala> graph.collectNeighbors(EdgeDirection.Out).filter(a=>firstNids.contains(a._1)).map(_._2).filter(_.length>0).first
    res116: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((1,(Alice,28)), (4,(David,42)))
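
    A variant that flattens the neighbor arrays of all first-degree vertices, instead of taking a single one, may be safer on graphs where several of them have outgoing edges. A sketch under that assumption; secondDegreeOut is a hypothetical helper, not a GraphX method:

    ```scala
    import scala.reflect.ClassTag
    import org.apache.spark.graphx._

    def secondDegreeOut[VD: ClassTag, ED: ClassTag](
        graph: Graph[VD, ED], src: VertexId): Array[(VertexId, VD)] = {
      // First hop: IDs of the out-neighbors of `src` (empty if it has none).
      val firstIds: Set[VertexId] = graph
        .collectNeighborIds(EdgeDirection.Out)
        .filter(_._1 == src)
        .flatMap(_._2)
        .collect()
        .toSet
      // Second hop: out-neighbors, with attributes, of every first-hop vertex.
      graph
        .collectNeighbors(EdgeDirection.Out)
        .filter { case (id, _) => firstIds.contains(id) }
        .flatMap(_._2)
        .distinct() // a vertex reached through several paths is reported once
        .collect()
    }
    ```

    The order of the returned array is not guaranteed, so compare results as sets when checking them against the transcript.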

     --------------------------------------------------

    Get second-degree neighbors in both directions

    Get the list of neighbor IDs of vertex 3

    scala> val firstNids=graph.collectNeighborIds(EdgeDirection.Either).filter(_._1==3).map(a=>a._2).first
    firstNids: Array[org.apache.spark.graphx.VertexId] = Array(2, 6, 5, 7)

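    Collect the neighbors of each first-degree neighbor, which yields an array of arrays; merge them into a single array with reduce and remove duplicates; then filter out vertex 3 and its own neighbors to obtain the second-degree neighbors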

    scala> graph.collectNeighbors(EdgeDirection.Either).filter(a=>firstNids.contains(a._1)).map(_._2).reduce((a,b)=>a++b).distinct.filter(a=>{!firstNids.contains(a._1) && a._1!=3 })
    res155: Array[(org.apache.spark.graphx.VertexId, (String, Int))] = Array((1,(Alice,28)), (4,(David,42)))
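
    The result can be cross-checked without Spark by walking the edge list with plain Scala collections. This is only a sanity check on the sample data, not a substitute for the distributed version; all names below are chosen for this sketch:

    ```scala
    // Edge list of the sample graph as (source, destination) pairs; weights are not needed here.
    val edges: Seq[(Long, Long)] = Seq(
      (2L, 1L), (2L, 4L), (3L, 2L), (3L, 6L), (4L, 1L), (5L, 2L),
      (5L, 3L), (5L, 6L), (7L, 3L), (7L, 6L), (7L, 6L)
    )

    // Neighbors of `v`, ignoring edge direction.
    def neighbors(v: Long): Set[Long] =
      edges.collect {
        case (s, d) if s == v => d
        case (s, d) if d == v => s
      }.toSet

    val start = 3L
    val first = neighbors(start)
    // Second degree: neighbors of neighbors, minus the first-degree set and the start vertex.
    val second = first.flatMap(neighbors) -- first - start

    println(first == Set(2L, 5L, 6L, 7L)) // prints true
    println(second == Set(1L, 4L))        // prints true
    ```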

  • Original article: https://www.cnblogs.com/playforever/p/14958867.html