zoukankan      html  css  js  c++  java
  • Hadoop实战训练————MapReduce实现PageRank算法

    经过一段时间的学习,对于Hadoop有了一些了解,于是决定用MapReduce实现PageRank算法,以下简称PR

    先简单介绍一下PR算法(摘自百度百科:https://baike.baidu.com/item/google%20pagerank/2465380?fr=aladdin&fromid=111004&fromtitle=pagerank):

    PageRank让链接来"投票"
    一个页面的“得票数”由所有链向它的页面的重要性来决定,到一个页面的超链接相当于对该页投一票。一个页面的PageRank是由所有链向它的页面(“链入页面”)的重要性经过递归算法得到的。一个有较多链入的页面会有较高的等级,相反如果一个页面没有任何链入页面,那么它没有等级。
      2005年初,Google为网页链接推出一项新属性nofollow,使得网站管理员和网站作者可以做出一些Google不计票的链接,也就是说这些链接不算作"投票"。nofollow的设置可以抵制评论垃圾。
    假设一个由4个页面组成的小团体:A,B,C和D。如果所有页面都链向A,那么A的PR(PageRank)值将是B,C及D的Pagerank总和。
    继续假设B也有链接到C,并且D也有链接到包括A的3个页面。一个页面不能投票2次。所以B给每个页面半票。以同样的逻辑,D投出的票只有三分之一算到了A的PageRank上。
    换句话说,根据链出总数平分一个页面的PR值。
    最后,所有这些被换算为一个百分比再乘上一个系数。由于“没有向外链接的页面”传递出去的PageRank会是0,所以,Google通过数学系统给了每个页面一个最小值:
    说明:在Sergey Brin和Lawrence Page的1998年原文中给每一个页面设定的最小值是1-d,而不是这里的
    (1-d)/N。 所以一个页面的PageRank是由其他页面的PageRank计算得到。Google不断的重复计算每个页面的PageRank。如果给每个页面一个随机PageRank值(非0),那么经过不断的重复计算,这些页面的PR值会趋向于稳定,也就是收敛的状态。这就是搜索引擎使用它的原因。
     
    通过以上文字,可以总结出以下几点:
    1.PR中每个页面都需要需要一个初始值
    2.PR算法是一个趋于收敛的无限循环,因此需要一个条件来确定收敛完毕
    一般而言收敛条件有以下三种情况:

    1、每个页面的PR值和上一次计算的PR相等

    2、设定一个差值指标(0.0001)。当所有页面和上一次计算的PR差值平均小于该标准时,则收敛。

    3、设定一个百分比(99%),当99%的页面和上一次计算的PR相等

    本文将采用第二种方式来实现该算法:

    首先定义一个初始互联网环境,如下图所示:
    转化为文件则内容如下:

    A B D
    B C
    C A B
    D B C

    其中每一行的后面的页面为第一个页面的出链(A可以链到B和C)

    由于需要统计每个页面的入链页面和出链数,因此需要两个MapReduce,第一个用于统计入链和出链,第二个用于循环统计PR值,代码如下:

    package com.tyx.mapreduce.PageRank;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Created by tyx on 2017/11/29.
     */
    public class RunPageRankJob {
    
    //    统计所有链接的入链
        private static Map<String,String > allInLine = new HashMap<>();
    //    统计所有链接的出链
        private static Map<String,Integer> allOutLine = new HashMap<>();
    //    统计所有链接的现有pagerank
        private static Map<String,Double> allPageRank = new HashMap<>();
    //    统计所有链接计算后的pagerank
        private static Map<String ,Double> allNextPageRank = new HashMap<>();
    
        public static void main(String[] args) {
            Configuration configuration = new Configuration();
    //            configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR," ");
            configuration.set("fs.defaultFS", "hdfs://node1:8020");
            configuration.set("yarn.resourcemanager.hostname", "node1");
    //        第一个MapReduce为了统计出每个页面的入链,和每个页面的出链数
            if (run1(configuration)){
                run2(configuration);
            }
        }
    
        /*
        输入数据:
        A    B    D
        B    C
        C    A    B
        D    B    C*/
    
        static class AcountOutMapper extends Mapper<Text,Text,Text,Text>{
            @Override
            protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    //            super.map(key, value, context);
                int num = 0;
    //            若A能连接到B,则说明B是A的一条出链
                String[] outLines = value.toString().split("	");
                for (int i=0;i<outLines.length;i++){
                    context.write(new Text(outLines[i]),key);
                }
                num = outLines.length;
    //            统计出链
                context.write(key,new Text("--"+num));
            }
        }
    
        static class AcountOutReducer extends Reducer<Text,Text,Text,Text>{
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    //            super.reduce(key, values, context);
    //            统计该页面的入链页面
                String outStr = "";
                int sum = 0;
                for (Text text : values){
    //                统计出链数目
                    if (text.toString().contains("--")){
                        sum += Integer.parseInt(text.toString().replaceAll("--",""));
                    }else {
                        outStr += text+"	";
                    }
                }
                context.write(key,new Text(outStr+sum));
                allOutLine.put(key.toString(),sum);
                allInLine.put(key.toString(),outStr);
                allPageRank.put(key.toString(),1.0);
            }
        }
    
        public static boolean run1(Configuration configuration){
            try {
                Job job = Job.getInstance(configuration);
                FileSystem fileSystem = FileSystem.get(configuration);
    
                job.setJobName("acountline");
    
                job.setJarByClass(RunPageRankJob.class);
    
                job.setMapperClass(AcountOutMapper.class);
                job.setReducerClass(AcountOutReducer.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(Text.class);
    
                job.setInputFormatClass(KeyValueTextInputFormat.class);
    
                Path intPath = new Path("/usr/output/pagerank.txt");
                FileInputFormat.addInputPath(job,intPath);
    
    
                Path outPath = new Path("/usr/output/acoutline");
                if (fileSystem.exists(outPath)){
                    fileSystem.delete(outPath,true);
                }
                FileOutputFormat.setOutputPath(job,outPath);
    
                boolean f = job.waitForCompletion(true);
                return f;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        /*第一次MapReduce输出数据:
        A    C
        B    A   C   D
        C    B   D
        D    A*/
    
        static class PageRankMapper extends Mapper<Text,Text,Text,Text>{
            @Override
            protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    //            super.map(key, value, context);
                String myUrl = key.toString();
    //            取出该页面所有的入链页面
                String inLines = allInLine.get(myUrl);
                context.write(key,new Text(inLines));
            }
        }
    
        static class PageRankReducer extends Reducer<Text,Text,Text,Text>{
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    //            super.reduce(key, values, context);
    //            后半段求和公式的和 (PR(1)/L(1)…………PR(i)/L(i)
                double sum = 0.0;
                String outStr = "";
                for (Text text : values){
                    String[] arr = text.toString().split("	");
                    for (int i=0;i<arr.length;i++){
                        outStr += arr[i]+"	";
                        sum += allPageRank.get(arr[i])/allOutLine.get(arr[i]);
                    }
                }
    //            算出该页面本次的PR结果
                double nowPr = (1-0.85)/allPageRank.size()+0.85*sum;
    
                allNextPageRank.put(key.toString(),nowPr);
                context.write(key,new Text(outStr));
            }
        }
    
        public static void run2(Configuration configuration){
            double d = 0.001;
            int i=1;
    //        迭代循环趋于收敛
            while (true){
                try {
                    configuration.setInt("count",i);
                    i++;
                    Job job = Job.getInstance(configuration);
                    FileSystem fileSystem = FileSystem.get(configuration);
    
                    job.setJobName("pagerank");
                    job.setJarByClass(RunPageRankJob.class);
                    job.setJobName("Pr"+i);
    
                    job.setMapperClass(PageRankMapper.class);
                    job.setReducerClass(PageRankReducer.class);
                    job.setMapOutputKeyClass(Text.class);
                    job.setMapOutputValueClass(Text.class);
    
                    job.setInputFormatClass(KeyValueTextInputFormat.class);
    
                    Path intPath = new Path("/usr/output/pagerank.txt");
                    if (i>2){
                        intPath = new Path("/usr/output/Pr"+(i-1));
                    }
                    FileInputFormat.addInputPath(job,intPath);
    
    
                    Path outPath = new Path("/usr/output/Pr"+i);
                    if (fileSystem.exists(outPath)){
                        fileSystem.delete(outPath,true);
                    }
                    FileOutputFormat.setOutputPath(job,outPath);
    
                    boolean f = job.waitForCompletion(true);
                    if (f){
                        System.out.println("job执行完毕");
                        double sum = 0.0;
    //                    提取本轮所有页面的PR值和上一轮作比较,
                        for (String key : allPageRank.keySet()){
                            System.out.println(key+"--------------------------"+allPageRank.get(key));
                            sum += Math.abs(allNextPageRank.get(key)-allPageRank.get(key));
                            allPageRank.put(key,allNextPageRank.get(key));
                        }
                        System.out.println(sum);
    //                    若平均差小于d则表示收敛完毕
                        if (sum/allPageRank.size()<d){
                            break;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    最终结果输入如下:

    由图可知在这四个页面组成的互联网集群中,页面C的重要性是最高的

    本次操作一共经过了30次循环:

    若有不对之处请不吝指教,谢谢

    转载请注明出处http://www.cnblogs.com/liuxiaopang/p/7930508.html

    人生苦短,远离IT脱离苦海
  • 相关阅读:
    如何在Android Studio中添加注释模板信息?
    Android Activity标签属性
    Android Activity全面解析
    Mac office ppt无法正常输入文字的问题解决方案
    将Android Studio默认布局ConstraintLayout切换成LinearLayout
    Java中Double保留后小数位的几种方法
    java文件传输接口
    纯JS编写打地鼠游戏
    JavaScript监听手机物理返回键的两种解决方法
    spring注解方式实现定时器
  • 原文地址:https://www.cnblogs.com/liuxiaopang/p/7930508.html
Copyright © 2011-2022 走看看