zoukankan      html  css  js  c++  java
  • 学习进度笔记

    学习进度笔记31

    PageRank 演示

    import org.apache.log4j.{Level, Logger}

    import org.apache.spark.{SparkContext, SparkConf}

    import org.apache.spark.graphx._

    import org.apache.spark.rdd.RDD

    object PageRank {

      def main(args: Array[String]) {

        //屏蔽日志

        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

        //设置运行环境

        val conf = new SparkConf().setAppName("PageRank").setMaster("local")

        val sc = new SparkContext(conf)

        //读入数据文件

        val articles: RDD[String] = sc.textFile("/home/hadoop/IdeaProjects/data/graphx/graphx-wiki-vertices.txt")

        val links: RDD[String] = sc.textFile("/home/hadoop/IdeaProjects/data/graphx/graphx-wiki-edges.txt")

        //装载顶点和边

        val vertices = articles.map { line =>

          val fields = line.split(' ')

          (fields(0).toLong, fields(1))

        }

        val edges = links.map { line =>

          val fields = line.split(' ')

          Edge(fields(0).toLong, fields(1).toLong, 0)

        }

        //cache操作

        //val graph = Graph(vertices, edges, "").persist(StorageLevel.MEMORY_ONLY_SER)

        val graph = Graph(vertices, edges, "").persist()

        //graph.unpersistVertices(false)

        //测试

        println("**********************************************************")

        println("获取5个triplet信息")

        println("**********************************************************")

        graph.triplets.take(5).foreach(println(_))

        //pageRank算法里面的时候使用了cache(),故前面persist的时候只能使用MEMORY_ONLY

        println("**********************************************************")

        println("PageRank计算,获取最有价值的数据")

        println("**********************************************************")

        val prGraph = graph.pageRank(0.001).cache()

        val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {

          (v, title, rank) => (rank.getOrElse(0.0), title)

        }

        titleAndPrGraph.vertices.top(10) {

          Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)

        }.foreach(t => println(t._2._2 + ": " + t._2._1))

        sc.stop()

      }

    }

  • 相关阅读:
    MySQL体系结构
    Java线程池ThreadPoolExecuter:execute()原理
    Java Thread 如何处理未捕获的异常?
    SSL/TSL握手过程详解
    LockSupport HotSpot里park/unpark的实现
    JAVA 对象内存结构
    JAVA 线程状态转换
    Spring源码解析(四)Bean的实例化和依赖注入
    Spring源码解析(五)循环依赖问题
    Spring源码解析(三)BeanDefinition的载入、解析和注册
  • 原文地址:https://www.cnblogs.com/xueqiuxiang/p/14467014.html
Copyright © 2011-2022 走看看