zoukankan      html  css  js  c++  java
  • MapReduce TopK统计加排序

    Hadoop技术内幕中指出Top K算法有两步,一是统计词频,二是找出词频最高的前K个词。在网上找了很多MapReduce的Top K案例,这些案例都只有排序功能,所以自己写了个案例。

    这个案例分两个步骤:第一步就是 WordCount 案例(统计词频),第二步是按词频排序并取出前 K 个词。

    一,统计词频

     1 package TopK;
     2 import java.io.IOException;
     3 import java.util.StringTokenizer;
     4 
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.fs.Path;
     7 import org.apache.hadoop.io.IntWritable;
     8 import org.apache.hadoop.io.Text;
     9 import org.apache.hadoop.mapreduce.Job;
    10 import org.apache.hadoop.mapreduce.Mapper;
    11 import org.apache.hadoop.mapreduce.Reducer;
    12 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    14 
    15 /**
    16  * 统计词频
    17  * @author zx
    18  * zhangxian1991@qq.com
    19  */
    20 public class WordCount {
    21     
    22     /**
    23      * 读取单词
    24      * @author zx
    25      *
    26      */
    27     public static class Map extends Mapper<Object,Text,Text,IntWritable>{
    28 
    29         IntWritable count = new IntWritable(1);
    30         
    31         @Override
    32         protected void map(Object key, Text value, Context context)
    33                 throws IOException, InterruptedException {
    34             StringTokenizer st = new StringTokenizer(value.toString());
    35             while(st.hasMoreTokens()){    
    36                 String word = st.nextToken().replaceAll(""", "").replace("'", "").replace(".", "");
    37                 context.write(new Text(word), count);
    38             }
    39         }
    40         
    41     }
    42     
    43     /**
    44      * 统计词频
    45      * @author zx
    46      *
    47      */
    48     public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{
    49 
    50         @SuppressWarnings("unused")
    51         @Override
    52         protected void reduce(Text key, Iterable<IntWritable> values,Context context)
    53                 throws IOException, InterruptedException {
    54             int count = 0;
    55             for (IntWritable intWritable : values) {
    56                 count ++;
    57             }
    58             context.write(key,new IntWritable(count));
    59         }
    60         
    61     }
    62     
    63     @SuppressWarnings("deprecation")
    64     public static boolean run(String in,String out) throws IOException, ClassNotFoundException, InterruptedException{
    65         
    66         Configuration conf = new Configuration();
    67         
    68         Job job = new Job(conf,"WordCount");
    69         job.setJarByClass(WordCount.class);
    70         job.setMapperClass(Map.class);
    71         job.setReducerClass(Reduce.class);
    72         
    73         // 设置Map输出类型
    74         job.setMapOutputKeyClass(Text.class);
    75         job.setMapOutputValueClass(IntWritable.class);
    76 
    77         // 设置Reduce输出类型
    78         job.setOutputKeyClass(Text.class);
    79         job.setOutputValueClass(IntWritable.class);
    80 
    81         // 设置输入和输出目录
    82         FileInputFormat.addInputPath(job, new Path(in));
    83         FileOutputFormat.setOutputPath(job, new Path(out));
    84         
    85         return job.waitForCompletion(true);
    86     }
    87     
    88 }

    二,排序 并求出频率最高的前K个词

      1 package TopK;
      2 
      3 import java.io.IOException;
      4 import java.util.Comparator;
      5 import java.util.Map.Entry;
      6 import java.util.Set;
      7 import java.util.StringTokenizer;
      8 import java.util.TreeMap;
      9 import java.util.regex.Pattern;
     10 
     11 import org.apache.hadoop.conf.Configuration;
     12 import org.apache.hadoop.fs.Path;
     13 import org.apache.hadoop.io.IntWritable;
     14 import org.apache.hadoop.io.Text;
     15 import org.apache.hadoop.mapreduce.Job;
     16 import org.apache.hadoop.mapreduce.Mapper;
     17 import org.apache.hadoop.mapreduce.Reducer;
     18 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     19 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     20 import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
     21 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
     22 
     23 /**
     24  * 以单词出现的频率排序
     25  * 
     26  * @author zx
     27  * zhangxian1991@qq.com
     28  */
     29 public class Sort {
     30 
     31     /**
     32      * 读取单词(词频 word)
     33      * 
     34      * @author zx
     35      * 
     36      */
     37     public static class Map extends Mapper<Object, Text, IntWritable, Text> {
     38 
     39         // 输出key 词频
     40         IntWritable outKey = new IntWritable();
     41         Text outValue = new Text();
     42 
     43         @Override
     44         protected void map(Object key, Text value, Context context)
     45                 throws IOException, InterruptedException {
     46 
     47             StringTokenizer st = new StringTokenizer(value.toString());
     48             while (st.hasMoreTokens()) {
     49                 String element = st.nextToken();
     50                 if (Pattern.matches("\d+", element)) {
     51                     outKey.set(Integer.parseInt(element));
     52                 } else {
     53                     outValue.set(element);
     54                 }
     55             }
     56 
     57             context.write(outKey, outValue);
     58         }
     59 
     60     }
     61 
     62     /**
     63      * 根据词频排序
     64      * 
     65      * @author zx
     66      * 
     67      */
     68     public static class Reduce extends
     69             Reducer<IntWritable, Text, Text, IntWritable> {
     70         
     71         private static MultipleOutputs<Text, IntWritable> mos = null;
     72         
     73         //要获得前K个频率最高的词
     74         private static final int k = 10;
     75         
     76         //用TreeMap存储可以利用它的排序功能
     77         //这里用 MyInt 因为TreeMap是对key排序,且不能唯一,而词频可能相同,要以词频为Key就必需对它封装
     78         private static TreeMap<MyInt, String> tm = new TreeMap<MyInt, String>(new Comparator<MyInt>(){
     79             /**
     80              * 默认是从小到大的顺序排的,现在修改为从大到小
     81              * @param o1
     82              * @param o2
     83              * @return
     84              */
     85             @Override
     86             public int compare(MyInt o1, MyInt o2) {
     87                 return o2.compareTo(o1);
     88             }
     89             
     90         }) ;
     91         
     92         /*
     93          * 以词频为Key是要用到reduce的排序功能
     94          */
     95         @Override
     96         protected void reduce(IntWritable key, Iterable<Text> values,
     97                 Context context) throws IOException, InterruptedException {
     98             for (Text text : values) {
     99                 context.write(text, key);
    100                 tm.put(new MyInt(key.get()),text.toString());
    101                 
    102                 //TreeMap以对内部数据进行了排序,最后一个必定是最小的
    103                 if(tm.size() > k){
    104                     tm.remove(tm.lastKey());
    105                 }
    106                 
    107             }
    108         }
    109 
    110         @Override
    111         protected void cleanup(Context context)
    112                 throws IOException, InterruptedException {
    113             String path = context.getConfiguration().get("topKout");
    114             mos = new MultipleOutputs<Text, IntWritable>(context);
    115             Set<Entry<MyInt, String>> set = tm.entrySet();
    116             for (Entry<MyInt, String> entry : set) {
    117                 mos.write("topKMOS", new Text(entry.getValue()), new IntWritable(entry.getKey().getValue()), path);
    118             }
    119             mos.close();
    120         }
    121 
    122         
    123         
    124     }
    125 
    126     @SuppressWarnings("deprecation")
    127     public static void run(String in, String out,String topKout) throws IOException,
    128             ClassNotFoundException, InterruptedException {
    129 
    130         Path outPath = new Path(out);
    131 
    132         Configuration conf = new Configuration();
    133         
    134         //前K个词要输出到哪个目录
    135         conf.set("topKout",topKout);
    136         
    137         Job job = new Job(conf, "Sort");
    138         job.setJarByClass(Sort.class);
    139         job.setMapperClass(Map.class);
    140         job.setReducerClass(Reduce.class);
    141 
    142         // 设置Map输出类型
    143         job.setMapOutputKeyClass(IntWritable.class);
    144         job.setMapOutputValueClass(Text.class);
    145 
    146         // 设置Reduce输出类型
    147         job.setOutputKeyClass(Text.class);
    148         job.setOutputValueClass(IntWritable.class);
    149 
    150         //设置MultipleOutputs的输出格式
    151         //这里利用MultipleOutputs进行对文件输出
    152         MultipleOutputs.addNamedOutput(job,"topKMOS",TextOutputFormat.class,Text.class,Text.class);
    153         
    154         // 设置输入和输出目录
    155         FileInputFormat.addInputPath(job, new Path(in));
    156         FileOutputFormat.setOutputPath(job, outPath);
    157         job.waitForCompletion(true);
    158 
    159     }
    160 
    161 }

    自己封装的Int

     1 package TopK;
     2 
     3 public class MyInt implements Comparable<MyInt>{
     4     private Integer value;
     5 
     6     public MyInt(Integer value){
     7         this.value = value;
     8     }
     9     
    10     public int getValue() {
    11         return value;
    12     }
    13 
    14     public void setValue(int value) {
    15         this.value = value;
    16     }
    17 
    18     @Override
    19     public int compareTo(MyInt o) {
    20         return value.compareTo(o.getValue());
    21     }
    22     
    23     
    24 }

    运行入口

     1 package TopK;
     2 
     3 import java.io.IOException;
     4 
     5 /**
     6  * 
     7  * @author zx
     8  *zhangxian1991@qq.com
     9  */
    10 public class TopK {
    11     public static void main(String args[]) throws ClassNotFoundException, IOException, InterruptedException{
    12         
    13         //要统计字数,排序的文字
    14         String in = "hdfs://localhost:9000/input/MaDing.text";
    15         
    16         //统计字数后的结果
    17         String wordCout = "hdfs://localhost:9000/out/wordCout";
    18         
    19         //对统计完后的结果再排序后的内容
    20         String sort = "hdfs://localhost:9000/out/sort";
    21         
    22         //前K条
    23         String topK = "hdfs://localhost:9000/out/topK";
    24         
    25         //如果统计字数的job完成后就开始排序
    26         if(WordCount.run(in, wordCout)){
    27             Sort.run(wordCout, sort,topK);
    28         }
    29         
    30     }
    31 }
  • 相关阅读:
    mysql导入导出数据过大命令
    thinkphp条件查询
    php表单提交安全方法
    ubuntu软件(查看文件差异)
    thinkphp if标签
    thinkphp导出报表
    jquery.easing.js下载地址
    水平手风琴切换效果插件亲自试过很好用
    li ie6/7 3px bug
    placeholder兼容IE6-9代码
  • 原文地址:https://www.cnblogs.com/chaoku/p/3739145.html
Copyright © 2011-2022 走看看