zoukankan      html  css  js  c++  java
  • mapreduce 实现pagerank

    输入格式:
    A  1  B,C,D
       B  1  C,D
    map:
       B  A  1/3
       C  A  1/3
       D  A  1/3
       A  |B,C,D
       C  B  1/2
       D  B  1/2
       B  |C,D
    reduce:
       B  (1-0.85)+0.85*1/3  C,D

       C  (1-0.85)+0.85*5/6  
         D  (1-0.85)+0.85*5/6
    A (1-0.85)+0.85*0  B,C,D


    import
    java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRankIter { private static final double damping = 0.85; public static class PRIterMapper extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] tuple = line.split(" "); String pageKey = tuple[0]; double pr = Double.parseDouble(tuple[1]); if (tuple.length > 2) { String[] linkPages = tuple[2].split(","); for (String linkPage : linkPages) { String prValue = pageKey + " " + String.valueOf(pr / linkPages.length); context.write(new Text(linkPage), new Text(prValue)); } context.write(new Text(pageKey), new Text("|" + tuple[2])); } } } public static class PRIterReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String links = ""; double pagerank = 0; for (Text value : values) { String tmp = value.toString(); if (tmp.startsWith("|")) { links = " " + tmp.substring(tmp.indexOf("|") + 1);// index从0开始 continue; } String[] tuple = tmp.split(" "); if (tuple.length > 1) pagerank += Double.parseDouble(tuple[1]); } pagerank = (double) (1 - damping) + damping * pagerank; // PageRank的计算迭代公式 context.write(new Text(key), new Text(String.valueOf(pagerank) + links)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job2 = new Job(conf, "PageRankIter"); job2.setJarByClass(PageRankIter.class); job2.setOutputKeyClass(Text.class); 
job2.setOutputValueClass(Text.class); job2.setMapperClass(PRIterMapper.class); job2.setReducerClass(PRIterReducer.class); FileInputFormat.addInputPath(job2, new Path(args[0])); FileOutputFormat.setOutputPath(job2, new Path(args[1])); job2.waitForCompletion(true); } }
    PageRankViewer 的输入为上述 PageRankIter 作业的输出
    输入格式为:
    A  pr
    B  pr
    ...

    import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRankViewer { public static class PageRankViewerMapper extends Mapper<LongWritable, Text, FloatWritable, Text> { private Text outPage = new Text(); private FloatWritable outPr = new FloatWritable(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] line = value.toString().split(" "); String page = line[0]; float pr = Float.parseFloat(line[1]); outPage.set(page); outPr.set(pr); context.write(outPr, outPage); } } /**重载key的比较函数,使其经过shuffle和sort后反序(从大到小)输出**/ public static class DescFloatComparator extends FloatWritable.Comparator { // @Override public float compare(WritableComparator a, WritableComparable<FloatWritable> b) { return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job3 = new Job(conf, "PageRankViewer"); job3.setJarByClass(PageRankViewer.class); job3.setOutputKeyClass(FloatWritable.class); job3.setSortComparatorClass(DescFloatComparator.class); job3.setOutputValueClass(Text.class); job3.setMapperClass(PageRankViewerMapper.class); FileInputFormat.addInputPath(job3, new Path(args[0])); FileOutputFormat.setOutputPath(job3, new Path(args[1])); job3.waitForCompletion(true); } }

      

  • 相关阅读:
    CentOS7.4下载与安装
    Windows 环境下vue+webpack前端开发环境搭建
    PHPSSO通信一直失败。
    TortoiseGit安装和使用的图文教程
    TortoiseGit安装教程
    HTML精确定位:scrollLeft,scrollWidth,clientWidth,offsetWidth之完全详解
    linux 安装xamp
    linux的rpm命令
    0和空的判断
    mysql中 case when的使用
  • 原文地址:https://www.cnblogs.com/liutoutou/p/4059756.html
Copyright © 2011-2022 走看看