zoukankan      html  css  js  c++  java
  • 第一个GraphX程序

       程序功能:收集顶点指向的邻居中所在地

    /*
     * 找出每一个顶点所指向的邻居中所在的地区
     */
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import scala.collection.mutable.Map
    import org.apache.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    
    object testApp{
        def main(args:Array[String]){
            val conf = new SparkConf().setAppName("testApp")
            val sc = new SparkContext(conf)
    
            val graph = GraphLoader.edgeListFile(sc, "/home/spark/spark/graphx/data/followers.txt")//载入边时顶点是边上出现的点
    
            val users = sc.textFile("/home/spark/spark/graphx/data/users.txt").map { line =>
                val fields = line.split(",")
                (fields(0).toLong,(fields(1),fields(2)))//解析顶点数据:ID(一定转成Long型),姓名,地区
            }
    
            val myGraph=Graph.apply(users,graph.edges)//重构图,顶点数据以users为准
    
            val vertices=myGraph.mapReduceTriplets[Map[String,Int]](//收集每一个定点指向的邻居所在的地区
                triplet=>Iterator((triplet.srcId,Map[String,Int](triplet.dstAttr._2->1))),//Map function单向发送消息给有向边的源顶点
                (a,b)=>{//Reduce function汇集消息
                    var myMap=Map[String,Int]()
                    for((k,v)<-a){
                        if(b.contains(k))
                        {
                            var t=a(k)+b(k)
                            myMap+=(k->t)
                        }
                        else
                            myMap+=(k->a(k))
                    }
                    myMap //返回汇集的结果
                }
            )
    
            vertices.collect.foreach(a=>print(a+"
    "))//打印收集的邻居所在地
        }
    }
    


    users.txt顶点数据:ID,姓名。地区

    1,BarackObama,American
    2,ladygaga,American
    3,John,American
    4,xiaoming,Beijing
    6,Hanmeimei,Beijing
    7,Polly,American
    8,Tom,American


    followers.txt边数据:仅仅有源顶点和目标顶点,中间以空格隔开,多余的列无用,如:2 1 other 有3列数据,可是graphx仅仅会读取前两列

    2 1
    4 1
    1 2
    6 3
    7 3
    7 6
    6 7
    3 7

    结果:

    (4,Map(American -> 1))
    (6,Map(American -> 2))
    (2,Map(American -> 1))
    (1,Map(American -> 1))
    (3,Map(American -> 1))
    (7,Map(American -> 1))


    project文件夹结构:

    ./test.sbt
    ./src
    ./src/main
    ./src/main/scala
    ./src/main/scala/testApp.scala


    test.sbt内容:

    name := "test Project"
    
    version := "1.0"
    
    scalaVersion := "2.10.4"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1"
    
    libraryDependencies += "org.apache.spark" %% "spark-graphx" %"1.0.1"
    
    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
    



    命令行运行例如以下进行编译:

    sbt package



    命令行运行例如以下向集群提交(开启了集群为前提),这里我的project建在spark安装文件夹下apps/testApp下,因此以下开头是../../

     ../../bin/spark-submit --class "testApp" --master local[4] target/scala-2.10/test-project_2.10-1.0.jar


      这个程序有个潜在的bug,就是Graph.apply函数另一个语义是:若多个顶点ID反复则随意选择一个顶点,若边中edges上的两个顶点有不在users中时将以默认的属性初始化该顶点,

    val myGraph=Graph.apply(users,graph.edges)//重构图。顶点数据以users为准
       若在兴许对图中进行操作时会发现顶点数据格式有可能不一致的情形,部分顶点在edges中存在而在users中不存在时是以默认值作为顶点数据的。而这里顶点数据是个元组(name,location),spark以null作为默认值。可能在操作图时候出现 java.lang.NullPointerException 

      解决的方法有两个:

    1  加入顶点的默认数据

    val defaultAttr=("null","null")
    val myGraph=Graph.apply(users,graph.edges,defaultAttr)

    2  去除那些顶点数据为null的节点和边

    val myGraph=Graph.apply(users,graph.edges)//重构图。顶点数据以users为准
    val newGraph=myGraph.subgraph(triplet=>triplet.srcAttr!=null && triplet.dstAttr!=null,(id,attr)=>attr!=null)//取顶点数据非null的子图


  • 相关阅读:
    其他内容
    html标签
    ambari安装集群下安装kafka manager
    greenplum-cc-web4.0监控安装
    ambari安装集群下python连接hbase之安装thrift
    hadoop运维问题记录
    Ambari2.6.0 安装HDP2.6.3(离线安装)
    mongodb 定时备份
    linux top命令详解
    Sublime Text3配置Python环境
  • 原文地址:https://www.cnblogs.com/lxjshuju/p/6927748.html
Copyright © 2011-2022 走看看