zoukankan      html  css  js  c++  java
  • spark PageRank

    import java.io.{File, PrintWriter}
    import java.util
    import java.util.regex.Pattern
    
    import org.apache.spark.graphx.GraphLoader
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    import scala.io.Source
    
    
    object PageRankTest {
    
      def main(args: Array[String]): Unit = {
        val masterUrl = "local[2]"
        val appName = "PageRank_test"
        val sparkConf = new SparkConf().setMaster(masterUrl).setAppName(appName)
        val sc = new SparkContext(sparkConf)
        sc.setLogLevel("ERROR")
    
        //原始数据文件
        val urlSourceFilePath="Peanut/httpFile.txt"
        //预处理后要写入的数据集1 : 边
        val urlFollowerFilePath="Peanut/urlFollower.txt"
        //预处理后要写入的数据集2 : 点
        val urlPointFilePath="Peanut/urlPoint.txt"
        //结果文件
        val PageRankResultPath="Peanut/PageRankResult.txt"
    
        
        // 每个url 对应一个数值(int)
        val urlToIntMap = mutable.Map[String,Int]()
        var count:Int = 1
    
        // 将原始数据中url映射成一个int数值
        val sourceFile=Source.fromFile(urlSourceFilePath)
        for(line <- sourceFile.getLines){
          val list = evaluate(line)
          list.map(x=>{
            if (!urlToIntMap.contains(x)) {
              urlToIntMap.put(x,count)
              count += 1
            }
          })
        }
    
        // 写 点数据集: point.txt
        val writerFollower = new PrintWriter(new File(urlPointFilePath))
        urlToIntMap.foreach(x=>{
          val writeContain = x._2 + "	" + x._1 + "
    "
          writerFollower.write(writeContain)
        })
        writerFollower.close()
    
    
        //写 边数据集: follower.txt
        val sourceFile2=Source.fromFile(urlSourceFilePath)
        val writerPoint = new PrintWriter(new File(urlFollowerFilePath))
        for(line <- sourceFile2.getLines) {
          val list = evaluate(line).toList
          val firstUrl = list.head
          val firstUrlNum = urlToIntMap(firstUrl)
          val otherUrlList = list.tail
          otherUrlList.foreach(x=>{
            val writeNum = urlToIntMap(x)
            val writeString = firstUrlNum + "	" + writeNum + "
    "
            writerPoint.write(writeString)
          })
        }
        writerPoint.close()
        sourceFile.close
        sourceFile2.close
    
    
    
    
        // 从特定的边列表文件中读取数据生成图框架
        val graph = GraphLoader.edgeListFile(sc, urlFollowerFilePath)
    
        // 核心api:    pageRank
        // 0.0001为前后两次收敛的误差阈值,小于这个阈值时则结束计算,越小精度越到
        val ranks = graph.pageRank(0.0001).vertices
    
    
       // 将上面得到的ranks(顶点属性)和用户进行关系连接
        // 首先也是读取一个包含了用户信息的文件,然后调用了一个map函数,即将文件里的每行数据按 ”,” 切开并返回存储了处理后数据的RDD
        val users = sc.textFile(urlPointFilePath).map { line =>
          val fields = line.split("	")
          (fields(0).toLong, fields(1))
        }
       // println("===users: "+users.collect().toBuffer)
    
        // 这里具体实现了将ranks和用户列表一一对应起来
        // 从map函数的内容可以看出是按id来进行连接,但返回的结果只含用户名和它的相应rank值
        val ranksByUsername = users.join(ranks).map {
          case (id, (username, rank)) => (username, rank)
        }
    
        // 结果
        val pageRankResult=ranksByUsername.collect()  //返回 Array[String,Double]
        //打印数据
        println(pageRankResult.mkString("
    "))
        val writerPageRankResult = new PrintWriter(new File(PageRankResultPath))
        pageRankResult.toList.foreach(x=>{
          val writeString=x._1+","+x._2+"
    "
          writerPageRankResult.write(writeString)
        })
        writerPageRankResult.close()
    
    
      }
    
    
    
    
      /**
        * 预处理数据
        *
        * @param links
        * @return   第一个元素为 原url, 后面是 链接url
        */
      def evaluate(links: String) = {
        val pattern = "(\[){1,2}.*?, "
        val r = Pattern.compile(pattern)
        val m = r.matcher(links)
        val linksList = new util.ArrayList[String]
        val title = links.split("	")(0)
        linksList.add(title)
        while ( {
          m.find
        }) {
          val ret = m.group
          val lastIndex = ret.lastIndexOf("[")
          val re = ret.substring(lastIndex + 1, ret.length - 2)
          linksList.add(re)
        }
        linksList.toArray(new Array[String](0))
      }
    
    
    
    }
  • 相关阅读:
    Objects in this class cannot be updated outside
    操作系统原理好书推荐
    Can't initialize OCI
    比较好的GIS blog
    栅格数据开发
    arcgis 本地地图服务 silverlight 调用报错 .
    (转载)Rasterdataset Load data耗时
    网络达人梁宏达
    arcengine总结(1)栅格数据开发
    MyNPOI V1.2发布并开放源码,让.NET Excel导出将简单进行到底【转】
  • 原文地址:https://www.cnblogs.com/ShyPeanut/p/13628133.html
Copyright © 2011-2022 走看看