zoukankan      html  css  js  c++  java
  • MapReduce TOP n

    1.pom.xml

    <!-- Hadoop 2.9.0 artifacts referenced by the Top-N MapReduce example. -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.9.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-yarn-api</artifactId>
        <version>2.9.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.9.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.9.0</version>
    </dependency>
                    
    

    2. java文件

      1 package ex.topn;
      2 
      3 import java.util.Comparator;
      4 
      5 public class MySalaryComparator implements Comparator<Salary>  {
      6     @Override
      7     public int compare(Salary e1, Salary e2) {
      8         if(e1.getSum() > e2.getSum()) {
      9             return 1;
     10         }else{
     11             return -1;
     12         }
     13     }
     14 }
     15 
     16 
     17 package ex.topn;
     18 
     /**
      * Mutable holder for a single integer salary amount.
      */
     public class Salary {
         /** The salary amount currently held. */
         private int amount;

         /**
          * Creates a holder initialised to the given amount.
          *
          * @param sum initial salary amount
          */
         public Salary(int sum) {
             amount = sum;
         }

         /**
          * Replaces the held amount.
          *
          * @param sum new salary amount
          */
         public void setSum(int sum) {
             amount = sum;
         }

         /**
          * @return the salary amount currently held
          */
         public int getSum() {
             return amount;
         }
     }
     34 
     35 
     36 
     37 
     38 package ex.topn;
     39 
     40 import java.io.IOException;
     41 import java.util.Iterator;
     42 import java.util.Map.Entry;
     43 import java.util.TreeMap;
     44 
     45 import org.apache.hadoop.io.LongWritable;
     46 import org.apache.hadoop.io.NullWritable;
     47 import org.apache.hadoop.io.Text;
     48 import org.apache.hadoop.mapreduce.Mapper;
     49 
     50 public class Top10Mapper extends Mapper<LongWritable, Text, NullWritable, Text> {
     51     public static TreeMap<Salary, Text> TopRecordMap = new TreeMap<Salary, Text>(new MySalaryComparator());
     52     
     53     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
     54         String line = value.toString();
     55         String[] tokens = line.split(",");
     56         
     57         // get salary
     58         int salary = Integer.parseInt(tokens[3]);
     59         System.out.println("=> salary=" + salary);
     60         TopRecordMap.put(new Salary(salary), new Text(value));
     61         
     62         Iterator<Entry<Salary, Text>> iter = TopRecordMap.entrySet().iterator();
     63         Entry<Salary, Text> entry = null;
     64         
     65         System.out.println("TopRecordMap.size()=" + TopRecordMap.size());
     66         while(TopRecordMap.size() > 10) {
     67             entry = iter.next();
     68             iter.remove();
     69         }
     70     }    
     71     
     72     protected void cleanup(Context context) throws IOException, InterruptedException {
     73         // output our ten records to the reducers with a null key
     74     
     75         for(Text t : TopRecordMap.values()) {
     76             context.write(NullWritable.get(), t);
     77             System.out.println(" => " + t + ", " + t.toString());
     78         }
     79     }
     80 }
     81 
     82 
     83 
     84 
     85 package ex.topn;
     86 
     87 import java.io.IOException;
     88 import java.util.Iterator;
     89 import java.util.Map.Entry;
     90 import java.util.TreeMap;
     91 
     92 import org.apache.hadoop.io.NullWritable;
     93 import org.apache.hadoop.io.Text;
     94 import org.apache.hadoop.mapreduce.Reducer;
     95 
     96 public class Top10Reducer extends Reducer<NullWritable, Text, NullWritable, Text> {
     97     public static TreeMap<Salary, Text> TopRecordMap = new TreeMap<Salary, Text>(new MySalaryComparator());
     98     
     99     public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    100         
    101         for(Text value : values) {
    102             String line = value.toString();
    103             if(line.length() > 0) {
    104                 String[] tokens = line.split(",");
    105                 int salary = Integer.parseInt(tokens[3]);
    106                 System.out.println("=> salary=" + salary);
    107                 TopRecordMap.put(new Salary(salary), new Text(value));
    108                 
    109                 Iterator<Entry<Salary, Text>> iter = TopRecordMap.entrySet().iterator();
    110                 Entry<Salary, Text> entry = null;
    111                 
    112                 System.out.println("TopRecordMap.size()=" + TopRecordMap.size());
    113                 while(TopRecordMap.size() > 10) {
    114                     entry = iter.next();
    115                     iter.remove();
    116                 }        
    117             }
    118         }
    119         
    120 //        Iterator<Entry<Salary, Text>> iter = TopRecordMap.entrySet().iterator();
    121 //        while(iter.hasNext()) {
    122 //            System.out.println("    => " + iter.next().getValue());
    123 //        }
    124         
    125         for(Text t : TopRecordMap.descendingMap().values()) {
    126             context.write(NullWritable.get(), t);
    127         }
    128     }
    129 }
    130 
    131 
    132 
    133 package ex.topn;
    134 
    135 import java.io.IOException;
    136 import java.net.URI;
    137 
    138 import org.apache.hadoop.conf.Configuration;
    139 import org.apache.hadoop.fs.FileSystem;
    140 import org.apache.hadoop.fs.Path;
    141 import org.apache.hadoop.io.NullWritable;
    142 import org.apache.hadoop.io.Text;
    143 import org.apache.hadoop.mapreduce.Job;
    144 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    145 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    146 
    147 public class Topn {
    148     
    149     public static void main(String[] args) throws Exception {
    150         Configuration conf = new Configuration();
    151         
    152 153 Job job = Job.getInstance(conf,"top n"); 154 job.setJarByClass(Topn.class); 155 job.setMapperClass(Top10Mapper.class); 156 job.setCombinerClass(Top10Reducer.class); 157 job.setReducerClass(Top10Reducer.class); 158 job.setNumReduceTasks(1); 159 160 //job.setMapOutputKeyClass(NullWritable.class); 161 //job.setMapOutputValueClass(IntWritable.class); 162 163 job.setOutputKeyClass(NullWritable.class); 164 job.setOutputValueClass(Text.class); 165 166 FileInputFormat.addInputPath(job, new Path(args[0])); 167 FileOutputFormat.setOutputPath(job, new Path(args[1])); 168 System.exit(job.waitForCompletion(true) ? 0 : 1); 169 }170 }

    3. 从eclipse export一个jar:  topn.jar, 上传到 hadoop:  /opt/hadoop-2.9.0/files

    4. 准备三个文件,放入 hdfs 目录 /user/root/topn/input。每行是四个用逗号分隔的字段,第四个字段代表工资(沿用原例子的含义);任务是从这些数据中取出工资最高的前10行,工资相同的行可以重复出现。

        这里用比较简单的数字代替,前三个字段根本用不到,所以大部分是1,2,3这种。

        

    file03:
    1985,1,2,11
    1985,1,2,12
    1985,1,2,13
    1985,1,2,8
    1985,1,2,7
    1985,1,2,16
    1985,1,2,7
    1985,1,2,20
    195,1,2,25
    1,2,3,22
    1,2,3,27
    12,2,3,18
    1,2,3,17
    1,2,3,15
    
    
    file04:
    1,2,3,2
    12,3,3,3
    1,2,3,4
    12,3,4,5
    1,2,3,36
    1,2,3,34
    12,2,3,27
    1,2,3,48
    1,2,3,35
    12,3,3,28
    1,2,3,19
    12,2,2,31
    1,2,3,29
    
    
    file05:
    1,2,3,55
    1,2,3,39
    1,2,3,2
    a,a,a,35
    

     5.执行:

    例子以root用户运行    
    # cd /opt/hadoop-2.9.0
    # bin/yarn jar files/topn.jar ex.topn.Topn topn/input topn/output

     6.运行结果:

    1,2,3,55
    1,2,3,48
    1,2,3,39
    1,2,3,36
    a,a,a,35
    1,2,3,35
    1,2,3,34
    12,2,2,31
    1,2,3,29
    12,3,3,28
    
  • 相关阅读:
    servlet多线程同步问题
    servlet之request
    servlet方法
    非静态内部类不能有静态成员
    接口与抽象类的区别
    枚举
    Install CUDA 6.0 on Ubuntu 14.04 LTS
    Introduction to Deep Learning Algorithms
    codeblocks 使用汇总
    矩阵奇异值分解(SVD)及其应用
  • 原文地址:https://www.cnblogs.com/bear129/p/9415567.html
Copyright © 2011-2022 走看看