zoukankan      html  css  js  c++  java
  • mapreduce项目中加入combiner

    combiner相当于一个本地的reduce,它的存在是为了减轻网络传输的负担:先在本地进行一次计算,再将计算结果提交给reduce进行二次处理。

    现在的流程为:

    对于combiner我们有这些理解:

     

     

       Mapper代码展示:

    package com.nenu.mprd.test;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * Mapper that tags every word of an input line with the name of the file it came from.
     *
     * <p>Each line is split on single spaces. For every non-empty word the mapper emits the
     * key {@code word:fileName} with the value {@code "1"}.
     */
    public class MyMap extends Mapper<LongWritable, Text, Text, Text> {
        /** Count emitted for every single occurrence of a word. */
        private static final Text ONE = new Text("1");

        /** Output key instance reused across records to avoid one allocation per word. */
        private final Text outKey = new Text();

        /**
         * Emits one {@code (word:fileName, "1")} pair per non-empty word of the line.
         *
         * @param key     input key supplied by the input format (not used here)
         * @param value   one line of input text
         * @param context task context used to look up the input split and to emit output
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Name of the file this split belongs to.
            FileSplit fileSplit = (FileSplit) context.getInputSplit();
            String fileName = fileSplit.getPath().getName().trim();

            for (String rawWord : value.toString().split(" ")) {
                String word = rawWord.trim();
                if (word.isEmpty()) {
                    // Consecutive spaces and blank lines produce empty tokens;
                    // emitting them would create bogus ":fileName" keys.
                    continue;
                }
                // word + ":" + file name
                String composite = word + ":" + fileName;
                System.out.println("map:" + composite);

                outKey.set(composite);
                context.write(outKey, ONE);
            }
        }
    }
    View Code

      Combiner代码展示:

    package com.nenu.mprd.test;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Combiner that pre-aggregates the mapper output locally before it is sent to the reducer.
     *
     * <p>Input records have the key {@code word:fileName} and numeric count strings as values.
     * For each such key one record is written with the key {@code word} and the value
     * {@code (fileName count)}, where {@code count} is the sum of the incoming values.
     *
     * <p>NOTE(review): this combiner rewrites the key, while Hadoop documents the combiner as
     * an optional optimisation that may run zero, one or several times. The reducer therefore
     * has to cope with both raw mapper records and combined records — confirm against the
     * reducer in use.
     */
    public class MyCombiner extends Reducer<Text, Text, Text, Text>{
        /**
         * Sums the counts of one {@code word:fileName} key and emits {@code word -> (fileName count)}.
         *
         * @param key     composite key {@code word:fileName}
         * @param values  count strings collected for that key
         * @param context task context used to emit the combined record
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
            String composite = key.toString();
            // The file name follows the LAST ':' so that words which themselves
            // contain ':' are kept intact.
            int separator = composite.lastIndexOf(':');
            if (separator < 0) {
                // No file part: this is not a raw mapper record (for example the output
                // of an earlier combiner pass). Forward it unchanged instead of failing.
                for (Text v : values) {
                    context.write(key, v);
                }
                return;
            }

            // The key is the same for every value, so it is split once, outside the loop.
            Text word = new Text(composite.substring(0, separator).trim());
            String fileName = composite.substring(separator + 1).trim();
            System.out.println("MyCombiner KEY:" + word);

            // Sum the counts produced by the same map task for this key.
            int count = 0;
            for (Text v : values) {
                count += Integer.parseInt(v.toString());
            }

            Text summary = new Text("(" + fileName + " " + count + ")");
            System.out.println("MyCombiner value:" + summary);
            context.write(word, summary);
        }

    }
    View Code

     Reduce代码展示:

    package com.nenu.mprd.test;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Reducer that joins all values received for a key into a single string.
     *
     * <p>Every value is followed by one space, so the emitted string ends with a trailing space.
     */
    public class MyReduce extends Reducer<Text, Text, Text, Text> {
        /**
         * Concatenates the values of {@code key} and writes them as one record.
         *
         * @param key     the key shared by all values
         * @param values  the values grouped under {@code key}
         * @param context task context used to emit the joined record
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            System.out.println("reduce: key" + key);

            // StringBuilder avoids re-copying the accumulated string on every iteration.
            StringBuilder joined = new StringBuilder();
            for (Text value : values) {
                joined.append(value.toString()).append(' ');
            }

            String out = joined.toString();
            System.out.println("reduce value:" + out);
            context.write(key, new Text(out));
        }
    }
    View Code

     Job代码展示:

    package com.nenu.mprd.test;
    
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * Driver that configures and submits the word-per-file job
     * (mapper {@code MyMap}, combiner {@code MyCombiner}, reducer {@code MyReduce}).
     */
    public class MyJob extends Configured implements Tool{

        /** Address of the HDFS name node the job reads from and writes to. */
        private static final String HDFS_URI = "hdfs://192.168.64.141:9000";

        /** Directory holding the input files. */
        private static final String INPUT_DIR = "/hadoop/wordcount/city";

        /** Directory the job writes its result to; removed before every run. */
        private static final String OUTPUT_DIR = "/hadoop/wordcount/city/out";

        /**
         * Runs the job through {@link ToolRunner} and exits with the job's status code.
         *
         * @param args command-line arguments, forwarded to {@link ToolRunner}
         * @throws Exception if the job cannot be set up or submitted
         */
        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new MyJob(), args));
        }

        /**
         * Deletes a stale output directory, configures the job and waits for it to finish.
         *
         * @param args remaining command-line arguments (currently unused; paths are fixed)
         * @return {@code 0} if the job succeeded, {@code 1} otherwise
         * @throws Exception if HDFS access or job submission fails
         */
        @Override
        public int run(String[] args) throws Exception {
            // Use the configuration prepared by ToolRunner; fall back to a fresh one
            // when run() is invoked directly without a configuration being set.
            Configuration conf = getConf();
            if (conf == null) {
                conf = new Configuration();
            }
            conf.set("fs.defaultFS", HDFS_URI);

            // Remove the previous output directory, otherwise the job refuses to start.
            // When packaged as a jar these values should become user-supplied parameters.
            FileSystem filesystem = FileSystem.get(new URI(HDFS_URI), conf, "root");
            Path outputPath = new Path(OUTPUT_DIR);
            if (filesystem.exists(outputPath)) {
                filesystem.delete(outputPath, true); // true = delete recursively
            }

            Job job = Job.getInstance(conf);
            job.setJarByClass(MyJob.class);
            job.setMapperClass(MyMap.class);

            // Register the combiner. If the combine logic were identical to the reducer,
            // the reducer class itself could be registered here instead.
            job.setCombinerClass(MyCombiner.class);

            job.setReducerClass(MyReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(INPUT_DIR));
            FileOutputFormat.setOutputPath(job, outputPath);

            // Report failure to the caller instead of always returning success.
            return job.waitForCompletion(true) ? 0 : 1;
        }

    }
    View Code
  • 相关阅读:
    [转]修改远程桌面端口
    [转]3个著名加密算法(MD5、RSA、DES)的解析
    [转]常见HTTP状态(如200,304,404,503)
    用 SqlConnectionStringBuilder 来写连接字符串,向连接字符串添加设置
    windows 设置ipsec防火墙
    网络带宽单位换算
    Linux 检查端口gps命令
    设置Linux防火墙
    windows 服务器同步互联网时间
    windows 路由转发
  • 原文地址:https://www.cnblogs.com/Amyheartxy/p/9360501.html
Copyright © 2011-2022 走看看