  • [Nutch 2.2.1 Source Code Analysis, Part 5] The Basic Indexing Flow


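    I. Relationships between the main classes
    SolrIndexerJob extends IndexerJob
    1. IndexerJob: builds the indexing MapReduce job (createIndexJob()) and defines the mapper, IndexerMapper.
    2. SolrIndexerJob: the command-line entry point; it parses the arguments, runs the job created by IndexerJob, and then commits to Solr.
    3. IndexUtil: essentially a single method, public NutchDocument index(String key, WebPage page), which builds a NutchDocument (the document later sent to Solr) from the web page data.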

    II. Program call flow

    Nutch's launcher script, bin/nutch, contains the following:
    elif [ "$COMMAND" = "solrindex" ] ; then
    CLASS=org.apache.nutch.indexer.solr.SolrIndexerJob

    So the program entry point is in the SolrIndexerJob class.

    (1) org.apache.nutch.indexer.solr.SolrIndexerJob
    1. Program entry point
      public static void main(String[] args) throws Exception {
        final int res = ToolRunner.run(NutchConfiguration.create(),
            new SolrIndexerJob(), args);
        System.exit(res);
      }
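    ToolRunner.run() is used to launch the program; for background see "Basic principles of running a Hadoop program with ToolRunner" (使用ToolRunner运行Hadoop程序基本原理分析).
    The first argument is the Configuration returned by NutchConfiguration.create(), which loads the Nutch-related settings: Hadoop's core-default.xml and core-site.xml plus Nutch's nutch-default.xml and nutch-site.xml.
    The second argument is the Tool instance, so ToolRunner ends up calling SolrIndexerJob's run(String[]) method.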
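
    The ToolRunner pattern described above can be sketched without Hadoop on the classpath. This is a simplified stand-in, not Hadoop's implementation: MiniTool, CountingTool, the Map-based configuration and the demo.key option are hypothetical names, and only "-D key=value" generic options are handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ToolRunnerSketch {

  /** Hypothetical stand-in for Hadoop's Tool interface. */
  interface MiniTool {
    void setConf(Map<String, String> conf);
    int run(String[] args);
  }

  /** Stand-in for ToolRunner.run(conf, tool, args): strip generic options, then delegate. */
  static int run(Map<String, String> conf, MiniTool tool, String[] args) {
    List<String> toolArgs = new ArrayList<>();
    for (int i = 0; i < args.length; i++) {
      if ("-D".equals(args[i]) && i + 1 < args.length) {
        // "-D key=value" is a generic option: it is stored in the configuration
        String[] kv = args[++i].split("=", 2);
        conf.put(kv[0], kv.length > 1 ? kv[1] : "");
      } else {
        toolArgs.add(args[i]); // everything else is passed on to the tool
      }
    }
    tool.setConf(conf);
    return tool.run(toolArgs.toArray(new String[0]));
  }

  /** Toy tool that returns the number of arguments it received as its exit code. */
  static final class CountingTool implements MiniTool {
    Map<String, String> conf;
    public void setConf(Map<String, String> conf) { this.conf = conf; }
    public int run(String[] args) { return args.length; }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    int res = run(conf, new CountingTool(),
        new String[] {"-D", "demo.key=demo", "http://localhost:8983/solr", "-all"});
    System.out.println("tool saw " + res + " args, conf = " + conf);
  }
}
```

    The point of the pattern is that the tool itself only ever sees its own arguments; configuration overrides are consumed before run(String[]) is called.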

    2. The run(String[]) method of SolrIndexerJob
      
      public int run(String[] args) throws Exception {
        if (args.length < 2) {
          System.err.println("Usage: SolrIndexerJob <solr url> (<batchId> | -all | -reindex) [-crawlId <id>]");
          return -1;
        }
    
        if (args.length == 4 && "-crawlId".equals(args[2])) {
          getConf().set(Nutch.CRAWL_ID_KEY, args[3]);
        }
        try {
          indexSolr(args[0], args[1]);
          return 0;
        } catch (final Exception e) {
          LOG.error("SolrIndexerJob: " + StringUtils.stringifyException(e));
          return -1;
        }
      }
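    The method first validates the arguments and then calls indexSolr(String, String). Note that -crawlId is only picked up when exactly four arguments are given.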
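
    The argument checks in run(String[]) can be isolated in a small sketch. The parse() helper and the map keys solrUrl, batchId and crawlId are hypothetical; only the conditions themselves are taken from the code above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SolrIndexArgsSketch {

  /** Mirrors the checks in run(String[]); returns null where the original prints the usage text. */
  static Map<String, String> parse(String[] args) {
    if (args.length < 2) {
      return null; // the original prints the usage string and returns -1
    }
    Map<String, String> parsed = new LinkedHashMap<>();
    parsed.put("solrUrl", args[0]);
    parsed.put("batchId", args[1]); // a batch id, "-all", or "-reindex"
    // -crawlId is honoured only when there are exactly four arguments
    if (args.length == 4 && "-crawlId".equals(args[2])) {
      parsed.put("crawlId", args[3]);
    }
    return parsed;
  }

  public static void main(String[] args) {
    System.out.println(parse(new String[] {"http://localhost:8983/solr", "-all"}));
    System.out.println(parse(new String[] {"http://localhost:8983/solr", "-reindex", "-crawlId", "demo"}));
    System.out.println(parse(new String[] {"http://localhost:8983/solr"}));
  }
}
```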

    3. The indexSolr(String, String) method
      
    public void indexSolr(String solrUrl, String batchId) throws Exception {
        LOG.info("SolrIndexerJob: starting");
    
        run(ToolUtil.toArgMap(
            Nutch.ARG_SOLR, solrUrl,
            Nutch.ARG_BATCH, batchId));
        // do the commits once and for all the reducers in one go
        getConf().set(SolrConstants.SERVER_URL,solrUrl);
        SolrServer solr = SolrUtils.getCommonsHttpSolrServer(getConf());
        if (getConf().getBoolean(SolrConstants.COMMIT_INDEX, true)) {
          solr.commit();
        }
        LOG.info("SolrIndexerJob: done.");
      }
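
    indexSolr() hands its two parameters to run(Map) through ToolUtil.toArgMap(), which packs alternating names and values into a map. The sketch below shows that idea only; it is not copied from Nutch, and the treatment of odd-length input and null values is an assumption. The keys "solr" and "batch" are placeholders for Nutch.ARG_SOLR and Nutch.ARG_BATCH.

```java
import java.util.HashMap;
import java.util.Map;

final class ArgMapSketch {

  /** Packs name/value pairs into a map, in the spirit of ToolUtil.toArgMap. */
  static Map<String, Object> toArgMap(Object... args) {
    if (args.length % 2 != 0) {
      throw new IllegalArgumentException("expected pairs of argName, argValue");
    }
    Map<String, Object> res = new HashMap<>();
    for (int i = 0; i < args.length; i += 2) {
      if (args[i + 1] != null) { // assumption: unset values are simply left out
        res.put(String.valueOf(args[i]), args[i + 1]);
      }
    }
    return res;
  }

  public static void main(String[] args) {
    Map<String, Object> m = toArgMap("solr", "http://localhost:8983/solr", "batch", "-all");
    System.out.println(m.get("solr") + " " + m.get("batch"));
  }
}
```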

    4. The run(Map<...>) method
    @Override
      public Map<String,Object> run(Map<String,Object> args) throws Exception {
        String solrUrl = (String)args.get(Nutch.ARG_SOLR);
        String batchId = (String)args.get(Nutch.ARG_BATCH);
        NutchIndexWriterFactory.addClassToConf(getConf(), SolrWriter.class);
        getConf().set(SolrConstants.SERVER_URL, solrUrl);
    
        currentJob = createIndexJob(getConf(), "solr-index", batchId);
    
        currentJob.waitForCompletion(true);
        ToolUtil.recordJobStatus(null, currentJob, results);
        return results;
      }


    (2) org.apache.nutch.indexer.IndexerJob
    1. The createIndexJob() method
      protected Job createIndexJob(Configuration conf, String jobName, String batchId)
      throws IOException, ClassNotFoundException {
        conf.set(GeneratorJob.BATCH_ID, batchId);
        Job job = new NutchJob(conf, jobName);
        // TODO: Figure out why this needs to be here
        job.getConfiguration().setClass("mapred.output.key.comparator.class",
            StringComparator.class, RawComparator.class);
    
        Collection<WebPage.Field> fields = getFields(job);
        StorageUtils.initMapperJob(job, fields, String.class, NutchDocument.class,
            IndexerMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(IndexerOutputFormat.class);
        return job;
      }

    2. The map-side methods: setup(), map() and cleanup()
      public static class IndexerMapper
          extends GoraMapper<String, WebPage, String, NutchDocument> {
        public IndexUtil indexUtil;
        public DataStore<String, WebPage> store;
       
        protected Utf8 batchId;
    
        @Override
        public void setup(Context context) throws IOException {
          Configuration conf = context.getConfiguration();
          batchId = new Utf8(conf.get(GeneratorJob.BATCH_ID, Nutch.ALL_BATCH_ID_STR));
          indexUtil = new IndexUtil(conf);
          try {
            store = StorageUtils.createWebStore(conf, String.class, WebPage.class);
          } catch (ClassNotFoundException e) {
            throw new IOException(e);
          }
        }
       
        protected void cleanup(Context context) throws IOException ,InterruptedException {
          store.close();
        };
    
        @Override
        public void map(String key, WebPage page, Context context)
        throws IOException, InterruptedException {
          ParseStatus pstatus = page.getParseStatus();
          if (pstatus == null || !ParseStatusUtils.isSuccess(pstatus)
              || pstatus.getMinorCode() == ParseStatusCodes.SUCCESS_REDIRECT) {
            return; // filter urls not parsed
          }
    
          Utf8 mark = Mark.UPDATEDB_MARK.checkMark(page);
          if (!batchId.equals(REINDEX)) {
            if (!NutchJob.shouldProcess(mark, batchId)) {
              if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping " + TableUtil.unreverseUrl(key) + "; different batch id (" + mark + ")");
              }
              return;
            }
          }
         
          NutchDocument doc = indexUtil.index(key, page);
          if (doc == null) {
            return;
          }
          if (mark != null) {
            Mark.INDEX_MARK.putMark(page, Mark.UPDATEDB_MARK.checkMark(page));
            store.put(key, page);
          }
          context.write(key, doc);
        }
      }
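
    The decision made at the top of map() can be condensed into a few lines. This is a sketch under stated assumptions: plain strings stand in for Utf8, the parse status is reduced to a boolean, and shouldProcess() reproduces what NutchJob.shouldProcess is assumed to do (a page without a mark is skipped, and "-all" matches any mark).

```java
final class IndexFilterSketch {
  static final String ALL = "-all";         // stands in for Nutch.ALL_BATCH_ID_STR
  static final String REINDEX = "-reindex"; // stands in for the REINDEX constant

  /** Assumed behaviour of NutchJob.shouldProcess: the page must carry a matching mark. */
  static boolean shouldProcess(String mark, String batchId) {
    if (mark == null) {
      return false;
    }
    return ALL.equals(batchId) || mark.equals(batchId);
  }

  /** True when map() would go on to build a document for the page. */
  static boolean wouldIndex(boolean parsedOk, String updatedbMark, String batchId) {
    if (!parsedOk) {
      return false; // pages that were not parsed successfully, or are redirects, are dropped
    }
    if (REINDEX.equals(batchId)) {
      return true;  // -reindex skips the batch check entirely
    }
    return shouldProcess(updatedbMark, batchId);
  }

  public static void main(String[] args) {
    System.out.println(wouldIndex(true, "batch-1", "batch-1"));
    System.out.println(wouldIndex(true, "batch-1", "batch-2"));
    System.out.println(wouldIndex(true, null, REINDEX));
  }
}
```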


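    3. Calling context.write()
    Because createIndexJob() calls job.setOutputFormatClass(IndexerOutputFormat.class), each (key, NutchDocument) pair passed to context.write() goes to the record writer supplied by IndexerOutputFormat rather than to a file. With SolrWriter registered in run(Map) through NutchIndexWriterFactory.addClassToConf(), this is presumably the point at which documents are sent to Solr.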
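
    How an output format can route context.write() calls to index writers is sketched below. All types here (DocWriter, FanOutRecordWriter, ListWriter) are hypothetical stand-ins with strings in place of NutchDocument; the sketch illustrates the delegation idea and is not Nutch's IndexerOutputFormat.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class IndexOutputSketch {

  /** Hypothetical stand-in for an index writer such as SolrWriter. */
  interface DocWriter {
    void write(String key, String doc);
    void close();
  }

  /** Stand-in for the record writer that the output format hands to the framework. */
  static final class FanOutRecordWriter {
    private final List<DocWriter> writers;

    FanOutRecordWriter(List<DocWriter> writers) {
      this.writers = writers;
    }

    /** Called once for every context.write(key, doc) issued by the mapper. */
    void write(String key, String doc) {
      for (DocWriter w : writers) {
        w.write(key, doc);
      }
    }

    /** Called when the task finishes, so writers can flush what they buffered. */
    void close() {
      for (DocWriter w : writers) {
        w.close();
      }
    }
  }

  /** In-memory writer that records what it was given. */
  static final class ListWriter implements DocWriter {
    final List<String> written = new ArrayList<>();
    boolean closed;
    public void write(String key, String doc) { written.add(key + " -> " + doc); }
    public void close() { closed = true; }
  }

  public static void main(String[] args) {
    ListWriter writer = new ListWriter();
    FanOutRecordWriter out =
        new FanOutRecordWriter(Collections.<DocWriter>singletonList(writer));
    out.write("com.example.www:http/", "doc-1");
    out.close();
    System.out.println(writer.written + ", closed = " + writer.closed);
  }
}
```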


    (3) public class IndexUtil
    1. The index() method
      public NutchDocument index(String key, WebPage page) {
        NutchDocument doc = new NutchDocument();
        doc.add("id", key);
        doc.add("digest", StringUtil.toHexString(page.getSignature()));
        if (page.getBatchId() != null) {
          doc.add("batchId", page.getBatchId().toString());
        }
       
        String url = TableUtil.unreverseUrl(key);
    
        if (LOG.isDebugEnabled()) {
          LOG.debug("Indexing URL: " + url);
        }
    
        try {
          doc = filters.filter(doc, url, page);
        } catch (IndexingException e) {
          LOG.warn("Error indexing "+key+": "+e);
          return null;
        }
    
        // skip documents discarded by indexing filters
        if (doc == null) return null;
    
        float boost = 1.0f;
        // run scoring filters
        try {
          boost = scoringFilters.indexerScore(url, doc, page, boost);
        } catch (final ScoringFilterException e) {
          LOG.warn("Error calculating score " + key + ": " + e);
          return null;
        }
    
        doc.setScore(boost);
        // store boost for use by explain and dedup
        doc.add("boost", Float.toString(boost));
    
        return doc;
      }
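
    index() receives the row key, which stores the URL with its host reversed, and calls TableUtil.unreverseUrl(key) to recover the normal URL. The sketch below shows the idea for keys of the assumed form "reversed.host:protocol/path". It ignores ports and malformed keys, and it is not Nutch's implementation.

```java
final class ReversedKeySketch {

  /** Rebuilds a URL from a key of the assumed form "reversed.host:protocol/path". */
  static String unreverseUrl(String key) {
    int pathBegin = key.indexOf('/');
    if (pathBegin == -1) {
      pathBegin = key.length();
    }
    String[] parts = key.substring(0, pathBegin).split(":"); // {reversed host, protocol}
    String[] labels = parts[0].split("\\.");
    StringBuilder url = new StringBuilder(parts[1]).append("://");
    for (int i = labels.length - 1; i >= 0; i--) { // undo the host reversal
      url.append(labels[i]);
      if (i > 0) {
        url.append('.');
      }
    }
    return url.append(key.substring(pathBegin)).toString();
  }

  public static void main(String[] args) {
    // prints http://www.example.com/index.html
    System.out.println(unreverseUrl("com.example.www:http/index.html"));
  }
}
```

    Storing the host reversed keeps pages of the same domain next to each other in the store, which is why the key has to be converted back before filters that expect a real URL can run.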


    III. Field indexing in plugins
    1. The basic fields are indexed by public class BasicIndexingFilter implements IndexingFilter
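
    The contract that index() relies on, where each indexing filter may add fields or return null to discard the document, can be sketched as follows. DocFilter, runAll() and demoFilters() are hypothetical names, a Map stands in for NutchDocument, and the url and host fields are illustrative rather than a statement of what BasicIndexingFilter adds.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FilterChainSketch {

  /** Hypothetical stand-in for IndexingFilter: enrich the document, or return null to drop it. */
  interface DocFilter {
    Map<String, String> filter(Map<String, String> doc, String url);
  }

  /** Runs the filters in order and stops as soon as one of them discards the document. */
  static Map<String, String> runAll(List<DocFilter> filters, Map<String, String> doc, String url) {
    for (DocFilter f : filters) {
      doc = f.filter(doc, url);
      if (doc == null) {
        return null;
      }
    }
    return doc;
  }

  /** A two-filter chain: the first adds url and host fields, the second drops non-http(s) URLs. */
  static List<DocFilter> demoFilters() {
    List<DocFilter> filters = new ArrayList<>();
    filters.add((doc, url) -> {
      doc.put("url", url);
      doc.put("host", URI.create(url).getHost());
      return doc;
    });
    filters.add((doc, url) -> url.startsWith("http") ? doc : null);
    return filters;
  }

  public static void main(String[] args) {
    Map<String, String> doc = new LinkedHashMap<>();
    doc.put("id", "com.example.www:http/a");
    System.out.println(runAll(demoFilters(), doc, "http://www.example.com/a"));
  }
}
```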

  • Original post: https://www.cnblogs.com/jediael/p/4304061.html