  • Hadoop Text Classification: Finally Got It Running

    Training

    The training phase chains three jobs: FileCount computes the class priors, WordCount counts per-category word frequencies, and Probability turns those counts into smoothed per-class word probabilities.

    Entry point

    package org.wordCount;
    
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class WordMain {
    
        // private static List<String> secondDir = new ArrayList<String>();
    
        public static void main(String[] args) throws Exception {
        
            Configuration conf = new Configuration();
        //the next two lines are important: they set the job jar and the JobTracker address
            conf.set("mapred.jar", "E://eclipse//jar-work//WordMain.jar");
            conf.set("mapred.job.tracker", "192.168.190.128:9001");
            
        //path where the class prior probabilities will be saved
            String priorProbality = "hdfs://192.168.190.128:9000/user/hadoop/output/priorP/priorProbability.txt";
            conf.set("priorProbality", priorProbality);
            
        //path where the number of distinct words will be saved
            String totalWordsPath = "hdfs://192.168.190.128:9000/user/hadoop/output/totalwords.txt";
            conf.set("totalWordsPath", totalWordsPath);
            
        //total word count of each category
            String wordsInClassPath = "hdfs://192.168.190.128:9000/user/hadoop/mid/wordsFrequence/_wordsInClass/wordsInClass-r-00000";
            conf.set("wordsInClassPath", wordsInClassPath);
            
        //input path, and output path for the word frequencies
            // "/user/hadoop/input/NBCorpus/Country"
            String input = "hdfs://192.168.190.128:9000/user/hadoop/input/NBCorpus/Country";
            String wordsOutput = "hdfs://192.168.190.128:9000/user/hadoop/mid/wordsFrequence";
            conf.set("input", input);
            conf.set("wordsOutput", wordsOutput);
            
        //output path for each category's word probabilities;
        //its input is the word-frequency output above
            
            String freqOutput = "hdfs://192.168.190.128:9000/user/hadoop/output/probability/";
            conf.set("freqOutput", freqOutput);
    
            
            
            
            FileCount.run(conf);
            WordCount.run(conf);
            Probability.run(conf);
    /*        
            System.out.print("----------");
            
            
            String[] otherArgs = new String[] { "hdfs://192.168.190.128:9000/user/hadoop/test/",
                    "hdfs://192.168.190.128:9000/user/hadoop/wordcount/output2/" };
            conf.set("mapred.jar", "E://eclipse//jar-work//WordMain.jar");
        
            Job job = new Job(conf, "word count");
            job.setJarByClass(WordMain.class);
    
            job.setInputFormatClass(MyInputFormat.class);
    
            job.setMapperClass(WordMapper.class);
    //        job.setCombinerClass(WordReducer.class);
            job.setReducerClass(WordReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // MyUtils.addInputPath(job, new Path(otherArgs[0]), conf);
    
            
            
            List<Path> inputPaths = getSecondDir(conf, otherArgs[0]);
            for (Path path : inputPaths) {
                System.out.println("path = " + path.toString());
                MyInputFormat.addInputPath(job, path);
                
            }
            System.out.println("addinputpath     ok" );
    //        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            
    
    
            
            System.exit(job.waitForCompletion(true) ? 0 : 1);*/
            
            
        }
    
    // returns the second-level folders under the given directory
        static List<Path> getSecondDir(Configuration conf, String folder) throws Exception {
            Path path = new Path(folder);
            FileSystem fs = path.getFileSystem(conf);
            FileStatus[] stats = fs.listStatus(path);
            List<Path> folderPath = new ArrayList<Path>();
            for (FileStatus stat : stats) {
                if (stat.isDir()) {
                if (fs.listStatus(stat.getPath()).length > 10) { // only categories with more than 10 files are used as input paths
                        folderPath.add(stat.getPath());
                    }
                }
            }
            return folderPath;
        }
    
        
    
        
    
    }
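
    After a successful run, the pipeline leaves the following files on HDFS (paths taken from the driver above):

        /user/hadoop/output/priorP/priorProbability.txt                      class priors, one tab-separated "class prior" line each
        /user/hadoop/output/totalwords.txt                                   number of distinct words (TOTALWORDS)
        /user/hadoop/mid/wordsFrequence/part-r-*                             per-class word frequencies
        /user/hadoop/mid/wordsFrequence/_wordsInClass/wordsInClass-r-00000   total words per class
        /user/hadoop/output/probability/                                     smoothed per-class word probabilities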

    Counting documents per category

    package org.wordCount;
    
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    /**
     * Counts the number of files in each category and computes the prior
     * probabilities; the priors are saved to the HDFS path passed in via
     * the priorProbality configuration key.
     */
    
    public class FileCount {
    
        public static void run(Configuration conf) throws Exception {
    
            int sum = 0;
            String in = conf.get("input");
    
    
            Map<String, Integer> map = new HashMap<>();
            Map<String, Double> priorMap = new HashMap<>();
    
    
        // fill the map (categories with too few documents are filtered out below)
            map = FileCount.getFileNumber(in);
    
        //debug: print each category and its file count
            Iterator<Map.Entry<String, Integer>> itrs = map.entrySet().iterator();
            while (itrs.hasNext()) {
    //            System.out.println("ok");
                Map.Entry<String, Integer> it = itrs.next();
            if (it.getValue() <= 10) {    // skip categories with 10 or fewer documents
                    itrs.remove();
                }else{
                    sum += it.getValue();
                System.out.println(it.getKey() + "\t" + it.getValue());
                }
            }
            
            System.out.println("sum = " + sum);
    
    
            String output = conf.get("priorProbality");
            
            Path outputPath = new Path(output);
            FileSystem fs = outputPath.getFileSystem(conf);
            FSDataOutputStream outputStream = fs.create(outputPath);
            
        //compute each category's share of all documents, i.e. the prior probability
        String ctx = "";
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            double result = entry.getValue() / (double) sum;
            priorMap.put(entry.getKey(), result); // keep the prior in priorMap
            ctx += entry.getKey() + "\t" + result + "\n";
        }
            outputStream.writeBytes(ctx);
            IOUtils.closeStream(outputStream);
            
        // print the priors (they could also be written to a file)
        // an alternative way of iterating over a map
            Iterator<Map.Entry<String, Double>> iterators = priorMap.entrySet().iterator();
            while (iterators.hasNext()) {
                Map.Entry<String, Double> iterator = iterators.next();
            System.out.println(iterator.getKey() + "\t" + iterator.getValue());
            }
    
        }
    
    // helper: counts the files in each category subfolder
        public static Map<String, Integer> getFileNumber(String folderPath) throws Exception {
        
            Map<String, Integer> fileMap = new HashMap<>();
            Configuration conf = new Configuration();
            
            Path path = new Path(folderPath);
            FileSystem hdfs = path.getFileSystem(conf);
            FileStatus[] status = hdfs.listStatus(path);
    //        System.out.println(folderPath);
    //        System.out.println("status.length = " + status.length);
            
            for (FileStatus stat : status) {
                if (stat.isDir()) {
                    int length = hdfs.listStatus(stat.getPath()).length;
                    String name = stat.getPath().getName();
                    fileMap.put(name, length);
                }
            }
    
        
            return fileMap;
        }
    
    }
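
    The prior written for each surviving category c is simply its share of the training documents:

        P(c) = N_c / N

    where N_c is the number of documents in category c and N is the total after dropping categories with 10 or fewer documents. With hypothetical counts ALB=20, AUS=30, FRA=50 (N=100), priorProbability.txt would contain the tab-separated lines "ALB 0.2", "AUS 0.3" and "FRA 0.5".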

    Counting words in the documents

    package org.wordCount;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    
    public class WordCount {
    
        private static MultipleOutputs<Text, IntWritable> mos;
        // static String baseOutputPath = "/user/hadoop/test_out";
    
    // two maps were originally planned here: one for the document count of
    // each category and one for the word count of each category
    // private static Map<String, List<String>> fileCountMap = new
    // HashMap<String, List<String>>();
    // private static Map<String, Integer> fileCount = new HashMap<String,
    // Integer>();
    // static Map<String, List<String>> wordsCountInClassMap = new
    // HashMap<String, List<String>>();
    
        static enum WordsNature {
            CLSASS_NUMBER, CLASS_WORDS, TOTALWORDS
        }
    
        // map
        static class First_Mapper extends Mapper<Text, Text, Text, IntWritable> {
    
            private final static IntWritable one = new IntWritable(1);
            private final static IntWritable zero = new IntWritable(0);
    
            private Text countryName = new Text();
    
            @Override
            protected void map(Text key, Text value, Mapper<Text, Text, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
    
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    String word = itr.nextToken();
                if (!(MyUtils.hasDigit(word) || word.contains("."))) { // skip tokens that contain digits or '.'
                    countryName.set(key.toString() + "\t" + word);
                    
                    context.write(countryName, one); // per-category word count, e.g. "ALB have" -> 1
                    context.write(key, one); // total number of words in the category
                    context.write(new Text(word), zero); // zero marker used to count distinct words
                    }
                }
    
            }
        }
    
        // Reducer
        static class First_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        // result holds the count of each word within a category
            IntWritable result = new IntWritable();
            Map<String, List<String>> classMap = new HashMap<String, List<String>>();
            Map<String, List<String>> fileMap = new HashMap<String, List<String>>();
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                            throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
    
            // sum == 0 means the key is a bare word that only ever received
            // zero markers; count it once toward the number of distinct words
            if (sum == 0) {
                context.getCounter(WordsNature.TOTALWORDS).increment(1);
            } else { // otherwise the key structure tells what the sum means
                String[] temp = key.toString().split("\t");
                if (temp.length == 2) { // category and word separated by a tab
                    result.set(sum);
                    context.write(key, result);
                    // mos.write(new Text(temp[1]), result, temp[0]);
                } else { // total word count of a category
                    result.set(sum);
                    mos.write(key, result, "_wordsInClass/wordsInClass");
                }
                    }
    
                }
    
            }
    
            @Override
            protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
    
                mos.close();
            }
    
            @Override
            protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                    throws IOException, InterruptedException {
    
                mos = new MultipleOutputs<Text, IntWritable>(context);
            }
    
        }
    
        public static int run(Configuration conf) throws Exception {
    //        Configuration conf = new Configuration();
            // System.out.print("---run-------");
        // paths of the various files
        // document-count path
    //        String priorProbality = "hdfs://192.168.190.128:9000/user/hadoop/output/priorP/priorProbality.txt";
    //        conf.set("priorProbality", priorProbality);
    
            
            
    
            Job job = new Job(conf, "file count");
    
            job.setJarByClass(WordCount.class);
    
            job.setInputFormatClass(MyInputFormat.class);
    
            job.setMapperClass(WordCount.First_Mapper.class);
            job.setReducerClass(WordCount.First_Reducer.class);
            // System.out.println("---job-------");
        // filter out categories with 10 or fewer documents
    
            String input = conf.get("input");
            
            List<Path> inputPaths = MyUtils.getSecondDir(conf, input);
            for (Path path : inputPaths) {
                System.out.println("path = " + path.toString());
                MyInputFormat.addInputPath(job, path);
            }
    
            String wordsOutput = conf.get("wordsOutput");
            FileOutputFormat.setOutputPath(job, new Path(wordsOutput));
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            int exitCode = job.waitForCompletion(true) ? 0 : 1;
    
        // read the counter
            Counters counters = job.getCounters();
            Counter c1 = counters.findCounter(WordsNature.TOTALWORDS);
            System.out.println("-------------->>>>: " + c1.getDisplayName() + ":" + c1.getName() + ": " + c1.getValue());
    
        // write the number of distinct words to a file
            Path totalWordsPath = new Path("hdfs://192.168.190.128:9000/user/hadoop/output/totalwords.txt");
            FileSystem fs = totalWordsPath.getFileSystem(conf);
            FSDataOutputStream outputStream = fs.create(totalWordsPath);
            outputStream.writeBytes(c1.getDisplayName() + ":" + c1.getValue());
            IOUtils.closeStream(outputStream);
    
        // TODO: next time, try passing the distinct-word count to the
        // probability job through the Configuration instead:
        // conf.set("TOTALWORDS", totalWords.toString());
    
            return exitCode;
    
        }
    
    }
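
    To make the three write() calls in First_Mapper concrete: for a hypothetical ALB document containing only "weekend weekend weeks", the mapper emits

        ALB<tab>weekend    1     (emitted twice)
        ALB<tab>weeks      1
        ALB                1     (emitted three times)
        weekend            0
        weeks              0

    After the shuffle, First_Reducer sums the values per key: "ALB weekend" becomes 2 and "ALB weeks" becomes 1 (main output), the bare "ALB" key becomes 3 and goes to the _wordsInClass side output, and the bare word keys sum to 0, each incrementing the TOTALWORDS counter by one.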

    MyInputFormat

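    The MyInputFormat source itself is not shown in this post. Judging from how it is used (First_Mapper receives the category name as the Text key and a whole document as the Text value, and input paths are added per category folder), it is presumably a whole-file input format keyed by the parent folder name, much like the WholeFileInputFormat at the end of the post. Below is a minimal sketch under that assumption; it is a reconstruction, not the original code:

    package org.wordCount;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    // Sketch: reads each file as a single record, key = name of the category
    // folder containing the file (e.g. "ALB"), value = the file's entire contents.
    public class MyInputFormat extends FileInputFormat<Text, Text> {
    
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false; // one record per file, never split
        }
    
        @Override
        public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new RecordReader<Text, Text>() {
                private FileSplit fileSplit;
                private TaskAttemptContext ctx;
                private final Text key = new Text();
                private final Text value = new Text();
                private boolean processed = false;
    
                @Override
                public void initialize(InputSplit split, TaskAttemptContext context) {
                    this.fileSplit = (FileSplit) split;
                    this.ctx = context;
                }
    
                @Override
                public boolean nextKeyValue() throws IOException {
                    if (processed) {
                        return false;
                    }
                    Path file = fileSplit.getPath();
                    key.set(file.getParent().getName()); // category = parent folder name
                    byte[] contents = new byte[(int) fileSplit.getLength()];
                    FileSystem fs = file.getFileSystem(ctx.getConfiguration());
                    FSDataInputStream in = null;
                    try {
                        in = fs.open(file);
                        IOUtils.readFully(in, contents, 0, contents.length);
                        value.set(contents, 0, contents.length);
                    } finally {
                        IOUtils.closeStream(in);
                    }
                    processed = true;
                    return true;
                }
    
                @Override public Text getCurrentKey() { return key; }
                @Override public Text getCurrentValue() { return value; }
                @Override public float getProgress() { return processed ? 1.0f : 0.0f; }
                @Override public void close() { }
            };
        }
    }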

    Two small utilities

    package org.wordCount;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class MyUtils {
    
    // returns the second-level folders under the given directory
            static List<Path> getSecondDir(Configuration conf, String folder) throws Exception {
    //            System.out.println("----getSencondDir----" + folder);
                Path path = new Path(folder);
                
                FileSystem fs = path.getFileSystem(conf);
                FileStatus[] stats = fs.listStatus(path);
                System.out.println("stats.length = " + stats.length);
                List<Path> folderPath = new ArrayList<Path>();
                for (FileStatus stat : stats) {
                    if (stat.isDir()) {
    //                    System.out.println("----stat----" + stat.getPath());
                    if (fs.listStatus(stat.getPath()).length > 10) { // only categories with more than 10 files are used as input paths
                            folderPath.add(stat.getPath());
                        }
                    }
                }
    //            System.out.println("----folderPath----" + folderPath.size());
                return folderPath;
            }
    
        // returns true if the string contains a digit
        static boolean hasDigit(String content) {
            Pattern p = Pattern.compile(".*\\d+.*");
            return p.matcher(content).matches();
        }
        
    }
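
    A quick usage sketch (paths hypothetical, imports omitted):

    Configuration conf = new Configuration();
    // category folders under Country that contain more than 10 files
    List<Path> dirs = MyUtils.getSecondDir(conf, "hdfs://192.168.190.128:9000/user/hadoop/input/NBCorpus/Country");
    MyUtils.hasDigit("week3end");   // true  -> First_Mapper skips this token
    MyUtils.hasDigit("weekend");    // false -> the token is counted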

    Computing the probabilities

    package org.wordCount;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    
    public class Probability {
    
    private static final Log LOG = LogFactory.getLog(Probability.class);
        public static int total = 0;
        private static MultipleOutputs<Text, DoubleWritable> mos;
    
        // Client
        public static void run(Configuration conf) throws Exception {
    
    
        // read the total number of distinct words and store it in the Configuration
            String totalWordsPath = conf.get("totalWordsPath");
    //        String wordsInClassPath = conf.get("wordsInClassPath");
    
    
    
    
        // first read the number of distinct words
            FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
            FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
            BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
            String strLine = buffer.readLine();
            String[] temp = strLine.split(":");
            if (temp.length == 2) {
                // temp[0] = TOTALWORDS
            conf.set(temp[0], temp[1]);// both key and value are plain Strings
            }
    
            total = Integer.parseInt(conf.get("TOTALWORDS"));
            LOG.info("------>total = " + total);
    
            System.out.println("total ==== " + total);
            
            
            Job job = new Job(conf, "file count");
    
            job.setJarByClass(Probability.class);
    
            job.setMapperClass(WordsOfClassCountMapper.class);
            job.setReducerClass(WordsOfClassCountReducer.class);
    
            String input = conf.get("wordsOutput");
            String output = conf.get("freqOutput");
    
            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
        }
    
        // Mapper
        static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    
            private static DoubleWritable number = new DoubleWritable();
            private static Text className = new Text();
    
        // total number of words in each category (loaded in setup)
            private static Map<String, Integer> filemap = new HashMap<String, Integer>();
    
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
                            throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                int tot = Integer.parseInt(conf.get("TOTALWORDS"));
    
                System.out.println("total = " + total);
                System.out.println("tot = " + tot);
    
            // input lines look like:
                // ALB weekend 1
                // ALB weeks 3
            Map<String, Map<String, Integer>> baseMap = new HashMap<String, Map<String, Integer>>(); // holds the parsed counts
            // Map<String, Map<String, Double>> priorMap = new HashMap<String,
            // Map<String, Double>>(); // would hold each word's probability
    
            String[] temp = value.toString().split("\t");
            // first store the parsed line in baseMap
            if (temp.length == 3) {
                // the folder name is the category name
                    if (baseMap.containsKey(temp[0])) {
                        baseMap.get(temp[0]).put(temp[1], Integer.parseInt(temp[2]));
                    } else {
                        Map<String, Integer> oneMap = new HashMap<String, Integer>();
                        oneMap.put(temp[1], Integer.parseInt(temp[2]));
                        baseMap.put(temp[0], oneMap);
                    }
    
            } // done parsing; everything is in baseMap
    
                int allWordsInClass = 0;
                
    
            for (Map.Entry<String, Map<String, Integer>> entries : baseMap.entrySet()) { // iterate over categories
                allWordsInClass = filemap.get(entries.getKey());
                for (Map.Entry<String, Integer> entry : entries.getValue().entrySet()) { // compute each word's probability within the category
                    double p = (entry.getValue() + 1.0) / (allWordsInClass + tot); // add-one (Laplace) smoothing

                    className.set(entries.getKey() + "\t" + entry.getKey());
                    number.set(p);
                    LOG.info("------>p = " + p);
                    mos.write(new Text(entry.getKey()), number, entries.getKey()); // the third argument puts each category's words into its own output file
    
    //                    context.write(className, number);
                    }
                }
    
            }
    
        // finally, compute the probability of a word that never occurs in a category; a constant for each category
            protected void cleanup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
                    throws IOException, InterruptedException {
                
                Configuration conf = context.getConfiguration();
                int tot = Integer.parseInt(conf.get("TOTALWORDS"));
            for (Map.Entry<String, Integer> entry : filemap.entrySet()) { // iterate over categories
                    
                double notFind = 1.0 / (entry.getValue() + tot);
                number.set(notFind);
                mos.write(new Text(entry.getKey()), number, "_notFound/notFound");
                
                }
                mos.close();
            }
    
            protected void setup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
                    throws IOException, InterruptedException {
                Configuration conf = context.getConfiguration();
                mos = new MultipleOutputs<Text, DoubleWritable>(context);
                String filePath = conf.get("wordsInClassPath");
                FileSystem fs = FileSystem.get(URI.create(filePath), conf);
                FSDataInputStream inputStream = fs.open(new Path(filePath));
                BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
                String strLine = null;
                while ((strLine = buffer.readLine()) != null) {
                String[] temp = strLine.split("\t");
                    filemap.put(temp[0], Integer.parseInt(temp[1]));
                }
            }
    
        }
    
        // Reducer
        static class WordsOfClassCountReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
    
        // result holds the summed value for each key
            DoubleWritable result = new DoubleWritable();
            // Configuration conf = new Configuration();
            // int total = conf.getInt("TOTALWORDS", 1);
    
            protected void reduce(Text key, Iterable<DoubleWritable> values,
                    Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                            throws IOException, InterruptedException {
    
            double sum = 0.0;
                for (DoubleWritable value : values) {
                    sum += value.get();
                }
                result.set(sum);
    
                context.write(key, result);
            }
    
        }
    
    }
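
    In short, for a word w seen in category c the mapper writes the add-one (Laplace) smoothed estimate, and cleanup() writes the constant used for words never seen in c:

        P(w|c)      = (count(w,c) + 1) / (wordsInClass(c) + |V|)
        P(unseen|c) = 1 / (wordsInClass(c) + |V|)

    where wordsInClass(c) comes from the _wordsInClass side output of WordCount and |V| is the value of the TOTALWORDS counter (the number of distinct words in the corpus).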

    Prediction

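    The prediction job itself is not shown here. Conceptually it reads each test document whole (using the WholeFileInputFormat below), then scores the document against the trained model: log prior plus the sum of log word probabilities, falling back to the per-category notFound constant for unseen words, and picks the highest-scoring class. Below is a minimal scoring sketch under those assumptions; the class and method names are hypothetical, and the three maps are assumed to have been loaded from the training outputs:

    package org.wordCount.predict;
    
    import java.util.Map;
    import java.util.StringTokenizer;
    
    // Hypothetical scorer: priorMap holds P(c) from priorProbability.txt,
    // probMap.get(c) holds P(w|c) from the probability output files, and
    // notFoundMap.get(c) holds the per-category constant from _notFound.
    public class NaiveBayesScorer {
    
        public static String classify(String text,
                                      Map<String, Double> priorMap,
                                      Map<String, Map<String, Double>> probMap,
                                      Map<String, Double> notFoundMap) {
            String best = null;
            double bestScore = Double.NEGATIVE_INFINITY;
            for (String c : priorMap.keySet()) {
                double score = Math.log(priorMap.get(c)); // log P(c)
                Map<String, Double> wordProbs = probMap.get(c);
                StringTokenizer itr = new StringTokenizer(text);
                while (itr.hasMoreTokens()) {
                    String word = itr.nextToken();
                    // same token filter as in training
                    if (word.matches(".*\\d+.*") || word.contains(".")) {
                        continue;
                    }
                    Double p = wordProbs.get(word);
                    score += Math.log(p != null ? p : notFoundMap.get(c));
                }
                if (score > bestScore) {
                    bestScore = score;
                    best = c;
                }
            }
            return best;
        }
    }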

    The InputFormat for prediction

    package org.wordCount.predict;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    public class WholeFileInputFormat extends FileInputFormat<LongWritable, Text>{
    
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            WholeFileRecordReader reader = new WholeFileRecordReader();
            reader.initialize(split, context);
            return reader;
        }
    
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            
            return false;
        }
    
    }
    
    class WholeFileRecordReader extends RecordReader<LongWritable, Text>{
        
    private FileSplit fileSplit;    // the input split; it becomes a single <key, value> record
    private Configuration conf;        // configuration object
    private Text value = new Text();    // value object holding the file contents
    private LongWritable key = new LongWritable();    // key object (left empty)
    private boolean processed = false;    // whether this split has been processed yet
        
        
        
    
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit)split;        // cast the input split to a FileSplit
            this.conf = context.getConfiguration();
            
        }
    
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if(!processed){
                byte[] contents = new byte[(int)fileSplit.getLength()];
                Path file = fileSplit.getPath();
                FileSystem fs = file.getFileSystem(conf);
                FSDataInputStream in = null;
                try{
                    in = fs.open(file);
                    IOUtils.readFully(in, contents, 0, contents.length);
                    value.set(contents, 0, contents.length);    
                }finally{
                    IOUtils.closeStream(in);
                }
                processed = true;
                return true;
            }
            return false;
        }
    
        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            
            return key;
        }
    
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }
    
        @Override
        public float getProgress() throws IOException, InterruptedException {
            
            return processed ? 1.0f : 0.0f;
        }
    
        @Override
        public void close() throws IOException {
            
        }
        
        
        
        
    }
  • Original post: https://www.cnblogs.com/luolizhi/p/4964390.html