  • Implementing Naive Bayes on MapReduce

    Naive Bayes is a commonly used classifier because the idea behind it is simple. It is called "naive" because it assumes that the features used for classification are conditionally independent given the class; this assumption makes classification very simple, but it costs some accuracy.
    For the detailed derivation, see 《统计学习方法》 (Statistical Learning Methods).
    The derivation gives y = argmax_ck P(Y=ck) * P(X=x|Y=ck), so we need the prior probability P(Y=ck) and the conditional probability P(X=x|Y=ck).
    The concrete example follows this post: http://blog.163.com/jiayouweijiewj@126/blog/static/1712321772010102802635243/
    I use four MapReduce jobs in total. Because a multinomial model is adopted, the prior probability is P(c) = (number of words in class c) / (number of words in the whole training set), and the class-conditional probability is P(tk|c) = (number of occurrences of word tk in the documents of class c + 1) / (number of words in class c + |V|), where |V| is the number of distinct words. A worked check of these formulas on the toy corpus follows the input. The input is:
    1:Chinese Beijing Chinese
    1:Chinese Chinese Shanghai
    1:Chinese Macao
    0:Tokyo Japan Chinese
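    As a quick hand check of the formulas above on this toy corpus (my own verification, not part of the original post; the numbers match the job outputs listed below): class 1 contains 8 words and class 0 contains 3, so P(c=1) = 8/11 and P(c=0) = 3/11; the vocabulary is {Chinese, Beijing, Shanghai, Macao, Tokyo, Japan}, so |V| = 6; and, for example, P(Chinese|1) = (5+1)/(8+6) = 6/14 ≈ 0.4286 while P(Chinese|0) = (1+1)/(3+6) = 2/9 ≈ 0.2222.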
    1. The first MapReduce job counts the number of words under each class; this is needed later to compute the prior probability.
    Its output is:
    0              3
    1              8
     
    2. The second MapReduce job computes the conditional probabilities; its output is:
    0:Chinese     0.2222222222222222
    0:Japan       0.2222222222222222
    0:Tokyo       0.2222222222222222
    1:Beijing     0.14285714285714285
    1:Chinese     0.42857142857142855
    1:Macao       0.14285714285714285
    1:Shanghai    0.14285714285714285
     
    3. The third MapReduce job counts the number of distinct words (here: Chinese, Beijing, Shanghai, Macao, Tokyo, Japan); its output is:
    num is 6
    4. The last MapReduce job performs the prediction.
     
    The implementation of each MapReduce job is described below:
    1. Counting the words under each class is simple: use the class label as the key and count the words per class.
    The code is attached (a sample run command follows it):
    package hadoop.MachineLearning.Bayes.Pro;



    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class PriorProbability {//counts the words under each class, used later for the prior probability

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String input="hdfs://10.107.8.110:9000/Bayes/Bayes_input/";
            String output="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro/";
            Job job = Job.getInstance(conf, "PriorProbability");
            job.setJarByClass(hadoop.MachineLearning.Bayes.Pro.PriorProbability.class);
            // TODO: specify a mapper
            job.setMapperClass(MyMapper.class);
            // TODO: specify a reducer
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(MyReducer.class);

            // TODO: specify output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // TODO: specify input and output DIRECTORIES (not files)
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));

            if (!job.waitForCompletion(true))
                return;
        }

    }


    package hadoop.MachineLearning.Bayes.Pro;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;

    public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable ikey, Text ivalue, Context context)
                throws IOException, InterruptedException {
            String[] line=ivalue.toString().split(":| ");//line[0] is the class label, the rest are words
            int size=line.length-1;
            context.write(new Text(line[0]),new Text(String.valueOf(size)));//emit (class, number of words in this line)
        }

    }


    package hadoop.MachineLearning.Bayes.Pro;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Reducer.Context;

    public class MyReducer extends Reducer<Text, Text, Text, IntWritable> {

        public void reduce(Text _key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // sum up the per-line word counts for this class
            int sum=0;
            for (Text val : values) {
                sum+=Integer.parseInt(val.toString());
            }
            context.write(_key,new IntWritable(sum));
        }

    }
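    The input and output paths are hard-coded in main(), so the job can be launched without arguments once the classes are packaged. A minimal sketch, assuming the classes are bundled into a jar named Bayes.jar (the jar name is my own placeholder):
    hadoop jar Bayes.jar hadoop.MachineLearning.Bayes.Pro.PriorProbability
    The three jobs below are launched the same way with their own driver classes.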
     
     
    2. The second job counts the number of distinct words in the corpus. The approach I implemented is not great: every input line is emitted under one and the same key, a combiner uses a Set to remove duplicate words on each node, and the reducer then uses another Set to merge everything and count the distinct words. A better approach would be to key by word first and count the distinct words afterwards (a sketch of that alternative follows the Count code below), but since I have only just learned MapReduce and cannot write chained jobs yet, and this Bayes implementation already uses four jobs, I did not complicate it further.
    package hadoop.MachineLearning.Bayes.Count;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Count {//counts the number of distinct words in the corpus
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Count");
            String input="hdfs://10.107.8.110:9000/Bayes/Bayes_input";
            String output="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count";
            job.setJarByClass(hadoop.MachineLearning.Bayes.Count.Count.class);
            // TODO: specify a mapper
            job.setMapperClass(MyMapper.class);
            // TODO: specify a reducer
            job.setCombinerClass(MyCombiner.class);
            job.setReducerClass(MyReducer.class);
            
            // TODO: specify output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            // TODO: specify input and output DIRECTORIES (not files)
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            if (!job.waitForCompletion(true))
                return;
        }
    
    }
    
    
    package hadoop.MachineLearning.Bayes.Count;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    
        public void map(LongWritable ikey, Text ivalue, Context context)
                throws IOException, InterruptedException {
            String[] line=ivalue.toString().split(":| ");//line[0] is the class label, the rest are words
            String key="1";
            for(int i=1;i<line.length;i++){
                context.write(new Text(key),new Text(line[i]));//emit every word under the same key so that all of them reach a single reducer
            }
        }
    
    }
    
    
    
    package hadoop.MachineLearning.Bayes.Count;
    
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyCombiner extends Reducer<Text, Text, Text, Text> {//use a Set to drop duplicate words locally on each node first
    
        public void reduce(Text _key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // collect this node's words into a set to drop duplicates
            Set<String> set=new HashSet<>();
            for (Text val : values) {
                set.add(val.toString());
            }
            for(Iterator<String> it=set.iterator();it.hasNext();){
                context.write(new Text("1"),new Text(it.next()));
            }
        }
    
    }
    
    package hadoop.MachineLearning.Bayes.Count;
    
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyReducer extends Reducer<Text, Text, Text, Text> {//after the combiner, use a Set again to deduplicate and obtain the number of distinct words
    
        public void reduce(Text _key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // merge the locally deduplicated words from all nodes and count the distinct ones
            Set<String> set=new HashSet<>();
            for (Text val : values) {
                set.add(val.toString());
            }
            context.write(new Text("num is "),new Text(String.valueOf(set.size())));
        }
    
    }
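    As mentioned above, a cleaner alternative is to key by word, so that duplicates collapse in the shuffle, and then count the distinct words. Below is a minimal sketch of that idea (my own illustration, not part of the original post; class and package names are placeholders). It counts unique words with a Hadoop counter, one reduce() call per distinct word:
    package hadoop.MachineLearning.Bayes.Count2;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class DistinctWords {

        public static class WordMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            public void map(LongWritable ikey, Text ivalue, Context context)
                    throws IOException, InterruptedException {
                String[] line = ivalue.toString().split(":| ");
                for (int i = 1; i < line.length; i++) {//skip the class label, emit each word as the key
                    context.write(new Text(line[i]), NullWritable.get());
                }
            }
        }

        public static class WordReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
            public void reduce(Text word, Iterable<NullWritable> values, Context context)
                    throws IOException, InterruptedException {
                //each reduce() call sees one distinct word, so the counter ends up at |V|
                context.getCounter("Bayes", "VOCAB_SIZE").increment(1);
            }
        }
    }
    A driver (omitted here) would configure WordMapper and WordReducer just like the Count driver above and read the counter from job.getCounters() after waitForCompletion(true), instead of parsing an output file.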
     
     
    3. Computing the conditional probabilities. This needs the number of occurrences of each word in each class, the total number of words in each class, and the number of distinct words; all of these come from the previous outputs, and I load them into in-memory maps. Some words never appear under a given class, for example P(Japan | yes) and P(Tokyo | yes) in the linked example; treating their counts as 0 would drive the whole conditional probability to 0, so the smoothing described above is used (see the link). One detail: a lot of conditional probabilities are generated, so an efficient storage and lookup structure is really needed; for lack of a better option I simply keep them in a map, which would be inefficient for large data.
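    To see on the toy data why this matters: Tokyo never occurs under class 1, so without smoothing P(Tokyo|1) would be 0 and the whole product for class 1 would collapse to 0; with the add-one smoothing it becomes P(Tokyo|1) = (0+1)/(8+6) = 1/14 ≈ 0.0714 (my own worked number, following the formula given earlier).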
     
    package hadoop.MachineLearning.Bayes.Cond;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class CondiPro {//computes the conditional probabilities
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String input="hdfs://10.107.8.110:9000/Bayes/Bayes_input";
            String output="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Con";
        String proPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro";//output of the earlier job that counted words per class
        String countPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count";//output of the earlier job that counted the distinct words
            conf.set("propath",proPath);
            conf.set("countPath",countPath);
            Job job = Job.getInstance(conf, "ConditionPro");
            
            job.setJarByClass(hadoop.MachineLearning.Bayes.Cond.CondiPro.class);
            // TODO: specify a mapper
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            // TODO: specify a reducer
            job.setReducerClass(MyReducer.class);
    
            // TODO: specify output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            // TODO: specify input and output DIRECTORIES (not files)
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            if (!job.waitForCompletion(true))
                return;
        }
    
    }
    
    package hadoop.MachineLearning.Bayes.Cond;
    
    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        
        public void map(LongWritable ikey, Text ivalue, Context context)
                throws IOException, InterruptedException {
            String[] line=ivalue.toString().split(":| ");//line[0] is the class label, the rest are words
            for(int i=1;i<line.length;i++){
                String key=line[0]+":"+line[i];//key is class:word
                context.write(new Text(key),new IntWritable(1));
            }
        }
    
    }
    
    package hadoop.MachineLearning.Bayes.Cond;
    
    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        public Map<String,Integer> map;
        public int count=0;
        public void setup(Context context) throws IOException{
            Configuration conf=context.getConfiguration();

            String proPath=conf.get("propath");
            String countPath=conf.get("countPath");
            map=Utils.getMapFormHDFS(proPath);//word totals per class
            count=Utils.getCountFromHDFS(countPath);//number of distinct words, |V|
        }
        public void reduce(Text _key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum the occurrences of this class:word pair
            int sum=0;
            for (IntWritable val : values) {
                sum+=val.get();
            }
            int type=Integer.parseInt(_key.toString().split(":")[0]);
            double probability=0.0;
            for(Map.Entry<String,Integer> entry:map.entrySet()){
                if(type==Integer.parseInt(entry.getKey())){
                    probability=(sum+1)*1.0/(entry.getValue()+count);//smoothed conditional probability: (count of word in class + 1) / (words in class + |V|)
                }
            }
            context.write(_key,new DoubleWritable(probability));
        }
    
    }
    
    package hadoop.MachineLearning.Bayes.Cond;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.LineReader;
    
    public class Utils {
    
        /**
         * @param args
         * @throws IOException 
         */
        
        public static Map<String,Integer> getMapFormHDFS(String input) throws IOException{
            Configuration conf=new Configuration();
            Path path=new Path(input);
            FileSystem fs=path.getFileSystem(conf);
            
            FileStatus[] stats=fs.listStatus(path);
            Map<String,Integer> map=new HashMap();
            for(int i=0;i<stats.length;i++){
                if(stats[i].isFile()){
                    FSDataInputStream infs=fs.open(stats[i].getPath());
                    LineReader reader=new LineReader(infs,conf);
                    Text line=new Text();
                    while(reader.readLine(line)>0){
                        String[] temp=line.toString().split("\t");//the previous job's output is key<TAB>value
                        //System.out.println(temp.length);
                        map.put(temp[0],Integer.parseInt(temp[1]));
                    }
                    reader.close();
                }
            }
            
            return map;
        
        }
        
        public static Map<String,Double> getMapFormHDFS(String input,boolean j) throws IOException{
            Configuration conf=new Configuration();
            Path path=new Path(input);
            FileSystem fs=path.getFileSystem(conf);
            
            FileStatus[] stats=fs.listStatus(path);
            Map<String,Double> map=new HashMap();
            for(int i=0;i<stats.length;i++){
                if(stats[i].isFile()){
                    FSDataInputStream infs=fs.open(stats[i].getPath());
                    LineReader reader=new LineReader(infs,conf);
                    Text line=new Text();
                    while(reader.readLine(line)>0){
                        String[] temp=line.toString().split("\t");//the previous job's output is key<TAB>value
                        //System.out.println(temp.length);
                        map.put(temp[0],Double.parseDouble(temp[1]));
                    }
                    reader.close();
                }
            }
            
            return map;
        
        }
        
        
        public static int getCountFromHDFS(String input) throws IOException{
            Configuration conf=new Configuration();
            Path path=new Path(input);
            FileSystem fs=path.getFileSystem(conf);
            
            FileStatus[] stats=fs.listStatus(path);
            
            int count=0;
            for(int i=0;i<stats.length;i++){
                if(stats[i].isFile()){
                    FSDataInputStream infs=fs.open(stats[i].getPath());
                    LineReader reader=new LineReader(infs,conf);
                    Text line=new Text();
                    while(reader.readLine(line)>0){
                        String[] temp=line.toString().split("\t");//the Count output line looks like "num is <TAB>6"
                        //System.out.println(temp.length);
                        count=Integer.parseInt(temp[1]);
                    }
                    reader.close();
                }
            }
            return count;
        }
        
        public static void main(String[] args) throws IOException {
            // TODO Auto-generated method stub
            String proPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro";
            String countPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count/";
            Map<String,Integer> map=Utils.getMapFormHDFS(proPath);
            for(Map.Entry<String,Integer> entry:map.entrySet()){
                System.out.println(entry.getKey()+"->"+entry.getValue());
            }
            
            int count=Utils.getCountFromHDFS(countPath);
            System.out.println("count is "+count);
        }
    
    }
     
    4. Prediction. For an input such as Chinese, Chinese, Chinese, Tokyo, Japan, the mapper emits the line once for each class (0 and 1) as type:words; the reducer then looks each word up in the conditional-probability map and simply multiplies the probabilities together with the prior.
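    Working the example through by hand with the numbers above (my own check, not from the original post): score(1) = P(c=1) * P(Chinese|1)^3 * P(Tokyo|1) * P(Japan|1) = 8/11 * (3/7)^3 * 1/14 * 1/14 ≈ 0.0003, while score(0) = P(c=0) * P(Chinese|0)^3 * P(Tokyo|0) * P(Japan|0) = 3/11 * (2/9)^3 * 2/9 * 2/9 ≈ 0.0001, so the document should be assigned to class 1.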
     
    package hadoop.MachineLearning.Bayes.Predict;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class Predict {
    
        public static void main(String[] args) throws Exception {//prediction driver
            Configuration conf = new Configuration();
            String input="hdfs://10.107.8.110:9000/Bayes/Predict_input";
            String output="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Predict";
            String condiProPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Con";
            String proPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro";
            String countPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count";
            conf.set("condiProPath",condiProPath);
            conf.set("proPath",proPath);
            conf.set("countPath",countPath);
            Job job = Job.getInstance(conf, "Predict");
            job.setJarByClass(hadoop.MachineLearning.Bayes.Predict.Predict.class);
            // TODO: specify a mapper
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            // TODO: specify a reducer
            job.setReducerClass(MyReducer.class);
    
            // TODO: specify output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
    
            // TODO: specify input and output DIRECTORIES (not files)
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            if (!job.waitForCompletion(true))
                return;
        }
    
    }
    
    package hadoop.MachineLearning.Bayes.Predict;
    
    
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        
        public Map<String,Integer> map=new HashMap();
        
        public void setup(Context context) throws IOException{
            Configuration conf=context.getConfiguration();
            String proPath=conf.get("proPath");
            map=Utils.getMapFormHDFS(proPath);
        }
        
        public void map(LongWritable ikey, Text ivalue, Context context)
                throws IOException, InterruptedException {
            for(Map.Entry<String,Integer> entry:map.entrySet()){
                context.write(new Text(entry.getKey()),ivalue);//emit each input line once per class label, so the reducer can score it under every class
            }
        }
    
    }
    
    package hadoop.MachineLearning.Bayes.Predict;
    
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class MyReducer extends Reducer<Text, Text, Text, DoubleWritable> {
        
        public Map<String,Double> mapDouble=new HashMap();//conditional probabilities
        
        public Map<String,Integer> mapInteger=new HashMap();//word totals per class
        
        public Map<String,Double> noFind=new HashMap();//smoothed fallback probability for words that never appear in a class
        
        public Map<String,Double> prePro=new HashMap();//prior probabilities
        
        public void setup(Context context) throws IOException{
            Configuration conf=context.getConfiguration();

            String condiProPath=conf.get("condiProPath");
            String proPath=conf.get("proPath");
            String countPath=conf.get("countPath");
            mapDouble=Utils.getMapFormHDFS(condiProPath,true);//conditional probabilities
            mapInteger=Utils.getMapFormHDFS(proPath);//word totals per class
            int count=Utils.getCountFromHDFS(countPath);//number of distinct words, |V|
            for(Map.Entry<String,Integer> entry:mapInteger.entrySet()){
                //fallback for a word never seen in this class: (0 + 1) / (words in class + |V|)
                noFind.put(entry.getKey(),(1.0/(count+entry.getValue())));
            }
            int sum=0;
            for(Map.Entry<String,Integer> entry:mapInteger.entrySet()){
                sum+=entry.getValue();
            }

            for(Map.Entry<String,Integer> entry:mapInteger.entrySet()){
                prePro.put(entry.getKey(),(entry.getValue()*1.0/sum));//prior: words in class / total words
            }

        }
        
        public void reduce(Text _key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String type=_key.toString();
            double pro=1.0;
            for (Text val : values) {
                String[] words=val.toString().split(" ");
                for(int i=0;i<words.length;i++){
                    String condi=type+":"+words[i];
                    if(mapDouble.get(condi)!=null){//the word occurs in this class, so a conditional probability exists
                        pro=pro*mapDouble.get(condi);
                    }else{//the word never occurs in this class, so fall back to the smoothed default
                        pro=pro*noFind.get(type);
                    }
                }
            }
            pro=pro*prePro.get(type);//multiply in the prior probability
            context.write(new Text(type),new DoubleWritable(pro));
        }
    
    }
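    One note on the output (my addition): the reducer writes one score per class, so the predicted label is simply the class whose output line carries the larger value; for the example input this should be class 1, in line with the hand calculation above.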
    
    
    package hadoop.MachineLearning.Bayes.Predict;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.util.LineReader;
    
    public class Utils {
    
        /**
         * @param args
         * @throws IOException 
         */
        
        public static Map<String,Integer> getMapFormHDFS(String input) throws IOException{
            Configuration conf=new Configuration();
            Path path=new Path(input);
            FileSystem fs=path.getFileSystem(conf);
            
            FileStatus[] stats=fs.listStatus(path);
            Map<String,Integer> map=new HashMap();
            for(int i=0;i<stats.length;i++){
                if(stats[i].isFile()){
                    FSDataInputStream infs=fs.open(stats[i].getPath());
                    LineReader reader=new LineReader(infs,conf);
                    Text line=new Text();
                    while(reader.readLine(line)>0){
                        String[] temp=line.toString().split("\t");//the previous job's output is key<TAB>value
                        //System.out.println(temp.length);
                        map.put(temp[0],Integer.parseInt(temp[1]));
                    }
                    reader.close();
                }
            }
            
            return map;
        
        }
        
        public static Map<String,Double> getMapFormHDFS(String input,boolean j) throws IOException{
            Configuration conf=new Configuration();
            Path path=new Path(input);
            FileSystem fs=path.getFileSystem(conf);
            
            FileStatus[] stats=fs.listStatus(path);
            Map<String,Double> map=new HashMap();
            for(int i=0;i<stats.length;i++){
                if(stats[i].isFile()){
                    FSDataInputStream infs=fs.open(stats[i].getPath());
                    LineReader reader=new LineReader(infs,conf);
                    Text line=new Text();
                    while(reader.readLine(line)>0){
                        String[] temp=line.toString().split("\t");//the previous job's output is key<TAB>value
                        //System.out.println(temp.length);
                        map.put(temp[0],Double.parseDouble(temp[1]));
                    }
                    reader.close();
                }
            }
            
            return map;
        
        }
        
        
        public static int getCountFromHDFS(String input) throws IOException{
            Configuration conf=new Configuration();
            Path path=new Path(input);
            FileSystem fs=path.getFileSystem(conf);
            
            FileStatus[] stats=fs.listStatus(path);
            
            int count=0;
            for(int i=0;i<stats.length;i++){
                if(stats[i].isFile()){
                    FSDataInputStream infs=fs.open(stats[i].getPath());
                    LineReader reader=new LineReader(infs,conf);
                    Text line=new Text();
                    while(reader.readLine(line)>0){
                        String[] temp=line.toString().split("\t");//the Count output line looks like "num is <TAB>6"
                        //System.out.println(temp.length);
                        count=Integer.parseInt(temp[1]);
                    }
                    reader.close();
                }
            }
            return count;
        }
        
        public static void main(String[] args) throws IOException {
            // TODO Auto-generated method stub
            String proPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Pro";
            String countPath="hdfs://10.107.8.110:9000/Bayes/Bayes_output/Count/";
            Map<String,Integer> map=Utils.getMapFormHDFS(proPath);
            for(Map.Entry<String,Integer> entry:map.entrySet()){
                System.out.println(entry.getKey()+"->"+entry.getValue());
            }
            
            int count=Utils.getCountFromHDFS(countPath);
            System.out.println("count is "+count);
        }
    
    }
     
     
     
     
     
     
     
  • Original post: https://www.cnblogs.com/sunrye/p/4553732.html