zoukankan      html  css  js  c++  java
  • hadoop之mapper类妙用

    1. Mapper类

    首先 Mapper类有四个方法:

    (1) protected void setup(Context context)

    (2) Protected void map(KEYIN key,VALUEIN value,Context context)

    (3) protected void cleanup(Context context)

    (4) public void run(Context context)

    setup()方法一般用来加载一些初始化的工作,像全局文件建立数据库的链接等等;cleanup()方法是收尾工作,如关闭文件或者执行map()后的键值分发等;map()函数就不多说了.

    默认的Mapper的run()方法的核心代码如下:

    public void run(Context context) throws IOException,InterruptedException
    {
        setup(context);
        while(context.nextKeyValue())
              map(context.getCurrentKey(),context,context.getCurrentValue(),context);
        cleanup(context);
    }

    从代码中也可以看出先执行setup函数,然后是map处理代码,最后是cleanup的收尾工作.值得注意的是,setup函数和cleanup函数由系统作为回调函数只做一次,并不像map函数那样执行多次.

    2.setup函数应用

       经典的wordcount在setup函数中加入黑名单就可以实现对黑名单中单词的过滤,详细代码如下:

     
    public class WordCount {  
       static private String blacklistFileName= "blacklist.dat";
      
        public static class WordCountMap extends  
                Mapper<LongWritable, Text, Text, IntWritable> {  
      
            private final IntWritable one = new IntWritable(1);  
            private Text word = new Text(); 
            private Set<String> blacklist;
      
            protected void setup(Context context) throws IOException,InterruptedException {
                blacklist=new TreeSet<String>();
    
                try{
                  FileReader fileReader=new FileReader(blacklistFileName);
                  BufferedReader bufferedReader=bew BufferedReader(fileReader);
                  String str;
                  while((str=bufferedReader.readLine())!=null){
                    blacklist.add(str);
                  }
                } catch(IOException e){
                    e.printStackTrace();
                }
            } 
    
            public void map(LongWritable key, Text value, Context context)  
                    throws IOException, InterruptedException {  
                String line = value.toString();  
                StringTokenizer token = new StringTokenizer(line);  
                while (token.hasMoreTokens()) {  
                    word.set(token.nextToken());
                    if(blacklist.contains(word.toString())){
                       continue;
                    }
                    context.write(word, one);  
                }  
            }  
        }  
      
        public static class WordCountReduce extends  
                Reducer<Text, IntWritable, Text, IntWritable> {  
      
            public void reduce(Text key, Iterable<IntWritable> values,  
                    Context context) throws IOException, InterruptedException {  
                int sum = 0;  
                for (IntWritable val : values) {  
                    sum += val.get();  
                }  
                context.write(key, new IntWritable(sum));  
            }  
        }  
      
        public static void main(String[] args) throws Exception {  
            Configuration conf = new Configuration();  
            Job job = new Job(conf);  
            job.setJarByClass(WordCount.class);  
            job.setJobName("wordcount");  
      
            job.setOutputKeyClass(Text.class);  
            job.setOutputValueClass(IntWritable.class);  
      
            job.setMapperClass(WordCountMap.class);  
            job.setCombinerClass(WordCountReduce.class);
            job.setReducerClass(WordCountReduce.class);  
      
            job.setInputFormatClass(TextInputFormat.class);  
            job.setOutputFormatClass(TextOutputFormat.class);  
      
            FileInputFormat.addInputPath(job, new Path(args[0]));  
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  
      
            System.exit(job.waitForCompletion(true) ? 0 : 1);  
        }  
    }  

    3.cleanup应用

          求最值最简单的办法就是对该文件进行一次遍历得出最值,但是现实中数据比量比较大,这种方法不能实现。在传统的MapReduce思想中,将文件的数据经 过map迭代出来送到reduce中,在Reduce中求出最大值。但这个方法显然不够优化,我们可采用“分而治之”的思想,不需要map的所有数据全部 送到reduce中,我们可以在map中先求出最大值,将该map任务的最大值送reduce中,这样就减少了数据的传输量。那么什么时候该把这个数据写 出去呢?我们知道,每一个键值对都会调用一次map(),由于数据量大调用map()的次数也就多了,显然在map()函数中将该数据写出去是不明智的, 所以最好的办法该Mapper任务结束后将该数据写出去。我们又知道,当Mapper/Reducer任务结束后会调用cleanup函数,所以我们可以 在该函数中将该数据写出去。了解了这些我们可以看一下程序的代码:

    public class TopKApp {
        static final String INPUT_PATH = "hdfs://hadoop:9000/input2";
        static final String OUT_PATH = "hdfs://hadoop:9000/out2";
        
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
            final Path outPath = new Path(OUT_PATH);
            if(fileSystem.exists(outPath)){
                fileSystem.delete(outPath, true);
            }
            
            final Job job = new Job(conf , WordCountApp.class.getSimpleName());
            FileInputFormat.setInputPaths(job, INPUT_PATH);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(LongWritable.class);
            job.setOutputValueClass(NullWritable.class);
            FileOutputFormat.setOutputPath(job, outPath);
            job.waitForCompletion(true);
        }
        static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
            long max = Long.MIN_VALUE;
            protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
                final long temp = Long.parseLong(v1.toString());
                if(temp>max){
                    max = temp;
                }
            }
            
            protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,LongWritable, NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
                context.write(new LongWritable(max), NullWritable.get());
            }
        }
    
        static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
            long max = Long.MIN_VALUE;
            protected void reduce(LongWritable k2, java.lang.Iterable<NullWritable> arg1, org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context arg2) 
             throws java.io.IOException ,InterruptedException {
                final long temp = k2.get();
                if(temp>max){
                    max = temp;
                }
            }
            
            protected void cleanup(org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
                context.write(new LongWritable(max), NullWritable.get());
            }
        }        
    }
  • 相关阅读:
    slqmap简单使用
    Sql注入类型
    路由器协议----IGP、EGP、RIP、OSPF、BGP、MPLS
    TCPIP协议簇-各层主要协议帧格式
    TCP/IP协议(7):应用层
    TCP/IP协议(5):传输层之TCP
    TCP/IP协议(6):传输层之UDP
    Mysql数据库优化之SQL及索引优化
    公众号支付时,如何判断是否是微信浏览器
    ajax返回数据为undefined
  • 原文地址:https://www.cnblogs.com/wzyj/p/4692570.html
Copyright © 2011-2022 走看看