  • Spark graph operations

    Spark GraphX graph operations

    import org.apache.spark.graphx.{Edge, Graph, VertexId}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object PropertiesGraph {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("graph").setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("OFF")
        // define the vertices
        val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
          (3L, ("hanmeimei", "student")),
          (7L, ("Lilei", "postdoc")),
          (5L, ("zixuan", "prof")),
          (2L, ("haoran", "prof"))
        ))
        // define the edges
        val relationships: RDD[Edge[String]] = sc.parallelize(
          Array(
            Edge(3L, 7L, "cooperate"),
            Edge(5L, 7L, "advisor"),
            Edge(2L, 5L, "colleague")
          )
        )
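        // default attribute used for any vertex referenced by an edge but missing from users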
        val defaultUser = ("Jack ma", "defaultUser")
        val graph = Graph(users, relationships, defaultUser)
        // find the vertices whose occupation attribute is "student"
        graph.vertices.filter{case (id,(name, work))=> work=="student"}
          .collect().foreach{case (id,(name, work))=> println(s"${name} is ${work}") }
    
        // find the edges whose attribute is "advisor"
        graph.edges.filter(x => x.attr == "advisor")
          .collect().foreach(x => println(s"${x.srcId} to ${x.dstId} with attribute ${x.attr}"))
    
        // out-degree and in-degree
        def max(a:(VertexId,Int),b:(VertexId,Int)) : (VertexId,Int)={
          if(a._2>b._2) a else b
        }
        println("最大的出度" + graph.outDegrees.reduce(max))
        println("最大的入度" + graph.inDegrees.reduce(max))
        println("最大的度" + graph.degrees.reduce(max))
        // append a string to each vertex's occupation attribute
        graph.mapVertices { case (id, (name, work)) => (name, work + "spark") }
          .vertices.collect().foreach(println)
        // triplets
        // set each triplet's edge attribute to srcAttr + edge attr + dstAttr
        graph.mapTriplets(x => x.srcAttr._2 + "+" + x.attr + "+" + x.dstAttr._2)
          .edges.collect().foreach(println)
        graph.triplets.map(x => x.srcAttr._1 + " is the " + x.attr + " of " + x.dstAttr._1).foreach(println)
        // remove vertices to build a subgraph (here: drop the postdoc)
        val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "postdoc")
        validGraph.vertices.foreach(println)
        validGraph.triplets.map(x => x.srcAttr._1 + " is the " + x.attr + " of " + x.dstAttr._1).foreach(println)
        // build a subgraph of the vertices whose occupation is "prof"
        val subGraph = graph.subgraph(vpred = (id, attr) => attr._2 == "prof")
        subGraph.vertices.collect().foreach(x => println(s"${x._2._1} is ${x._2._2}"))
      }
    }
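
    GraphX also ships a built-in PageRank operator, so the toy graph above can be ranked without leaving GraphX. A minimal sketch, reusing the graph and users values defined in main above; the tolerance 0.0001 is an arbitrary choice, not something from the original post:

    // run GraphX's built-in PageRank until the scores converge to the given tolerance
    val ranks = graph.pageRank(0.0001).vertices
    // join the scores back onto the user names so the output is readable
    val ranksByName = users.join(ranks).map {
      case (_, ((name, _), rank)) => (name, rank)
    }
    ranksByName.collect().foreach(println)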

    Connecting Spark to Neo4j

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.Graph
    import org.neo4j.driver.internal.InternalNode
    import org.neo4j.spark.Neo4j
    
    object SparkGraphxConnector {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("neo4j")
          .setMaster("local[*]")
          .set("spark.neo4j.bolt.url","bolt://192.168.1.21")
          .set("spark.neo4j.bolt.user","neo4j")
          .set("spark.neo4j.bolt.password","123")
    
        val sc = new SparkContext(conf)
        sc.setLogLevel("OFF")
        val neo4j = Neo4j(sc)
        // run a Cypher query and map each returned node onto a Person
        val rdd = neo4j.cypher("match (n:Person) return n").loadRowRdd
        val personRDD = rdd.map(row => {
          val node = row.get(0).asInstanceOf[InternalNode]
          Person(node.get("home").asString(),
            node.get("name").asString(),
            node.get("personId").asString())
        })
        personRDD.foreach(println)
        val graphQuery = "match (p:Person) -[r]-(a:Person) return id(p) as source,id(a) as target, type(r) as value"
        val graph: Graph[String,String] = neo4j.rels(graphQuery).loadGraph
        graph.edges.foreach(println(_))
      }
      case class Person(home: String,
                        name: String,
                        personId: String)
    }
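
    The graph returned by loadGraph above is an ordinary GraphX Graph, so the operators from the first example apply to it unchanged. A small sanity-check sketch using only standard GraphX calls:

    // basic sanity checks on the graph loaded from Neo4j
    println(s"vertices: ${graph.vertices.count()}, edges: ${graph.edges.count()}")
    // vertex with the most relationships, reusing the reduce-by-degree idea from the first example
    println("max degree: " + graph.degrees.reduce((a, b) => if (a._2 > b._2) a else b))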
    

    PageRank

    import org.apache.spark.{SparkConf, SparkContext}
    import org.neo4j.spark.Neo4j
    
    object pageRank {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("neo4j")
          .setMaster("local[*]")
          .set("spark.neo4j.bolt.url","bolt://192.168.1.21")
          .set("spark.neo4j.bolt.user","neo4j")
          .set("spark.neo4j.bolt.password","123")
    
        val sc = new SparkContext(conf)
        sc.setLogLevel("OFF")
        val neo4j = Neo4j(sc)
        val graphFrame = neo4j.pattern(("Page", "value"), ("TO", "null"), ("Page", "value"))
          .partitions(partitions = 1).rows(rows = 100).loadGraphFrame // only loadGraphFrame exposes the GraphFrames pageRank API directly
        val pageRankFrame = graphFrame.pageRank.maxIter(value = 100).run()
        val ranked = pageRankFrame.vertices
        ranked.foreach(println(_))
      }
    }
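
    The vertices of the ranked GraphFrame form a DataFrame, so the scores can be inspected with ordinary Spark SQL operations. A minimal sketch, assuming the default pagerank column that GraphFrames adds to the vertices:

    import org.apache.spark.sql.functions.desc

    // show the ten highest-ranked pages, sorted by their PageRank score
    ranked.orderBy(desc("pagerank")).show(10)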

      
