zoukankan      html  css  js  c++  java
  • SparkGraphXTest.scala

    /**
     * Created by root on 9/8/15.
     */
    import org.apache.spark._
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    
    /**
     * Small GraphX walkthrough. Builds two property graphs over the same four users
     * and the same four labelled edges, then prints a few counts and one sentence
     * per edge triplet.
     */
    object SparkGraphXTest {

      /**
       * Entry point. Runs both demos on a local SparkContext and always stops the
       * context afterwards, even if one of the jobs throws.
       *
       * @param args unused
       */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("graphx app").setMaster("local")
        val sc = new SparkContext(conf)
        try {
          // Both graphs share the same edges; build the RDD once and reuse it.
          val relationships: RDD[Edge[String]] = sc.parallelize(
            Array(
              Edge(3L, 7L, "collab"),
              Edge(5L, 3L, "advisor"),
              Edge(2L, 5L, "colleague"),
              Edge(5L, 7L, "pi")))
          runNamePositionGraph(sc, relationships)
          runNamePositionAgeGraph(sc, relationships)
        } finally {
          // The original never released the context; stop it so the local
          // executor and its resources are shut down cleanly.
          sc.stop()
        }
      }

      /**
       * Builds a graph whose vertex attribute is (name, position) and prints, in order:
       *  - the number of vertices whose position is "postdoc",
       *  - the number of edges with srcId > dstId, computed twice (field access,
       *    then pattern match) to show both styles,
       *  - one "<src name> is the <relationship> of <dst name>" line per triplet.
       */
      private def runNamePositionGraph(sc: SparkContext, relationships: RDD[Edge[String]]): Unit = {
        val users: RDD[(VertexId, (String, String))] = sc.parallelize(
          Array(
            (3L, ("rxin", "student")),
            (7L, ("jgonzal", "postdoc")),
            (5L, ("franklin", "prof")),
            (2L, ("istoica", "prof"))))
        // Passed to Graph(...) as the default vertex attribute (per the GraphX API,
        // used for vertices referenced by edges but missing from `users`).
        val defaultUser = ("John Doe", "Missing")
        val graph = Graph(users, relationships, defaultUser)

        val postdocCount = graph.vertices.filter { case (_, (_, pos)) => pos == "postdoc" }.count()
        val descendingByField = graph.edges.filter(e => e.srcId > e.dstId).count()
        val descendingByPattern = graph.edges.filter { case Edge(src, dst, _) => src > dst }.count()
        println(postdocCount)
        println(descendingByField)
        println(descendingByPattern)

        // The vertex attribute is (name, position); use the name (_1) so each line
        // reads like "rxin is the collab of jgonzal" rather than "student is the collab of postdoc".
        val facts: RDD[String] = graph.triplets.map(triplet =>
          s"${triplet.srcAttr._1} is the ${triplet.attr} of ${triplet.dstAttr._1}")
        facts.collect().foreach(println)
      }

      /**
       * Builds a graph whose vertex attribute is a (name, position, age) triple and
       * prints one "<src attr> is the <relationship> of <dst attr>" line per triplet.
       * The whole tuple is rendered via its default toString, e.g. "(rxin,student,20)".
       */
      private def runNamePositionAgeGraph(sc: SparkContext, relationships: RDD[Edge[String]]): Unit = {
        val users: RDD[(VertexId, (String, String, String))] = sc.parallelize(
          Array(
            (3L, ("rxin", "student", "20")),
            (7L, ("jgonzal", "postdoc", "22")),
            (5L, ("franklin", "prof", "24")),
            (2L, ("istoica", "prof", "26"))))
        val defaultUser = ("Amy Sun", "aaa", "18")
        val graph = Graph(users, relationships, defaultUser)
        val facts: RDD[String] = graph.triplets.map(triplet =>
          s"${triplet.srcAttr} is the ${triplet.attr} of ${triplet.dstAttr}")
        facts.collect().foreach(println)
      }
    }
  • 相关阅读:
    简单排序算法
    线程池第二篇:线程池相关类
    枚举的使用
    线程池第一篇:线程池相关接口
    MySQL InnoDB引擎与MyISAM引擎区别及索引原理
    待重写
    MySQL登录命令
    浅拷贝、深拷贝
    gRPC: What is gRPC
    protocol buffer第一篇:语法介绍
  • 原文地址:https://www.cnblogs.com/sunflower627/p/4795435.html
Copyright © 2011-2022 走看看