zoukankan      html  css  js  c++  java
  • spark PageRank

    import java.io.{File, PrintWriter}
    import java.util
    import java.util.regex.Pattern
    
    import org.apache.spark.graphx.GraphLoader
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    import scala.collection.mutable
    import scala.io.Source
    
    
    object PageRankTest {
    
      def main(args: Array[String]): Unit = {
        val masterUrl = "local[2]"
        val appName = "PageRank_test"
        val sparkConf = new SparkConf().setMaster(masterUrl).setAppName(appName)
        val sc = new SparkContext(sparkConf)
        sc.setLogLevel("ERROR")
    
        //原始数据文件
        val urlSourceFilePath="Peanut/httpFile.txt"
        //预处理后要写入的数据集1 : 边
        val urlFollowerFilePath="Peanut/urlFollower.txt"
        //预处理后要写入的数据集2 : 点
        val urlPointFilePath="Peanut/urlPoint.txt"
        //结果文件
        val PageRankResultPath="Peanut/PageRankResult.txt"
    
        
        // 每个url 对应一个数值(int)
        val urlToIntMap = mutable.Map[String,Int]()
        var count:Int = 1
    
        // 将原始数据中url映射成一个int数值
        val sourceFile=Source.fromFile(urlSourceFilePath)
        for(line <- sourceFile.getLines){
          val list = evaluate(line)
          list.map(x=>{
            if (!urlToIntMap.contains(x)) {
              urlToIntMap.put(x,count)
              count += 1
            }
          })
        }
    
        // 写 点数据集: point.txt
        val writerFollower = new PrintWriter(new File(urlPointFilePath))
        urlToIntMap.foreach(x=>{
          val writeContain = x._2 + "	" + x._1 + "
    "
          writerFollower.write(writeContain)
        })
        writerFollower.close()
    
    
        //写 边数据集: follower.txt
        val sourceFile2=Source.fromFile(urlSourceFilePath)
        val writerPoint = new PrintWriter(new File(urlFollowerFilePath))
        for(line <- sourceFile2.getLines) {
          val list = evaluate(line).toList
          val firstUrl = list.head
          val firstUrlNum = urlToIntMap(firstUrl)
          val otherUrlList = list.tail
          otherUrlList.foreach(x=>{
            val writeNum = urlToIntMap(x)
            val writeString = firstUrlNum + "	" + writeNum + "
    "
            writerPoint.write(writeString)
          })
        }
        writerPoint.close()
        sourceFile.close
        sourceFile2.close
    
    
    
    
        // 从特定的边列表文件中读取数据生成图框架
        val graph = GraphLoader.edgeListFile(sc, urlFollowerFilePath)
    
        // 核心api:    pageRank
        // 0.0001为前后两次收敛的误差阈值,小于这个阈值时则结束计算,越小精度越到
        val ranks = graph.pageRank(0.0001).vertices
    
    
       // 将上面得到的ranks(顶点属性)和用户进行关系连接
        // 首先也是读取一个包含了用户信息的文件,然后调用了一个map函数,即将文件里的每行数据按 ”,” 切开并返回存储了处理后数据的RDD
        val users = sc.textFile(urlPointFilePath).map { line =>
          val fields = line.split("	")
          (fields(0).toLong, fields(1))
        }
       // println("===users: "+users.collect().toBuffer)
    
        // 这里具体实现了将ranks和用户列表一一对应起来
        // 从map函数的内容可以看出是按id来进行连接,但返回的结果只含用户名和它的相应rank值
        val ranksByUsername = users.join(ranks).map {
          case (id, (username, rank)) => (username, rank)
        }
    
        // 结果
        val pageRankResult=ranksByUsername.collect()  //返回 Array[String,Double]
        //打印数据
        println(pageRankResult.mkString("
    "))
        val writerPageRankResult = new PrintWriter(new File(PageRankResultPath))
        pageRankResult.toList.foreach(x=>{
          val writeString=x._1+","+x._2+"
    "
          writerPageRankResult.write(writeString)
        })
        writerPageRankResult.close()
    
    
      }
    
    
    
    
      /**
        * 预处理数据
        *
        * @param links
        * @return   第一个元素为 原url, 后面是 链接url
        */
      def evaluate(links: String) = {
        val pattern = "(\[){1,2}.*?, "
        val r = Pattern.compile(pattern)
        val m = r.matcher(links)
        val linksList = new util.ArrayList[String]
        val title = links.split("	")(0)
        linksList.add(title)
        while ( {
          m.find
        }) {
          val ret = m.group
          val lastIndex = ret.lastIndexOf("[")
          val re = ret.substring(lastIndex + 1, ret.length - 2)
          linksList.add(re)
        }
        linksList.toArray(new Array[String](0))
      }
    
    
    
    }
  • 相关阅读:
    centos7.6 安装与配置 MongoDB yum方式
    MongoDB 介绍
    centos 关闭selinux
    前端 HTML标签属性
    前端 HTML 标签嵌套规则
    前端 HTML 标签分类
    前端 HTML body标签相关内容 常用标签 表单标签 form里面的 input标签介绍
    前端 HTML body标签相关内容 常用标签 表单标签 form 表单控件分类
    前端 HTML form表单标签 select标签 option 下拉框
    POJ 1426
  • 原文地址:https://www.cnblogs.com/ShyPeanut/p/13628133.html
Copyright © 2011-2022 走看看