  • Clever uses of Hadoop's Mapper class

    1. The Mapper class

    First, the Mapper class has four methods:

    (1) protected void setup(Context context)

    (2) protected void map(KEYIN key, VALUEIN value, Context context)

    (3) protected void cleanup(Context context)

    (4) public void run(Context context)

    setup() is generally used for one-time initialization work, such as loading a global file or establishing a database connection; cleanup() handles the teardown, such as closing files or distributing key/value pairs after map() has run; map() itself needs no further explanation.

    The core of the default Mapper run() method looks like this:

    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
        cleanup(context);
    }

    As the code shows, setup() runs first, then the map processing loop, and finally the cleanup() teardown. Note that setup() and cleanup() are invoked by the framework as callbacks only once per task, unlike map(), which is executed once for every input record.
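    Since run() is itself just an overridable method, a Mapper can also take control of the whole per-task loop. The following sketch is not from the original article: it assumes a hypothetical SkipBadLinesMapper that overrides run() so that a record which fails to parse is counted and skipped instead of failing the whole task.

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical example, not part of the original article.
    public class SkipBadLinesMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);                                    // called once per task
            while (context.nextKeyValue()) {
                try {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                } catch (NumberFormatException e) {
                    // Count and skip the malformed record instead of failing the task.
                    context.getCounter("quality", "bad_lines").increment(1);
                }
            }
            cleanup(context);                                  // called once per task
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            int n = Integer.parseInt(value.toString().trim()); // may throw NumberFormatException
            context.write(new Text("parsed"), new IntWritable(n));
        }
    }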

    2. Using setup()

       The classic WordCount example can filter out blacklisted words simply by loading a blacklist in setup(). The full code is below:

     
    public class WordCount {  
        private static String blacklistFileName = "blacklist.dat";
      
        public static class WordCountMap extends  
                Mapper<LongWritable, Text, Text, IntWritable> {  
      
            private final IntWritable one = new IntWritable(1);  
            private Text word = new Text(); 
            private Set<String> blacklist;
      
            protected void setup(Context context) throws IOException,InterruptedException {
                blacklist=new TreeSet<String>();
    
                try {
                    FileReader fileReader = new FileReader(blacklistFileName);
                    BufferedReader bufferedReader = new BufferedReader(fileReader);
                    String str;
                    while ((str = bufferedReader.readLine()) != null) {
                        blacklist.add(str);
                    }
                    bufferedReader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            } 
    
            public void map(LongWritable key, Text value, Context context)  
                    throws IOException, InterruptedException {  
                String line = value.toString();  
                StringTokenizer token = new StringTokenizer(line);  
                while (token.hasMoreTokens()) {  
                    word.set(token.nextToken());
                    if(blacklist.contains(word.toString())){
                       continue;
                    }
                    context.write(word, one);  
                }  
            }  
        }  
      
        public static class WordCountReduce extends  
                Reducer<Text, IntWritable, Text, IntWritable> {  
      
            public void reduce(Text key, Iterable<IntWritable> values,  
                    Context context) throws IOException, InterruptedException {  
                int sum = 0;  
                for (IntWritable val : values) {  
                    sum += val.get();  
                }  
                context.write(key, new IntWritable(sum));  
            }  
        }  
      
        public static void main(String[] args) throws Exception {  
            Configuration conf = new Configuration();  
            Job job = new Job(conf);  
            job.setJarByClass(WordCount.class);  
            job.setJobName("wordcount");  
      
            job.setOutputKeyClass(Text.class);  
            job.setOutputValueClass(IntWritable.class);  
      
            job.setMapperClass(WordCountMap.class);  
            job.setCombinerClass(WordCountReduce.class);
            job.setReducerClass(WordCountReduce.class);  
      
            job.setInputFormatClass(TextInputFormat.class);  
            job.setOutputFormatClass(TextOutputFormat.class);  
      
            FileInputFormat.addInputPath(job, new Path(args[0]));  
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  
      
            System.exit(job.waitForCompletion(true) ? 0 : 1);  
        }  
    }  
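    A practical note on the setup() above: it opens blacklist.dat from the task's local working directory, so the file must be present on every node that runs a map task. One common way to arrange that (not shown in the original code; the HDFS path below and the use of Hadoop 2.x's Job.getInstance/addCacheFile are assumptions) is the distributed cache, which ships the file with the job and creates a local symlink named by the URI fragment:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "wordcount");
            // The "#blacklist.dat" fragment makes the framework create a symlink
            // named blacklist.dat in each task's working directory, so the
            // mapper's setup() can keep using new FileReader("blacklist.dat").
            // The HDFS path is illustrative only.
            job.addCacheFile(new URI("hdfs://namenode:9000/conf/blacklist.dat#blacklist.dat"));
            // ... configure mapper, reducer, formats, and paths as in main() above ...
        }
    }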

    3. Using cleanup()

       The simplest way to find a maximum is a single pass over the file, but in practice the data is too large for that to work on one machine. In the straightforward MapReduce formulation, every record seen by the map phase is sent on to the reducer, and the reducer computes the maximum. That is clearly not optimal. Following a divide-and-conquer approach, there is no need to ship all of the map input to the reducer: each map task can compute its own local maximum and send only that one value, which greatly reduces the amount of data transferred. So when should that local maximum be written out? map() is called once per key/value pair, and with a large input it is called a huge number of times, so writing the value inside map() would be unwise; the right moment is after the Mapper task has finished processing its input. Since the framework calls cleanup() when a Mapper or Reducer task ends, that is where we emit the value. With that in mind, here is the code:

    public class TopKApp {
        static final String INPUT_PATH = "hdfs://hadoop:9000/input2";
        static final String OUT_PATH = "hdfs://hadoop:9000/out2";
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
            final Path outPath = new Path(OUT_PATH);
            if(fileSystem.exists(outPath)){
                fileSystem.delete(outPath, true);
            }
            
            final Job job = new Job(conf, TopKApp.class.getSimpleName());
            FileInputFormat.setInputPaths(job, INPUT_PATH);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(NullWritable.class);
            FileOutputFormat.setOutputPath(job, outPath);
            job.waitForCompletion(true);
        }
        static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
            long max = Long.MIN_VALUE;
            protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
                final long temp = Long.parseLong(v1.toString());
                if(temp>max){
                    max = temp;
                }
            }
            
            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new LongWritable(max), NullWritable.get());
            }
        }
    
        static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
            long max = Long.MIN_VALUE;
            protected void reduce(LongWritable k2, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                final long temp = k2.get();
                if(temp>max){
                    max = temp;
                }
            }
            
            protected void cleanup(Context context) throws IOException, InterruptedException {
                context.write(new LongWritable(max), NullWritable.get());
            }
        }        
    }
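    As written, TopKApp only finds the single maximum (K = 1). The same cleanup() trick generalizes to a real top K: each map task keeps a bounded sorted set of the largest values it has seen and flushes it once in cleanup(), so the reducer receives at most K values per map task instead of the full input. Below is a minimal sketch; the class name TopKMapper and K = 10 are assumptions, not part of the original code, and a TreeSet collapses duplicate values.

    import java.io.IOException;
    import java.util.TreeSet;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TopKMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        private static final int K = 10;
        // Sorted ascending: the first element is the smallest candidate.
        private final TreeSet<Long> topK = new TreeSet<Long>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            topK.add(Long.parseLong(value.toString().trim()));
            if (topK.size() > K) {
                topK.pollFirst();   // keep only the K largest values seen so far
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit this task's K largest values once, after the last map() call.
            for (Long v : topK) {
                context.write(new LongWritable(v), NullWritable.get());
            }
        }
    }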
  • Original article: https://www.cnblogs.com/wzyj/p/4692570.html