zoukankan      html  css  js  c++  java
  • mapreduce 实现pagerank

    输入格式:
    A  1  B,C,D
    B  1  C,D
    map:
       B  A  1/3
       C  A  1/3
       D  A  1/3
       A  |B,C,D
       C  B  1/2
       D  B  1/2
       B  |C,D
    reduce:
       B  (1-0.85)+0.85*1/3  C,D
       C  (1-0.85)+0.85*5/6
       D  (1-0.85)+0.85*5/6
       A  (1-0.85)+0.85*0  B,C,D


    import
    java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRankIter { private static final double damping = 0.85; public static class PRIterMapper extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] tuple = line.split(" "); String pageKey = tuple[0]; double pr = Double.parseDouble(tuple[1]); if (tuple.length > 2) { String[] linkPages = tuple[2].split(","); for (String linkPage : linkPages) { String prValue = pageKey + " " + String.valueOf(pr / linkPages.length); context.write(new Text(linkPage), new Text(prValue)); } context.write(new Text(pageKey), new Text("|" + tuple[2])); } } } public static class PRIterReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { String links = ""; double pagerank = 0; for (Text value : values) { String tmp = value.toString(); if (tmp.startsWith("|")) { links = " " + tmp.substring(tmp.indexOf("|") + 1);// index从0开始 continue; } String[] tuple = tmp.split(" "); if (tuple.length > 1) pagerank += Double.parseDouble(tuple[1]); } pagerank = (double) (1 - damping) + damping * pagerank; // PageRank的计算迭代公式 context.write(new Text(key), new Text(String.valueOf(pagerank) + links)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job2 = new Job(conf, "PageRankIter"); job2.setJarByClass(PageRankIter.class); job2.setOutputKeyClass(Text.class); 
job2.setOutputValueClass(Text.class); job2.setMapperClass(PRIterMapper.class); job2.setReducerClass(PRIterReducer.class); FileInputFormat.addInputPath(job2, new Path(args[0])); FileOutputFormat.setOutputPath(job2, new Path(args[1])); job2.waitForCompletion(true); } }
    排序作业的输入为上述迭代作业的输出,格式为:
    A  pr
    B  pr
    ...

    import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRankViewer { public static class PageRankViewerMapper extends Mapper<LongWritable, Text, FloatWritable, Text> { private Text outPage = new Text(); private FloatWritable outPr = new FloatWritable(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] line = value.toString().split(" "); String page = line[0]; float pr = Float.parseFloat(line[1]); outPage.set(page); outPr.set(pr); context.write(outPr, outPage); } } /**重载key的比较函数,使其经过shuffle和sort后反序(从大到小)输出**/ public static class DescFloatComparator extends FloatWritable.Comparator { // @Override public float compare(WritableComparator a, WritableComparable<FloatWritable> b) { return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { return -super.compare(b1, s1, l1, b2, s2, l2); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job3 = new Job(conf, "PageRankViewer"); job3.setJarByClass(PageRankViewer.class); job3.setOutputKeyClass(FloatWritable.class); job3.setSortComparatorClass(DescFloatComparator.class); job3.setOutputValueClass(Text.class); job3.setMapperClass(PageRankViewerMapper.class); FileInputFormat.addInputPath(job3, new Path(args[0])); FileOutputFormat.setOutputPath(job3, new Path(args[1])); job3.waitForCompletion(true); } }

      

  • 相关阅读:
    高精度计算
    高精度除以低精度
    P1258 小车问题
    POJ 2352 stars (树状数组入门经典!!!)
    HDU 3635 Dragon Balls(超级经典的带权并查集!!!新手入门)
    HDU 3938 Portal (离线并查集,此题思路很强!!!,得到所谓的距离很巧妙)
    POJ 1703 Find them, Catch them(确定元素归属集合的并查集)
    HDU Virtual Friends(超级经典的带权并查集)
    HDU 3047 Zjnu Stadium(带权并查集,难想到)
    HDU 3038 How Many Answers Are Wrong(带权并查集,真的很难想到是个并查集!!!)
  • 原文地址:https://www.cnblogs.com/liutoutou/p/4059756.html
Copyright © 2011-2022 走看看