  • A small GraphX example

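    (A small example: five people, contestants 0, 1, 2, 3 and 4, take part in a dating show. For each contestant, compute how many of their followers are older than they are, and the average age of those older followers.)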
    scala> import org.apache.spark._
    import org.apache.spark._
    scala> import org.apache.spark.rdd.RDD
    import org.apache.spark.rdd.RDD
    scala> import org.apache.spark.graphx._
    import org.apache.spark.graphx._
    scala> import org.apache.spark.graphx.util._
    import org.apache.spark.graphx.util._
     
    Create the graph
    scala> val graphs = GraphGenerators.logNormalGraph(sc, numVertices = 5).mapVertices((id, _) => id.toDouble)
    graphs: org.apache.spark.graphx.Graph[Double,Int] = org.apache.spark.graphx.impl.GraphImpl@1461f52d
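    GraphGenerators.logNormalGraph: generates a random graph whose vertex out-degrees follow a log-normal distribution. The destinations are random, so the edges will generally differ from run to run and can include duplicate edges and self-loops.
    numVertices = 5: generate 5 vertices, i.e. the five contestants.
    mapVertices((id, _) => id.toDouble): replaces each vertex attribute with the vertex id as a Double; this value is used as the person's age.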
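    To see where duplicate edges such as (0,1) and self-loops such as (3,3) in the edge output come from, here is a simplified plain-Scala sketch of the idea behind logNormalGraph: each vertex draws a log-normally distributed out-degree (kept below numVertices) and then picks that many destinations uniformly at random, with replacement. This is an illustration under assumptions, not Spark's actual implementation; the object name LogNormalSketch and the rejection loop are hypothetical.

```scala
import scala.util.Random

// Hypothetical, simplified sketch of the idea behind GraphGenerators.logNormalGraph.
// Not Spark's code: it only illustrates log-normal out-degrees plus random destinations.
object LogNormalSketch {
  // Draws floor(exp(mu + sigma * Z)) with Z ~ N(0, 1), redrawing until the value
  // is below maxVal, so every out-degree lies in [0, maxVal).
  def sampleOutDegree(rng: Random, mu: Double, sigma: Double, maxVal: Int): Int = {
    var x = maxVal.toDouble
    while (x >= maxVal) {
      x = math.exp(mu + sigma * rng.nextGaussian())
    }
    math.floor(x).toInt
  }

  // Returns (srcId, dstId) pairs. Destinations are drawn uniformly with replacement,
  // so duplicate edges and self-loops are possible.
  def edges(numVertices: Int, mu: Double, sigma: Double, seed: Long): Seq[(Long, Long)] = {
    val rng = new Random(seed)
    (0 until numVertices).flatMap { src =>
      val degree = sampleOutDegree(rng, mu, sigma, numVertices)
      Seq.fill(degree)((src.toLong, rng.nextInt(numVertices).toLong))
    }
  }
}
```

    For example, LogNormalSketch.edges(5, 4.0, 1.3, seed = 42L) returns a small random edge list over the vertex ids 0 to 4; mu = 4.0 and sigma = 1.3 are assumed here to match logNormalGraph's defaults.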
    Inspect the generated graph
    vertices
    Array[(VertexId, Double)] = Array((4,4.0), (0,0.0), (1,1.0), (3,3.0), (2,2.0))
    id   attr   (id: contestant number, attr: age)
    0    0.0
    1    1.0
    2    2.0
    3    3.0
    4    4.0
    edges
    Array(Edge(0,1,1), Edge(0,1,1), Edge(0,3,1), Edge(0,4,1), Edge(1,2,1), Edge(1,4,1), Edge(2,0,1), Edge(2,0,1), Edge(2,3,1), Edge(2,4,1), Edge(3,3,1), Edge(4,0,1), Edge(4,3,1))
    srcId  dstId  attr   (srcId: follower, dstId: person being followed, attr: edge attribute, always 1 here and not used)
    0      1      1
    0      1      1
    0      3      1
    0      4      1
    1      2      1
    1      4      1
    2      0      1
    2      0      1
    2      3      1
    2      4      1
    3      3      1
    4      0      1
    4      3      1
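    Select the edges where the follower is older than the person being followed (srcAttr > dstAttr; since age equals id here, this is the same as srcId > dstId) and send a message to the person being followed: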
    scala> val olderFollowers: VertexRDD[(Int, Double)] = graphs.aggregateMessages[(Int, Double)](
    | triplet => { // Map function
    |   if (triplet.srcAttr > triplet.dstAttr) {
    |     // Send a message to the destination vertex containing a counter and the follower's age
    |     triplet.sendToDst((1, triplet.srcAttr))
    |   }
    | },
    | // Reduce function: add up the counters and the ages
    | (a, b) => (a._1 + b._1, a._2 + b._2)
    | )
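    aggregateMessages visits every triplet of graphs (srcId, srcAttr, dstId, dstAttr and the edge attribute), keeps those with srcAttr > dstAttr, and sends a message to the dstId vertex.
    In the message (1, triplet.srcAttr) sent by sendToDst, the 1 is a counter and triplet.srcAttr is the follower's age. The reduce function adds the messages arriving at the same vertex pairwise, so each vertex ends up with (number of older followers, sum of their ages). Vertices that receive no message do not appear in the result.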
    Inspect the result (olderFollowers.collect):
    res3: Array[(org.apache.spark.graphx.VertexId, (Int, Double))] = Array((0,(3,8.0)), (3,(1,4.0)))
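    Reading (0,(3,8.0)):
    0 is the dstId, i.e. the person being followed (contestant 0)
    3 is the number of followers older than contestant 0 (edges 2→0, 2→0 and 4→0)
    8.0 is the sum of those followers' ages (2.0 + 2.0 + 4.0)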
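    The same numbers can be reproduced without Spark. The following is a plain-Scala sketch, not GraphX itself: it copies the vertex ages and the edge list printed above into ordinary collections, emits (1, srcAttr) to dstId for every edge whose follower is older (the map function), and sums the messages per vertex (the reduce function). The object name OlderFollowersSketch is hypothetical.

```scala
// Hypothetical plain-Scala sketch of what the aggregateMessages call computes (no Spark needed).
object OlderFollowersSketch {
  // vertex id -> age (age == id, as set by mapVertices)
  val ages: Map[Long, Double] = (0L to 4L).map(id => (id, id.toDouble)).toMap

  // (srcId = follower, dstId = person being followed), copied from the edges output above
  val edges: Seq[(Long, Long)] = Seq(
    (0L, 1L), (0L, 1L), (0L, 3L), (0L, 4L), (1L, 2L), (1L, 4L), (2L, 0L),
    (2L, 0L), (2L, 3L), (2L, 4L), (3L, 3L), (4L, 0L), (4L, 3L))

  // Returns dstId -> (number of older followers, sum of their ages).
  def olderFollowers(ages: Map[Long, Double], edges: Seq[(Long, Long)]): Map[Long, (Int, Double)] = {
    // "Map function": look up srcAttr and dstAttr for each edge and emit
    // the message (1, srcAttr) to dstId only when the follower is older.
    val messages: Seq[(Long, (Int, Double))] = edges.collect {
      case (src, dst) if ages(src) > ages(dst) => (dst, (1, ages(src)))
    }
    // "Reduce function": merge all messages sent to the same vertex.
    messages.groupBy(_._1).map { case (dst, msgs) =>
      dst -> msgs.map(_._2).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
    }
  }
}
```

    OlderFollowersSketch.olderFollowers(OlderFollowersSketch.ages, OlderFollowersSketch.edges) should equal Map(0L -> (3, 8.0), 3L -> (1, 4.0)), matching res3. Unlike aggregateMessages, this runs on a single machine, which only makes sense for toy data like this.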
    Compute the average age
    scala> val avgAgeOfOlderFollowers: VertexRDD[Double] =
    | olderFollowers.mapValues( (id, value) =>
    | value match { case (count, totalAge) => totalAge / count } )
    avgAgeOfOlderFollowers: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[48] at RDD at VertexRDD.scala:57
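    olderFollowers.mapValues: transforms only the values; the vertex ids stay the same. This overload also passes the vertex id to the function.
    count: the number of older followers
    totalAge: the sum of their ages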
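    The averaging step is plain arithmetic: for contestant 0 it is 8.0 / 3 ≈ 2.67, and for contestant 3 it is 4.0 / 1 = 4.0. A minimal plain-Scala sketch of the same mapValues logic on an ordinary Map (the object name AverageAgeSketch is hypothetical, and the input is the olderFollowers result shown above):

```scala
// Hypothetical plain-Scala sketch of the mapValues step (no Spark needed).
object AverageAgeSketch {
  // dstId -> (number of older followers, sum of their ages), taken from res3 above
  val olderFollowers: Map[Long, (Int, Double)] = Map((0L, (3, 8.0)), (3L, (1, 4.0)))

  // Like olderFollowers.mapValues((id, value) => ...): the ids are kept and only
  // each (count, totalAge) value is turned into totalAge / count.
  def averageAge(followers: Map[Long, (Int, Double)]): Map[Long, Double] =
    followers.map { case (id, (count, totalAge)) => (id, totalAge / count) }
}
```

    AverageAgeSketch.averageAge(AverageAgeSketch.olderFollowers) gives about 2.67 for contestant 0 and 4.0 for contestant 3; in the spark-shell, avgAgeOfOlderFollowers.collect should show the same two values for this graph.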
  • Original article: https://www.cnblogs.com/zhangXingSheng/p/6606980.html
Copyright © 2011-2022 走看看