zoukankan      html  css  js  c++  java
  • mutilple output reduce cannot write

    package org.lukey.hadoop.classifyBayes;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    
    public class Probability {
    
        // Client
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            
            //读取单词总数,设置到congfiguration中
            String totalWordsPath = "/user/hadoop/output/totalwords.txt";
            FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
            FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
            BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
            String strLine = buffer.readLine();
            String[] temp = strLine.split(":");
            if(temp.length == 2){
                //temp[0] = TOTALWORDS
                conf.setInt(temp[0], Integer.parseInt(temp[1]));
            }
        
            
    
            
            /*
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
            if (otherArgs.length != 2) {
                System.out.println("Usage <in> <out>");
                System.exit(-1);
            }
    */
            Job job = new Job(conf, "file count");
    
            job.setJarByClass(Probability.class);
    
            job.setMapperClass(WordsOfClassCountMapper.class);
            job.setReducerClass(WordsOfClassCountReducer.class);
    
            String input = "/user/hadoop/mid/wordsFrequence";
            String output = "/user/hadoop/output/probability/";
            
            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    
        private static MultipleOutputs<Text, IntWritable> mos;
    
        // Mapper
        static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            private  static IntWritable number = new IntWritable();
            
    
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
    
                String[] temp = value.toString().split("	");
                if(temp.length == 3){
                    // 文件夹名类别名
                    String dirName = temp[0];
                    value.set(temp[1]);
                    number.set(Integer.parseInt(temp[2]));
                    mos.write(value, number, dirName);
    
                }
                
    
            }
    
            @Override
            protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                mos.close();
            }
    
            @Override
            protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                mos = new MultipleOutputs<Text, IntWritable>(context);
            }
    
        }
    
        // Reducer
        static class WordsOfClassCountReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    
            // result 表示每个文件里面单词个数
            DoubleWritable result = new DoubleWritable(3);
            Configuration conf = new Configuration();
            int total = conf.getInt("TOTALWORDS", 1);
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
                            throws IOException, InterruptedException {
                // TODO Auto-generated method stub
    //            double sum = 0;
    //            for (IntWritable value : values) {
    //                sum += value.get();
    //            }
    //            result.set(sum);
    
                context.write(key, result);
            }
    
        }
    
    }
  • 相关阅读:
    js设置与获取Cookie
    js,正则应用
    Ajax支持跨域之Web API实现
    RSA加密解密在jsencrypt+c#的实现-博客园加密登录
    转:sqlserver无法创建索引,超时时间已到解决办法
    【转】asp.net 项目在 IE 11 下出现 “__doPostBack”未定义 的解决办法
    c#连接SFTP上传文件
    mac 修改mysql root密码
    Vue学习手记09-mock与axios拦截的使用
    Vue学习手记08-vue-cli的启动过程
  • 原文地址:https://www.cnblogs.com/luolizhi/p/4940179.html
Copyright © 2011-2022 走看看