  • Twenty Newsgroups Classification Task, Part 2: seq2sparse (2)

    Continuing from the previous post: the output of SequenceFileTokenizerMapper can be viewed in /home/mahout/mahout-work-mahout0/20news-vectors/tokenized-documents/part-m-00000, and the following code can be used to read that file (adapted from the earlier code that read the cluster-center file):

    package mahout.fansy.test.bayes.read;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Writable;
    import org.apache.mahout.common.StringTuple;
    import org.apache.mahout.common.iterator.sequencefile.PathFilters;
    import org.apache.mahout.common.iterator.sequencefile.PathType;
    import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterable;

    public class ReadFromTokenizedDocuments {

        private static Configuration conf;

        public static void main(String[] args) {
            conf = new Configuration();
            conf.set("mapred.job.tracker", "ubuntu:9001");
            String path = "hdfs://ubuntu:9000/home/mahout/mahout-work-mahout0/20news-vectors/tokenized-documents/part-m-00000";

            getValue(path, conf);
        }

        /**
         * Reads the values of a sequence file into a list.
         * @param path  the sequence file (or directory of part files)
         * @param conf  Hadoop Configuration
         * @return      the StringTuple values read from the sequence file
         */
        public static List<StringTuple> getValue(String path, Configuration conf) {
            Path hdfsPath = new Path(path);
            List<StringTuple> list = new ArrayList<StringTuple>();
            for (Writable value : new SequenceFileDirValueIterable<Writable>(hdfsPath, PathType.LIST,
                    PathFilters.partFilter(), conf)) {
                Class<? extends Writable> valueClass = value.getClass();
                if (valueClass.equals(StringTuple.class)) {
                    StringTuple st = (StringTuple) value;
                    list.add(st);
                } else {
                    throw new IllegalStateException("Bad value class: " + valueClass);
                }
            }
            return list;
        }

    }
    

    Reading this file with the code above shows that the first StringTuple contains 1,320 words (the count after stop words have been removed).
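
    For reference, here is a minimal sketch of how that count can be printed with the getValue method above. The class name FirstTupleSize is made up for illustration, and the HDFS path and JobTracker address are the same assumed values used earlier:

    import java.util.List;

    import mahout.fansy.test.bayes.read.ReadFromTokenizedDocuments;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.mahout.common.StringTuple;

    public class FirstTupleSize {

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.set("mapred.job.tracker", "ubuntu:9001");
            String path = "hdfs://ubuntu:9000/home/mahout/mahout-work-mahout0/20news-vectors/tokenized-documents/part-m-00000";

            // getValue collects every StringTuple (one per document) from the sequence file.
            List<StringTuple> tuples = ReadFromTokenizedDocuments.getValue(path, conf);

            // The entries of the first tuple are the tokens of the first document;
            // on this data it reports 1320 tokens after stop-word removal.
            System.out.println("tokens in first document: " + tuples.get(0).getEntries().size());
        }
    }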

    After that comes another round of parameter setup, up to line 267, where the code checks whether processIdf is false. Since tfidf was configured earlier, processIdf is true and execution enters the else block, as follows:

    if (!processIdf) {
      DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath, outputDir, tfDirName, conf, minSupport, maxNGramSize,
          minLLRValue, norm, logNormalize, reduceTasks, chunkSize, sequentialAccessOutput, namedVectors);
    } else {
      DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath, outputDir, tfDirName, conf, minSupport, maxNGramSize,
          minLLRValue, -1.0f, false, reduceTasks, chunkSize, sequentialAccessOutput, namedVectors);
    }

    This calls DictionaryVectorizer.createTermFrequencyVectors directly. Stepping into that method (line 145 of DictionaryVectorizer), there is again some parameter setup before it reaches the startWordCounting method. Inside startWordCounting is a standard Job configuration whose Mapper, Combiner, and Reducer are TermCountMapper, TermCountCombiner, and TermCountReducer respectively. Below we look at what each part does (it is really just the most basic word-count job); first, a rough sketch of this kind of job wiring.
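
    The following is only a sketch of a word-count style job setup, not the actual Mahout source of startWordCounting; the input/output paths are placeholders, and it assumes the TermCount* classes live in org.apache.mahout.vectorizer.term as in recent Mahout releases:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    import org.apache.mahout.vectorizer.term.TermCountCombiner;
    import org.apache.mahout.vectorizer.term.TermCountMapper;
    import org.apache.mahout.vectorizer.term.TermCountReducer;

    public class TermCountJobSketch {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "term count (sketch)");
            job.setJarByClass(TermCountJobSketch.class);

            // The same Mapper/Combiner/Reducer trio described above.
            job.setMapperClass(TermCountMapper.class);      // emits (word, count within one document)
            job.setCombinerClass(TermCountCombiner.class);  // partial sums on the map side
            job.setReducerClass(TermCountReducer.class);    // final sum per word

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            // Read the tokenized documents and write the word counts as sequence files.
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. .../tokenized-documents
            FileOutputFormat.setOutputPath(job, new Path(args[1]));  // e.g. .../wordcount

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }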

    TermCountMapper; first the code:

    protected void map(Text key, StringTuple value, final Context context) throws IOException, InterruptedException {
        OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<String>();
        for (String word : value.getEntries()) {
          if (wordCount.containsKey(word)) {
            wordCount.put(word, wordCount.get(word) + 1);
          } else {
            wordCount.put(word, 1);
          }
        }
        wordCount.forEachPair(new ObjectLongProcedure<String>() {
          @Override
          public boolean apply(String first, long second) {
            try {
              context.write(new Text(first), new LongWritable(second));
            } catch (IOException e) {
              context.getCounter("Exception", "Output IO Exception").increment(1);
            } catch (InterruptedException e) {
              context.getCounter("Exception", "Interrupted Exception").increment(1);
            }
            return true;
          }
        });
    }

    This code first creates an OpenObjectLongHashMap, a map class provided by the Mahout developers, then iterates over the words in value (the first value, for instance, holds 1,320 words). A word that is not yet in the map is inserted with a count of 1; otherwise its stored count is incremented by 1; that is all the for loop does. Then comes forEachPair: nothing is being overridden here; instead an anonymous ObjectLongProcedure is created on the spot and passed to forEachPair, which calls its apply method once for every entry in the map. The important call is context.write, which emits each word in wordCount (the variable holding every word and its count) as the output key and its count as the output value. A small standalone example of this callback pattern is shown below.
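
    To make the callback pattern concrete, here is a minimal standalone example using the same OpenObjectLongHashMap and ObjectLongProcedure classes from mahout-math (the package names org.apache.mahout.math.map and org.apache.mahout.math.function are assumed, and the word list is made up):

    import org.apache.mahout.math.function.ObjectLongProcedure;
    import org.apache.mahout.math.map.OpenObjectLongHashMap;

    public class ForEachPairDemo {

        public static void main(String[] args) {
            String[] words = {"apple", "banana", "apple", "cherry", "apple"};

            // Count each word, just as TermCountMapper does for one document.
            OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<String>();
            for (String word : words) {
                if (wordCount.containsKey(word)) {
                    wordCount.put(word, wordCount.get(word) + 1);
                } else {
                    wordCount.put(word, 1);
                }
            }

            // forEachPair calls apply(key, value) once per entry; returning true keeps the iteration going.
            wordCount.forEachPair(new ObjectLongProcedure<String>() {
                @Override
                public boolean apply(String first, long second) {
                    System.out.println(first + " -> " + second);
                    return true;
                }
            });
        }
    }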

    Next come TermCountCombiner and TermCountReducer. Their code is the same as the first introductory Hadoop word-count example, so I won't go through it here; a sketch of that kind of reducer follows. The log shows that the reduce phase outputs 93,563 words in total.
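
    For completeness, here is a minimal sketch of a word-count style reducer that sums the LongWritable counts per word. This is the standard Hadoop pattern, not the exact Mahout source:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Classic word-count reducer: add up every partial count emitted for a word.
    public class WordCountSumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }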

    Then we reach the createDictionaryChunks method, at line 215 of DictionaryVectorizer:

    List<Path> chunkPaths = Lists.newArrayList();

    Configuration conf = new Configuration(baseConf);

    FileSystem fs = FileSystem.get(wordCountPath.toUri(), conf);

    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
    int chunkIndex = 0;
    Path chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
    chunkPaths.add(chunkPath);

    SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);

    try {
      long currentChunkSize = 0;
      Path filesPattern = new Path(wordCountPath, OUTPUT_FILES_PATTERN);
      int i = 0;
      for (Pair<Writable,Writable> record
           : new SequenceFileDirIterable<Writable,Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {
        if (currentChunkSize > chunkSizeLimit) {
          Closeables.closeQuietly(dictWriter);
          chunkIndex++;

          chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);
          chunkPaths.add(chunkPath);

          dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
          currentChunkSize = 0;
        }

        Writable key = record.getFirst();
        int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
        currentChunkSize += fieldSize;
        dictWriter.append(key, new IntWritable(i++));
      }
      maxTermDimension[0] = i;
    } finally {
      Closeables.closeQuietly(dictWriter);
    }

    Here a new SequenceFile.Writer is created and the key/value pairs of the word-count output are iterated over, but only the key, i.e. the word, is read. Each word is then encoded as it is written: the first word is paired with 0, the second with 1, and so on.

    Inspecting the dictWriter variable used above, I could not see any field that stores the words and their corresponding ids, so does the file get written simply by calling append? Or did I just not find the right field? In fact SequenceFile.Writer.append serializes each key/value record straight to the underlying output stream rather than keeping it on the writer object, which is why no such field shows up in the debugger. The sketch below reads a dictionary chunk back to confirm that the (word, id) pairs do end up in the file.
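
    The following is a minimal sketch that reads a dictionary chunk back as (word, id) pairs with a plain SequenceFile.Reader. The chunk path is an assumption based on the directory layout used earlier (the chunks are written as dictionary.file-0, dictionary.file-1, ... under the output directory):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class ReadDictionaryChunk {

        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            // Assumed location of the first dictionary chunk; adjust to your own output directory.
            Path chunkPath = new Path(
                "hdfs://ubuntu:9000/home/mahout/mahout-work-mahout0/20news-vectors/dictionary.file-0");
            FileSystem fs = FileSystem.get(chunkPath.toUri(), conf);

            SequenceFile.Reader reader = new SequenceFile.Reader(fs, chunkPath, conf);
            try {
                Text word = new Text();
                IntWritable id = new IntWritable();
                // Each append() call in createDictionaryChunks wrote one (word, id) record.
                while (reader.next(word, id)) {
                    System.out.println(word + " -> " + id.get());
                }
            } finally {
                reader.close();
            }
        }
    }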

    Share, be happy, grow.


    Please credit the source when reposting: http://blog.csdn.net/fansy1990

