zoukankan      html  css  js  c++  java
  • Apache Nutch 1.3 学习笔记四(Generate)

    1. Generate的作用

       Inject之后就是Generate,这个方法主要是从CrawlDb中产生一个Fetch可以抓取的url集合(fetchlist),再结合一定的过滤条件,它的命令行如下:
       

     

    1. bin/nutch generate  
    2.    Usage: Generator <crawldb> <segments_dir> [-force] [-topN N] [-numFetchers numFetchers] [-adddays numDays] [-noFilter] [-noNorm][-maxNumSegments num]  


       
    参数说明:
       * crawldb: crawldb
    的相对路径
       * segments: segments
    的相对路径
       * force:  
    这个主要是对目录进行加锁用的配置,如果为true,当目标锁文件存在的,会认为是有效的,但如果为false,当目标文件存在时,就就会抛出IOException
       * topN:
    这里表示产生TopNurl
       * numFetchers:
    这里是指GenerateMP任务要几个Reducer节点,也就是要几个输出文件,这个配置会影响到FetcherMap个数。
       * numDays:
    这里是表示当前的日期,是在对url过滤中用到的
       * noFilter:
    这里表示是否对url进行过滤
       * noNorm:
    这里表示是否以url进行规格化
       * maxNumSegments:
    这里表示segment的最大个数
       
    Nutch 1.3 版本中,支持在一次Generate为多个segment产生相应的fetchlists,而IP地址的解析只针对那些准备被抓取的url,在一个segment中,所有url都以IP,domain或者host来分类。

    2. Generate源代码分析

       generate可能主要分成三部分,

    1.    + 第一部分是产生要抓取的url子集,进行相应的过滤和规格化操作
    2.    + 第二部分是读取上面产生的url子集,生成多个segment
    3.    + 第三部分是更新crawldb数据库,以保证下一次Generate不会包含相同的url

       2.1 第一部分,产生url子集分析

       这里主要是一个MP任务,用于产生相应的url抓取集合,主要代码如下:
      

     

    1. // map to inverted subset due for fetch, sort by score  
    2.    JobConf job = new NutchJob(getConf());  
    3.    job.setJobName("generate: select from " + dbDir);  
    4.     
    5.     
    6.    // 如果用户没有设置numFetchers这个值,那就默认为Map的个数  
    7.    if (numLists == -1) { // for politeness make  
    8.      numLists = job.getNumMapTasks(); // a partition per fetch task  
    9.    }  
    10. // 如果MapReduce的设置为local,那就产生一个输出文件  
    11. // NOTE:这里partition也是Hadoop中的一个概念,就是在Map后,它会对每一个key进行partition操作,看这个key会映射到哪一个reduce上,  
    12. // 所以相同keyvalue就会聚合到这个reduce节点上  
    13.    if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {  
    14.      // override  
    15.      LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");  
    16.      numLists = 1;  
    17.    }  
    18.    job.setLong(GENERATOR_CUR_TIME, curTime);  
    19.    // record real generation time  
    20.    long generateTime = System.currentTimeMillis();  
    21.    job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);  
    22.    job.setLong(GENERATOR_TOP_N, topN);  
    23.    job.setBoolean(GENERATOR_FILTER, filter);  
    24.    job.setBoolean(GENERATOR_NORMALISE, norm);  
    25.    job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);  
    26.     
    27.     
    28.    // 配置输入路径  
    29.    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));  
    30.    job.setInputFormat(SequenceFileInputFormat.class);  // 配置CrawlDb的输入格式  
    31.     
    32.     
    33.    // 配置Mapper,PartitionerReducer,这里都是Selector,因为它继承了这三个抽象接口  
    34.    job.setMapperClass(Selector.class);                   
    35.    job.setPartitionerClass(Selector.class);  
    36.    job.setReducerClass(Selector.class);  
    37.     
    38.     
    39.    FileOutputFormat.setOutputPath(job, tempDir);  
    40. // 配置输出格式  
    41.    job.setOutputFormat(SequenceFileOutputFormat.class);  
    42. // 配置输出的<key,value>的类型<FloatWritable,SelectorEntry>  
    43.    job.setOutputKeyClass(FloatWritable.class);  
    44. // 因为Map的输出会按key来排序,所以这里扩展了一个排序比较方法  
    45.    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);  
    46.    job.setOutputValueClass(SelectorEntry.class);  
    47. // 设置输出格式,这个类继承自OutputFormat,如果用户要扩展自己的OutputFormat,那必须继承自这个抽象接口  
    48.    job.setOutputFormat(GeneratorOutputFormat.class);  
    49.     
    50.     
    51.    try {  
    52.      JobClient.runJob(job);   // 提交任务  
    53.    } catch (IOException e) {  
    54.      throw e;  
    55.    }  



    下面主要分析一下Selector这个类,它使用了多重继承,同时实现了三个接口,Mapper,Partitioner,Reducer

    * 下面是SelectorMapper的分析

     这里的Map主要做了几件事:

    • 如果有filter设置,先对url进行过滤
    • 通过FetchSchedule查看当前url是不达到了抓取的时间,没有达到抓取时间的就过滤掉
    • 计算新的排序分数,根据url的当前分数,这里调用了ScoringFiltersgeneratorSortValue方法
    • 对上一步产生的分数进行过滤,当这个分数小于一定的阀值时,对url进行过滤
    • 收集所有没有被过滤的url信息,输出为<FloatWritable,SelectorEntry>类型,这里的key就是第三步计算出来的分数,  Map的输出会调用DecreasingFloatComparator方法来对这个key进行排序

    * Selector中的Partition方法主要是调用了URLPartition来进行相应的分块操作

     这里会首先根据urlhashCode来进行partition,如果用户设置了根据domain或者ip来进行partition,那这里会根据用户的配置来
     
    进行相应的partition操作,最后调用如下方法来得到一个映射的reduceID
     (hashCode & Integer.MAX_VALUE) % numReduceTasks;

    * Selector中的Reducer操作主要是收集没有被过滤的url,每个reducerurl个数不会超过limit个数,这个limit是通过如下公式计算的

     

    1. limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks();  

     GENERATOR_TOP_N是用户定义的,reducer的个数也是用户定义的。
     
    在一个reducer任务中,如果收集的url个数超过了这个limit,那就新开一个segment,这里的segment也有一个上限,就是用户设置的maxNumSegments, 当新开的segment个数大小这个maxNumSegment时,url就会被过滤掉。
     
    这里urlsegment中的分布有两个情况,一种是当没有设置GENERATOR_MAX_COUNT这个参数时,每一个segment中所包含的url个数不超过limit上限,segmetn中对urlhost个数没有限制,而segment个数的上限为maxNumSegments这个变量的值,这个变量是通过设置GENERATOR_MAX_NUM_SEGMENTS这个参数得到的,默认为1,所以说默认只产生一个segment; 而当设置了GENERATOR_MAX_COUNT的时候,每一个segment中所包含的urlhost的个数的上限就是这个maxCount的值,也就是说每一个segment所包含的同一个hosturl的个数不能超过maxCount这个值,当超过这个值后,就把这个url放到下一个segment中去。 
     
    举个简单的例子,如果Reducer中收到10url,而现在maxNumSegments2limit5,也就是说一个segment最多放5url,那这时如果用第一种设置的话,那0-4url会放在第一个segment中,5-9url会放在第二个segment,这样的话,两个segment都放了5url;但如果用第二种方法,这里设置的maxCount4,但我们这里的10urlhost分成2类,也就是说0-4url属于同一个host1, 5-9url属于host2,那这里会把0-4个中的前4url放在segment1中,host1的第5url放在segmetn2中,而host2中的5-8url会放在segment1中,而第9个网页会放在segment2中,因为这里的maxCount设置为4,也就是说在每一个segment中,一个host所对应的url不能超过4,所以这里的segment1放了8url,而segment2放了2url,这里会现出不均匀的情况。


    *
    有没有注意到这里的OutputFormat使用了GenerateOutputFormat,它扩展了MultipleSequenceFileOutputFormat,重写了generateFileNameForKeyValue这个方法,就是对不同的segment生成不同的目录名,生成规则如下
      

     

    1. "fetchlist-" + value.segnum.toString() + "/" + name;  

    2.2 第二部分是读取上面产生的url子集,生成多个segment,主要代码如下:

         

     

    1. // read the subdirectories generated in the temp  
    2.    // output and turn them into segments  
    3.    List<Path> generatedSegments = new ArrayList<Path>();  
    4.     
    5.     
    6.    FileStatus[] status = fs.listStatus(tempDir);  // 这里读取上面生成的多个fetchlistsegment  
    7.    try {  
    8.      for (FileStatus stat : status) {  
    9.        Path subfetchlist = stat.getPath();  
    10.        if (!subfetchlist.getName().startsWith("fetchlist-")) continue;   // 过滤不是以fetchlist-开头的文件  
    11.        // start a new partition job for this segment  
    12.        Path newSeg = partitionSegment(fs, segments, subfetchlist, numLists);   // segment进行Partition操作,产生一个新的目录  
    13.        generatedSegments.add(newSeg);  
    14.      }  
    15.    } catch (Exception e) {  
    16.      LOG.warn("Generator: exception while partitioning segments, exiting ...");  
    17.      fs.delete(tempDir, true);  
    18.      return null;  
    19.    }  
    20.     
    21.     
    22.    if (generatedSegments.size() == 0) {  
    23.      LOG.warn("Generator: 0 records selected for fetching, exiting ...");  
    24.      LockUtil.removeLockFile(fs, lock);  
    25.      fs.delete(tempDir, true);  
    26.      return null;  
    27.    }  

      * 下面主要对这个partitionSegment函数进行分析,看看到底做了些什么

     

    1.    // invert again, partition by host/domain/IP, sort by url hash  
    2. // 从代码的注释中我们可以看到,这里主要是对urlhost/domain/IP进行分类  
    3. // NOTE:这里的分类就是Partition的意思,就是相同host或者是domain或者是IPurl发到同一台机器上  
    4. // 这里主要是通过URLPartitioner来做的,具体是按哪一个来分类,是通用参数来配置的,这里有PARTITION_MODE_DOMAINPARTITION_MODE_IP  
    5. // 来配置,默认是按UrlhashCode来分。  
    6.     if (LOG.isInfoEnabled()) {  
    7.         LOG.info("Generator: Partitioning selected urls for politeness.");  
    8.     }  
    9.     Path segment = new Path(segmentsDir, generateSegmentName()); // 也是在segmentDir目录产生一个新的目录,以当前时间命名  
    10.     Path output = new Path(segment, CrawlDatum.GENERATE_DIR_NAME); // 在上面的目录下再生成一个特定的crawl_generate目录  
    11.     
    12.     
    13.     LOG.info("Generator: segment: " + segment);  
    14. 下面又用一个MP任务来做  
    15.     NutchJob job = new NutchJob(getConf());  
    16.     job.setJobName("generate: partition " + segment);  
    17.     job.setInt("partition.url.seed", new Random().nextInt()); // 这里产生一个Partition的随机数  
    18.     
    19.     
    20.     FileInputFormat.addInputPath(job, inputDir);                // 输入目录名  
    21.   job.setInputFormat(SequenceFileInputFormat.class);          // 输入文件格式  
    22.     
    23.     
    24.     job.setMapperClass(SelectorInverseMapper.class);            // 输入的Mapper,主要是过滤原来的key,使用url来做为新的key  
    25.     job.setMapOutputKeyClass(Text.class);                       // Mapperkey输出类型,这里就是url的类型  
    26.     job.setMapOutputValueClass(SelectorEntry.class);            // Mappervalue的输出类型,这里还是原因的SelectorEntry类型  
    27.     job.setPartitionerClass(URLPartitioner.class);              // 这里的key(url)Partition使用这个类来做,这个类前面有说明  
    28.     job.setReducerClass(PartitionReducer.class);                // 这里的Reducer类,  
    29.     job.setNumReduceTasks(numLists);                            // 这里配置工作的Reducer的个数,也就是生成几个相应的输出文件  
    30.     
    31.     
    32.     FileOutputFormat.setOutputPath(job, output);                // 配置输出路径  
    33.   job.setOutputFormat(SequenceFileOutputFormat.class);      // 配置输出格式  
    34.     job.setOutputKeyClass(Text.class);                          // 配置输出的keyvalue的类型  
    35.     job.setOutputValueClass(CrawlDatum.class);                  // 注意这里返回的类型为<Text,CrawlDatum>  
    36.     job.setOutputKeyComparatorClass(HashComparator.class);      // 这里定义控制key排序的比较方法  
    37.     JobClient.runJob(job);                                      // 提交任务  
    38.     return segment;   

    2.3 第三部分是更新crawldb数据库,以保证下一次Generate不会包含相同的url,这个是可以配置的,主要代码如下:

     

    1. if (getConf().getBoolean(GENERATE_UPDATE_CRAWLDB, false)) {   // 判断是否要把状态更新到原来的数据库中  
    2.         // update the db from tempDir  
    3.         Path tempDir2 = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"  
    4.          + System.currentTimeMillis());  
    5.     
    6.     
    7.         job = new NutchJob(getConf()); // 生成MP任务的配置  
    8.         job.setJobName("generate: updatedb " + dbDir);  
    9.         job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);  
    10.     // 加上面生成的所有segment的路径做为输入  
    11.         for (Path segmpaths : generatedSegments) { // add each segment dir to input path  
    12.         Path subGenDir = new Path(segmpaths, CrawlDatum.GENERATE_DIR_NAME);  
    13.         FileInputFormat.addInputPath(job, subGenDir);  
    14.         }  
    15.         // add current crawldb to input path  
    16.     // 把数据库的路径也做为输入  
    17.         FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));  
    18.         job.setInputFormat(SequenceFileInputFormat.class);          // 定义了输入格式  
    19.         job.setMapperClass(CrawlDbUpdater.class);                   // 定义了MapperReducer方法  
    20.         job.setReducerClass(CrawlDbUpdater.class);  
    21.         job.setOutputFormat(MapFileOutputFormat.class);             // 定义了输出格式  
    22.         job.setOutputKeyClass(Text.class);                          // 定义了输出的keyvalue的类型  
    23.         job.setOutputValueClass(CrawlDatum.class);  
    24.         FileOutputFormat.setOutputPath(job, tempDir2);              // 定义了临时输出目录  
    25.         try {  
    26.           JobClient.runJob(job);  
    27.           CrawlDb.install(job, dbDir);                              // 删除原来的数据库,把上面的临时输出目录重命名为真正的数据目录名  
    28.         } catch (IOException e) {  
    29.           LockUtil.removeLockFile(fs, lock);  
    30.           fs.delete(tempDir, true);  
    31.           fs.delete(tempDir2, true);  
    32.           throw e;  
    33.         }  
    34.         fs.delete(tempDir2, true);  
    35.     }     

    * 下面我们来看一下CrawlDbUpdater类做了些什么,它实现了MapperReducer的接口,接口说明如下

    它是用来更新CrawlDb数据库,以保证下一次Generate不会包含相同的url
    它的map函数很简单,只是收集相应的<key,value>操作,没有做其它操作,下面我们来看一下它的reduce方法做了些什么

     

    1. genTime.set(0L);  
    2.             while (values.hasNext()) { // 这里遍历相同urlCrawlDatum  
    3.         CrawlDatum val = values.next();  
    4.         if (val.getMetaData().containsKey(Nutch.WRITABLE_GENERATE_TIME_KEY)) {   // 判断当前url是否已经被generate  
    5.                 LongWritable gt = (LongWritable) val.getMetaData().get(  
    6.                 Nutch.WRITABLE_GENERATE_TIME_KEY);                              // 得到Generate的时间   
    7.                 genTime.set(gt.get());  
    8.                 if (genTime.get() != generateTime) {    // 还没看明白这里是什么意思,一种情况会产生不相同,当这个url已经被generate一次,这里被第二次generate,所以会产生时间不同  
    9.                 orig.set(val);  
    10.                 genTime.set(0L);  
    11.                 continue;       // 不知道这里为什么要加continue,加与不加应该没什么区别  
    12.                 }  
    13.             } else {  
    14.                 orig.set(val);  
    15.             }  
    16.             }  
    17.             if (genTime.get() != 0L) {        // NOTE:想想这里什么时候genTime0,当这个url被过滤掉,或者没有符合Generate要求,或者分数小于相应的阀值时  
    18.         orig.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);  // 设置新的Generate时间  
    19.             }  
    20.             output.collect(key, orig);  

    3. 总结

    这里大概介绍了一下Generate的流程,其中大量用到了MapReduce任务,还有大量的配置,要深入理解还需要去自己实践来加深理解。

    作者:http://blog.csdn.net/amuseme_lu


    相关文章阅读及免费下载:

    Apache Nutch 1.3 学习笔记目录

    Apache Nutch 1.3 学习笔记一

    Apache Nutch 1.3 学习笔记二

    Apache Nutch 1.3 学习笔记三(Inject)

    Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)

    Apache Nutch 1.3 学习笔记四(Generate)

    Apache Nutch 1.3 学习笔记四(SegmentReader分析)

    Apache Nutch 1.3 学习笔记五(FetchThread)

    Apache Nutch 1.3 学习笔记五(Fetcher流程)

    Apache Nutch 1.3 学习笔记六(ParseSegment)

    Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)

    Apache Nutch 1.3 学习笔记八(LinkDb)

    Apache Nutch 1.3 学习笔记九(SolrIndexer)

    Apache Nutch 1.3 学习笔记十(Ntuch 插件机制简单介绍)

    Apache Nutch 1.3 学习笔记十(插件扩展)

    Apache Nutch 1.3 学习笔记十(插件机制分析)

    Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)

    Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)

    Apache Nutch 1.3 学习笔记十二(Nutch 2.0 的主要变化)

    更多《Apache Nutch文档》,尽在开卷有益360 http://www.docin.com/book_360

  • 相关阅读:
    WinForm多线程+委托防止界面假死
    网页制作知识库
    HTML Agility Pack:簡單好用的快速 HTML Parser
    .NET 4.0 和 .NET 4.0 Client Profile 区别
    使用OPCNetAPI连接OPCServer
    Win7系统删除微软拼音
    Unity3D脚本18:可视化辅助设置类 Gizmos
    mysql 索引
    重启oracle方法一二三
    php7 安装扩展
  • 原文地址:https://www.cnblogs.com/ibook360/p/2221484.html
Copyright © 2011-2022 走看看