  • Implementing PageRank with MapReduce

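    Suppose you need to compute the ranking of just four web pages. Each record holds a page name, a tab, the page's current PR value, a space, and the comma-separated pages it links to: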

    baidu	10.00 google,sina,nefu
    google	10.00 baidu
    sina	10.00 google
    nefu	10.00 sina,google

    1. baidu has three outbound links

    2. google has one outbound link

    3. sina has one outbound link

    4. nefu has two outbound links
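
    The counts above can be checked mechanically. The following is a minimal sketch, assuming the record layout shown in the data (page name, tab, PR value, space, comma-separated links); the class name OutlinkCount and its helper methods are hypothetical and are not part of the original code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class OutlinkCount {
    // Parses one record of the form "<page>\t<PR> <link1,link2,...>"
    // and returns how many outbound links it lists.
    static int countOutlinks(String record) {
        String[] kv = record.split("\t");        // [page, "PR links"]
        String[] prAndLinks = kv[1].split(" ");  // [PR, "link1,link2,..."]
        return prAndLinks[1].split(",").length;
    }

    // Maps every page name to its outbound-link count, keeping input order.
    static Map<String, Integer> countAll(String[] records) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String r : records) {
            counts.put(r.split("\t")[0], countOutlinks(r));
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] records = {
            "baidu\t10.00 google,sina,nefu",
            "google\t10.00 baidu",
            "sina\t10.00 google",
            "nefu\t10.00 sina,google"
        };
        System.out.println(countAll(records));
    }
}
```

    The sketch assumes every page has at least one outbound link, as in the sample data; a record without a link list would need separate handling.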

    The data shows that every other page links to google, so google should end up with the highest PR. baidu, the only page google links to, should also have a high PR.
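
    This expectation can be checked without Hadoop. The sketch below runs the update PR = (1 - d) + d * (sum of incoming shares) in memory on the four-page graph, with d = 0.85 and an initial PR of 10 as in the data. The class name InMemoryPageRank, its methods, and the choice of 100 rounds are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory check, for illustration only.
class InMemoryPageRank {
    static final double DAMPING = 0.85;

    // Repeats the PageRank update for a fixed number of rounds.
    static Map<String, Double> iterate(Map<String, List<String>> links,
                                       double initialPr, int rounds) {
        Map<String, Double> pr = new LinkedHashMap<>();
        for (String page : links.keySet()) pr.put(page, initialPr);

        for (int r = 0; r < rounds; r++) {
            Map<String, Double> incoming = new LinkedHashMap<>();
            for (String page : links.keySet()) incoming.put(page, 0.0);
            // Each page splits its current PR evenly among its outbound links.
            for (Map.Entry<String, List<String>> e : links.entrySet()) {
                double share = pr.get(e.getKey()) / e.getValue().size();
                for (String target : e.getValue()) {
                    incoming.merge(target, share, Double::sum);
                }
            }
            // Apply damping to the summed incoming shares.
            for (String page : links.keySet()) {
                pr.put(page, (1 - DAMPING) + DAMPING * incoming.get(page));
            }
        }
        return pr;
    }

    // The four-page graph from the sample data.
    static Map<String, List<String>> sampleGraph() {
        Map<String, List<String>> links = new LinkedHashMap<>();
        links.put("baidu", Arrays.asList("google", "sina", "nefu"));
        links.put("google", Arrays.asList("baidu"));
        links.put("sina", Arrays.asList("google"));
        links.put("nefu", Arrays.asList("sina", "google"));
        return links;
    }

    public static void main(String[] args) {
        iterate(sampleGraph(), 10.0, 100)
            .forEach((page, value) -> System.out.printf("%s %.4f%n", page, value));
    }
}
```

    With this formula the PR values of the four pages sum to about 4 once the iteration settles, regardless of the initial value of 10.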

    The code is as follows:

    package PageRank;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
    
    public class PageRank {
    	/**
    	 * @author XD
    	 */
    	static enum PageCount{
    		Count,TotalPR
    	}
    	public static class  Map extends Mapper < LongWritable , Text , Text , Text >{
    		@Override
    		protected void map(LongWritable key, Text value , Context context) throws IOException, InterruptedException{
    			// Count the input records (one per page)
    			context.getCounter(PageCount.Count).increment(1);
    			// Each record is "<page>\t<PR> <comma-separated links>"
    			String[] kv = value.toString().split("\t");
    			String _key = kv[0];
    			String _value = kv[1];
    			String _PRnLink[] = _value.split(" ");
    			String pr = _PRnLink[0];
    			String link = _PRnLink[1];
    			// Pass the link list through so the reducer can rebuild the record
    			context.write(new Text(_key),new Text(link));
    			String[] site = link.split(",");
    			// Split this page's PR evenly among its outbound links
    			float score = Float.valueOf(pr)/(site.length)*1.0f;
    			for(int i=0;i<site.length;i++){
    				context.write(new Text(site[i]), new Text(String.valueOf(score)));
    			}
    		}
    	}
    	public static class Reduce extends Reducer < Text , Text , Text, Text>{
    		protected void reduce(Text key , Iterable<Text> values ,Context context) throws IOException, InterruptedException{
    			StringBuilder sb = new StringBuilder();
    			float factor  = 0.85f;	// damping factor
    			float pr = 0f;
    			for(Text val : values){
    				String value = val.toString();
    				// PR shares contain a decimal point; link lists (for this data) do not
    				int s = value.indexOf(".");
    				if(s != -1){
    					pr += Float.valueOf(value);
    				}else{
    					String site[] = value.split(",");
    					int _len = site.length;
    					for(int k=0;k<_len;k++){
    						sb.append(site[k]);
    						sb.append(",");
    					}
    				}
    			}
    			// Apply damping: PR = (1 - d) + d * (sum of incoming shares)
    			pr = ((1-factor)+(factor*(pr)));
    			// Counters hold integer values, so record the PR scaled by 1000
    			context.getCounter(PageCount.TotalPR).increment((int)(pr*1000));
    			String output = pr+" "+sb.toString();
    			context.write(key, new Text(output));
    		}
    	}
    	public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
    		String input,output;
    		int threshold = 100;
    		int iteration = 0;
    		int iterationLimit = 10;
    	
    		boolean status = false;
    		
    		while(iteration < iterationLimit){
    			// Iterate repeatedly; note that the input and output paths swap on every round
    			if((iteration % 2) == 0){
    				input = "hdfs://localhost:9000/output_pr/p*";
    				output = "hdfs://localhost:9000/output_pr2";
    			}else{
    				input = "hdfs://localhost:9000/output_pr2/p*";
    				output = "hdfs://localhost:9000/output_pr";
    			}
    			Configuration conf = new Configuration();
    			final FileSystem filesystem = FileSystem.get(new URI(input),conf);
    			final Path outPath = new Path(output);
    			if(filesystem.exists(outPath)){
    				filesystem.delete(outPath, true);
    			}
    			Job job = new Job(conf,PageRank.class.getSimpleName());
    			
    			// 1.1 Input file location
    			FileInputFormat.setInputPaths(job, input);
    			
    			// 1.2 Mapper class, and the map output key/value types (these can be omitted when they match the final output types)
    			job.setMapperClass(Map.class);
    			job.setMapOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(Text.class);
    			job.setJarByClass(PageRank.class);
    			
    			// 1.3 Partitioner
    			job.setPartitionerClass(HashPartitioner.class);
    			
    			job.setReducerClass(Reduce.class);
    			// Reducer output types
    			job.setOutputKeyClass(Text.class);
    			job.setOutputValueClass(Text.class);
    			
    			// Output location
    			FileOutputFormat.setOutputPath(job, new Path(output));
    			status = job.waitForCompletion(true);
    			iteration++;
    			long count = job.getCounters().findCounter(PageCount.Count).getValue();
    			long TotalPr = job.getCounters().findCounter(PageCount.TotalPR).getValue();
    			System.out.println("PageCount:"+count);
    			System.out.println("TotalPR:"+TotalPr);
    			// Average PR per page, still scaled by 1000 as stored in the TotalPR counter
    			double per_pr = TotalPr/(count*1.0d);
    			System.out.println("Per_PR:"+per_pr);
    			if((int)per_pr == threshold){
    				System.out.println("Iteration:"+iteration);
    				break;
    			}	
    		}
    		System.exit(status?0:1);
    	}
    }
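
    To follow what a single round of the job above does, the map, shuffle, and reduce steps can be imitated in plain Java. This is only a sketch under assumptions: the class LocalPageRankRound and its methods are hypothetical, a TreeMap stands in for Hadoop's shuffle, and double is used instead of float. As in the reducer above, a value is treated as a PR share if it contains a decimal point and as a link list otherwise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local imitation of one MapReduce round, for illustration only.
class LocalPageRankRound {
    // "Map" step: emit the link list under the page's own key, and an equal
    // share of the page's PR under the key of every page it links to.
    static void map(String record, Map<String, List<String>> shuffled) {
        String[] kv = record.split("\t");
        String[] prAndLinks = kv[1].split(" ");
        String[] targets = prAndLinks[1].split(",");
        emit(shuffled, kv[0], prAndLinks[1]);
        double share = Double.parseDouble(prAndLinks[0]) / targets.length;
        for (String t : targets) {
            emit(shuffled, t, String.valueOf(share));
        }
    }

    // Stand-in for the shuffle: group emitted values by key.
    static void emit(Map<String, List<String>> shuffled, String key, String value) {
        shuffled.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // "Reduce" step: sum the PR shares, apply damping, and re-attach the links.
    static String reduce(List<String> values) {
        double sum = 0;
        String links = "";
        for (String v : values) {
            if (v.contains(".")) {
                sum += Double.parseDouble(v);  // a PR share
            } else {
                links = v;                     // the page's own link list
            }
        }
        double pr = (1 - 0.85) + 0.85 * sum;
        return pr + " " + links;
    }

    // Runs one full round and returns page -> "<new PR> <links>".
    static Map<String, String> runRound(String[] records) {
        Map<String, List<String>> shuffled = new TreeMap<>();
        for (String r : records) map(r, shuffled);
        Map<String, String> out = new TreeMap<>();
        shuffled.forEach((page, values) -> out.put(page, reduce(values)));
        return out;
    }

    public static void main(String[] args) {
        String[] records = {
            "baidu\t10.00 google,sina,nefu",
            "google\t10.00 baidu",
            "sina\t10.00 google",
            "nefu\t10.00 sina,google"
        };
        runRound(records).forEach((page, value) -> System.out.println(page + "\t" + value));
    }
}
```

    Each printed line has the same layout as an input record, which is why the Hadoop job can feed one round's output directory into the next round as input.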


    The final output is as follows:



  • Original article: https://www.cnblogs.com/hrhguanli/p/4590884.html