Nutch 2.3.1 Source Code Analysis: InjectorJob

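    InjectorJob reads the site URLs from the seed files and writes each URL, together with a WebPage row describing it, to the Context instance context, while counting how many URLs were injected. The URLs are stored in the database as row keys with the host name reversed, in the form reversed-host:protocol[:port]/path (for example com.example.www:http/index.html). This keeps pages from the same domain next to each other in the store, which speeds up reads and writes.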
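    The reversed-URL key can be illustrated with a small standalone sketch. It assumes the layout reversed-host:protocol[:port]/path described above. The class name ReverseUrlSketch is hypothetical, and the method only approximates what Nutch's TableUtil.reverseUrl does rather than reproducing it.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;

public class ReverseUrlSketch {

    // Hypothetical approximation of Nutch's reversed-URL row key:
    // reversed-host:protocol[:port]/path
    static String reverseUrl(String urlString) {
        URL url;
        try {
            url = URI.create(urlString).toURL();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad URL: " + urlString, e);
        }
        StringBuilder buf = new StringBuilder();

        // Reverse the host labels: bar.foo.com -> com.foo.bar
        String[] labels = url.getHost().split("\\.");
        for (int i = labels.length - 1; i >= 0; i--) {
            buf.append(labels[i]);
            if (i > 0) {
                buf.append('.');
            }
        }

        // Protocol, then the port only if it was given explicitly
        buf.append(':').append(url.getProtocol());
        if (url.getPort() != -1) {
            buf.append(':').append(url.getPort());
        }

        // Path plus query string, as returned by URL.getFile()
        String file = url.getFile();
        if (!file.isEmpty() && !file.startsWith("/")) {
            buf.append('/');
        }
        buf.append(file);
        return buf.toString();
    }

    public static void main(String[] args) {
        System.out.println(reverseUrl("http://bar.foo.com:8983/to/index.html?a=b"));
        // com.foo.bar:http:8983/to/index.html?a=b
        System.out.println(reverseUrl("https://www.example.com/"));
        // com.example.www:https/
    }
}
```

    Because row keys are sorted lexicographically, every key that starts with com.foo. ends up in one contiguous range, so a scan over one site touches neighbouring rows only.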

    The InjectorJob class runs as follows:

    public static void main(String[] args) throws Exception {
      // Load the Nutch configuration and let ToolRunner call InjectorJob.run(String[])
      int res = ToolRunner.run(NutchConfiguration.create(), new InjectorJob(),
          args);
      System.exit(res);
    }

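    The main method is the program's entry point. It calls the static method ToolRunner.run with a configuration created by NutchConfiguration.create(), which by default loads the two files nutch-default.xml and nutch-site.xml (values in nutch-site.xml override those in nutch-default.xml). ToolRunner first applies any generic Hadoop options found in the command-line arguments args (such as -D name=value) to that configuration, then calls run(String[]) on the new InjectorJob instance with the remaining arguments. main exits with the value that run returns.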
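    The argument hand-off performed by ToolRunner can be pictured with a simplified, stdlib-only model. Everything here is hypothetical: a Map<String, String> stands in for Hadoop's Configuration, SimpleTool stands in for the Tool interface, and only the -D name=value form of generic options is handled. The real GenericOptionsParser recognizes several more options.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GenericOptionsSketch {

    // Hypothetical stand-in for Hadoop's Tool interface.
    interface SimpleTool {
        int run(Map<String, String> conf, String[] args);
    }

    // Simplified model of ToolRunner.run: copy "-D name=value" pairs into
    // the configuration, then hand the remaining arguments to the tool.
    static int runTool(Map<String, String> conf, SimpleTool tool, String[] args) {
        List<String> remaining = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if ("-D".equals(args[i]) && i + 1 < args.length) {
                String[] kv = args[++i].split("=", 2);
                if (kv.length == 2) {   // pairs without "=" are ignored here
                    conf.put(kv[0], kv[1]);
                }
            } else {
                remaining.add(args[i]);
            }
        }
        return tool.run(conf, remaining.toArray(new String[0]));
    }

    public static void main(String[] args) {
        // Stand-in for values loaded from nutch-default.xml / nutch-site.xml
        Map<String, String> conf = new HashMap<>();
        conf.put("db.score.injected", "1.0");

        String[] cli = {"-D", "db.score.injected=2.0", "urls", "-crawlId", "test"};
        int res = runTool(conf, (c, rest) -> {
            System.out.println(c.get("db.score.injected") + " " + String.join(" ", rest));
            return 0;
        }, cli);
        System.out.println("exit code " + res);
    }
}
```

    Running main prints 2.0 urls -crawlId test followed by exit code 0: the -D pair overrides the configured value, and only the remaining arguments reach the tool.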

    Next, the program checks whether the input arguments are valid:

    public int run(String[] args) throws Exception {
      if (args.length < 1) {
        System.err.println("Usage: InjectorJob <url_dir> [-crawlId <id>]");
        return -1;
      }
      for (int i = 1; i < args.length; i++) {
        if ("-crawlId".equals(args[i])) {
          // Store the crawl ID in the configuration; the storage layer uses it
          // as a prefix of the table name (e.g. <crawlId>_webpage)
          getConf().set(Nutch.CRAWL_ID_KEY, args[i + 1]);
          i++;
        } else {
          System.err.println("Unrecognized arg " + args[i]);
          return -1;
        }
      }

      try {
        inject(new Path(args[0]));
        return 0;
      } catch (Exception e) {
        LOG.error("InjectorJob: " + StringUtils.stringifyException(e));
        return -1;
      }
    }

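    If no arguments are given, the program prints the correct usage and returns -1. Every argument after the first must be -crawlId <id>; anything else is reported as unrecognized and also returns -1. If the arguments are valid, the program calls inject(new Path(args[0])) for the next step and returns 0, or -1 if that call throws an exception.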
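    The argument checks can be modeled as a small pure function. This is a hypothetical sketch (InjectorArgs and parse are not Nutch names) that mirrors the checks above, with one addition: it rejects a trailing -crawlId that has no value. The original loop does not guard against that case, so args[i + 1] would throw ArrayIndexOutOfBoundsException.

```java
import java.util.Optional;

public class InjectorArgs {

    final String seedDir;
    final String crawlId;   // null when -crawlId was not given

    private InjectorArgs(String seedDir, String crawlId) {
        this.seedDir = seedDir;
        this.crawlId = crawlId;
    }

    // Returns empty when the arguments are invalid, mirroring the
    // "return -1" paths of InjectorJob.run(String[]).
    static Optional<InjectorArgs> parse(String[] args) {
        if (args.length < 1) {
            System.err.println("Usage: InjectorJob <url_dir> [-crawlId <id>]");
            return Optional.empty();
        }
        String crawlId = null;
        for (int i = 1; i < args.length; i++) {
            if ("-crawlId".equals(args[i])) {
                if (i + 1 >= args.length) {     // guard the original lacks
                    System.err.println("Missing value for -crawlId");
                    return Optional.empty();
                }
                crawlId = args[++i];            // consume the value as well
            } else {
                System.err.println("Unrecognized arg " + args[i]);
                return Optional.empty();
            }
        }
        return Optional.of(new InjectorArgs(args[0], crawlId));
    }

    public static void main(String[] args) {
        parse(new String[] {"urls", "-crawlId", "test"})
            .ifPresent(a -> System.out.println(a.seedDir + " " + a.crawlId)); // urls test
        System.out.println(parse(new String[] {"urls", "-crawlId"}).isPresent()); // false
    }
}
```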

    public void inject(Path urlDir) throws Exception {
      SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      long start = System.currentTimeMillis();
      LOG.info("InjectorJob: starting at " + sdf.format(start));
      LOG.info("InjectorJob: Injecting urlDir: " + urlDir);
      run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
      long end = System.currentTimeMillis();
      LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
          + TimingUtil.elapsedTime(start, end));
    }

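    After logging some basic information, this method calls run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir)), which performs the core of the injection. When that call returns, it logs the finish time and the elapsed time.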
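    The hand-off from inject(Path) to run(Map) passes the seed directory through a generic argument map, and run(Map) accepts either a Path object or anything whose toString() is a path. The sketch below models that pattern. Its toArgMap is a hypothetical reimplementation (alternating name/value arguments, with null values skipped) rather than Nutch's ToolUtil code. The key "seedDir" stands in for Nutch.ARG_SEEDDIR, and java.nio.file.Path stands in for Hadoop's Path.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class ArgMapSketch {

    // Hypothetical key standing in for Nutch.ARG_SEEDDIR
    static final String ARG_SEEDDIR = "seedDir";

    // Build a name -> value map from alternating name/value arguments,
    // skipping entries whose value is null.
    static Map<String, Object> toArgMap(Object... args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("expected name/value pairs");
        }
        Map<String, Object> res = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (args[i + 1] != null) {
                res.put(String.valueOf(args[i]), args[i + 1]);
            }
        }
        return res;
    }

    // Mirrors the first lines of run(Map): use the value directly if it is
    // already a Path, otherwise convert its string form into one.
    static Path seedDir(Map<String, Object> args) {
        Object path = args.get(ARG_SEEDDIR);
        if (path instanceof Path) {
            return (Path) path;
        }
        return Paths.get(path.toString());
    }

    public static void main(String[] args) {
        Path fromPath = seedDir(toArgMap(ARG_SEEDDIR, Paths.get("urls")));
        Path fromString = seedDir(toArgMap(ARG_SEEDDIR, "urls"));
        System.out.println(fromPath.equals(fromString)); // true
    }
}
```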

    public Map<String, Object> run(Map<String, Object> args) throws Exception {
      // Record the injection time; UrlMapper.setup() reads it back as curTime
      getConf().setLong("injector.current.time", System.currentTimeMillis());
      Path input;
      Object path = args.get(Nutch.ARG_SEEDDIR);
      if (path instanceof Path) {
        input = (Path) path;
      } else {
        input = new Path(path.toString());
      }
      numJobs = 1;
      currentJobNum = 0;
      currentJob = NutchJob.getInstance(getConf(), "inject " + input);
      // Add the seed directory to the list of inputs for the MapReduce job
      FileInputFormat.addInputPath(currentJob, input);
      /**
       * public void Job.setMapperClass(Class<? extends Mapper> cls) throws IllegalStateException
       * Sets the Mapper for the job.
       */
      currentJob.setMapperClass(UrlMapper.class);

      // Set the key class for the map output data. This allows the map output
      // key class to differ from the final output key class.
      currentJob.setMapOutputKeyClass(String.class);

      // Set the value class for the map output data
      currentJob.setMapOutputValueClass(WebPage.class);

      // Write the job's output through Gora
      currentJob.setOutputFormatClass(GoraOutputFormat.class);

      DataStore<String, WebPage> store = StorageUtils.createWebStore(
          currentJob.getConfiguration(), String.class, WebPage.class);
      GoraOutputFormat.setOutput(currentJob, store, true);

      // NUTCH-1471 Make explicit which datastore class we use
      Class<? extends DataStore<Object, Persistent>> dataStoreClass = StorageUtils
          .getDataStoreClass(currentJob.getConfiguration());
      LOG.info("InjectorJob: Using " + dataStoreClass
          + " as the Gora storage class.");

      // Set the reducer for the job
      currentJob.setReducerClass(Reducer.class);

      // Zero reduce tasks: this is a map-only job, so the mapper output goes
      // straight to GoraOutputFormat and the reducer above never runs
      currentJob.setNumReduceTasks(0);

      // Blocks until the job finishes; while it runs, the framework calls
      // UrlMapper.setup() and then UrlMapper.map() for each input line
      currentJob.waitForCompletion(true);
      ToolUtil.recordJobStatus(null, currentJob, results);

      // NUTCH-1370 Make explicit #URLs injected @runtime
      long urlsInjected = currentJob.getCounters()
          .findCounter("injector", "urls_injected").getValue();
      long urlsFiltered = currentJob.getCounters()
          .findCounter("injector", "urls_filtered").getValue();
      LOG.info("InjectorJob: total number of urls rejected by filters: "
          + urlsFiltered);
      LOG.info("InjectorJob: total number of urls injected after normalization and filtering: "
          + urlsInjected);

      return results;
    }

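    When the statement currentJob.waitForCompletion(true); executes, the framework runs the inner class UrlMapper, calling its setup and map methods.
    UrlMapper holds the basic settings used to process each page, including the URL normalizers urlNormalizers, the fetch interval, the injected score, the URL filters filters, the scoring filters scfilters, and the current time.
    setup(Context context) assigns the class's fields. It is called once per map task before any call to map, so it plays the role of a constructor for the class.
    map(LongWritable key, Text value, Context context) does the following:
    1. Takes the URL from value, one URL per line. If the line is empty or starts with "#", it returns immediately.
    2. Parses the name=value pairs that follow the URL on the same line (separated by tabs). Two names are handled specially: nutchScoreMDName sets a custom initial score and nutchFetchIntervalMDName sets a custom fetch interval. Every other pair is stored in the metadata map.
    3. Normalizes and filters the URL, and gives each newly injected URL an initial score: the custom score if one was given, otherwise db.score.injected. It then calls the ScoringFilters class in the org.apache.nutch.scoring package, which in turn calls every configured implementation of the ScoringFilter interface; which plugin actually adjusts the score (for example opic or tld) depends on the plugins the user enables. To change the scoring method, change the value of plugin.includes, which is defined in nutch-default.xml in the conf directory (overriding it in nutch-site.xml is the usual practice, since that file takes precedence).
    4. Sets the page's fetch time to the current time, so the page is due for fetching immediately, and sets its fetch interval, i.e. the normal time between two fetches.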
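    Steps 1 and 2 can be isolated into a stdlib-only sketch of how one seed line is parsed. It follows the logic of the map method: tab-separated name=value pairs after the URL, with the score and fetch-interval names handled specially. The strings nutch.score and nutch.fetchInterval are assumed values of nutchScoreMDName and nutchFetchIntervalMDName, and SeedLineSketch and Seed are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

public class SeedLineSketch {

    // Assumed values of nutchScoreMDName / nutchFetchIntervalMDName
    static final String SCORE_KEY = "nutch.score";
    static final String INTERVAL_KEY = "nutch.fetchInterval";

    static final class Seed {
        String url;
        float customScore = -1f;   // -1 means "use db.score.injected"
        int customInterval;        // seconds
        final Map<String, String> metadata = new TreeMap<>();
    }

    // Returns null for blank lines and "#" comment lines, like map() does.
    static Seed parse(String line, int defaultInterval) {
        String url = line.trim();
        if (url.isEmpty() || url.startsWith("#")) {
            return null;
        }
        Seed seed = new Seed();
        seed.customInterval = defaultInterval;
        String[] splits = url.split("\t");
        seed.url = splits[0];
        for (int s = 1; s < splits.length; s++) {
            int eq = splits[s].indexOf('=');
            if (eq == -1) {
                continue;          // skip anything without "="
            }
            String name = splits[s].substring(0, eq);
            String value = splits[s].substring(eq + 1);
            try {
                if (name.equals(SCORE_KEY)) {
                    seed.customScore = Float.parseFloat(value);
                } else if (name.equals(INTERVAL_KEY)) {
                    seed.customInterval = Integer.parseInt(value);
                } else {
                    seed.metadata.put(name, value);
                }
            } catch (NumberFormatException ignored) {
                // a malformed number leaves the default in place
            }
        }
        return seed;
    }

    public static void main(String[] args) {
        int thirtyDays = 30 * 24 * 60 * 60;   // 2592000 s, the db.fetch.interval.default value
        Seed s = parse("http://www.example.com/\tnutch.score=2.5\tlang=en", thirtyDays);
        System.out.println(s.url + " " + s.customScore + " " + s.customInterval + " " + s.metadata);
        // http://www.example.com/ 2.5 2592000 {lang=en}
        System.out.println(parse("# a comment", thirtyDays)); // null
    }
}
```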

    The source code of UrlMapper is as follows:

    public static class UrlMapper extends
        Mapper<LongWritable, Text, String, WebPage> {
      private URLNormalizers urlNormalizers; // URL normalization
      private int interval;                  // fetch interval in seconds, default 2592000 (30 days)
      private float scoreInjected;           // initial score of an injected URL
      private URLFilters filters;            // URL filtering
      private ScoringFilters scfilters;      // scoring plugins
      private long curTime;                  // current time (injection time)

      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
        urlNormalizers = new URLNormalizers(context.getConfiguration(),
            URLNormalizers.SCOPE_INJECT);
        interval = context.getConfiguration().getInt("db.fetch.interval.default",
            2592000);
        filters = new URLFilters(context.getConfiguration());
        scfilters = new ScoringFilters(context.getConfiguration());
        scoreInjected = context.getConfiguration().getFloat("db.score.injected",
            1.0f);
        curTime = context.getConfiguration().getLong("injector.current.time",
            System.currentTimeMillis());
      }

      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        // value is one line of text; trim() drops leading and trailing whitespace
        String url = value.toString().trim();
        System.out.println("Input seed URL: " + url); // debug output added for this analysis
        // Skip empty lines and lines starting with "#" (comments)
        if (url != null && (url.length() == 0 || url.startsWith("#"))) {
          /* Ignore line that start with # */
          return;
        }
        // if tabs : metadata that could be stored
        // must be name=value and separated by \t
        float customScore = -1f;
        int customInterval = interval;
        Map<String, String> metadata = new TreeMap<String, String>();
        if (url.indexOf("\t") != -1) {
          String[] splits = url.split("\t");
          url = splits[0];
          for (int s = 1; s < splits.length; s++) {
            // find separation between name and value
            int indexEquals = splits[s].indexOf("=");
            if (indexEquals == -1) {
              // skip anything without a =
              continue;
            }
            String metaname = splits[s].substring(0, indexEquals);
            String metavalue = splits[s].substring(indexEquals + 1);
            if (metaname.equals(nutchScoreMDName)) {
              try {
                customScore = Float.parseFloat(metavalue);
              } catch (NumberFormatException nfe) {
              }
            } else if (metaname.equals(nutchFetchIntervalMDName)) {
              try {
                customInterval = Integer.parseInt(metavalue);
              } catch (NumberFormatException nfe) {
              }
            } else
              metadata.put(metaname, metavalue);
          }
        }
        try {
          url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
          url = filters.filter(url); // filter the url
        } catch (Exception e) {
          LOG.warn("Skipping " + url + ":" + e);
          url = null;
        }
        if (url == null) {
          context.getCounter("injector", "urls_filtered").increment(1);
          return;
        } else { // if it passes
          String reversedUrl = TableUtil.reverseUrl(url); // collect it
          WebPage row = WebPage.newBuilder().build();
          row.setFetchTime(curTime);
          row.setFetchInterval(customInterval);
          // now add the metadata
          Iterator<String> keysIter = metadata.keySet().iterator();
          while (keysIter.hasNext()) {
            String keymd = keysIter.next();
            String valuemd = metadata.get(keymd);
            row.getMetadata().put(new Utf8(keymd),
                ByteBuffer.wrap(valuemd.getBytes()));
          }
          if (customScore != -1)
            row.setScore(customScore);
          else
            row.setScore(scoreInjected);
          try {
            scfilters.injectedScore(url, row);
          } catch (ScoringFilterException e) {
            if (LOG.isWarnEnabled()) {
              LOG.warn("Cannot filter injected score for url " + url
                  + ", using default (" + e.getMessage() + ")");
            }
          }
          context.getCounter("injector", "urls_injected").increment(1);
          row.getMarkers()
              .put(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0)));
          Mark.INJECT_MARK.putMark(row, YES_STRING);
          context.write(reversedUrl, row);
        }
      }
    }

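    The map method of UrlMapper receives a key/value pair and a Context context, following Hadoop's MapReduce model: with the default TextInputFormat, key is the byte offset of the line within the seed file and value is the text of that line. After doing the work described above, map increments the urls_injected or urls_filtered counter on context and writes each accepted URL to context, with the reversed URL as the key and its WebPage row as the value.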
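    The branch at the end of map, where a URL either counts as filtered or becomes a row counted as injected, can be modeled without Hadoop. In this sketch the normalizer and filters are plain UnaryOperator<String> functions that return null to reject a URL (Nutch's URL filters also signal rejection by returning null). The counters are a simple map, and the exception handling and scoring-filter step are left out. All names, and the toy normalizer and filter in main, are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

public class InjectDecisionSketch {

    // Hypothetical stand-ins for URLNormalizers + URLFilters.
    // Each step returns null to reject the URL.
    static String normalizeAndFilter(String url, List<UnaryOperator<String>> steps) {
        for (UnaryOperator<String> step : steps) {
            if (url == null) {
                break;
            }
            url = step.apply(url);
        }
        return url;
    }

    // Counts each seed as "urls_filtered" or "urls_injected" and returns the
    // score the new row would start with (before scoring plugins run),
    // or null if the URL was rejected.
    static Float inject(String url, float customScore, float scoreInjected,
                        List<UnaryOperator<String>> steps, Map<String, Long> counters) {
        String result = normalizeAndFilter(url, steps);
        if (result == null) {
            counters.merge("urls_filtered", 1L, Long::sum);
            return null;
        }
        counters.merge("urls_injected", 1L, Long::sum);
        return customScore != -1f ? customScore : scoreInjected;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> steps = List.of(
            u -> {                                   // toy normalizer: drop "#fragment"
                int i = u.indexOf('#');
                return i == -1 ? u : u.substring(0, i);
            },
            u -> u.startsWith("http://") ? u : null  // toy filter: http only
        );
        Map<String, Long> counters = new TreeMap<>();
        inject("http://example.com/#top", -1f, 1.0f, steps, counters);
        inject("ftp://example.com/", -1f, 1.0f, steps, counters);
        inject("http://example.org/", 2.5f, 1.0f, steps, counters);
        System.out.println(counters); // {urls_filtered=1, urls_injected=2}
    }
}
```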

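    Next, control returns to public Map<String, Object> run(Map<String, Object> args). Once waitForCompletion(true) returns, it records the job status, reads the urls_injected and urls_filtered counters, and logs how many URLs the filters rejected and how many were injected after normalization and filtering.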

Original article: https://www.cnblogs.com/jpfss/p/7886363.html
Copyright © 2011-2022 走看看