zoukankan      html  css  js  c++  java
  • Mapreduce -- PageRank

    PageRank 简单理解为网页排名,但是网页是根据什么排名的,接下来就简单介绍一下。

    举例:

    假设网页 A 的内容中有网页 B,C 和 D 的链接,并且 A 的 PageRank的值为0.25。

    那接下里我们就可以计算在网页 A 中的其他网页的PageRank的值了。我们拿网页 B 来进行说明,

    在网页 A 中的网页 B 的 PageRank 为 0.25 * (1/n) 其中n为网页 A 中网页链接数,结果则为 0.25*(1/3)。

    可以简单理解为A的PageRank被B,C 和 D 平分了,B分到了0.25的三分之一。

    然后将所有网页中的关于网页B的pagerank值求出来,就是网页B真实的pagerank了。

    但是上面的例子没有考虑到如下的特殊情况:

    1 网页A中只有指向自己的网页链接。

    2 网页A中没有任何链接。

    如果出现以上情况,会导致pagerank结果不准确。

    所以出现了下面的公式:

    result = sum * n + (1-n)/N

    sum 为上面计算出来的,如网页B在所有网页中的pagerank值的总和。

    n 可以理解为停留在当前网页继续进行网页跳转浏览的概率

    1-n 可以理解为不访问当前网页的任何链接,从浏览器的地址栏输入,转去其他网页的概率。

    N 为网页的数量

    下面介绍通过MapReduce实现PageRank

    简单的流程分析:

    Map

    取一行数据进行说明

    A    B    C    D

    网页A中有网页B,C,D的链接;

    刚开始给网页A一个默认的pagerank值,然后根据这个值计算其他网页链接在网页A中的PageRank。处理后的数据如下:

    A    0.25    B    C    D
    B    0.25*(1/3)
    C    0.25*(1/3)
    D    0.25*(1/3)

    Reduce

    然后通过 Reduce 计算出各个网页在其他网页中的 PageRank 的总和 sum,然后代入公式计算实际的PageRank,并更新 A 0.25 B C D 中的数据,这里将0.25更新为计算出来真实的 PageRank。

    重复计算各个网页的 PageRank 的值,直到 PageRank 的结果收敛,值趋于稳定。

    A    0.25    B    C    D  =>A    ?    B    C    D

    测试数据:

    A    B       C       D
    B    A       D
    C    C
    D    B       C

    测试代码:

    RunJob.java

      1 import org.apache.hadoop.conf.Configuration;
      2 import org.apache.hadoop.fs.FileSystem;
      3 import org.apache.hadoop.fs.Path;
      4 import org.apache.hadoop.io.Text;
      5 import org.apache.hadoop.mapreduce.Job;
      6 import org.apache.hadoop.mapreduce.Mapper;
      7 import org.apache.hadoop.mapreduce.Reducer;
      8 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
      9 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
     10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     11 
     12 import java.io.IOException;
     13 
     14 /**
     15  * Created by Edward on 2016/7/13.
     16  */
     17 public class RunJob {
     18 
     19     public static enum ValueEnum{
     20         CLOSURE_VALUE;
     21     }
     22 
     23     public static void main(String[] args)
     24     {
     25 
     26         //access hdfs's user
     27         System.setProperty("HADOOP_USER_NAME","root");
     28 
     29         Configuration conf = new Configuration();
     30         conf.set("fs.defaultFS", "hdfs://node1:8020");
     31 
     32         try {
     33             int i = 0;
     34             while(true) {
     35                 i++;
     36                 conf.setInt("count", i);
     37 
     38                 FileSystem fs = FileSystem.get(conf);
     39                 Job job = Job.getInstance(conf);
     40                 job.setJarByClass(RunJob.class);
     41                 job.setMapperClass(MyMapper.class);
     42                 job.setReducerClass(MyReducer.class);
     43 
     44                 //需要指定 map out 的 key 和 value
     45                 job.setOutputKeyClass(Text.class);
     46                 job.setOutputValueClass(Text.class);
     47 
     48                 //设置输入类的
     49                 job.setInputFormatClass(KeyValueTextInputFormat.class);
     50                 if(i==1)
     51                     FileInputFormat.addInputPath(job, new Path("/test/pagerank/input"));
     52                 else
     53                     FileInputFormat.addInputPath(job, new Path("/test/pagerank/output/pr"+(i-1)));
     54 
     55                 Path path = new Path("/test/pagerank/output/pr"+i);
     56                 if (fs.exists(path))//如果目录存在,则删除目录
     57                 {
     58                     fs.delete(path, true);
     59                 }
     60                 FileOutputFormat.setOutputPath(job, path);
     61 
     62                 boolean b = job.waitForCompletion(true);
     63                 if (b) {
     64                     long closure =job.getCounters().findCounter(ValueEnum.CLOSURE_VALUE).getValue();
     65                     double avg= closure/4000.0;//计算收敛的平均值,浮动小于0.001则认为收敛
     66                     System.out.println("执行第"+i+"次, closure="+closure+",avg="+avg);
     67                     if(avg < 0.001){
     68                         System.out.println("总共执行了"+i+"次,之后收敛");
     69                         break;
     70                     }
     71                 }
     72             }
     73 
     74         } catch (Exception e) {
     75             e.printStackTrace();
     76         }
     77     }
     78 
     79 
     80     public static class MyMapper extends Mapper<Text, Text, Text, Text> {
     81         @Override
     82         protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
     83             AdjacentNodes adjacentNodes  = new AdjacentNodes();
     84             int count = context.getConfiguration().getInt("count", 1);
     85 
     86             if(count == 1)
     87             {
     88                 AdjacentNodes firstAdj = new AdjacentNodes();
     89                 firstAdj.setValue(1.0);
     90                 firstAdj.setNodes(value.toString().split("	"));
     91                 adjacentNodes = firstAdj;
     92             }
     93             else
     94             {
     95                 //格式化 value: 1.0 B C D
     96                 adjacentNodes.formatInfo(value.toString());
     97             }
     98             //A 1.0 B C D
     99             context.write(key, new Text(adjacentNodes.toString()));
    100 
    101             double pagerank = adjacentNodes.getValue()/adjacentNodes.getNum();
    102             for(int i=0; i<adjacentNodes.getNum(); i++)
    103             {
    104                 String node = adjacentNodes.getNodes()[i];
    105                 //B 0.333
    106                 context.write(new Text(node), new Text(pagerank+""));
    107             }
    108         }
    109     }
    110 
    111     public static class MyReducer extends Reducer<Text, Text, Text, Text> {
    112         @Override
    113         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    114 
    115             AdjacentNodes adjacentNodes = new AdjacentNodes();
    116 
    117             Double sum = 0.0;
    118             for(Text adj : values)
    119             {
    120                 String str = adj.toString();
    121                 if(str.split("	").length>1)
    122                 {
    123                     adjacentNodes.formatInfo(str);
    124                 }
    125                 else{
    126                     sum += Double.parseDouble(str);   //对节点的 pagerank 求和,
    127                 }
    128             }
    129 
    130             //计算pagerank
    131             double n = 0.80;
    132             double pagerank = sum * n + (1-n)/4.0;// 计算pagerank 公式 sum * n + (1-n)/N
    133 
    134             //计算收敛的差值
    135             int closure =(int)(Math.abs(pagerank - adjacentNodes.getValue()) * 1000);
    136             //通过context.getCounter(ENUM)方法,每次执行reduce将closure的值进行累加,结果传递给主函数,
    137 
    138             context.getCounter(ValueEnum.CLOSURE_VALUE).increment(closure);
    139 
    140             adjacentNodes.setValue(pagerank);
    141             context.write(key, new Text(adjacentNodes.toString()));
    142         }
    143     }
    144 }
    
    
    AdjacentNodes.java
    /**
     * Created by Edward on 2016/7/14.
     */
    public class AdjacentNodes {
    
        double value = 0.0;
        int num = 0;
        String[] nodes = null;
    
        public void formatInfo(String str)
        {
            String[] val = str.split("	");
            this.setValue(Double.parseDouble(val[0]));
            this.setNum(val.length-1);
    
            if(this.num != 0)
                this.nodes = new String[this.num];
    
            for(int i=1; i<val.length; i++)
            {
                this.nodes[i-1] = val[i];
            }
        }
    
        public boolean isEmpty()
        {
            if(this.num == 0)
                return true;
            else
                return false;
        }
    
        public void setNodes(String[] nodes) {
            this.nodes = nodes;
            this.num = nodes.length;
        }
    
        public void setNum(int num) {
            this.num = num;
        }
    
        public void setValue(double value) {
            this.value = value;
        }
    
        public double getValue() {
            return value;
        }
    
        public int getNum() {
            return num;
        }
    
        public String[] getNodes() {
            return nodes;
        }
    
    
        @Override
        public String toString() {
    
            StringBuffer stringBuffer = new StringBuffer(this.value+"");
    
            for(int i=0; i<this.num; i++)
            {
                stringBuffer.append("	"+this.nodes[i]);
            }
    
            return stringBuffer.toString();
        }
    }

    结果数据:

    A       0.10135294176208584     B       C       D
    B       0.12838069628609527     A       D
    C       0.6560527651143326      C
    D       0.12838069628609527     B       C

    总共执行了24次,之后收敛;

    PageRank值越高,越值得推荐。

  • 相关阅读:
    多线程和多进程通信原理
    在树莓派4b上安装 ROS MELODIC 源码安装
    使用Android手机作为树莓派的屏幕
    树莓派桌面设置快捷键
    linux 下使用dd制作启动U盘 安装linux
    manjaro 18.10 install soft
    install slax record
    slax中改变终端字体
    为win10下的linux子系统终端添加powerline
    为ubuntu安装powerline记录
  • 原文地址:https://www.cnblogs.com/one--way/p/5672207.html
Copyright © 2011-2022 走看看