zoukankan      html  css  js  c++  java
  • Nutch源码阅读进程4

    前面依次看了nutch的准备工作inject和generate部分,抓取的fetch部分的代码,趁热打铁,我们下面来一睹parse即页面解析部分的代码,这块代码主要是集中在ParseSegment类里面,Let‘s go~~~
    上期回顾:上回主要讲的是nutch的fetch部分的功能代码实现,主要是先将segments目录下的指定文件夹作为输入,读取里面将要爬取的url信息存入爬取队列,再根据用户输入的爬取的线程个数thread决定消费者的个数,线程安全地取出爬取队列里的url,然后在执行爬取页面,解析页面源码得出url等操作,最终在segments目录下生成content和crawl_fetch三个文件夹,下面来瞧瞧nutch的parse是个怎么回事……
    1.parse部分的入口从代码 parseSegment.parse(segs[0]);开始,进入到ParseSegment类下的parse方法后,首先设置一个当前时间(方便后面比较结束时间之差来得到整个parse所需的时间)。然后就是一个mapreduce过程,初始化了一个job,具体代码如下:
    JobConf job = new NutchJob(getConf());
    job.setJobName("parse " + segment);

    FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
    job.setInputFormat(SequenceFileInputFormat.class);
    job.setMapperClass(ParseSegment.class);
    job.setReducerClass(ParseSegment.class);

    FileOutputFormat.setOutputPath(job, segment);
    job.setOutputFormat(ParseOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(ParseImpl.class);

    JobClient.runJob(job);
    可以看出设置的输入为segment文件夹下的文件,输出也是segment文件夹,当然变化的是segment下生成了新的文件夹,提交的mapper和reducer都是parsesegment类。
    2.下面就来分别看看ParseSegment类的map和reducer方法。map()方法中首先是一些判定的代码,该函数的主要功能还是集中在以下代码中:
    ParseResult parseResult = null;
    try {
    parseResult = new ParseUtil(getConf()).parse(content);
    } catch (Exception e) {
    LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
    return;
    }

    for (Entry<Text, Parse> entry : parseResult) {
    Text url = entry.getKey();//http://www.ahu.edu.cn/
    Parse parse = entry.getValue();
    ParseStatus parseStatus = parse.getData().getStatus();//success(1,0)
    long start = System.currentTimeMillis();

    reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[parseStatus.getMajorCode()], 1);

    if (!parseStatus.isSuccess()) {
    LOG.warn("Error parsing: " + key + ": " + parseStatus);
    parse = parseStatus.getEmptyParse(getConf());
    }

    // pass segment name to parse data
    parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, 
    getConf().get(Nutch.SEGMENT_NAME_KEY));

    // compute the new signature
    byte[] signature = 
    SignatureFactory.getSignature(getConf()).calculate(content, parse); 
    parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, 
    StringUtil.toHexString(signature));

    try {
    scfilters.passScoreAfterParsing(url, content, parse);
    } catch (ScoringFilterException e) {
    if (LOG.isWarnEnabled()) {
    LOG.warn("Error passing score: "+ url +": "+e.getMessage());
    }
    }
    long end = System.currentTimeMillis();
    LOG.info("Parsed (" + Long.toString(end - start) + "ms):" + url);

    output.collect(url, new ParseImpl(new ParseText(parse.getText()), 
    parse.getData(), parse.isCanonical()));
    }
    其中parseResult 是通过new ParseUtil(getConf()).parse(content);产生的,进入ParseUtil我们可以看出该函数全貌如下:
    public ParseUtil(Configuration conf) {
    this.parserFactory = new ParserFactory(conf);
    MAX_PARSE_TIME=conf.getInt("parser.timeout", 30);
    }
    而ParserFactory就是调用一个插件来解决页面解析这部分问题的,ParseFactory的代码如下:
    public ParserFactory(Configuration conf) {
    this.conf = conf;
    ObjectCache objectCache = ObjectCache.get(conf);
    this.extensionPoint = PluginRepository.get(conf).getExtensionPoint(
    Parser.X_POINT_ID);
    this.parsePluginList = (ParsePluginList)objectCache.getObject(ParsePluginList.class.getName());
    if (this.parsePluginList == null) {
    this.parsePluginList = new ParsePluginsReader().parse(conf);
    objectCache.setObject(ParsePluginList.class.getName(), this.parsePluginList);
    }

    if (this.extensionPoint == null) {
    throw new RuntimeException("x point " + Parser.X_POINT_ID + " not found.");
    }
    if (this.parsePluginList == null) {
    throw new RuntimeException(
    "Parse Plugins preferences could not be loaded.");
    }
    }
    当然了,如何调用插件来解决这个问题作者还不是很清楚,但是隐约从代码中已经看到了PluginRepository(插件仓库)、extensionPoint (扩展点)这样的名词了。
    让我们再回到map方法,通过调试我们可以看到ParseResult包含了以下信息:
    Version: -1
    url: http://www.ahu.edu.cn/
    base: http://www.ahu.edu.cn/
    contentType: application/xhtml+xml
    metadata: Date=Sat, 02 Aug 2014 13:46:36 GMT nutch.crawl.score=1.0 _fst_=33 nutch.segment.name=20140802214742 Content-Type=text/html Connection=close Accept-Ranges=bytes Server=Apache/2.2.8 (Unix) mod_ssl/2.2.8 OpenSSL/0.9.8e-fips-rhel5 DAV/2 Resin/3.0.25 
    Content:
    <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
    <html xmlns="http://www.w3.org/1999/xhtml">
    <head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />……
    随后再通过一个for循环,遍历出其中的解析的详细内容,我们可以看到 Text url = entry.getKey();就是得到当前要解析的url,紧接着执行Parse parse = entry.getValue();其中的Text属性就是解析后的网页的主体信息即过滤了一些网页标签后的结果。剩下的代码主要实现将解析的内容collect出去。
    3.执行完map方法后就是reduce,reducer的代码很简洁就一行: output.collect(key, (Writable)values.next()); // collect first value,自带的注解“collect first value”大概的意思就是map中每次只针对某一个url进行处理,所以收集到的解析的<text,parse>也就是唯一一个,自己的拙见啦~~~至此整个parse的过程就执行完毕了。
    4.关于segment文件夹下的crawl_parse,parse_data,parse_text三个文件夹是如何生成的,我们可以看看上面job的输出ParseOutputFormat类。进入该类的主体方法getRecordWriter(),首先是一些初始化和变量的赋值,比如url过滤器、url规格化对象的生成,时间间隔、解析的上限等变量的赋值。然后通过以下三行代码定义输出目录:
    Path text = new Path(new Path(out, ParseText.DIR_NAME), name);  // parse_text
    Path data = new Path(new Path(out, ParseData.DIR_NAME), name);//parse_data     Path crawl = new Path(new Path(out, CrawlDatum.PARSE_DIR_NAME), name);//crawl_parse
    然后再通过以下三个方法生成这三个目录
    final MapFile.Writer textOut =
    new MapFile.Writer(job, fs, text.toString(), Text.class, ParseText.class,
    CompressionType.RECORD, progress);
    final MapFile.Writer dataOut =
    new MapFile.Writer(job, fs, data.toString(), Text.class, ParseData.class,
    compType, progress);
    final SequenceFile.Writer crawlOut =
    SequenceFile.createWriter(fs, job, crawl, Text.class, CrawlDatum.class,
    compType, progress);
    以上就是对于parse过程的一个简单解析,相比前面的三个流程来说,parse模块的实现逻辑相对简单。。。
    (备注:涉及到ParseOutputFormat部分还有一些东西没有搞懂,下面的参考博文给了详细的解释,有兴趣可以拜读下)
    参考博文:http://blog.csdn.net/amuseme_lu/article/details/6727516

    友情赞助

    如果你觉得博主的文章对你那么一点小帮助,恰巧你又有想打赏博主的小冲动,那么事不宜迟,赶紧扫一扫,小额地赞助下,攒个奶粉钱,也是让博主有动力继续努力,写出更好的文章^^。

        1. 支付宝                          2. 微信

                          

  • 相关阅读:
    mysqldump 逻辑备份和物理备份
    mysql备份
    MySQL日志
    mysql 查询缓存
    Error: xz compression not available的解决办法
    rsync + crontab + expect 快速搭建同步
    同步文件,不需要密码
    logstash 值得收藏的
    MySQL5.7主从,单slave多master
    微信代扣签名
  • 原文地址:https://www.cnblogs.com/yujihaia/p/7468310.html
Copyright © 2011-2022 走看看