zoukankan      html  css  js  c++  java
  • PageRank在Hadoop和spark下的实现以及对比

    关于PageRank的地位,不必多说。
    主要思想:对于每个网页,用户都有可能点击网页上的某个链接,例如
    A:B,C,D
    B:A,D
    C:A
    D:B,C
    由这个我们可以得到网页的转移矩阵
         A    B    C    D
    A  0    1/2  1    0
    B 1/3   0    0    0
    C 1/3  1/2  0    0
    D 1/3  0     0    1/2
     
    Aij表示网页j到网页i的转移概率。假设起始状态每个用户对ABCD四个网站的点击概率相同都是0.25,那么各个网站第一次被访问的概率为(0.25,0.25,0.25,0.25),第二次访问考虑到在页面跳转,利用转移矩阵对于网站A的概率为(0,1/2,1,0)*(0.25,0.25,0.25,0.25)T,一次类推,经过若干次迭代会收敛到某个值。但是考虑到有些链接是单链即没有别的链接只想他,他也不指向别的链接,以及有些链接是自己指向自己,那么上述的方式将无法收敛。所以后面加了一个阻尼系数一般取0.85,至于为什么是这样,挺复杂的证明。
    最后的公式为alaph=factor*matrix*(alaph)T+(1-facotr)/n*
    详细的介绍可以参考:http://blog.jobbole.com/71431/
    接下来便是对比Hadoop和spark了。这里只是单纯的讨论两个环境下编程的效率,不讨论性能。
    Hadoop:
    输入的文件:
    A 0.25:B,C,D
    B 0.25:A,D
    C 0.25:A
    D 0.25:B,C
    这里得先说一句,之所以加了0.25是因为初始的概率为1/n,而n为网站数,这里统计网站数又得需要一个MapReduce来实现,所以作罢,权当n是手工输入的。
    由于每次迭代后的结果只能放在文件中,所以这里花了很多时间在规范如何输出,以及map和reduce之间如何传值的问题。
    在map中,我们要做的是从输入文件中获取alaph和每个网站的转移概率。例如
    A 0.25:B,C,D
    B的转移概率为1/3而且是从A转向B的,所以输出的是<"B","link:A 0.333">link表示这是个转移概率,A表示是从A出发的
    alaph的表示:<"B","alaph: A 0.25">这里的A表示这个alaph值对应这A。
    由于我们这里迭代后的输入文件都是从输出文件中获取,所以我们需要将输出文件搞的和一开始输入文件一样,所以在map阶段需要输出<"A","content:B,C,D">方便reduce输出和输入文件一样格式的输出。
    在reduce阶段,此时对于键值B而言,会收到如下
    <"B","link:A 0.333">
    <"B","link:D 0.5">
    <"B","alaph: A 0.25">
    <"B","alaph: D 0.25">
    <"B","content:A,D">
    我们根据不同的单词,将value整合。这的alaph=0.333*0.25+0.5*0.25,接着再加上阻尼系数等,得到最后的alaph值。然后利用content对应的value,最后输出<"B:0.375","A,D">
    这样迭代若干次。
    附上代码:
     
      1 package org.apache.hadoop.PageRank;
      2 
      3 import java.util.ArrayList;
      4 
      5 import org.apache.hadoop.conf.Configuration;
      6 import org.apache.hadoop.fs.FileSystem;
      7 import org.apache.hadoop.fs.Path;
      8 import org.apache.hadoop.io.Text;
      9 import org.apache.hadoop.mapreduce.Job;
     10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     12 
     13 public class PageRank {
     14 
     15     public static void run(){
     16         
     17     }
     18     
     19     public static void main(String[] args) throws Exception {
     20         double factor=0;
     21         if(args.length>1){
     22             factor=Double.parseDouble(args[0]);
     23         }else{
     24             factor=0.85;
     25         }
     26         String input="hdfs://10.107.8.110:9000/PageRank_input";
     27         String output="hdfs://10.107.8.110:9000/PageRank/output";
     28         ArrayList<String> pathList=new ArrayList<String>();        
     29         for(int i=0;i<20;i++){
     30             Configuration conf = new Configuration();
     31             conf.set("num","4");
     32             conf.set("factor",String.valueOf(factor));
     33             Job job = Job.getInstance(conf, "PageRank");
     34             job.setJarByClass(org.apache.hadoop.PageRank.PageRank.class);
     35             job.setMapperClass(MyMapper.class);
     36             job.setReducerClass(MyReducer.class);
     37             job.setOutputKeyClass(Text.class);
     38             job.setOutputValueClass(Text.class);
     39             FileInputFormat.setInputPaths(job, new Path(input));
     40             FileOutputFormat.setOutputPath(job, new Path(output));
     41             input=output;
     42             pathList.add(output);
     43             output=output+1;
     44             
     45             System.out.println("the "+i+"th iterator is finished");
     46             job.waitForCompletion(true);
     47         }
     48         for(int i=0;i<pathList.size()-1;i++){
     49             Configuration conf=new Configuration();
     50             Path path=new Path(pathList.get(i));
     51             FileSystem fs=path.getFileSystem(conf);
     52             fs.delete(path,true);
     53         }
     54     }
     55 
     56 }
     57 
     58 
     59 
     60 package org.apache.hadoop.PageRank;
     61 
     62 import java.io.IOException;
     63 import java.util.HashMap;
     64 import java.util.Map;
     65 
     66 
     67 import org.apache.hadoop.io.LongWritable;
     68 import org.apache.hadoop.io.Text;
     69 import org.apache.hadoop.mapreduce.Mapper;
     70 
     71 public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
     72 
     73     
     74     public void map(LongWritable ikey, Text ivalue, Context context)
     75             throws IOException, InterruptedException {
     76         String[] line=ivalue.toString().split(":");
     77         String content=line[1];
     78         int num=content.split(",").length;
     79         String word=line[0].split("    ")[0];
     80         String alaph=line[0].split("    ")[1];
     81         context.write(new Text(word),new Text("content:"+content));
     82         for(String w:content.split(",")){
     83             context.write(new Text(w),new Text("link:"+word+" "+String.valueOf(1.0/num)));
     84             context.write(new Text(w),new Text("alaph:"+word+" "+alaph));
     85         }
     86     }
     87 
     88 }
     89 
     90 
     91 
     92 package org.apache.hadoop.PageRank;
     93 
     94 import java.io.IOException;
     95 import java.util.HashMap;
     96 import java.util.Map;
     97 
     98 import org.apache.hadoop.conf.Configuration;
     99 import org.apache.hadoop.io.Text;
    100 import org.apache.hadoop.mapreduce.Reducer;
    101 
    102 public class MyReducer extends Reducer<Text, Text, Text, Text> {
    103 
    104     public void reduce(Text _key, Iterable<Text> values, Context context)
    105             throws IOException, InterruptedException {
    106         // process values
    107         Configuration conf=context.getConfiguration();
    108         double factor=Double.parseDouble(conf.get("factor"));
    109         int num=Integer.parseInt(conf.get("num"));
    110         
    111         Map<String,Double> alaph=new HashMap<String,Double>();
    112         Map<String,Double> link=new HashMap<String,Double>();
    113         
    114         String content="";
    115         for (Text val : values) {
    116             String[] line=val.toString().split(":");
    117             if(line[0].compareTo("content")==0){
    118                 content=line[1];
    119             }else {
    120                 String[] s=line[1].split(" ");
    121                 double d=Double.parseDouble(s[1]);
    122                 if(line[0].compareTo("alaph")==0){
    123                     alaph.put(s[0],d);
    124                 }else if(line[0].compareTo("link")==0){
    125                     link.put(s[0],d);
    126                 }
    127             }
    128         }
    129         double sum=0;
    130         for(Map.Entry<String,Double> entry:alaph.entrySet()){
    131             sum+=link.get(entry.getKey())*entry.getValue();
    132         }
    133         
    134         System.out.println("    ");
    135         System.out.println("sum is "+sum);
    136         System.out.println("    ");
    137         double result=factor*sum+(1-factor)/num;
    138         context.write(_key,new Text(String.valueOf(result)+":"+content));
    139         
    140     }
    141 
    142 }
     
     
     
     
    我们可以看出,其实在MapReduce中我们将大把的精力花在了map的输出上,而之所以这样是因为我们不能直接利用他的结果,并且为了能迭代,我们又只能格式化输出,如果数据很多的,那么在map阶段将有很多的资源需要传递。总而言之,Hadoop让我们将大部分精力花在不该花的地方。
     
    接下来看spark 。我这里用的是python,在pyspark下运行。输入文件:
    A:B,C,D
    B:A,D
    C:A
    D:B,C
    先看代码
    def f(x):
        links=x[1][0]
        rank=x[1][1]
        n=len(links.split(","))
        result=[]
        for s in links.split(","):
            result.append((s,rank*1.0/n))
        return result
    
    file="hdfs://10.107.8.110:9000/spark_test/pagerank.txt"
    
    data=sc.textFile(file)
    link=data.map(lambda x:(x.split(":")[0],x.split(":")[1]))
    n=data.count()
    rank=link.mapValues(lambda x:1.0/n)
    
    for i in range(10):
        rank=link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y).mapValues(lambda x:0.15/n+0.85*x)


    直接分析,data=sc.textFile(file)从hdfs中获取text文件。
    通过data.collect()可以发现内容为
     
    我们需要将其转换为键值对,那么这里就需要map函数
    此时lambda x的x值为字符串,所以通过:将其分割
     
    接着通过n=data.count()我们可以直接获得网站数,而不必手动输入
     
     
    接着通过link.join(rank),让link和rank根据key而join进来
    link.join(rank).flatMap(f)用于提取键值,由于输入的是(page,(links,rank)),所以这里定义了一个函数f用于分割links,让links分割成若干个link,并加上rank输出。
    最后只需将其按照key值进行reduce即可
    link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y),这样就会将相同key的概率相加,得到alaph,接着再加上阻尼系数即可
     
    link.join(rank).flatMap(f).reduceByKey(lambda x,y:x+y).mapValues(lambda x:0.15/n+0.85*x)这样就是一个完整的计算
    通过迭代若干次就可以了。
    从代码量上说(虽然python比java简明)spark的确是比Hadoop好很多。原因也说了,1每次迭代不必将结果存放在文件中 2提供了更多的范式
  • 相关阅读:
    FreeRTOS 任务栈大小确定及其溢出检测
    FreeRTOS任务优先级说明
    leetcode 263 Ugly Number
    L2,breakfast or lunch
    Redis(2)用jedis实现在java中使用redis
    L1,a private conversation
    Redis(1)在windows环境下的安装和测试
    springMVC的拦截器工作流程
    求交集,差集,并集,善用java的set
    java下发电子邮件demo
  • 原文地址:https://www.cnblogs.com/sunrye/p/4611570.html
Copyright © 2011-2022 走看看