zoukankan      html  css  js  c++  java
  • multiple output: reduce cannot write

    package org.lukey.hadoop.classifyBayes;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    
    public class Probability {
    
        // Client
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
            
            //读取单词总数,设置到congfiguration中
            String totalWordsPath = "/user/hadoop/output/totalwords.txt";
            FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
            FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
            BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
            String strLine = buffer.readLine();
            String[] temp = strLine.split(":");
            if(temp.length == 2){
                //temp[0] = TOTALWORDS
                conf.setInt(temp[0], Integer.parseInt(temp[1]));
            }
        
            
    
            
            /*
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    
            if (otherArgs.length != 2) {
                System.out.println("Usage <in> <out>");
                System.exit(-1);
            }
    */
            Job job = new Job(conf, "file count");
    
            job.setJarByClass(Probability.class);
    
            job.setMapperClass(WordsOfClassCountMapper.class);
            job.setReducerClass(WordsOfClassCountReducer.class);
    
            String input = "/user/hadoop/mid/wordsFrequence";
            String output = "/user/hadoop/output/probability/";
            
            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    
        private static MultipleOutputs<Text, IntWritable> mos;
    
        // Mapper
        static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            private  static IntWritable number = new IntWritable();
            
    
            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
    
                String[] temp = value.toString().split("	");
                if(temp.length == 3){
                    // 文件夹名类别名
                    String dirName = temp[0];
                    value.set(temp[1]);
                    number.set(Integer.parseInt(temp[2]));
                    mos.write(value, number, dirName);
    
                }
                
    
            }
    
            @Override
            protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                mos.close();
            }
    
            @Override
            protected void setup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
                // TODO Auto-generated method stub
                mos = new MultipleOutputs<Text, IntWritable>(context);
            }
    
        }
    
        // Reducer
        static class WordsOfClassCountReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    
            // result 表示每个文件里面单词个数
            DoubleWritable result = new DoubleWritable(3);
            Configuration conf = new Configuration();
            int total = conf.getInt("TOTALWORDS", 1);
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)
                            throws IOException, InterruptedException {
                // TODO Auto-generated method stub
    //            double sum = 0;
    //            for (IntWritable value : values) {
    //                sum += value.get();
    //            }
    //            result.set(sum);
    
                context.write(key, result);
            }
    
        }
    
    }
  • 相关阅读:
    HTML DOM 06 节点关系
    HTML DOM 05 事件(三)
    HTML DOM 05 事件(二)
    HTML DOM 05 事件(一)
    html DOM 04 样式
    html DOM 03 节点的属性
    html DOM 02 获取节点
    html DOM 01 节点概念
    JavaScript 29 计时器
    JavaScript 28 弹出框
  • 原文地址:https://www.cnblogs.com/luolizhi/p/4940179.html
Copyright © 2011-2022 走看看