zoukankan      html  css  js  c++  java
  • Apache Nutch 1.3 学习笔记七(CrawlDb updatedb)

    这里主要看一下CrawlDb中的updatedb,它主要是用来更新CrawlDb数据库的

    1. bin/nutch updatedb

    我们用nutch的命令行时会看到一个方法叫updatedb,其实这个方法就是调用CrawlDb.java类中的update方法,它的参数帮助如下:

     

    1. Usage: CrawlDb <crawldb> (-dir <segments> | <seg1> <seg2> ...) [-force] [-normalize] [-filter] [-noAdditions]  
    2.     crawldb CrawlDb to update  
    3.     -dir segments   parent directory containing all segments to update from  
    4.     seg1 seg2 ...   list of segment names to update from  
    5.     -force  force update even if CrawlDb appears to be locked (CAUTION advised)  
    6.     -normalize  use URLNormalizer on urls in CrawlDb and segment (usually not needed)  
    7.     -filter use URLFilters on urls in CrawlDb and segment  
    8.     -noAdditions    only update already existing URLs, don't add any newly discovered URLs  

    2. 下面我们来分析一下其update方法到底做了些什么

    2.1 update的任务提交参数,部分代码如下

     

    1. // 生成一个新的任务,这里面也做了一些相应的配置,  
    2.     // 加入了current目录,就是初始的CrawlDb目录,设置了输入格式为SequenceFileInputFormat  
    3.     // 配置了Map-ReducerCrawlDbFilter-CrawlDbReducer  
    4.     // 配置了输出格式为MapFileOutputFormat  
    5.     // 还配置了输出的<key,value>类型<Text,CrawlDatum>  
    6.     JobConf job = CrawlDb.createJob(getConf(), crawlDb);  
    7.     // 配置一些参数  
    8.     job.setBoolean(CRAWLDB_ADDITIONS_ALLOWED, additionsAllowed);  
    9.     job.setBoolean(CrawlDbFilter.URL_FILTERING, filter);  
    10.     job.setBoolean(CrawlDbFilter.URL_NORMALIZING, normalize);  
    11.     // 加入输入目录,一个是crawl_fetch,另一个是crawl_parse  
    12.     for (int i = 0; i < segments.length; i++) {  
    13.       Path fetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);  
    14.       Path parse = new Path(segments[i], CrawlDatum.PARSE_DIR_NAME);  
    15.       if (fs.exists(fetch) && fs.exists(parse)) {  
    16.         FileInputFormat.addInputPath(job, fetch);  
    17.         FileInputFormat.addInputPath(job, parse);  
    18.       } else {  
    19.         LOG.info(" - skipping invalid segment " + segments[i]);  
    20.       }  
    21.     }  

    2.2 分析一下其任务的Map-Reducer做了些什么

    CrawlDbFilter主要是对url进行过滤和正规化。
    CrawlDbReducer
    主要是用来聚合相同url(老的与新产生的)的,这东东写得很复杂,下面来分析一下其源代码:

     

    1. public void reduce(Text key, Iterator<CrawlDatum> values,  
    2.                      OutputCollector<Text, CrawlDatum> output, Reporter reporter)  
    3.     throws IOException {  
    4.     
    5.     
    6.     CrawlDatum fetch = new CrawlDatum();  
    7.     CrawlDatum old = new CrawlDatum();  
    8.     
    9.     
    10.     boolean fetchSet = false;  
    11.     boolean oldSet = false;  
    12.     byte[] signature = null;  
    13.     boolean multiple = false; // avoid deep copy when only single value exists  
    14.     linked.clear();  
    15.     org.apache.hadoop.io.MapWritable metaFromParse = null;  
    16.         
    17.     // 这个循环主要是遍历所有相同urlvalue(CrawlDatum)值,对oldfetch两个变量进行赋值。  
    18.     // 和收集其外链接,把它们放入一个按分数排序的优先队列中去  
    19.     while (values.hasNext()) {  
    20.       CrawlDatum datum = (CrawlDatum)values.next();  
    21.       // 判断是否要对CrawlDatum进行深度复制  
    22.       if (!multiple && values.hasNext()) multiple = true;  
    23.       // 判断CrawlDatum中是否有数据库相关的参数,如STATUS_DB_(UNFETCHED|FETCHED|GONE|REDIR_TEMP|REDIR_PERM|NOTMODIFIED)  
    24.       if (CrawlDatum.hasDbStatus(datum)) {  
    25.         if (!oldSet) {  
    26.           if (multiple) {  
    27.             old.set(datum);  
    28.           } else {  
    29.             // no need for a deep copy - this is the only value  
    30.             old = datum;  
    31.           }  
    32.           oldSet = true;  
    33.         } else {  
    34.           // always take the latest version  
    35.           // 总是得到最新的CrawlDatum版本  
    36.           if (old.getFetchTime() < datum.getFetchTime()) old.set(datum);  
    37.         }  
    38.         continue;  
    39.       }  
    40.     
    41.     
    42.         // 判断CrawlDatum是否有关抓取的状态,如STATUS_FETCH_(SUCCESS|RETRY|REDIR_TEMP|REDIR_PERM|GONE|NOTMODIFIED)  
    43.       if (CrawlDatum.hasFetchStatus(datum)) {  
    44.         if (!fetchSet) {  
    45.           if (multiple) {  
    46.             fetch.set(datum);  
    47.           } else {  
    48.             fetch = datum;  
    49.           }  
    50.           fetchSet = true;  
    51.         } else {  
    52.           // always take the latest version  
    53.           if (fetch.getFetchTime() < datum.getFetchTime()) fetch.set(datum);  
    54.         }  
    55.         continue;  
    56.       }  
    57.     
    58.     
    59.     // 根据CrawlDatum的状态来收集另一些信息  
    60.       switch (datum.getStatus()) {                // collect other info  
    61.           // 如果这个CrawlDatum是一个外链接,那放入一个优先队列中,按分数的降序来做  
    62.       case CrawlDatum.STATUS_LINKED:  
    63.         CrawlDatum link;  
    64.         if (multiple) {  
    65.           link = new CrawlDatum();  
    66.           link.set(datum);  
    67.         } else {  
    68.           link = datum;  
    69.         }  
    70.         linked.insert(link);  
    71.         break;  
    72.       case CrawlDatum.STATUS_SIGNATURE:  
    73.         // 得到其唯一ID  
    74.         signature = datum.getSignature();  
    75.         break;  
    76.       case CrawlDatum.STATUS_PARSE_META:  
    77.         // 得到其元数据  
    78.         metaFromParse = datum.getMetaData();  
    79.         break;  
    80.       default:  
    81.         LOG.warn("Unknown status, key: " + key + ", datum: " + datum);  
    82.       }  
    83.     }  
    84.         
    85.     // copy the content of the queue into a List  
    86.     // in reversed order  
    87.     int numLinks = linked.size();  
    88.     List<CrawlDatum> linkList = new ArrayList<CrawlDatum>(numLinks);  
    89.     for (int i = numLinks - 1; i >= 0; i--) {  
    90.       linkList.add(linked.pop());  
    91.     }  
    92.         
    93.     // 如果这个CrawlDatum集合中没有数据库相关的状态(也就是说没有这个url的原始状态)或者配置了不添加外链接,直接返回  
    94.     // if it doesn't already exist, skip it  
    95.     if (!oldSet && !additionsAllowed) return;  
    96.         
    97.     // if there is no fetched datum, perhaps there is a link  
    98.     // 如果这个CrawlDatum集合中没有和抓取相关的状态,并且外链接数量要大于0  
    99.     if (!fetchSet && linkList.size() > 0) {  
    100.       fetch = linkList.get(0); // 得到第一个外链接  
    101.       fetchSet = true;  
    102.     }  
    103.         
    104.     // still no new data - record only unchanged old data, if exists, and return  
    105.     // 如果没有抓取相头的状态,也没有外链接,也就是说这个CrawlDatum是老的,  
    106.     if (!fetchSet) {  
    107.         // 判断是否有和数据库相关的状态,有的话就输出,没有的话就直接返回  
    108.       if (oldSet) {// at this point at least "old" should be present  
    109.         output.collect(key, old);  
    110.       } else {  
    111.         LOG.warn("Missing fetch and old value, signature=" + signature);  
    112.       }  
    113.       return;  
    114.     }  
    115.         
    116.     // 下面是用来初始化最新的CrawlDatum版本  
    117.     if (signature == null) signature = fetch.getSignature();  
    118.     long prevModifiedTime = oldSet ? old.getModifiedTime() : 0L;  
    119.     long prevFetchTime = oldSet ? old.getFetchTime() : 0L;  
    120.     
    121.     
    122.     // initialize with the latest version, be it fetch or link  
    123.     result.set(fetch);  
    124.     if (oldSet) {  
    125.       // copy metadata from old, if exists  
    126.       if (old.getMetaData().size() > 0) {  
    127.         result.putAllMetaData(old);  
    128.         // overlay with new, if any  
    129.         if (fetch.getMetaData().size() > 0)  
    130.           result.putAllMetaData(fetch);  
    131.       }  
    132.       // set the most recent valid value of modifiedTime  
    133.       if (old.getModifiedTime() > 0 && fetch.getModifiedTime() == 0) {  
    134.         result.setModifiedTime(old.getModifiedTime());  
    135.       }  
    136.     }  
    137.         
    138.     下面是用来确定其最新的状态  
    139.     switch (fetch.getStatus()) {                // determine new status  
    140.     
    141.     
    142.     case CrawlDatum.STATUS_LINKED:                // it was link  
    143.       if (oldSet) {                          // if old exists  
    144.         result.set(old);                          // use it  
    145.       } else {  
    146.         result = schedule.initializeSchedule((Text)key, result);  
    147.         result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);  
    148.         try {  
    149.           scfilters.initialScore((Text)key, result);  
    150.         } catch (ScoringFilterException e) {  
    151.           if (LOG.isWarnEnabled()) {  
    152.             LOG.warn("Cannot filter init score for url " + key +  
    153.                      ", using default: " + e.getMessage());  
    154.           }  
    155.           result.setScore(0.0f);  
    156.         }  
    157.       }  
    158.       break;  
    159.           
    160.     case CrawlDatum.STATUS_FETCH_SUCCESS:         // succesful fetch  
    161.     case CrawlDatum.STATUS_FETCH_REDIR_TEMP:      // successful fetch, redirected  
    162.     case CrawlDatum.STATUS_FETCH_REDIR_PERM:  
    163.     case CrawlDatum.STATUS_FETCH_NOTMODIFIED:     // successful fetch, notmodified  
    164.       // determine the modification status  
    165.       int modified = FetchSchedule.STATUS_UNKNOWN;  
    166.       if (fetch.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED) {  
    167.         modified = FetchSchedule.STATUS_NOTMODIFIED;  
    168.       } else {  
    169.         if (oldSet && old.getSignature() != null && signature != null) {  
    170.           if (SignatureComparator._compare(old.getSignature(), signature) != 0) {  
    171.             modified = FetchSchedule.STATUS_MODIFIED;  
    172.           } else {  
    173.             modified = FetchSchedule.STATUS_NOTMODIFIED;  
    174.           }  
    175.         }  
    176.       }  
    177.       // set the schedule  
    178.       result = schedule.setFetchSchedule((Text)key, result, prevFetchTime,  
    179.           prevModifiedTime, fetch.getFetchTime(), fetch.getModifiedTime(), modified);  
    180.       // set the result status and signature  
    181.       if (modified == FetchSchedule.STATUS_NOTMODIFIED) {  
    182.         result.setStatus(CrawlDatum.STATUS_DB_NOTMODIFIED);  
    183.         if (oldSet) result.setSignature(old.getSignature());  
    184.       } else {  
    185.         switch (fetch.getStatus()) {  
    186.         case CrawlDatum.STATUS_FETCH_SUCCESS:  
    187.           result.setStatus(CrawlDatum.STATUS_DB_FETCHED);  
    188.           break;  
    189.         case CrawlDatum.STATUS_FETCH_REDIR_PERM:  
    190.           result.setStatus(CrawlDatum.STATUS_DB_REDIR_PERM);  
    191.           break;  
    192.         case CrawlDatum.STATUS_FETCH_REDIR_TEMP:  
    193.           result.setStatus(CrawlDatum.STATUS_DB_REDIR_TEMP);  
    194.           break;  
    195.         default:  
    196.           LOG.warn("Unexpected status: " + fetch.getStatus() + " resetting to old status.");  
    197.           if (oldSet) result.setStatus(old.getStatus());  
    198.           else result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);  
    199.         }  
    200.         result.setSignature(signature);  
    201.         if (metaFromParse != null) {  
    202.             for (Entry<Writable, Writable> e : metaFromParse.entrySet()) {  
    203.               result.getMetaData().put(e.getKey(), e.getValue());  
    204.             }  
    205.           }  
    206.       }  
    207.       // if fetchInterval is larger than the system-wide maximum, trigger  
    208.       // an unconditional recrawl. This prevents the page to be stuck at  
    209.       // NOTMODIFIED state, when the old fetched copy was already removed with  
    210.       // old segments.  
    211.       if (maxInterval < result.getFetchInterval())  
    212.         result = schedule.forceRefetch((Text)key, result, false);  
    213.       break;  
    214.     case CrawlDatum.STATUS_SIGNATURE:  
    215.       if (LOG.isWarnEnabled()) {  
    216.         LOG.warn("Lone CrawlDatum.STATUS_SIGNATURE: " + key);  
    217.       }     
    218.       return;  
    219.     case CrawlDatum.STATUS_FETCH_RETRY:           // temporary failure  
    220.       if (oldSet) {  
    221.         result.setSignature(old.getSignature());  // use old signature  
    222.       }  
    223.       result = schedule.setPageRetrySchedule((Text)key, result, prevFetchTime,  
    224.           prevModifiedTime, fetch.getFetchTime());  
    225.       if (result.getRetriesSinceFetch() < retryMax) {  
    226.         result.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);  
    227.       } else {  
    228.         result.setStatus(CrawlDatum.STATUS_DB_GONE);  
    229.       }  
    230.       break;  
    231.     
    232.     
    233.     case CrawlDatum.STATUS_FETCH_GONE:            // permanent failure  
    234.       if (oldSet)  
    235.         result.setSignature(old.getSignature());  // use old signature  
    236.       result.setStatus(CrawlDatum.STATUS_DB_GONE);  
    237.       result = schedule.setPageGoneSchedule((Text)key, result, prevFetchTime,  
    238.           prevModifiedTime, fetch.getFetchTime());  
    239.       break;  
    240.     
    241.     
    242.     default:  
    243.       throw new RuntimeException("Unknown status: " + fetch.getStatus() + " " + key);  
    244.     }  
    245.     
    246.     
    247.     // 这里用来更新result的分数  
    248.     try {  
    249.       scfilters.updateDbScore((Text)key, oldSet ? old : null, result, linkList);  
    250.     } catch (Exception e) {  
    251.       if (LOG.isWarnEnabled()) {  
    252.         LOG.warn("Couldn't update score, key=" + key + ": " + e);  
    253.       }  
    254.     }  
    255.     // remove generation time, if any  
    256.     result.getMetaData().remove(Nutch.WRITABLE_GENERATE_TIME_KEY);  
    257.     output.collect(key, result);   // 写出数据  
    258.   }  
    259.       
    260. }  

    3. 总结

    • 这里大概分析了一下CrawlDb的更新流程,有一些地方还是没有看得太明白,可能要通过测试来更深入的理解。
    • 其中流程就是对三个目录进行合并,对相同的urlvalue(CrawlDatum)进行聚合,产生新的CarwlDatum,再写回原来的数据库中。
    • 其复杂的地方在于如果对聚合后的结果进行处理,这个有空还要再看一下。

    作者:http://blog.csdn.net/amuseme_lu


    相关文章阅读及免费下载:

    Apache Nutch 1.3 学习笔记目录

    Apache Nutch 1.3 学习笔记一

    Apache Nutch 1.3 学习笔记二

    Apache Nutch 1.3 学习笔记三(Inject)

    Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)

    Apache Nutch 1.3 学习笔记四(Generate)

    Apache Nutch 1.3 学习笔记四(SegmentReader分析)

    Apache Nutch 1.3 学习笔记五(FetchThread)

    Apache Nutch 1.3 学习笔记五(Fetcher流程)

    Apache Nutch 1.3 学习笔记六(ParseSegment)

    Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)

    Apache Nutch 1.3 学习笔记八(LinkDb)

    Apache Nutch 1.3 学习笔记九(SolrIndexer)

    Apache Nutch 1.3 学习笔记十(Ntuch 插件机制简单介绍)

    Apache Nutch 1.3 学习笔记十(插件扩展)

    Apache Nutch 1.3 学习笔记十(插件机制分析)

    Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)

    Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)

    Apache Nutch 1.3 学习笔记十二(Nutch 2.0 的主要变化)

    更多《Apache Nutch文档》,尽在开卷有益360 http://www.docin.com/book_360

  • 相关阅读:
    ZipArchive 的使用
    Bootstrap使用心得
    SQL SERVER 级联删除
    ASP.NET 使用C#代码设置页面元素中的样式或属性
    GDI+中发生一般性错误之文件被占用
    .Net 中资源的使用方式
    一张图全解析个性化邮件那么重要
    看天猫EDM营销学企业EDM营销
    细数EDM营销中存在的两大盲点
    如何进行EDM邮件内容的撰写
  • 原文地址:https://www.cnblogs.com/ibook360/p/2222172.html
Copyright © 2011-2022 走看看