zoukankan      html  css  js  c++  java
  • MapReduce_TopK

    vim topk.txt    # one record per line: name<TAB>count (the mapper splits on a TAB character)

    a  1000
    b  2000
    c  90000
    d  88
    e  999999
    f  9998
    g  13223

    package MapReduce;

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    20 public class TopK {  
    21     private static final String INPUT_PATH = "hdfs://h201:9000/user/hadoop/input_TopK";
    22     private static final String OUTPUT_PATH = "hdfs://h201:9000/user/hadoop/output";
    23     public static final int K = 2;  
    24       
    25     public static class KMap extends Mapper<LongWritable,Text,IntWritable,Text> {    
    26         TreeMap<Integer, String> map = new TreeMap<Integer, String>();    
    27         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {                
    28             String line = value.toString();  
    29             if(line.trim().length() > 0 && line.indexOf("	") != -1) {  //indexof如果没有找到字符串,则返回-1                  
    30                 String[] arr = line.split("	", 2);  
    31                 String name = arr[0];  
    32                 Integer num = Integer.parseInt(arr[1]);  
    33                 map.put(num, name);      
    34                 if(map.size() > K) {  
    35                     map.remove(map.firstKey());  
    36                 }  
    37             }  
    38         }  
    39         @Override  
    40         protected void cleanup(  
    41                 Mapper<LongWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {                
    42             for(Integer num : map.keySet()) {  
    43                 context.write(new IntWritable(num), new Text(map.get(num)));  
    44             }                
    45         }           
    46     }             
    47     public static class KReduce extends Reducer<IntWritable, Text, IntWritable, Text> {            
    48         TreeMap<Integer, String> map = new TreeMap<Integer, String>();           
    49         public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {                    
    50             map.put(key.get(), values.iterator().next().toString());  
    51             if(map.size() > K) {  
    52                 map.remove(map.firstKey());  
    53             }  
    54         }  
    55         @Override  
    56         protected void cleanup(Reducer<IntWritable, Text, IntWritable, Text>.Context context) throws IOException, InterruptedException {  
    57             for(Integer num : map.keySet()) {  
    58                 context.write(new IntWritable(num), new Text(map.get(num)));  
    59             }  
    60         }  
    61     }  
    62   
    63     public static void main(String[] args) throws IOException, URISyntaxException {  
    64         // TODO Auto-generated method stub  
    65           
    66         Configuration conf = new Configuration();
    67         conf.set("mapred.jar","tk.jar");
    68         final FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
    69         fileSystem.delete(new Path(OUTPUT_PATH), true);
    70         try {  
    71             Job job = new Job(conf, "my own word count"); 
    72         
    73             job.setJarByClass(TopK.class);  
    74             job.setMapperClass(KMap.class);  
    75             job.setCombinerClass(KReduce.class);  
    76             job.setReducerClass(KReduce.class);  
    77             job.setOutputKeyClass(IntWritable.class);  
    78             job.setOutputValueClass(Text.class);  
    79             FileInputFormat.setInputPaths(job, INPUT_PATH);  
    80             FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));  
    81             System.out.println(job.waitForCompletion(true));  
    82         } catch (IOException e) {  
    83             // TODO Auto-generated catch block  
    84             e.printStackTrace();  
    85         } catch (ClassNotFoundException e) {  
    86             // TODO Auto-generated catch block  
    87             e.printStackTrace();  
    88         } catch (InterruptedException e) {  
    89             // TODO Auto-generated catch block  
    90             e.printStackTrace();  
    91         }   
    92     }  
    93 }  

    /usr/jdk1.7.0_25/bin/javac TopK.java

    /usr/jdk1.7.0_25/bin/jar cvf tk.jar TopK*class

    [hadoop@h201 ~]$ hadoop fs -cat /user/hadoop/output/part-r-00000
    18/06/10 15:49:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    90000   c
    999999  e

  • 相关阅读:
    257. Binary Tree Paths
    324. Wiggle Sort II
    315. Count of Smaller Numbers After Self
    350. Intersection of Two Arrays II
    295. Find Median from Data Stream
    289. Game of Life
    287. Find the Duplicate Number
    279. Perfect Squares
    384. Shuffle an Array
    E
  • 原文地址:https://www.cnblogs.com/jieran/p/9163565.html
Copyright © 2011-2022 走看看