zoukankan      html  css  js  c++  java
  • Apache Nutch 1.3 学习笔记六(ParseSegment)

    1. bin/nutch parse

    这个命令主要是用来解析抓取的内容,对其进行外链接分析,计算分数等操作,这个解析在抓取的时候就可以设置是否进行,如果在抓取的时候没有设置解析抓取的网页内容,那这边可以单独用一个Map-Reduce任务来做。
    后面的参数为:Usage: ParseSegment segment
    这里是一个segment的目录名

    2. ParseSegment源代码分析

    2.1 任务的启动

    ParseSegment任务的启动也是用一个Map-Reduce任务的,下面是它的源代码

     

    1. // 配置一个Job  
    2. JobConf job = new NutchJob(getConf());  
    3.    job.setJobName("parse " + segment);  
    4.     
    5.     
    6.    // add content directory to FileInputFormat path  
    7. // segment目录下的content目录加入输入路径中  
    8.    FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));  
    9.    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());  
    10.    // set input format  
    11. // 设置输入格式  
    12.    job.setInputFormat(SequenceFileInputFormat.class);  
    13. // 设置Map-Reduce方法  
    14.    job.setMapperClass(ParseSegment.class);  
    15.    job.setReducerClass(ParseSegment.class);  
    16.        
    17. // 设置输出路径  
    18.    FileOutputFormat.setOutputPath(job, segment);  
    19.    // Parse Output Format to output  
    20. // 设置输出格式  
    21.    job.setOutputFormat(ParseOutputFormat.class);  
    22. // 设置输出的<key,value>类型<Text,ParseImpl>  
    23.    job.setOutputKeyClass(Text.class);  
    24. // NOTE:这里注意一下,输出的valueParseImpl,而ParseOutputFormat的输出为Parse  
    25. // 这里的ParseImpl是实现Parse接口的,是is-a的关系  
    26.    job.setOutputValueClass(ParseImpl.class);  
    27.     
    28.     
    29.    JobClient.runJob(job);  

    2.2 ParseSegment类中的MapReduce分析

    这个类主要是用来分析content中的内容,它实现了MapperReducer接口
    Mapper中,主要是对content内容进行调用相应的插件进行解析,产生一个ParseResult,再遍历这个ParseResult,把其中解析出来的内容collect出去。这个ParseResult是一个收集解析结果的容器,其元素为<Text,Parse>对,这里解析可能产生多个这样的输出元素对,因为这里可能有多个内容与原url进行关联,所以就有可能产生多个<Text,Parse>输出
    这里的Reduce很有趣,只是收集第一个<Text,Parse>对,还不知道是为什么,可能是因为它认为第一个<Text,Parse>的权重最大吧。如果有谁知道的,请告诉我一下。

    2.3 ParseOutputFormat的分析

    我们知道,在写关于Map-Reduce的时候,有时我们想自己控制输出的源,这里你就要实现其架构提供的OutputFormat,前提是你没有找到合适的输出方法,因为Hadoop框架提出了几个常用的OutputFormat方法。
    在实现的OutputFormat接口,主要是实现一个叫getRecordWriter,这个方法返回一个自定义的RecordWriter的子类,用用于写出Reducer的输出<key,value>对,注意一下,在Hadoop架构中,一个<key,value>也叫一条记录。


    下面我们来分析一下这个getReocrdWriter方法,源代码如下:
    呵呵,不要被吓到,一步步分析,老外的代码还是很好看的

     

    1. public RecordWriter<Text, Parse> getRecordWriter(FileSystem fs, JobConf job,  
    2.                                      String name, Progressable progress) throws IOException {  
    3.     
    4.     
    5. // 这里根据配置生成一个url过滤器  
    6.    this.filters = new URLFilters(job);  
    7. // 这里生成一个url的规格化对象  
    8.    this.normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_OUTLINK);  
    9. // 这里生成一个分数计算器  
    10.    this.scfilters = new ScoringFilters(job);  
    11. // 配置url的抓取间隔  
    12.    final int interval = job.getInt("db.fetch.interval.default", 2592000);  
    13. // 得到是否要解析外链接  
    14.    final boolean ignoreExternalLinks = job.getBoolean("db.ignore.external.links", false);  
    15. // 得到每一个网页外链接的解析个数,默认是100个,  
    16.    int maxOutlinksPerPage = job.getInt("db.max.outlinks.per.page", 100);  
    17.    final int maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE  
    18.                                                     : maxOutlinksPerPage;  
    19.     
    20.     
    21. // 设置输出的压缩方法  
    22.    final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);  
    23. // 设置输出的路径  
    24.    Path out = FileOutputFormat.getOutputPath(job);  
    25.        
    26. // 这里是得到输出的三个目录名,crawl_parse,parse_data,parse_text  
    27.    Path text = new Path(new Path(out, ParseText.DIR_NAME), name);  
    28.    Path data = new Path(new Path(out, ParseData.DIR_NAME), name);  
    29.    Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);  
    30.        
    31. // 得到元数据的解析配置  
    32.    final String[] parseMDtoCrawlDB = job.get("db.parsemeta.to.crawldb","").split(" *, *");  
    33.        
    34. // 生成parse_text目录的输出方法  
    35.    final MapFile.Writer textOut =  
    36.      new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class,  
    37.          CompressionType.RECORD, progress);  
    38.        
    39. // 生成parse_data目录的输出方法  
    40.    final MapFile.Writer dataOut =  
    41.      new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,  
    42.          compType, progress);  
    43.     
    44.     
    45.    // 生成crawl_parse的输出方法  
    46.    final SequenceFile.Writer crawlOut =  
    47.      SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,  
    48.          compType, progress);  
    49.        
    50. // 这里使用了inner class  
    51.    return new RecordWriter<Text, Parse>() {  
    52.     
    53.     
    54.     // 实现writer方法,写出<key,value>到指定的输出源  
    55.        public void write(Text key, Parse parse)  
    56.          throws IOException {  
    57.              
    58.          String fromUrl = key.toString();  
    59.          String fromHost = null;   
    60.          String toHost = null;            
    61.       // 输出解析后的文本到parse_text目录  
    62.          textOut.append(key, new ParseText(parse.getText()));  
    63.              
    64.          ParseData parseparseData = parse.getData();  
    65.       // 这里抓取的网页内容是否有唯一的标记,如果有的话,用这个标记再生成一个CrawlDatum  
    66.       // 输出到crawl_parse目录去  
    67.          // recover the signature prepared by Fetcher or ParseSegment  
    68.          String sig = parseData.getContentMeta().get(Nutch.SIGNATURE_KEY);  
    69.          if (sig != null) {  
    70.            byte[] signature = StringUtil.fromHexString(sig);  
    71.            if (signature != null) {  
    72.              // append a CrawlDatum with a signature  
    73.              CrawlDatum d = new CrawlDatum(CrawlDatum.STATUS_SIGNATURE, 0);  
    74.              d.setSignature(signature);  
    75.              crawlOut.append(key, d); // 输出到crawl_parse目录中去  
    76.            }  
    77.          }  
    78.              
    79.        // see if the parse metadata contain things that we'd like  
    80.        // to pass to the metadata of the crawlDB entry  
    81.     // 查看解析的内容中是否包括设置的元数据信息,如果包含定义的元数据,  
    82.     // 那就新生成一个CrawlDatum,输出到crawl_parse目录  
    83.        CrawlDatum parseMDCrawlDatum = null;  
    84.        for (String mdname : parseMDtoCrawlDB) {  
    85.          String mdvalue = parse.getData().getParseMeta().get(mdname);  
    86.          if (mdvalue != null) {  
    87.            if (parseMDCrawlDatum == null) parseMDCrawlDatum = new CrawlDatum(  
    88.                CrawlDatum.STATUS_PARSE_META, 0);  
    89.            parseMDCrawlDatum.getMetaData().put(new Text(mdname),  
    90.                new Text(mdvalue));  
    91.          }  
    92.        }  
    93.     // 输出新生成的CrawlDatum  
    94.        if (parseMDCrawlDatum != null) crawlOut.append(key, parseMDCrawlDatum);  
    95.     
    96.     
    97.     // 这一块是处理页面的重定向的,如果当前url被重定向的了,并且这个重定向后的url没有被过滤  
    98.     // 那新生成一个CrawlDatum,输出到crawl_parse目录  
    99.          try {  
    100.            ParseStatus pstatus = parseData.getStatus();  
    101.            if (pstatus != null && pstatus.isSuccess() &&  
    102.                pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {  
    103.              String newUrl = pstatus.getMessage();  
    104.              int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);  
    105.              try {  
    106.                newUrl = normalizers.normalize(newUrl,  
    107.                    URLNormalizers.SCOPE_FETCHER);  
    108.              } catch (MalformedURLException mfue) {  
    109.                newUrl = null;  
    110.              }  
    111.              if (newUrl != null) newUrl = filters.filter(newUrl);  
    112.              String url = key.toString();  
    113.              if (newUrl != null && !newUrl.equals(url)) {  
    114.                String reprUrl =  
    115.                  URLUtil.chooseRepr(url, newUrl,  
    116.                                     refreshTime < Fetcher.PERM_REFRESH_TIME);  
    117.                CrawlDatum newnewDatum = new CrawlDatum();  
    118.                newDatum.setStatus(CrawlDatum.STATUS_LINKED);  
    119.                if (reprUrl != null && !reprUrl.equals(newUrl)) {  
    120.                  newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,  
    121.                                             new Text(reprUrl));  
    122.                }  
    123.                crawlOut.append(new Text(newUrl), newDatum);  
    124.              }  
    125.            }  
    126.          } catch (URLFilterException e) {  
    127.            // ignore  
    128.          }  
    129.     
    130.     
    131.       // 这一块主要是处理外链接的  
    132.          // collect outlinks for subsequent db update  
    133.          Outlink[] links = parseData.getOutlinks();  
    134.       // 得到要存储的外链接数量  
    135.          int outlinksToStore = Math.min(maxOutlinks, links.length);  
    136.          if (ignoreExternalLinks) {  
    137.            try {  
    138.             // 得到当前urlhost  
    139.              fromHost = new URL(fromUrl).getHost().toLowerCase();  
    140.            } catch (MalformedURLException e) {  
    141.              fromHost = null;  
    142.            }  
    143.          } else {  
    144.            fromHost = null;  
    145.          }  
    146.     
    147.     
    148.     // 这一块主要是对链接进行过滤,规格化  
    149.          int validCount = 0;  
    150.          CrawlDatum adjust = null;  
    151.          List<Entry<Text, CrawlDatum>> targets = new ArrayList<Entry<Text, CrawlDatum>>(outlinksToStore);  
    152.          List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);  
    153.          for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {  
    154.            String toUrl = links[i].getToUrl();  
    155.            // ignore links to self (or anchors within the page)  
    156.            if (fromUrl.equals(toUrl)) {  
    157.              continue;  
    158.            }  
    159.            if (ignoreExternalLinks) {  
    160.              try {  
    161.                toHost = new URL(toUrl).getHost().toLowerCase();  
    162.              } catch (MalformedURLException e) {  
    163.                toHost = null;  
    164.              }  
    165.              if (toHost == null || !toHost.equals(fromHost)) { // external links  
    166.                continue; // skip it  
    167.              }  
    168.            }  
    169.            try {  
    170.              toUrl = normalizers.normalize(toUrl,URLNormalizers.SCOPE_OUTLINK); // normalize the url  
    171.              toUrl = filters.filter(toUrl);   // filter the url  
    172.              if (toUrl == null) {  
    173.                continue;  
    174.              }  
    175.            } catch (Exception e) {  
    176.              continue;  
    177.            }  
    178.     
    179.     
    180.         // 生成新的CrawlDatum,初始化其抓取间隔与分数  
    181.            CrawlDatum target = new CrawlDatum(CrawlDatum.STATUS_LINKED, interval);  
    182.            Text targetUrl = new Text(toUrl);  
    183.            try {  
    184.              scfilters.initialScore(targetUrl, target);  
    185.            } catch (ScoringFilterException e) {  
    186.              LOG.warn("Cannot filter init score for url " + key +  
    187.                       ", using default: " + e.getMessage());  
    188.              target.setScore(0.0f);  
    189.            }  
    190.                
    191.         //放入目标容器,用于后面计算每一个外链接的分数  
    192.            targets.add(new SimpleEntry(targetUrl, target));  
    193.            outlinkList.add(links[i]);  
    194.            validCount++;  
    195.          }  
    196.          try {  
    197.            // compute score contributions and adjustment to the original score  
    198.         // 计算每一个外链接的贡献值,用来调整原url的分数  
    199.            adjust = scfilters.distributeScoreToOutlinks((Text)key, parseData,   
    200.                      targets, null, links.length);  
    201.          } catch (ScoringFilterException e) {  
    202.            LOG.warn("Cannot distribute score from " + key + ": " + e.getMessage());  
    203.          }  
    204.       // 输出链接到crawl_parse目录中  
    205.          for (Entry<Text, CrawlDatum> target : targets) {  
    206.            crawlOut.append(target.getKey(), target.getValue());  
    207.          }  
    208.       // 看源url是否有调整,有的话就输出到crawl_parse目录中  
    209.          if (adjust != null) crawlOut.append(key, adjust);  
    210.     
    211.     
    212.         // 得到过滤后的外链接  
    213.          Outlink[] filteredLinks = outlinkList.toArray(new Outlink[outlinkList.size()]);  
    214.       // 生成新的ParseData对象  
    215.          parseData = new ParseData(parseData.getStatus(), parseData.getTitle(),   
    216.                                    filteredLinks, parseData.getContentMeta(),   
    217.                                    parseData.getParseMeta());  
    218.       // 写出到parse_data目录中  
    219.          dataOut.append(key, parseData);  
    220.       // 判断解析的数据是否来由当前原url,如果不是,那新生成一个CrawlDatum,输出到crawl_parse目录中  
    221.          if (!parse.isCanonical()) {  
    222.            CrawlDatum datum = new CrawlDatum();  
    223.            datum.setStatus(CrawlDatum.STATUS_FETCH_SUCCESS);  
    224.            String timeString = parse.getData().getContentMeta().get(Nutch.FETCH_TIME_KEY);  
    225.            try {  
    226.              datum.setFetchTime(Long.parseLong(timeString));  
    227.            } catch (Exception e) {  
    228.              LOG.warn("Can't read fetch time for: " + key);  
    229.              datum.setFetchTime(System.currentTimeMillis());  
    230.            }  
    231.            crawlOut.append(key, datum);  
    232.          }  
    233.        }  


    3.
    总结
    这里主要看了一下ParseSegment的实现流程和分析了一下其源代码,其中用到了OutputFormat的多路输出方法,这里还实现了对于源链接分数的调整算法,使用了插件中的一个叫scoring-opic的插件,叫OPICScoringFilter,全称叫Online Page Importance Computation

    作者:http://blog.csdn.net/amuseme_lu


    相关文章阅读及免费下载:

    Apache Nutch 1.3 学习笔记目录

    Apache Nutch 1.3 学习笔记一

    Apache Nutch 1.3 学习笔记二

    Apache Nutch 1.3 学习笔记三(Inject)

    Apache Nutch 1.3 学习笔记三(Inject CrawlDB Reader)

    Apache Nutch 1.3 学习笔记四(Generate)

    Apache Nutch 1.3 学习笔记四(SegmentReader分析)

    Apache Nutch 1.3 学习笔记五(FetchThread)

    Apache Nutch 1.3 学习笔记五(Fetcher流程)

    Apache Nutch 1.3 学习笔记六(ParseSegment)

    Apache Nutch 1.3 学习笔记七(CrawlDb - updatedb)

    Apache Nutch 1.3 学习笔记八(LinkDb)

    Apache Nutch 1.3 学习笔记九(SolrIndexer)

    Apache Nutch 1.3 学习笔记十(Ntuch 插件机制简单介绍)

    Apache Nutch 1.3 学习笔记十(插件扩展)

    Apache Nutch 1.3 学习笔记十(插件机制分析)

    Apache Nutch 1.3 学习笔记十一(页面评分机制 OPIC)

    Apache Nutch 1.3 学习笔记十一(页面评分机制 LinkRank 介绍)

    Apache Nutch 1.3 学习笔记十二(Nutch 2.0 的主要变化)

    更多《Apache Nutch文档》,尽在开卷有益360 http://www.docin.com/book_360

  • 相关阅读:
    xx
    office 2016 下载链接
    Revit 2019 下载链接
    AE cc 2019 下载链接
    Premiere Pro cc 2019 下载链接
    Photoshop cc 2019 下载链接
    百度云单机版、软件包及教程
    Visual Studio 2017 软件包及教程
    归并排序:逆序对问题
    归并排序:小和问题
  • 原文地址:https://www.cnblogs.com/ibook360/p/2222171.html
Copyright © 2011-2022 走看看