zoukankan      html  css  js  c++  java
  • multiple output reduce cannot write

    package org.lukey.hadoop.classifyBayes;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.nio.charset.StandardCharsets;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    
    /**
     * MapReduce job that turns per-class word-frequency records into per-word
     * probabilities, splitting the map-side output into one named output per
     * class directory via {@link MultipleOutputs}.
     *
     * <p>Input lines (tab-separated): {@code <className>\t<word>\t<count>}.
     * The driver first reads the corpus-wide word total from
     * {@code totalwords.txt} (format {@code TOTALWORDS:<n>}) and publishes it
     * in the job {@link Configuration} under that key.
     */
    public class Probability {

        /** Configuration key under which the driver publishes the corpus word total. */
        private static final String TOTAL_WORDS_KEY = "TOTALWORDS";

        // Client / driver.
        public static void main(String[] args) throws Exception {

            Configuration conf = new Configuration();

            // Read the total word count produced by an earlier job and expose it to
            // the reducers through the job configuration.
            String totalWordsPath = "/user/hadoop/output/totalwords.txt";
            FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
            // try-with-resources: the original leaked both the stream and the reader.
            try (FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
                    BufferedReader buffer =
                            new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
                String strLine = buffer.readLine();
                if (strLine != null) {
                    // Expected line format: TOTALWORDS:<count>
                    String[] temp = strLine.split(":");
                    if (temp.length == 2) {
                        conf.setInt(temp[0], Integer.parseInt(temp[1]));
                    }
                }
            }

            // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
            Job job = Job.getInstance(conf, "file count");

            job.setJarByClass(Probability.class);

            job.setMapperClass(WordsOfClassCountMapper.class);
            job.setReducerClass(WordsOfClassCountReducer.class);

            String input = "/user/hadoop/mid/wordsFrequence";
            String output = "/user/hadoop/output/probability/";

            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));

            // The mapper emits <Text, IntWritable> while the reducer emits
            // <Text, DoubleWritable>. The original declared IntWritable as the
            // job-wide output value class, so the reduce-side write failed
            // ("multiple output reduce cannot write") — declare both sides.
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        /**
         * Mapper: parses {@code <class>\t<word>\t<count>} lines and routes each
         * record to a named output whose base path is the class name.
         */
        static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

            private final IntWritable number = new IntWritable();
            // Instance field, not static: a MultipleOutputs is bound to one task
            // attempt's context and must not be shared across tasks or JVM reuse.
            private MultipleOutputs<Text, IntWritable> mos;

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                mos = new MultipleOutputs<Text, IntWritable>(context);
            }

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String[] temp = value.toString().split("\t");
                if (temp.length == 3) {
                    // temp[0] is the class (directory) name used as the base output path.
                    String dirName = temp[0];
                    value.set(temp[1]);
                    number.set(Integer.parseInt(temp[2]));
                    mos.write(value, number, dirName);
                }
            }

            @Override
            protected void cleanup(Context context) throws IOException, InterruptedException {
                // Closing flushes the named outputs; skipping this loses records.
                mos.close();
            }
        }

        /**
         * Reducer: sums the counts for each word and emits the word's share of
         * the corpus total as a double.
         */
        static class WordsOfClassCountReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

            private final DoubleWritable result = new DoubleWritable();
            // Corpus-wide word total published by the driver; default 1 avoids
            // division by zero if the key is missing.
            private int total = 1;

            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                // The original built a fresh `new Configuration()` here, which never
                // sees the value set by the driver — the job configuration must be
                // read through the context.
                total = context.getConfiguration().getInt(TOTAL_WORDS_KEY, 1);
            }

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Context context) throws IOException, InterruptedException {
                double sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                // NOTE(review): the original wrote a constant 3.0 and left the
                // summation commented out; emitting sum/total as the word
                // probability here — confirm the intended smoothing/normalization
                // against the downstream Bayes classifier.
                result.set(sum / total);
                context.write(key, result);
            }
        }

    }
  • 相关阅读:
    Real-time 3D face tracking and reconstruction from 2D video
    Matlab Multiple View Geometry
    Multi-View 3D Reconstruction
    Scene Reconstruction
    OpenCV C++ Stereo Fisheye Calibration
    史上最全的Matlab资源电子书教程和视频下载合集
    CF-Based-Recommendation
    语种识别工具
    gdb调试
    C语言常见的函数调用
  • 原文地址:https://www.cnblogs.com/luolizhi/p/4940179.html
Copyright © 2011-2022 走看看