学习进度笔记31
PageRank 演示
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
/**
 * PageRank demo built on Spark GraphX.
 *
 * Reads a vertex file (`<articleId> <title>` per line) and an edge file
 * (`<srcId> <dstId>` per line), builds a graph, prints five sample triplets,
 * runs PageRank and prints the ten highest-ranked article titles.
 *
 * Usage: `PageRank [verticesPath [edgesPath]]`. Either path may be omitted,
 * in which case the built-in default location is used.
 */
object PageRank {

  // Default input locations, used when no paths are given on the command line.
  private val DefaultVerticesPath =
    "/home/hadoop/IdeaProjects/data/graphx/graphx-wiki-vertices.txt"
  private val DefaultEdgesPath =
    "/home/hadoop/IdeaProjects/data/graphx/graphx-wiki-edges.txt"

  // Convergence tolerance handed to GraphX's pageRank.
  private val Tolerance = 0.001

  // Number of top-ranked articles to print.
  private val TopN = 10

  /**
   * Program entry point.
   *
   * @param args optional: `args(0)` is the vertex file path, `args(1)` the
   *             edge file path; missing entries fall back to the defaults.
   */
  def main(args: Array[String]): Unit = {
    // Silence noisy framework logging.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val verticesPath = args.lift(0).getOrElse(DefaultVerticesPath)
    val edgesPath = args.lift(1).getOrElse(DefaultEdgesPath)

    // Set up the runtime environment.
    val conf = new SparkConf().setAppName("PageRank").setMaster("local")
    val sc = new SparkContext(conf)

    // try/finally guarantees the context is stopped even when loading,
    // parsing or the PageRank computation throws.
    try {
      // Read the input files.
      val articles: RDD[String] = sc.textFile(verticesPath)
      val links: RDD[String] = sc.textFile(edgesPath)

      // Build the vertices: (id, title). Only the first two space-separated
      // fields are used; a line with fewer than two fields or a non-numeric
      // id fails the job when the RDD is evaluated.
      val vertices = articles.map { line =>
        val fields = line.split(' ')
        (fields(0).toLong, fields(1))
      }

      // Build the edges: src -> dst, with a constant attribute of 0.
      val edges = links.map { line =>
        val fields = line.split(' ')
        Edge(fields(0).toLong, fields(1).toLong, 0)
      }

      // Keep the graph at the default storage level. Per the original
      // author's note, pageRank calls cache() internally, so the graph can
      // only be persisted as MEMORY_ONLY (e.g. MEMORY_ONLY_SER is not usable).
      // Vertices missing from the vertex file get "" as their title.
      val graph = Graph(vertices, edges, "").persist()

      // Sanity check: show a few triplets.
      println("**********************************************************")
      println("获取5个triplet信息")
      println("**********************************************************")
      graph.triplets.take(5).foreach(println)

      println("**********************************************************")
      println("PageRank计算,获取最有价值的数据")
      println("**********************************************************")
      val prGraph = graph.pageRank(Tolerance).cache()

      // Attach each vertex's rank to its title; vertices without a rank get 0.0.
      val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
        (_, title, rank) => (rank.getOrElse(0.0), title)
      }

      // Take the highest-ranked vertices, ordered by rank only.
      val byRank = Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
      titleAndPrGraph.vertices.top(TopN)(byRank).foreach {
        case (_, (rank, title)) => println(s"$title: $rank")
      }
    } finally {
      sc.stop()
    }
  }
}